Skip to content

Datacenter RPCs can be general and fast

March 18, 2019

Datacenter RPCs can be general and fast Kalia et al., NSDI’19

We’ve seen a lot of exciting work exploiting combinations of RDMA, FPGAs, and programmable network switches in the quest for high performance distributed systems. I’m as guilty as anyone for getting excited about all of that. The wonderful thing about today’s paper, for which Kalia et al. won a best paper award at NSDI this year, is that it shows in many cases we don’t actually need to take on that extra complexity. Or to put it another way, it seriously raises the bar for when we should.

eRPC (efficient RPC) is a new general-purpose remote procedure call (RPC) library that offers performance comparable to specialized systems, while running on commodity CPUs in traditional datacenter networks based on either lossy Ethernet or lossless fabrics… We port a production grade implementation of Raft state machine replication to eRPC without modifying the core Raft source code. We achieve 5.5 µs of replication latency on lossy Ethernet, which is faster than or comparable to specialized replication systems that use programmable switches, FPGAs, or RDMA.

eRPC just needs good old UDP. Lossy Ethernet is just fine (no need for fancy lossness networks), and it doesn’t need Priority Flow Control (PFC). The received wisdom is that you can either have general-purpose networking that works everywhere and is non-intrusive to applications but has capped performance, or you have to drop down to low-level interfaces and do a lot of your own heavy lifting to obtain really high performance.

The goal of our work is to answer the question: can a general-purpose RPC library provide performance comparable to specialized systems?

Astonishingly, Yes.

From the evaluation using two lossy Ethernet cluster (designed to mimic the setups used in Microsoft and Facebook datacenters):

  • 2.3µs median RPC latency
  • up to 10 million RPCs / second on a single core
  • large message transfer at up to 75Gbps on a single core
  • peak performance maintained even with 20,000 connections per node (2 million cluster wide)

eRPC’s median latency on CX5 is only 2.3µs, showing that latency with commodity Ethernet NICs and software networking is much lower than the widely-believed value of 10-100µs.

(CURP over eRPC in a modern datacenter would be a pretty spectacular combination!).

So the question that immediately comes to mind is how? As in, “what magic is this?”.

The secret to high-performance general-purpose RPCs

… is a carefully considered design that optimises for the common case and also avoids triggering packet loss due to switch buffer overflows for common traffic patterns.

That’s it? Yep. You won’t find any super low-level fancy new exotic algorithm here. Your periodic reminder that thoughtful design is a high leverage activity! You will of course find something pretty special in the way all the pieces come together.

So what assumptions go into the ‘common case?’

  • Small messages
  • Short duration RPC handlers
  • Congestion-free networks

Which is not to say that eRPC can’t handle larger messages, long-running handlers, and congested networks. It just doesn’t pay a contingency overhead price when they are absent.

Optimisations for the common case (which we’ll look at next) boost performance by up to 66% in total. On this base eRPC also enables zero-copy transmissions and a design that scales while retaining a constant NIC memory footprint.

The core model is as follows. RPCs are asynchronous and execute at most once. Servers register request handler functions with unique request types, and clients include the request types when issuing requests. Clients receive a continuation callback on RPC completion. Messages are stored in opaque DMA-capable buffers provided by eRPC, called msg-bufs. Each RPC endpoint (one per end user thread) has an RX and TX queue for packet I/O, an event loop, and several sessions.

The long and short of it

When request handlers are run directly in dispatch threads you can avoid expensive inter-thread communication (adding up to 400ns of request latency). That’s fine when request handlers are short in duration, but long handlers block other dispatch handling increasing tail latency, and prevent rapid congestion feedback.

eRPC supports running handlers in dispatch threads for short duration request types (up to a few hundred nanoseconds), and worker threads for longer running requests. Which mode to use is specified when the request handler is registered. This is the only additional user input needed in eRPC.

Scalable connection state

eRPC’s choice to use packet I/O over RDMA avoids the circular buffer scalability bottleneck in RDMA (see §4.1.1). By taking advantage of multi-packet RX-queue (RQ) descriptors in modern NICs, eRPC can use constant space in the NIC instead of a footprint that grows with the number of connected sessions (see Appendix A).

Furthermore, eRPC replaces NIC-managed connection state with CPU-managed connection state.

This is an explicit design choice, based upon fundamental differences between the CPU and NIC architectures. NICs and CPUs will both cache recently used connection state. CPU cache misses are served from DRAM, whereas NIC cache misses are served from the CPU’s memory subsystem over the slow PCIe bus. The CPU’s miss penalty is therefore much lower. Second, CPUs have substantially larger caches than the ~2MB available on a modern NIC, so the cache miss frequency is also lower.

Zero-copy transmission

Zero-copy packet I/O in eRPC provides performance comparable to lower level interfaces such as RDMA and DPDK. The msgbuf layout ensures that the data region is contiguous (so that applications can use it as an opaque buffer) even when the buffer contains data for multiple packets. The first packet’s data and header are also contiguous so that the NIC can fetch small messages with one DMA read. Headers for remaining packets are at the end, to allow for the contiguous data region in the middle.

eRPC must ensure that it doesn’t mess with msgbufs after ownership is returned to the application, which is fundamentally addressed by making sure it retains no reference to the buffer. Retransmissions can interfere with such a scheme though. eRPC chooses to use “unsignaled” packet transmission optimising for the common case of no retransmission. The trade-off is a more expensive process when retransmission does occur:

We flush the TX DMA queue after queuing a retransmitted packet, which blocks until all queued packets are DMA-ed. This ensures the required invariant: when a response is processed, there are no references to the request in the DMA queue.

eRPC provides zero copy reception for workloads under the common-case of single packet requests and dispatch mode request handlers too, which boosts eRPCs message rate by up to 16%.

Sessions and flow control

Sessions support concurrent requests (8 by default) that can complete out-of-order with respect to each other. Sessions use an array of slots to track RCP metadata for outstanding requests, and slots have an MTU-size preallocated msgbuf for use by request handlers that issue short responses. Session credits are used to implement packet-level flow control. Session credits also support end-to-end flow control to reduce switch queuing. Each session is given BDP/MTU credits, which ensures that each session can achieve line rate.

Client-driven wire protocol

We designed a wire protocol for eRPC that is optimized for small RPCs and accounts for per-session credit limits. For simplicity, we chose a simple client-driven protocol, meaning that each packet sent by the server is in response to a client packet.

Client-driven protocols have fewer moving parts, with only the client needing to maintain wire protocol state. Rate limiting becomes solely a client responsibility too, freeing server CPU.

Single-packet RPCs (request and response require only a single packet) use the fewest packets possible. With multi-packet responses and a client-driven protocol the server can’t immediately send response packets after the first one, so the client sends a request-for-response (RFR) packet. In practice this added latency turned out to be less than 20% for responses with four or more packets.

