Wormhole: Reliable pub-sub to support Geo-Replicated Internet Services

Wormhole: Reliable pub-sub to support Geo-Replicated Internet Services – Sharma et al. 2015

At Facebook, lots of applications are interested in data being written to Facebook’s data stores. Having each of these applications poll the data stores of interest would be untenable, so Facebook built a pub-sub system to identify updates and transmit notifications to interested applications. That pub-sub system is called Wormhole.

Wormhole has been in production at Facebook for over three years. Over this period, Wormhole has grown tremendously from its initial deployment, where it published database updates for a cache invalidation system. Today, Wormhole is used by tens of datasets stored on MySQL, HDFS and RocksDB. Across these deployments, Wormhole publishers transport over 35 GBytes/sec of updates at steady state across 5 trillion messages per day. Note that this update rate isn’t the peak for the system; in practice, we have found that Wormhole has transported over 200 GBytes/sec of updates when some applications fail and need to replay past updates.
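
To put those figures in perspective, here is a quick back-of-the-envelope calculation. It uses only the numbers quoted above, and assumes that "GBytes" means 10^9 bytes and that the load is spread evenly over a day (neither is stated in the quote).

```python
# Back-of-the-envelope figures derived from the numbers quoted above.
# Assumptions: "GBytes" means 10**9 bytes; load is uniform across the day.
SECONDS_PER_DAY = 24 * 60 * 60

steady_bytes_per_sec = 35e9    # "over 35 GBytes/sec ... at steady state"
peak_bytes_per_sec = 200e9     # "over 200 GBytes/sec" during replay
messages_per_day = 5e12        # "5 trillion messages per day"

messages_per_sec = messages_per_day / SECONDS_PER_DAY
avg_message_bytes = steady_bytes_per_sec / messages_per_sec
peak_to_steady = peak_bytes_per_sec / steady_bytes_per_sec

print(f"{messages_per_sec / 1e6:.1f} million messages/sec")
print(f"{avg_message_bytes:.0f} bytes per message on average")
print(f"peak is {peak_to_steady:.1f}x steady state")
```

Under those assumptions the average update is on the order of a few hundred bytes, and replay traffic can reach several times the steady-state rate.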

Unlike many pub-sub systems, Wormhole does not have its own custom data store used for messages in transit. Neither does it require any interposition in the update path of the supported data sources to post messages to topics. Instead,

Wormhole publishers directly read the transaction logs maintained by the data storage systems to learn of new writes committed by producers. Upon identifying a new write, Wormhole encapsulates the data and any corresponding metadata in a custom Wormhole update that is delivered to subscribers. Wormhole updates always encode data as a set of key-value pairs irrespective of the underlying storage system, so interested applications do not have to worry about where the underlying data lives.
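
As a rough illustration of what "always a set of key-value pairs" buys subscribers, here is a minimal sketch. The class name, the helper functions, and the `_shard` / `_source` metadata keys are all hypothetical; the paper's actual wire format is not described here.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class WormholeUpdate:
    """Hypothetical update shape: a flat set of key-value pairs."""
    pairs: Dict[str, str] = field(default_factory=dict)


def from_mysql_row_change(shard: str, table: str, row: Dict[str, str]) -> WormholeUpdate:
    """Wrap a (made-up) MySQL row change; metadata keys are invented for illustration."""
    pairs = {"_source": "mysql", "_shard": shard, "_table": table}
    pairs.update(row)
    return WormholeUpdate(pairs)


def from_rocksdb_put(shard: str, key: str, value: str) -> WormholeUpdate:
    """Wrap a (made-up) RocksDB put in the same key-value shape."""
    return WormholeUpdate({"_source": "rocksdb", "_shard": shard, "key": key, "value": value})


def shard_of(update: WormholeUpdate) -> str:
    """Subscriber-side code reads the same key regardless of the backing store."""
    return update.pairs["_shard"]
```

The point of the sketch is that `shard_of` never needs to know which storage system produced the update.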

The supported data storage systems include MySQL, HDFS, and RocksDB. Given that Wormhole works by scanning transaction logs, the support for geo-replication is very straightforward: Wormhole relies on the geo-replication mechanisms in place for each data storage system of interest, and simply works off of the local transaction log for each replica. This avoids the need for Wormhole to do any geo-replication of its own, and also means that a subscriber in a given region that gets a notification knows that the local source data storage system must also have received that update.

Delivery guarantees

Subscribers receive a stream of updates called a flow. An ack per message is too expensive, so publishers periodically send a datamarker per flow which is acked. This acknowledgement indicates that the subscriber has processed all updates up to the datamarker.

A datamarker for a flow is essentially a pointer in the datastore log that indicates the position of the last received and acknowledged update of the flow by the subscriber.

Wormhole provides at-least-once delivery for updates from single copy and multiple copy datasets. For ‘multiple copy’, read ‘geo-replicated’. A single copy (single region) datastore may still be divided into multiple shards. Each subscriber will see updates from an individual shard in order, but ordering of updates across shards is not defined. For geo-replicated datasets, applications are allowed to subscribe to multiple copies at once. In this case,

Wormhole guarantees that its subscribers receive at least once all updates contained in any subscribed copy of the dataset. The updates for any shard are, again, delivered in order. There is no ordering guarantee between updates that belong to different shards.

On recovery, Wormhole will replay messages from the last acknowledged datamarker.
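
The interplay between datamarkers, acknowledgements, and replay can be sketched with a toy model. Everything here is an assumption made for illustration: the `Flow` class, the in-memory list standing in for the transaction log, and the policy of sending a datamarker after every third update.

```python
from typing import List, Tuple


class Flow:
    """Toy publisher-side state for one flow (all names are hypothetical)."""

    def __init__(self, log: List[str], marker_every: int = 3) -> None:
        self.log = log                    # stands in for the datastore's transaction log
        self.marker_every = marker_every  # assumed policy: one datamarker per N updates
        self.acked_position = 0           # log index just past the last acked datamarker
        self.next_position = 0            # next log index to send

    def send(self, count: int) -> List[Tuple[str, object]]:
        """Send up to `count` updates, interleaving datamarkers."""
        out: List[Tuple[str, object]] = []
        for _ in range(count):
            if self.next_position >= len(self.log):
                break
            out.append(("update", self.log[self.next_position]))
            self.next_position += 1
            if self.next_position % self.marker_every == 0:
                out.append(("datamarker", self.next_position))
        return out

    def ack(self, marker_position: int) -> None:
        """Subscriber confirms it has processed everything before this position."""
        self.acked_position = max(self.acked_position, marker_position)

    def recover(self) -> None:
        """After a failure, resume from the last acknowledged datamarker."""
        self.next_position = self.acked_position
```

In this model, if a subscriber receives five updates, acknowledges only the datamarker at position 3, and then fails, recovery resends the fourth and fifth updates. That duplication is exactly what at-least-once delivery permits, so subscribers need to tolerate seeing an update more than once.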

Dealing with shards

Publishers are deployed locally alongside the datastore whose updates they are broadcasting. For a sharded datastore there is therefore one publisher per shard. Every notification created by the publisher contains a key-value pair indicating the shard it belongs to.

Subscribing applications can also deploy multiple instances of themselves in order to spread load. Each instance is called a subscriber. Applications and their subscriber instances are registered in ZooKeeper. Publishers find subscribers via ZooKeeper, and divide up the shards between subscribers. Each subscriber will receive an onShardNotice() callback to tell it the shard(s) it has been assigned.

All updates of a single shard are always sent to one subscriber, i.e., they are not split among multiple subscribers.
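
A minimal sketch of that assignment step follows. The round-robin policy is an assumption (the text does not say how shards are divided up), and `on_shard_notice` is simply a Python-style stand-in for the `onShardNotice()` callback; ZooKeeper discovery is left out entirely.

```python
from typing import Dict, List


class Subscriber:
    """Hypothetical subscriber instance that records the shards it is given."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.shards: List[str] = []

    def on_shard_notice(self, shard: str) -> None:
        # Stand-in for the onShardNotice() callback mentioned in the text.
        self.shards.append(shard)


def assign_shards(shards: List[str], subscribers: List[Subscriber]) -> Dict[str, str]:
    """Give every shard exactly one owner (round-robin is an assumed policy)."""
    if not subscribers:
        raise ValueError("at least one subscriber is required")
    owner: Dict[str, str] = {}
    for i, shard in enumerate(sorted(shards)):
        sub = subscribers[i % len(subscribers)]
        owner[shard] = sub.name
        sub.on_shard_notice(shard)
    return owner
```

Whatever the real policy is, the invariant the sketch preserves is the one stated above: each shard maps to a single subscriber, which is what makes per-shard ordering possible.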

Initially there was a centralized deployment system for Wormhole, but keeping track of the large set of production datastore machines as new machines are constantly brought into and taken out of production proved challenging.

We decided to switch to a distributed deployment system based on runsv while trying to minimize dependencies outside of the local machine. We run a lightweight Wormhole monitor on each datastore machine. The monitor periodically checks the configuration system (which indicates the machines that are in production) and based on that determines whether to run a publisher or not, and if so, with what configuration. This decentralized system has been significantly more reliable and easier to use.
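
The monitor's decision can be pictured as a small reconciliation step. This is only a sketch: the dictionary standing in for the configuration system, its field names, and the action strings are all invented for illustration, and the actual process supervision (runsv) is not modelled.

```python
from typing import Dict, Optional


def desired_publisher(hostname: str, production_config: Dict[str, dict]) -> Optional[dict]:
    """Return the publisher config this machine should run, or None if it should run none.

    `production_config` is a hypothetical stand-in for the configuration system.
    """
    entry = production_config.get(hostname)
    if entry is None or not entry.get("in_production", False):
        return None
    return {"datastore": entry["datastore"], "shards": list(entry["shards"])}


def reconcile(current: Optional[dict], desired: Optional[dict]) -> str:
    """Decide what the monitor should do on this periodic check."""
    if desired is None:
        return "stop" if current is not None else "noop"
    if current is None:
        return "start"
    return "noop" if current == desired else "restart"
```

Because each machine only consults the configuration system about itself, no central component has to track the whole fleet.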

Dealing with recovering and slow consumers

In steady state, all flows get updates from the same position in the datastore, i.e., the position corresponding to the current time. Hence, Wormhole uses one reader to get the updates, and sends them to all interested flows.

But if subscribers are recovering, or if there are slow subscribers that can’t keep up, we could end up with multiple readers each working from a different point in the transaction log. This causes high I/O load. So instead of dedicating one reader to each flow, Wormhole groups together flows that are working from nearby points in the transaction log.

Wormhole clusters flows and creates a reader for each cluster instead, which results in significant I/O savings. Each such reader combined with associated flows is called a caravan. The singleton steady state caravan (or the farthest caravan in case of multiple caravans) is called the lead caravan.
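
One simple way to picture the clustering is a gap-based grouping over log positions. This is an assumed heuristic, not the algorithm from the paper: flows are sorted by position, and a new caravan starts whenever the gap to the previous flow exceeds a hypothetical `max_gap` threshold.

```python
from typing import Dict, List


def cluster_flows(positions: Dict[str, int], max_gap: int) -> List[List[str]]:
    """Group flows whose log positions are close together.

    Returns caravans ordered from furthest behind to furthest ahead,
    so the last caravan plays the role of the lead caravan.
    """
    ordered = sorted(positions.items(), key=lambda kv: kv[1])
    caravans: List[List[str]] = []
    last_pos = None
    for flow, pos in ordered:
        # Start a new caravan when this flow is too far from the previous one.
        if last_pos is None or pos - last_pos > max_gap:
            caravans.append([])
        caravans[-1].append(flow)
        last_pos = pos
    return caravans


def reader_start(caravan: List[str], positions: Dict[str, int]) -> int:
    """A caravan's single reader would have to start at its earliest flow."""
    return min(positions[f] for f in caravan)
```

With five flows at positions 1000, 990, 420, 400 and 10 and a `max_gap` of 50, this yields three caravans and therefore three readers instead of five.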

Obviously, it is desirable to have as few caravans as possible:

Usually, the non-lead caravans are expected to eventually catch up with the lead caravan and are thus forced to read updates at a rate that is faster than the rate of the lead caravan (typically 1.25 to 2 times faster). In order to prevent overloading the datastore, Wormhole has configuration parameters for the maximum number of caravans, the maximum rate at which a caravan is allowed to read updates, and a maximum cumulative rate at which the collection of caravans is allowed to read updates. We also dynamically move flows between existing caravans. If a caravan has a flow which is not able to keep up with the speed of the caravan (because the corresponding subscriber is overloaded, for instance) or whose datamarker is far ahead (and can be better served by another caravan), we can move the flow. These actions are taken periodically.
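
The trade-off between catching up quickly and protecting the datastore can be made concrete with a small sketch. The catch-up formula follows from the text (a caravan reading at k times the lead rate closes the gap at (k - 1) times that rate); the function names and the proportional scaling used to honour the cumulative cap are assumptions.

```python
from typing import List


def catch_up_seconds(lag_bytes: float, lead_rate: float, speedup: float) -> float:
    """Time for a caravan `lag_bytes` behind to reach the lead caravan.

    The lagging caravan reads at speedup * lead_rate while the lead keeps
    advancing at lead_rate, so the gap shrinks at (speedup - 1) * lead_rate.
    """
    if speedup <= 1.0:
        raise ValueError("a caravan only catches up if it reads faster than the lead")
    return lag_bytes / ((speedup - 1.0) * lead_rate)


def capped_rates(requested: List[float], per_caravan_max: float,
                 cumulative_max: float) -> List[float]:
    """Apply a per-caravan cap, then scale everything down to fit the cumulative cap.

    Proportional scaling is an assumed policy, chosen here for simplicity.
    """
    rates = [min(r, per_caravan_max) for r in requested]
    total = sum(rates)
    if total > cumulative_max and total > 0:
        scale = cumulative_max / total
        rates = [r * scale for r in rates]
    return rates
```

For example, with a lead rate of 100 MBytes/sec, a caravan that is 36 GBytes behind needs 24 minutes to catch up at 1.25x but only 6 minutes at 2x, which is why the caps matter: faster catch-up means fewer caravans sooner, but at the cost of heavier reads on the datastore.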

Why build a custom pub-sub system?

The section on related work contains a discussion of (amongst others) Kafka and RabbitMQ.

On Kafka:

Kafka is LinkedIn’s message bus, now open source and maintained by Apache. It has a topic-based pub-sub API. Like Wormhole, it uses ZooKeeper to keep track of how many events particular subscribers have consumed. As in Wormhole, data sources are sharded. At LinkedIn, it is used to distribute various real-time logging information to various subscribers. Kafka can lose messages in case one of its message brokers suffers a failure. Recent benchmarks put the speed at which Kafka can transport messages at about 250 MBytes/sec, orders of magnitude below Wormhole’s production load, but Kafka’s throughput can be improved by sharding differently.

On RabbitMQ:

Like Wormhole, RabbitMQ can scale to multiple datacenters and is particularly efficient for small messages. Neither (RabbitMQ nor beanstalkd) supports replicated data sources or has been demonstrated to support the scale of Facebook’s workloads.