
Slim: OS kernel support for a low-overhead container overlay network

March 22, 2019

Slim: OS kernel support for a low-overhead container overlay network Zhuo et al., NSDI’19

Container overlay networks rely on packet transformations, with each packet traversing the networking stack twice on its way from the sending container to the receiving container.

There are CPU, throughput, and latency overheads associated with those traversals.

In this paper, we ask whether we can design and implement a container overlay network, where packets go through the OS kernel’s network stack only once. This requires us to remove packet transformation from the overlay network’s data-plane. Instead, we implement network virtualization by manipulating connection-level metadata at connection setup time, saving CPU cycles and reducing packet latency.

Slim comes with some caveats: it requires a kernel module for secure deployment, has longer connection establishment times, doesn’t fit with packet-based network policies, and only handles TCP traffic. For UDP, ICMP, and for its own service discovery, it also relies on an existing container overlay network (Weave Net). But for longer lasting connections managed using connection-based network policies it delivers some impressive results:

  • memcached throughput up by 71%, with latency reduced by 42%, and CPU utilisation reduced by 56%.
  • Nginx CPU utilisation reduced by 22-24%
  • PostgreSQL CPU utilisation reduced by 22%
  • Apache Kafka CPU utilisation reduced by 10%

Since Slim both builds on and compares to Weave Net, I should say at this point that Weave Net was the very first open source project from Weaveworks, the “GitOps for Kubernetes” company. Accel is an investor in Weaveworks, and I am also a personal investor. If you’re using Kubernetes, you should definitely check them out. Anyway, on with the show…

Container networking

In theory there are four possible modes for container networking: a bridge mode for containers on the same host; host mode in which containers use the IP address of their host network interface; macvlan mode (or similar hardware mechanisms) to give each container its own IP address; and overlay mode in which each container is given its own virtual network interface and each application has its own network namespace.

In practice, there are management and deployment challenges with the bridge, host, and macvlan approaches, so overlay networks such as Weave Net are the preferred solution.

Overlay packets are encapsulated with host network headers when routed on the host network. This lets the container overlay network have its own IP address space and network configuration that is disjoint from that of the host network; each can be managed completely independently. Many container overlay network solutions are available today— such as Weave, Flannel, and Docker Overlay— all of which share similar internal architectures.

Overlay network overheads

The overheads in overlay networking come from the per-packet processing inside the OS kernel: delivering a packet on the overlay network requires one extra traversal of the network stack and also packet encapsulation and decapsulation.

Here are some test measurements comparing Weave Net in fast dataplane mode to host networking to give an example:

In this test, compared to direct host networking, for two containers on the same host (Intra) the overlay network reduces throughput by 23% and increases latency by 34%. For containers communicating across hosts (Inter), throughput reduces by 48% and latency increases by 85%. The overheads are lower when communicating on the same host since packet encapsulation is not required.

Compared to host networking, the CPU utilisation also increases by 93%.

There are several known techniques to reduce the data plane overhead. Packet steering creates multiple queues, one per CPU core, for a network interface and uses consistent hashing to map packets to different queues. In this way, packets in the same network connection are processed only on a single core. Different cores therefore do not have access to the same queue, removing the overhead due to multi-core synchronization.

The authors integrated the above Receive Packet Steering (RPS) into Weave Net, along with an enhancement called Receive Flow Steering (RFS), which further ensures that interrupt processing occurs on the same core as the application. With this enhancement, throughput is within 9% of that achieved with host networking, but it makes almost no difference to latency.
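As an aside, RPS and RFS are standard Linux features configured through sysfs and procfs. Here's a rough sketch of the knobs involved; the interface name, queue index, and table sizes below are my own illustrative choices, not values from the paper:

```python
# Illustrative only: how RPS/RFS are typically enabled on a Linux host.
# Interface name, queue index, and sizes are assumptions, not paper values.

def enable_rps(iface="eth0", queue=0, cpu_mask="f"):
    """RPS: spread packets from one RX queue across the CPUs in cpu_mask (hex)."""
    path = f"/sys/class/net/{iface}/queues/rx-{queue}/rps_cpus"
    with open(path, "w") as f:
        f.write(cpu_mask)

def enable_rfs(iface="eth0", queue=0, flow_entries=32768):
    """RFS: steer each flow to the CPU where its consuming application runs."""
    with open("/proc/sys/net/core/rps_sock_flow_entries", "w") as f:
        f.write(str(flow_entries))
    # Per-queue flow table size; the sum over queues should not exceed flow_entries.
    path = f"/sys/class/net/{iface}/queues/rx-{queue}/rps_flow_cnt"
    with open(path, "w") as f:
        f.write(str(flow_entries))

if __name__ == "__main__":
    enable_rps()
    enable_rfs()
```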

Introducing Slim

The big idea in Slim is to reduce CPU utilisation and latency overheads by having packets traverse the network stack only once. That means you can’t do per-packet processing. Instead, Slim works at the connection level.

Slim virtualizes the network by manipulating connection-level metadata. SlimSocket exposes the POSIX socket interface to application binaries to intercept invocations of socket-related system calls. When SlimSocket detects an application is trying to set up a connection, it sends a request to SlimRouter. After SlimRouter sets up the network connection, it passes access to the connection as a file descriptor to the process inside the container. The application inside the container then uses the host namespace file descriptor to send/receive packets directly to/from the host network. Because SlimSocket has the exact same interface as the POSIX socket, and Slim dynamically links SlimSocket into the application, the application binary need not be modified.
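To make the hand-off concrete, here's a minimal sketch of the file-descriptor-passing idea (my illustration, not Slim's actual code), using Python's send_fds/recv_fds over a Unix domain socket. The socket path and the request format are made up for the example:

```python
# Minimal sketch of fd passing between a "router" and a "container" process.
# Not Slim's implementation; names, paths, and the protocol are illustrative.
import socket

ROUTER_SOCK = "/tmp/slimrouter.sock"  # hypothetical Unix socket path

def router_serve():
    """Router side: open the host connection, hand the fd to the requester."""
    srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    srv.bind(ROUTER_SOCK)
    srv.listen(1)
    conn, _ = srv.accept()
    host, port = conn.recv(1024).decode().split(":")   # "host_ip:port" request
    host_conn = socket.create_connection((host, int(port)))
    # Pass the host-namespace fd to the requesting process (SCM_RIGHTS).
    socket.send_fds(conn, [b"ok"], [host_conn.fileno()])
    host_conn.close()                                   # receiver now owns a copy

def container_connect(host_ip, port):
    """Shim side: ask the router for a connection, receive the fd, use it."""
    c = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    c.connect(ROUTER_SOCK)
    c.sendall(f"{host_ip}:{port}".encode())
    _, fds, _, _ = socket.recv_fds(c, 1024, 1)
    return socket.socket(fileno=fds[0])                 # data now bypasses the overlay
```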

Given that Slim is out of the picture once the connection is established, a separate mechanism is needed to support control plane and data plane policies. SlimRouter stores control plane policies and enforces them at connection setup time. If the policy changes, SlimRouter scans existing connections and removes the file descriptors for any connection violating the new policy. This requires the support of a kernel module, SlimKernModule. To avoid containers learning the IP addresses of host machines, SlimKernModule (in secure mode) also prohibits unsafe system calls on file descriptors (e.g. getpeername). Existing kernel mechanisms are used to enforce data plane policies.

This is what it looks like when Slim is used with blocking I/O:

Calls to the POSIX socket interface are intercepted by the SlimSocket shim and forwarded to the SlimRouter. For non-blocking I/O (e.g., select, epoll) Slim also intercepts these API calls and maintains mappings for epoll file descriptor sets. The SlimRouter needs to know the IP address and port mappings in order to establish connections. It does this using a Weave Net overlay network!

When the client calls connect, it actually creates an overlay network connection on the standard container overlay network. When the server receives an incoming connection on the standard overlay network, SlimSocket queries SlimRouter for the physical IP address and port and sends them to the client side inside the overlay connection. In secure mode (§4.3), the result queried from SlimRouter is encrypted. SlimSocket on the client side sends the physical IP address and port (encrypted if in secure mode) to its SlimRouter and the SlimRouter establishes the host connection. This means connection setup time is longer in Slim than that on container overlay networks based on packet transformation.

Weave Net is also used for packets that require data plane handling such as ICMP and UDP.

Evaluation

Microbenchmarks compare Slim to Weave Net with RFS. Creating a TCP connection with Weave Net takes 270 µs. With Slim it takes 556 µs (440 µs in insecure mode). For applications with persistent connections, this additional overhead will not be significant. Compared to Weave Net, Slim reduces CPU overhead by 22-41%.

Slim and Weave Net are then further compared on four application workloads based on Memcached, Nginx, PostgreSQL, and Apache Kafka respectively.

For Memcached, Slim improves throughput by 71% and reduces latency by 42%, with 25% lower CPU utilisation.

For Nginx and PostgreSQL, the main advantage of Slim is reduced CPU utilisation (around a 22% reduction). For Kafka the CPU utilisation reduction is around 10%, but latency is also reduced by 28%.

Slim’s source code is available at https://github.com/danyangz/slim.

Understanding lifecycle management complexity of datacenter topologies

March 20, 2019

Understanding lifecycle management complexity of datacenter topologies Zhang et al., NSDI’19

There has been plenty of interesting research on network topologies for datacenters, with Clos-like tree topologies and Expander based graph topologies both shown to scale using widely deployed hardware. This research tends to focus on performance properties such as throughput and latency, together with resilience to failures. Important as these are, note that they’re also what’s right in front of you as a designer, and relatively easy to measure. The great thing about today’s paper is that the authors look beneath the surface to consider the less visible but still very important “lifecycle management” implications of topology design. In networking, this translates into how easy it is to physically deploy the network, and how easy it is to subsequently expand. They find a way to quantify the associated lifecycle management costs, and then use this to help drive the design of a new class of topologies, called FatClique.

… we show that existing topology classes have low lifecycle management complexity by some measures, but not by others. Motivated by this, we design a new class of topologies, FatClique, that, while being performance-equivalent to existing topologies, is comparable to, or better than them by all our lifecycle management complexity metrics.

Now, there’s probably only a relatively small subset of The Morning Paper readers involved in designing and deploying datacenter network topologies. So my challenge to you as you read through this paper, is to think about where the hidden complexity and costs are in your own systems. Would you do things differently if these were made more visible? It would be great to see more emphasis for example on things like developer experience (DX) and operational simplicity – in my experience these kinds of attributes can have an outsize impact on the long-term success of a system.

Anyway, let’s get back to cables and switches…

Physically deploying network topologies

When it comes to laying out a network topology for real in a datacenter, you need to think about packaging, placement, and bundling. Packaging is how you group things together, e.g. the arrangement of switches in racks, and placement concerns how these racks are physically placed on the datacenter floor. Placement in turn determines the kinds of cabling you need, and for optical cables the power of the transceivers. Within a rack we might package several connected switches into a single chassis using a backplane. At the other end of the scale, blocks are larger units of co-placement and packaging that combine several racks. With all those connections, it makes things a lot easier to group together multiple fibres all connecting the same two endpoints (racks) into bundles, which contain a fixed number of identical length fibres.

Manufacturing bundles is simpler than manufacturing individual fibres, and handling such bundles significantly simplifies operational complexity.

Patch panels make bundling easier by providing a convenient aggregation point to create and route bundles. Bundles and fibres are physically routed through the datacenter on cable trays. The trays themselves have capacity constraints of course.

Here’s an example of a logical Clos topology and its physical instantiation:

The authors identify three key metrics that together capture much of the deployment complexity in a topology:

  1. The number of switches. More switches equals more packaging complexity.
  2. The number of patch panels, which is a function of topological structure and a good proxy for wiring complexity.
  3. The number of bundle types. This metric captures the other important part of wiring complexity – how many distinct bundle types are needed. A bundle type is represented by its capacity (how many fibres) and its length.

These complexity measures are complete. The number of cable trays, the design of the chassis, and the number of racks can be derived from the number of switches (and the number of servers and the datacenter floor dimensions, which are inputs to the topology design). The number of cables and transceivers can be derived from the number of patch panels.

Here’s how Clos and Expander (Jellyfish) representative topologies for the same number of servers stack up against these metrics:

The expander graph topology shows much higher deployment complexity in terms of the number of bundle types. Clos also exposes far fewer ports outside of a rack (it has better port hiding).

Expanding existing networks

When you want to expand an existing network first you need to buy all the new gear and lay it out on the datacenter floor, and then you can begin a re-wiring process. This is all going on with live traffic flowing, so expansion is carried out in steps. During each step the capacity of the topology is guaranteed to be at least some percentage of the existing topology capacity. The percentage is sometimes known as the expansion SLO.

During a step existing links to be re-wired are drained, then human operators physically rewire links at patch panels. The new links are tested and then undrained (strange word!), i.e., brought into service.

For example, here’s a logical expansion (top row) and its physical realisation:

The most time-consuming part of all this is the physical rewiring. The two metrics that capture expansion complexity are therefore:

  1. The number of expansion steps, and
  2. The average number of rewired links in a patch panel rack.

Here’s how Clos and Expander stack up on those metrics for the same networks we saw earlier:

This time the victory goes to Expander (Jellyfish). Jellyfish has a much higher north-to-south capacity ratio. Northbound links exit a block, and southbound links are to/from servers within a block. “Fat edges” have more northbound than southbound links, and the extra capacity means you can accomplish more movement in each step.

Clos topologies re-wire more links in each patch panel during an expansion step and require many steps because they have a low north-south capacity ratio.

Enter the FatClique

Inspired by these insights, the authors define a new class of topologies called FatClique, which combine the hierarchical structure of Clos with the edge expansion capabilities of expander graphs.

There are three levels in the hierarchy. A clique of switches forms a sub-block. Cliques of sub-blocks come together to form blocks. And cliques of blocks come together to form the full FatClique topology.

Four key design variables determine the particular instantiation of a FatClique topology: the number of ports in a switch that connect to servers; the number of ports in a switch that connect to other sub-blocks in a block; the number of switches in a sub-block; and the number of sub-blocks in a block. A synthesis algorithm takes a set of six input constraints (see §5.1) and determines the values for these four design variables.
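Just to pin down the shape of the search space, here's how those four design variables might be represented; the field names are mine, not the paper's notation:

```python
# Sketch only: the four FatClique design variables, with hypothetical field names.
from dataclasses import dataclass

@dataclass
class FatCliqueDesign:
    ports_to_servers: int        # switch ports that connect to servers
    ports_to_sub_blocks: int     # switch ports that connect to other sub-blocks in a block
    switches_per_sub_block: int  # size of the sub-block clique
    sub_blocks_per_block: int    # number of sub-block cliques in a block

    def switches_per_block(self) -> int:
        # Follows directly from the hierarchy described above.
        return self.switches_per_sub_block * self.sub_blocks_per_block
```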

There is plenty more detail in section 5 of the paper which I don’t have the space to do justice to here.

FatClique vs Clos vs Expander

The evaluation compares FatClique to Clos, Xpander, and Jellyfish at different network topology sizes, as shown in the table below.



Here’s how they stack up against the complexity metrics:

The charts cover the number of switches, the number of patch panels, the number of bundle types (and associated cabling costs), the number of expansion steps, and the average number of rewired links.

We find that FatClique is the best at most scales by all our complexity metrics. (The one exception is that at small and medium scales, Clos has slightly fewer patch panels). It uses 50% fewer switches and 33% fewer patch panels than Clos at large scale, and has a 23% lower cabling cost (an estimate we were able to derive from published cable prices). Finally, FatClique can permit fast expansion while degrading network capacity by small amounts (2.5-10%): at these levels, Clos can take 5x longer to expand the topology, and each step of Clos expansion can take longer than FatClique because the number of links to be rewired at each step per patch panel can be 30-50% higher.

The one thing I couldn’t find in the evaluation is any data to back up the opening claim that FatClique achieves all of this “while being performance-equivalent to existing topologies.”

The last word

As the management complexity of networks increases, the importance of designing for manageability will increase in the coming years. Our paper is only a first step in this direction…

Datacenter RPCs can be general and fast

March 18, 2019

Datacenter RPCs can be general and fast Kalia et al., NSDI’19

We’ve seen a lot of exciting work exploiting combinations of RDMA, FPGAs, and programmable network switches in the quest for high performance distributed systems. I’m as guilty as anyone for getting excited about all of that. The wonderful thing about today’s paper, for which Kalia et al. won a best paper award at NSDI this year, is that it shows in many cases we don’t actually need to take on that extra complexity. Or to put it another way, it seriously raises the bar for when we should.

eRPC (efficient RPC) is a new general-purpose remote procedure call (RPC) library that offers performance comparable to specialized systems, while running on commodity CPUs in traditional datacenter networks based on either lossy Ethernet or lossless fabrics… We port a production grade implementation of Raft state machine replication to eRPC without modifying the core Raft source code. We achieve 5.5 µs of replication latency on lossy Ethernet, which is faster than or comparable to specialized replication systems that use programmable switches, FPGAs, or RDMA.

eRPC just needs good old UDP. Lossy Ethernet is just fine (no need for fancy lossless networks), and it doesn’t need Priority Flow Control (PFC). The received wisdom is that you can either have general-purpose networking that works everywhere and is non-intrusive to applications but has capped performance, or you have to drop down to low-level interfaces and do a lot of your own heavy lifting to obtain really high performance.

The goal of our work is to answer the question: can a general-purpose RPC library provide performance comparable to specialized systems?

Astonishingly, Yes.

From the evaluation using two lossy Ethernet clusters (designed to mimic the setups used in Microsoft and Facebook datacenters):

  • 2.3µs median RPC latency
  • up to 10 million RPCs / second on a single core
  • large message transfer at up to 75Gbps on a single core
  • peak performance maintained even with 20,000 connections per node (2 million cluster wide)

eRPC’s median latency on CX5 is only 2.3µs, showing that latency with commodity Ethernet NICs and software networking is much lower than the widely-believed value of 10-100µs.

(CURP over eRPC in a modern datacenter would be a pretty spectacular combination!).

So the question that immediately comes to mind is how? As in, “what magic is this?”.

The secret to high-performance general-purpose RPCs

… is a carefully considered design that optimises for the common case and also avoids triggering packet loss due to switch buffer overflows for common traffic patterns.

That’s it? Yep. You won’t find any super low-level fancy new exotic algorithm here. Your periodic reminder that thoughtful design is a high leverage activity! You will of course find something pretty special in the way all the pieces come together.

So what assumptions go into the ‘common case?’

  • Small messages
  • Short duration RPC handlers
  • Congestion-free networks

Which is not to say that eRPC can’t handle larger messages, long-running handlers, and congested networks. It just doesn’t pay a contingency overhead price when they are absent.

Optimisations for the common case (which we’ll look at next) boost performance by up to 66% in total. On this base eRPC also enables zero-copy transmissions and a design that scales while retaining a constant NIC memory footprint.

The core model is as follows. RPCs are asynchronous and execute at most once. Servers register request handler functions with unique request types, and clients include the request types when issuing requests. Clients receive a continuation callback on RPC completion. Messages are stored in opaque DMA-capable buffers provided by eRPC, called msg-bufs. Each RPC endpoint (one per end user thread) has an RX and TX queue for packet I/O, an event loop, and several sessions.
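eRPC itself is a C++ library, but the shape of the model is easy to sketch. Here's an illustrative Python rendering; the names and signatures are mine, not eRPC's actual API:

```python
# Sketch of the eRPC programming model (illustrative Python, not the real C++ API).
# Servers register request handlers keyed by request type; clients issue
# asynchronous requests and receive a continuation callback on completion.

REQ_KV_GET = 1  # hypothetical request type id

def kv_get_handler(req_msgbuf: bytes) -> bytes:
    """Handler registered for REQ_KV_GET; runs in the dispatch thread for short
    requests, or in a worker thread if registered as long-running."""
    return b"value-for-" + req_msgbuf

class RpcEndpoint:
    """One endpoint per application thread: RX/TX queues, an event loop,
    and a set of sessions to remote endpoints."""
    def __init__(self):
        self.handlers = {}

    def register_handler(self, req_type, fn, run_in_dispatch=True):
        # run_in_dispatch is the one extra piece of user input eRPC asks for.
        self.handlers[req_type] = (fn, run_in_dispatch)

    def enqueue_request(self, session, req_type, req_msgbuf, continuation):
        ...  # transmit asynchronously; continuation(resp_msgbuf) fires on completion

    def run_event_loop(self, timeout_ms):
        ...  # poll RX/TX queues, run handlers and continuations
```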

The long and short of it

When request handlers are run directly in dispatch threads you avoid expensive inter-thread communication (which adds up to 400ns of request latency). That’s fine when request handlers are short in duration, but long handlers block other dispatch handling, increasing tail latency, and prevent rapid congestion feedback.

eRPC supports running handlers in dispatch threads for short duration request types (up to a few hundred nanoseconds), and worker threads for longer running requests. Which mode to use is specified when the request handler is registered. This is the only additional user input needed in eRPC.

Scalable connection state

eRPC’s choice to use packet I/O over RDMA avoids the circular buffer scalability bottleneck in RDMA (see §4.1.1). By taking advantage of multi-packet RX-queue (RQ) descriptors in modern NICs, eRPC can use constant space in the NIC instead of a footprint that grows with the number of connected sessions (see Appendix A).

Furthermore, eRPC replaces NIC-managed connection state with CPU-managed connection state.

This is an explicit design choice, based upon fundamental differences between the CPU and NIC architectures. NICs and CPUs will both cache recently used connection state. CPU cache misses are served from DRAM, whereas NIC cache misses are served from the CPU’s memory subsystem over the slow PCIe bus. The CPU’s miss penalty is therefore much lower. Second, CPUs have substantially larger caches than the ~2MB available on a modern NIC, so the cache miss frequency is also lower.

Zero-copy transmission

Zero-copy packet I/O in eRPC provides performance comparable to lower level interfaces such as RDMA and DPDK. The msgbuf layout ensures that the data region is contiguous (so that applications can use it as an opaque buffer) even when the buffer contains data for multiple packets. The first packet’s data and header are also contiguous so that the NIC can fetch small messages with one DMA read. Headers for remaining packets are at the end, to allow for the contiguous data region in the middle.

eRPC must ensure that it doesn’t mess with msgbufs after ownership is returned to the application, which is fundamentally addressed by making sure it retains no reference to the buffer. Retransmissions can interfere with such a scheme though. eRPC chooses to use “unsignaled” packet transmission optimising for the common case of no retransmission. The trade-off is a more expensive process when retransmission does occur:

We flush the TX DMA queue after queuing a retransmitted packet, which blocks until all queued packets are DMA-ed. This ensures the required invariant: when a response is processed, there are no references to the request in the DMA queue.

eRPC provides zero-copy reception too for workloads matching the common case of single-packet requests and dispatch-mode request handlers, which boosts eRPC’s message rate by up to 16%.

Sessions and flow control

Sessions support concurrent requests (8 by default) that can complete out-of-order with respect to each other. Sessions use an array of slots to track RPC metadata for outstanding requests, and slots have an MTU-size preallocated msgbuf for use by request handlers that issue short responses. Session credits are used to implement packet-level flow control. Session credits also support end-to-end flow control to reduce switch queuing. Each session is given BDP/MTU credits, which ensures that each session can achieve line rate.
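The credit calculation itself is simple; here's a back-of-envelope version with assumed link speed, RTT, and MTU (illustrative numbers, not the paper's cluster parameters):

```python
# Illustrative credit calculation: the numbers below are assumptions,
# not the paper's measured cluster parameters.
link_gbps = 25          # assumed NIC line rate
rtt_us = 6.0            # assumed intra-datacenter round-trip time
mtu_bytes = 1024        # assumed per-packet payload used by the transport

bdp_bytes = (link_gbps * 1e9 / 8) * (rtt_us * 1e-6)   # bandwidth-delay product
credits = int(round(bdp_bytes / mtu_bytes))           # packets in flight per session
print(f"BDP = {bdp_bytes:.0f} B, session credits = {credits}")
# => BDP ≈ 18750 B, so roughly 18 credits keep one session at line rate.
```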

Client-driven wire protocol

We designed a wire protocol for eRPC that is optimized for small RPCs and accounts for per-session credit limits. For simplicity, we chose a simple client-driven protocol, meaning that each packet sent by the server is in response to a client packet.

Client-driven protocols have fewer moving parts, with only the client needing to maintain wire protocol state. Rate limiting becomes solely a client responsibility too, freeing server CPU.

Single-packet RPCs (request and response require only a single packet) use the fewest packets possible. With multi-packet responses and a client-driven protocol the server can’t immediately send response packets after the first one, so the client sends a request-for-response (RFR) packet. In practice this added latency turned out to be less than 20% for responses with four or more packets.

Congestion control

eRPC can use either Timely or DCQCN for congestion control. The evaluation uses Timely as the cluster hardware could not support DCQCN. Three optimisations brought the overhead of congestion control down from around 20% to 9%:

  1. Bypassing Timely altogether if the RTT of a received packet on an uncongested session is less than a low threshold value.
  2. Bypassing the rate limiter for uncongested sessions
  3. Sampling timers once per RX or TX batch rather than once per packet for RTT measurement

These optimisations work because datacenter networks are typically uncongested. E.g., at one-minute timescales 99% of all Facebook datacenter links are uncongested, and for Google web and cache traffic, 90% of ToR switch links (the most congested) are less than 10% utilized at 25 µs timescales.

Packet loss

eRPC keeps things simple by treating re-ordered packets as losses and dropping them (as do current RDMA NICs). When a client suspects a lost packet it rolls back the request’s wire protocol state using a ‘go-back-N’ mechanism. It reclaims credits and retransmits from the rollback point.

Evaluation highlights

This write-up is in danger of getting too long again, so I’ll keep this very brief. The following table shows the contribution of the various optimisations through ablation:

We conclude that optimizing for the common case is both necessary and sufficient for high-performance RPCs.

Here you can see latency with increasing threads. eRPC achieves high message rate, bandwidth, and scalability with low latency in a large cluster with lossy Ethernet.

For large RPCs, eRPC can achieve up to 75 Gbps with one core.

Section 7 discusses the integration of eRPC in an existing Raft library, and in the Masstree key-value store. From the Raft section the authors conclude: “the main takeaway is that microsecond-scale consistent replication is achievable in commodity Ethernet datacenters with a general-purpose networking library.”

eRPC’s speed comes from prioritizing common-case performance, carefully combining a wide range of old and new optimizations, and the observation that switch buffer capacity far exceeds datacenter BDP. eRPC delivers performance that was until now believed possible only with lossless RDMA fabrics or specialized network hardware. It allows unmodified applications to perform close to the hardware limits.

Exploiting commutativity for practical fast replication

March 15, 2019

Exploiting commutativity for practical fast replication Park & Ousterhout, NSDI’19

I’m really impressed with this work. The authors give us a practical-to-implement enhancement to replication schemes (e.g., as used in primary-backup systems) that offers a significant performance boost. I’m expecting to see this picked up and rolled out in real-world systems as word spreads. At a high level, CURP works by dividing execution into periods of commutative operation where ordering does not matter, punctuated by full syncs whenever commutativity would break.

The Consistent Unordered Replication Protocol (CURP) allows clients to replicate requests that have not yet been ordered, as long as they are commutative. This strategy allows most operations to complete in 1 RTT (the same as an unreplicated system).

When integrated with RAMCloud write latency was improved by ~2x, and write throughput by 4x. Which is impressive given that RAMCloud isn’t exactly hanging around in the first place! When integrated with Redis, CURP was able to add durability and consistency while keeping similar performance to non-durable Redis.

CURP can be easily applied to most existing systems using primary-backup replication. Changes required by CURP are not intrusive, and it works with any kind of backup mechanism (e.g., state-machine replication, file writes to network replicated drives, or scattered replication). This is important since most high-performance systems optimize their backup mechanisms, and we don’t want to lose those optimizations…

The big idea: durability, ordering, and commutativity

We’re looking for two things from a replication protocol: durability of executed operations, and consistent ordering across all replicas for linearizability. Hence the classic primary-backup protocol in which clients send requests to a primary, which by being a single instance serves to give a global order to requests. The primary then ensures that the update is propagated to backup replicas before returning to the client. This all takes 2 RTTs. Consensus protocols with strong leaders also require 2 RTTs. Fast Paxos and Generalized Paxos reduce latency to 1.5 RTT through optimistic replication with presumed ordering. Network-Ordered Paxos and Speculative Paxos can get close to 1 RTT latency, but require special networking hardware.

CURP’s big idea is as follows: when operations are commutative, ordering doesn’t matter. So the only property we have to ensure during a commutative window is durability. If the client sends a request in parallel to the primary and to f additional servers which all make the request durable, then the primary can reply immediately and once the client has received all f confirmations it can reveal the result. This gives us a 1 RTT latency for operations that commute, and we fall back to the regular 2 RTT sync mechanism when an operation does not commute.
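Here's a schematic of that fast path (my sketch, not the paper's code); the primary and witness objects and their methods are assumed for illustration:

```python
# Schematic of CURP's 1 RTT fast path (illustrative, not the paper's implementation).
import asyncio

async def curp_update(primary, witnesses, op):
    """Send op to the primary and all f witnesses in parallel.
    Completes in 1 RTT if every witness accepts; otherwise fall back to a sync."""
    replies = await asyncio.gather(
        primary.execute(op),                      # primary replies before syncing to backups
        *[w.record(op) for w in witnesses],       # witnesses only make op durable
    )
    result, witness_replies = replies[0], replies[1:]
    if all(r == "accepted" for r in witness_replies):
        return result                             # fast path: 1 RTT
    # Slow path: some witness saw a non-commutative conflict.
    await primary.sync()                          # wait for replication to backups
    return result
```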


Those f additional servers are called witnesses. If you’re building a new system from scratch, it makes a lot of sense to combine the roles of backup and witness in a single process. But this is not a requirement, and for ease of integration with an existing system witnesses can be separate.

… witnesses do not carry ordering information, so clients can directly record operations into witnesses in parallel with sending operations to masters so that all requests will finish in 1 RTT. In addition to the unordered replication to witnesses, masters still replicate ordered data to backups, but do so asynchronously after sending the execution results back to the clients.

This should all be raising a number of questions, chief among which is what should happen when a client, witness, or primary crashes. We’ll get to that shortly. The other obvious question is how do we know when operations commute?

Witnesses must be able to determine whether operations are commutative or not just from the operation parameters. For example, in key-value stores, witnesses can exploit the fact that operations on different keys are commutative… Witnesses cannot be used for operations whose commutativity depends on the system state.

So CURP works well with key-value stores, but not with SQL-based stores (in which WHERE clauses for example mean that operation outcomes can depend on system state).

The CURP protocol

CURP uses a total of f+1 replicas to tolerate f failures (as per standard primary-backup), and additionally uses f witnesses to ensure durability of updates even before replications to backups are completed. Witnesses may fail independently of backups.

To ensure the durability of the speculatively completed updates, clients multicast update operations to witnesses. To preserve linearizability, witnesses and masters enforce commutativity among operations that are not fully replicated to backups.

Operation in the absence of failures

Clients send update requests to the primary, and concurrently to the f witnesses allocated to the primary. Once the client has responses from all f witnesses (indicating durability) and the primary (which responds immediately without waiting for data to replicate to backups) then it can return the result. This is the 1 RTT path.

If the client sends an operation to a witness, and instead of confirming acceptance the witness rejects that operation (because it does not commute with the other operations currently being held by the witness), then the client cannot complete in 1 RTT. It now sends a sync request to the primary and awaits the response (indicating data is safely replicated and the operation result can be returned). Thus the operation latency in this case is 2 RTTs in the best case, and up to 3 RTTs in the worst case depending on the status of replication between primary and backups at the point the sync request was received.

Witnesses durably record operations requested by clients. Non-volatile memory (e.g. flash-backed DRAM) is a good choice for this as the amount of space required is relatively small. On receiving a client request, the witness saves the operation and sends an accept response if it commutes with all other saved operations. Otherwise it rejects the request. All witnesses operate independently. Witnesses may also receive garbage collection RPCs from their primary, indicating the RPC IDs of operations that are now durable (safely replicated by the primary to all backups). The witnesses can then delete these operations from their stores.
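For a key-value store, the witness logic boils down to a key-collision check plus garbage collection. A sketch (illustrative only; commutativity is judged purely by key, as the paper suggests for key-value stores):

```python
# Illustrative witness for a key-value store (not the paper's code).

class Witness:
    def __init__(self):
        self.saved = {}          # rpc_id -> operation; kept in non-volatile memory

    def record(self, rpc_id, op):
        """Accept op iff it commutes with every operation still saved here."""
        if any(other["key"] == op["key"] for other in self.saved.values()):
            return "rejected"    # client must fall back to an explicit sync
        self.saved[rpc_id] = op
        return "accepted"

    def gc(self, rpc_ids):
        """Primary tells us these ops are now replicated to all backups."""
        for rid in rpc_ids:
            self.saved.pop(rid, None)
```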

A primary receives, serializes, and executes all update RPC requests from clients. In general a primary will respond to clients before syncing to backups (speculative execution), leading to unsynced operations. If the primary receives an operation request which does not commute with all currently unsynced operations then it must sync before responding (and it adds a ‘synced’ header to the response so that the client knows an explicit sync request is unnecessary). Primaries can batch and asynchronously replicate RPC requests. E.g. with 3-way primary-backup replication and batches of 10 RPCs, we have 1.3 RPCs per request on average: 10 client RPCs plus 3 replication RPCs is 13 RPCs for 10 requests. The optimal batch and window size depends on the particulars of the workload.

For read requests clients can go directly to primaries (no need for the durability afforded by witnesses). The primary must sync before returning if the read request does not commute with all currently unsynced updates. Clients can also read from a backup while retaining linearizability guarantees if they also confirm with a witness that the read operation commutes with all operations currently saved in the witness.

Recovery

If a client does not get a response from a primary it resends the update RPC to a new primary, and tries to record the RPC request in the witnesses of that primary. If the client does not get a response from a witness, it can fall back to the traditional replication path by issuing a sync request to the primary.

If a witness crashes it is decommissioned and a new witness is assigned to the primary. The primary is notified of its new witness list. The primary then syncs to backups before responding that it is now safe to recover from the new witness. A witness list version number is maintained by the primary and sent by the client on every request, so that clients can be notified of the change and update their lists.

If a primary crashes there may have been unsynced operations for which clients have received results but the data is not yet replicated to backups. A new primary is allocated, and bootstrapped by recovering from one of the backups. The primary then picks any available witness, asks it to stop accepting more operations, and then retrieves all requests known to the witness. The new primary can execute these requests in any order since they are known to be commutative. With the operations executed, the primary syncs to backups and resets all the witnesses. It can now begin accepting requests again.

The gotcha in this scheme is that some of the requests sent by the witness may in fact have already been executed and replicated to backups before the primary crashed. We don’t want to re-execute such operations:

Duplicate executions of the requests can violate linearizability. To avoid duplicate executions of the requests that are already replicated to backups, CURP relies on exactly-once semantics provided by RIFL, which detects already executed client requests and avoids their re-execution.

(We studied RIFL in an earlier edition of The Morning Paper: RIFL promises efficient ‘bolt-on’ linearizability for distributed RPC systems).

Performance evaluation

We evaluated CURP by implementing it in the RAMCloud and Redis storage systems, which have very different backup mechanisms. First, using the RAMCloud implementation, we show that CURP improves the performance of consistently replicated systems. Second, with the Redis implementation we demonstrate that CURP can make strong consistency affordable in a system where it had previously been too expensive for practical use.

I’m out of space to do the evaluation justice, but you’ll find full details in §5. Here are a couple of highlights…

The chart below shows the performance of Redis without durability (i.e., as most people use it today), with full durability in the original Redis scheme, and with durability via CURP. You can clearly see that using CURP we get the benefits of consistency and durability with performance much closer to in-memory only Redis.

And here’s the change in latency of write operations for RAMCloud. It took me a moment to understand what we’re seeing here, since it looks like the system gets slower as the write percentage goes down. But remember that we’re only measuring write latency here, not average request latency. Presumably with more reads we need to sync more often.  

Thanks to Qian Li who points out in the comments I’d completely missed the crucial ‘cumulative’ word in the description of this chart. Now it all makes so much more sense! Only a tiny fraction of writes have higher latency, and CURP is improving tail latencies.

In the RAMCloud implementation, witnesses could handle 1270K requests per second on a single thread, with interwoven gc requests at 1 in every 50 writes from the primary. The memory overhead is just 9MB. CURP does however increase network bandwidth usage for update operations by 75%.

Cloud programming simplified: a Berkeley view on serverless computing

March 13, 2019

Cloud programming simplified: a Berkeley view on serverless computing Jonas et al., arXiv 2019

With thanks to Eoin Brazil who first pointed this paper out to me via Twitter….

Ten years ago Berkeley released the ‘Berkeley view of cloud computing’ paper, predicting that cloud use would accelerate. Today’s paper choice is billed as its logical successor: it predicts that the use of serverless computing will accelerate. More precisely:

… we predict that serverless computing will grow to dominate the future of cloud computing.

The acknowledgements thank reviewers from Google, Amazon, and Microsoft among others, so it’s reasonable to assume the major cloud providers have had at least some input to the views presented here. The discussion is quite high level, and at points it has the feel of a PR piece for Berkeley (there’s even a cute collective author email address: serverlessview at berkeley.edu), but there’s enough of interest in its 35 pages for us to get our teeth into…

The basic structure is as follows. First we get the obligatory attempt at defining serverless, together with a discussion of why it matters. Then the authors look at some of the current limitations (cf. ‘Serverless computing: one step forward, two steps back’) before going on to suggest areas for improvement in the coming years. The paper closes with a nice section on common fallacies (and pitfalls) of serverless, together with a set of predictions. I’m going to start there!

Predictions, fallacies, pitfalls

Fallacy: cloud function instances cost up to 7.5x more per minute than regular VM instances of similar capacity, therefore serverless cloud computing is more expensive than serverful cloud computing.

Rebuttal: the equivalent functionality that you’re getting from a function is much more than you can achieve with a single VM instance. Plus, you don’t pay when there are no events. Putting the two together means that serverless could well end up being less expensive.
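A toy calculation makes the point; the prices and utilisation below are assumptions of mine, not figures from the paper:

```python
# Toy comparison with assumed prices (not from the paper): a function that costs
# 7.5x more per minute can still be cheaper if it only runs when events arrive.
vm_cost_per_min = 0.001                    # assumed always-on VM price
fn_cost_per_min = 7.5 * vm_cost_per_min    # the "7.5x" from the fallacy
busy_fraction = 0.05                       # assumed: events keep the function busy 5% of the time

minutes_per_month = 30 * 24 * 60
vm_monthly = vm_cost_per_min * minutes_per_month
fn_monthly = fn_cost_per_min * minutes_per_month * busy_fraction
print(f"VM: ${vm_monthly:.2f}/month, functions: ${fn_monthly:.2f}/month")
# => VM: $43.20/month, functions: $16.20/month at 5% utilisation.
```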

Prediction (for me, the most interesting sentence in the whole paper!) :

We see no fundamental reason why the cost of serverless computing should be higher than that of serverful computing, so we predict that billing models will evolve so that almost any application, running at almost any scale, will cost no more and perhaps much less with serverless computing.

But yes, billing by actual usage means that costs will be less predictable (hopefully in a good way!) than ‘always-on’ solutions.

Fallacy: it’s easy to port applications between serverless computing providers since functions are written in high-level languages. (Does anyone really think that serverless increases portability? All the talk I hear is of the opposite concern!).

Rebuttal/Pitfall: vendor lock-in may be stronger with serverless computing than for serverful computing. (Yes, because of the reliance on the eventing model and the set of back-end services your functions bind to).

Prediction: “We expect new BaaS storage services to be created that expand the types of applications that run well on serverless computing,” and, “The future of serverless computing will be to facilitate BaaS.”

Pitfall: (today) “few so called ‘elastic’ services match the real flexibility demands of serverless computing.”

Fallacy: cloud functions cannot handle very low latency applications needing predictable performance

Rebuttal: if you pre-warm / keep a pool of functions ready then they can (combine this thought with ‘we see no fundamental reason why the cost of serverless computing should be higher than that of serverful computing…’)

Prediction:

While serverful cloud computing won’t disappear, the relative importance of that portion of the cloud will decline as serverless computing overcomes its current limitations. Serverless computing will become the default computing paradigm of the Cloud Era, largely replacing serverful computing and thereby bringing closure to the Client-Server Era.

(So what’s the new term? Client-Function architecture?? )

Oh, and one more thing: “we expect serverless computing to become simpler to program securely than serverful computing…”

Serverless motivation

I guess it’s definition time!

Put simply, serverless computing = FaaS + BaaS. In our definition, for a service to be considered serverless, it must scale automatically with no need for explicit provisioning, and be billed based on usage. In the rest of this paper, we focus on the emergence, evolution, and future of cloud functions…

If you squint just right, you can just about make out the other important part of a serverless platform in that equation: in the ‘+’ sign connecting FaaS and BaaS is the set of platform events that can be used to glue the two together, and they’re an important part of the overall model in my mind.

One of the key things that serverless gives is a transfer of responsibility from the user to the cloud provider for many operational concerns:

The authors list three critical distinctions between serverless and serverful computing:

  1. Decoupled computation and storage: storage and computation scale separately and are provisioned and priced independently. (Hang on, don’t I get that with EBS in ‘serverful’ mode??)
  2. Executing code without managing resource allocation.
  3. Paying in proportion to resources used instead of for resources allocated.

The fundamental differentiator between serverless computing and the previous generations of PaaS is the autoscaling and associated billing model:

  • tracking load with much greater fidelity than serverful autoscaling techniques,
  • scaling all the way down to zero, and
  • charging in much more fine-grained manner for time the code spends executing, not the resources reserved to execute the program.

But then there’s also this claim that I can’t make any sense of: “by allowing users to bring their own libraries, serverless computing can support a much broader range of applications than PaaS services which are tied closely to particular use cases.” I’m pretty sure that any library / package I can reference from a function I could also reference from my Node.js / Java / Ruby / Python /… app!

The most popular uses of serverless today, according to a 2018 survey, are as follows:

Current limitations

Section 3 of the paper examines a set of five applications to understand the current strengths and weaknesses. These are summarised in the table below. It should not surprise you in the least that implementing a database using functions-as-a-service turns out to be a bad idea!!

One thing that several of these workloads have in common is resource requirements that can vary significantly during their execution, hence it is attractive to try and take advantage of the fine-grained autoscaling serverless offers.

Four main current limitations emerge from this exercise:

  1. Inadequate storage support for fine-grained operations
  2. A lack of fine-grained coordination mechanisms (the latency is too high in the standard eventing mechanisms)
  3. Poor performance for standard communication patterns (which can be O(N) or O(N^2) depending on the use case, but ‘N’ typically goes up a lot as we have many more functions than we did VMs).
  4. Cold-start latency leading to unpredictable performance

Suggestions for the future

The first suggestion (§4.1) concerns the support of serverless functions with particular resource requirements (e.g. access to accelerators).

One approach would be to enable developers to specify these resource requirements explicitly. However, this would make it harder for cloud providers to achieve high utilization through statistical multiplexing, as it puts more constraints on function scheduling… A better alternative would be to raise the level of abstraction, having the cloud provider infer resource requirements instead of having the developer specify them.

So in scenario A the developer (for example) explicitly specifies via metadata that the function needs a GPU, and in scenario B the platform infers through inspection that the function needs a GPU. It’s not clear to me how that makes any difference to the scheduling constraints! Although with clever inference maybe we can get closer to the true resource requirements, whereas we know developers tend to over-provision when making explicit requests. We’d have to take the level of abstraction right up to something close to SLOs for things to get really interesting though, and even then there would need to be multiple viable strategies for the cloud platform to select between in order to meet those targets. This topic is also touched on in §4.5, where we find the following: “alternatively, the cloud provider could monitor the performance of the cloud functions and migrate them to the most appropriate hardware the next time they are run.”

The second suggestion is to expose function communication patterns (computation graphs) to the platform, so that this information can inform placement. AWS Step Functions is a step in this direction ;).

Section 4.2 discusses the need for more storage options. In particular, serverless ephemeral storage used for coordination between functions (e.g., an in-memory data grid), and serverless durable storage, which here means access to a low-latency persistent shared key-value store. A related ask is for a low-latency eventing / signaling service for function coordination.

It feels worth bringing up again here that instead of trying to replicate old application development patterns in a future cloud platform, a serverless approach using something like Bloom and its underlying CALM philosophy would give much of what we’re looking for here: a higher level abstraction with a naturally exposed computation graph and ability for the platform to make informed scheduling and autoscaling decisions.

Startup times can be minimised through the investigation of new lighter-weight isolation mechanisms, using unikernels, and lazy loading of libraries.

Section 4.4 discusses security challenges: fine-grained provisioning of access to private keys, delegation of privileges across functions, and information leakage through network communication patterns (which are more revealing with a larger number of fine-grained components).

Efficient synchronisation of state-based CRDTs

March 11, 2019

Efficient synchronisation of state-based CRDTs Enes et al., arXiv’18

CRDTs are a great example of consistency as logical monotonicity. They come in two main variations:

  • operation-based CRDTs send operations to remote replicas using a reliable dissemination layer with exactly-once causal delivery. (If operations are idempotent then at-least-once is ok too).
  • state-based CRDTs exchange information about the resulting state of the data structure (not the operations that led to the state being what it is). In the original form the full-state is sent each time. State-based CRDTs can tolerate dropped, duplicated, and re-ordered messages.

State-based CRDTs look attractive therefore, but as the state grows over time, sending the full state on every synchronisation quickly becomes expensive. That’s where delta-based CRDTs come in. These send only deltas: small fragments of state that the receiver joins into its local state to reconstruct the full state.

Delta-based CRDTs… define delta-mutators that return a delta ( \delta ), typically much smaller than the full state of the replica, to be merged with the local state. The same \delta is also added to an outbound \delta-buffer, to be periodically propagated to remote replicas. Delta-based CRDTs have been adopted in industry as part of the Akka Distributed Data framework and IPFS.

So far so good, but the analysis in this paper reveals an unexpected issue: when updates are frequent enough that concurrent update operations are occurring between synchronisation points, delta-propagation algorithms perform no-better than sending the full state.

You can see the effect in the following chart: delta-based propagation is transmitting pretty much the same amount of data as full state-based.

Moreover, due to the overheads of computing deltas etc., delta-propagation is also consuming a lot more CPU to do so.

I’m sure you’ve also spotted in those two figures the additional data series for ‘this paper.’ Two key enhancements to the delta-propagation algorithm are introduced which greatly reduce both the amount of data transmitted and the CPU (and memory) overheads.

Some example state-based CRDTs

To bring all this to life we need a couple of examples. The paper uses grow-only CRDT counters and grow-only sets. State-based CRDTs are based upon a join semi-lattice, \mathcal{L}, a set of mutators that update that state, and a binary join operator \sqcup that derives the least upper bound for any two elements of \mathcal{L}.

For a grow-only counter, we keep track of one (sub) counter per replica. The inc operation increments the replica-local counter, the join operator takes the higher of the two replica-local counter values for each replica, and the overall counter value is produced by summing the replica-local counters.

For a grow-only set (GSet) we just add elements to the set and the join operator is a set union.
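Both of these fit in a few lines of code. Here's a minimal Python sketch of the two lattices and their join operators, following the definitions above:

```python
# Minimal sketches of the two state-based CRDTs described above.

class GCounter:
    """Grow-only counter: one sub-counter per replica; join is pointwise max."""
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.counts = {}                      # replica_id -> local count

    def inc(self):
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + 1

    def join(self, other):
        for rid, c in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), c)

    def value(self):
        return sum(self.counts.values())

class GSet:
    """Grow-only set: join is set union."""
    def __init__(self):
        self.elements = set()

    def add(self, e):
        self.elements.add(e)

    def join(self, other):
        self.elements |= other.elements
```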

Hasse diagrams can be used to show the evolution of the lattice. For example:

Spotting inefficiencies

Let’s look at one particular evolution of a GSet in a scenario with two replicas, A and B. In the diagram below, \bullet marks a point in time where synchronisation occurs. For example, at \bullet_1, replica B sends the delta \{b\} to replica A. A has also received a local update. At \bullet_2, replica A sends all updates it has received since the last time it propagated changes back to B. This is the set \{a, b\}. Meanwhile replica B has added c, and at \bullet_3 it sends all of its changes since \bullet_1 back to A.

It’s easy to spot some redundancy in transmission here. At \bullet_1, B sends \{b\} to A, and at \bullet_2, A is including \{b\} in the update it sends back to B again. All of the underlined elements in the figure above are redundant transmissions of this type.

So it’s not exactly rocket science…

… by simply tracking the origin of each \delta-group in the \delta-buffer, replicas can avoid back-propagation of \delta-groups.

The BP optimisation in effect says “don’t send an update back to the replica that told you about it in the first place!” In order to do that, we’ll need some extra book-keeping to remember where updates have come from. The evaluation shows this is worth it.

Here’s another example trace that shows a related issue:

Here we see that at \bullet_5, C is sending delta \{b\} to D. Then C receives an update \{a, b\} from A at \bullet_6. At \bullet_7 the update \{a, b\} is propagated from C to D, with the result that C has redundantly sent \{b\} to D in this second transmission. The RR optimisation removes this redundant transfer: we should…

… remove redundant state in received \delta-groups (RR) before adding them to the delta buffer.

And that’s it! Don’t send an update back to the replica that sent it to you, and don’t send the same update to the same replica more than once.

A revised delta-based synchronisation algorithm

Now, those ideas are formalised using the notion of a join-irreducible state. A join irreducible state x is one that can’t be reconstructed from a join of some finite set of states not containing x. So the set \{a\} is join irreducible, but the set \{a, b\} is not because we can construct it by joining \{a\} and \{b\}. Every CRDT has a unique irredundant decomposition of a state into a set of join irreducible states. For the GSet for example, these are just the sets containing individual elements, and for the GCounter they are the individual replica-local counters. We can use this decomposition to find the minimum delta, \Delta(a,b) between two states a and b.

Here’s the classic delta-based synchronization algorithm for replica i, updated to include the BP and RR optimisations. Highlighted lines show the changes.

For BP, each \delta-buffer is tagged with its origin at line 5, so that we can filter out redundant deltas when sending updates in line 11. For RR we extract the minimal delta in line 15.
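Here's a GSet-flavoured sketch of the two optimisations (my illustration rather than the paper's algorithm; set difference stands in for the general join-decomposition, and buffer garbage collection is omitted):

```python
# Illustrative delta-buffer with the BP and RR optimisations, for a GSet.
# Not the paper's algorithm verbatim; set difference approximates the
# minimal-delta computation via join-irreducible decomposition.

class DeltaGSetReplica:
    def __init__(self, replica_id):
        self.id = replica_id
        self.state = set()
        self.buffer = []                    # list of (origin, delta) pairs

    def add(self, element):                 # local mutation
        delta = {element} - self.state
        if delta:
            self.state |= delta
            self.buffer.append((self.id, delta))

    def receive(self, sender, delta_group):
        minimal = delta_group - self.state  # RR: drop state we already have
        if minimal:
            self.state |= minimal
            self.buffer.append((sender, minimal))

    def deltas_for(self, neighbour):
        # BP: never send a delta back to the replica that sent it to us.
        out = set()
        for origin, delta in self.buffer:
            if origin != neighbour:
                out |= delta
        return out
```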

Evaluation

The evaluation takes place over two different network topologies. Both have 15 nodes, but one is cyclic (mesh) and one is acyclic (tree).

Each node performs an update and synchronises with its neighbours once per second, as per the table below.

Comparisons are made to vanilla delta state, delta state with each optimisation individually, delta state with both optimisations, and also state-based CRDTs (full state exchange), operation-based CRDTs, and CRDTs using the Scuttlebutt anti-entropy protocol.

Here’s an analysis of the amount of data transmitted by GSet and GCounter in the tree and mesh topologies.

The first observation is that the classic delta-based synchronization presents almost no improvement, when compared to state-based synchronization.

In the tree topology, BP alone is enough to give good results, whereas the mesh topology benefits from RR.

Compared to Scuttlebutt and operation-based CRDTs, delta-based CRDTs with both BP and RR have a much lower metadata overhead. The overhead can be as high as 75% with the former group, whereas the optimised delta-based algorithm can get this as low as 7.7%.

When used in the Retwis twitter clone for a moderate-to-high contention workload, the BP + RR optimisations can reduce CPU overhead by up to 7.9x compared to the classic delta-based algorithm.

State-based CRDT solutions quickly become prohibitive in practice, if there is no support for treatment of small incremental state deltas. In this paper we advance the foundation of state-based CRDTs by introducing minimal deltas that precisely track state changes.

De-dup FTW!

A generalised solution to distributed consensus

March 8, 2019

A generalised solution to distributed consensus Howard & Mortier, arXiv’19

This is a draft paper that Heidi Howard recently shared with the world via Twitter, and here’s the accompanying blog post. It caught my eye for promising a generalised solution to the consensus problem, and also for using reasoning over immutable state to get there. The state maintained at each server is monotonic.

Consensus is a notoriously hard problem, and Howard has been deep in the space for several years now. See for example the 2016 paper on Flexible Paxos. The quest for the holy grail here is to find a unifying and easily understandable protocol that can be instantiated in different configurations allowing different trade-offs to be made according to the situation.

> This paper re-examines the problem of distributed consensus with the aim of improving performance and understanding. We proceed as follows. Once we have defined the problem of consensus, we propose a generalised solution to consensus that uses only immutable state to enable more intuitive reasoning about correctness. We subsequently prove that both Paxos and Fast Paxos are instances of our generalised consensus algorithm and thus show that both algorithms are conservative in their approach.

The distributed consensus problem

We have a set of servers (at least two) that need to agree upon a value. Clients take as input a value to be written, and produce as output the value agreed by the servers, subject to three requirements:

  • The output value must have been the input value of a client (ruling out simple solutions that always output a fixed value for example)
  • All clients that produce an output must output the same value
  • All clients must eventually produce an output if the system is reliable and synchronous for a sufficient period

Note that we’re talking here about the ‘inner game’ – this is just to agree upon a single value, whereas in a real deployment we’ll probably run multiple rounds of the protocol to agree upon a series of values. Moreover, we’re assuming here that the set of clients and servers is fixed and known to the clients. The ‘clients’ are probably not end-user clients, but more likely to be system processes making use of a consensus service. Configuration changes and membership epochs are another layer we’ll probably add on in real deployments, but they’re also out of scope here.

Building blocks: immutable registers and quorums

We have a fixed set of n servers, S_0, S_1, ..., S_{n-1}. Each server has a tape it can write to: an infinite array of write-once persistent registers, R_0, R_1, ....

Initially the tape is blank – all registers are unwritten. Once we write a value in a register slot it can never be changed. In addition to values provided by clients the system has one special value, denoted nil or \bot.

We’re going to be interested in sets of values from the same register slot across all servers. Register set i is the set of all registers R_i across all servers.

If enough of the registers in a register set have the same (non-\bot) value, then we will say that the servers have decided upon that value. How many registers in a register set is enough though? That’s something we get to decide as part of the configuration of the protocol. In fact, we specify more than just the number of registers that must be in agreement; we also specify which servers they belong to. A (non-empty) subset of servers that can decide a value between them is called a quorum. More precisely, “a quorum, Q, is a (non-empty) subset of servers, such that if all servers in Q have the same (non-nil) value v in the same register then v is said to be decided.”

The word ‘quorum’ is often strongly associated with ‘majority,’ but note that this doesn’t have to be the case. The specification allows us to declare a quorum containing just a single server if we want to. The dictionary definition of quorum is “the minimum number of members of an assembly or society that must be present at any of its meetings to make the proceedings of that meeting valid.” Note that there is also no requirement here for overlapping quorum memberships (more on that later).

Each register set is associated with a set of quorums. That is, on a register-set by register-set basis, we can specify exactly which subsets of servers are allowed to decide a value. For example:
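
The paper’s example configuration is given in a figure, so here’s a made-up stand-in just to show the shape of the idea: a mapping from register set to the quorums allowed to decide a value in it, for four hypothetical servers S0 to S3.

```python
# A made-up per-register-set quorum configuration (not the paper's example)
# for four hypothetical servers S0..S3.
quorums = {
    0: [{'S0', 'S1', 'S2', 'S3'}],                # register set 0: unanimity
    1: [{'S0', 'S1', 'S2'}, {'S0', 'S1', 'S3'},
        {'S0', 'S2', 'S3'}, {'S1', 'S2', 'S3'}],  # register set 1: any 3-of-4 majority
    2: [{'S0', 'S1'}, {'S2', 'S3'}],              # register set 2: either pair
}
```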

Why on earth we would want to have different quorum sets for different register sets— since we’re only agreeing on a single value at the end of the day!— will become more apparent when we look at how to layer a consensus protocol on top of the registers.

Given quorum configurations and register values for register sets, we can see if any quorum is satisfied and a decision reached.

The state of all registers can be represented in a table, known as a state table, where each column represents the state of one server and each row represents a register set. By combining a configuration with a state table, we can determine whether any decision(s) have been reached.

Now, we have to be a little careful here and remember that the state table is a logical construct. Each server knows the values of its own registers, and clients can maintain their own view of the global state table as they receive information from servers. Because the registers are write-once though, we know that once a client has assembled enough information to call a value decided, that decision is final.
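
As a sketch of what that client-side check might look like, here's one way to hold a partial view of the state table and test whether any quorum has decided. The names (decided, state_table, UNKNOWN) and the nested-dictionary representation are this example's assumptions, not the paper's.

```python
# Sketch: a client's partial view of the state table, plus a check for
# whether any quorum has decided. state_table[r][s] is the client's view
# of register r on server s; UNKNOWN marks registers not yet read, and
# None stands in for nil.

UNKNOWN = object()   # register value this client hasn't read yet
NIL = None           # the special nil / bottom value

def decided(state_table, quorums):
    """Return a value v if some quorum's registers, as far as this client
    has seen, all hold the same non-nil value v; otherwise return None."""
    for r, row in state_table.items():
        for quorum in quorums.get(r, []):
            values = {row.get(s, UNKNOWN) for s in quorum}
            if len(values) == 1:
                (v,) = values
                if v is not UNKNOWN and v is not NIL:
                    return v
    return None

# Register set 1 allows the quorum {S0, S1}; both registers hold 'A'.
quorums = {0: [{'S0', 'S1', 'S2'}], 1: [{'S0', 'S1'}]}
state_table = {0: {'S0': 'A', 'S1': NIL}, 1: {'S0': 'A', 'S1': 'A'}}
assert decided(state_table, quorums) == 'A'
```

Because registers are write-once, a value returned by this check can never be contradicted by anything the client reads later.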

Four rules

There are some rules that need to be followed for this to work as a consensus system.

  1. Quorum agreement. A client may only output a (non-nil) value v if it has read v from a quorum of servers in the same register set.
  2. New value. A client may only write a (non-nil) value v provided that v is the client’s input value or that the client has read v from a register.
  3. Current decision. A client may only write a (non-nil) value v to register r on server s provided that if v is decided in register set r by a quorum Q \in Q_r where s \in Q then no value v' where v \neq v' can also be decided in register set r.
  4. Previous decisions. A client may only write a (non-nil) value v to register r provided no value v' where v \neq v' can be decided by the quorums in register sets 0 to r - 1.

The first two rules are pretty straightforward. Rule 3 ensures that all decisions made by a register set will be for the same value, while rule 4 ensures that all decisions made by different register sets are for the same value.

Obeying these rules places some further constraints on the system configuration. Say we have two quorums for register set i, \{S_0\} and \{S_1\}. A client writing to the register on S_0 has no way of knowing whether or not some other client might be writing a different value to S_1, hence we couldn’t uphold rule 3. There are three ways we can handle this situation:

  • The trivial solution is to allow only one quorum per register set
  • With multiple quorums, we can require that all quorums for a register set intersect (i.e., any two quorums will have at least one overlapping member)
  • We can use client restricted configurations. Here ownership of a given register set is assigned to a given client, such that only that client can write values in that register set. Assuming the client is consistent with the values it writes, all will be well. One such strategy would be a round-robin ownership of register sets:
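
As a tiny illustration of the round-robin strategy (the client names and count here are made up):

```python
# Round-robin client restriction: register set r may only be written by
# client C(r mod number_of_clients). Purely illustrative.
clients = ['C0', 'C1', 'C2']

def owner(register_set):
    return clients[register_set % len(clients)]

assert [owner(r) for r in range(6)] == ['C0', 'C1', 'C2', 'C0', 'C1', 'C2']
```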

To comply with rule 4, clients need to consult their emerging view of the global state table, aka their local state table, and figure out whether any decision has been or could be reached by earlier register sets. The client maintains a decision table with one entry for each quorum of each register set. This table tracks what the client knows about the decisions made by the quorum. Possible values for a quorum are:

  • ANY – any value could yet be decided. (All quorums for all register sets start out in this state)
  • MAYBE v – if this quorum reaches a decision, then the value will be v
  • DECIDED v – the value v has been decided by the quorum (a final state)
  • NONE – the quorum will not decide a value (a final state)

As the client receives information from servers, it updates the decision table according to the following rules:

  • If the client reads the value nil from a quorum member, then the state of that quorum is set to NONE.
  • If the client reads a non-nil value v for register set r and all quorum members have been read with that same value, then the state of the quorum is set to DECIDED v. Otherwise the state is set to MAYBE v. In this latter case, for the same quorum in earlier register sets, a value of ANY will also be set to MAYBE v, and a value of MAYBE v' where v \neq v' will be updated to NONE (see the sketch below).
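
Here's a literal transcription of those two bullets into a sketch, with the decision table keyed by (register set, quorum); the paper's own update rules are the authoritative version, and this representation is just one possible choice.

```python
# Sketch of the decision-table updates summarised above. Quorums are
# frozensets of server names; register sets are integers.

ANY, NONE = 'ANY', 'NONE'

class DecisionTable:
    def __init__(self, quorums):
        # quorums: register set index -> list of quorums (frozensets)
        self.quorums = quorums
        self.table = {(r, q): ANY for r, qs in quorums.items() for q in qs}
        self.reads = {}                     # (register set, server) -> value

    def on_read(self, r, server, value):
        self.reads[(r, server)] = value
        for q in self.quorums[r]:
            if server not in q:
                continue
            if value is None:               # read nil from a quorum member
                self.table[(r, q)] = NONE
            elif all(self.reads.get((r, s)) == value for s in q):
                self.table[(r, q)] = ('DECIDED', value)
            else:
                self.table[(r, q)] = ('MAYBE', value)
                # Same quorum in earlier register sets: ANY becomes MAYBE v,
                # and MAYBE v' (v' != v) becomes NONE.
                for r2 in range(r):
                    state = self.table.get((r2, q))
                    if state == ANY:
                        self.table[(r2, q)] = ('MAYBE', value)
                    elif state and state[0] == 'MAYBE' and state[1] != value:
                        self.table[(r2, q)] = NONE

# e.g. the same two-server quorum used for register sets 0 and 1:
q = frozenset({'S0', 'S1'})
dt = DecisionTable({0: [q], 1: [q]})
dt.on_read(1, 'S0', 'A')                    # only one member read so far
assert dt.table[(1, q)] == ('MAYBE', 'A')
assert dt.table[(0, q)] == ('MAYBE', 'A')   # earlier register set pulled along
```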

Clients use the decision table to implement the four rules for correctness as follows:

> Our aim is to make reasoning about correctness sufficiently intuitive that proofs are not necessary to make a convincing case for the safety…

You’ll be pleased to know however that proofs are included, and you’ll find them in Appendix A.

Paxos over immutable registers

As a paper summary, I guess this write-up so far has been a spectacular fail! I’m already over my target length, and have probably taken more words to explain the core ideas than the original paper does!! The only defence I can offer is that there are no short-cuts when it comes to thinking about consensus 😉

An important part of the paper is to show that the register-based scheme just outlined can be instantiated to realise a number of consensus schemes from the literature, including Paxos and Fast Paxos. I’m out of space to do that proper justice, but here’s a sneak-peek at the core Paxos algorithm:

> We observe that Paxos is a conservative instance of our generalised solution to consensus. The configuration used by Paxos is majorities for all register sets. Paxos also uses client restricted configurations for all register sets. The purpose of phase one is to implement rule 4, and the purpose of phase two is to implement rule 1.

If you allow the quorums used for each register set to differ, and in particular distinguish the quorums used by phase one of Paxos from those used by phase two, you arrive at Flexible Paxos.
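
A quick sketch of the resulting Flexible Paxos condition, namely that only cross-phase quorum intersection is required; the function and the example quorums are my own illustration rather than code from either paper.

```python
# Flexible Paxos relaxation: phase-1 quorums need not intersect each other,
# nor do phase-2 quorums; only every phase-1 quorum must intersect every
# phase-2 quorum. (Illustrative check, not code from the papers.)
def flexible_paxos_safe(phase1_quorums, phase2_quorums):
    return all(q1 & q2 for q1 in phase1_quorums for q2 in phase2_quorums)

# Four servers S0..S3: one large phase-1 quorum, two small (and mutually
# disjoint) phase-2 quorums, each of which still meets the phase-1 quorum.
phase1 = [{'S0', 'S1', 'S2'}]
phase2 = [{'S0', 'S3'}, {'S1', 'S2'}]
assert flexible_paxos_safe(phase1, phase2)
```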

Mixing it up

Section 5 of the paper shows how to derive Fast Paxos, and in Section 6 you’ll find a brief sketch of three other points in the configuration space named co-located consensus, fixed-majority consensus, and reconfigurable consensus.

> In this paper, we have reframed the problem of distributed consensus in terms of write-once registers and thus proposed a generalised solution to distributed consensus. We have demonstrated that this solution not only unifies existing algorithms including Paxos and Fast Paxos but also demonstrates that such algorithms are conservative as their quorum intersection requirements and quorum agreement rules can be substantially weakened. We have illustrated the power of our generalised consensus algorithm by proposing three novel algorithms for consensus, demonstrating a few interesting points on the diverse array of algorithms made possible by our abstraction.