Congestion control

eRPC can use either Timely or DCQCN for congestion control. The evaluation uses Timely as the cluster hardware could not support DCQCN. Three optimisation brought the overhead of congestion control down from around 20% to 9%:

  1. Bypassing Timely altogether is the RTT of a received packet on an uncongested session is less than a low threshold value.
  2. Bypassing the rate limiter for uncongested sessions
  3. Sampling timers once per RX or TX batch rather than once per packet for RTT measurement

These optimisation works because datacenter networks are typically uncongested. E.g. at one-minute timescales 99% of all Facebook datacenter links are uncongested, and for web and cache traffic Google, 90% of ToR switch links (the most congested), are less than 10% utilized at 25 µs timescales.

Packet loss

eRPC keeps things simple by treating re-ordered packets as losses and dropping them (as do current RDMA NICs). When a client suspects a lost packet it rolls back the request’s wire protocol state using a ‘go-back-N’ mechanism. It reclaims credits and retransmits from the rollback point.

Evaluation highlights

This write-up is in danger of getting too long again, so I’ll keep this very brief. The following table shows the contribution of the various optimisations through ablation:

We conclude that optimizing for the common case is both necessary and sufficient for high-performance RPCs.

Here you can see latency with increasing threads. eRPC achieves high message rate, bandwidth, and scalability with low latency in a large cluster with lossy Ethernet.

For large RPCs, eRPC can achieve up to 75 Gbps with one core.

Section 7 discusses the integration of eRPC in an existing Raft library, and in the Masstree key-value store. From the Raft section the authors conclude: “the main takeaway is that microsecond-scale consistent replication is achievable in commodity Ethernet datacenters with a general-purpose networking library.

eRPC’s speed comes from prioritizing common-case performance, carefully combining a wide range of old and new optimizations, and the observation that switch buffer capacity far exceeds datacenter BDP. eRPC delivers performance that was until now believed possible only with lossless RDMA fabrics or specialized network hardware. It allows unmodified applications to perform close to the hardware limits.

Exploiting commutativity for practical fast replication

March 15, 2019

Exploiting commutativity for practical fast replication Park & Ousterhout, NSDI’19

I’m really impressed with this work. The authors give us a practical-to-implement enhancement to replication schemes (e.g., as used in primary-backup systems) that offers a signification performance boost. I’m expecting to see this picked up and rolled-out in real-world systems as word spreads. At a high level, CURP works by dividing execution into periods of commutative operation where ordering does not matter, punctuated by full syncs whenever commutativity would break.

The Consistent Unordered Replication Protocol (CURP) allows clients to replicate requests that have not yet been ordered, as long as they are commutative. This strategy allows most operations to complete in 1 RTT (the same as an unreplicated system).

When integrated with RAMCloud write latency was improved by ~2x, and write throughput by 4x. Which is impressive given that RAMCloud isn’t exactly hanging around in the first place! When integrated with Redis, CURP was able to add durability and consistency while keeping similar performance to non-durable Redis.

CURP can be easily applied to most existing systems using primary-backup replication. Changes required by CURP are not intrusive, and it works with any kind of backup mechanism (e.g., state-machine replication, file writes to network replicated drives, or scattered replication). This is important since most high-performance systems optimize their backup mechanisms, and we don’t want to lose those optimizations…

The big idea: durability, ordering, and commutativity

We’re looking for two things from a replication protocol: durability of executed operations, and consistent ordering across all replicas for linearizability. Hence the classic primary-backup protocol in which clients send requests to a primary, which by being a single instance serves to give a global order to requests. The primary then ensures that the update is propagated to backup replicas before returning to the client. This all takes 2 RTTs. Consensus protocols with strong leaders also require 2 RTTs. Fast Paxos and Generalized Paxos reduce latency to 1.5 RTT through optimistic replication with presumed ordering. Network-Ordered Paxos and Speculative Paxos can get close to 1 RTT latency, but require special networking hardware.

CURP’s big idea is as follows: when operations are commutative, ordering doesn’t matter. So the only property we have to ensure during a commutative window is durability. If the client sends a request in parallel to the primary and to f additional servers which all make the request durable, then the primary can reply immediately and once the client has received all f confirmations it can reveal the result. This gives us a 1 RTT latency for operations that commute, and we fall back to the regular 2 RTT sync mechanism when an operation does not commute.

Those f additional servers are called witnesses. If you’re building a new system from scratch, it makes a lot of sense to combine the roles of backup and witness in a single process. But this is not a requirement, and for ease of integration with an existing system witnesses can be separate.

… witnesses do not carry ordering information, so clients can directly record operations into witnesses in parallel with sending operations to masters so that all requests will finish in 1 RTT. In addition to the unordered replication to witnesses, masters still replicate ordered data to backups, but do so asynchronously after sending the execution results back to the clients.

This should all be raising a number of questions, chief among which is what should happen when a client, witness, or primary crashes. We’ll get to that shortly. The other obvious question is how do we know when operations commute?

Witnesses must be able to determine whether operations are commutative or not just from the operation parameters. For example, in key-value stores, witnesses can exploit the fact that operations on different keys are commutative… Witnesses cannot be used for operations whose commutativity depends on the system state.

So CURP works well with key-value stores, but not with SQL-based stores (in which WHERE clauses for example mean that operation outcomes can depend on system state).

The CURP protocol

CURP uses a total of f+1 replicas to tolerate f failures (as per standard primary-backup), and additionally uses f witnesses to ensure durability of updates even before replications to backups are completed. Witnesses may fail independently of backups.

To ensure the durability of the speculatively completed updates, clients multicast update operations to witnesses. To preserve linearizability, witnesses and masters enforce commutativity among operations that are not fully replicated to backups.

Operation in the absence of failures

Clients send update requests to the primary, and concurrently to the f witnesses allocated to the primary. Once the client has responses from all f witnesses (indicating durability) and the primary (which responds immediately without waiting for data to replicate to backups) then it can return the result. This is the 1 RTT path.

If the client sends an operation to a witness, and instead of confirming acceptance the witness rejects that operation (because it does not commute with the other operations currently being held by the witness), then the client cannot complete in 1 RTT. It now sends a sync request to the primary and awaits the response (indicating data is safely replicated and the operation result can be returned). Thus the operation latency in this case in 2 RTTs in the best case, and up to 3 RTTs in the worst case depending on the status of replication between primary and backups at the point the sync request was received.

Witnesses durably record operations requested by clients. Non-volatile memory (e.g. flash-backed DRAM) is a good choice for this as the amount of space required is relatively small. On receiving a client request, the witness saves the operation and sends an accept response if it commutes with all other saved operations. Otherwise it rejects the request. All witnesses operate independently. Witnesses may also receive garbage collection RPCs from their primary, indicating the RPC IDs of operations that are now durable (safely replicated by the primary to all backups). The witnesses can then delete these operations from their stores.

