Note: the code for this post can be found here

I recently stumbled across temporal and thought it sounded pretty cool. After checking out their repositories I decided to see if I could get an example working in scala since a java sdk is offered. I chose the Money Transfer example since that seemed the most interesting after a quick glance.

First, let’s translate over Account.java since it’s just an interface with two methods. Since a scala trait is pretty similar to a java interface we can swap out a few words and end up with the following:

import io.temporal.activity.ActivityInterface

@ActivityInterface
trait Account {
  def deposit(accountId: String, referenceId: String, amountCents: Int): Unit

  def withdraw(accountId: String, referenceId: String, amountCents: Int): Unit
}

Simple enough. Let’s take a look at the implementation in AccountImpl.java. At first glance there isn’t anything too difficult. The only things we need to swap out are extends for implements and then Unit for void return type like so

class AccountImpl extends Account {
  override def deposit(accountId: String, referenceId: String, amountCents: Int): Unit = {
    System.out.printf(
      "Withdraw to %s of %d cents requested. ReferenceId=%s\n",
      accountId, amountCents, referenceId)
  }

  override def withdraw(accountId: String, referenceId: String, amountCents: Int): Unit = {
    System.out.printf(
      "Deposit to %s of %d cents requested. ReferenceId=%s\n",
      accountId, amountCents, referenceId);
    //    throw new RuntimeException("simulated");
  }
}

And that should be it! Two done, five to go. Next let’s try and get the workers set up. For AccountActivityWorker.java the main change will be swapping class for a scala object since we want this to be code that we want to run (I’m using IntelliJ for this so object + def main(args: Array[String] is the “magic sauce”))

object AccountActivityWorker {
  final val TASK_QUEUE: String = "AccountTransfer";

  //  @SuppressWarnings("CatchAndPrintStackTrace")
  def main(args: Array[String]): Unit = {
    // gRPC stubs wrapper that talks to the local docker instance of temporal service.
    val service = WorkflowServiceStubs.newLocalServiceStubs
    // client that can be used to start and signal workflows
    val client = WorkflowClient.newInstance(service)

    // worker factory that can be used to create workers for specific task queues
    val factory = WorkerFactory.newInstance(client)
    val worker = factory.newWorker(TASK_QUEUE)
    val account = new AccountImpl()
    worker.registerActivitiesImplementations(account)

    // Start all workers created by this factory.
    factory.start()
    System.out.println("Activity Worker started for task queue: " + TASK_QUEUE)
  }
}

And the same changes will be used for AccountTransferWorker.java plus using scala’s classOf[] in place of java’s .class:

object AccountTransferWorker {

  def main(args: Array[String]): Unit = {
    // Get worker to poll the common task queue.
    // gRPC stubs wrapper that talks to the local docker instance of temporal service.
    val service = WorkflowServiceStubs.newLocalServiceStubs
    // client that can be used to start and signal workflows
    val client = WorkflowClient.newInstance(service)

    // worker factory that can be used to create workers for specific task queues
    val factory = WorkerFactory.newInstance(client)
    val workerForCommonTaskQueue = factory.newWorker(TASK_QUEUE)
    workerForCommonTaskQueue.registerWorkflowImplementationTypes(classOf[AccountTransferWorkflowImpl])
    // Start all workers created by this factory.
    factory.start()
    System.out.println("Worker started for task queue: " + TASK_QUEUE)
  }

}

Now, looking at AccounTransferWorkflow.java it seems really similar to what we did for Account.java with replacing interface for trait and void for Unit:

@WorkflowInterface
trait AccountTransferWorkflow {
  @WorkflowMethod
  def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit
}

Almost done! Switching over to AccountTransferWorkflowImpl we have a few changes that need to be made that we’ve already seen: classOf[] for .class, extends for implements, and Unit for void

class AccountTransferWorkflowImpl extends AccountTransferWorkflow {

  private val options = ActivityOptions.newBuilder.setStartToCloseTimeout(Duration.ofSeconds(5)).build
  private val account = Workflow.newActivityStub(classOf[Account], options)

  override def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit = {
    account.withdraw(fromAccountId, referenceId, amountCents)
    account.deposit(toAccountId, referenceId, amountCents)
  }
}

Which leaves us with TransferRequester.java. This one is a little more involved than the others as we can replace the if statements with pattern matching (whether we should is debatable, ultimately not necessary for things to work), but what do we do with the :: in WorkflowClient.start(transferWorkflow::transfer, from, to, reference, amountCents); ? Hmm, what happens if we exchange it for transferWorkflow.transfer?

temporal/src/main/scala/TransferRequester.scala:26:43
missing argument list for method transfer in trait AccountTransferWorkflow
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `transfer _` or `transfer(_,_,_,_)` instead of `transfer`.
    WorkflowClient.start(transferWorkflow.transfer, from, to, reference, amountCents)

Well, so the compiler obviously doesn’t like that. With just transferWorklow.transfer the compiler looks like it is having trouble matching up the passed function with the arguments. Let’s try listening to the error and using transfer _ like so transferWorkflow.transfer _,:

