Skip to content

Understanding real-world concurrency bugs in Go

May 17, 2019

Understanding real-world concurrency bugs in Go Tu, Liu et al., ASPLOS’19

The design of a programming (or data) model not only makes certain problems easier (or harder) to solve, but also makes certain classes of bugs easier (or harder) to create, detect, and subsequently fix. Today’s paper choice studies concurrency mechanisms in Go. Before we dive in, it might be interesting to pause for a moment and consider your own beliefs about Go, which may well include some of the following:

  • Go was explicitly designed to make concurrent programming easier and less error-prone
  • Go makes concurrent programming easier and less error-prone
  • Go programs make heavy use of message passing via channels, which is less error prone than shared memory synchronisation
  • Go programs have less concurrency bugs
  • Go’s built-in deadlock and data race detectors will catch any (most?)
    bugs you do let slip into your code

The first of those statements is true. For the remaining statements, you can use the data from this research to re-evaluate how strongly you want to hold those opinions…

We perform the first systematic study on concurrency bugs in real Go programs. We studied six popular Go software [projects] including Docker, Kubernetes, and gRPC. We analyzed 171 concurrency bugs in total, with more than half of them caused by non-traditional, Go-specific problems. Apart from root causes of these bugs, we also studied their fixes, performed experiments to reproduce them, and evaluated them with two publicly-available Go bug detectors.

The six applications studied were Docker, Kubernetes, etcd, CockroachDB, gRPC, and BoltDB, so that’s a lot of important real-world Go-code right there.

The analysis begins by studying how these applications actually make use of Go concurrency primitives, before going on to study concurrency related bugs from their issue trackers. These bugs are categorised on two main dimensions: the observed behaviour (blocking or non-blocking), and the type of concurrency primitive that is the cause (shared memory or message passing). Let’s begin with a very quick recap of the main concurrency mechanisms in Go.

Concurrency in Go

A major design goal of Go is to improve traditional multi-threaded programming languages and make concurrent programming easier and less error-prone. For this purpose, Go centers its multi-threading design around two principles: 1) making threads (called goroutines) lightweight and easy to create and 2) using explicit messaging (via channels) to communicate across threads.

Goroutines are lightweight user-level threads (‘green’ threads). A goroutine is created by adding the keyword go before a function call, including to an anonymous function. Local variables declared before an anonymous function are accessible within it and potentially shared. Channels are used to send data and states across goroutines, and may be buffered or unbuffered. When using unbuffered channels a goroutine will block on send (receive) until another goroutine is receiving (sending). The select statement allows a goroutine to wait on multiple channel operations, if more than one case is valid Go selects one at random. Go also has traditional synchronisation primitives including mutexes, condition variables, and atomic variables.

How Go concurrency primitives are used in practice

The six applications make relatively heavy use of goroutines, especially with anonymous functions.

An especially interesting comparison can be made in the case of gRPC, which has both a C implementation and a Go implementation. The following table shows the ratio of the number of goroutines created in gRPC-Go compared to gRPC-C, when processing the same number of requests.

In this comparison goroutines tend to shorter lifetimes than the threads created in the C version, but are created much more frequently. This more frequent use of goroutines is to be expected as its something the Go language encourages.

If we look at the use of concurrency primitives across the board in all of the applications, one more surprising finding is that shared memory synchronisation operations are still used more often than message passing:

The most frequently used message-passing primitive is chan, responsible for between 18.5% and 43% of all usages. So we have a situation in which traditional shared memory communication is still heavily used, in conjunction with significant amounts of message passing primitives. From a bug perspective that means we have all the exciting bug possibilities that shared memory communication affords, together with all of the bug possibilities message passing affords, and bugs caused be the interaction of these two styles!

Go concurrency bugs

The authors searched the GitHub commit histories of the applications to find commits fixing concurrency bugs (3,211). From these 171 were randomly selected for study.

Bugs are categorised into blocking bugs and non-blocking bugs. A blocking bug occurs when one or more goroutines are unintentionally stuck in their execution and cannot move forward. This definition is broader deadlocks and can include situations with no circular waits, but instead waits for resources that no other goroutines supply. 85 of the bugs were blocking, and 86 were non- blocking.

Bugs are also categorised in a second dimension according to whether they relate to shared memory protection (105 bugs) or message passing (66 bugs).

Blocking bugs

Looking at the blocking bugs first of all, 42% of these are related to shared memory, and 58% are related to message passing. Recall as well that shared memory primitives are more widely used than message passing primitives.

Contrary to the common belief that message passing is less error-prone, more blocking bugs in our studied Go applications are caused by wrong message passing than by wrong shared memory protection.

The shared memory based bugs include all the usual suspects, with a few new twists due to Go’s implementation of RWMutex and Wait (see §5.1.1).

For message-passing related bugs, many of these are due to missing sends or receives on channels, or closing channels.

All blocking bugs caused by message passing are related to Go’s new message passing semantics like channel. They can be difficult to detect especially when message passing operations are used together with other synchronization primitives. Contrary to common belief, message passing can cause more blocking bugs than shared memory.

Investigating the fixes for these bugs showed that once understood, the fixes are fairly simple, and the type of fix is correlated with the bug cause. This suggests that fully-automated or semi-automated tools for fixing blocking bugs in Go may be a promising direction.

Go’s built-in deadlock detector was only able to detect two of the 21 blocking bugs reproduced in the study.

