Note: the code for this post can be found here
I recently stumbled across Temporal and thought it sounded pretty cool. After checking out their repositories, I decided to see if I could get an example working in Scala, since a Java SDK is offered. I chose the Money Transfer example since it seemed the most interesting after a quick glance.
First, let’s translate over Account.java, since it’s just an interface with two methods. Since a Scala trait is pretty similar to a Java interface, we can swap out a few words and end up with the following:
import io.temporal.activity.ActivityInterface
@ActivityInterface
trait Account {
def deposit(accountId: String, referenceId: String, amountCents: Int): Unit
def withdraw(accountId: String, referenceId: String, amountCents: Int): Unit
}
Simple enough. Let’s take a look at the implementation in AccountImpl.java. At first glance there isn’t anything too difficult. The only things we need to swap out are extends for implements and Unit for the void return type, like so:
class AccountImpl extends Account {
override def deposit(accountId: String, referenceId: String, amountCents: Int): Unit = {
System.out.printf(
"Deposit to %s of %d cents requested. ReferenceId=%s\n",
accountId, amountCents, referenceId)
}
override def withdraw(accountId: String, referenceId: String, amountCents: Int): Unit = {
System.out.printf(
"Withdraw to %s of %d cents requested. ReferenceId=%s\n",
accountId, amountCents, referenceId)
// throw new RuntimeException("simulated");
}
}
And that should be it! Two done, five to go. Next let’s try and get the workers set up. For AccountActivityWorker.java the main change will be swapping class for a Scala object, since this is code that we want to run (I’m using IntelliJ for this, so object + def main(args: Array[String]) is the “magic sauce”):
object AccountActivityWorker {
final val TASK_QUEUE: String = "AccountTransfer";
// @SuppressWarnings("CatchAndPrintStackTrace")
def main(args: Array[String]): Unit = {
// gRPC stubs wrapper that talks to the local docker instance of temporal service.
val service = WorkflowServiceStubs.newLocalServiceStubs
// client that can be used to start and signal workflows
val client = WorkflowClient.newInstance(service)
// worker factory that can be used to create workers for specific task queues
val factory = WorkerFactory.newInstance(client)
val worker = factory.newWorker(TASK_QUEUE)
val account = new AccountImpl()
worker.registerActivitiesImplementations(account)
// Start all workers created by this factory.
factory.start()
System.out.println("Activity Worker started for task queue: " + TASK_QUEUE)
}
}
And the same changes will be used for AccountTransferWorker.java, plus using Scala’s classOf[] in place of Java’s .class:
object AccountTransferWorker {
def main(args: Array[String]): Unit = {
// Get worker to poll the common task queue.
// gRPC stubs wrapper that talks to the local docker instance of temporal service.
val service = WorkflowServiceStubs.newLocalServiceStubs
// client that can be used to start and signal workflows
val client = WorkflowClient.newInstance(service)
// worker factory that can be used to create workers for specific task queues
val factory = WorkerFactory.newInstance(client)
val workerForCommonTaskQueue = factory.newWorker(TASK_QUEUE)
workerForCommonTaskQueue.registerWorkflowImplementationTypes(classOf[AccountTransferWorkflowImpl])
// Start all workers created by this factory.
factory.start()
System.out.println("Worker started for task queue: " + TASK_QUEUE)
}
}
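As a quick aside on the classOf[] swap, here is a minimal sketch that has nothing to do with Temporal. It only uses String from the standard library, and the object name ClassOfSketch is made up for illustration. The idea is that classOf[T] plays the role of Java’s T.class, while getClass still works on instances just like in Java.

```scala
object ClassOfSketch {
  // classOf[T] is resolved at compile time, like Java's String.class.
  val viaClassOf: Class[String] = classOf[String]

  // getClass needs an instance and is resolved at runtime, as in Java.
  val viaInstance: Class[_ <: String] = "hello".getClass

  def main(args: Array[String]): Unit = {
    // Both values refer to the same java.lang.Class object.
    println(viaClassOf == viaInstance)
  }
}
```

So anywhere the Java sample passes SomeType.class, passing classOf[SomeType] from Scala should hand the SDK the same Class object.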
Now, looking at AccountTransferWorkflow.java, it seems really similar to what we did for Account.java, swapping in trait for interface and Unit for void:
@WorkflowInterface
trait AccountTransferWorkflow {
@WorkflowMethod
def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit
}
Almost done! Switching over to AccountTransferWorkflowImpl, we have a few changes to make that we’ve already seen: classOf[] for .class, extends for implements, and Unit for void:
class AccountTransferWorkflowImpl extends AccountTransferWorkflow {
private val options = ActivityOptions.newBuilder.setStartToCloseTimeout(Duration.ofSeconds(5)).build
private val account = Workflow.newActivityStub(classOf[Account], options)
override def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit = {
account.withdraw(fromAccountId, referenceId, amountCents)
account.deposit(toAccountId, referenceId, amountCents)
}
}
Which leaves us with TransferRequester.java. This one is a little more involved than the others. We can replace the if statements with pattern matching (whether we should is debatable, and it isn’t necessary for things to work), but what do we do with the :: in WorkflowClient.start(transferWorkflow::transfer, from, to, reference, amountCents);? Hmm, what happens if we exchange it for transferWorkflow.transfer?
temporal/src/main/scala/TransferRequester.scala:26:43
missing argument list for method transfer in trait AccountTransferWorkflow
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `transfer _` or `transfer(_,_,_,_)` instead of `transfer`.
WorkflowClient.start(transferWorkflow.transfer, from, to, reference, amountCents)
Well, the compiler obviously doesn’t like that. With just transferWorkflow.transfer, the compiler looks like it is having trouble matching up the passed function with the arguments. Let’s try listening to the error and writing transferWorkflow.transfer _ instead:
temporal/src/main/scala/TransferRequester.scala:26:20
ambiguous reference to overloaded definition,
both method start in class WorkflowClient of type [A1, A2, A3, A4, R](x$1: io.temporal.workflow.Functions.Func4[A1,A2,A3,A4,R], x$2: A1, x$3: A2, x$4: A3, x$5: A4): io.temporal.api.common.v1.WorkflowExecution
and method start in class WorkflowClient of type [A1, A2, A3, A4](x$1: io.temporal.workflow.Functions.Proc4[A1,A2,A3,A4], x$2: A1, x$3: A2, x$4: A3, x$5: A4): io.temporal.api.common.v1.WorkflowExecution
match argument types ((String, String, String, Int) => Unit,String,String,String,Int)
WorkflowClient.start(transferWorkflow.transfer _, from, to, reference, amountCents)
Nice, now we’re getting somewhere! The compiler can’t seem to figure out whether transferWorkflow.transfer is a procedure (Proc4) or a function (Func4)… let’s dig into those types real quick to see if we can pick one and help the compiler out:
@FunctionalInterface
public interface Func4<T1, T2, T3, T4, R>
extends TemporalFunctionalInterfaceMarker, Serializable {
R apply(T1 t1, T2 t2, T3 t3, T4 t4);
}
@FunctionalInterface
public interface Proc4<T1, T2, T3, T4> extends TemporalFunctionalInterfaceMarker, Serializable {
void apply(T1 t1, T2 t2, T3 t3, T4 t4);
}
So the biggest difference here is the R in Func4, which is the return type of apply. That fits the usual distinction: a function returns a value whereas a procedure does not. So, does transferWorkflow.transfer return anything? Let’s check:
// AccountTransferWorkflow.scala
@WorkflowInterface
trait AccountTransferWorkflow {
@WorkflowMethod
def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit
}
// AccountTransferWorkflowImpl.scala
class AccountTransferWorkflowImpl extends AccountTransferWorkflow {
// ...
override def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit = {
account.withdraw(fromAccountId, referenceId, amountCents)
account.deposit(toAccountId, referenceId, amountCents)
}
}
Nope! No return value, just Unit. So let’s try explicitly typing the problematic call as a Proc4:
WorkflowClient.start(transferWorkflow.transfer: Proc4[String, String, String, Int], from, to, reference, amountCents)
Looks okay so far. Let’s build it and see if everything is OK:
...
Module 'moneytransfer' was fully rebuilt due to project configuration/dependencies changes
Executing post-compile tasks…
Loading Ant configuration...
Running Ant tasks...
Synchronizing output directories…
Build completed successfully in 2 sec, 642 ms
No issues! The final file looks like the following:
object TransferRequester {
def main(args: Array[String]): Unit = {
val (reference: String, amountCents: Int) = args match {
case Array(refId, amtCents: String) => (refId, parseInt(amtCents))
case _ => (UUID.randomUUID().toString, new Random().nextInt(5000))
}
val service = WorkflowServiceStubs.newLocalServiceStubs
// client that can be used to start and signal workflows
val workflowClient = WorkflowClient.newInstance(service)
// now we can start running instances of the saga - its state will be persisted
val options = WorkflowOptions.newBuilder.setTaskQueue(TASK_QUEUE).build
val transferWorkflow = workflowClient.newWorkflowStub(classOf[AccountTransferWorkflow], options)
val from = "account1"
val to = "account2"
WorkflowClient.start(transferWorkflow.transfer: Proc4[String, String, String, Int], from, to, reference, amountCents)
System.out.printf("Transfer of %d cents from %s to %s requested", amountCents, from, to)
}
}
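If you want to poke at the type ascription trick without pulling in Temporal, here is a minimal sketch of the same situation. Everything in it is a hypothetical stand-in: Func2 and Proc2 are two-argument imitations of Temporal’s Func4 and Proc4, and Starter.start only mimics the overloaded shape of WorkflowClient.start. I’m assuming Scala 2.13 here.

```scala
// Hypothetical two-argument stand-ins for Temporal's Func4 and Proc4.
@FunctionalInterface
trait Func2[A, B, R] { def apply(a: A, b: B): R }

@FunctionalInterface
trait Proc2[A, B] { def apply(a: A, b: B): Unit }

object Starter {
  // Two overloads that differ only in the single-abstract-method type of the
  // first parameter, mirroring the shape of WorkflowClient.start. Each one
  // returns a label so we can see which overload was picked.
  def start[A, B, R](f: Func2[A, B, R], a: A, b: B): String = { f(a, b); "func" }
  def start[A, B](p: Proc2[A, B], a: A, b: B): String = { p(a, b); "proc" }
}

object AscriptionDemo {
  // A Unit-returning method, like transfer in the workflow trait.
  def log(account: String, cents: Int): Unit =
    println(s"$account: $cents cents")

  // Starter.start(log _, "account1", 100) would be expected to hit the same
  // kind of ambiguity, since a (String, Int) => Unit fits both overloads.
  // The ascription below names the target type, so only the Proc2 overload applies.
  def run(): String =
    Starter.start(log: Proc2[String, Int], "account1", 100)

  def main(args: Array[String]): Unit = println(run())
}
```

Running AscriptionDemo should print the log line followed by proc, which suggests the ascription is what steers overload resolution rather than anything specific to Temporal.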
Now, let’s get the local Temporal instance spun up and see if we can get things working!
If you haven’t checked out Temporal’s docs yet, please do! The local setup we’ll be using can be found here.
Once started, you should see something like the following:
> docker-compose up
Starting temporal-elasticsearch ... done
Starting temporal-postgresql ... done
Starting temporal ... done
Starting temporal-web ... done
Starting temporal-admin-tools ... done
Starting temporal-ui ... done
Attaching to temporal-postgresql, temporal-elasticsearch, temporal, temporal-web, temporal-ui, temporal-admin-tools
....
And we can check on everything by going to the frontend at localhost:8080, which should look like this:
Now we can start up the money transfer code and see how everything runs. Following the sample docs, we should start the AccountTransferWorker, then the AccountActivityWorker, then the TransferRequester:
// AccountTransferWorker
Worker started for task queue: AccountTransfer
// AccountActivityWorker
Activity Worker started for task queue: AccountTransfer
// TransferRequester
Transfer of 3135 cents from account1 to account2 requested
Process finished with exit code 0
Success!!
We can check in the local frontend to see how it looks as well:
And if we drill down into the completed workflow, we’ll get a really nice summary of everything that happened:
Overall, not too much work at all to get everything working. And the visibility is really nice!
For a follow-up exercise, I’d recommend uncommenting the // throw new RuntimeException("simulated"); in AccountImpl.scala to see how seamlessly Temporal handles failures and retries right out of the box!
Thank you to the temporal team for all of their docs and examples!