temporal/src/main/scala/TransferRequester.scala:26:20
ambiguous reference to overloaded definition,
both method start in class WorkflowClient of type [A1, A2, A3, A4, R](x$1: io.temporal.workflow.Functions.Func4[A1,A2,A3,A4,R], x$2: A1, x$3: A2, x$4: A3, x$5: A4): io.temporal.api.common.v1.WorkflowExecution
and  method start in class WorkflowClient of type [A1, A2, A3, A4](x$1: io.temporal.workflow.Functions.Proc4[A1,A2,A3,A4], x$2: A1, x$3: A2, x$4: A3, x$5: A4): io.temporal.api.common.v1.WorkflowExecution
match argument types ((String, String, String, Int) => Unit,String,String,String,Int)
    WorkflowClient.start(transferWorkflow.transfer _, from, to, reference, amountCents)

Nice, now we’re getting somewhere! The compiler can’t seem to figure out whether transferWorkflow.transfer is a Procedure (Proc4) or a Function (Func4)…let’s dig into those types real quick to see if we can pick one and help the compiler out:

@FunctionalInterface
public interface Func4<T1, T2, T3, T4, R>
        extends TemporalFunctionalInterfaceMarker, Serializable {
    R apply(T1 t1, T2 t2, T3 t3, T4 t4);
}

@FunctionalInterface
public interface Proc4<T1, T2, T3, T4> extends TemporalFunctionalInterfaceMarker, Serializable {
    void apply(T1 t1, T2 t2, T3 t3, T4 t4);
}

So the biggest difference here is the R in Func4 which seems to refer to a return type. This seems to follow that a function returns a value whereas a procedure does not. So, does transferWorkflow.transfer return anything? Let’s check:

// AccountTransferWorkflow.scala
@WorkflowInterface
trait AccountTransferWorkflow {
  @WorkflowMethod
  def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit
}

// AccountTransferWorkflowImpl.scala
class AccountTransferWorkflowImpl extends AccountTransferWorkflow {
  // ...
  override def transfer(fromAccountId: String, toAccountId: String, referenceId: String, amountCents: Int): Unit = {
    account.withdraw(fromAccountId, referenceId, amountCents)
    account.deposit(toAccountId, referenceId, amountCents)
  }
}

Nope! No return value, just Unit. So let’s try explicitly typing the problematic call as a Proc4:

WorkflowClient.start(transferWorkflow.transfer: Proc4[String, String, String, Int], from, to, reference, amountCents)

Looks okay so far. Let’s build it and see if everything is OK:

...
Module 'moneytransfer' was fully rebuilt due to project configuration/dependencies changes
Executing post-compile tasks…
Loading Ant configuration...
Running Ant tasks...
Synchronizing output directories…
Build completed successfully in 2 sec, 642 ms

No issues! The final file looks like the following:

object TransferRequester {
  def main(args: Array[String]): Unit = {
    val (reference: String, amountCents: Int) = args match {
      case Array(refId, amtCents: String) => (refId, parseInt(amtCents))
      case _ => (UUID.randomUUID().toString, new Random().nextInt(5000))
    }

    val service = WorkflowServiceStubs.newLocalServiceStubs
    // client that can be used to start and signal workflows
    val workflowClient = WorkflowClient.newInstance(service)

    // now we can start running instances of the saga - its state will be persisted
    val options = WorkflowOptions.newBuilder.setTaskQueue(TASK_QUEUE).build
    val transferWorkflow = workflowClient.newWorkflowStub(classOf[AccountTransferWorkflow], options)
    val from = "account1"
    val to = "account2"
    WorkflowClient.start(transferWorkflow.transfer: Proc4[String, String, String, Int], from, to, reference, amountCents)
    System.out.printf("Transfer of %d cents from %s to %s requested", amountCents, from, to)
  }
}

Now, let’s get the local temporal instance spun up and see if we can get things working!

If you haven’t checked out temporal’s docs yet, please do! The local setup we’ll be using can be found here

Once started you should see something like the following:

> docker-compose up
Starting temporal-elasticsearch ... done
Starting temporal-postgresql    ... done
Starting temporal               ... done
Starting temporal-web           ... done
Starting temporal-admin-tools   ... done
Starting temporal-ui            ... done
Attaching to temporal-postgresql, temporal-elasticsearch, temporal, temporal-web, temporal-ui, temporal-admin-tools
....

And we can check on everything by going to the frontend at localhost:8080 which should look like this: Temporal Frontend

Now we can start up the money transfer code and see how everything runs. Following the sample docs we should start the AccountTransferWorker then the AccountActivityWorker then the TransferRequester:

// AccountTransferWorker
Worker started for task queue: AccountTransfer

// AccountActivityWorker
Activity Worker started for task queue: AccountTransfer

// TransferRequester
Transfer of 3135 cents from account1 to account2 requested
Process finished with exit code 0

Success!!

We can check in the local frontend to see how it looks as well: Temporal Frontend Successful Workflow

And if we drill down into the completed workflow, we’ll get a really nice summary of everything that happened: Temporal Fronted Successful Workflow Detail Overall, not too much work at all to get everything working. And the visibility is really nice!

For a follow-up exercise, I’d recommend uncommenting the // throw new RuntimeException("simulated"); in AccountImpl.scala to see how seamlessly temporal handles failures and retries right out of the box!

Thank you to the temporal team for all of their docs and examples!