Non-blocking bugs

More non-blocking bugs are caused by misuse of shared memory primitives than message passing. About half of these are caused by ‘traditional’ shared memory bug patterns. There are also several bugs caused by a lack of understanding of Go language features, especially the sharing of local variables declared before an anonymous function used in a goroutine, and the semantics of WaitGroups.

New programming models and new libraries that Go introduced to ease multi-thread programming can themselves be the cause of more concurrency bugs.

While message-passing based non-blocking bugs were comparatively less common, “the intricate design of message passing in a language can cause these bugs to be especially hard to find when combining with other language-specific features.”

Interestingly, programmers fixing bugs caused by misuse of shared memory primitives showed a preference for using message passing to do so.

Go’s data race detector was able to detect half of the reproduced non-blocking bugs in the study.

Wrapping up

Surprisingly, our study shows that it is as easy to make concurrency bugs with message passing as with shared memory, sometimes even more.

Programmers have to have a clear understanding of:

  • goroutine creation with anonymous functions
  • the usage of buffered vs unbuffered channels
  • the non-determinism of waiting for multiple channel operations using select
  • correct use of the special library time

Although each of these features were designed to ease multi-threaded programming, in reality, it is difficult to write correct Go programs with them.

I regret that I didn’t have space in this write-up to include the many illustrative code samples highlighting details of bugs. If you’re actively developing in Go, the full paper is well worth a read to study them.

Seer: leveraging big data to navigate the complexity of performance debugging in cloud microservices

May 15, 2019

Seer: leveraging big data to navigate the complexity of performance debugging in cloud microservices Gan et al., ASPLOS’19

Last time around we looked at the DeathStarBench suite of microservices-based benchmark applications and learned that microservices systems can be especially latency sensitive, and that hotspots can propagate through a microservices architecture in interesting ways. Seer is an online system that observes the behaviour of cloud applications (using the DeathStarBench microservices for the evaluation) and predicts when QoS violations may be about to occur. By cooperating with a cluster manager it can then take proactive steps to avoid a QoS violation occurring in practice.

We show that Seer correctly anticipates QoS violations 91% of the time, and avoids the QoS violation to begin with in 84% of cases. Finally, we show that Seer can identify application level design bugs, and provide insights on how to better architect microservices to achieve predictable performance.

Seer uses a lightweight RPC-level tracing system to collect request traces and aggregate them in a Cassandra database. A DNN model is trained to recognise patterns in space and time that lead to QoS violations. This model makes predictions at runtime based on real-time streaming trace inputs. When a QoS violation is predicted to occur and a culprit microservice located, Seer uses a lower level tracing infrastructure with hardware monitoring primitives to identify the reason behind the QoS violation. It then provides the cluster manager with recommendations on how to avoid the performance degradation altogether.

The cluster manager may take one of several resource allocation actions depending on the information provided to it by Seer. For example, resizing a Docker container, using Intel’s Cache Allocation Technology for last level cache (LLC) partitioning, or the Linux traffic control’s hierarchical token bucket (HTB) queueing discipline in qdisc for network bandwidth partitioning.

Distributed tracing and instrumentation

Most of the time Seer is just using its high-level RPC tracing which adds low overhead (less than 0.1% on end-to-end latency) and less than 0.15% on throughput. This tracing system is similar to Dapper and Zipkin and records per-microservice latencies and number of outstanding requests. By instrumenting both the client-side of a request and the server-side it’s possible to figure out wait times. There are multiple sources of queueing in both hardware and software, and Seer works best when using deep instrumentation to capture these. E.g., in memcached there are five main internal stages, each of which has a hardware or software queue associated with it.

Deep instrumentation of internals is not always possible or easy, in which case Seer will simply use information on requests queued in the NIC.

Using network queue depths alone is enough to signal a large fraction of QoS violations, although smaller than when the full instrumentation is available. Exclusively polling NIC queues identifies hotspots caused by routing, incast, failures, and resource saturation, but misses QoS violations that are caused by performance and efficiency bugs in the application implementation such as blocking behaviour between microservices.

When Seer does need to turn on its lower level instrumentation to pinpoint the likely cause of a predicted QoS violation, it has two different modes for doing this. When available, it can use hardware level performance counters. In a public cloud setting these won’t be available, and instead Seer use a set of tunable microbenchmarks to figure out which resources are under pressure (see e.g. ‘Bolt’ which we looked at a couple of years ago on The Morning Paper).

Predicting QoS violations

At the core of Seer is a DNN making predictions based on the RPC-level traces that are gathered.

…the problem Seer must solve is a pattern matching problem of recognizing conditions that result in QoS violations, where the patterns are not always knows in advance or easy to annotate. This is a more complicated task than simply signaling a microservice with many enqueued requests for which simpler classification, regression, or sequence labeling techniques would suffice.

Seer’s DNN works with no a priori knowledge about dependencies between individual microservices, and the system is designed to cope with online evolution of microservices and the dependencies between them.

The most valuable feature for prediction turned out to be the per-microservice queue depths. The input layer has one neuron per active microservice with input value corresponding to the microservice queue depth. The output layer also has one neuron per microservice, with the value representing the probability for that microservice to initiate a QoS violation. Since one of the desired properties is to accommodate changes in the microservices graph over time, I’m assuming the model actually has n inputs and outputs, where n is the maximum predicted number of microservice instances, and we simply don’t use some of them with smaller deployments?

