Asynchronous Flow invocations with Corda Services
How can I make my Flows faster? There’s a good chance you have thought about this before if you have been working with Corda for a while. You can make reasonable tweaks to eke out performance improvements by changing a few things: reducing transaction size, optimising queries and reducing the number of network hops required throughout the Flow’s execution. There is one other possibility that probably also crossed your mind at some point. Multi-threading.
More specifically, asynchronously starting Flows/Sub Flows from an already executing Flow. Doing so has the potential to greatly improve your CorDapp’s performance.
If you tried this, you probably faced a similar exception to the one I got. Furthermore, as of now, Corda does not support threading of Sub Flows. But, it can still be done. We just need to be clever about it. That’s where multi-threading within Corda Services comes in. They can be called within Flows but are not prisoners to the strict rules that Flows put on them since an executing Flow will not suspend or checkpoint from within a service.
In this post, I will focus on multi-threading the starting of Flows from within a Service. There are other areas where threading can be used within Corda, but this is an interesting one that I want to look into more deeply. On the other hand, starting Flows from a Service is also filled with a few gotchas. These need to be accounted for and worked around. Otherwise, you are going to wake up one day and wonder why everything has stopped for no apparent reason.
Luckily for you, I am here to help. For me, well, I had to face this problem head-on.
Luckily for me, R3 was able to help.
For reference, I will be using Corda Enterprise 3.1 for this post. To actually gain any benefit from the contents of this post you will need to be using Enterprise. This is due to Enterprise supporting multiple Flows executing asynchronously. Open Source does not currently allow this.
I also recommend looking at my previous post Corda Services 101 as we will be building off the foundation laid there.
Let’s start with outlining the scenario that we will be using for this post.
- PartyA sends PartyB some messages over time. Each message comes from a single Flow.
- PartyB responds to all messages sent to them. Each message comes from a single Flow, but they want a single place to execute the process.
A series of Flows can be quickly put together to satisfy this requirement. Doing this sequentially should pose absolutely zero problems (after we have fixed all the stupid mistakes we all make).
Although this scenario is a poor case for needing performance, it is a simple one to understand so we can focus on running this asynchronously.
The slow synchronous solution
Before we look at the asynchronous solution, it will be beneficial to have a quick look at the code we will be moving from. Below is the code from ReplyToMessagesFlow. I don’t want to go through all of the underlying code and instead only want to focus on the code relevant to this post:
If you did have a read through Corda Services 101 then you might have recognised this class. As I mentioned earlier, putting together a solution for the proposed problem is very easy. Retrieve the MessageStates from the Vault and start a subFlow to reply to them.
This code will happily chug along through the messages one by one.
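Stripped of the Corda specifics, the sequential approach boils down to something like the sketch below. It is plain Kotlin rather than Flow code: Message, reply and replyAllSequentially are hypothetical stand-ins for the MessageState, the reply subFlow and the Flow's own logic, and the sleep only simulates the cost of a round trip.

```kotlin
// Hypothetical stand-in for a stored message; not the MessageState from this post.
data class Message(val id: Int, val contents: String)

// Hypothetical stand-in for the reply subFlow: it blocks until the reply is done.
fun reply(message: Message): String {
    Thread.sleep(50) // simulate the cost of one round trip (assumed value)
    return "Reply to ${message.contents}"
}

// Reply to every message one after another, on the calling thread.
fun replyAllSequentially(messages: List<Message>): List<String> =
    messages.map { reply(it) }
```

The total time grows linearly with the number of messages, which is exactly what the rest of this post tries to improve on.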
So, can we take this code and make it faster?
A failed attempt at asynchronicity
Let’s try and make the current code faster by introducing threading! We will use CompletableFutures to do this:
Most of the code is the same as before and has therefore been excluded from the example.
The only change to the code is the addition of CompletableFuture and its supplyAsync method (comes from Java). It attempts to start executing the reply function for each message on a separate thread.
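As a rough illustration of the pattern itself, outside of Corda where it works without complaint, here is a minimal sketch in plain Kotlin. The names replyTo and replyAllAsync are hypothetical and are not taken from the Flow in this post.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for the work done per message.
fun replyTo(contents: String): String = "Reply to $contents"

// Start one asynchronous task per message, then wait for all of them.
fun replyAllAsync(messages: List<String>): List<String> {
    val futures: List<CompletableFuture<String>> = messages.map { contents ->
        // supplyAsync runs the lambda on another thread
        // (by default one from the common ForkJoinPool)
        CompletableFuture.supplyAsync { replyTo(contents) }
    }
    // join blocks until each future completes; results keep the input order
    return futures.map { it.join() }
}
```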
So why is this section named “A failed attempt”? I refer you to the stack trace you get when executing the above code:
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: Required value was null.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_172]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_172]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_172]
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_172]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_172]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_172]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_172]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_172]
Caused by: java.lang.IllegalArgumentException: Required value was null.
    at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201) ~[corda-node-3.1.jar:?]
    at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192) ~[corda-node-3.1.jar:?]
    at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271) ~[corda-node-3.1.jar:?]
    at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312) ~[corda-core-3.1.jar:?]
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:57) ~[classes/:?]
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:46) ~[classes/:?]
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]
You are going to get that, along with a lengthy list of checkpointing log lines that Corda is printing out. Furthermore, just to cover my ass and prove to you that this is not due to a problem with CompletableFutures, here is a different error you get when using a plain thread pool instead:
Exception in thread "pool-29-thread-1" Exception in thread "pool-29-thread-2" java.lang.IllegalArgumentException: Required value was null.
    at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)
    at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)
    at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)
    at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalArgumentException: Required value was null.
    at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)
    at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)
    at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)
    at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)
    at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Hopefully you believe me at this point. If not, refer back to what I said at the start. Corda does not currently support starting new Flows asynchronously from an executing Flow. It is something that I believe they are working towards. But, as of now, don’t use this solution.
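Both failures surface in checkDbTransaction, and the message is the default one produced by Kotlin's requireNotNull. My reading, which is an assumption and not something I have confirmed against Corda's source, is that the Flow depends on state bound to its own thread, which a pool thread simply does not have. The sketch below reproduces that shape in plain Kotlin with a ThreadLocal; currentTransaction, requireTransaction and failureOnPoolThread are hypothetical names.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.Executors
import java.util.function.Supplier

// Hypothetical stand-in for state that is bound to the Flow's own thread.
val currentTransaction = ThreadLocal<String>()

// Same shape as the failing check: the value must exist on the calling thread.
fun requireTransaction(): String = requireNotNull(currentTransaction.get())

// Sets the value on the calling thread, then runs the check on a pool thread
// and returns the message of the failure it causes (or null if it succeeds).
fun failureOnPoolThread(): String? {
    val executor = Executors.newSingleThreadExecutor()
    currentTransaction.set("tx-1") // visible to the calling thread only
    return try {
        CompletableFuture.supplyAsync(Supplier { requireTransaction() }, executor).join()
        null
    } catch (e: CompletionException) {
        e.cause?.message // the IllegalArgumentException thrown on the pool thread
    } finally {
        currentTransaction.remove()
        executor.shutdown()
    }
}
```

If that reading is right, it would also explain why swapping CompletableFutures for another threading mechanism changes nothing: any thread other than the Flow's own fails the same check.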
A working asynchronous solution
We have seen that threading inside a Flow is a no go. To carry on with our quest for performance, we will now look at threading from a Corda Service. This should come as no surprise since the title and opening paragraphs already discussed this…
Sarcastic comments aside, delegating to a Service will require a bit of rework from the original solution, but the bulk of the code will remain the same. Most of it will simply be copied from the Flow and pasted into a Service.
Below is the new MessageService that contains the code from the original ReplyToMessagesFlow, but with a few changes and the addition of threading code:
As you can see, most of the code is the same as it was in the original Flow.
The first point I want to highlight is the use of an Executor thread pool. I have not used CompletableFutures here for reasons that we will look at later.
So how does this all work? The replyAll function executes reply on a new system thread for each message retrieved from the Vault. This new thread, in turn, calls startFlow, triggering a new Flow to be put onto the Flow Worker queue. This is where all the fun happens and everything starts to get messy.
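To make the shape of this concrete without any Corda dependencies, here is a minimal fire-and-forget sketch in plain Kotlin. ReplyService, its startFlow parameter and the pool size of 4 are assumptions made for illustration; in a real Service the hook would be whatever call starts the Flow.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a Service that hands each message to a thread pool.
class ReplyService(
    private val startFlow: (String) -> Unit, // hypothetical hook that queues a Flow
    threads: Int = 4                          // assumed pool size
) {
    private val executor: ExecutorService = Executors.newFixedThreadPool(threads)

    // Submit one task per message and return immediately; no result is awaited.
    fun replyAll(messages: List<String>) {
        messages.forEach { message -> executor.execute { startFlow(message) } }
    }

    // Stop accepting work and wait for submitted tasks to finish (for demos and tests).
    fun shutdown(timeoutSeconds: Long = 5): Boolean {
        executor.shutdown()
        return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
    }
}
```

The important detail is that replyAll never blocks on the work it submits, so the caller gets control back straight away and learns nothing about the outcome.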
The Flow Worker queue looks after the order that Flows execute in and will fill and empty as Flows are added and completed. This queue is crucial in coordinating the execution of Flows within a node. It is also the source of pain when it comes to multi-threading Flows ourselves.
Below is a diagram demonstrating a simplistic view of the queue:
[Diagram: Flows enter the queue and leave once processed]
Why am I talking about this queue? Well, we need to be extra careful not to fill the queue up with Flows that cannot complete.
How can that happen? By starting a Flow from within an executing Flow that then waits for it to finish. This won’t cause a problem until all the threads in the queue’s thread pool encounter this situation. Once that happens, the queue is left in deadlock. No Flows can finish, as they all rely on a number of queued Flows completing first.
[Diagram: Flows stay on the queue waiting for Flows they invoked to finish]
This is most likely to happen on a high throughput system that triggers the same Flow many times. The chance that the queue is full of Flows waiting for other Flows to finish then shoots up.
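The deadlock can be reproduced in miniature with an ordinary fixed thread pool standing in for the Flow Worker threads. This is a sketch under that assumption, not a model of Corda's actual scheduler, and parentsFinishedWithin is a hypothetical helper.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: runs `parents` tasks on a pool of `poolSize` threads.
// Each parent queues a child on the same pool and blocks until the child is done.
// Returns how many parents finished before `waitMillis` elapsed.
fun parentsFinishedWithin(poolSize: Int, parents: Int, waitMillis: Long): Int {
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
        val parentFutures = (1..parents).map {
            pool.submit(Callable<String> {
                val child = pool.submit(Callable<String> { "done" })
                child.get() // holds this worker thread until the child has run
            })
        }
        val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis)
        return parentFutures.count { parent ->
            try {
                parent.get(maxOf(0L, deadline - System.nanoTime()), TimeUnit.NANOSECONDS)
                true
            } catch (e: TimeoutException) {
                false // still blocked: its child never got a thread
            }
        }
    } finally {
        pool.shutdownNow() // interrupt any parents that are still waiting
    }
}
```

With two threads and two parents, both threads are held by parents whose children are stuck behind them in the queue, so nothing finishes however long you wait. Give the same two parents a third thread and the children get to run, so everything completes.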
This isn’t great and makes things a tad harder. But, as long as we are aware of this, we can accommodate it.
This is also the reason for the Executor thread pool, rather than CompletableFutures. By starting new Flows and not waiting for their completion, deadlock can be avoided. This is also the downside of this solution. Without the results of the new Flows, its capabilities are extremely limited.
All that being said, if your use-case fits the sort of structure shown above then I definitely recommend using this solution.
In the next section, I will discuss using CompletableFutures.
A dangerous solution with CompletableFutures
There is one simple reason why this is dangerous: deadlock. I recommend steering clear of this solution unless your node has access to enough threads to reduce the chance of filling the queue with Flows that can’t finish. On the other hand, it is a more attractive solution, since you can await the results of the started Flows and do something with them, which makes it much more useful.
Below is what the MessageService would look like with CompletableFutures:
The code is exactly the same except for the replyAll function. It converts each returned CordaFuture using the toCompletableFuture function that CordaFuture provides, then calls join to wait for all of the futures and returns the overall result.
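The waiting part on its own can be sketched in plain Kotlin as below. startReplyFlow and replyAllAndWait are hypothetical names, and startReplyFlow merely imitates "start a Flow and get a future for its result" with supplyAsync; it does not call into Corda.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for starting a Flow and getting a future for its result.
fun startReplyFlow(contents: String): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "Reply to $contents" }

// Start every reply first, then block until all of them have finished.
fun replyAllAndWait(messages: List<String>): List<String> {
    val futures = messages.map { startReplyFlow(it) }
    // allOf completes once every future has completed; join waits for that
    CompletableFuture.allOf(*futures.toTypedArray()).join()
    return futures.map { it.join() } // already complete, so these return at once
}
```

The join is where the danger lies: whichever thread calls replyAllAndWait stays occupied until every started piece of work has finished.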
As I mentioned before, this solution could lead to deadlock. But, for your scenario, maybe it doesn’t. It is up to you to determine how likely it is to happen. If the odds are against you, it’s probably best to walk away and either stick with a synchronous solution or use an asynchronous solution similar to the one I detailed in the previous section.
Do I really need to do this?
For now, yes, I believe you do.
Moving forward, I doubt that you will need to rely on the solution I proposed in this post.
I believe that Corda is working towards removing the need to even think about threading when starting Flows from within Flows, instead allowing you to simply call subFlow with an option to run it asynchronously. This would allow us to keep the original synchronous solution but with an option to make each subFlow run on a separate thread.
Joining the sections together
In conclusion, in Corda Enterprise 3, it is possible to initiate new Flows asynchronously within an executing Flow. This can provide good performance benefits depending on your use-case. There are downsides though. You cannot await the results of the asynchronous Flows without endangering your node with the threat of deadlock. The node’s underlying queue cannot deal with the situation it is being put in. Therefore, you need to be careful about how you go about introducing threads to your Flow invocations. Thankfully, as Corda progresses, it’s likely you won’t even need to worry about doing this yourself. It might even be as simple as adding a boolean function argument. That is the dream!
The code used in this post can be found on my GitHub.
If you found this post helpful, you can follow me on Twitter at @LankyDanDev to keep up with my new posts.
I am a Computer Science graduate from the University of Leicester and currently working at Accenture as a Java Developer. Opinions and views found in my posts are my own and do not represent Accenture’s views on any subject. View all posts by Dan Newton