In this first deep dive of the series we will cover the biggest change we are introducing in Corda 5: a new system architecture.
The new Corda 5 architecture and deployment methods are designed to deliver high availability and scalability. We placed these two crucial aspects of any high-dependency system at the centre of our development mindset. After a process of careful planning and re-architecting, we are excited to share the direction we are taking for the new Corda 5 node architecture, which is designed to maximise reliability, availability and scalability.
New to Corda? Have a look through our official Corda Documentation to learn more about Corda’s capabilities and key concepts. In this post we assume you already know a bit about the Corda platform.
As mentioned in the first post of the series, in defining Corda 5 we set out to find an architecture that would let us meet the high availability demands of critical services and provide a cost-effective way to scale horizontally to deal with ‘burst-y’ and high-volume throughput. Our ambition was to build a highly available distributed system that could scale to power an entire country’s money supply or the biggest international payment networks in the world.
In this blog post we will go through an overview of the new architecture, a review of the new services and finally have a look at the packaging and deployment of a Corda 5 worker cluster.
Overview: A new architecture for Corda
Corda 5 features a fully redundant, worker-based architecture to be applied to all critical services that are required to run a node. We use a Kafka cluster as the message broker to facilitate communication between node services.
The Corda 5 architecture is made up of multiple worker clusters. It scales task processing horizontally by distributing the workload evenly among workers.
The Kafka event streaming platform handles work distribution among workers. Each worker processes a share of the load, which is distributed dynamically. This means we meet the hot-hot processing requirement, and it allows us to use cloud technologies to resize on demand. In idle periods, you can easily scale back the number of workers.
So, what are the ‘workers’ that make up the Corda 5 clusters?
A worker is a Java Virtual Machine (JVM) instance containing a group of processing services. Services run independently and perform well-defined functions, such as running user code, querying a database, or managing configuration. Workers communicate with each other using a Kafka cluster and use compacted topics (logs) as the mechanism to store and retrieve the most recent states.
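To make the idea of a compacted topic concrete, here is a minimal sketch of what compaction preserves: the latest value for each key. It is a plain in-memory model rather than Kafka client code, and the keys and values are made up for illustration. The one Kafka behaviour it mirrors is that a record with a null value acts as a tombstone that removes the key.

```python
from typing import Iterable, Optional

# Each record is (key, value); a value of None is a tombstone that deletes the key.
Record = tuple[str, Optional[str]]

def compact(log: Iterable[Record]) -> dict[str, str]:
    """Replay a log and keep only the most recent value for each key."""
    latest: dict[str, str] = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)   # tombstone: forget the key
        else:
            latest[key] = value     # a newer record supersedes the older one
    return latest

# Hypothetical log contents, oldest record first.
log = [
    ("flow-1", "checkpoint-a"),
    ("flow-2", "checkpoint-x"),
    ("flow-1", "checkpoint-b"),
    ("flow-2", None),
]
state = compact(log)
```

A worker that starts up (or takes over from a failed one) can rebuild its view of the current state by replaying the topic in this way.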
The services are deployed in separate active-active clusters, and it is this hot-hot configuration that will also allow for the new architecture to scale easily and adapt to a variety of workflows.
Redundancy is also built by design into the architecture. To the uninitiated, building ‘redundancy’ into a product must seem an unusual design choice. But redundancy just means ensuring each component of a system is replaceable and that it can be done, at a moment’s notice, without any interruption to service. It is a cornerstone of high availability architecture, a failsafe for all the unpredictable events that could shut down a worker. When time-sensitive or critical transactions are being processed there is often no room for noticeable downtime. Redundancy means that transaction processes can continue uninterrupted even when some workers become unresponsive.
Workers send heartbeats to confirm they are active. If Kafka cannot detect a worker’s heartbeat, it automatically redirects the workload to another worker, therefore avoiding noticeable downtime. If a service within a worker fails, other services within that worker are unaffected.
With multiple redundant instances of a service running in separate workers, the system is completely horizontally scalable — more workers can be added to process messages in parallel when demand increases.
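The heartbeat and failover behaviour described above can be sketched as a small simulation. This is not the Kafka consumer group protocol itself; the round-robin strategy, worker names and timeout value are assumptions chosen to show how partitions move to the remaining workers once a heartbeat goes stale.

```python
def assign(partitions, workers):
    """Spread partitions round-robin over the given workers."""
    workers = sorted(workers)
    assignment = {w: [] for w in workers}
    for i, p in enumerate(partitions):
        assignment[workers[i % len(workers)]].append(p)
    return assignment

def live_workers(last_heartbeat, now, timeout):
    """Workers whose most recent heartbeat is within the timeout."""
    return {w for w, t in last_heartbeat.items() if now - t <= timeout}

partitions = list(range(6))
# Hypothetical heartbeat timestamps (seconds); worker-c has gone quiet.
last_heartbeat = {"worker-a": 100.0, "worker-b": 100.0, "worker-c": 91.0}

before = assign(partitions, last_heartbeat.keys())
after = assign(partitions, live_workers(last_heartbeat, now=101.0, timeout=5.0))
```

In `before`, each of the three workers owns two partitions. In `after`, worker-c has missed its heartbeat window and its partitions are shared between the two remaining workers. Adding a worker has the opposite effect, which is the horizontal scaling described above.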
Corda 5 services
One of the core design principles of Corda 5 is the grouping of services into separate workers according to their function and usage. This gives system operators greater control over how to meet their specific service level agreements. Taking a modular approach to the integration of client services allows for a highly configurable system that can be tailored to fit the system requirements of consumers.
Included below is a short description for each service.
Flow engine
The flow engine contains the flow state machine which runs user flow code and processes events on the flow event and checkpoint topics. For each flow event, the flow engine uses a compacted topic to store the state, reads the corresponding flow checkpoint from the checkpoint topic, and restarts the flow state machine from that point. It then executes user code until it reaches the next suspension point. This generates a new flow checkpoint (which is appended to the checkpoint topic) and other events (which are appended to the relevant topics to be processed by other services). Each flow engine processes the subset of partitions assigned to it by Kafka. In the case of failure, the partitions will be passed to the remaining flow engines to process.
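The load, run and checkpoint cycle can be sketched as follows. Everything here is hypothetical: the `Checkpoint` shape, the two-step flow, and the topic names in the output events are invented for illustration, and a dictionary stands in for the checkpoint topic. The point is only that each event resumes the flow from its last checkpoint and runs it to the next suspension point.

```python
from dataclasses import dataclass, field

@dataclass
class Checkpoint:
    flow_id: str
    step: int = 0                       # index of the next step to run
    data: dict = field(default_factory=dict)

# Hypothetical user flow: each function runs up to the next suspension
# point and returns events destined for other services.
def ask_peer(cp, event):
    cp.data["request"] = event
    return [("p2p.out", f"quote? {event}")]

def record_reply(cp, event):
    cp.data["reply"] = event
    return [("db.requests", f"store {event}")]

FLOW_STEPS = [ask_peer, record_reply]

def handle_flow_event(checkpoints, flow_id, event):
    """Load the checkpoint, run to the next suspension point, save the result."""
    cp = checkpoints.get(flow_id) or Checkpoint(flow_id)
    outputs = FLOW_STEPS[cp.step](cp, event)
    cp.step += 1
    checkpoints[flow_id] = cp           # stands in for appending to the checkpoint topic
    return outputs

checkpoints = {}
out1 = handle_flow_event(checkpoints, "flow-1", "start")
out2 = handle_flow_event(checkpoints, "flow-1", "42 GBP")
```

Because all state needed to resume lives in the checkpoint, the second event could just as well be handled by a different flow engine instance than the first.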
Flow session manager
When a request to start a flow arrives via RPC or from a peer node, the Flow session manager checks the request and deduplicates it when needed.
It then assigns a unique ID to each flow that is executed. This is required to allow an audit trail of historical activity.
The Flow session manager also maintains the session state when invoking flow sessions between peers.
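A minimal sketch of the deduplication and ID assignment steps is shown below. The class and method names are hypothetical, and it assumes each request carries an identifier that can be combined with its source to recognise a retry. The real service would keep this mapping in durable storage rather than in memory.

```python
import uuid

class FlowSessionManager:
    """Toy model: assigns one flow ID per distinct start request."""

    def __init__(self):
        self._flow_ids = {}  # (source, request_id) -> flow ID already assigned

    def start_flow(self, source, request_id):
        """Return (flow_id, is_new); a repeated request gets the original ID."""
        key = (source, request_id)
        if key in self._flow_ids:
            return self._flow_ids[key], False
        flow_id = str(uuid.uuid4())
        self._flow_ids[key] = flow_id
        return flow_id, True

manager = FlowSessionManager()
first = manager.start_flow("rpc", "client-req-1")
retry = manager.start_flow("rpc", "client-req-1")   # duplicate of the first request
other = manager.start_flow("peer", "client-req-1")  # same ID, different source
```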
Database service
The database service handles database queries. For each event, it queries the database, performs any required processing, and returns the results to the caller. Push notifications inform listeners of changes to specific tables, for example vault state changes. Each instance of the database service processes one partition of the database queries topic.
Crypto service
The crypto service handles requests and queries involving public and private key material. This includes signing requests, generating new key pairs, and storing and retrieving certificates. For each event, the crypto service runs the query, performs any required processing, and returns the results to the caller. Each instance of the crypto service processes one partition of the crypto request topic.
HTTPS RPC service
The remote procedure call (RPC) service handles RPC requests from clients, maps flow IDs to client IDs when a new flow is started via RPC, and keeps the client informed on the progress of the request.
An HTTPS translation layer converts HTTPS RPC requests for processing by the HTTPS RPC service.
P2P HTTPS gateway
When a flow needs to communicate with a peer, the P2P HTTPS gateway service manages the messages between the two nodes. The service’s reliable delivery component consumes messages generated by the flow engine and passes them to the P2P HTTPS gateway via the secure link manager.
The reliable delivery component on the receiving end sends an acknowledgement that it has received the initial message. A message is sent to the flow session manager to map the session, and a flow event is then sent to the flow state machine. A deduplication component handles any messages that have been sent multiple times or that arrive from peers out of order.
Durability is not required between the secure link manager and gateway as the reliable delivery component is responsible for reliability.
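The receiving side of reliable delivery can be sketched like this. The source does not say how duplicates and out-of-order messages are detected, so the per-session sequence number used here is an assumption, as are the class and method names.

```python
class SessionReceiver:
    """Delivers each message of a session exactly once, in sequence order."""

    def __init__(self):
        self.next_seq = 0   # sequence number expected next
        self.pending = {}   # early arrivals waiting for a gap to fill

    def receive(self, seq, payload):
        """Always acknowledge; deliver only the messages that are now in order."""
        delivered = []
        if seq >= self.next_seq and seq not in self.pending:
            self.pending[seq] = payload          # new message: buffer it
        while self.next_seq in self.pending:     # flush everything now in order
            delivered.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ("ack", seq), delivered

rx = SessionReceiver()
r1 = rx.receive(1, "b")    # arrives early, held back
r0 = rx.receive(0, "a")    # fills the gap, both are released in order
rdup = rx.receive(0, "a")  # a resend is acknowledged but not delivered again
```

Acknowledging duplicates matters: a resend usually means the sender never saw the first acknowledgement, so staying silent would make it retry forever.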
Singleton service
The singleton service runs instances of code for which replication is required, but where only one worker should be active at any given time. It is used for services where having multiple active workers would result in unintended behaviour. For example, a service that starts a flow in response to a vault event could accidentally start several flows if there was more than one active worker. Note that this does not preclude the service itself being multi-threaded.
The leader election component is responsible for deciding which single worker is currently active. The others are placed in standby, ready to take over in the event of a failure.
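One common way to get "exactly one active worker, with standbys ready to take over" is a time-limited lease. The source does not describe how the leader election component works, so treat the following purely as an illustrative assumption; the class name, worker names and time values are all hypothetical.

```python
class LeaderLease:
    """Toy lease: whoever holds it is active until it expires without renewal."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.holder = None
        self.expires_at = 0.0

    def try_acquire(self, worker, now):
        """Grant or renew the lease; return True if `worker` is the active one."""
        if self.holder == worker or now >= self.expires_at:
            self.holder = worker
            self.expires_at = now + self.ttl
            return True
        return False

lease = LeaderLease(ttl=10.0)
a0 = lease.try_acquire("worker-a", now=0.0)   # first to ask becomes active
b1 = lease.try_acquire("worker-b", now=5.0)   # standby: lease still held
a2 = lease.try_acquire("worker-a", now=8.0)   # active worker renews
b3 = lease.try_acquire("worker-b", now=20.0)  # worker-a stopped renewing, standby takes over
a4 = lease.try_acquire("worker-a", now=21.0)  # the old leader is now a standby
```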
Schema service
Kafka requires messages to be serialised before they are added to a Kafka stream. Corda 5 uses Avro as the serialisation schema. Each service needs to be able to access the schema when serialising and deserialising objects. Initially, the schema service will be an in-process registry used by each of the processes.
In the future Avro schema evolution will be used to handle rolling upgrades of worker processes, so that older processes can be informed about newer message schemas and handle or ignore them appropriately.
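The rules that make this kind of rolling upgrade workable can be sketched without an Avro library. Under Avro's schema resolution, a reader ignores fields it does not know about and fills in declared defaults for fields the writer did not send. The schema below is written in Avro's JSON schema syntax, but the record name and fields are hypothetical, and the `resolve` function only imitates the resolution rules on already-decoded records. It is not how an Avro library is invoked.

```python
import json

# Hypothetical reader schema held by an older worker.
READER_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "FlowEvent",
  "fields": [
    {"name": "flowId", "type": "string"},
    {"name": "priority", "type": "int", "default": 0}
  ]
}
""")

def resolve(record, reader_schema):
    """Project a decoded record onto the fields the reader knows about."""
    result = {}
    for f in reader_schema["fields"]:
        if f["name"] in record:
            result[f["name"]] = record[f["name"]]
        elif "default" in f:
            result[f["name"]] = f["default"]     # writer omitted it: use the default
        else:
            raise ValueError(f"missing field with no default: {f['name']}")
    return result

from_newer_writer = resolve({"flowId": "flow-1", "priority": 5, "traceId": "abc"}, READER_SCHEMA)
from_older_writer = resolve({"flowId": "flow-2"}, READER_SCHEMA)
```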
Membership group manager cache
For each identity running on the node, the membership group manager (MGM) cache maintains the set of identities that represent each membership group to which that node belongs. The MGM cache uses a compacted topic to store membership information. This allows a flow engine to process membership group flows and write updates to Kafka that can be picked up by other sharded workers. The MGM cache processes events on the network map topic.
Configuration service
The configuration service manages configuration requests for the system. Each request is validated before the service generates a response event to the caller indicating whether the request succeeded or failed. Configuration states are appended to a compacted topic to be accessed by other parts of the system. Configuration is also persisted to the database.
Monitoring service
It is likely that the monitoring service will use JMX metrics to monitor the health of the system. Each worker has its own monitoring service and metrics from each will be collated and reported to the system operator. Alternatively, monitoring infrastructure could pull the data directly from Kafka.
We have designed two distinct deployment models with different guaranteed levels of availability, depending on the profile and requirements of the end user. For some, the ‘simple’ high availability deployment will be most suitable, offering a cost-effective, efficient and reliable architecture. For others, the High SLA deployment will be more suitable, emphasising maximum performance and maximum reliability.
There will also be an “all in process” option available for low-cost or development scenarios, alongside the HA production deployments.
Simple high availability deployment
This deployment method provides software segregation, is easier to deploy, and scales services horizontally as a whole. We imagine this deployment configuration will be the natural successor to Corda 4’s high availability and firewall setup: it improves on its availability targets (with true hot/hot), retains the separation between services deployed in the trusted zone and those in the DMZ, and keeps deployment simple.
Services are deployed in two separate workers: operational services on one and the P2P HTTPS gateway on the other. The gateway service connects directly to the internet and must be separated to avoid exposing the operational services of the node. Workers are deployed as multiple redundant, parallel instances and can be horizontally scaled to reflect demand.
Each worker has its own monitoring service which provides updates on system errors and performance to the node’s administrators.
High SLA high availability deployment
This deployment method provides a higher degree of availability because processes are also segregated, and it gives sophisticated operators greater control over horizontal scalability. However, this results in a more complex deployment process.
Services are grouped by functional theme on four separate workers: HTTPS RPC service, flow engine, database and crypto service, and P2P HTTPS gateway. Each worker has its own monitoring service which provides updates on system errors and performance to the node’s administrators.
This deployment method makes it possible to tune the number of parallel workers to further improve the performance/cost ratio. More workers can be added for intensive processes, while the number of workers for less intensive processes can be reduced.
It is possible to segregate services even further onto dedicated workers according to customer needs, at the cost of a more complex deployment process.
Security and incident resiliency
The services within each worker are logically segregated in distinct sandboxes to prevent malicious attacks within the system. For example, software segregation between the flow engine and the database service prevents a malicious user from tunnelling out of the flow engine and gaining access to the information held in the database.
If a worker suffers an unrecoverable error, then all services within that worker will fail. Kafka’s failover mechanism will redirect messages to parallel workers as described in the architecture section of this document.
With the ‘High SLA’ high availability deployment method, services run on separate workers, so we can guarantee process segregation in addition to software segregation. Malicious users cannot gain access to services deployed on other workers, as those services run in separate JVM instances.
When a service running on a worker becomes unavailable or unresponsive, the system handles the issue in the following ways.
- Workload redirection. Unresponsive services do not send heartbeats (as described in the architecture section of this document). If Kafka cannot detect a worker’s heartbeat, it automatically redirects the workload to other workers.
- Subscription gating. When a service fails to perform its start-up chores (such as loading a new configuration) or cannot contact a related service, it will not begin to consume messages, since it would be unable to fully process them. Domino logic inside the processes allows a service to subscribe to Kafka only when all of its prerequisites are online.
- Proactive monitoring. The monitoring service will include a ‘watchdog’ facility that kills any service that is not stable.
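The domino logic in the second point can be sketched as a dependency check: a service is allowed to subscribe only once everything it depends on is up, so one failed prerequisite holds back everything downstream of it. The dependency map below is hypothetical and is not taken from the Corda 5 design.

```python
def subscribable(requires, failed):
    """Return the services allowed to subscribe, given which ones failed start-up."""
    up = set()
    changed = True
    while changed:                      # keep going until no more dominoes fall
        changed = False
        for name, deps in requires.items():
            ready = all(d in up for d in deps)
            if name not in up and name not in failed and ready:
                up.add(name)
                changed = True
    return up

# Hypothetical prerequisites for each service.
requires = {
    "configuration": [],
    "database": ["configuration"],
    "crypto": ["configuration"],
    "flow-engine": ["database", "crypto"],
}

all_ok = subscribable(requires, failed=set())
db_down = subscribable(requires, failed={"database"})
```

With the database service down, the flow engine never subscribes, so its share of the messages stays with workers that can actually process them.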