So now it’s just a matter of figuring out what should happen between the input and output neurons! We have to balance inference time (Seer is operating in a pretty small window of predictions 10-100ms out, and needs to give the cluster manager time to react as well) and accuracy. The best performing combination was a hybrid network using a CNN first to reduce dimensionality followed by an LSTM with a softmax final layer.

When trained on a weeks worth of trace data from a 20 server cluster, and then tested on traces collected from a different week (after servers had been patched, and the OS had been upgraded), Seer was correctly able to anticipate 93.45% of violations.

When deployed in production, Seer is continually and incrementally retrained in the background to account for frequent application updates. This retraining uses transfer learning with weights from previous training rounds stored on disk as a starting point. When the application “changes in a major way, e.g. microservices on the critical path change” Seer will retrain from scratch in the background, with online inference happening via the incrementally-trained interim model until the new model is ready. We’re not told how Seer figures out that a major architectural change has happened. Perhaps full retraining is an operator-initiated action, or maybe the system could be configured to initiated full-retraining whenever the number of missed QoS violations (false negatives) starts to rise above some threshold.

Seer in action

For the services under study, Seer has a sweet spot when trained with around 100GB of data and a 100ms sampling interval for measuring queue depths.

Seer also works best when making prediction 10-100ms into the future.

This is because many QoS violations are caused by very short, bursty events that do not have an impact on queue lengths until a few milliseconds before the violation occurs.

A ground truth analysis of QoS violation causes shows that a large fraction are due to application level inefficiencies, including correctness bugs, unnecessary synchronisation and/or blocking behaviour, and misconfigured iptables rules. An equally large fraction are due to compute contention, followed by network, cache, memory, and disk contention. Seer is accurately able to follow this breakdown in its predictions and determination of causes.

Out of the 89 QoS violations Seer detects, it notifies the cluster manager early enough to avoid 84 of them. The QoS violations that were not avoided correspond to application level bugs, which cannot be easily corrected online.

Here’s a really interesting plot showing detection accuracy for Seer during a period of time in which the microservices system is being frequently updated, including the addition of new microservices, changing the backend from MongoDB to Cassandra, and switching the front-end from nginx to node.js.

Blue dots represent correctly predicted upcoming QoS violations, and red crosses are misses. All the misses correspond with the application being updated.

Shortly after the update, Seer incrementally retrains in the background and starts recovering its accuracy until another major update occurs.

Seer is also tested on a 100-server GCE cluster with the Social Network microservices application. This was deployed for a two-month period and had 582 registered users with 165 daily actives. On average the cluster had 386 containers active at any one time. (I know the whole point is to test a microservices architecture, but I can’t help pausing to note the ridiculousness of a 100-node cluster with 386 containers serving 165 daily actives. That’s more than two containers per daily active user, and a workload I could probably serve from my laptop!!).

On the public cloud Seer’s inference times increased from 11.4ms for the 20-node private cluster setting to 54ms for the 100 server GCE setting. Offloading training and inference to Google’s TPUs (or FPGA’s using Microsoft’s Project Brainwave) brought these times back down again.

During the two month study, the most common sources of QoS violations were memcached (on the critical path for almost all query types, as well as being very sensitive to resource contention), and Thrift services with high request fanout.

Seer has now been deployed in the Social Network cluster for over two months, and in this time it has detected 536 upcoming QoS violations (90.6% accuracy) and avoided 495 (84%) of them. Furthermore, by detecting recurring patterns that lead to QoS violations, Seer has helped the application developers better understand bugs and design decisions that lead to hotspots, such as microservices with a lot of back-and-forth communication between them, or microservices forming cyclic dependencies, or using blocking primitives. This has led to a decreasing number of QoS violations over the two month period (seen in Fig. 16), as the application progressively improves.

Systems like Seer can be used not only to improve performance predictability in complex cloud systems, but to help users better understand the design challenges of microservices, as more services transition to this application model.

An open-source benchmark suite for microservices and their hardware-software implications for cloud & edge systems

May 13, 2019

An open-source benchmark suite for microservices and their hardware-software implications for cloud & edge systems Gan et al., ASPLOS’19

Microservices are well known for producing ‘death star’ interaction diagrams like those shown below, where each point on the circumference represents an individual service, and the lines between them represent interactions.

Systems built with lots of microservices have different operational characteristics to those built from a small number of monoliths, we’d like to study and better understand those differences. That’s where ‘DeathStarBench’ comes in: a suite of five different microservices-based applications (one of which, a drone coordination platform called Swarm has two variations – one doing most compute in the cloud, and one offloading as much as possible to the edge). It’s a pretty impressive effort to pull together and make available in open source (not yet available as I write this) such a suite, and I’m sure explains much of the long list of 24 authors on this paper.

The suite is built using popular OSS applications and representative technologies, deliberately using a mix of languages (C/C++, Java, Javascript, node.js, Python, Ruby, Go, Scala, …) and both RESTful and RPC (Thrift, gRPC) style service interfaces. There’s a nice nod to the Weave Sockshop microservices sample application here too.

A typical architecture diagram for one of these services looks like this:

Suitably armed with a set of benchmark microservices applications, the investigation can begin!

Microservices fundamentally change a lot of assumptions current cloud systems are designed with, and present both opportunities and challenges when optimizing for quality of service (QoS) and utilization. In this paper we explore the implications microservices have across the cloud system stack.

