
Amazon Aurora: design considerations for high throughput cloud-native relational databases

March 25, 2019

Amazon Aurora: design considerations for high throughput cloud-native relational databases Verbitski et al., SIGMOD’17

Werner Vogels recently published a blog post describing Amazon Aurora as their fastest growing service ever. That post provides a high level overview of Aurora and then links to two SIGMOD papers for further details. Also of note is the recent announcement of Aurora serverless. So the plan for this week on The Morning Paper is to cover both of these Aurora papers and then look at Calvin, which underpins FaunaDB.

Say you’re AWS, and the task in hand is to take an existing relational database (MySQL) and retrofit it to work well in a cloud-native environment. Where do you start? What are the key design considerations and how can you accommodate them? These are the questions our first paper digs into. (Note that Aurora supports PostgreSQL as well these days).

Here’s the starting point:

In modern distributed cloud services, resilience and scalability are increasingly achieved by decoupling compute from storage and by replicating storage across multiple nodes. Doing so lets us handle operations such as replacing misbehaving or unreachable hosts, adding replicas, failing over from a writer to a replica, scaling the size of a database instance up or down, etc.

So we’re somehow going to take the backend of MySQL (InnoDB) and introduce a variant that sits on top of a distributed storage subsystem. Once we’ve done that, network I/O becomes the bottleneck, so we also need to rethink how chatty network communications are.

Then there are a few additional requirements for cloud databases:

  • SaaS vendors using cloud databases may have numerous customers of their own. Many of these vendors use a schema/database as the unit of tenancy (vs a single schema with tenancy defined on a per-row basis). “As a result, we see many customers with consolidated databases containing a large number of tables. Production instances of over 150,000 tables for small databases are quite common. This puts pressure on components that manage metadata like the dictionary cache.”
  • Customer traffic spikes can cause sudden demand, so the database must be able to handle many concurrent connections. “We have several customers that run at over 8000 connections per second.”
  • Frequent schema migrations for applications need to be supported (e.g. Rails DB migrations), so Aurora has an efficient online DDL implementation.
  • Updates to the database need to be made with zero downtime.

The big picture for Aurora looks like this:

The database engine is a fork of “community” MySQL/InnoDB and diverges primarily in how InnoDB reads and writes data to disk.

There’s a new storage substrate (we’ll look at that next), which you can see in the bottom of the figure, isolated in its own storage VPC network. This is deployed on a cluster of EC2 VMs provisioned across at least 3 AZs in each region. The storage control plane uses Amazon DynamoDB for persistent storage of cluster and storage volume configuration, volume metadata, and S3 backup metadata. S3 itself is used to store backups.

Amazon RDS is used for the control plane, including the RDS Host Manager (HM) for monitoring cluster health and determining when failover is required.

It’s nice to see Aurora built on many of the same foundational components that are available to us as end users of AWS too.

Durability at scale

The new durable, scalable storage layer is at the heart of Aurora.

If a database system does nothing else, it must satisfy the contract that data, once written, can be read. Not all systems do.

Storage nodes and disks can fail, and at large scale there’s a continuous low level background noise of node, disk, and network path failures. Quorum-based voting protocols can help with fault tolerance. With V copies of a replicated data item, a read must obtain V_r votes, and a write must obtain V_w votes. Each write must be aware of the most recent write, which can be achieved by configuring V_w > V/2. Reads must also be aware of the most recent write, which can be achieved by ensuring V_r + V_w > V. A common approach is to set V = 3 and V_r = V_w = 2.
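The two quorum rules are easy to check mechanically. Here is a minimal sketch in Python; the function name `is_valid_quorum` is my own, not something from the paper.

```python
def is_valid_quorum(v: int, v_r: int, v_w: int) -> bool:
    """Check the two overlap rules for a quorum system with v copies."""
    # Any two write sets must share at least one copy.
    writes_overlap = v_w > v / 2
    # Any read set must share at least one copy with any write set.
    reads_see_writes = v_r + v_w > v
    return writes_overlap and reads_see_writes


print(is_valid_quorum(3, 2, 2))  # the common 2/3 configuration
print(is_valid_quorum(3, 1, 2))  # a single-copy read could miss the latest write
```

The second call fails the read rule: with V = 3 and V_w = 2, a read of one copy can land on the one replica the last write did not reach.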

We believe 2/3 quorums are inadequate [even when the three replicas are each in a different AZ]… in a large storage fleet, the background noise of failures implies that, at any given moment in time, some subset of disks or nodes may have failed and are being repaired. These failures may be spread independently across nodes in each of AZ A, B, and C. However, the failure of AZ C, due to a fire, roof failure, flood, etc., will break quorum for any of the replicas that concurrently have failures in AZ A or AZ B.

Aurora is designed to tolerate the loss of an entire AZ plus one additional node without losing data, and an entire AZ without losing the ability to write data. To achieve this, data is replicated six ways across 3 AZs, with 2 copies in each AZ. Thus V = 6; V_w is set to 4, and V_r is set to 3.
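To see why six copies with V_w = 4 and V_r = 3 give exactly these guarantees, it helps to count surviving copies. The sketch below is my own illustration (the helper names are hypothetical) and assumes copies are spread evenly across AZs.

```python
def surviving_copies(copies_per_az: int, azs: int = 3,
                     lost_azs: int = 0, lost_nodes: int = 0) -> int:
    """Copies still reachable after losing whole AZs plus some extra nodes."""
    return copies_per_az * (azs - lost_azs) - lost_nodes


def quorum_status(alive: int, v_r: int, v_w: int) -> dict:
    """Which quorums can still be formed with `alive` copies."""
    return {"can_read": alive >= v_r, "can_write": alive >= v_w}


# Aurora: 2 copies per AZ, V_r = 3, V_w = 4
aurora_az_down = quorum_status(surviving_copies(2, lost_azs=1), v_r=3, v_w=4)
aurora_az_plus_one = quorum_status(
    surviving_copies(2, lost_azs=1, lost_nodes=1), v_r=3, v_w=4)

# Classic 2/3 quorum: 1 copy per AZ, V_r = V_w = 2
classic_az_plus_one = quorum_status(
    surviving_copies(1, lost_azs=1, lost_nodes=1), v_r=2, v_w=2)

print(aurora_az_down, aurora_az_plus_one, classic_az_plus_one)
```

With an AZ down Aurora still has four copies, so both quorums hold. With an AZ plus one node down it has three: enough to read (and so to rebuild the missing copies), but not to write. The 2/3 scheme in the same situation has one copy left and can do neither.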

Given this foundation, we want to ensure that the probability of double faults is low. Past a certain point, improving MTTF (making independent failures rarer) is hard. But if we can reduce MTTR then we can narrow the ‘unlucky’ window in which an additional fault will trigger a double fault scenario. To reduce MTTR, the database volume is partitioned into small (10GB) fixed size segments. Each segment is replicated 6-ways, and the replica set is called a Protection Group (PG).

A storage volume is a concatenated set of PGs, physically implemented using a large fleet of storage nodes that are provisioned as virtual hosts with attached SSDs using Amazon EC2… Segments are now our unit of independent background noise failure and repair.

Since a 10GB segment can be repaired in 10 seconds on a 10Gbps network link, it takes two such failures in the same 10 second window, plus a failure of an entire AZ not containing either of those two independent failures to lose a quorum. “At our observed failure rates, that’s sufficiently unlikely…”
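The 10 second figure is easy to sanity-check. This back-of-envelope calculation assumes decimal units (1 GB = 10^9 bytes) and a fully utilised link, both my assumptions rather than anything the paper states.

```python
segment_bytes = 10 * 10**9          # one 10GB segment, decimal gigabytes
link_bits_per_second = 10 * 10**9   # a 10Gbps link

# Raw transfer time, ignoring protocol overhead and failure detection.
transfer_seconds = segment_bytes * 8 / link_bits_per_second
print(transfer_seconds)
```

The raw transfer comes to 8 seconds, so the quoted 10 seconds looks like the same number with a little headroom for overhead.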

This ability to tolerate failures leads to operational simplicity:

  • hotspot management can be addressed by marking one or more segments on a hot disk or node as bad, and the quorum will quickly be repaired by migrating it to some other (colder) node
  • OS and security patching can be handled like a brief unavailability event
  • Software upgrades to the storage fleet can be managed in a rolling fashion in the same way.

Combating write amplification

A six-way replicating storage subsystem is great for reliability, availability, and durability, but not so great for performance with MySQL as-is:

Unfortunately, this model results in untenable performance for a traditional database like MySQL that generates many different actual I/Os for each application write. The high I/O volume is amplified by replication.

With regular MySQL, there are lots of writes going on as shown in the figure below (see §3.1 in the paper for a description of all the individual parts).

Aurora takes a different approach:

In Aurora, the only writes that cross the network are redo log records. No pages are ever written from the database tier, not for background writes, not for checkpointing, and not for cache eviction. Instead, the log applicator is pushed to the storage tier where it can be used to generate database pages in background or on demand.

Using this approach, a benchmark with a 100GB data set showed that Aurora could complete 35x more transactions than a mirrored vanilla MySQL in a 30 minute test.

Using redo logs as the unit of replication means that crash recovery comes almost for free!

In Aurora, durable redo record application happens at the storage tier, continuously, asynchronously, and distributed across the fleet. Any read request for a data page may require some redo records to be applied if the page is not current. As a result, the process of crash recovery is spread across all normal foreground processing. Nothing is required at database startup.
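To make the idea of a storage-side log applicator concrete, here is a toy sketch. It is not Aurora's implementation: the `RedoRecord` and `StorageNode` names are hypothetical, and a redo record is modelled as nothing more than "set this key on this page".

```python
from dataclasses import dataclass, field


@dataclass
class RedoRecord:
    lsn: int        # log sequence number
    page_id: int    # page the change applies to
    key: str        # toy payload: set key -> value on the page
    value: str


@dataclass
class StorageNode:
    log: list = field(default_factory=list)

    def append(self, record: RedoRecord) -> None:
        """The only write the database tier sends: a redo record."""
        self.log.append(record)

    def read_page(self, page_id: int, read_point: int) -> dict:
        """Materialise a page on demand by replaying its redo records."""
        page = {}
        for rec in sorted(self.log, key=lambda r: r.lsn):
            if rec.page_id == page_id and rec.lsn <= read_point:
                page[rec.key] = rec.value
        return page


node = StorageNode()
node.append(RedoRecord(lsn=1, page_id=7, key="a", value="1"))
node.append(RedoRecord(lsn=2, page_id=7, key="a", value="2"))
node.append(RedoRecord(lsn=3, page_id=8, key="b", value="1"))
print(node.read_page(7, read_point=3))
```

Because pages are always derivable from the log, there is no separate recovery pass in this model: a read after a crash just replays whatever records the page needs.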

Furthermore, whereas in a regular database more foreground requests also mean more background writes of pages and checkpointing, Aurora can reduce these activities under burst conditions. If a backlog does build up at the storage system then foreground activity can be throttled to prevent a long queue forming.

The complete IO picture looks like this:

Only steps 1 and 2 above are in the foreground path.

The distributed log

Each log record has an associated Log Sequence Number (LSN) – a monotonically increasing value generated by the database. Storage nodes gossip with other members of their protection group to fill holes in their logs. The storage service maintains a watermark called the VCL (Volume Complete LSN), which is the highest LSN for which it can guarantee availability of all prior records. The database can also define consistency points through consistency point LSNs (CPLs). A consistency point is always less than the VCL, and defines a durable consistency checkpoint. The most recent consistency point is called the VDL (Volume Durable LSN). This is what we’ll roll back to on recovery.
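A small sketch may help separate the two watermarks. It assumes, purely for illustration, that LSNs are the contiguous integers 1, 2, 3, … and that the VDL is the highest CPL at or below the VCL; the function names are mine.

```python
def volume_complete_lsn(received: set) -> int:
    """Highest LSN such that every LSN from 1 up to it has been received."""
    vcl = 0
    while vcl + 1 in received:
        vcl += 1
    return vcl


def volume_durable_lsn(received: set, cpls: set) -> int:
    """Highest consistency point that does not exceed the VCL."""
    vcl = volume_complete_lsn(received)
    return max((lsn for lsn in cpls if lsn <= vcl), default=0)


received = {1, 2, 3, 4, 6, 7}   # record 5 has not arrived yet
cpls = {2, 5, 7}                # LSNs that end a mini-transaction
print(volume_complete_lsn(received), volume_durable_lsn(received, cpls))
```

Here the hole at LSN 5 pins the VCL at 4, and the last consistency point below that is 2, so recovery would roll back to LSN 2 even though records 6 and 7 are present.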

The database and storage subsystem interact as follows:

  • Each database-level transaction is broken up into multiple mini-transactions (MTRs) that are ordered and must be performed atomically
  • Each mini-transaction is composed of multiple contiguous log records
  • The final log record in a mini-transaction is a CPL

When writing, there is a constraint that no LSN be issued which is more than a configurable limit— the LSN allocation limit— ahead of the current VDL. The limit is currently set to 10 million. It creates a natural form of back-pressure to throttle incoming writes if the storage or network cannot keep up.
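The back-pressure rule can be sketched as a tiny allocator. The class and method names are hypothetical, and the real system presumably blocks or queues rather than returning `None`.

```python
class LsnAllocator:
    """Hands out LSNs, but never more than `limit` ahead of the current VDL."""

    def __init__(self, limit: int = 10_000_000):
        self.limit = limit
        self.next_lsn = 1
        self.vdl = 0

    def try_allocate(self):
        """Return the next LSN, or None if the writer must wait for storage."""
        if self.next_lsn > self.vdl + self.limit:
            return None
        lsn = self.next_lsn
        self.next_lsn += 1
        return lsn

    def advance_vdl(self, vdl: int) -> None:
        """Called as storage acknowledges durability; the VDL never moves back."""
        self.vdl = max(self.vdl, vdl)


alloc = LsnAllocator(limit=3)            # tiny limit to make throttling visible
first = [alloc.try_allocate() for _ in range(4)]
alloc.advance_vdl(2)
second = [alloc.try_allocate() for _ in range(3)]
print(first, second)
```

With a limit of 3 the fourth allocation is refused until the VDL advances, which is exactly the throttling effect described above, just at a much smaller scale than 10 million.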

Reads are served from pages in a buffer cache and only result in storage I/O requests on a cache miss. The database establishes a read point: the VDL at the time the request was issued. Any storage node that is complete with respect to the read point can be used to serve the request. Pages are reconstructed using the same log application code.
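Choosing where to send a read then becomes a simple filter. In this sketch each storage node is assumed to report the highest LSN up to which it is complete; the node names and the `nodes_for_read` helper are hypothetical.

```python
def nodes_for_read(read_point: int, node_complete_lsn: dict) -> list:
    """Storage nodes that already hold every record up to the read point."""
    return sorted(node for node, lsn in node_complete_lsn.items()
                  if lsn >= read_point)


# Hypothetical per-node completeness for one protection group.
complete_up_to = {"node-a": 120, "node-b": 95, "node-c": 100, "node-d": 130}
print(nodes_for_read(100, complete_up_to))
```

Since the database knows which nodes are complete for its read point, it can read from a single node rather than running a quorum read on every cache miss.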

A single writer and up to 15 read replicas can all mount a single shared storage volume. As a result, read replicas add no additional costs in terms of consumed storage or disk write operations.

Aurora in action

The evaluation in section 6 of the paper demonstrates the following:

  • Aurora can scale linearly with instance sizes, and on the highest instance size can sustain 5x the writes per second of vanilla MySQL.
  • Throughput in Aurora significantly exceeds MySQL, even with larger data sizes and out-of-cache working sets:

  • Throughput in Aurora scales with the number of client connections:

  • The lag in an Aurora read replica is significantly lower than that of a MySQL replica, even with more intense workloads:

  • Aurora outperforms MySQL on workloads with hot row contention:

Customers migrating to Aurora see lower latency and practical elimination of replica lag (e.g., from 12 minutes to 20ms).

Slim: OS kernel support for a low-overhead container overlay network

March 22, 2019

Slim: OS kernel support for a low-overhead container overlay network Zhuo et al., NSDI’19

Container overlay networks rely on packet transformations, with each packet traversing the networking stack twice on its way from the sending container to the receiving container.

There are CPU, throughput, and latency overheads associated with those traversals.

In this paper, we ask whether we can design and implement a container overlay network, where packets go through the OS kernel’s network stack only once. This requires us to remove packet transformation from the overlay network’s data-plane. Instead, we implement network virtualization by manipulating connection-level metadata at connection setup time, saving CPU cycles and reducing packet latency.

Slim comes with some caveats: it requires a kernel module for secure deployment, has longer connection establishment times, doesn’t fit with packet-based network policies, and only handles TCP traffic. For UDP, ICMP, and for its own service discovery, it also relies on an existing container overlay network (Weave Net). But for longer lasting connections managed using connection-based network policies it delivers some impressive results:

  • memcached throughput up by 71%, with latency reduced by 42%, and CPU utilisation reduced by 56%.
  • Nginx CPU utilisation reduced by 22-24%
  • PostgreSQL CPU utilisation reduced by 22%
  • Apache Kafka CPU utilisation reduced by 10%

Since Slim both builds on and compares to Weave Net, I should say at this point that Weave Net was the very first open source project from Weaveworks, the “GitOps for Kubernetes” company. Accel is an investor in Weaveworks, and I am also a personal investor. If you’re using Kubernetes, you should definitely check them out. Anyway, on with the show…

Container networking

In theory there are four possible modes for container networking: a bridge mode for containers on the same host; host mode in which containers use the IP address of their host network interface; macvlan mode (or similar hardware mechanisms) to give each container its own IP address; and overlay mode in which each container is given its own virtual network interface and each application has its own network namespace.

In practice, there are management and deployment challenges with the bridge, host, and macvlan approaches, so overlay networks such as Weave Net are the preferred solution.

Overlay packets are encapsulated with host network headers when routed on the host network. This lets the container overlay network have its own IP address space and network configuration that is disjoint from that of the host network; each can be managed completely independently. Many container overlay network solutions are available today— such as Weave, Flannel, and Docker Overlay— all of which share similar internal architectures.

Overlay network overheads

The overheads in overlay networking come from the per-packet processing inside the OS kernel: delivering a packet on the overlay network requires one extra traversal of the network stack and also packet encapsulation and decapsulation.

Here are some test measurements comparing Weave Net in fast dataplane mode to host networking to give an example:

In this test, compared to direct host networking, for two containers on the same host (Intra) the overlay network reduces throughput by 23% and increases latency by 34%. For containers communicating across hosts (Inter), throughput reduces by 48% and latency increases by 85%. The overheads are lower when communicating on the same host since packet encapsulation is not required.

Compared to host networking, the CPU utilisation also increases by 93%.

There are several known techniques to reduce the data plane overhead. Packet steering creates multiple queues, each per CPU core, for a network interface and uses consistent hashing to map packets to different queues. In this way, packets in the same network connection are processed only on a single core. Different cores therefore do not have access to the same queue, removing the overhead due to multi-core synchronization.
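The core of packet steering is a stable mapping from a connection to a queue. The sketch below uses a CRC32 of the connection 4-tuple modulo the queue count, which is a simplification I have chosen for illustration and not the hash the Linux kernel actually uses.

```python
import zlib


def queue_for_packet(src_ip: str, src_port: int,
                     dst_ip: str, dst_port: int, num_queues: int) -> int:
    """Map a packet to a per-core queue based only on its connection."""
    flow = f"{src_ip}:{src_port}->{dst_ip}:{dst_port}".encode()
    return zlib.crc32(flow) % num_queues


# Every packet of the same connection lands on the same queue (and core).
q1 = queue_for_packet("10.0.0.1", 40000, "10.0.0.2", 11211, num_queues=8)
q2 = queue_for_packet("10.0.0.1", 40000, "10.0.0.2", 11211, num_queues=8)
print(q1 == q2)
```

Because the queue depends only on the connection, no two cores ever contend for the same connection's packets.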

The authors integrated the above Receive Packet Steering (RPS), along with an enhancement called Receive Flow Steering (RFS) which further ensures that interrupt processing occurs on the same core as the application, into Weave Net. With this enhancement, throughput is within 9% of that achieved with host networking, but it makes almost no difference to latency.

Introducing Slim

The big idea in Slim is to reduce CPU utilisation and latency overheads by having packets traverse the network stack only once. That means you can’t do per-packet processing. Instead, Slim works at the connection level.

Slim virtualizes the network by manipulating connection-level metadata. SlimSocket exposes the POSIX socket interface to application binaries to intercept invocations of socket-related system calls. When SlimSocket detects an application is trying to set up a connection, it sends a request to SlimRouter. After SlimRouter sets up the network connection, it passes access to the connection as a file descriptor to the process inside the container. The application inside the container then uses the host namespace file descriptor to send/receive packets directly to/from the host network. Because SlimSocket has the exact same interface as the POSIX socket, and Slim dynamically links SlimSocket into the application, the application binary need not be modified.
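The mechanism at the heart of this, handing an already-connected socket to another party as a file descriptor, can be demonstrated with standard Unix facilities. The sketch below is not Slim's code: it runs in a single process, uses a Unix socket pair as the channel between a pretend router and a pretend application, and needs Python 3.9+ on a Unix-like system for `socket.send_fds` and `socket.recv_fds`. The function names are hypothetical.

```python
import socket


def router_connect(host: str, port: int) -> socket.socket:
    """Stand-in for the router: open the real host-network connection."""
    return socket.create_connection((host, port))


def pass_connection(channel: socket.socket, conn: socket.socket) -> None:
    """Send the connection's file descriptor over a Unix-domain channel."""
    socket.send_fds(channel, [b"fd"], [conn.fileno()])


def receive_connection(channel: socket.socket) -> socket.socket:
    """Stand-in for the application side: rebuild a socket from the fd."""
    _msg, fds, _flags, _addr = socket.recv_fds(channel, 16, 1)
    return socket.socket(fileno=fds[0])


def demo() -> bytes:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))          # any free loopback port
    server.listen(1)
    host, port = server.getsockname()

    router_end, app_end = socket.socketpair()
    conn = router_connect(host, port)
    pass_connection(router_end, conn)
    app_sock = receive_connection(app_end)
    conn.close()                           # the app's duplicate stays open

    app_sock.sendall(b"hello")             # app talks on the host connection
    peer, _ = server.accept()
    data = peer.recv(5)

    for s in (peer, app_sock, router_end, app_end, server):
        s.close()
    return data


print(demo())
```

Once the descriptor has been handed over, the party that created the connection is no longer on the data path, which is where Slim's savings come from.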

Given that Slim is out of the picture once the connection is established, a separate mechanism is needed to support control plane and data plane policies. SlimRouter stores control plane policies and enforces them at connection setup time. If the policy changes, SlimRouter scans existing connections and removes the file descriptors for any connection violating the new policy. This requires the support of a kernel module, SlimKernModule. To avoid containers learning the IP addresses of host machines, SlimKernModule (in secure mode) also prohibits unsafe system calls on file descriptors (e.g. getpeername). Existing kernel mechanisms are used to enforce data plane policies.

This is what it looks like when Slim is used with blocking I/O:

Calls to the POSIX socket interface are intercepted by the SlimSocket shim and forwarded to the SlimRouter. For non-blocking I/O (e.g., select, epoll) Slim also intercepts these API calls and maintains mappings for epoll file descriptor sets. The SlimRouter needs to know the IP address and port mappings in order to establish connections. It does this using a Weave Net overlay network!

When the client calls connect, it actually creates an overlay network connection on the standard container overlay network. When the server receives an incoming connection on the standard overlay network, SlimSocket queries SlimRouter for the physical IP address and port and sends them to the client side inside the overlay connection. In secure mode (§4.3), the result queried from SlimRouter is encrypted. SlimSocket on the client side sends the physical IP address and port (encrypted if in secure mode) to its SlimRouter and the SlimRouter establishes the host connection. This means connection setup time is longer in Slim than that on container overlay networks based on packet transformation.

Weave Net is also used for packets that require data plane handling such as ICMP and UDP.


Microbenchmarks compare Slim to Weave Net with RFS. Creating a TCP connection with Weave Net takes 270µs. With Slim it takes 556µs (440µs in insecure mode). For applications with persistent connections, this additional overhead will not be significant. Compared to Weave Net, Slim reduces CPU overhead by 22-41%.

Slim and Weave Net are then further compared on four application workloads based on Memcached, Nginx, PostgreSQL, and Apache Kafka respectively.

For Memcached, Slim improves throughput by 71% and reduces latency by 42%, with 25% lower CPU utilisation.

For Nginx and PostgreSQL, the main advantage of Slim is reduced CPU utilisation (around 22% reduction). For Kafka, the CPU utilisation reduction is around 10%, but latency is also reduced by 28%.

Slim’s source code is available at

Understanding lifecycle management complexity of datacenter topologies

March 20, 2019

Understanding lifecycle management complexity of datacenter topologies Zhang et al., NSDI’19

There has been plenty of interesting research on network topologies for datacenters, with Clos-like tree topologies and Expander based graph topologies both shown to scale using widely deployed hardware. This research tends to focus on performance properties such as throughput and latency, together with resilience to failures. Important as these are, note that they’re also what’s right in front of you as a designer, and relatively easy to measure. The great thing about today’s paper is that the authors look beneath the surface to consider the less visible but still very important “lifecycle management” implications of topology design. In networking, this translates into how easy it is to physically deploy the network, and how easy it is to subsequently expand. They find a way to quantify the associated lifecycle management costs, and then use this to help drive the design of a new class of topologies, called FatClique.

… we show that existing topology classes have low lifecycle management complexity by some measures, but not by others. Motivated by this, we design a new class of topologies, FatClique, that, while being performance-equivalent to existing topologies, is comparable to, or better than them by all our lifecycle management complexity metrics.

Now, there’s probably only a relatively small subset of The Morning Paper readers involved in designing and deploying datacenter network topologies. So my challenge to you as you read through this paper, is to think about where the hidden complexity and costs are in your own systems. Would you do things differently if these were made more visible? It would be great to see more emphasis for example on things like developer experience (DX) and operational simplicity – in my experience these kinds of attributes can have an outsize impact on the long-term success of a system.

Anyway, let’s get back to cables and switches…

Physically deploying network topologies

When it comes to laying out a network topology for real in a datacenter, you need to think about packaging, placement, and bundling. Packaging is how you group things together, e.g. the arrangement of switches in racks, and placement concerns how these racks are physically placed on the datacenter floor. Placement in turn determines the kinds of cabling you need, and for optical cables the power of the transceivers. Within a rack we might package several connected switches into a single chassis using a backplane. At the other end of the scale, blocks are larger units of co-placement and packaging that combine several racks. With all those connections, it makes things a lot easier to group together multiple fibres all connecting the same two endpoints (racks) into bundles, which contain a fixed number of identical length fibres.

Manufacturing bundles is simpler than manufacturing individual fibres, and handling such bundles significantly simplifies operational complexity.

Patch panels make bundling easier by providing a convenient aggregation point to create and route bundles. Bundles and fibres are physically routed through the datacenter on cable trays. The trays themselves have capacity constraints of course.

Here’s an example of a logical Clos topology and its physical instantiation:

The authors identify three key metrics that together capture much of the deployment complexity in a topology:

  1. The number of switches. More switches equals more packaging complexity.
  2. The number of patch panels, which is a function of topological structure and a good proxy for wiring complexity.
  3. The number of bundle types. This metric captures the other important part of wiring complexity – how many distinct bundle types are needed. A bundle type is represented by its capacity (how many fibres) and its length.
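The third metric is the least obvious, so here is a rough sketch of how it could be counted. It assumes every fibre between the same two racks goes into one bundle and that a bundle type is the pair (capacity, length); the rack names, lengths, and the `count_bundle_types` helper are all made up for illustration.

```python
from collections import Counter


def count_bundle_types(fibres: list) -> int:
    """Count distinct (capacity, length) bundle types.

    `fibres` is a list of (rack_a, rack_b, length_m) tuples, one per fibre.
    """
    per_bundle = Counter()
    for rack_a, rack_b, length_m in fibres:
        pair = tuple(sorted((rack_a, rack_b)))   # direction does not matter
        per_bundle[(pair, length_m)] += 1
    types = {(capacity, length_m)
             for (_pair, length_m), capacity in per_bundle.items()}
    return len(types)


fibres = [
    ("r1", "r2", 10), ("r2", "r1", 10),   # a 2-fibre, 10m bundle
    ("r3", "r4", 10), ("r3", "r4", 10),   # another bundle of the same type
    ("r1", "r3", 25),                     # a 1-fibre, 25m bundle
]
print(count_bundle_types(fibres))
```

Regular topologies produce many bundles of few types; irregular ones tend towards a different type for almost every rack pair.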

These complexity measures are complete. The number of cable trays, the design of the chassis, and the number of racks can be derived from the number of switches (and the number of servers and the datacenter floor dimensions, which are inputs to the topology design). The number of cables and transceivers can be derived from the number of patch panels.

Here’s how Clos and Expander (Jellyfish) representative topologies for the same number of servers stack up against these metrics:

The expander graph topology shows much higher deployment complexity in terms of the number of bundle types. Clos also exposes far fewer ports outside of a rack (it has better port hiding).

Expanding existing networks

When you want to expand an existing network first you need to buy all the new gear and lay it out on the datacenter floor, and then you can begin a re-wiring process. This is all going on with live traffic flowing, so expansion is carried out in steps. During each step the capacity of the topology is guaranteed to be at least some percentage of the existing topology capacity. The percentage is sometimes known as the expansion SLO.

During a step, existing links to be re-wired are drained, then human operators physically rewire links at patch panels. The new links are tested and then undrained (strange word!), i.e., brought into service.
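A crude model shows how the expansion SLO drives the number of steps. It assumes capacity is simply proportional to the number of links in service, which real topologies do not guarantee, and the function name is my own.

```python
import math


def expansion_steps(total_links: int, links_to_rewire: int,
                    slo_percent: int) -> int:
    """Minimum steps if each step may drain at most (100 - SLO)% of links."""
    max_drained_per_step = total_links * (100 - slo_percent) // 100
    if max_drained_per_step == 0:
        raise ValueError("SLO leaves no room to drain any link")
    return math.ceil(links_to_rewire / max_drained_per_step)


print(expansion_steps(total_links=100, links_to_rewire=40, slo_percent=90))
print(expansion_steps(total_links=100, links_to_rewire=40, slo_percent=75))
```

Tightening the SLO from 75% to 90% doubles the number of steps in this toy model, which is why spare capacity matters so much for expansion speed.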

For example, here’s a logical expansion (top row) and its physical realisation:

The most time-consuming part of all this is the physical rewiring. The two metrics that capture expansion complexity are therefore:

  1. The number of expansion steps, and
  2. The average number of rewired links in a patch panel rack.

Here’s how Clos and Expander stack up on those metrics for the same networks we saw earlier:

This time the victory goes to Expander (Jellyfish). Jellyfish has a much higher north-to-south capacity ratio. Northbound links exit a block, and southbound links are to/from servers within a block. “Fat edges” have more northbound than southbound links, and the extra capacity means you can accomplish more movement in each step.

Clos topologies re-wire more links in each patch panel during an expansion step and require many steps because they have a low north-south capacity ratio.

Enter the FatClique

Inspired by these insights, the authors define a new class of topologies called FatClique, which combine the hierarchical structure of Clos with the edge expansion capabilities of expander graphs.

There are three levels in the hierarchy. A clique of switches forms a sub-block. Cliques of sub-blocks come together to form blocks. And cliques of blocks come together to form the full FatClique topology.

Four key design variables determine the particular instantiation of a FatClique topology: the number of ports in a switch that connect to servers; the number of ports in a switch that connect to other sub-blocks in a block; the number of switches in a sub-block; and the number of sub-blocks in a block. A synthesis algorithm takes a set of six input constraints (see §5.1) and determines the values for these four design variables.

There is plenty more detail in section 5 of the paper which I don’t have the space to do justice to here.

FatClique vs Clos vs Expander

The evaluation compares FatClique to Clos, Xpander, and Jellyfish at different network topology sizes, as shown in the table below.


Here’s how they stack up against the complexity metrics:

Number of switches

Number of patch panels

Number of bundle types

and associated cabling costs:

Number of expansion steps

Average number of rewired links

We find that FatClique is the best at most scales by all our complexity metrics. (The one exception is that at small and medium scales, Clos has slightly fewer patch panels). It uses 50% fewer switches and 33% fewer patch panels than Clos at large scale, and has a 23% lower cabling cost (an estimate we were able to derive from published cable prices). Finally, FatClique can permit fast expansion while degrading network capacity by small amounts (2.5-10%): at these levels, Clos can take 5x longer to expand the topology, and each step of Clos expansion can take longer than FatClique because the number of links to be rewired at each step per patch panel can be 30-50% higher.

The one thing I couldn’t find in the evaluation is any data to back up the opening claim that FatClique achieves all of this “while being performance-equivalent to existing topologies.”

The last word

As the management complexity of networks increases, the importance of designing for manageability will increase in the coming years. Our paper is only a first step in this direction…

Datacenter RPCs can be general and fast

March 18, 2019

Datacenter RPCs can be general and fast Kalia et al., NSDI’19

We’ve seen a lot of exciting work exploiting combinations of RDMA, FPGAs, and programmable network switches in the quest for high performance distributed systems. I’m as guilty as anyone for getting excited about all of that. The wonderful thing about today’s paper, for which Kalia et al. won a best paper award at NSDI this year, is that it shows in many cases we don’t actually need to take on that extra complexity. Or to put it another way, it seriously raises the bar for when we should.

eRPC (efficient RPC) is a new general-purpose remote procedure call (RPC) library that offers performance comparable to specialized systems, while running on commodity CPUs in traditional datacenter networks based on either lossy Ethernet or lossless fabrics… We port a production grade implementation of Raft state machine replication to eRPC without modifying the core Raft source code. We achieve 5.5 µs of replication latency on lossy Ethernet, which is faster than or comparable to specialized replication systems that use programmable switches, FPGAs, or RDMA.

eRPC just needs good old UDP. Lossy Ethernet is just fine (no need for fancy lossless networks), and it doesn’t need Priority Flow Control (PFC). The received wisdom is that you can either have general-purpose networking that works everywhere and is non-intrusive to applications but has capped performance, or you have to drop down to low-level interfaces and do a lot of your own heavy lifting to obtain really high performance.

The goal of our work is to answer the question: can a general-purpose RPC library provide performance comparable to specialized systems?

Astonishingly, Yes.

From the evaluation using two lossy Ethernet clusters (designed to mimic the setups used in Microsoft and Facebook datacenters):

  • 2.3µs median RPC latency
  • up to 10 million RPCs / second on a single core
  • large message transfer at up to 75Gbps on a single core
  • peak performance maintained even with 20,000 connections per node (2 million cluster wide)

eRPC’s median latency on CX5 is only 2.3µs, showing that latency with commodity Ethernet NICs and software networking is much lower than the widely-believed value of 10-100µs.

(CURP over eRPC in a modern datacenter would be a pretty spectacular combination!).

So the question that immediately comes to mind is how? As in, “what magic is this?”.

The secret to high-performance general-purpose RPCs

… is a carefully considered design that optimises for the common case and also avoids triggering packet loss due to switch buffer overflows for common traffic patterns.

That’s it? Yep. You won’t find any super low-level fancy new exotic algorithm here. Your periodic reminder that thoughtful design is a high leverage activity! You will of course find something pretty special in the way all the pieces come together.

So what assumptions go into the ‘common case?’

  • Small messages
  • Short duration RPC handlers
  • Congestion-free networks

Which is not to say that eRPC can’t handle larger messages, long-running handlers, and congested networks. It just doesn’t pay a contingency overhead price when they are absent.

Optimisations for the common case (which we’ll look at next) boost performance by up to 66% in total. On this base eRPC also enables zero-copy transmissions and a design that scales while retaining a constant NIC memory footprint.

The core model is as follows. RPCs are asynchronous and execute at most once. Servers register request handler functions with unique request types, and clients include the request types when issuing requests. Clients receive a continuation callback on RPC completion. Messages are stored in opaque DMA-capable buffers provided by eRPC, called msgbufs. Each RPC endpoint (one per end user thread) has an RX and TX queue for packet I/O, an event loop, and several sessions.
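The programming model is easier to picture with a toy version. This sketch is in-process Python with no networking at all and is not eRPC's C++ API; the class and method names are hypothetical and only mirror the shape of the model: handlers registered by request type, asynchronous requests, and a continuation invoked from the event loop.

```python
from collections import deque


class Rpc:
    """Toy single-endpoint model: client and server share one event loop."""

    def __init__(self):
        self.handlers = {}
        self.pending = deque()

    def register_handler(self, req_type: int, fn) -> None:
        if req_type in self.handlers:
            raise ValueError(f"request type {req_type} already registered")
        self.handlers[req_type] = fn

    def enqueue_request(self, req_type: int, payload, continuation) -> None:
        """Asynchronous: returns at once; the continuation fires later."""
        self.pending.append((req_type, payload, continuation))

    def run_event_loop_once(self) -> None:
        """Process queued requests and deliver responses to continuations."""
        while self.pending:
            req_type, payload, continuation = self.pending.popleft()
            response = self.handlers[req_type](payload)
            continuation(response)


rpc = Rpc()
responses = []
rpc.register_handler(1, lambda req: req.upper())
rpc.enqueue_request(1, "ping", responses.append)
before = list(responses)          # nothing has run yet
rpc.run_event_loop_once()
print(before, responses)
```

Nothing happens until the event loop runs, which is the essence of the asynchronous model: the caller keeps control of when work is done.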

The long and short of it

When request handlers are run directly in dispatch threads you can avoid expensive inter-thread communication (adding up to 400ns of request latency). That’s fine when request handlers are short in duration, but long handlers block other dispatch handling increasing tail latency, and prevent rapid congestion feedback.

eRPC supports running handlers in dispatch threads for short duration request types (up to a few hundred nanoseconds), and worker threads for longer running requests. Which mode to use is specified when the request handler is registered. This is the only additional user input needed in eRPC.
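To make the registration model concrete, here is a minimal Python sketch of handlers registered by request type with a per-type execution mode. It is only an illustration: `ToyEndpoint`, `Mode`, and their methods are hypothetical names of my own, not the eRPC API, and a thread pool stands in for eRPC's worker threads.

```python
from concurrent.futures import ThreadPoolExecutor
from enum import Enum


class Mode(Enum):
    DISPATCH = "dispatch"  # run inline in the event loop (short handlers)
    WORKER = "worker"      # hand off to a worker thread (long handlers)


class ToyEndpoint:
    """Hypothetical stand-in for an RPC endpoint; not the eRPC API."""

    def __init__(self, workers=2):
        self.handlers = {}
        self.pool = ThreadPoolExecutor(max_workers=workers)

    def register(self, req_type, handler, mode):
        # The execution mode is chosen once, when the handler is registered.
        self.handlers[req_type] = (handler, mode)

    def handle(self, req_type, payload, continuation):
        handler, mode = self.handlers[req_type]
        if mode is Mode.DISPATCH:
            # Short handler: no inter-thread hop, reply immediately.
            continuation(handler(payload))
            return None
        # Long handler: keep the dispatch path free for other requests.
        future = self.pool.submit(handler, payload)
        future.add_done_callback(lambda f: continuation(f.result()))
        return future
```

A caller would register, say, a lookup handler with `Mode.DISPATCH` and a scan handler with `Mode.WORKER`; the continuation passed to `handle` plays the role of the client-side completion callback.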

Scalable connection state

eRPC’s choice to use packet I/O over RDMA avoids the circular buffer scalability bottleneck in RDMA (see §4.1.1). By taking advantage of multi-packet RX-queue (RQ) descriptors in modern NICs, eRPC can use constant space in the NIC instead of a footprint that grows with the number of connected sessions (see Appendix A).

Furthermore, eRPC replaces NIC-managed connection state with CPU-managed connection state.

This is an explicit design choice, based upon fundamental differences between the CPU and NIC architectures. NICs and CPUs will both cache recently used connection state. CPU cache misses are served from DRAM, whereas NIC cache misses are served from the CPU’s memory subsystem over the slow PCIe bus. The CPU’s miss penalty is therefore much lower. Second, CPUs have substantially larger caches than the ~2MB available on a modern NIC, so the cache miss frequency is also lower.

Zero-copy transmission

Zero-copy packet I/O in eRPC provides performance comparable to lower level interfaces such as RDMA and DPDK. The msgbuf layout ensures that the data region is contiguous (so that applications can use it as an opaque buffer) even when the buffer contains data for multiple packets. The first packet’s data and header are also contiguous so that the NIC can fetch small messages with one DMA read. Headers for remaining packets are at the end, to allow for the contiguous data region in the middle.
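The layout described above can be sketched as a list of byte regions. This is a rough illustration under assumed sizes: the 16-byte header and 1024-byte per-packet data limit are placeholders I picked, and `msgbuf_layout` is a hypothetical helper, not an eRPC function.

```python
def msgbuf_layout(data_len, mtu_data=1024, hdr_len=16):
    """Return (offset, length, label) regions for a msgbuf-style buffer.

    Assumed layout, following the description in the text:
    [hdr 0][contiguous data][hdr 1] ... [hdr N-1]
    """
    # Ceiling division; even an empty message occupies one packet.
    num_pkts = max(1, -(-data_len // mtu_data))
    # Header 0 sits directly in front of the data, so a single-packet
    # message is one contiguous span (one DMA read).
    regions = [(0, hdr_len, "hdr 0"), (hdr_len, data_len, "data")]
    offset = hdr_len + data_len
    # Headers for packets 1..N-1 go after the data region.
    for i in range(1, num_pkts):
        regions.append((offset, hdr_len, f"hdr {i}"))
        offset += hdr_len
    return regions
```

For a 2,500 byte message this gives one leading header, one 2,500 byte data region, and two trailing headers, which is why the application can treat the middle of the buffer as a plain contiguous array.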

eRPC must ensure that it doesn’t mess with msgbufs after ownership is returned to the application, which is fundamentally addressed by making sure it retains no reference to the buffer. Retransmissions can interfere with such a scheme though. eRPC chooses to use “unsignaled” packet transmission optimising for the common case of no retransmission. The trade-off is a more expensive process when retransmission does occur:

We flush the TX DMA queue after queuing a retransmitted packet, which blocks until all queued packets are DMA-ed. This ensures the required invariant: when a response is processed, there are no references to the request in the DMA queue.

eRPC provides zero-copy reception for workloads under the common case of single-packet requests and dispatch-mode request handlers too, which boosts eRPC’s message rate by up to 16%.

Sessions and flow control

Sessions support concurrent requests (8 by default) that can complete out-of-order with respect to each other. Sessions use an array of slots to track RPC metadata for outstanding requests, and slots have an MTU-size preallocated msgbuf for use by request handlers that issue short responses. Session credits are used to implement packet-level flow control. Session credits also support end-to-end flow control to reduce switch queuing. Each session is given BDP/MTU credits, which ensures that each session can achieve line rate.
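As a worked example of the BDP/MTU rule, here is a small calculation. The link speed, RTT, and MTU below are illustrative values I chose for the arithmetic, not figures taken from this write-up.

```python
import math


def session_credits(link_gbps, rtt_us, mtu_bytes):
    """Return (BDP in bytes, credits per session) for a given link.

    Gbps times microseconds gives kilobits, so multiply by 1000 for bits
    and divide by 8 for bytes.
    """
    bdp_bytes = link_gbps * 1000 * rtt_us / 8
    return bdp_bytes, math.ceil(bdp_bytes / mtu_bytes)


# Assumed example: 25 Gbps link, 6 microsecond RTT, 1024-byte MTU.
bdp, credits = session_credits(25, 6, 1024)
```

With those assumed numbers the bandwidth-delay product is 18,750 bytes, so a session needs 19 packet credits to keep the pipe full on its own.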

Client-driven wire protocol

We designed a wire protocol for eRPC that is optimized for small RPCs and accounts for per-session credit limits. For simplicity, we chose a simple client-driven protocol, meaning that each packet sent by the server is in response to a client packet.

Client-driven protocols have fewer moving parts, with only the client needing to maintain wire protocol state. Rate limiting becomes solely a client responsibility too, freeing server CPU.

Single-packet RPCs (request and response require only a single packet) use the fewest packets possible. With multi-packet responses and a client-driven protocol the server can’t immediately send response packets after the first one, so the client sends a request-for-response (RFR) packet. In practice this added latency turned out to be less than 20% for responses with four or more packets.
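The packet exchange for a multi-packet response can be sketched as a trace. This assumes a single-packet request and one RFR per remaining response packet, which is my reading of the description above; `client_driven_exchange` is a hypothetical helper.

```python
def client_driven_exchange(resp_pkts):
    """Packet trace for a single-packet request with resp_pkts response packets.

    Every server packet is sent in reply to a client packet.
    """
    trace = [("client", "REQ"), ("server", "RESP 0")]
    for i in range(1, resp_pkts):
        # The server cannot push later packets unprompted, so the
        # client asks for each one with a request-for-response.
        trace.append(("client", f"RFR {i}"))
        trace.append(("server", f"RESP {i}"))
    return trace
```

A single-packet response costs two packets in total, while a four-packet response costs eight under this assumption, with the extra client packets being the price of keeping all wire protocol state on the client.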

Congestion control

eRPC can use either Timely or DCQCN for congestion control. The evaluation uses Timely as the cluster hardware could not support DCQCN. Three optimisations brought the overhead of congestion control down from around 20% to 9%:

  1. Bypassing Timely altogether if the RTT of a received packet on an uncongested session is less than a low threshold value.
  2. Bypassing the rate limiter for uncongested sessions
  3. Sampling timers once per RX or TX batch rather than once per packet for RTT measurement

These optimisations work because datacenter networks are typically uncongested. E.g. at one-minute timescales 99% of all Facebook datacenter links are uncongested, and for web and cache traffic at Google, 90% of ToR switch links (the most congested) are less than 10% utilized at 25 µs timescales.
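The first two bypasses can be sketched as simple guards in front of the expensive path. This is a sketch under assumptions: I treat a session as uncongested when it is sending at line rate, the 50µs threshold is a placeholder, and the rate update itself is left to a caller-supplied function rather than implementing Timely. The third optimisation (batched timer sampling) is not shown.

```python
class ToySession:
    """Hypothetical per-session congestion state."""

    def __init__(self, line_rate_gbps=25.0):
        self.line_rate_gbps = line_rate_gbps
        self.rate_gbps = line_rate_gbps

    def uncongested(self):
        # Assumption: at line rate means no congestion has been seen.
        return self.rate_gbps >= self.line_rate_gbps


def on_rtt_sample(session, rtt_us, run_timely_update, t_low_us=50.0):
    """Return True if the full rate update ran, False if it was bypassed."""
    if session.uncongested() and rtt_us < t_low_us:
        return False  # common case: skip the rate computation entirely
    run_timely_update(session, rtt_us)
    return True


def needs_rate_limiter(session):
    # Uncongested sessions transmit directly, skipping the rate limiter.
    return not session.uncongested()
```

The point of the structure is that the common case costs two comparisons, and the full machinery only runs once a session has actually seen congestion.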

Packet loss

eRPC keeps things simple by treating re-ordered packets as losses and dropping them (as do current RDMA NICs). When a client suspects a lost packet it rolls back the request’s wire protocol state using a ‘go-back-N’ mechanism. It reclaims credits and retransmits from the rollback point.
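Here is a minimal sketch of the rollback idea, tracking only counters for a single request. `ToyRequestState` and its fields are hypothetical; the real client state is richer than this.

```python
class ToyRequestState:
    """Hypothetical client-side wire state for one multi-packet request."""

    def __init__(self, num_pkts, credits):
        self.num_pkts = num_pkts
        self.credits = credits
        self.sent = 0   # packets transmitted so far
        self.acked = 0  # packets known to have arrived

    def send_available(self):
        """Transmit as many packets as credits allow; return their indices."""
        out = []
        while self.credits > 0 and self.sent < self.num_pkts:
            out.append(self.sent)
            self.sent += 1
            self.credits -= 1
        return out

    def on_ack(self):
        self.acked += 1
        self.credits += 1

    def on_suspected_loss(self):
        """Go-back-N: reclaim in-flight credits and resend from the rollback point."""
        in_flight = self.sent - self.acked
        self.credits += in_flight
        self.sent = self.acked
        return self.send_available()
```

With five packets and three credits, losing packet 1 after packet 0 is acknowledged rolls `sent` back to 1 and retransmits packets 1 to 3, even if some of them had actually arrived.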

Evaluation highlights

This write-up is in danger of getting too long again, so I’ll keep this very brief. The following table shows the contribution of the various optimisations through ablation:

We conclude that optimizing for the common case is both necessary and sufficient for high-performance RPCs.

Here you can see latency with increasing threads. eRPC achieves high message rate, bandwidth, and scalability with low latency in a large cluster with lossy Ethernet.

For large RPCs, eRPC can achieve up to 75 Gbps with one core.

Section 7 discusses the integration of eRPC in an existing Raft library, and in the Masstree key-value store. From the Raft section the authors conclude: “the main takeaway is that microsecond-scale consistent replication is achievable in commodity Ethernet datacenters with a general-purpose networking library.”

eRPC’s speed comes from prioritizing common-case performance, carefully combining a wide range of old and new optimizations, and the observation that switch buffer capacity far exceeds datacenter BDP. eRPC delivers performance that was until now believed possible only with lossless RDMA fabrics or specialized network hardware. It allows unmodified applications to perform close to the hardware limits.

Exploiting commutativity for practical fast replication

March 15, 2019

Exploiting commutativity for practical fast replication Park & Ousterhout, NSDI’19

I’m really impressed with this work. The authors give us a practical-to-implement enhancement to replication schemes (e.g., as used in primary-backup systems) that offers a significant performance boost. I’m expecting to see this picked up and rolled out in real-world systems as word spreads. At a high level, CURP works by dividing execution into periods of commutative operation where ordering does not matter, punctuated by full syncs whenever commutativity would break.

The Consistent Unordered Replication Protocol (CURP) allows clients to replicate requests that have not yet been ordered, as long as they are commutative. This strategy allows most operations to complete in 1 RTT (the same as an unreplicated system).

When integrated with RAMCloud write latency was improved by ~2x, and write throughput by 4x. Which is impressive given that RAMCloud isn’t exactly hanging around in the first place! When integrated with Redis, CURP was able to add durability and consistency while keeping similar performance to non-durable Redis.

CURP can be easily applied to most existing systems using primary-backup replication. Changes required by CURP are not intrusive, and it works with any kind of backup mechanism (e.g., state-machine replication, file writes to network replicated drives, or scattered replication). This is important since most high-performance systems optimize their backup mechanisms, and we don’t want to lose those optimizations…

The big idea: durability, ordering, and commutativity

We’re looking for two things from a replication protocol: durability of executed operations, and consistent ordering across all replicas for linearizability. Hence the classic primary-backup protocol in which clients send requests to a primary, which by being a single instance serves to give a global order to requests. The primary then ensures that the update is propagated to backup replicas before returning to the client. This all takes 2 RTTs. Consensus protocols with strong leaders also require 2 RTTs. Fast Paxos and Generalized Paxos reduce latency to 1.5 RTT through optimistic replication with presumed ordering. Network-Ordered Paxos and Speculative Paxos can get close to 1 RTT latency, but require special networking hardware.

CURP’s big idea is as follows: when operations are commutative, ordering doesn’t matter. So the only property we have to ensure during a commutative window is durability. If the client sends a request in parallel to the primary and to f additional servers which all make the request durable, then the primary can reply immediately and once the client has received all f confirmations it can reveal the result. This gives us a 1 RTT latency for operations that commute, and we fall back to the regular 2 RTT sync mechanism when an operation does not commute.

Those f additional servers are called witnesses. If you’re building a new system from scratch, it makes a lot of sense to combine the roles of backup and witness in a single process. But this is not a requirement, and for ease of integration with an existing system witnesses can be separate.

… witnesses do not carry ordering information, so clients can directly record operations into witnesses in parallel with sending operations to masters so that all requests will finish in 1 RTT. In addition to the unordered replication to witnesses, masters still replicate ordered data to backups, but do so asynchronously after sending the execution results back to the clients.

This should all be raising a number of questions, chief among which is what should happen when a client, witness, or primary crashes. We’ll get to that shortly. The other obvious question is how do we know when operations commute?

Witnesses must be able to determine whether operations are commutative or not just from the operation parameters. For example, in key-value stores, witnesses can exploit the fact that operations on different keys are commutative… Witnesses cannot be used for operations whose commutativity depends on the system state.

So CURP works well with key-value stores, but not with SQL-based stores (in which WHERE clauses for example mean that operation outcomes can depend on system state).

The CURP protocol

CURP uses a total of f+1 replicas to tolerate f failures (as per standard primary-backup), and additionally uses f witnesses to ensure durability of updates even before replications to backups are completed. Witnesses may fail independently of backups.

To ensure the durability of the speculatively completed updates, clients multicast update operations to witnesses. To preserve linearizability, witnesses and masters enforce commutativity among operations that are not fully replicated to backups.

Operation in the absence of failures

Clients send update requests to the primary, and concurrently to the f witnesses allocated to the primary. Once the client has responses from all f witnesses (indicating durability) and the primary (which responds immediately without waiting for data to replicate to backups) then it can return the result. This is the 1 RTT path.

If the client sends an operation to a witness, and instead of confirming acceptance the witness rejects that operation (because it does not commute with the other operations currently being held by the witness), then the client cannot complete in 1 RTT. It now sends a sync request to the primary and awaits the response (indicating data is safely replicated and the operation result can be returned). Thus the operation latency in this case is 2 RTTs in the best case, and up to 3 RTTs in the worst case depending on the status of replication between primary and backups at the point the sync request was received.
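The client's decision after the first round trip can be sketched as a small function. The return labels and the function name are my own; the `primary_synced` flag models the case where the primary reports that it has already synced to backups.

```python
def client_next_step(witness_accepts, primary_synced=False):
    """Decide what a client does once the primary and witnesses have replied.

    witness_accepts: one boolean per witness (True means accepted).
    primary_synced: True if the primary says it already synced to backups.
    """
    if primary_synced:
        # The update is already durable on the backups.
        return "complete_synced"
    if all(witness_accepts):
        # Durable on all f witnesses: the 1 RTT path.
        return "complete_1rtt"
    # At least one witness rejected: fall back to an explicit sync.
    return "send_sync"
```

Note that a single rejection is enough to force the slow path, since durability is only guaranteed if all f witnesses hold the operation.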

Witnesses durably record operations requested by clients. Non-volatile memory (e.g. flash-backed DRAM) is a good choice for this as the amount of space required is relatively small. On receiving a client request, the witness saves the operation and sends an accept response if it commutes with all other saved operations. Otherwise it rejects the request. All witnesses operate independently. Witnesses may also receive garbage collection RPCs from their primary, indicating the RPC IDs of operations that are now durable (safely replicated by the primary to all backups). The witnesses can then delete these operations from their stores.
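A witness for a key-value store can be sketched in a few lines, using "touches a different key" as the commutativity test. This is a toy: `ToyWitness` is a hypothetical name, and a plain dict stands in for the non-volatile storage a real witness would need.

```python
class ToyWitness:
    """Hypothetical witness; state is in memory here, not durable."""

    def __init__(self):
        self.saved = {}  # rpc_id -> set of keys the operation touches

    def record(self, rpc_id, keys):
        keys = set(keys)
        # Commutativity is decided from the operation parameters alone:
        # operations on disjoint keys commute.
        for other_id, other_keys in self.saved.items():
            if other_id != rpc_id and keys & other_keys:
                return "reject"
        self.saved[rpc_id] = keys
        return "accept"

    def gc(self, rpc_ids):
        # The primary reports operations that are now safely on backups.
        for rpc_id in rpc_ids:
            self.saved.pop(rpc_id, None)
```

Once the primary garbage collects an operation, a later write to the same key is accepted again, which is what keeps the witness store small.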

A primary receives, serializes, and executes all update RPC requests from clients. In general a primary will respond to clients before syncing to backups (speculative execution) leading to unsynced operations. If the primary receives an operation request which does not commute with all currently unsynced operations then it must sync before responding (and adds a ‘synced’ header in the response so that the client knows an explicit sync request is unnecessary). Primaries can batch and asynchronously replicate RPC requests. E.g. with 3-way primary-backup replication and batches of 10 RPCs, we have 1.3 RPCs per request on average. The optimal batch and window size depends on the particulars of the workload.
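One reading that reproduces the 1.3 figure, assuming "3-way" means three backup RPCs per batch and counting the client's own request as one RPC:

```python
def rpcs_per_request(backups, batch_size):
    """Average RPCs handled by the primary per client update.

    Assumption: one incoming client RPC, plus one replication RPC
    to each backup amortised over a batch of updates.
    """
    return 1 + backups / batch_size


avg = rpcs_per_request(backups=3, batch_size=10)  # 1 + 3/10
```

Without batching the same assumption gives 4 RPCs per request, which is where the throughput gain from asynchronous, batched replication comes from.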

For read requests clients can go directly to primaries (no need for the durability afforded by witnesses). The primary must sync before returning if the read request does not commute with all currently unsynced updates. Clients can also read from a backup while retaining linearizability guarantees if they also confirm with a witness that the read operation commutes with all operations currently saved in the witness.


If a client does not get a response from a primary it resends the update RPC to a new primary, and tries to record the RPC request in the witnesses of that primary. If the client does not get a response from a witness, it can fall back to the traditional replication path by issuing a sync request to the primary.

If a witness crashes it is decommissioned and a new witness is assigned to the primary. The primary is notified of its new witness list. The primary then syncs to backups before responding that it is now safe to recover from the new witness. A witness list version number is maintained by the primary and sent by the client on every request so that clients can be notified of the change and update their lists.

If a primary crashes there may have been unsynced operations for which clients have received results but the data is not yet replicated to backups. A new primary is allocated, and bootstrapped by recovering from one of the backups. The primary then picks any available witness, asks it to stop accepting more operations, and then retrieves all requests known to the witness. The new primary can execute these requests in any order since they are known to be commutative. With the operations executed, the primary syncs to backups and resets all the witnesses. It can now begin accepting requests again.

The gotcha in this scheme is that some of the requests sent by the witness may in fact have already been executed and replicated to backups before the primary crashed. We don’t want to re-execute such operations:

Duplicate executions of the requests can violate linearizability. To avoid duplicate executions of the requests that are already replicated to backups, CURP relies on exactly-once semantics provided by RIFL, which detects already executed client requests and avoids their re-execution.

(We studied RIFL in an earlier edition of The Morning Paper; it promises efficient ‘bolt-on’ linearizability for distributed RPC systems).
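The recovery replay, including the duplicate check, can be sketched as follows. This is a simplification: a set of completed RPC IDs stands in for the exactly-once bookkeeping that RIFL provides, operations are plain key-value writes, and `recover_primary` is a hypothetical helper.

```python
def recover_primary(backup_state, backup_completed_ids, witness_requests):
    """Rebuild primary state from a backup plus one witness.

    backup_state: dict of key -> value restored from a backup.
    backup_completed_ids: RPC IDs already applied in that backup.
    witness_requests: list of (rpc_id, key, value) held by the witness.
    """
    state = dict(backup_state)
    completed = set(backup_completed_ids)
    for rpc_id, key, value in witness_requests:
        if rpc_id in completed:
            # Already replicated before the crash: do not re-execute.
            continue
        state[key] = value
        completed.add(rpc_id)
    return state, completed
```

Because the witness only ever holds mutually commutative operations, replaying `witness_requests` in any order produces the same state.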

Performance evaluation

We evaluated CURP by implementing it in the RAMCloud and Redis storage systems, which have very different backup mechanisms. First, using the RAMCloud implementation, we show that CURP improves the performance of consistently replicated systems. Second, with the Redis implementation we demonstrate that CURP can make strong consistency affordable in a system where it had previously been too expensive for practical use.

I’m out of space to do the evaluation justice, but you’ll find full details in §5. Here are a couple of highlights…

The chart below shows the performance of Redis without durability (i.e., as most people use it today), with full durability in the original Redis scheme, and with durability via CURP. You can clearly see that using CURP we get the benefits of consistency and durability with performance much closer to in-memory only Redis.

And here’s the change in latency of write operations for RAMCloud. It took me a moment to understand what we’re seeing here, since it looks like the system gets slower as the write percentage goes down. But remember that we’re only measuring write latency here, not average request latency. Presumably with more reads we need to sync more often.  

Thanks to Qian Li who points out in the comments I’d completely missed the crucial ‘cumulative’ word in the description of this chart. Now it all makes so much more sense! Only a tiny fraction of writes have higher latency, and CURP is **improving** tail latencies. 

In the RAMCloud implementation, witnesses could handle 1270K requests per second on a single thread, with interwoven gc requests at 1 in every 50 writes from the primary. The memory overhead is just 9MB. CURP does however increase network bandwidth usage for update operations by 75%.

Cloud programming simplified: a Berkeley view on serverless computing

March 13, 2019

Cloud programming simplified: a Berkeley view on serverless computing Jonas et al., arXiv 2019

With thanks to Eoin Brazil who first pointed this paper out to me via Twitter….

Ten years ago Berkeley released the ‘Berkeley view of cloud computing’ paper, predicting that cloud use would accelerate. Today’s paper choice is billed as its logical successor: it predicts that the use of serverless computing will accelerate. More precisely:

… we predict that serverless computing will grow to dominate the future of cloud computing.

The acknowledgements thank reviewers from Google, Amazon, and Microsoft among others, so it’s reasonable to assume the major cloud providers have had at least some input to the views presented here. The discussion is quite high level, and at points it has the feel of a PR piece for Berkeley (there’s even a cute collective author email address: serverlessview at), but there’s enough of interest in its 35 pages for us to get our teeth into…

The basic structure is as follows. First we get the obligatory attempt at defining serverless, together with a discussion of why it matters. Then the authors look at some of the current limitations (cf. ‘Serverless computing: one step forward, two steps back’) before going on to suggest areas for improvement in the coming years. The paper closes with a nice section on common fallacies (and pitfalls) of serverless, together with a set of predictions. I’m going to start there!

Predictions, fallacies, pitfalls

Fallacy: cloud function instances cost up to 7.5x more per minute than regular VM instances of similar capacity, therefore serverless cloud computing is more expensive than serverful cloud computing.

Rebuttal: the equivalent functionality that you’re getting from a function is much more than you can achieve with a single VM instance. Plus, you don’t pay when there are no events. Putting the two together means that serverless could well end up being less expensive.
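The "you don't pay when there are no events" half of the rebuttal can be put into numbers. This sketch takes the 7.5x per-minute ratio at face value and ignores the extra functionality argument entirely; both function names are hypothetical.

```python
def break_even_busy_fraction(price_ratio=7.5):
    """Fraction of time a function must be busy to cost as much as an always-on VM."""
    return 1.0 / price_ratio


def relative_cost(busy_fraction, price_ratio=7.5):
    """Function cost divided by always-on VM cost over the same period."""
    return busy_fraction * price_ratio
```

At a 7.5x price ratio the break-even point is about 13% utilisation: a workload that is busy 5% of the time costs roughly 0.375 of the VM price, while one that is busy half the time costs 3.75 times as much.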

Prediction (for me, the most interesting sentence in the whole paper!) :

We see no fundamental reason why the cost of serverless computing should be higher than that of serverful computing, so we predict that billing models will evolve so that almost any application, running at almost any scale, will cost no more and perhaps much less with serverless computing.

But yes, billing by actual usage means that costs will be less predictable (hopefully in a good way!) than ‘always-on’ solutions.

Fallacy: it’s easy to port applications between serverless computing providers since functions are written in high-level languages. (Does anyone really think that serverless increases portability? All the talk I hear is of the opposite concern!).

Rebuttal/Pitfall: vendor lock-in may be stronger with serverless computing than for serverful computing. (Yes, because of the reliance on the eventing model and the set of back-end services your functions bind to).

Prediction: “We expect new BaaS storage services to be created that expand the types of applications that run well on serverless computing,” and, “The future of serverless computing will be to facilitate BaaS.”

Pitfall: (today) “few so called ‘elastic’ services match the real flexibility demands of serverless computing.”

Fallacy: cloud functions cannot handle very low latency applications needing predictable performance

Rebuttal: if you pre-warm / keep a pool of functions ready then they can (combine this thought with ‘we see no fundamental reason why the cost of serverless computing should be higher than that of serverful computing…’)


While serverful cloud computing won’t disappear, the relative importance of that portion of the cloud will decline as serverless computing overcomes its current limitations. Serverless computing will become the default computing paradigm of the Cloud Era, largely replacing serverful computing and thereby bringing closure to the Client-Server Era.

(So what’s the new term? Client-Function architecture?? )

Oh, and one more thing: “we expect serverless computing to become simpler to program securely than serverful computing…”

Serverless motivation

I guess it’s definition time!

Put simply, serverless computing = FaaS + BaaS. In our definition, for a service to be considered serverless, it must scale automatically with no need for explicit provisioning, and be billed based on usage. In the rest of this paper, we focus on the emergence, evolution, and future of cloud functions…

If you squint just right, you can just about make out the other important part of a serverless platform in that equation: in the ‘+’ sign connecting FaaS and BaaS is the set of platform events that can be used to glue the two together, and they’re an important part of the overall model in my mind.

One of the key things that serverless gives is a transfer of responsibility from the user to the cloud provider for many operational concerns:

The authors list three critical distinctions between serverless and serverful computing:

  1. Decoupled computation and storage: storage and computation scale separately and are provisioned and priced independently. (Hang on, don’t I get that with EBS in ‘serverful’ mode??)
  2. Executing code without managing resource allocation.
  3. Paying in proportion to resources used instead of for resources allocated.

The fundamental differentiator between serverless computing and the previous generations of PaaS is the autoscaling and associated billing model:

  • tracking load with much greater fidelity than serverful autoscaling techniques,
  • scaling all the way down to zero, and
  • charging in much more fine-grained manner for time the code spends executing, not the resources reserved to execute the program.

But then there’s also this claim that I can’t make any sense of: “by allowing users to bring their own libraries, serverless computing can support a much broader range of applications than PaaS services which are tied closely to particular use cases.” I’m pretty sure that any library / package I can reference from a function I could also reference from my Node.js / Java / Ruby / Python /… app!

The most popular uses of serverless today, according to a 2018 survey are as follows:

Current limitations

Section 3 of the paper examines a set of five applications to understand the current strengths and weaknesses. These are summarised in the table below. It should not surprise you in the least that implementing a database using functions-as-a-service turns out to be a bad idea!!

One thing that several of these workloads have in common is resource requirements that can vary significantly during their execution, hence it is attractive to try and take advantage of the fine-grained autoscaling serverless offers.

Four main current limitations emerge from this exercise:

  1. Inadequate storage support for fine-grained operations
  2. A lack of fine-grained coordination mechanisms (the latency is too high in the standard eventing mechanisms)
  3. Poor performance for standard communication patterns (which can be O(N) or O(N^2) depending on the use case, but ‘N’ typically goes up a lot as we have many more functions than we did VMs).
  4. Cold-start latency leading to unpredictable performance

Suggestions for the future

The first suggestion (§4.1) concerns the support of serverless functions with particular resource requirements (e.g. access to accelerators).

One approach would be to enable developers to specify these resource requirements explicitly. However, this would make it harder for cloud providers to achieve high utilization through statistical multiplexing, as it puts more constraints on function scheduling… A better alternative would be to raise the level of abstraction, having the cloud provider infer resource requirements instead of having the developer specify them.

So in scenario A the developer (for example) explicitly specifies via metadata that the function needs a GPU, and in scenario B the platform infers through inspection that the function needs a GPU. It’s not clear to me how that makes any difference to the scheduling constraints! Although with clever inference maybe we can get closer to the true resource requirements, whereas we know developers tend to over-provision when making explicit requests. We’d have to take the level of abstraction right up to something close to SLOs for things to get really interesting though, and even then there would need to be multiple viable strategies for the cloud platform to select between in order to meet those targets. This topic is also touched on in §4.5 where we find the following: “alternatively, the cloud provider could monitor the performance of the cloud functions and migrate them to the most appropriate hardware the next time they are run.”

The second suggestion is to expose function communication patterns (computation graphs) to the platform, so that this information can inform placement. AWS Step Functions is a step in this direction ;).

Section 4.2 discusses the need for more storage options. In particular, serverless ephemeral storage used for coordination between functions (e.g., an in-memory data grid), and serverless durable storage, which here means access to a low-latency persistent shared key-value store. A related ask is for a low-latency eventing / signaling service for function coordination.

It feels worth bringing up again here that instead of trying to replicate old application development patterns in a future cloud platform, a serverless approach using something like Bloom and its underlying CALM philosophy would give much of what we’re looking for here: a higher level abstraction with a naturally exposed computation graph and ability for the platform to make informed scheduling and autoscaling decisions.

Startup times can be minimised through the investigation of new lighter-weight isolation mechanisms, using unikernels, and lazy loading of libraries.

Section 4.4 discusses security challenges: fine-grained provisioning of access to private keys, delegation of privileges across functions, and information leakage through network communication patterns (which are more revealing with a larger number of fine-grained components).

Efficient synchronisation of state-based CRDTs

March 11, 2019

Efficient synchronisation of state-based CRDTs Enes et al., arXiv’18

CRDTs are a great example of consistency as logical monotonicity. They come in two main variations:

  • operation-based CRDTs send operations to remote replicas using a reliable dissemination layer with exactly-once causal delivery. (If operations are idempotent then at-least-once is ok too).
  • state-based CRDTs exchange information about the resulting state of the data structure (not the operations that led to the state being what it is). In the original form the full-state is sent each time. State-based CRDTs can tolerate dropped, duplicated, and re-ordered messages.

State-based CRDTs look attractive therefore, but over time as the state grows sending the full state every time quickly becomes expensive. That’s where Delta-based CRDTs come in. These send only the delta to the state needed to reconstruct the full state.

Delta-based CRDTs… define delta-mutators that return a delta ( \delta ), typically much smaller than the full state of the replica, to be merged with the local state. The same \delta is also added to an outbound \delta-buffer, to be periodically propagated to remote replicas. Delta-based CRDTs have been adopted in industry as part of the Akka Distributed Data framework and IPFS.

So far so good, but the analysis in this paper reveals an unexpected issue: when updates are frequent enough that concurrent update operations are occurring between synchronisation points, delta-propagation algorithms perform no better than sending the full state.

You can see the effect in the following chart: delta-based propagation is transmitting pretty much the same amount of data as full state-based.

Moreover, due to the overheads of computing deltas etc., delta-propagation is also consuming a lot more CPU to do so.

I’m sure you’ve also spotted in those two figures the additional data series for ‘this paper.’ Two key enhancements to the delta-propagation algorithm are introduced which greatly reduce both the amount of data transmitted and the CPU (and memory) overheads.

Some example state-based CRDTs

To bring all this to life we need a couple of examples. The paper uses grow-only CRDT counters and grow-only sets. State-based CRDTs are based upon a join semi-lattice, \mathcal{L}, a set of mutators that update that state, and a binary join operator \sqcup that derives the least upper bound for any two elements of \mathcal{L}.

For a grow-only counter, we keep track of one (sub) counter per replica. The inc operation increments the replica-local counter, the join operator takes the higher of the two replica-local counter values for each replica, and the overall counter value is produced by summing the replica-local counters.

For a grow-only set (GSet) we just add elements to the set and the join operator is a set union.
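Both structures fit in a few lines of Python. This is a minimal sketch of the descriptions above; the class and function names are mine, not the paper's.

```python
class GCounter:
    """Grow-only counter: one sub-counter per replica."""

    def __init__(self, replica_id, counts=None):
        self.replica_id = replica_id
        self.counts = dict(counts or {})

    def inc(self):
        # Only the replica-local entry is ever incremented.
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + 1

    def join(self, other):
        # Least upper bound: per-replica maximum.
        for replica, count in other.counts.items():
            self.counts[replica] = max(self.counts.get(replica, 0), count)

    def value(self):
        return sum(self.counts.values())


def gset_join(a, b):
    """Grow-only set: join is set union."""
    return a | b
```

Because both joins are commutative, associative, and idempotent, replicas can receive states late, twice, or out of order and still converge.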

Hasse diagrams can be used to show the evolution of the lattice. For example:

Spotting inefficiencies

Let’s look at one particular evolution of a GSet in a scenario with two replicas, A and B. In the diagram below, \bullet marks a point in time where synchronisation occurs. For example, at \bullet_1, replica B sends the delta \{b\} to replica A. A has also received a local update. At \bullet_2, replica A sends all updates it has received since the last time it propagated changes back to B. This is the set \{a, b\}. Meanwhile replica B has added c, and at \bullet_3 it sends all of its changes since \bullet_1 back to A.

It’s easy to spot some redundancy in transmission here. At \bullet_1, B sends \{b\} to A, and at \bullet_2, A is including \{b\} in the update it sends back to B again. All of the underlined elements in the figure above are redundant transmissions of this type.

So it’s not exactly rocket science…

> … by simply tracking the origin of each \delta-group in the \delta-buffer, replicas can avoid back-propagation of \delta-groups.

The BP optimisation in effect says “don’t send an update back to the replica that told you about it in the first place!” In order to do that, we’ll need some extra book-keeping to remember where updates have come from. The evaluation shows this is worth it.
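
The book-keeping can be sketched in a few lines of Python. This is an illustration under my own assumptions rather than the paper's code: the `DeltaBuffer` class and its method names are hypothetical, the CRDT is a GSet, and acknowledgements and buffer garbage collection are left out.

```python
from typing import FrozenSet, List, Tuple

DeltaGroup = FrozenSet[str]


class DeltaBuffer:
    """Delta buffer whose entries remember which replica they came from."""

    def __init__(self) -> None:
        self.entries: List[Tuple[DeltaGroup, str]] = []

    def store(self, delta: DeltaGroup, origin: str) -> None:
        self.entries.append((delta, origin))

    def delta_for(self, neighbour: str) -> DeltaGroup:
        # BP: skip every delta-group that this neighbour sent us in the first place.
        out: DeltaGroup = frozenset()
        for delta, origin in self.entries:
            if origin != neighbour:
                out |= delta
        return out


# Replica A's buffer in the two-replica GSet trace:
# {b} arrived from B, and a was added locally at A.
buf = DeltaBuffer()
buf.store(frozenset({"b"}), origin="B")
buf.store(frozenset({"a"}), origin="A")

print(sorted(buf.delta_for("B")))  # ['a']       b is not sent back to B
print(sorted(buf.delta_for("C")))  # ['a', 'b']  a third replica would get both
```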

Here’s another example trace that shows a related issue:

Here we see that at \bullet_5, C is sending delta \{b\} to D. Then C receives an update \{a, b\} from A at \bullet_6. At \bullet_7 the update \{a, b\} is propagated from C to D, with the result that C has redundantly sent \{b\} to D in this second transmission. The RR optimisation removes this redundant transfer. We should…

> … remove redundant state in received \delta-groups (RR) before adding them to the delta buffer.

And that’s it! Don’t send an update back to the replica that sent it to you, and don’t send the same update to the same replica more than once.

A revised delta-based synchronisation algorithm

Now, those ideas are formalised using the notion of a join-irreducible state. A join-irreducible state x is one that can’t be reconstructed from a join of some finite set of states not containing x. So the set \{a\} is join-irreducible, but the set \{a, b\} is not because we can construct it by joining \{a\} and \{b\}. Every CRDT has a unique irredundant decomposition of a state into a set of join-irreducible states. For the GSet, for example, these are just the sets containing individual elements, and for the GCounter they are the individual replica-local counters. We can use this decomposition to find the minimum delta, \Delta(a,b), between two states a and b.
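
Here's a rough Python sketch of that idea for the two example CRDTs. As I read it, \Delta(a,b) is the join of those join-irreducibles of a that are not already below b; the function names and the dict/frozenset representations are my own assumptions for illustration.

```python
from typing import Dict, FrozenSet, List

GSet = FrozenSet[str]
GCounter = Dict[str, int]


def decompose_gset(s: GSet) -> List[GSet]:
    # Join-irreducibles of a GSet: its singleton subsets.
    return [frozenset({e}) for e in sorted(s)]


def delta_gset(a: GSet, b: GSet) -> GSet:
    out: GSet = frozenset()
    for y in decompose_gset(a):
        if not y <= b:        # y is not already covered by b
            out |= y
    return out


def decompose_gcounter(c: GCounter) -> List[GCounter]:
    # Join-irreducibles of a GCounter: single non-zero replica entries.
    return [{r: n} for r, n in sorted(c.items()) if n > 0]


def leq_gcounter(x: GCounter, y: GCounter) -> bool:
    # Lattice order: every entry of x is dominated by the matching entry of y.
    return all(n <= y.get(r, 0) for r, n in x.items())


def delta_gcounter(a: GCounter, b: GCounter) -> GCounter:
    out: GCounter = {}
    for y in decompose_gcounter(a):
        if not leq_gcounter(y, b):
            out.update(y)     # entries have distinct keys, so update acts as the join
    return out


print(sorted(delta_gset(frozenset({"a", "b"}), frozenset({"b"}))))  # ['a']
print(delta_gcounter({"A": 2, "B": 1}, {"A": 1, "B": 1}))           # {'A': 2}
```

For a GSet this collapses to set difference, which is a handy sanity check on the general definition.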

Here’s the classic delta-based synchronization algorithm for replica i, updated to include the BP and RR optimisations. Highlighted lines show the changes.

For BP, each \delta-group in the \delta-buffer is tagged with its origin at line 5, so that we can filter out redundant deltas when sending updates in line 11. For RR we extract the minimal delta in line 15.
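
Putting both optimisations together, here's a simplified Python sketch of a GSet replica. It is not the paper's algorithm listing: the `Replica` class is hypothetical, delivery is assumed reliable, and a per-neighbour cursor (`sent_upto`) stands in for the sequence numbers and acknowledgements a real implementation would use.

```python
from typing import Dict, FrozenSet, List, Tuple

DeltaGroup = FrozenSet[str]


class Replica:
    def __init__(self, rid: str) -> None:
        self.rid = rid
        self.state: DeltaGroup = frozenset()
        self.buffer: List[Tuple[DeltaGroup, str]] = []  # (delta-group, origin)
        self.sent_upto: Dict[str, int] = {}             # neighbour -> entries already sent

    def _store(self, delta: DeltaGroup, origin: str) -> None:
        if delta:  # nothing new means nothing to buffer
            self.state |= delta
            self.buffer.append((delta, origin))

    def add(self, element: str) -> None:
        # Local update: the delta is tagged with this replica as its origin.
        self._store(frozenset({element}) - self.state, self.rid)

    def receive(self, delta: DeltaGroup, origin: str) -> None:
        # RR: keep only the part of the delta-group that is new to us
        # (for a GSet the minimal delta is just set difference).
        self._store(delta - self.state, origin)

    def delta_for(self, neighbour: str) -> DeltaGroup:
        start = self.sent_upto.get(neighbour, 0)
        self.sent_upto[neighbour] = len(self.buffer)
        out: DeltaGroup = frozenset()
        for delta, origin in self.buffer[start:]:
            if origin != neighbour:  # BP: never echo a delta back to its sender
                out |= delta
        return out


# Replaying the C/D trace, assuming for illustration that b originated at C.
c, d = Replica("C"), Replica("D")
c.add("b")
first = c.delta_for("D")
d.receive(first, "C")
c.receive(frozenset({"a", "b"}), "A")   # RR strips the b that C already has
second = c.delta_for("D")
d.receive(second, "C")

print(sorted(first), sorted(second), sorted(d.state))  # ['b'] ['a'] ['a', 'b']
```

With RR in place the second message to D carries only \{a\}, and D still ends up with the full state.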


The evaluation takes place over two different network topologies. Both have 15 nodes, but one is cyclic (mesh) and one is acyclic (tree).

Each node performs an update and synchronises with its neighbours once per second, as per the table below.

Comparisons are made to vanilla delta state, delta state with each optimisation individually, delta state with both optimisations, and also state-based CRDTs (full state exchange), operation-based CRDTs, and CRDTs using the Scuttlebutt anti-entropy protocol.

Here’s an analysis of the amount of data transmitted by GSet and GCounter in the tree and mesh topologies.

The first observation is that the classic delta-based synchronization presents almost no improvement, when compared to state-based synchronization.

In the tree topology, BP alone is enough to give good results, whereas the mesh topology benefits from RR.

Compared to Scuttlebutt and operation-based CRDTs, delta-based CRDTs with both BP and RR have a much lower metadata overhead. The overhead can be as high as 75% with the former group, whereas the optimised delta-based algorithm can get this as low as 7.7%.

When used in the Retwis Twitter clone for a moderate-to-high contention workload, the BP + RR optimisations can reduce CPU overhead by up to 7.9x compared to the classic delta-based algorithm.

State-based CRDT solutions quickly become prohibitive in practice, if there is no support for treatment of small incremental state deltas. In this paper we advance the foundation of state-based CRDTs by introducing minimal deltas that precisely track state changes.

De-dup FTW!