ZooKeeper: wait-free coordination for internet scale systems

ZooKeeper: wait-free coordination for internet scale systems – Hunt et al. (Yahoo!) 2010

Distributed systems would be much simpler if the distributed parts didn’t have to coordinate in some fashion. But it’s this notion of ‘working together’ to achieve some aim that differentiates a distributed system from an unrelated bag of parts. Examples of the need for coordination include configuration information, leader elections, group membership, rendezvous points, resource locking, and synchronized computation with double barriers. ZooKeeper doesn’t implement any of these! But – it does implement an underlying coordination mechanism on top of which all of these coordination protocols and more can be easily built. It’s designed for high throughput (tens to hundreds of thousands of transactions per second) under workloads that are read-dominated.

When designing our coordination service, we moved away from implementing specific primitives on the server side, and instead we opted for exposing an API that enables application developers to implement their own primitives. Such a choice led to the implementation of a coordination kernel that enables new primitives without requiring changes to the service core.

The ZooKeeper API looks superficially similar to a file system API, with a hierarchical structure of nodes. Data can be stored at every node – not just the leaves. The intended usage is to store metadata needed for coordination though, not application data. The API is non-blocking (wait-free) – locks are not a fundamental primitive, although they can be implemented on top.

One of the secrets to ZooKeeper’s good performance is that reads and writes are treated separately. Writes are linearizable and use an atomic broadcast protocol to coordinate across the cluster, whereas reads are processed locally by the server the client is connected to. The asynchronous nature of the ZooKeeper API is another key to good performance:

With asynchronous operations, a client is able to have multiple outstanding operations at a time. This feature is desirable, for example, when a new client becomes a leader and it has to manipulate metadata and update it accordingly. Without the possibility of multiple outstanding operations, the time of initialization can be of the order of seconds instead of sub-second.

Individual clients are guaranteed FIFO execution order for asynchronous requests they submit.

The core model is very simple to understand: nodes in the hierarchy (called znodes) can be regular nodes (explicitly created and deleted), or ephemeral. Ephemeral nodes are explicitly created and either explicitly deleted, or implicitly removed when the session that created them terminates for any reason. Normally you supply a name for the node you want to create; if you also pass a sequential flag, the value of a monotonically increasing counter is appended to the name (giving the twin properties of ordering and uniqueness). Finally, a client can set a one-time watch to be notified when a watched node changes. Delivery of the notification informs the client that the information it has is stale. A client connects to any server in the ZooKeeper ensemble and establishes a session. The session state is distributed, so clients can reconnect to any other server and continue an existing session in the event of server failure or partition. ZooKeeper requires a quorum of servers to be available for the ZooKeeper service itself to be available.
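To make the model concrete, here is a minimal in-memory sketch of a znode tree with ephemeral nodes, the sequential flag, and one-time watches. It is a toy, not the ZooKeeper client API: the class name `ToyZk`, its method signatures, and the session ids are all assumptions made for illustration, and there is no replication or networking.

```python
class ToyZk:
    """In-memory toy model of a znode tree; not the real ZooKeeper client API."""

    def __init__(self):
        self.nodes = {"/": b""}   # path -> data
        self.owner = {}           # ephemeral path -> owning session id
        self.counters = {}        # parent path -> next sequence number
        self.watches = {}         # path -> list of one-time callbacks

    def create(self, path, data=b"", session=None, ephemeral=False, sequential=False):
        parent = path.rsplit("/", 1)[0] or "/"
        if parent not in self.nodes:
            raise KeyError("no parent: " + parent)
        if sequential:
            # Append a monotonically increasing, zero-padded counter to the name.
            n = self.counters.get(parent, 0)
            self.counters[parent] = n + 1
            path = "%s%010d" % (path, n)
        if path in self.nodes:
            raise KeyError("exists: " + path)
        self.nodes[path] = data
        if ephemeral:
            self.owner[path] = session
        self._fire(path, "created")
        return path

    def delete(self, path):
        del self.nodes[path]
        self.owner.pop(path, None)
        self._fire(path, "deleted")

    def exists(self, path, watch=None):
        if watch is not None:
            self.watches.setdefault(path, []).append(watch)
        return path in self.nodes

    def close_session(self, session):
        # Ephemeral nodes are removed when their session ends.
        for path in [p for p, s in self.owner.items() if s == session]:
            self.delete(path)

    def _fire(self, path, event):
        for callback in self.watches.pop(path, []):  # pop: a watch fires once
            callback(path, event)


zk = ToyZk()
zk.create("/app")
a = zk.create("/app/worker-", session="s1", ephemeral=True, sequential=True)
b = zk.create("/app/worker-", session="s2", ephemeral=True, sequential=True)

events = []
zk.exists(a, watch=lambda path, event: events.append((path, event)))
zk.close_session("s1")   # removes a and triggers the watch
```

After `close_session("s1")`, the node created by session `s1` is gone, the watcher has been told about it exactly once, and the node owned by `s2` is untouched.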

What’s impressive is how many different coordination mechanisms can be built on top of this foundation. Let’s explore a few of them… (more details at ZooKeeper Recipes and Solutions)

ZooKeeper use case patterns

Configuration management

Store the configuration information in some znode, call it ‘C’. Pass processes the full pathname to C when they start up, and have them read C with the watch flag set. If the configuration is ever updated, the watch notification will tell them that their configuration information is now stale.
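The read-with-watch loop can be sketched as follows. This is a toy stand-in for a single znode, assuming a hypothetical `ConfigNode` class whose watches fire synchronously; in real ZooKeeper the notification arrives asynchronously and carries no data, which is why the process re-reads C and re-arms the watch.

```python
class ConfigNode:
    """Toy stand-in for a single znode 'C' with one-time data watches."""

    def __init__(self, data):
        self.data = data
        self.watchers = []

    def get_data(self, watch=None):
        if watch is not None:
            self.watchers.append(watch)
        return self.data

    def set_data(self, data):
        self.data = data
        # Watches are one-time: clear the list before notifying.
        pending, self.watchers = self.watchers, []
        for callback in pending:
            callback()


class Process:
    def __init__(self, node):
        self.node = node
        self.history = []
        self.reload()

    def reload(self):
        # Read C and re-arm the watch in the same call, so no update is missed.
        self.config = self.node.get_data(watch=self.reload)
        self.history.append(self.config)


c = ConfigNode("timeout=5")
p = Process(c)
c.set_data("timeout=10")
c.set_data("timeout=30")
```

Because the watch is re-registered on every read, the process sees each of the two updates even though each individual watch fires only once.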

Group membership

Create a parent node G to represent the group. Now have each process in the group create an ephemeral node under G. If you have a unique name for each process you can use that, otherwise you can use the sequential flag. Each process puts process information (for example, the IP address and port the process is listening on) in its node’s data. Because each process creates an ephemeral node, if that process ever dies or is unreachable (heartbeat failure) its associated node will be deleted – no further API calls required on behalf of each group member.

To find out who is in the group, simply enumerate the children of G. And to be notified of membership changes, put a watch on it and refresh that watch every time a notification is received.
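A small sketch of the pattern, under the assumption of a toy `Group` class standing in for the parent znode G (the class, the `member-` prefix, and the session ids are hypothetical). Members join by creating an ephemeral child, and an observer refreshes its one-time child watch on every notification.

```python
class Group:
    """Toy model of parent znode G whose children are ephemeral member nodes."""

    def __init__(self):
        self.children = {}        # child name -> (session id, data)
        self.next_seq = 0
        self.child_watches = []   # one-time callbacks

    def join(self, session, data):
        name = "member-%010d" % self.next_seq   # sequential flag gives a unique name
        self.next_seq += 1
        self.children[name] = (session, data)
        self._notify()
        return name

    def session_expired(self, session):
        # Ephemeral nodes vanish together with the session that created them.
        gone = [n for n, (s, _) in self.children.items() if s == session]
        for name in gone:
            del self.children[name]
        if gone:
            self._notify()

    def get_children(self, watch=None):
        if watch is not None:
            self.child_watches.append(watch)
        return sorted(self.children)

    def _notify(self):
        pending, self.child_watches = self.child_watches, []
        for callback in pending:
            callback()


class Observer:
    def __init__(self, group):
        self.group = group
        self.views = []
        self.refresh()

    def refresh(self):
        # Enumerate the children of G and re-arm the watch.
        self.views.append(self.group.get_children(watch=self.refresh))


g = Group()
obs = Observer(g)
a = g.join("session-a", "10.0.0.1:8080")
b = g.join("session-b", "10.0.0.2:8080")
g.session_expired("session-a")   # member a drops out without any explicit call
```

The observer's successive views track the membership: empty, then one member, then two, then only the survivor.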


Locks

Locks are a little more complex, because of the desire to avoid a ‘herd effect’ when a lock is released. The fundamental idea is to rely on the ephemeral and sequential flags. Designate some node as the root lock node, call it L. Now each client that wants to obtain the lock creates an ephemeral child node under L using the sequential flag. Recall that the sequential flag guarantees to assign a monotonically increasing sequence number to the name – the process whose node has the lowest sequence number currently holds the lock. If a process does not currently have the lock, it watches the node whose sequence number immediately precedes its own. In this way only the ‘next in line’ process will be notified when the current lock owner deletes its lock node.


Lock

01 n = create(L + "/lock-", EPHEMERAL | SEQUENTIAL)
02 C = getChildren(L, false)
03 if (n is lowest znode in C) exit // you have the lock
04 p = znode in C immediately prior to n
05 if (exists(p, true)) wait for watch event
06 goto 02 // after the watch fires, or immediately if p no longer exists

Unlock

01 delete(n)
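The lock and unlock steps can be simulated with a single-threaded toy model. This is a sketch under stated assumptions, not the ZooKeeper API: `LockRoot` and `Client` are hypothetical names, watches fire synchronously, and unlike real ZooKeeper the toy `exists` only registers a watch when the node is present.

```python
class LockRoot:
    """Toy model of the lock root znode; children are ephemeral sequential nodes."""

    def __init__(self):
        self.children = set()
        self.next_seq = 0
        self.watches = {}   # child name -> one-time callbacks

    def create_sequential(self):
        name = "lock-%010d" % self.next_seq
        self.next_seq += 1
        self.children.add(name)
        return name

    def get_children(self):
        return sorted(self.children)   # zero-padded names sort by sequence number

    def exists(self, name, watch):
        if name in self.children:
            self.watches.setdefault(name, []).append(watch)
            return True
        return False

    def delete(self, name):
        self.children.discard(name)
        for callback in self.watches.pop(name, []):
            callback()


class Client:
    def __init__(self, root, name, log):
        self.root, self.name, self.log = root, name, log
        self.node = None

    def lock(self):
        self.node = self.root.create_sequential()             # step 01
        self._check()

    def _check(self):
        while True:
            children = self.root.get_children()               # step 02
            if self.node == children[0]:                      # step 03
                self.log.append(self.name + " acquired")
                return
            prev = children[children.index(self.node) - 1]    # step 04
            if self.root.exists(prev, watch=self._check):     # step 05
                return   # wait; the watch callback re-enters at step 02
            # predecessor vanished between steps 02 and 05: retry from step 02

    def unlock(self):
        self.log.append(self.name + " released")
        self.root.delete(self.node)


log = []
root = LockRoot()
a, b, c = (Client(root, n, log) for n in "abc")
a.lock()
b.lock()
c.lock()
watchers_on_a = len(root.watches.get(a.node, []))   # only b watches a's node
a.unlock()
b.unlock()
```

Each waiting client watches only its immediate predecessor, so releasing the lock wakes exactly one client rather than the whole herd.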

There’s a subtlety hidden in here which is key to why this algorithm works. Remember that all reads, including the getChildren call on line 02, are served locally by the server the client is connected to. As the paper notes:

One drawback of using fast reads is not guaranteeing precedence order for read operations. That is, a read operation may return a stale value, even though a more recent update to the same znode has been committed. Not all of our applications require precedence order, but for applications that do require it, we have implemented sync.

If we read stale data, and therefore missed the creation of a node with a lower sequence number, we might mistakenly think we have the lock. This is prevented because issuing a write (line 01) ensures the client sees the most up-to-date state. Any additional write that happens between line 01 and 02 (on another server) must be assigned a higher sequence number (by the definition of SEQUENTIAL) and therefore cannot affect the algorithm. If you don’t actually have anything you need to write, a sync call can serve the same purpose. An additional guarantee is that when a client has a watch on a node, it will never be able to read a changed value on that node until after it has received a change notification.
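The stale-read behaviour, and the way a write or a sync brings the local replica up to date, can be illustrated with a deliberately simplified model. The `Leader` and `Follower` classes below are hypothetical; they assume a shared in-memory log in place of atomic broadcast and ignore failures entirely.

```python
class Leader:
    def __init__(self):
        self.log = []   # committed (path, value) updates, in commit order


class Follower:
    """Toy replica that serves reads locally and may lag behind the leader."""

    def __init__(self, leader):
        self.leader = leader
        self.applied = 0   # number of log entries applied locally
        self.state = {}

    def _catch_up(self, upto):
        for path, value in self.leader.log[self.applied:upto]:
            self.state[path] = value
        self.applied = max(self.applied, upto)

    def read(self, path):
        return self.state.get(path)   # local read, possibly stale

    def write(self, path, value):
        self.leader.log.append((path, value))
        # The reply is sent only once this replica has applied the write,
        # which means it has also applied everything committed before it.
        self._catch_up(len(self.leader.log))

    def sync(self):
        # Catch up with pending updates without writing anything.
        self._catch_up(len(self.leader.log))


leader = Leader()
f1, f2 = Follower(leader), Follower(leader)

f1.write("/x", "v1")
stale = f2.read("/x")    # f2 has not applied the update yet
f2.sync()
fresh = f2.read("/x")

f1.write("/x", "v2")
f2.write("/y", "y1")     # f2's own write forces it past f1's earlier update
after = f2.read("/x")
```

In this toy, a client connected to `f2` misses the first update until it calls `sync`, but after issuing its own write it observes every update committed before that write.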

All this goes to show how tricky the correctness of some of these algorithms can be – and how hard it is to reason about behaviour when all you have is prose descriptions!

Leader election

The leader election algorithm is not actually shown in the paper, but you can find it at the ZooKeeper Recipes and Solutions page. Since it’s a commonly cited use case for ZooKeeper I give a quick outline here. The basic idea is similar to the lock use case: create a parent node ‘/election’, and then have each candidate create an ephemeral sequential child node. The node that gets the lowest sequence id wins and is the new leader. You also need to watch the leader so as to be able to elect a new one if it fails. Details for doing this without causing a herd effect are given at the referenced page; it uses the same ‘watch the next lowest sequence id node’ idea as we saw above.
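As a rough illustration of the outline above, the following sketch computes who leads and who watches whom from the set of candidate nodes under ‘/election’. The function name `election_plan` and the `n_` node-name prefix are assumptions for this example; it only models the ordering logic, not sessions or notifications.

```python
def election_plan(candidates):
    """Return (leader, watch) for a collection of ephemeral sequential node names.

    The leader is the node with the lowest sequence id. Every other node
    watches the node immediately ahead of it, so a failure wakes one candidate.
    """
    ordered = sorted(candidates)   # zero-padded suffixes sort by sequence id
    leader = ordered[0]
    watch = {node: ordered[i - 1] for i, node in enumerate(ordered) if i > 0}
    return leader, watch


nodes = ["n_0000000002", "n_0000000000", "n_0000000001"]
leader, watch = election_plan(nodes)

# The leader's session ends, its ephemeral node disappears, and the
# candidate that was watching it re-evaluates the remaining children.
survivors = [n for n in nodes if n != leader]
new_leader, new_watch = election_plan(survivors)
```

Only the candidate directly behind the failed leader is notified, and it finds itself at the head of the list when it re-reads the children.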

Implementation notes and related work

ZooKeeper data is replicated to each server and held in an in-memory database. Updates are logged and forced to disk before being applied in memory.

ZooKeeper uses periodic snapshots and only requires redelivery of messages since the start of the snapshot. We call ZooKeeper snapshots fuzzy snapshots since we do not lock the ZooKeeper state to take the snapshot; instead, we do a depth first scan of the tree atomically reading each znode’s data and meta-data and writing them to disk. Since the resulting fuzzy snapshot may have applied some subset of the state changes delivered during the generation of the snapshot, the result may not correspond to the state of ZooKeeper at any point in time. However, since state changes are idempotent, we can apply them twice as long as we apply the state changes in order.
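A small worked example may help show why fuzzy snapshots are safe. It is a sketch under simplifying assumptions: state is a flat dictionary rather than a tree, and each transaction is a hypothetical `(path, value, version)` triple that records the resulting state, which is what makes it idempotent.

```python
def apply(state, txn):
    """Apply an idempotent transaction: it sets the resulting value and version."""
    path, value, version = txn
    state[path] = (value, version)


txns = [
    ("/a", "a1", 1),
    ("/b", "b1", 1),
    ("/a", "a2", 2),
    ("/b", "b2", 2),
]

live = {"/a": ("a0", 0), "/b": ("b0", 0)}
history = [dict(live)]          # every state the service actually passed through

# Take a fuzzy snapshot while transactions keep being applied.
snapshot = {}
snapshot["/a"] = live["/a"]     # the scan reads /a before any transaction
for txn in txns[:3]:
    apply(live, txn)
    history.append(dict(live))
snapshot["/b"] = live["/b"]     # the scan reads /b after three transactions
apply(live, txns[3])
history.append(dict(live))

# Recovery: load the snapshot, then replay every transaction delivered since
# the snapshot started, in order. Some of them get applied a second time.
recovered = dict(snapshot)
for txn in txns:
    apply(recovered, txn)
```

The snapshot pairs the initial value of `/a` with an updated value of `/b`, a combination that never existed at any single point in time, yet the in-order replay still converges to the live state.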

On ZooKeeper vs Chubby:

Chubby shares several of the goals of ZooKeeper. It also has a file-system-like interface, and it uses an agreement protocol to guarantee the consistency of the replicas. However, ZooKeeper is not a lock service. It can be used by clients to implement locks, but there are no lock operations in its API…. ZooKeeper clients can use their local replicas to serve data and manage watches since its consistency model is much more relaxed than Chubby. This enables ZooKeeper to provide higher performance than Chubby, allowing applications to make more extensive use of ZooKeeper.

On ZooKeeper vs Paxos:

One important technique for building fault-tolerant services is state-machine replication, and Paxos is an algorithm that enables efficient implementations of replicated state-machines for asynchronous systems. We use an algorithm that shares some of the characteristics of Paxos, but that combines transaction logging needed for consensus with write-ahead logging needed for data tree recovery to enable an efficient implementation.