The paper examines the implications of microservices at the hardware, OS and networking stack, cluster management, and application framework levels, as well as the impact of tail latency.

Hardware implications

We show that despite the small amount of computation per microservice, the latency requirements of each individual tier are much stricter than for typical applications, putting more pressure on predictably high single-thread performance.

Smaller microservices demonstrated much better instruction-cache locality than their monolithic counterparts. The most prominent source of misses, especially in the kernel, was Thrift. It might also seem logical that these services would be good targets for running on simpler (lower power) cores. The following figure shows that this isn’t necessarily the case though. The top line shows the change in tail latency across a set of monolithic applications as operating frequency decreases. The bottom line shows the tail latency impact in the microservices-based applications.

…microservices are much more sensitive to poor single-thread performance than traditional cloud applications. Although initially counterintuitive, this result is not surprising, given the fact that each individual microservice must meet much stricter tail latency constraints compared to an end-to-end monolith, putting more pressure on performance predictability.

Operating system and network implications

Applications built in a microservices style spend a good amount of time in the kernel, as shown in the following figure:

Included in this count though is the time that components such as MongoDB and memcached spend in the kernel handling interrupts, processing TCP packets, and activating and scheduling idling interactive services.

Unlike monolithic services… microservices spend much more time sending and processing network requests over RPCs or other REST APIs.

This is most pronounced when under high loads, and becomes a very significant tail latency factor. This occurs both with HTTP and RPC-based services.

Microservices communication patterns and the pressure they put on network processing make network acceleration an interesting avenue of investigation. Introducing an FPGA-based offload improved network processing latency by 10-68x over native TCP, with end-to-end tail latency (what %-ile??) improving by 43% and up to 2.2x.

For interactive, latency-critical services, where even a small improvement in tail latency is significant, network acceleration provides a major boost in performance.

Cluster management implications

Each microservice may be individually simpler, but at least the essential complexity in the application had to go somewhere. And the place it goes is in the interactions between services, and interestingly, as this next section of the paper shows, in the interactions between your services and the platform they run on. We trade off code complexity for a new kind of emergent system complexity.

Microservices significantly complicate cluster management. Even though the cluster manager can scale out individual microservices on-demand instead of the entire monolith, dependencies between microservices introduce back-pressure effects and cascading QoS violations that quickly propagate through the system, making performance unpredictable.

Consider the two scenarios shown in the following figure:

In the top line (case A), nginx is reaching saturation as the client issues read requests causing long queues to firm in its inputs. Autoscaling systems can handle this by scaling out nginx. In the bottom line though (case B), we have blocking HTTP/1 requests causing memcached to only support one outstanding request per connection across tiers. Long queues again form at nginx, but scaling out nginx isn’t going to make this situation any better, and may well make it worse.

Real-world applications are of course much more complex than this simple two-tier example. The following plots study cascading QoS violations in the SocialNetwork service. Bright areas in the LH plot show high latency, and in the RH plot high CPU usage. The ordering of services (top to bottom, back-end to front-end) is the same in both cases.

Once back-end services have high utilisation this propagates all the way to the front-end, and can put even higher pressure on microservices in the middle. There are also microservices that show relatively low utilisation but still have degraded performance (due to blocking).

The authors draw two conclusions from their analysis here:

  • Cluster managers need to account for the impact dependencies have on end-to-end performance when allocating resources (but how will they learn this?)
  • Once microservices experience a QoS violation they need longer to recover than traditional monolithic applications, even in the presence of autoscaling mechanisms which most cloud providers employ.

Bottleneck locations also vary with load.

Application and programming framework implications

In section 7, the authors evaluate the performance and cost of running each of the five microservices systems on AWS Lambda instead of EC2. Services were run for 10 minutes with a varying input load. Two versions of the Lambda implementation were compared: one using S3 for communication of state between functions, and one that uses the memory of four additional EC2 instances (DynamoDB or SQS feel like more natural serverless choices here, assuming the state is not generally large?).

Lambda showed higher performance variability, but for these workloads cost was almost an order of magnitude lower, and Lambda adjusted resources to user demand more quickly.

The conclusion in this section is that microservices in a serverless world should remain mostly stateless, and leverage in-memory primitives to pass data between dependent functions (do you mean e.g. Redis???). I’d prefer to see either DynamoDB or SQS explored in an AWS context, and a move towards an asynchronous event-driven model rather than having functions call each other.

Tail at scale implications

Tail-at-scale effects become more pronounced in microservices compared to monolithic applications, as a single poorly-configured microservice, or slow server can degrade end-to-end latency by several orders of magnitude.

The increased sensitivity in the tail comes up in multiple places in the paper. Section 8 studies this phenomenon explicitly. The general finding is that the more complex the interaction graph of a microservices application, the more impactful slow services are as the probability that a service on the critical path will be degraded increases. Or as we learned a long time ago in ‘The Tail at Scale’, if you call more things, you’ve got more chances of one of them being slow.

All of the applications from DeathStarBench are (will be) made available under a GPL license at

Distributed consensus revised – Part III

May 10, 2019

Distributed consensus revised (part III) Howard, PhD thesis

With all the ground work laid, the second half of the thesis progressively generalises the Paxos algorithm: weakening the quorum intersection requirements; reusing intersections to allow decisions to be reached with fewer participants; weakening the value selection rules; and sharing phases to take best advantage of the generalisation.

