Corda RPC Reconnecting Client

October 22, 2019

In distributed computing, a remote procedure call (RPC) is when a computer program causes a procedure (subroutine) to execute in a different address space (commonly on another computer on a shared network), which is coded as if it were a normal (local) procedure call, without the programmer explicitly coding the details for the remote interaction.

In almost every use case you build on the Corda platform, you will want to interact with the Corda node from an external application. This application could be an existing legacy application or a new application that you build to provide a refreshed user experience.

This article shows how to use a client proxy to make RPCs to the Corda node. The RPC client can be written in any JVM-compatible language using the CordaRPCClient class. Once connected, you can query the vault, trigger flows, receive results, and subscribe to observables for live updates. In this post we will write a simple RPC client that a client application can use to interact with the node. We will also look at a reconnecting RPC client, which reconnects to the node automatically after a node or connection failure.

How RPC works under the hood

In Corda we use Apache Artemis for RPC communication. We are using proton-j (Apache Qpid) as the AMQP 1.0 implementation. Corda uses its own extended AMQP serialization framework. The Corda-RPC library must be included on the client side in order to connect to the server.

At startup, an Artemis instance is created on the RPC client side and on the RPC server side (within the Corda node), client and server queues are created, and sessions are established between client and server. The Corda-RPC library contains a client proxy, which translates RPC client calls to low-level Artemis messages and sends them to the server Artemis instance. These RPC requests are stored on the server side in Artemis queues. The server-side consumer retrieves these messages, the appropriate RPC calls are made, and an acknowledgement is sent to the client. Once the method completes, a reply is sent back to the client. The reply is wrapped in an Artemis message and sent by the server Artemis instance to the client Artemis instance. The client then consumes the reply from the client Artemis queue.

The client proxy within the Corda-RPC library abstracts the above processes. From a client perspective, you only need to create the proxy instance and make the RPC calls.
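The request and reply path described above can be modelled in plain Java. The following is only a toy sketch under loud assumptions: an in-memory queue stands in for Artemis, there is no serialization or acknowledgement, and NodeOps, DemoNode and RpcProxySketch are hypothetical names that are not part of Corda. It shows the one idea that matters here: a proxy turns an ordinary method call into a message, and the caller blocks until the reply comes back.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model only: an in-memory queue stands in for Artemis, and NodeOps is a
// hypothetical interface, not the real CordaRPCOps.
class RpcProxySketch {

    // Hypothetical set of operations exposed by the "server".
    interface NodeOps {
        String nodeName();
        int add(int a, int b);
    }

    // Hypothetical server-side implementation of those operations.
    static final class DemoNode implements NodeOps {
        public String nodeName() { return "PartyA"; }
        public int add(int a, int b) { return a + b; }
    }

    // A request message: method name, arguments, and a slot for the reply.
    static final class Request {
        final String method;
        final Object[] args;
        final CompletableFuture<Object> reply = new CompletableFuture<>();

        Request(String method, Object[] args) {
            this.method = method;
            this.args = args == null ? new Object[0] : args;
        }
    }

    // Client side: every call on the proxy becomes a Request on the server
    // queue, and the caller blocks until the reply has been filled in.
    static NodeOps clientProxy(BlockingQueue<Request> serverQueue) {
        return (NodeOps) Proxy.newProxyInstance(
                NodeOps.class.getClassLoader(),
                new Class<?>[] {NodeOps.class},
                (proxy, method, args) -> {
                    Request request = new Request(method.getName(), args);
                    serverQueue.put(request);
                    return request.reply.get();
                });
    }

    // Server side: map a request message back to a real method call.
    static Object dispatch(NodeOps impl, Request request) {
        switch (request.method) {
            case "nodeName":
                return impl.nodeName();
            case "add":
                return impl.add((Integer) request.args[0], (Integer) request.args[1]);
            default:
                throw new UnsupportedOperationException(request.method);
        }
    }

    // Server side: a consumer thread takes requests off the queue, runs them,
    // and completes the reply.
    static Thread startServer(BlockingQueue<Request> queue, NodeOps impl) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = queue.take();
                    try {
                        request.reply.complete(dispatch(impl, request));
                    } catch (RuntimeException e) {
                        request.reply.completeExceptionally(e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop consuming
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }

    // Wires a client proxy to a freshly started in-memory "server".
    static NodeOps connect(NodeOps serverImpl) {
        BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
        startServer(queue, serverImpl);
        return clientProxy(queue);
    }

    public static void main(String[] args) {
        NodeOps ops = connect(new DemoNode());
        String name = ops.nodeName();
        int sum = ops.add(2, 3);
        System.out.println(name + " " + sum); // prints "PartyA 5"
    }
}
```

From the caller's point of view, ops.add(2, 3) looks like a local call, which is the same experience the real proxy aims to give.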

Writing the RPC Client

The RPC server exposes certain RPC operations/methods on the Corda node side to the clients. The RPC client requests are stored in a queue by Artemis on the server side and then are executed on the server side.

These operations include retrieving vault states based on specific criteria, listing all peers on the network, retrieving the network parameters, and starting flows. You can also call methods which return observables, subscribe to them, receive live updates, and perform a particular task once you are notified.

There are two modes in which the client application can use the RPC client to connect to the node. One is the one-time connection proxy and the other is the reconnecting proxy. The gracefulReconnect parameter toggles between these two modes, as shown below.

Current RPC client one time connection mode

  1. In the current implementation, the RPC client simply throws an exception if the connection to the node is lost or if the node fails. The RPC client doesn’t try to reconnect to the node and simply stops.
  2. If the client has subscribed to any observables, they will no longer emit events. It’s the client’s responsibility to re-subscribe to these observables.
  3. The clients are expected to handle the reconnection logic to the node on the client side.
  4. It is the responsibility of the client to keep track of each triggered flow, its completion status, and so on.

Reconnecting RPC client mode

  1. The client reconnects itself to the node once the node is up.
  2. The client automatically re-subscribes to the previously subscribed observables and the server will keep sending events onto them. Note: some events might be lost during reconnection.
  3. RPC calls fall into two categories: calls which can cause a side effect, such as the startFlow command, and calls which do not cause a side effect, such as the networkMapSnapshot command.
  4. For requests which do not cause side effects, the client automatically tries to reconnect, blocks until the connection is re-established, and then executes the call and returns the result.
  5. For requests which cause side effects like the startFlow command, the re-connecting client will throw a CouldNotStartFlowException. This is done to avoid duplicate invocation of flows.
  6. In such a case it is the user’s responsibility to determine if the flow needs to be retried and retry it if required. A snippet of how to do this is shown below.
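The difference between the two kinds of call can be sketched in plain Java. This is a toy model of the policy in the list above, not Corda's implementation: ConnectionLostException, SideEffectNotRetriedException (standing in for CouldNotStartFlowException) and FlakyNode are hypothetical names introduced only for this illustration.

```java
import java.util.function.Supplier;

// Toy model of the retry policy; none of these types are Corda classes.
class ReconnectPolicySketch {

    // Stands in for a dropped connection to the node.
    static class ConnectionLostException extends RuntimeException {}

    // Stands in for CouldNotStartFlowException in this model.
    static class SideEffectNotRetriedException extends RuntimeException {
        SideEffectNotRetriedException(Throwable cause) { super(cause); }
    }

    // Calls without side effects are retried until they succeed. Calls with
    // side effects fail on the first connection loss, so they never run twice.
    static <T> T call(boolean hasSideEffects, Supplier<T> rpc) {
        while (true) {
            try {
                return rpc.get();
            } catch (ConnectionLostException e) {
                if (hasSideEffects) {
                    throw new SideEffectNotRetriedException(e);
                }
                pause(); // wait a little for the connection, then retry
            }
        }
    }

    private static void pause() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    // Demo stand-in for a node that is unreachable for the first few calls.
    static final class FlakyNode {
        private int failuresLeft;
        int invocations = 0;

        FlakyNode(int failures) { this.failuresLeft = failures; }

        String invoke(String result) {
            invocations++;
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new ConnectionLostException();
            }
            return result;
        }
    }

    public static void main(String[] args) {
        FlakyNode readNode = new FlakyNode(2);
        String snapshot = call(false, () -> readNode.invoke("snapshot"));
        System.out.println(snapshot + " after " + readNode.invocations + " invocations");

        FlakyNode flowNode = new FlakyNode(1);
        try {
            call(true, () -> flowNode.invoke("flow started"));
        } catch (SideEffectNotRetriedException e) {
            System.out.println("side-effecting call invoked " + flowNode.invocations + " time, not retried");
        }
    }
}
```

The read-only call is invoked three times before it returns, while the side-effecting call is invoked exactly once and the decision to retry is handed back to the caller.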

Setup

Server Side Setup

Set RPC user, password, and permissions

Make sure you have set up the correct RPC user and assigned necessary permissions to the user in build.gradle or in node.conf.

rpcSettings {
    address("localhost:10005")
    adminAddress("localhost:10006")
}
rpcUsers = [[user: "user1", "password": "test", "permissions": ["ALL"]]]

Client Side Setup

Include Corda-RPC dependency

Include corda-rpc as a Gradle dependency in your client application:

compile "net.corda:corda-rpc:$corda_release_version"

Once you include this library, you can get hold of the proxy and make RPC calls.

The reconnecting RPC client will be released as part of Corda OS 4.3.

By default, the RPC protocol is versioned using the node’s platform version, and the RPC client library requires the node to be on the same or a higher version. You can override this behaviour by setting the minimumServerProtocolVersion parameter in CordaRPCClientConfiguration and passing it to the Corda RPC client at startup.

Steps to make RPC calls

Step 1: Create CordaRPCClient by specifying the node host and port

CordaRPCClient client = new CordaRPCClient(networkHostAndPort);

Grab an instance of CordaRPCClient by specifying the node address and port.

As mentioned above, you can override the minimumServerProtocolVersion parameter in CordaRPCClientConfiguration and pass it to the CordaRPCClient. This saves you from upgrading the node: corda-rpc version 4.3 can then interact with nodes on a lower version, such as 4.0 or 4.1.

CordaRPCClientConfiguration configuration = new CordaRPCClientConfiguration(Duration.ofMinutes(3), 4, java.lang.Boolean.getBoolean("net.corda.client.rpc.trackRpcCallSites"), Duration.ofSeconds(1), 4, 1, Duration.ofSeconds(5), new Double(1), 10, 10485760, Duration.ofDays(1));
CordaRPCClient client = new CordaRPCClient(networkHostAndPort, configuration);

Step 2: Call start on the client

a. Setting one time connection proxy

CordaRPCConnection rpcConnection = client.start(rpcUsername, rpcPassword);

Calling start on the client establishes an active connection between client and server Artemis.

A secure SSL connection can be established with the server if the client asks for one. Configure this by passing the truststore path containing the RPC SSL certificate when creating the CordaRPCClient instance. Read more here.

b. Setting reconnecting proxy

// Pass a GracefulReconnect instance to start() to get a reconnecting proxy.
// GracefulReconnect provides disconnection and reconnection callbacks.
GracefulReconnect gracefulReconnect = new GracefulReconnect(
        () -> logger.info("on disconnect"),
        () -> logger.info("on reconnect"));
CordaRPCConnection rpcConnection = client.start(rpcUsername, rpcPassword, gracefulReconnect);

Passing the gracefulReconnect parameter returns a reconnecting proxy instead of a one-time connection proxy.

Step 3: Retrieve the proxy

a. One time connection proxy

CordaRPCOps proxy = rpcConnection.getProxy();

CordaRPCConnection encapsulates a client proxy which converts RPC client specific API calls to low level Artemis messages. CordaRPCOps exposes the RPC operations available to the clients.

b. Reconnecting proxy

CordaRPCOps proxy = rpcConnection.getProxy();

Call the same method to retrieve a reconnecting instance. ReconnectingCordaRPCOps is a wrapper around CordaRPCOps which handles exceptions when the node or the connection to the node fails. Note that from the user’s perspective there is no API change: depending on the gracefulReconnect parameter, either a reconnecting or a one-time proxy is returned.

Step 4: Use the client proxy to make RPC calls

List<NodeInfo> nodes = proxy.networkMapSnapshot();

Once you have a handle to the proxy, you can call any RPC method exposed by the server. Here I call networkMapSnapshot, which returns a list of NodeInfo entries for all the nodes that are part of the network.

Step 5: Return Observable

//hit the node to get snapshot and observable for IOUState
DataFeed<Vault.Page<IOUState>, Vault.Update<IOUState>> dataFeed = proxy.vaultTrack(IOUState.class);
//this gives a snapshot of IOUState as of now. so if there are 11 IOUState as of now, this will return 11 IOUState objects
Vault.Page<IOUState> snapshot = dataFeed.getSnapshot();
//this returns an observable on IOUState
Observable<Vault.Update<IOUState>> updates = dataFeed.getUpdates();
// call a method for each IOUState in the snapshot
snapshot.getStates().forEach(JavaClientRpc::actionToPerform);
//perform certain action for each update to IOUState
updates.toBlocking().subscribe(update -> update.getProduced().forEach(JavaClientRpc::actionToPerform));
/**
 * Perform certain action because of any update to Observable IOUState
 * @param state
 */
private static void actionToPerform(StateAndRef<IOUState> state) {
    logger.info("{}", state.getState().getData());
}

When a method is called on the proxy, the call is serialised and sent to the server. The server sends back a response. This may contain an observable, as seen in the above piece of code. The observable is essentially serialised as a unique integer and sent to the client, which the client subscribes to.
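To make the "observable as a unique integer" idea concrete, here is a toy sketch of how a client could route incoming events by observable id. It is an illustration only and makes no claim about Corda's actual wire protocol: ObservableIdSketch and its methods are hypothetical, and events are plain strings.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy model: shows only the idea of routing events by observable id.
// It is not Corda's wire protocol, and all names are hypothetical.
class ObservableIdSketch {

    // Client-side table from observable id to the callbacks subscribed to it.
    private final Map<Integer, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Subscribe to the observable that the server identified by this id.
    void subscribe(int observableId, Consumer<String> onNext) {
        subscribers.computeIfAbsent(observableId, id -> new CopyOnWriteArrayList<>()).add(onNext);
    }

    // Drop all callbacks for an id once updates are no longer needed.
    void unsubscribe(int observableId) {
        subscribers.remove(observableId);
    }

    // Called for each event message from the server: (observable id, payload).
    // Returns the number of callbacks that received the event.
    int onServerEvent(int observableId, String payload) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(observableId, Collections.emptyList());
        targets.forEach(target -> target.accept(payload));
        return targets.size();
    }
}
```

In this model, an event for an id with no subscribers is simply dropped, which is also why a client that has unsubscribed stops receiving updates.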

As discussed above, RPC commands which cause side effects are not retried and throw CouldNotStartFlowException. The snippet below can be used to handle this exception, determine whether the flow was triggered, and invoke the flow again if it was not. Clients are expected to handle this in their client application code.

void runFlowWithRetries(CordaRPCOps client) {
    try {
        client.startFlowDynamic(...);
    } catch (CouldNotStartFlowException exception) {
        if (!wasFlowTriggered()) {
            runFlowWithRetries(client);
        }
    }
}

Note that runFlowWithRetries() can still be subject to race conditions, so it does not provide exactly-once guarantees. It is a best-effort approach.

Please note: during reconnection or failover some events might be lost. For now, clients have to reconcile on their side by scanning the vault and inferring what was missed from what they see. The sample snippet below shows how a client can retrieve the events missed during a specific time interval. The disconnection and reconnection times can be captured in the callback hooks provided by the GracefulReconnect class. An offset can be subtracted from the start and added to the end of the interval when retrieving the missed states. This decreases the probability of missing events, but makes the reconciliation less efficient. De-duplication of already processed states needs to be handled as well.
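Because the recursive retry has no upper bound, one possible variation is to cap the number of attempts. The sketch below shows only that control flow in plain Java. StartFailedException stands in for CouldNotStartFlowException so the example compiles without Corda, and startFlow and wasFlowTriggered are hypothetical, application-specific hooks. It is still best-effort and has the same race conditions.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper types; only the control flow is the point here.
class BoundedFlowRetrySketch {

    // Stands in for CouldNotStartFlowException in this sketch.
    static class StartFailedException extends RuntimeException {}

    enum Outcome { STARTED, ALREADY_TRIGGERED, GAVE_UP }

    // Tries to start a flow at most maxAttempts times. After each failure it
    // asks wasFlowTriggered (an application-specific check, for example a
    // vault query) and stops if the flow did in fact start.
    static Outcome runFlowWithRetries(Runnable startFlow,
                                      BooleanSupplier wasFlowTriggered,
                                      int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                startFlow.run();
                return Outcome.STARTED;
            } catch (StartFailedException e) {
                if (wasFlowTriggered.getAsBoolean()) {
                    return Outcome.ALREADY_TRIGGERED;
                }
            }
        }
        return Outcome.GAVE_UP;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        Outcome outcome = runFlowWithRetries(
                () -> {
                    calls[0]++;
                    if (calls[0] < 3) {
                        throw new StartFailedException();
                    }
                },
                () -> false,
                5);
        System.out.println(outcome + " after " + calls[0] + " attempts"); // prints "STARTED after 3 attempts"
    }
}
```

Returning an outcome instead of recursing lets the caller decide what to do when the attempts run out, for example alerting an operator.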

// this will contain all the processed states till now
static List<StateRef> processedStates = ...
// add missed events states to this list
static List<StateRef> lostStates = new ArrayList<>();

// reconcile the missed events by querying the vault from
// disconnectionTime - offset to reconnectionTime + offset
private static void reconcile(Instant disconnectionTime, Instant reconnectionTime, CordaRPCOps proxy, TemporalUnit temporalUnit, long amount) {
    // time condition between disconnection and reconnection time;
    // the offset is subtracted from the start and added to the end
    QueryCriteria.TimeCondition timeCondition = new QueryCriteria.TimeCondition(
            QueryCriteria.TimeInstantType.RECORDED,
            new ColumnPredicate.Between<>(disconnectionTime.minus(amount, temporalUnit), reconnectionTime.plus(amount, temporalUnit)));
    // create the query criteria; withTimeCondition returns a new criteria
    // object, so the returned value must be kept
    QueryCriteria.VaultQueryCriteria vaultQueryCriteria = new QueryCriteria.VaultQueryCriteria().withTimeCondition(timeCondition);
    // hit the vault to get missed events
    proxy.vaultQueryByCriteria(vaultQueryCriteria, IOUState.class).getStates().forEach(JavaClientRpc::collectStates);
}
// perform deduplication of processed states before collecting missed
// states
private static void collectStates(StateAndRef<IOUState> stateAndRef) {
    StateRef stateRef = stateAndRef.getRef();
    if (!processedStates.contains(stateRef)) {
        lostStates.add(stateRef);
    }
}
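The disconnection and reconnection times passed to reconcile() have to come from somewhere. One way to capture them is a small helper whose two Runnables are handed to GracefulReconnect as its callbacks. The sketch below is plain Java with no Corda dependency. ReconnectWindowSketch is a hypothetical name, and it assumes the callbacks are accepted as Runnable, as the lambdas in Step 2b suggest.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical helper, not part of Corda: records when the connection dropped
// and came back, and widens that interval by an offset on both sides.
class ReconnectWindowSketch {
    private final Supplier<Instant> now;
    private volatile Instant disconnectedAt;
    private volatile Instant reconnectedAt;

    // The time source is injectable so the helper can be tested; in an
    // application this would typically be Instant::now.
    ReconnectWindowSketch(Supplier<Instant> now) {
        this.now = now;
    }

    // Callback to run when the connection is lost.
    Runnable onDisconnect() {
        return () -> disconnectedAt = now.get();
    }

    // Callback to run when the connection is re-established.
    Runnable onReconnect() {
        return () -> reconnectedAt = now.get();
    }

    // True once both a disconnection and a reconnection have been recorded.
    boolean hasCompleteWindow() {
        return disconnectedAt != null && reconnectedAt != null;
    }

    // Start of the vault query window: disconnection time minus the offset.
    Instant queryFrom(Duration offset) {
        return disconnectedAt.minus(offset);
    }

    // End of the vault query window: reconnection time plus the offset.
    Instant queryTo(Duration offset) {
        return reconnectedAt.plus(offset);
    }
}
```

Under these assumptions, a client would pass window.onDisconnect() and window.onReconnect() when constructing GracefulReconnect, and trigger the reconciliation query from the reconnect callback once hasCompleteWindow() returns true.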

Note:

  1. Some observable events may be lost during failover or reconnection.
  2. The ReconnectingCordaRPCOps will be released as a part of Corda OS version 4.3.
  3. When the client doesn’t wish to get any further updates, the client should unsubscribe in order to save server resources.
  4. In the case of a failure on the server side, the client will not be able to connect to the server. Nodes can be configured in HA mode to provide redundancy in the case of server-side failures.
  5. You can also have a web server expose endpoints which can be hit by any web application over HTTP. You can then inject the RPC proxy within these endpoints.
  6. Proxies are thread safe and can be used to call multiple operations at the same time.

Complete code here

You can read more about Corda Re-connecting RPC Client here

Thanks to Shams, Dimos, Ryan, Tudor, and the Corda team.

Thanks for reading — Sneha Damle, Developer Evangelist (R3).


Corda RPC Reconnecting Client was originally published in Corda on Medium, where people are continuing the conversation by highlighting and responding to this story.