A primary receives, serializes, and executes all update RPC requests from clients. In general a primary will respond to clients before syncing to backups (speculative execution) leading to unsynced operations. If the primary receives an operation request which does not commute with all currently unsynced operations then it must sync before responding (and adds a ‘synced’ header in the response so that the client knows an explicit sync request is unnecessary. Primaries can batch and asynchronously replicate RPC requests. E.g. with 3-way primary-backup replication and batches of 10 RPCs, we have 1.3 RPCs per request on average. The optimal batch and window size depends on the particulars of the workload.

For read requests clients can go directly to primaries (no need for the durability afforded by witnesses). The primary must sync before returning if the read request does not commute with all currently unsynced updates. Clients can also read from a backup while retaining linearizability guarantees if they also confirm with a witness that the read operation commutes with all operations currently saved in the witness.


If a client does not get a response from a primary it resends the update RPC to a new primary, and tries to record the RPC request in the witnesses of that primary. If the client does not get a response from a witness, it can fall back to the traditional replication path by issuing a sync request to the primary.

If a witness crashes it is decommissioned and a new witness is assigned to the primary. The primary is notified of its new witness list. The primary then syncs to backups before responding that it is now safe to recover from the new witness. A witness list version number maintained by the primary and sent by the client on every request so that clients can be notified of the change and update their lists.

If a primary crashes there may have been unsynced operations for which clients have received results but the data is not yet replicated to backups. A new primary is allocated, and bootstrapped by recovering from one of the backups. The primary then picks any available witness, asks it to stop accepting more operations, and then retrieves all requests known to the witness. The new primary can execute these requests in any order since they are known to be commutative. With the operations executed, the primary syncs to backups and resets all the witnesses. It can now begin accepting requests again.

The gotcha in this scheme is that some of the requests sent by the witness may in fact have already been executed and replicated to backups before the primary crashed. We don’t want to re-execute such operations:

Duplicate executions of the requests can violate linearizability. To avoid duplicate executions of the requests that are already replicated to backups, CURP relies on exactly-once semantics provided by RIFL, which detects already executed client requests and avoids their re-execution.

(We studied RIFL in an earlier edition of The Morning Paper, RIFL promises efficient ‘bolt-on’ linearizability for distributed RPC systems).

Performance evaluation

We evaluated CURP by implementing it in the RAMCloud and Redis storage systems, which have very different backup mechanisms. First, using the RAMCloud implementation, we show that CUP improves the performance of consistently replicated systems. Second, with the Redis implementation we demonstrate that CURP can make strong consistency affordable in a system where it had previously been too expensive for practical use.

I’m out of space to do the evaluation justice, but you’ll find full details in §5. Here are a couple of highlights…

The chart below shows the performance of Redis without durability (i.e., as most people use it today), with full durability in the original Redis scheme, and with durability via CURP. You can clearly see that using CURP we get the benefits of consistency and durability with performance much closer to in-memory only Redis.

And here’s the change in latency of write operations for RAMCloud. It took me a moment to understand what we’re seeing here, since it looks like the system gets slower as the write percentage goes down. But remember that we’re only measuring write latency here, not average request latency. Presumably with more reads we need to sync more often.  

Thanks to Qian Li who points out in the comments I’d completely missed the crucial ‘cumulative’ word in the description of this chart. Now it all makes so much more sense! Only a tiny fraction of writes have higher latency, and CURP is **improving** tail latencies. 

In the RAMCloud implementation, witnesses could handle 1270K requests per second on a single thread, with interwoven gc requests at 1 in every 50 writes from the primary. The memory overhead is just 9MB. CURP does however increase network bandwidth usage for update operations by 75%.

Cloud computing simplified: a Berkeley view on serverless computing

March 13, 2019

Cloud programming simplified: a Berkeley view on serverless computing Jonas et al., arXiv 2019

With thanks to Eoin Brazil who first pointed this paper out to me via Twitter….

Ten years ago Berkeley released the ‘Berkeley view of cloud computing’ paper, predicting that cloud use would accelerate. Today’s paper choice is billed as its logical successor: it predicts that the use of serverless computing will accelerate. More precisely:

… we predict that serverless computing will grow to dominate the future of cloud computing.

The acknowledgements thank reviewers from Google, Amazon, and Microsoft among others, so it’s reasonable to assume the major cloud providers have had at least some input to the views presented here. The discussion is quite high level, and at points it has the feel of a PR piece for Berkeley (there’s even a cute collective author email address: serverlessview at, but there’s enough of interest in its 35 pages for us to get our teeth into…

The basic structure is as follows. First we get the obligatory attempt at defining serverless, together a discussion of why it matters. Then the authors look at some of the current limitations (cf. ‘Serverless computing: one step forward, two steps back’) before going on to suggest areas for improvement in the coming years. The paper closes with a nice section on common fallacies (and pitfalls) of serverless, together with a set of predictions. I’m going to start there!

Predictions, fallacies, pitfalls

Fallacy: cloud function instances cost up to 7.5x more per minute than regular VM instances of similar capacity, therefore serverless cloud computing is more expensive than serverful cloud computing.

Rebuttal: the equivalent functionality that you’re getting from a function is much more than you can achieve with a single VM instance. Plus, you don’t pay when there are no events. Putting the two together means that serverless could well end up being less expensive.

Prediction (for me, the most interesting sentence in the whole paper!) :

We see no fundamental reason why the cost of serverless computing should be higher than that of serverful computing, so we predict that billing models will evolve so that almost any application, running at almost any scale, will cost no more and perhaps much less with serverless computing.

But yes, billing by actual usage means that costs will be less predictable (hopefully in a good way!) than ‘always-on’ solutions.

Fallacy: it’s easy to port applications between serverless computing providers since functions are written in high-level languages. (Does anyone really think that serverless increases portability? All the talk I hear is of the opposite concern!).

Rebuttal/Pitfall: vendor lock-in may be stronger with serverless computing than for serverful computing. (Yes, because of the reliance on the eventing model and the set of back-end services your functions bind too).

Prediction: “We expect new BaaS storage services to be created that expand the types of applications that run well on serverless computing,” and, “The future of serverless computing will be to facilitate BaaS.

Pitfall: (today) “few so called ‘elastic’ services match the real flexibility demands of serverless computing.”

Fallacy: cloud functions cannot handle very low latency applications needing predictable performance

Rebuttal: if you pre-warm / keep a pool of functions ready then they can (combine this thought with ‘we see no fundamental reason why the cost of serverless computing should be higher than that of serverful computing…’)


While serverful cloud computing won’t disappear, the relative importance of that portion of the cloud will decline as serverless computing overcomes its current limitations. Serverless computing will become the default computing paradigm of the Cloud Era, largely replacing serverful computing and thereby bringing closure to the Client-Server Era.

(So what’s the new term? Client-Function architecture?? )

Oh, and one more thing: “we expect serverless computing to become simpler to program securely than serverful computing…

Serverless motivation

I guess it’s definition time!

Put simply, serverless computing = FaaS + BaaS. In our definition, for a service to be considered serverless, it must scale automatically with no need for explicit provisioning, and be billed based on usage. In the rest of this paper, we focus on the emergence, evolution, and future of cloud functions…

If you squint just right, you can just about make out the other important part of a serverless platform in that equation: in the ‘+’ sign connecting FaaS and BaaS is the set of platform events that can be used to glue the two together, and they’re an important part of the overall model in my mind.

One of the key things that serverless gives is a transfer of responsibility from the user to the cloud provider for many operational concerns:

The authors list three critical distinctions between serverless and serverful computing:

  1. Decoupled computation and storage: storage and computation scale separately and are provisioned and priced independently. (Hang on, don’t I get that with EBS in ‘serverful’ mode??)
  2. Executing code without managing resource allocation.
  3. Paying in proportion to resources used instead of for resources allocated.

The fundamental differentiator between serverless computing and the previous generations of PaaS is the autoscaling and associated billing model:

  • tracking load with much greater fidelity than serverful autoscaling techniques,
  • scaling all the way down to zero, and
  • charging in much more fine-grained manner for time the code spends executing, not the resources reserved to execute the program.

But then there’s also this claim that I can’t make any sense of: “by allowing users to bring their own libraries, serverless computing can support a much broader range of applications than PaaS services which are tied closely to particular use cases.” I’m pretty sure that any library / package I can reference from a function I could also reference from my Node.js / Java / Ruby / Python /… app!

The most popular uses of serverless today, according to a 2018 survey are as follows:

Current limitations

Section 3 of the paper examines a set of five applications to understand the current strengths and weaknesses. These are summarised in the table below. It should not surprise you in the least that implementing a database using functions-as-a-service turns out to be a bad idea!!

One thing that several of these workloads have in common is resource requirements that can vary significantly during their execution, hence it is attractive to try and take advantage of the fine-grained autoscaling serverless offers.

Four main current limitations emerge from this exercise:

  1. Inadequate storage support for fine-grained operations
  2. A lack of fine-grained coordination mechanisms (the latency is too high in the standard eventing mechanisms)
  3. Poor performance for standard communication patterns (which can be O(N) or O(N^2) depending on the use case, but ‘N’ typically goes up a lot as we have many more functions than we did VMs).
  4. Cold-start latency leading to unpredictable performance

Suggestions for the future

The first suggestion (§4.1) concerns the support of serverless functions with particular resource requirements (e.g. access to accelerators).

One approach would be to enable developers to specify these resource requirements explicitly. However, this would make it harder for cloud providers to achieve high utilization through statistical multiplexing, as it puts more constraints on function scheduling… A better alternative would be to raise the level of abstraction, having the cloud provider infer resource requirements instead of having the developer specify them.

So scenario A the developer (for example) explicitly specifies via metadata that the function needs a GPU, and scenario B the platform infers through inspection that the function needs a GPU. It’s not clear to me how that makes any difference to the scheduling constraints! Although with clever inference maybe we can get closer to the true resource requirements, whereas we know developers tend to over-provision when making explicit requests. We’d have to take the level of abstraction right up to something close to SLOs for things to get really interesting though, and even then there would need to be multiple viable strategies for the cloud platform to select between in order to meet those targets. This topic is also touched on in section $4.5 where we find the following, “alternatively, the cloud provider could monitor the performance of the cloud functions and migrate them to the most appropriate hardware the next time they are run. “

The second suggestion is to expose function communication patterns (computation graphs) to the platform, so that this information can inform placement. AWS Step Functions is a step in this direction ;).

Section 4.2 discusses the need for more storage options. In particular, serverless ephemeral storage used for coordination between functions (e.g., an in-memory data grid), and serverless durable storage, which here means access to a low-latency persistent shared key-value store. A related ask is for a low-latency eventing / signaling service for function coordination.

It feels worth bringing up again here that instead of trying to replicate old application development patterns in a future cloud platform, a serverless approach using something like Bloom and its underlying CALM philosophy would give much of what we’re looking for here: a higher level abstraction with a naturally exposed computation graph and ability for the platform to make informed scheduling and autoscaling decisions.

Startup times can be minimised through the investigation of new lighter-weight isolation mechanisms, using unikernels, and lazy loading of libraries.

Section 4.4 discusses security challenges: fine-grained provisioning of access to private keys, delegation of privileges across functions, and information leakage through network communication patterns (which are more revealing with a larger number of fine-grained components).

Efficient synchronisation of state-based CRDTs

March 11, 2019

Efficient synchronisation of state-based CRDTs Enes et al., arXiv’18

CRDTs are a great example of consistency as logical monotonicity. They come in two main variations:

  • operation-based CRDTs send operations to remote replicas using a reliable dissemination layer with exactly-once causal delivery. (If operations are idempotent then at-least-once is ok too).
  • state-based CRDTs exchange information about the resulting state of the data structure (not the operations that led to the state being what it is). In the original form the full-state is sent each time. State-based CRDTs can tolerate dropped, duplicated, and re-ordered messages.

State-based CRDTs look attractive therefore, but over time as the state grows sending the full state every time quickly becomes expensive. That’s where Delta-based CRDTs come in. These send only the delta to the state needed to reconstruct the full state.

Delta-based CRDTs… define delta-mutators that return a delta ( \delta ), typically much smaller than the full state of the replica, to be merged with the local state. The same \delta is also added to an outbound \delta-buffer, to be periodically propagated to remote replicas. Delta-based CRDTs have been adopted in industry as part of the Akka Distributed Data framework and IPFS.

So far so good, but the analysis in this paper reveals an unexpected issue: when updates are frequent enough that concurrent update operations are occurring between synchronisation points, delta-propagation algorithms perform no-better than sending the full state.

You can see the effect in the following chart: delta-based propagation is transmitting pretty much the same amount of data as full state-based.

Moreover, due to the overheads of computing deltas etc., delta-propagation is also consuming a lot more CPU to do so.

I’m sure you’ve also spotted in those two figures the additional data series for ‘this paper.’ Two key enhancements to the delta-propagation algorithm are introduced which greatly reduce both the amount of data transmitted and the CPU (and memory) overheads.

Some example state-based CRDTs

To bring all this to life we need a couple of examples. The paper uses grow-only CRDT counters and grow-only sets. State-based CRDTs are based upon a join semi-lattice, \mathcal{L}, a set of mutators that update that state, and a binary join operator \sqcup that derives the least upper bound for any two elements of \mathcal{L}.

For a grow-only counter, we keep track of one (sub) counter per replica. The inc operation increments the replica-local counter, the join operator takes the higher of the two replica-local counter values for each replica, and the overall counter value is produced by summing the replica-local counters.

For a grow-only set (GSet) we just add elements to the set and the join operator is a set union.

Hasse diagrams can be used to show the evolution of the lattice. For example:

Spotting inefficiencies

Let’s look at one particular evolution of a GSet in a scenario with two replicas, A and B. In the diagram below, \bullet marks a point in time where synchronisation occurs. For example, at \bullet_1, replica B sends the delta \{b\} to replica A. A has also received a local update. At \bullet_2, replica A sends all updates it has received since the last time it propagated changes back to B. This is the set \{a, b\}. Meanwhile replica B has added c, and at \bullet_3 it sends all of its changes since \bullet_1 back to A.

It’s easy to spot some redundancy in transmission here. At \bullet_1, B sends \{b\} to A, and at \bullet_2, A is including \{b\} in the update it sends back to _A_ again. All of the underlined elements in the figure above are redundant transmissions of this type.

So it’s not exactly rocket science…

> … by simply tracking the origin of each \delta-group in the \delta-buffer, replicas can avoid back-propagation of \delta-groups.

The BP optimisation in effect says “don’t send an update back to the replica that told you about it in the first place!” In order to do that, we’ll need some extra book-keeping to remember where updates have come from. The evaluation shows this is worth it.

Here’s another example trace that shows a related issue:

Here we see that at \bullet_5, C is sending delta \{b\} to D. Then C receives an update \{a, b\} from A at \bullet_6. At \bullet_7 the update \{a, b\} is propagated from C to D, with the result that C has redundantly sent \{b\} to D in this second transmission. The RR optimisation removes this redundant transfer, we should…

> … remove redundant state in received \delta-groups (RR) before adding them to the delta buffer.

And that’s it! Don’t send an update back to the replica that sent it to you, and don’t send the same update to the same replica more than once.

A revised delta-based synchronisation algorithm

Now, those ideas are formalised using the notion of a join-irreducible state. A join irreducible state x is one that can’t be reconstructed from a join of some finite set of states not containing x. So the set \{a\} is join irreducible, but the set \{a, b\} is not because we can construct it by joining \{a\} and \{b\}. Every CRDT has a unique irredundant decomposition of a state into a set of join irreducible states. For the GSet for example, these are just the sets containing individual elements, and for the GCounter they are the individual replica-local counters. We can use this decomposition to find the minimum delta, \Delta(a,b) between two states a and b.

Here’s the classic delta-based synchronization algorithm for replica i, updated to include the BP and RR optimisations. Highlighted lines show the changes.

For BP, each \delta-buffer is tagged with its origin at line 5, so that we can filter out redundant deltas when sending updates in line 11. For RR we extract the minimal delta in line 15.


The evaluation takes place over two different network topologies. Both have 15 nodes, but one is cyclic (mesh) and one is acyclic (tree).

Each node performs an update and synchronises with its neighbours once per second, as per the table below.

Comparisons are made to vanilla delta state, delta state with each optimisation individually, delta state with both optimisations, and also state-based CRDTs (full state exchange), operation-based CRDTs, and CRDTs using the Scuttlebut anti-entropy protocol.

Here’s an analysis of the amount of data transmitted by GSet and GCounter in the tree and mesh topologies.

The first observation is that the classic delta-based synchronization presents almost no improvement, when compared to state-based synchronization.

In the tree topology, BP alone is enough to give good results, whereas the mesh topology benefits from RR.

Compared to Scuttlebut and operation-based CRDTs, delta-based CRDTs with both BP and RR have a much lower metadata overhead. The overhead can be as high as 75% with the former group, whereas the optimised delta-based algorithm can get this as low as 7.7%.

When used in the Retwis twitter clone for a moderate-to-high contention workload, the BP + RR optimisations can reduce CPU overhead by up to 7.9x compared to the classic delta-based algorithm.

State-based CRDT solutions quickly become prohibitive in practice, if there is no support for treatment of small incremental state deltas. In this paper we advance the foundation of state-based CRDTs by introducing minimal deltas that precisely track state changes.

De-dup FTW!

A generalised solution to distributed consensus

March 8, 2019

A generalised solution to distributed consensus Howard & Mortier, arXiv’19

This is a draft paper that Heidi Howard recently shared with the world via Twitter, and here’s the accompanying blog post. It caught my eye for promising a generalised solution to the consensus problem, and also for using reasoning over immutable state to get there. The state maintained at each server is monotonic.

Consensus is a notoriously hard problem, and Howard has been deep in the space for several years now. See for example the 2016 paper on Flexible Paxos. The quest for the holy grail here is to find a unifying and easily understandable protocol that can be instantiated in different configurations allowing different trade-offs to be made according to the situation.

This paper re-examines the problem of distributed consensus with the aim of improving performance and understanding. We proceed as follows. Once we have defined the problem of consensus, we propose a generalised solution to consensus that uses only immutable state to enable more intuitive reasoning about correctness. We subsequently prove that both Paxos and Fast Paxos are instances of our generalised consensus algorithm and thus show that both algorithms are conservative in their approach.

The distributed consensus problem

We have a set of servers (at least two) that need to agree upon a value. Clients take as input a value to be written, and produce as output the value agreed by the servers.

  • The output value must have been the input value of a client (ruling out simple solutions that always output a fixed value for example)
  • All clients that produce an output must output the same value
  • All clients must eventually produce an output if the system is reliable and synchronous for a sufficient period

Note that we’re talking here about the ‘inner game’ – this is just to agree upon a single value, whereas in a real deployment we’ll probably run multiple rounds of the protocol to agree upon a series of values. Moreover, we’re assuming here that the set of clients and servers is fixed and known to the clients. The ‘clients’ are probably not end-user clients, but more likely to be system processes making using of a consensus service. Configuration changes and membership epochs is another layer we’ll probably add-on in real deployments, but is also out of scope here.

Building blocks: immutable registers and quorums

We have a fixed set of n servers, S_0, S_1, ..., S_n. Each server has a tape it can write to, aka, an infinite array of write-once persistent registers, R_0, R_1, ....

Initially the tape is blank – all registers are unwritten. Once we write a value in a register slot it can never be changed. In addition to values provided by clients the system has one special value, denoted nil or \bot.

We’re going to be interested in sets of values from the same register slot across all servers. A register set, i is the set of all registers R_i across all servers.

If enough of the registers in a register set have the same (non- \bot) value, then we will say that the servers have decided upon that value. How many registers in a register set is enough though? That’s something we get to decide as part of the configuration of the protocol. In fact, we specify more than just the number of register values that must be in agreement, we also specify which servers they belong to. A (non-empty) subset of servers that can decide a value between them is called a quorum. More precisely, “a quorum, Q, is a (non-empty) subset of servers, such that if all servers have the same (non-nil) value v in the same register then v is said to be decided.

The word ‘quorum’ is often strongly associated with ‘majority,’ but note that this doesn’t have to be the case. The specification allows us to declare a quorum containing just a single server if we want to. The dictionary definition of quorum is “the minimum number of members of an assembly or society that must be present at any of its meetings to make the proceedings of that meeting valid.” Note that there is also no requirement here for overlapping quorum memberships (more on that later).

Each register set is associated with a set of quorums. That is, on a register-set by register-set basis, we can specify exactly which subsets of servers are allowed to decide a value. For example:

Why on earth we would want to have different quorum sets for different register sets— since we’re only agreeing on a single value at the end of the day!— will become more apparent when we look at how to layer a consensus protocol on top of the registers.

Given quorum configurations and register values for register sets, we can see if any quorum is satisfied and a decision reached.

The state of all registers can be represented in a table, known as a state table, where each column represents the state of one server and each row represents a register set. By combining a configuration with a state table, we can determine whether any decision(s) have been reached.

Now, we have to be a little careful here and remember that the state table is a logical construct. Each server knows the values of its own registers, and clients can maintain their own view of the global state table as they receive information from servers. Given the immutable constructs though, we know that once a client has assembled enough information to call a value decided, it is finally decided.

Four rules

There are some rules that need to be followed for this to work as a consensus system.

  1. Quorum agreement. A client may only output a (non-nil) value v if it has read v from a quorum of servers in the same register set.
  2. New value. A client may only write a (non-nil) value v provided that v is the client’s input value or that the client has read v from a register.
  3. Current decision. A client may only write a (non-nil) value v to register r on server s provided that if v is decided in register set r by a quorum Q \in Q_r where s \in Q then no value v' where v \neq v' can also be decided in register set r.
  4. Previous decisions. A client may only write a (non-nil) value to register r provided no value v' where v \neq v' can be decided by the quorums in registers sets 0 to r - 1.

The first two rules are pretty straightforward. Rule 3 ensures that all decisions made by a register set will be for the same value, while rule 4 ensures that all decisions made by different register sets are for the same value.

Obeying these rules places some further constraints on the system configuration. Say we have two quorums for register set i, \{S_0\} and \{S_1\}. A client writing to the register on S_0 has no way of knowing whether or not some other client might be writing a different value to S_1, hence we couldn’t uphold rule 3. There are three ways we can handle this situation:

  • The trivial solution is to allow only one quorum per register set
  • With multiple quorums, we can require that all quorums for a register set intersect (i.e., any two quorums will have at least one overlapping member)
  • We can use client restricted configurations. Here ownership of a given register set is assigned to a given client, such that only that client can write values in that register set. Assuming the client is consistent with the values it writes, all will be well. One such strategy would be a round-robin ownership of register sets:

To comply with rule 4, clients need to consult their emerging view of the global state table, aka their local state table, and figure out whether any decision has been or could be reached by earlier register sets. The client maintains a decision table with one entry for each quorum of each register set. This table tracks what the client knows about the decisions made by the quorum. Possible values for a quorum are:

  • ANY – any value could yet be decided. (All quorums for all register sets start out in this state)
  • MAYBE v – if this quorum reaches a decision, then the value will be v
  • DECIDED v – the value v has been decided by the quorum (a final state)
  • NONE – the quorum will not decide a value; a final state

As the client receives information from servers, it updates the decision table according to the following rules:

  • If the client reads the value nil from a quorum member, then the state of that quorum is set to NONE.
  • If the client reads a non-nil value v for register set r and all quorum members have the same value then the state of the quorum is set to DECIDED. Otherwise the state is set to MAYBE v. In this latter case, for the same quorum in earlier register sets a value of ANY will also be set to MAYBE v, but a value of MAYBE v' where v \neq v' will be updated to NONE.

Clients use the decision table to implement the four rules for correctness as follows:

Our aim is to make reasoning about correctness sufficiently intuitive that proofs are not necessary to make a convincing case for the safety…

You’ll be pleased to know however that proofs are included, and you’ll find them in Appendix A.

Paxos over immutable registers

As a paper summary, I guess this write-up so far has been a spectacular fail! I’m already over my target length, and have probably taken more words to explain the core ideas than the original paper does!! The only defence I can offer is that there are no short-cuts when it comes to thinking about consensus 😉

An important part of the paper is to show that the register-based scheme just outlined can be instantiated to realise a number of consensus schemes from the literature, including Paxos and Fast Paxos. I’m out of space to do that proper justice, but here’s a sneak-peek at the core Paxos algorithm:

We observe that Paxos is a conservative instance of our generalised solution to consensus. The configuration used by Paxos is majorities for all register sets.. Paxos also uses client restricted for all register sets. The purpose of phase one is to implement rule 4, and the purpose of phase two is to implement rule 1.

If you differentiate between the quorums used for each register set and which phase of Paxos the quorum is used for, you can arrive at Flexible Paxos.

Mixing it up

Section 5 of the paper shows how to derive Fast Paxos, and in section six you’ll find a brief sketch of three other points in the configuration space named co-located consensus, fixed-majority consensus, and reconfigurable consensus.

In this paper, we have reframed the problem of distributed consensus in terms of write-once registers and thus proposed a generalised solution to distributed consensus. We have demonstrated that this solution not only unifies existing algorithms including Paxos and Fast Paxos but also demonstrates that such algorithms are conservative as their quorum intersection requirements and quorum agreement rules can be substantially weakened. We have illustrated the power of our generalised consensus algorithm by proposing three novel algorithms for consensus, demonstrating a few interesting points on the diverse array of algorithms made possible by our abstract.

Keeping CALM: when distributed consistency is easy

March 6, 2019

Keeping CALM: when distributed consistency is easy Hellerstein & Alvaro, arXiv 2019

The CALM conjecture (and later theorem) was first introduced to the world in a 2010 keynote talk at PODS. Behind its simple formulation there’s a deep lesson to be learned with the power to create ripples through our industry akin to the influence of the CAP theorem. It rewards time spent ruminating on the implications. Therefore I was delighted to see this paper from Hellerstein & Alvaro providing a fresh and very approachable look at CALM that gives us an excuse to do exactly that. All we need now is a catchy name for a movement! A CALM system is a NoCo system, “No Coordination.”

When it comes to high performing scalable distributed systems, coordination is a killer. It’s the dominant term in the Universal Scalability Law. When we can avoid or reduce the need for coordination things tend to get simpler and faster. See for example Coordination avoidance in database systems, and more recently the amazing performance of Anna which gives a two-orders-of-magnitude speed-up through coordination elimination. So we should avoid coordination whenever we can.

So far so good, but when exactly can we avoid coordination? Becoming precise in the answer to that question is what the CALM theorem is all about. You’re probably familiar with Brooks’ distinction between essential complexity and accidental complexity in his ‘No silver bullet’ essay. Here we get to tease apart the distinction between essential coordination, a guarantee that cannot be provided without coordinating, and accidental coordination, coordination that could have been avoided with a more careful design.

In many cases, coordination is not a necessary evil, it is an incidental requirement of a design decision.

One of the causes of accidental coordination is our preoccupation with trying to solve consistency questions at lower levels of the stack using storage semantics. There’s an end-to-end argument to be made that we need to be thinking about the application level semantics instead. The lower layers have a part to play, but we can’t unlock the full potential if we focus only there. As Pat Helland articulated in ‘Building on quicksand’, writes don’t commute, but application level operations can. It’s also only at the application level that we can trade coordination for apologies too.

Anyway, here’s the central question:

What is the family of problems that can be consistently computed in a distributed fashion without coordination, and what problems lie outside that family?


Monotone speech. Monotony. Dull, boring, never a surprise or variation. Monotonic systems are similar, they only ever move in one direction. With monotonicity, once we learn something to be true, no further information can come down the line later on to refute that fact.

Consider deadlock detection in a distributed graph.

As the machines in the setup above exchange information about the edges they are aware of, then at some point the cycle involving T_1 and T_3 will come to light. No matter what other edges are later discovered, we know we have a deadlock. Deadlock detection in this system is monotonic.

Here’s another very similar looking distributed graph, but this time we’re interesting in reachability for the purposes of garbage collection.

More precisely, we’re interested in unreachability. Based on information to date, we may think that a certain object is unreachable. This property is not monotonic though – the very next edge we uncover could make it reachable again.

In the first instance, we were asking a “Does there exist?” ( \exists ) question. The presence of one positive example gives us an answer in the affirmative and additional positive examples don’t change that fact. In the second instance, we were asking a “Does not exist?” ( !\exists ) question. We can only answer a question like that once we’ve looked at every example. It’s the same for a “For all…” question ( \forall ).

With exactly the same graph, and exactly the same edge discovery algorithm, if we’d been interested in the property of reachability rather than unreachability, then that would have been monotonic. Once we know an object is reachable, finding out it is also reachable via a second path doesn’t change that fact. Unless… our system allows deletion of objects and edges. Then an object that was reachable can become unreachable again, and reachability is no longer monotonic.

What we’ve learned from these examples is that negation and universal quantification mess with monotonicity. We need the property that conclusions made on partial information continue to hold once we have the full information.

This idea of monotonicity turns out to be of central importance. It’s time to meet the CALM Theorem (Consistency as Logical Monotonicity):

Consistency as Logical Monotonicity (CALM). A program has a consistent, coordination-free distributed implementation if and only if it is monotonic.

The CALM theorem delineates the frontier between the possible (that’s the ‘if’ part) and the impossible (the ‘only if’ part). It’s most satisfying that this is also exactly the famous motivational poster message “Keep CALM and Carry On”. If you keep things CALM then you are always in a position to ‘carry on’ without needing to stop and coordinate.

Some more ‘C’ words

We’re making progress. We can now refine our original question to this: “What is the family of problems that can be computed in a monotonic fashion, and what problems lie outside that family?” So far we’ve been talking about CALM, consistency, and coordination. Now we need to introduce a couple more ‘C’ words: commutative and confluent.

Recall that a binary operation is commutative if the order of its operands makes no difference to the result. Addition is commutative, subtraction isn’t. Confluence as applied to program operations is a generalisation of the same idea. An operation is confluent if it produces the same sets of outputs for any non-deterministic ordering and batching of a set of inputs.

Confluent operations compose: if the outputs of one confluent operation are consumed by another, the resulting composite operation is confluent. Hence confluence can be applied to individual operations, components in a dataflow, or even entire distributed programs. If we restrict ourselves to building programs by composing confluent operations, our programs are confluent by construction, despite orderings of messages or execution races within and across components.

Confluent operations are the building blocks of monotonic systems. We still need to take care to avoid negation though (deletions, !\exists, and its alternative formulation: \forall). One creative solution for dealing with negation / deletions, as used for example by CRDT sets, is to keep a separate growing set of deletions alongside additions. In relational algebra terms we can have selection, projection, intersection, join, and transitive closure, but not set-difference. In terms of mutable state we can allow inserts, but not updates and deletes.

The key insight in CALM is to focus on consistency from the viewpoint of program outcomes rather than the traditional histories of storage mutation. The emphasis on the program being computed shifts focus from implementation to specification: it allows us to ask questions about what computations are possible.

CALM in practice

CRDTs provide an object-oriented framework for monotonic programming patterns. We really want to use them within a functional programming context though, or at least one that avoids bare assignment to mutable variables. Immutability is a trivial monotonic pattern, mutability is non-monotonic. Going further, the Bloom language was explicitly designed to support CALM application development:

  • Bloom makes set-oriented, monotonic (and hence confluent) programming the easiest constructs for programmers to work with in the language.
  • Bloom can leverage static analysis based on CALM to certify when programs provide the state-based convergence properties provided by CRDTs, and when those properties are preserved across compositions of modules.

If you can’t find a monotonic implementation for every feature of an application, one good strategy is keep coordination off the critical path. For example, in the garbage collection example we looked at earlier, garbage collection can run in the background. Another option is to proceed without coordination but put in place a mechanism to detect when this leads to an inconsistency so that the application can “apologise”.

The CALM Theorem presents a positive result that delineates the frontier of the possible. CALM shows that monotonicity, a property of a program, implies consistency, a property of the output of any execution of that program. The inverse is also established: non-monotonic programs require runtime enforcement (coordination) to ensure consistent execution. As a program property, CALM enables reasoning via static program analysis, and limits or eliminates the use of runtime checks.

There’s plenty more good material in the paper itself that I didn’t have space to cover here, so if these ideas have caught your interest I encourage you to check it out.

Efficient large-scale fleet management via multi-agent deep reinforcement learning

March 4, 2019

Efficient large-scale fleet management via multi-agent deep reinforcement learning Lin et al., KDD’18

A couple of weeks ago we looked at a survey paper covering approaches to dynamic, stochastic, vehicle routing problems (DSVRPs). At the end of the write-up I mentioned that I couldn’t help wondering about an end-to-end deep learning based approach to learning policy as an alternative to the hand-crafted algorithms. Lenz Belzner popped up on Twitter to point me at today’s paper choice, which investigates exactly that.

The particular variation of DSVRP studied here is grounded in a ride-sharing platform with real data provided by Didi Chuxing covering four weeks of vehicle locations and trajectories, and customer orders, in the city of Chengdu. With the area covered by 504 hexagonal grid cells, the centres of which are 1.2km apart, we’re looking at around 475 square kilometers. The goal is to reposition vehicles in the fleet at each time step (10 minute intervals) so as to maximise the GMV (total value of all orders) on the platform. We’re not given information on the number of drivers, passengers, and orders in the data set (nor on the actual GMV, all results are relative), but Chengdu has a population of just under 9 million people and Didi Chuxing itself serves over 550M users across 400 cities, with 30M+ rides/day, so I’m guessing there’s a reasonable amount of data available.

One key challenge in ride-sharing platforms is to balance the demands and supplies, i.e., orders of the passengers and drivers available for picking up orders. In large cities, although millions of ride-sharing orders are served everyday, an enormous number of passenger requests remain unserviced due to the lack of available drivers nearby. On the other hand, there are plenty of available drivers looking for orders in other locations.

The authors propose to tackle this fleet management problem using deep reinforcement learning (DRL). For reinforcement learning to succeed we need a well defined agent, reward policy, and action space. If we go with a single ‘fleet management’ agent then the action space becomes intractably large (all the possible moves for all of the vehicles). So an alternative is to consider each vehicle as an agent, and formulate a multi-agent DRL problem. The challenge then becomes the effective training of many thousands of agents, with agents also arriving and leaving over time: “most existing studies only allow coordination among a small set of agents due to the high computational costs.” Furthermore, we need to find a way to coordinate the actions of the agents (it’s no good for example having every driver converge on the same location). To deal with all of this the authors introduce two contextual multi-agent reinforcement learning algorithms for multi-agent DRL: contextual multi-agent actor critic (cA2C) and contextual deep Q-learning (cDQN).

…DRL approaches are rarely seen to be applied in complicated real-world applications, especially in those with high-dimensional and non-stationary action space, lack of well-defined reward function, and in need of coordination among a large number of agents. In this paper, we show that through careful reformulation, DRL can be applied to tackle the fleet management problem.

The problem is modelled as follows:

  • The map is overlaid with a hexagonal grid, with grid centers 1.2km apart.
  • The day is split into 144 10-minute intervals
  • In each time interval a given vehicle may move up to one grid cell in any direction (unless it’s on the edge of the grid, when choice may be further restricted). Thus an agent has up to 7 actions to choose from in each time step. At time t the joint action vector \mathbf{a}_t encodes the actions chosen by all agents through concatenation.
  • Orders emerge stochastically in the grid in each time interval, and are served by vehicles in the same or a neighbouring cell.
  • Global state \mathbf{s}_t is maintained comprising the numbers of vehicles and orders available in each grid cell, and the current time t. The state for a given agent is a concatenation of this global state plus a one-hot encoding of the current grid location of the agent (vehicle).
  • The state transition probability tells us the probability of arriving in a state \mathbf{s}_{t+1} given the current state \mathbf{s}_t and the joint action vector. The actions are of course deterministic, but there are also stochastic elements due to new vehicles coming online, existing vehicles going offline, and new orders arriving.
  • The reward function for an agent maps a state and action to a reward score. All agents in the same location share the same reward function. Each agent attempts to maximise its own expected discounted return. The reward for an agent a is based on the averaged revenue of all agents arriving at the same grid cell as a in the next time step.

Such a design of rewards aims at avoiding greedy actions that send too many agents to locations with high volumes of orders, and aligns the maximization of each agent’s return with the maximization of GMV.

The policies of the agents are evaluated and trained in a simulator informed by the Didi Chuxing dataset that performs the following steps in each time step:

  1. Update vehicle status (setting some offline, and bringing some new vehicles online)
  2. Generate new orders
  3. Interact with the agents, passing the new global state and receiving the agent actions
  4. Assign available orders through a two-stage procedure: first the orders in a given grid cell are assigned to vehicles in the same cell, then the remaining unfulfilled orders are assigned to vehicles in neighbouring cells.

Following calibration, the GMV reported by the simulator follows very closely the real data from the ride-sharing platform.

Within this context, cA2C and cDQN are two independently developed algorithms that are compared against each other (and to a number of other baselines) in the evaluation. Depending on the number of vehicles assumed to be initially online, cDQN and cA2C both give up to a 15% uplift in GMV over an unmanaged baseline (denoted ‘simulation’ in the table below), and increase the percentage of orders fulfilled from around 82% to 95%.


The algorithms

Contextual DQN shares network parameters across all agents, and distinguishes them with their ids. For the reward function, a key observation is that all actions which leave a vehicle in some grid cell g must share the same action value (regardless of action). So there are only N unique action values (for N grid cells). That follows straightforwardly from the averaged revenue definition of the reward function. This simplifies the optimisation and also provides the foundation of a coordination mechanism for agent collaboration.

There are two further constraints / inputs. First, a geographic context \mathbf{G}_{g_{j}} \in \mathbb{R}^7 is used to filter out invalid actions for an agent in grid location \mathbf{g}_j. This addresses edge locations, and other impassable interior grid cells (e.g., lakes). Secondly we want to avoid needless movement, as would occur for example if an agent in location 1 moved to location 2 in the same time step as an agent from location 2 moved to location 1. A collaborative context input \mathbf{C}_{t,g_j} \in \mathbb{R}^7 encodes this constraint by further restricting the valid actions for an agent: an agent can only move to a grid location with a high action value than their current location.

Contextual Actor-Critic is a contextual multi-agent actor-critic algorithm.

There are two main ideas in the design of cA2C: 1) A centralized value function shared by all agents with an expected update; 2) Policy context embedding that establishes explicit coordination among agents, enables faster training and enjoys the flexibility of regulating policy to different action spaces.

Coordination is again achieved by masking the available actions based on the geographic and collaborative contexts.

Comparing models with (cA2C v3) and without (cA2C) the collaborative context component shows that it is indeed effective in reducing conflicts and improving GMV.

Note that the collaboration doesn’t involve any true communication between the agents, it’s more an emergent property of the constraints in the shared state space.

In action

There’s a nice worked example at the end of the paper (§6.6) illustrating how the network learns demand-supply gaps and is able to reposition vehicles effectively. Here we’re looking at the region around the airport. After midnight there are a lot of orders and less available vehicles, so the ‘value’ of the airport grid cells increases and vehicles are moved towards it. During the early evening there are more vehicles available and the state values are lower. Vehicles also move to other locations.

…both cDQN and cA2C achieve large scale agents’ coordination in the fleet management problem. cA2C enjoys both flexibility and efficiency by capitalizing on a centralized value network and decentralized policy execution embedded with contextual information.