The result of this thesis is a family of approaches to achieving distributed consensus, which generalise over the most popular existing algorithms such as Paxos and Fast Paxos.

Quorum intersection revised

Classic Paxos requires all quorums to intersect, but this turns out to be a stronger condition than is actually required to guarantee safety and progress.

Our first finding is that it is only necessary for phase one quorums and phase two quorums to intersect. There is no need to require that phase one quorums intersect with each other nor that phase two quorums intersect with each other.

This finding (‘revision A’) was also discussed in the Flexible Paxos paper that we’ve covered in a previous edition of The Morning Paper. So long as one quorum member is around to carry the learnings from phase one into phase two, we’re good (the thesis itself provides the proof for this of course).

It turns out we can go even further than this, and differentiate quorums by their epoch as well as by phase. With epoch-specific quorums, the requirement can be further refined (‘revision B’) to require only that a phase two quorum for a given epoch intersects with the phase one quorums of all smaller epochs.

A key implication of this result is that for the minimum epoch there is no phase one quorum intersection requirement. The practical application of this is that a proposer with epoch e_{min} may skip phase one and proceed directly to proposing their own value in phase two.

Furthermore, the phase two bypass optimisation can now kick in whenever we have a phase two quorum of acceptors rather than requiring a full majority.

We can also play with performance and availability trade-offs. For example, in “All aboard Paxos” we colocate proposers and acceptors so that phase one can be completed locally, and we’re still guaranteed a quorum intersection so long as a quorum for phase two requires all acceptors. Or we can flip that on its head (“Singleton Paxos”) and require all acceptors to form a quorum in phase one, allowing any single acceptor to from a quorum in phase two.

Promises revised

Classic Paxos requires a proposer to wait for sufficient promises before proceeding. The one exception we’ve seen to that so far was the phase two bypass when a majority of acceptors return the same proposal during phase one.

If a proposer with epoch e receives a proposal in a promise response promise(e, f, v), then it has learned that if a decision was reached in epoch f, then the value chosen was v. This allows us to further weaken the quorum intersection requirement (‘revision C’): the proposer no longer needs no intersect with the phase two quorums for epochs up to and including f, but must continue to intersect with any phase two quorums for epochs greater than f (up to e of course, its current epoch).

One practical application of this is that a proposer receiving a promise message for the immediate predecessor of the current epoch can proceed directly to phase two.

Value selection revised

In Classic Paxos, the value proposed in phase two is the value associated with the highest epoch received from the acceptors.

In this section… we generalise over the classic value selection rules, by exploiting the additional insight that a proposer gains from each promise it receives. We refer to our revised technique as Quorum-based value selection and it can give proposers more flexibility when choosing a value to propose.

Ignoring epochs for the moment, the first idea is to consider each possible quorum, and whether or not that quorum could have reached a decision. If no quorum could have reached a decision then we’re free to propose our own value. For example, let the set of acceptors be \{a_1, a_2, a_3, a_4, a_5\} and the possible quorum sets be \{a_1, a_2, a_3\} and \{a_4, a_5\}. A proposer in epoch 5 receives the following promises in order:

  • promise(5, 3, A) from a_1
  • promise(5, nil, nil) from a_ 2
  • promise(5, 2, B) from a_4

With Classic Paxos the proposer would have to propose A (the value associated with the highest epoch). But here we know that A cannot have been decided because the nil response from a_2 makes that impossible. Since we have a returned proposal (3, _), we also know that the proposal (2, B) from a_4 cannot have been successful.

If we’re using revisions B and C then we can extend this logic to also track quorum decisions by epoch.

Epochs revised

Chapter 7 in the thesis looks at three different strategies to remove the need to pre-allocate or vote for unique epochs among proposers.

One such strategy is to use a singleton allocator, and have every proposer ask the allocator for an epoch each time. The allocator could implement e.g. a simple counter. This has the disadvantage of course of making the allocator a single point of failure.

Another strategy in the case were we know the finite set of possible values that can be decided up front is to pre-allocate epochs to those values. Now proposers proposing a value v use the epochs associated with the value they wish to propose.

Section 7.3 gives a more generally applicable technique, called epochs by recovery. The central idea, as the name suggests, is that instead of trying to ensure up front that the epochs used by proposers will all be unique, we instead allow proposers to use any epoch and add a recovery mechanism to undo the damage if it turns out that multiple values end up being proposed for the same epoch.

So in total we’ve now seen 5 different epoch allocation strategies, as summarised in the table below.

And if your head isn’t spinning enough already…

…algorithms for distributed consensus need not utilise only one of these mechanisms, but may use them in combination by allocating epochs to particular methods.

Putting it all together

Paxos has been synonymous with distributed consensus for over two decades. As such, it has been extensively researched, taught and deployed in production. This thesis sought to reconsider how we approach consensus in distributed systems and challenge the widely held belief that the Paxos algorithm is an optimal solution to consensus.

Paxos was already a family of algorithms. Dr Howard has made that family considerably bigger. Algorithms now have more flexibility in their choice of quorums, values, and epochs, allowing us to make a finer-grained set of trade-offs for the situation at hand. Some of these algorithms introduce new progress properties depending on the state of the system (i.e., we are able to make progress under a broader set of conditions). One particularly interesting combination of these ideas is Multi-Paxos combined with weakened quorum intersection between phases. We can set things up to require a larger quorum for the rarely used phase one, and smaller quorums for the phase two decisions. We also have a larger set of conditions under which we can reach agreement in one round trip without centralisation.

This write-up was made considerably easier to put together by virtue of skipping all of the proofs – but of course they are all there in the thesis if you want to dig deeper!

This won’t be the last word on Paxos (the thesis has already inspired further research), but as a deconstruction and examination of the fundamentals, I fully expect it to be a reference work for a long time to come.

At the very least, we hope to have furthered understanding of this important and surprisingly subtle field of distributed systems.

Distributed consensus revised – Part II

May 8, 2019

Distributed consensus revised (part II) Howard, PhD thesis

In today’s post we’re going to be looking at chapter 3 of Dr Howard’s thesis, which is a tour (“systematisation of knowledge”, SoK) of some of the major known revisions to the classic Paxos algorithm.

Negative responses (NACKs)

In classic Paxos acceptors only send replies to proposer messages with an epoch greater than or equal to the acceptors last promised epoch (Property 6). The algorithms relies on timeouts to determine when a proposer abandons the current phase and retries with a new epoch number. We can eliminate the timeout delays by adding negative responses, for example no\_promise(e) and no\_accept(e), to be sent by the acceptor in response to prepare or propose messages with an invalid epoch number. These negative acknowledgements (NACKS) can also include further information such as the acceptor’s last promised epoch and last accepted proposal value. (There’s no point a proposer retrying with a new epoch less than the acceptor’s last promised one for example).

NACKs have replaced timeouts as we assume that messages are eventually delivered. We can therefore remove the synchrony assumptions from our progress proof.

Bypassing phase two

If a proposer learns during phase one that a value has been decided (the same proposal is returned by a majority of acceptors), then the proposer may skip phase two and simply return the learned value. This requires amending the algorithm to keep track of the set of acceptors who have promised and returned a given proposal (e_{max}, v) with their promise.

Now that we have the possibility of skipping phase two, there are a number of further tricks we can do to increase the probability. For example, if we’re one short of a majority and the timeout is about expire with responses still outstanding, it might make more sense to wait for an additional grace period rather than restart phase one. Or we could start phase two but keep phase one ‘open’, and if that additional promise comes in for phase one while phase two is still in flight the proposer can return the decided value immediately. Another twist would be to remember promises from previous epochs during proposal tracking…


So far we’re getting pretty good at deciding a value, but there’s room for improvement in how proposers learn the decided value. Specifically, even when a value has been decided proposers still need to communicate with a majority of acceptors to learn that value. This means that we need the majority of acceptors up and communicating for a proposer to execute its algorithm and return a value.

We can improve this by adding an optional phase three to Classic Paxos, in which the acceptors learn the value has been decided. The acceptors can then notify future proposers that the value has been decided, enabling the proposer to return a decided value without waiting upon the majority of acceptors. Adding phase three to Classic Paxos serves an important purpose that may not be immediately apparent, namely that the liveness conditions are no longer necessary for progress.

Raft and Viewstamped Replication Revisited use a related idea to communicate a learned decision by including a commit index in messages instead of an explicit phase three.

Distinguished proposer

You may have picked up on that odd-sounding requirement in the progress conditions we looked at in the last instalment that there be only a single fixed proposer in order for progress to be guaranteed. This condition is to avoid a pathology in which two (or more) proposers continually duel, overriding each others proposals. We can make it much more likely that only a single proposer is executing Classic Paxos at any given time by introducing the notion of a distinguished proposer. Non-distinguished proposers forward their candidate values to this proposer, and only the distinguished proposer initiates Paxos.

If the distinguished proposer appears to be slow or unresponsive, another proposer can become a distinguished proposer and thus propose values directly. It is always safe for there be to no distinguished proposer, multiple distinguished proposers or inconsistent knowledge of the distinguished proposer. However to guarantee progress, there should be exactly one distinguished proposer at a given time and all proposers should be aware of its identity.

The distinguished proposer (leader) optimisation is widely utilised.

Phase ordering

A proposer doesn’t make any use of the value they intend to propose until phase two. So it’s entirely possible to run phase one without even knowing that value, and then when the proposer does learn the value to propose it can decide the value in one round trip rather than two provided no other proposer has also executed the proposer algorithm for a greater epoch. We can greatly increase the chances of this fortuitous situation occurring of course if the proposer is the distinguished proposer.


Multi-Paxos is an optimisation of Classic Paxos for consensus over a sequence of values. In Classic Paxos we’d need to run phases one and two for each value in the sequence (i.e., an instance of Paxos for every value). In Multi-Paxos phase one is shared by all instances.

Each acceptor needs only to store the last promised epoch once. Prepare and promise messages are not instance-specific and therefore do not need an index included in the phase one messages. Once phase one is completed, we will refer to this proposer as the Leader. The leader is the distinguished proposer and thus is responsible for making decisions.

During the steady state, we can reach each decision in one round trip to the majority of acceptors and one synchronous write to persistent storage. The Multi-Paxos optimisation is so common that an unqualified reference to Paxos often means Multi-Paxos.

One disadvantage of Multi-Paxos is that it places substantial load on the leader, which often becomes the bottleneck.


Proposers and acceptors don’t have to be separate processes (participants). So for example we could combine the roles of acceptor and proposer in a participant, which means one less remote acceptor to communicate with. We could also introduce additional roles, e.g. a reader role that simply asks acceptors for the last accepted proposal to try and determine if a value has been decided.


A classic approach is to use the natural numbers as epoch numbers and divide them round-robin style between participants. A similar idea is to use lexicographically-ordered tuples (sid, pid) where sid is a monotonically increasing proposal sequence number and pid is the unique proposer id (fixed at configuration time). In either case, proposals must begin with a synchronous write to storage (strictly, the write must be completed before phase two starts). There are a couple of tricks for reducing the number of persistent writes required. For example, using epochs of the form (sid, pid, vid) where vid is a proposer version number incremented each time the proposer (re)starts. Now sids no longer have to be persisted.

Phase one voting for epochs

Classic Paxos does not strictly require that epochs are unique so long as acceptors require a proposer’s epochs be strictly greater than the last promised proposal. If we revise the acceptor algorithm to keep track of the proposer of the last promised epoch, then we can additionally accept a prepare from a proposer when the epoch is the last promised epoch, and the proposer is the same proposer. This variation is called ‘epochs by voting’, and means that we don’t have to allocated a disjoint subset of epochs to proposers: any proposer can use any epoch from the set (though of course, the proposer may not be successful).

Proposal copying

If we include last accepted proposals (epoch and value) in NACKs, then proposers learn about the state of the acceptors. In proposal copying proposers can use this information to jump straight to phase two. Furthermore, when proposing that value, the acceptor(s) from which they learned it can count towards the phase two quorum already.

Quorum generalisation

Classic Paxos requires majority participation and can handle a minority of acceptors failing (typically expressed as f out of 2f+1 total).

This approach tightly couples the total number of acceptors, the number of acceptors needed to participate in consensus and the number of failures tolerated. Ideally, we would like to minimise the number of acceptors in the system and the number required to participate in consensus, as the proposer must wait upon the acceptors to reply and send more messages.

Strict majorities are used in Classic Paxos in order to ensure that all quorums intersect. The majority requirement can be generalised to use any quorum system which guarantees all quorums intersect. Quorum systems other than strict majority are rarely utilised in practice.


There’s more! The miscellaneous section (3.12) gives brief treatment to another six variations, see the full thesis for details.

Distributed consensus revised – Part I

May 7, 2019

Distributed consensus revised Howard, PhD thesis

Welcome back to a new term of The Morning Paper! To kick things off, I’m going to start by taking a look at Dr Howard’s PhD thesis, ‘Distributed consensus revised’. This is obviously longer than a standard paper, so we’ll break things down over a few days. As the title suggests, the topic in hand is distributed consensus:

Single-valued agreement is often overlooked in the literature as already solved or trivial and is seldom considered at length, despite being a vital component in distributed systems which is infamously poorly understood… we undertake an extensive examination of how to achieve consensus over a single value.

What makes this much harder than it might at first appear of course, is the possibility of failures and asynchronous communication. In the face of this, an algorithm for consensus must meet three safety requirements and two progress requirements:

  • Non-triviality: the decided value must have been proposed by a participant (so for example, solutions which always choose a fixed pre-determined value are not acceptable)
  • Safety: if a value has been decided, no other value will be decided
  • Safe learning: if a participant learns a value, it must learn the decided value
  • Progress: under a specified set of liveness conditions, if a value has been proposed by a participant then a value is eventually decided
  • Eventual learning under a specified set of liveness conditions, if a value has been decided then a value is eventually learned

The first part of the thesis examines the Paxos family of algorithms, and is a great resource for navigating this complex space. A combination of 16 different lemma and theorems are used to prove that Paxos does provide the desired safety and progress guarantees. The proofs are carefully tied back to 10 different properties of the classic Paxos algorithm.

The surprising results of this approach are twofold: firstly, the proof of correctness did not use the full strength of the properties provided and secondly, there are many approaches which satisfy the same properties.

The second part of the thesis uses these insights to generalise Paxos, weakening the requirements for quorum intersection, phase completion, value selection, and epoch allocation. It helps us to tease apart what is essential to the problem of distributed consensus , and what is merely a design choice of a particular solution. The solution space that is opened up offers greater flexibility in performance and reliability trade-offs, new progress guarantees, and improved performance.

These are important results since as well as being notoriously difficult to understand and to implement, Paxos has some well-known and not-so-well-known limitations:

The reliance on majority agreement means that the Paxos algorithm is slow to reach decisions… systems are limited in scale, often to three or five participants, as each additional participant substantially decreases overall performance. It is widely understood that Paxos is unable to reach an agreement if the majority of participants have failed. However, this is only part of the overall picture, failure to reach agreement can result not only from unavailable hosts but also network partitions, slow hosts, network congestion, contention for resources such as persistent storage, clock skew, packet loss and countless other scenarios. Such issues are commonplace in some systems…

Let’s begin our journey by considering one of the simplest solutions possible…

The single acceptor algorithm

We’ll distinguish between two different roles in the system: proposers propose values that they wish to have chosen, and acceptors agree and persist decided values. For all the scenarios we’ll be considering, the set of acceptors and proposers is fixed and known in advance.

A consensus algorithm defines the process by which a value v is chosen by the acceptors from the proposers. We refer to the point in time when the acceptors have committed to a particular value as the commit point.

The single acceptor algorithm is the simplest non-trivial consensus algorithm you can imagine. We have n proposers but only one acceptor. The acceptor chooses the first value proposed to it.

Proposers propose a value, and learn the decided value by return:

If you check this off against the safety and progress requirements we looked at earlier, you’ll see that it meets all of them. The single acceptor provides total ordering. This advantage is also its weakness though: if the acceptor fails the algorithm cannot make progress until it recovers. Removing that single point of failure and supporting multiple acceptors leads us to Paxos…

Classic Paxos

Unoptimised classic Paxos can reach agreement with two round trips to the majority of acceptors, and three synchronous writes to persistent storage. It requires that a majority of acceptors and at least one proposer be up and communicating synchronously for liveness. The algorithm has two phases: a learning phase where proposers learn about the current state of the system, and a deciding (writing) phase when a value is chosen. Central to the algorithm is the idea of an epoch (aka view number, round number, instance value or ballot number): an epoch is a member of an infinite totally ordered set, such as the integers. A proposal (e, v) is an epoch and value pair.

In phase one proposers send prepare messages with an epoch number, and acceptors send back promises. Based on the information they learned in phase one, in phase two proposers may propose a value for an epoch, and acceptors send back accept messages.

Phase one

  1. A proposer chooses a unique epoch e and sends prepare(e) to the acceptors
  2. Each acceptors stores the last promised epoch and the last accepted proposal. When an acceptor receives prepare(e), if e is the first epoch promised or if e is equal to or greater than the last epoch promised, then e is written to storage and the acceptor replies with promise(e, f, v) where f is the epoch of the last accepted proposal (if any), and v is the corresponding proposed value.
  3. Once the proposer receives promise(e, \_, \_) from the majority of acceptors, it proceeds to phase two.
  4. If the proposer times out before receiving promises from a majority of acceptors it retries phase one with a greater epoch.

Phase two

  1. The proposer selects a value v to propose. If no proposals were returned with promises in phase one, then the proposer can choose its own candidate value. If exactly one proposal was returned in phase one, then its value must be chosen. If multiple proposals were returned in phase one, then the proposer must propose the value associated with the greatest epoch. The proposer then sends propose(e, v) to the acceptors.
  2. When an acceptor receives a propose(e, v) message, if e is the first promised epoch, or is greater than the last promised epoch, then the promised epoch and accepted proposal is update, and the acceptor replies with accept(e).
  3. Once the proposer receives accept(e) from a majority of acceptors, it learns that the value v has been decided.
  4. If the proposer times out, it retries phase 1 with a greater epoch.


There are 5 key proposer properties:

  • Property 1. Proposers use unique epochs for each proposal
  • Property 2. Proposers only propose a value after receiving promises from a majority of acceptors
  • Property 3. Proposers only return a value after receiving accepts from a majority of acceptors
  • Property 4. Proposers must choose a value according to the value selection rule outlined above in step 1 of phase two.
  • Property 5. Each epoch used by a proposer is greater than all previous epochs used by the proposer.

Likewise there are 5 key acceptor properties:

  • Property 6. Prepare and propose messages are only processed by an acceptor if the epoch received is greater than or equal to the last promised epoch
  • Property 7. If an acceptor does process a message, then its last promised epoch is set to the epoch received.
  • Property 8. If an acceptor does process a prepare message, then it replies with promise after satisfying property 7
  • Property 9. If an acceptor does process a propose message, it replies with accept after satisfying property 7 and updating its last accepted proposal
  • Property 10. The last promised epoch and last accepted proposal are persistent and only updated by properties 7 and 9.

Theorems and Lemmas

The reason for labouring the point so on those properties, is that the safety and progress guarantees of Paxos and then proved in terms of them. Safety is the big one, and the following table summarises the lemmas / theorems (column one), together with the properties that they depend on, as well as the other results they use.

So for example, safety of learning (“If the value v is returned by a proposer then v has been decided”), depends only on properties 1, 3, and 9. That is, we could weaken or abandon the other properties, and safety of learning at least would still hold.

Our approach of using multiple layers of intermediate results will allow us to revise this proof throughout this thesis, without reproducing the complete proof.

Proof of progress requires some assumption of liveness conditions for a sufficient period:

  • At least a majority of acceptors are live and reply to messages from proposers, within some known upper bound
  • Exactly one fixed proposer is live and its relative clock is no faster than some delta ahead of global time (this is to prevent proposers duelling indefinitely)
  • Message between the proposer and the majority of acceptors are delivered within a known bound

In the next instalment we’ll look at some of the many variants of classic Paxos.

End of term

April 22, 2019

We’ve reached the end of term again on The Morning Paper, and I’ll be taking a two week break. The Morning Paper will resume on Tuesday 7th May (since Monday 6th is a public holiday in the UK).

My end of term tradition is to highlight a few of the papers from the term that I especially enjoyed, but this time around I want to let one work stand alone:

You might also enjoy “The Mess We’re In,” and Joe’s seven deadly sins of programming:

  1. Code even you cannot understand a week after you wrote it – no comments
  2. Code with no specifications
  3. Code that is shipped as soon as it runs and before it is beautiful
  4. Code with added features
  5. Code that is very very fast very very very obscure and incorrect
  6. Code that is not beautiful
  7. Code that you wrote without understanding the problem

We’re in an even bigger mess without you Joe. Thank you for everything. RIP.