
ApproxJoin: approximate distributed joins

November 9, 2018

ApproxJoin: approximate distributed joins Le Quoc et al., SoCC’18


The join is a fundamental data processing operation and has been heavily optimised in relational databases. When you’re working with large volumes of unstructured data though, say with a data processing framework such as Flink or Spark, joins become distributed and much more expensive. One of the reasons for this is the amount of data that needs to be moved over the network. In many use cases, approximate results would be acceptable, and as we’ve seen before, likely much faster and cheaper to compute. Approximate computing with joins is tricky though: if you sample datasets before the join you reduce data movement, but also sacrifice up to an order of magnitude in accuracy; if you sample results after the join you don’t save on any data movement and the process is slow.

This paper introduces an approximate distributed join technique, ApproxJoin, which is able to sample before data shuffling without loss of end result accuracy. Compared to unmodified Spark joins with the same sampling ratio it achieves a speedup of 9x while reducing the shuffled data volume by 82x.

The following charts show ApproxJoin’s latency and accuracy characteristics compared to Spark sampling before the join (ApproxJoin has much better accuracy and similar latency) and to Spark sampling after the join (ApproxJoin has similar accuracy and much lower latency).

ApproxJoin works in two phases. First it uses one of the oldest tricks in the book, a Bloom filter, to eliminate redundant data shuffling. A nice twist here is that ApproxJoin directly supports multi-way joins so we don’t need to chain a series of pairwise joins together. In the second phase ApproxJoin uses stratified sampling to produce an answer approximating the result of an aggregation over the complete join result.

At a high level, ApproxJoin makes use of a combination of sketching and sampling to select a subset of input datasets based on the user-specified query budget. Thereafter, ApproxJoin aggregates over the subset of input data.

User queries must contain an algebraic aggregation function (e.g. SUM, AVG, COUNT, STDEV), and as is usual for approximate compute frameworks, can specify either a time bound or an error bound for the query.

High level overview

Overall, the ApproxJoin system looks like this:

Filtering using a multi-way Bloom filter happens in parallel at each node storing partitions of the input, and simultaneously across all input tables.

The sampling phase makes use of stratified sampling within the join process: datasets are sampled while the cross product is being computed. Stratified sampling in this case means that tuples with distinct join keys are sampled independently (with simple random sampling). Thus the final sample will contain all join keys, even those with few data items. A cost function is used to compute an optimal sampling rate according to the query budget. There’s a one-off upfront cost to compute the standard deviation for a join key (‘stratum’), which is stored and reused in subsequent queries. It’s not clear whether or not this cost is included in the evaluation (so best guess it isn’t 😉).


The filtering step is very straightforward. First a Bloom filter is created for each input dataset: each worker with a partition of the dataset creates a local Bloom filter, and then these are combined using OR. Once we have the merged Bloom filter for each input dataset, we can simply combine the filters across datasets using AND. (Think about hashing a given key: only if the corresponding bits are set in each of the input Bloom filters can that key possibly exist in all inputs.) In the implementation, Bloom filters are merged using a treeReduce scheme to prevent the Spark driver becoming a bottleneck.
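To make the OR-then-AND merging concrete, here is a minimal Python sketch. It is not ApproxJoin’s implementation: the `BloomFilter` class, its sizes, and the hashing scheme are all assumptions chosen for illustration, and the merge is a plain loop rather than a treeReduce.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter backed by a Python int used as a bit vector (illustrative only)."""

    def __init__(self, num_bits=1024, num_hashes=3, bits=0):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bits

    def _positions(self, key):
        # Derive num_hashes bit positions from seeded SHA-256 digests of the key.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        return all((self.bits >> pos) & 1 for pos in self._positions(key))

    def union(self, other):
        # OR: combines the local filters of one dataset's partitions.
        return BloomFilter(self.num_bits, self.num_hashes, self.bits | other.bits)

    def intersection(self, other):
        # AND: keeps only bits set in the filters of every input dataset.
        return BloomFilter(self.num_bits, self.num_hashes, self.bits & other.bits)


def build_dataset_filter(partitions):
    """Build one filter per partition, then OR them together."""
    merged = BloomFilter()
    for partition in partitions:
        local = BloomFilter()
        for key in partition:
            local.add(key)
        merged = merged.union(local)
    return merged


# Two datasets, each split into two partitions of join keys.
dataset_a = [["k1", "k2"], ["k3", "k4"]]
dataset_b = [["k3", "k9"], ["k4", "k7"]]

join_filter = build_dataset_filter(dataset_a).intersection(build_dataset_filter(dataset_b))

# Only keys that pass the join filter need to be shuffled.
survivors_a = [k for part in dataset_a for k in part if join_filter.might_contain(k)]
```

Keys present in both datasets always survive; keys present in only one may occasionally survive too, because Bloom filters admit false positives but never false negatives.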

Clearly, the greater the overlap between the input datasets the more data we need to shuffle, and hence the less benefit the Bloom-filter based join can add.

After the filtering stage, it may be that the overlap fraction between the datasets is small enough that a full join can now be performed within the latency requirements of the user. If this is not the case, we proceed to the approximation…

Determining the cost function

ApproxJoin makes use of latency and error-bound cost functions to convert the join requirements specified by a user into sampling rates.

For the latency cost function we need to combine the cost of filtering and transferring the join data items with the cost of computing the cross products. There’s no need to estimate the cost of filtering and transferring: we have to do this regardless, so we can just time it. The remaining latency budget is then simply the target time specified by the user, minus the time we spent in the first phase. The cost function for the cross product phase is simply a weighting of the number of cross products we need to do. The weighting (scale) factor depends on the computation capacity of the compute cluster, which is profiled once offline to calibrate. (That is, once in the lifetime of the cluster, not once per query.)
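As a rough sketch of that arithmetic: the function below turns a latency target into a budget of cross products. The name `cross_product_budget` and the parameter `seconds_per_cross_product` (standing in for the profiled scale factor) are hypothetical, and the paper’s actual cost function may include terms not modelled here.

```python
def cross_product_budget(target_seconds, filter_and_shuffle_seconds, seconds_per_cross_product):
    """Number of cross products that fit in the latency left after phase one.

    seconds_per_cross_product is an assumed stand-in for the cluster's
    offline-profiled weighting factor.
    """
    remaining = target_seconds - filter_and_shuffle_seconds
    if remaining <= 0:
        # Phase one already used the whole budget; nothing left for sampling.
        return 0
    return int(remaining / seconds_per_cross_product)


# Example: 10s target, 4s measured for filtering and shuffling.
budget = cross_product_budget(10.0, 4.0, 2e-6)
```

The resulting budget would then be divided among the join keys to give per-stratum sample sizes.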

If a user specified an error bound, we need to calculate how many samples to take to satisfy the requirement. For stratum i, the number of samples b_i turns out to be governed by the following equation at 95% confidence level:

The standard deviation \sigma_i of the stratum is computed and stored on the first execution of a query and subsequently reused.
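For intuition about where a bound of this kind comes from, here is the textbook CLT-based sample-size calculation. This is a sketch under standard assumptions (sampling with replacement, no finite-population correction), not necessarily the paper’s exact equation; the symbol \epsilon_i, the error tolerated on stratum i, is introduced here for illustration.

```latex
b_i = \left( \frac{z_{\alpha/2}\,\sigma_i}{\epsilon_i} \right)^{2},
\qquad z_{\alpha/2} \approx 1.96 \ \text{at 95\% confidence}
\;\Rightarrow\;
b_i \approx 3.84 \left( \frac{\sigma_i}{\epsilon_i} \right)^{2}
```

The quadratic dependence is the important part: halving the tolerated error quadruples the number of samples needed from that stratum.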


To preserve the statistical properties of the exact join output, we combine our technique with stratified sampling. Stratified sampling ensures that no join key is overlooked; for each join key, we perform simple random selection over data items independently. This method selects data items fairly from different join keys. The (preceding) filtering stage guarantees that this selection is executed only from data items participating in the join.

Random sampling on data items having the same join key is equivalent to performing edge sampling on a complete bipartite graph modelling the relation.

To include an edge in the sample, ApproxJoin randomly selects one endpoint vertex from each side, and then yields the edge connecting them. For a sample of size b this process is repeated b times.
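The edge-sampling step for a single join key can be sketched in a few lines of Python. The function name and the toy values are assumptions; each side of the bipartite graph is simply the list of data items sharing that key in one input.

```python
import random


def sample_edges(left_items, right_items, b, rng):
    """Draw b edges, with replacement, from the complete bipartite graph
    whose two sides are the items sharing one join key."""
    return [(rng.choice(left_items), rng.choice(right_items)) for _ in range(b)]


rng = random.Random(0)
left_items = [10, 20, 30]   # items with this join key in the first input
right_items = [1, 2]        # items with this join key in the second input

sample = sample_edges(left_items, right_items, b=5, rng=rng)
```

Note that the full cross product (here 3 × 2 = 6 pairs) is never materialised: each sampled edge costs two random picks.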

In a distributed setting, data items are distributed to worker nodes based on the join keys (e.g. using a hash-based partitioner), and each worker performs the sampling process in parallel to sample the join output and execute the query.

Query execution

After sampling, each node executes the input query on the sample to produce a partial query result. These results are merged at the master node, which also produces error bound estimations.

Estimating errors

The sampling algorithm can produce an output with duplicate edges. This acts as a random sampling with replacement, and the Central Limit Theorem can be used to estimate the error bounds. An alternative error estimation mechanism is also described in which duplicate edges are not allowed in the sample, and a Horvitz-Thompson estimator can be used. I couldn’t determine which of these two mechanisms is actually used for the results reported in the evaluation.
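A minimal sketch of the CLT route, for one stratum and a SUM aggregate. The aggregate (summing `l + r` over joined pairs), the function name, and the default z-value of 1.96 for 95% confidence are illustrative assumptions rather than details taken from the paper.

```python
import math
import random
import statistics


def estimate_stratum_sum(left_items, right_items, b, rng, z=1.96):
    """Estimate SUM(l + r) over all joined pairs for one join key,
    with a CLT-based margin of error. Requires b >= 2."""
    population = len(left_items) * len(right_items)   # size of the full cross product
    sampled = [rng.choice(left_items) + rng.choice(right_items) for _ in range(b)]
    mean = statistics.fmean(sampled)
    stdev = statistics.stdev(sampled)
    estimate = population * mean                      # scale the sample mean up
    margin = z * population * stdev / math.sqrt(b)    # half-width of the interval
    return estimate, margin


rng = random.Random(7)
estimate, margin = estimate_stratum_sum([10, 20, 30], [1, 2], b=200, rng=rng)
```

Per-stratum estimates like this would be summed across join keys to answer the overall query, with the variances combined accordingly.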


Focusing just on the filtering stage to start with, we see that with two-way joins ApproxJoin is 6.1x faster than a native Spark join, and shuffles 12x less data. The gains are even better with 3-way and 4-way joins:

The benefits do depend on the overlap percentage though. The figures above were all with overlap fractions below 1%, and the ApproxJoin advantage disappears by the time we get to around 8%.

Turning to the sampling stage, the following figure compares scalability, latency, and accuracy of ApproxJoin sampling vs Spark joins. Here ApproxJoin can deliver order of magnitude speed-ups.

ApproxJoin is also evaluated end-to-end on two real-world datasets: CAIDA network traces, and the Netflix prize dataset. For the network trace dataset ApproxJoin is 1.5-1.7x faster, and reduces shuffled data by 300x. For the Netflix dataset ApproxJoin is 1.27-2x faster, and shuffles 1.7-3x less data.

By performing sampling during the join operation, we achieve low latency as well as high accuracy… Our evaluation shows that ApproxJoin significantly reduces query response time as well as the data shuffled through the network, without losing the accuracy of the query results compared with the state-of-the-art systems.

ASAP: fast, approximate graph pattern mining at scale

November 7, 2018

ASAP: fast, approximate graph pattern mining at scale Iyer et al., OSDI’18

I have a real soft spot for approximate computations. In general, we waste a lot of resources on overly accurate analyses when understanding the trends and / or the neighbourhood is quite good enough (do you really need to know it’s 78.763895% vs 78 ± 1%?). You can always drill in with more accuracy if the approximate results hint at something interesting or unexpected.

Approximate analytics is an area that has gathered attention in big data analytics, where the goal is to let the user trade-off accuracy for much faster results.

(See e.g. ApproxHadoop which we covered on The Morning Paper a while back).

In the realm of graph processing, graph pattern mining algorithms, which discover structural patterns in a graph, can reveal very interesting things in our data but struggle to scale to larger graphs. This is in contrast to graph analysis algorithms such as PageRank which typically compute properties of a graph using neighbourhood information.

Today, a deluge of graph processing frameworks exist, both in academia and open-source… a vast majority of the existing graph processing frameworks however have focused on graph analysis algorithms.

Arabesque explicitly targets the graph mining problem using distributed processing, but even then large graphs (1 billion edges) can take hours (e.g. 10 hours) to mine.

In this paper, we present A Swift Approximate Pattern-miner (ASAP), a system that enables both fast and scalable pattern mining. ASAP is motivated by one key observation: in many pattern mining tasks, it is often not necessary to output the exact answer. For instance, in frequent sub-graph mining (FSM) the task is to find the frequency of subgraphs with an end goal of ordering them by occurrences… our conversations with a social network firm revealed that their application for social graph similarity uses a count of similar graphlets. Another company’s fraud detection system similarly counts the frequency of pattern occurrences. In both cases, an approximate count is good enough.

ASAP outperforms Arabesque by up to 77x on the LiveJournal graph while incurring less than 5% error. It can scale to graphs with billions of edges (e.g. Twitter at 1.5B edges) and produce results in minutes rather than hours.

One major challenge is that the approximation methods used in big data analytics don’t carry over to graph pattern mining. There, the core idea is to sample the input, run the analysis over the sample, and extrapolate based on an assumption that the sample size relates to the error in the output. If we take this same idea and apply it to a foundational graph mining problem, counting triangles, it quickly becomes apparent that there is no clear relationship we can rely on between the size of the sample and the error or the speedup.

We conclude that the existing approximation approach of running the exact algorithm on one or more samples of the input is incompatible with graph pattern mining. Thus, in this paper, we propose a new approach.

Graph pattern mining and sampling

Pattern mining is the problem of finding instances of a given pattern e.g. triangles, 3-chains, cliques, motifs, in a graph. (A motif query looks for patterns involving a certain number of vertices, e.g. a 3-motif query looks for triangles and 3-chains).

A common approach is to iterate over all possible embeddings in the graph starting with a vertex or edge, filtering out those that cannot possibly match. The remaining candidate embeddings are then expanded by adding one more vertex/edge and the process repeats.

The obvious challenge in graph pattern mining, as opposed to graph analysis, is the exponentially large candidate set that needs to be checked.

Neighbourhood sampling has been proposed in the context of a specific graph pattern, triangle counting. The central idea is to sample one edge from an edge stream, and then gradually add more edges until either they form a triangle or it becomes impossible to form the pattern. Given a graph with m edges, it proceeds as follows. To perform one trial:

  • Uniformly sample one edge (l_0) from the graph, with sampling probability 1/m.
  • Uniformly sample one of l_0’s adjacent edges (l_1) from the graph. (Note that neighbourhood sampling depends on the ordering of edges in the stream, and l_1 appears after l_0 here). If l_0 has c neighbours appearing in the stream after it, then the sampling probability for l_1 is 1/c.
  • Find an edge l_2 appearing in the stream after l_1 that completes the triangle, if possible. If we do find such an edge, the sampling probability is 1/mc.

If a trial successfully samples a triangle, we estimate e_i = mc triangles in the full graph (and e_i = 0 otherwise). After conducting r trials we have an approximate result given by \frac{1}{r}\sum_{i=1}^{r}e_i.
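The three steps and the averaging can be sketched as follows. This is a toy single-machine version written for illustration, assuming a simple graph (no self-loops or duplicate edges) given as an ordered list of edges; the function names are not from the paper.

```python
import random


def one_trial(edges, rng):
    """Run one neighbourhood-sampling trial; return m*c on success, else 0."""
    m = len(edges)
    i = rng.randrange(m)                      # l_0, picked with probability 1/m
    u, v = edges[i]
    # Edges adjacent to l_0 that appear later in the stream.
    later = [j for j in range(i + 1, m) if set(edges[j]) & {u, v}]
    c = len(later)
    if c == 0:
        return 0
    j = rng.choice(later)                     # l_1, picked with probability 1/c
    # The closing edge joins the two endpoints that l_0 and l_1 do not share.
    closing = {u, v} ^ set(edges[j])
    for k in range(j + 1, m):                 # l_2 must appear after l_1
        if set(edges[k]) == closing:
            return m * c
    return 0


def estimate_triangles(edges, trials, rng):
    """Average the per-trial estimates e_i over the given number of trials."""
    return sum(one_trial(edges, rng) for _ in range(trials)) / trials


# A small stream containing exactly one triangle (0, 1, 2).
edges = [(0, 1), (1, 2), (0, 2), (2, 3), (3, 4)]
approx = estimate_triangles(edges, trials=10000, rng=random.Random(42))
```

Each triangle can only be found one way (its earliest edge as l_0, its second as l_1), which is why scaling a success by mc gives an unbiased estimate.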

Here’s an example using a graph with five nodes:

Introducing ASAP

ASAP generalises the neighbourhood sampling approach outlined above to work with a broader set of patterns. It comes with a standard library of implementations for common patterns, and users can also write their own. It is designed for a distributed setting and also includes mining over property graphs (which requires predicate matching).

Users of ASAP can explicitly trade off compute time and accuracy, specifying either a time budget (best result you can do in this time) or an error budget (give me a result as quickly as you can, with at least this accuracy). Before running the algorithm, ASAP returns an estimate of the time or error bounds it can achieve. If the user approves, the algorithm is run and the presented result includes a count, confidence level, and actual run time. Users may then optionally ask to output the actual embeddings of the found pattern.

To find out how many estimators it needs to run for the given bounds ASAP builds an error-latency profile (ELP). Building an ELP is fast and can be done online. For the error profile, the process begins with uniform sampling to reduce the graph to a size where nearly 100% accurate estimates can be produced. A loose upper bound for the number of estimators required is then produced using Chernoff bounds and scaled to the larger graph (see section 5.2 for details and for how the time profile is computed). For evolving graphs, the ELP algorithm can be re-run after a certain number of changes to the graph (e.g., when 10% of edges have changed).

Approximate pattern mining in ASAP

ASAP generalises neighbourhood sampling to a two phase process: a sampling phase followed by a closing phase.

In the sampling phase, we select an edge in one of two ways by treating the graph as an ordered stream of edges: (a) sample an edge randomly; (b) sample an edge that is adjacent to any previously sampled edges, from the remainder of the stream. In the closing phase, we wait for one or more specific edges to complete the pattern.

(So in the triangle counting example above, sampling l_0 and l_1 form the sampling phase, and waiting for l_2 is the closing phase).

The probability of sampling a pattern can be computed from these two phases….

For a k-node pattern, the probability of detecting a pattern p depends on k and the different ways to sample using the neighbourhood sampling technique. There are four different cases for k = 2..5, and up to three different types of sampling possible within each case. The probability formulas for all of these are enumerated in section 4.1.1. E.g., when k=3 the probability is 1/mc, as we saw previously for triangles.

When applied in a distributed setting ASAP parallelises the sampling process and then combines the outputs (MapReduce). Vertices in the graph are partitioned across machines, and several copies of the estimator task are scheduled on each machine. The graph’s edges and vertices need to be seen in the same order on each machine, but any order will do (‘thus, ASAP uses a random ordering which is fast and requires no pre-processing of the graph’). The output from each machine is a partial count, making the reduce step a trivial sum operation.

For predicate matching on property graphs ASAP supports both ‘at least one’ and ‘all’ predicates on the pattern’s vertices and edges. For ‘all’ predicates, ASAP introduces a filtering phase before the execution of the pattern mining task. For ‘at least one’ predicates a first pass produces a matching list of edges matching the predicate. Then in the second pass when running the sampling phase of the algorithm every estimator picks its first edge randomly from within the matching list.

There’s a performance optimisation for motif queries (finding all patterns with a certain number of vertices) whereby common building blocks from k-node patterns can be reused.

If a user first explores the graph with a low accuracy answer (e.g. using 1 million estimators), and then wants to drill in to higher accuracy (requiring e.g. 3 million estimators) then ASAP only needs to launch another 2 million estimators and can reuse the first 1 million.

Key evaluation results

ASAP is evaluated using a number of graphs:

Compared to Arabesque, ASAP shows up to 77x performance improvement with a 5% loss of accuracy when counting 3-motifs and 4-motifs on modest sized graphs:

On larger graphs ASAP’s advantage extends up to 258x.

The final part of the evaluation looks at mining for 5-motifs (21 individual patterns), based on conversations with industry partners who use similar patterns in their production systems. Results for two of those patterns are shown in the table below, and demonstrate that ASAP can handle them easily (using a cluster of 16 machines each with 16 cores).

Our evaluation shows that not only does ASAP outperform state-of-the-art exact solutions by more than an order of magnitude, but it also scales to large graphs while being low on resource demands.

Sharding the shards: managing datastore locality at scale with Akkio

November 5, 2018

Sharding the shards: managing datastore locality at scale with Akkio Annamalai et al., OSDI’18

In Harry Potter, the Accio Summoning Charm summons an object to the caster of the spell, sometimes transporting it over a significant distance. In Facebook, Akkio summons data to a datacenter with the goal of improving data access locality for clients. Central to Akkio is the notion of microshards (μ-shards), units of data much smaller than a typical shard. μ-shards are defined by the client application, and should exhibit strong access locality (i.e., the application tends to read/write the data in a μ-shard together in a small window of time). Sitting as a layer between client applications and underlying datastores, Akkio has been in production at Facebook since 2014, where it manages around 100PB of data.

Measurements from our production environment show that Akkio reduces latencies by up to 50%, cross-datacenter traffic by up to 50%, and storage footprint by up to 40% compared to reasonable alternatives.

Akkio can support trillions of μ-shards and many 10s of millions of data access requests per second.


Our work in this area was initially motivated by our aim to reduce service response times and resource usage in our cloud environment which operates globally and at scale… Managing data access locality in geo-distributed systems is important because doing so can significantly improve data access latencies, given that intra-datacenter communication latencies are two orders of magnitude smaller than cross-datacenter communication latencies: e.g. 1ms vs 100ms.

At scale, requests issued on behalf of one end-user are likely to be processed by multiple distinct datacenters over time. For example, the user may travel from one location to another, or service workload may be shifted from one location to another.

One way to get data locality for accesses is to fully replicate all data across all datacenters. This gets expensive very quickly (the authors estimated $2 million per 100PBs per month for storage, plus costly WAN cross-datacenter bandwidth usage on top).

We could cap those costs by setting a caching budget, but for many of the workloads important to Facebook distributed caching is ineffective. For acceptable hit rates we still need to dedicate large amounts of hardware resources, and these workloads have low read-write ratios. Beyond the cost and latency implications there’s one further issue: “many of the datasets accessed by our services need strong consistency… it is notable that the widely popular distributed caching systems that are scalable, such as Memcached or Redis, do not offer strong consistency. And for good reason.”

Why not just use shards?

Don’t datastores already partition their data using shards though (e.g. through key ranges or hashing)? There are two issues with the existing sharding mechanism from an Akkio perspective:

  1. Shard sizes are set by administrators to balance shard overhead, load balancing, and failure recovery. They tend to be on the order of a few tens of gigabytes.
  2. Shards are primarily a datastore concern, used as the unit for replication, failure recovery, and load balancing.

At Facebook, because the working set size of accessed data tends to be less than 1MB, migrating an entire shard (1-10GB) would be ineffective.


Making shards smaller (and thus having many more of them) interferes with the datastore operations, and moving shards as a unit is too heavyweight. So Akkio introduces a new layer on top of shards, μ-shards.

μ-shards are more than just smaller shards: a key difference is that the application itself assigns data to μ-shards with a high expectation of access locality. μ-shard migration has an overhead an order of magnitude lower than shard migration and its utility is far higher.

Each μ-shard is defined to contain related data that exhibits some degree of access locality with client applications. It is the application that determines which data is assigned to which μ-shard. At Facebook, μ-shard sizes typically vary from a few hundred bytes to a few megabytes in size, and a μ-shard (typically) contains multiple key-value pairs or database table rows. Each μ-shard is assigned (by Akkio) to a unique shard in that a μ-shard never spans multiple shards.

Examples of applications that fit well with μ-shards include:

The design of Akkio

Akkio is implemented as a layer in between client applications and an underlying datastore. The paper focuses on Akkio’s integration with Facebook’s ZippyDB, but its design enables use with multiple backend datastores. The Akkio client library is embedded within the database client library, exposing an application-managed μ-shard id on all requests. It is up to the application to establish its own μ-shard naming scheme.

The Akkio location service maintains a location database that the client can use to map μ-shards to storage servers so that it knows where to direct requests. An access counter service is used to track all accesses, so that μ-shard placement and migration decisions can be made. It is the responsibility of the data placement service (DPS) to decide where to place each μ-shard.

The Akkio Client Library asynchronously notifies the DPS that a μ-shard placement may be suboptimal whenever a data access request needs to be directed to a remote datacenter. The DPS re-evaluates the placement of a μ-shard only when it receives such a notification.
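A toy sketch of that client-side flow is shown below. Everything in it is hypothetical (the class name, the dict standing in for the location database, the queue standing in for asynchronous notifications to the DPS); it only illustrates the routing decision, not Akkio’s real client library.

```python
from collections import deque


class AkkioClientSketch:
    """Illustrative stand-in for the client library's routing logic."""

    def __init__(self, local_datacenter, location_db):
        self.local_datacenter = local_datacenter
        self.location_db = location_db        # maps u-shard id -> datacenter
        self.dps_notifications = deque()      # stand-in for async messages to the DPS

    def route(self, ushard_id):
        """Return the datacenter to send the request to, flagging remote accesses."""
        datacenter = self.location_db[ushard_id]
        if datacenter != self.local_datacenter:
            # Remote access: hint to the DPS that placement may be suboptimal.
            self.dps_notifications.append((ushard_id, self.local_datacenter))
        return datacenter


client = AkkioClientSketch("dc-east", {"user:1": "dc-east", "user:2": "dc-west"})
local_target = client.route("user:1")    # served locally, no notification
remote_target = client.route("user:2")   # served remotely, DPS is notified
```

Because notifications are only raised on remote accesses, well-placed μ-shards generate no placement work at all.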

Location information is configured with an eventually consistent replica at every datacenter: the dataset is relatively small, on the order of a few hundred GB. If a client reads a stale location and gets a miss it will query a second time requesting that the cache be bypassed (and subsequently updated). Storage required for access counters is also low, less than 200GB per datacenter at Facebook.

All the main action happens in the data placement service, which is tailored for each backend datastore:

There is one DPS per Akkio-supported backend datastore system that is shared among all of the application services using instances of the same datastore system. It is implemented as a distributed service with a presence in every datacenter.

The createUShard() operation is called when a new μ-shard is being created and DPS must decide where to initially place it. evaluatePlacement() is invoked asynchronously by the Akkio client library whenever a data access request needs to be directed to a remote datacenter. DPS first checks its policies to see if initiating a migration is permissible (details of the policy mechanism are mostly out of scope for this paper), and whether a migration is already in process. If migration is permissible it determines the optimal placement for the μ-shard and starts the migration.

μ-shard placement

To understand placement in the context of ZippyDB, we first need to look briefly at how ZippyDB manages shards and replicas. ZippyDB partitions data horizontally, with each partition assigned to a different shard. Shards can be configured to have multiple replicas which form a shard replica set and participate in a shard-specific Paxos group. The replication configuration of a shard identifies not only the number of replicas, but also how they are to be distributed over datacenters, clusters, and racks, and the degree of consistency required. Replica sets that all have the same configuration form a replica set collection.

When running on ZippyDB, Akkio places μ-shards on, and migrates μ-shards between, different such replica set collections.

ZippyDB’s Shard Manager assigns each shard replica to a specific ZippyDB server conforming to the policy. The assignments are registered in a directory service.

For initial placement of a μ-shard, the typical strategy is to select a replica set collection with primary replica local to the requesting client and secondary replica(s) in one of the more lightly loaded datacenters. For migration, a scoring system is used to select the target replica set collection from those available. First, datacenters are scored based on the number of times the μ-shard was accessed from that datacenter over the last X days, with stronger weighting for more recent accesses. Any ties are then broken by scoring datacenters based on the amount of available resources within them.
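Here is one way the two-level scoring could look in Python. The exponential decay with a half-life is purely an assumption (the text only says that recent accesses are weighted more strongly), as are the function names and the simplification of picking a datacenter rather than a replica set collection.

```python
def score_datacenters(access_log, today, window_days, half_life_days=3.0):
    """Score each datacenter by recency-weighted access counts.

    access_log is a list of (datacenter, day) pairs for one u-shard.
    The half-life decay is an assumed weighting, not the paper's.
    """
    scores = {}
    for datacenter, day in access_log:
        age = today - day
        if 0 <= age <= window_days:
            weight = 0.5 ** (age / half_life_days)
            scores[datacenter] = scores.get(datacenter, 0.0) + weight
    return scores


def choose_target(access_log, free_capacity, today, window_days=10):
    """Pick the best-scoring datacenter, breaking ties on available resources."""
    scores = score_datacenters(access_log, today, window_days)
    return max(free_capacity, key=lambda dc: (scores.get(dc, 0.0), free_capacity[dc]))


access_log = [("dc-east", 9), ("dc-east", 8), ("dc-west", 9), ("dc-west", 1)]
free_capacity = {"dc-east": 10, "dc-west": 20}
target = choose_target(access_log, free_capacity, today=10)
```

In this example dc-east wins on recency: its two accesses both fall in the last two days, whereas one of dc-west’s is nine days old and contributes little.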

When working on top of ZippyDB, Akkio makes use of ZippyDB’s access control lists and transactions to implement migrations. The move algorithm looks like this:

On top of Facebook’s Cassandra variant (which doesn’t support ACLs, though the open-source Cassandra does), a different migration strategy is needed:

Now, that’s all well and good but here’s the part I still don’t understand. The underlying sharding system of the datastore controls placement. Data items are assigned to shards by some partitioning scheme (e.g. by range partitioning, or by hash partitioning). So when we move a μ-shard from one shard (replica set collection) to another, what is really happening with respect to those keys? Are they being finagled in the client so that e.g. they end up in the right range? (Have the right hash value????!). I feel like I must be missing something really obvious here, but I can’t see it described in the paper. Akkio does allow for some changes to the underlying datastore to accommodate μ-shard migrations, but these changes are minimal:

Akkio in production

The evaluation section contains multiple examples of Akkio in use at Facebook, which we don’t have space to cover in detail. The before-and-after comparisons (in live production deployments) make compelling reading though:

  • ViewState data stores a history of content previously shown to a user. Replica set collections are configured with two replicas in one local datacenter, and a third in a nearby datacenter. Akkio is configured to migrate μ-shards aggressively.

Originally, ViewState data was fully replicated across six datacenters. Using Akkio with the setup described above led to a 40% smaller storage footprint, a 50% reduction of cross-datacenter traffic, and about a 60% reduction in read and write latencies compared to the original non-Akkio setup.

  • AccessState stores information about user actions taken in response to displayed content. Akkio once more decreased storage footprint by 40%, and cross-datacenter traffic by 50%. Read latency was unaffected, but write latency reduced by 60%.
  • For Instagram ‘Connection-Info’ Akkio enabled Instagram to expand into a second continent keeping both read and write latencies lower than 50ms. “This service would not have expanded into the second continent without Akkio.”
  • For the Instagram Direct messaging application, Akkio reduced end-to-end message delivery latency by 90ms at P90 and 150ms at P99. (The performance boost in turn delivered improvements in key user engagement metrics).

Up next: migrating more applications to Akkio, and supporting more datastore systems on the backend (including MySQL). In addition…

… work has started using Akkio (i) to migrate data between hot and cold storage, and (ii) to migrate data more gracefully onto newly created shards when resharding is required to accommodate (many) new nodes.

The FuzzyLog: a partially ordered shared log

November 2, 2018

The FuzzyLog: a partially ordered shared log Lockerman et al., OSDI’18

If you want to build a distributed system then having a distributed shared log as an abstraction to build upon — one that gives you an agreed upon total order for all events — is such a big help that it’s practically cheating! (See the “Can’t we all just agree” mini-series of posts for some of the background on consensus).

Services built over a shared log are simple, compact layers that map a high-level API to append/read operations on the shared log, which acts as the source of strong consistency, durability, failure atomicity, and transactional isolation. For example, a shared log version of ZooKeeper uses 1K lines of code, an order of magnitude lower than the original system.

There’s a catch of course. System-wide total orders are expensive to maintain. Sometimes it may be impossible (e.g. in the event of a network partition). But perhaps we don’t always need a total ordering. Oftentimes for example causal consistency is strong enough. FuzzyLog aims to provide the simplicity of a shared log without imposing a total order: it provides partial ordering instead. It’s designed for a world of sharded, geo-replicated systems. The log is actually a directed acyclic graph made up of interlinked chains. A chain contains updates originating in a single geographic region. Nodes within the chains are coloured, with each colour representing updates to a single application level data shard.

(Note: when this paper talks about nodes, the authors mean nodes in the Log DAG, not system nodes (servers)).

Our evaluation shows that [applications built on top of FuzzyLog] are compact, fast, and flexible: they retain the simplicity (100s of lines of code) and strong semantics (durability and failure atomicity) of a shared log design while exploiting the partial order of the FuzzyLog for linear scalability, flexible consistency guarantees (e.g., causal+ consistency), and network partition tolerance. On a 6-node Dapple deployment [Dapple is a FuzzyLog implementation], our FuzzyLog-based ZooKeeper supports 3M/sec single-key writes, and 150K/sec atomic cross-shard renames.

The FuzzyLog API

Let’s take a closer look at the abstraction provided by FuzzyLog through its API. FuzzyLog is designed to support partitioning state into logical data shards, with concurrent processing of updates against different shards. It also supports geo-replication with concurrent updates across regions (even to the same logical partition).

  • Each node in the DAG is tagged with one or more colours, which divide an application’s state into logical shards. Nodes tagged with a colour correspond to updates against the corresponding logical shard.
  • Each colour is a set of totally ordered chains, one per region, with cross-edges between them that indicate causality. Every region has a full (but potentially stale) copy of each colour.

Clients interact with their own local region via the FuzzyLog API:

Operations to a single color across regions are causally consistent. In other words, two append operations to the same color issued by clients in different regions are only ordered if the node introduced by one of them has already been seen by the client issuing the other one…. Operations within a single region are serializable. All append and sync operations issued by clients within a region execute in a manner consistent with some serial execution. This serialization order is linearizable if the operations are to a single color within the region (i.e., on a single chain).

A client uses the sync call to synchronise its state. sync takes a snapshot of the set of nodes currently present at the local region’s copy of a color, and ‘plays’ all new nodes since the last sync invocation. (I.e., sync is moving clients from watermark to watermark in the totally ordered chain for the given colour in the given region).

Here’s an example of sync at work:

Building applications on top of FuzzyLog

Given the FuzzyLog abstraction, the authors give highlights of building a series of different systems on top of it. We are left to infer that this process is simple based on the very small number of lines of code required.

First up is a LogMap, which uses a single colour and a single region. Each server has a local in-memory copy of the map and continuously executes sync to keep its local view up to date. LogMap effectively runs over a single totally ordered shared log. It is implemented in just 193 lines of code, but its reliance on total order comes at the cost of scalability, performance, and availability.
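To make the LogMap idea concrete, here is a minimal sketch in Python. It is not the authors' code (their implementations sit on the FuzzyLog API): `ToySharedLog` is a hypothetical in-memory stand-in for a single-colour, single-region log, and the method names are assumptions chosen for illustration. The point it shows is that every server derives its map purely by replaying the log from its last watermark.

```python
class ToySharedLog:
    """Hypothetical in-memory stand-in for a single-colour, single-region log."""

    def __init__(self):
        self._entries = []

    def append(self, entry):
        self._entries.append(entry)

    def read_from(self, position):
        """Return all entries at or after `position`."""
        return self._entries[position:]


class LogMap:
    """A map whose local state is derived only by replaying the shared log."""

    def __init__(self, log):
        self._log = log
        self._played = 0   # watermark: number of log entries already applied
        self._view = {}    # local in-memory copy of the map

    def put(self, key, value):
        # Writes go to the log only; the local view changes on sync.
        self._log.append(("put", key, value))

    def delete(self, key):
        self._log.append(("del", key, None))

    def sync(self):
        """Play every entry appended since the last sync."""
        for op, key, value in self._log.read_from(self._played):
            if op == "put":
                self._view[key] = value
            else:
                self._view.pop(key, None)
            self._played += 1

    def get(self, key):
        self.sync()
        return self._view.get(key)


log = ToySharedLog()
server_a, server_b = LogMap(log), LogMap(log)
server_a.put("x", 1)
server_b.put("x", 2)   # later in the total order, so it wins on every server
server_a.delete("y")
```

Because both servers replay the same totally ordered log, they agree on the final value of every key. That total order is also what limits scalability.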

To scale within a single region, we can add sharding (colours). ShardedMap is a trivial change to LogMap, requiring just that the colour parameter be set correctly on calls to the FuzzyLog.

If we need atomic operations across shards (with blind multi-put operations that don’t return a value), the AtomicMap (201 lines of code) can do this. It supports serializable (but not linearizable or strictly serializable) appends.

For full read/write transactions, TxMap at 417 lines of code can do the business. It tracks read-sets and buffers write-sets, and plays out a commit protocol through nodes in the log. TxMap provides strict serializability.

If we want to go across regions, then CRDTMap (284 lines of code) provides causal+ consistency based on the Observed-Remove set CRDT. We can make it transactional (TxCRDTMap) by changing 80 LOC.

CRDTMap sacrifices consistency even when there is no partition in the system. CAPMap (424 LOC) provides strong consistency in the absence of network partitions, and causal consistency during them.

A ZooKeeper clone supporting linear scaling across shards and atomic cross-shard renames is 1881 LOC.

Introducing Dapple, a FuzzyLog implementation

Dapple is a distributed implementation of the FuzzyLog abstraction. It uses a collection of storage servers called chainservers (they store the per-colour chains, and also happen to use chain replication). Each chainserver stores multiple in-memory log-structured address spaces. The FuzzyLog state is partitioned across chainservers with each colour on a single partition. Persistence is achieved using the ‘lots of little batteries’ strategy (battery-backed DRAM).

If we stick to a single colour for a moment, then each region will have a single totally ordered chain. In addition to the latest copy of its own local chain, each region also has potentially stale copies of the other regions’ chains (shadow logs). Clients write to the local log, and shadow logs are asynchronously replicated from other regions. Log appends use chain replication and are acknowledged by the tail. Snapshot requests are sent directly to the tail, and reads can be satisfied by any replica in the chain.

The FuzzyLog API supports appending a node to multiple colours, which in Dapple equates to atomically appending a node to multiple logs, one log per colour:

…Dapple uses a classical total ordering protocol called Skeen’s algorithm (which is unpublished but described verbatim in other papers…) to consistently order appends… We add three fault-tolerance mechanisms —leases, fencing, and write-ahead logging— to produce a variant of Skeen’s that completes in two phases in a failure-free ‘fast’ path, but can safely recover if the origin client crashes.

Details of the protocol are given in §5.2 of the paper.


Tango and vCorfu are based on shared logs using a centralised sequencer. Compared to Tango, Dapple offers near linear scaling with 0% multi-colour appends, and holds its advantage over Tango until about 10%. At 100% multi-colour appends Tango wins because we’re back in the total order situation which Tango provides more efficiently.

When using multi-shard operations with AtomicMap, scalability and absolute throughput degrade gracefully as the percentage of multi-colour appends is steadily increased:

A ZooKeeper clone, DappleZK, is implemented in 1881 lines of Rust code. It partitions a namespace across a set of servers, each acting as a Dapple client, storing a partition of the namespace in in-memory data-structures backed by a FuzzyLog colour. Each DappleZK server is responsible for an independent shard of the ZooKeeper namespace. Rename operations move files from one DappleZK shard to another in a distributed transaction. A ZooKeeper deployment maintaining its state in DRAM is included for comparison.

We include the ZooKeeper comparison for completeness; we expect the FuzzyLog single-partition case to outperform ZooKeeper largely due to the different languages used (Rust vs Java) and the difference between prototype and production-quality code.

(Here the assumption is that production quality code is slower due to all the checks-and-balances – versus being faster because it has been optimised).

The last word

The FuzzyLog abstraction— and its implementation in Dapple— extends the shared log approach to partial orders, allowing applications to scale linearly without sacrificing transactional guarantees, and switch seamlessly between these guarantees when the network partitions and heals. Crucially, applications can achieve these capabilities in hundreds of lines of code via simple, data-centric operations on the FuzzyLog, retaining the core simplicity of the shared log approach.

Moment-based quantile sketches for efficient high cardinality aggregation queries

October 31, 2018

Moment-based quantile sketches for efficient high cardinality aggregation queries Gan et al., VLDB’18

Today we’re temporarily pausing our tour through some of the OSDI’18 papers in order to look at a great sketch-based data structure for quantile queries over high-cardinality aggregates.

That’s a bit of a mouthful so let’s jump straight into an example of the problem at hand. Say you have telemetry data from millions of heterogeneous mobile devices running your app. Each device tracks multiple metrics such as request latency and memory usage, and is associated with dimensional metadata (categorical variables) such as application version and hardware model.

In applications such as A/B testing, exploratory data analysis, and operations monitoring, analysts perform aggregation queries to understand how specific user cohorts, device types, and feature flags are behaving.

We want to be able to ask questions like “what’s the 99%-ile latency over the last two weeks for v8.2 of the app?”

SELECT percentile(latency, 99) FROM requests
WHERE time > date_sub(curdate(), 2 WEEK)
AND app_version = "v8.2"

As well as threshold queries such as “what combinations of app version and hardware platform have a 99th percentile latency exceeding 100ms?”

SELECT app_version, hw_model, PERCENTILE(latency, 99) as p99
FROM requests
GROUP BY app_version, hw_model
HAVING p99 > 100

Instead of starting from raw data every time when answering this type of query, OLAP engines can reduce query time and memory usage by maintaining a data cube of pre-aggregated summaries for each tuple of dimension values. The ultimate query performance then depends on just how quickly we can merge those summaries to compute quantile roll-ups over the requested dimensions.

Let’s take a very simple example. Suppose I have two dimensions, letter (with values A and B), and colour (with values red and green), and I have request latency data from log messages including these attributes. Then I will have four summary sketches: one accumulating latency values for (A, red), one for (A, green), one for (B, red), and one for (B, green).

If a query wants to know the P99 latency for ‘red’ requests, we can add together the (A, red) and (B, red) sketches to get a complete sketch for red.

In this paper, we enable interactive quantile queries over high-cardinality aggregates by introducing a compact and efficiently mergeable quantile sketch and associated quantile estimation routines.

The data structure that makes all this possible is called a moments sketch (named after the method of moments statistical technique). It’s easy to construct, but a bit more difficult to interpret. It’s worth the effort though, as the evaluation shows:

  • The moments sketch supports 15-50x faster query times than comparably accurate summaries on quantile aggregations
  • The moments sketch gives good accuracy across a range of real-world datasets using less than 200 bytes of storage
  • Integration of the moments sketch in Druid provides 7x faster quantile queries than the default quantile summary in Druid workloads.

There’s a Java implementation available at

Mergeable quantile summaries

Intuitively, a summary is mergeable if there is no accuracy cost to combining pre-computed summaries compared with constructing a summary on the raw data. Thus, mergeable summaries are algebraic aggregate functions in the data cube literature… Mergeable summaries can be incorporated naturally into a variety of distributed systems [e.g. MapReduce].

The moments sketch mergeable summary is based on the method of moments, “a well established statistical technique for estimating the parameters of probability distributions.” Given a sketch recording moments (we’ll look at those in just a minute), the method of moments helps us to construct a distribution with moments matching those in the sketch. There could be many such distributions meeting those criteria. The principle of maximum entropy is used to select the distribution that encodes the least additional information about the data beyond that captured by the given moment constraints (i.e., just enough to fit the facts, and no more).

Maintaining and merging moments sketches

A moments sketch is a set of floating point values containing:

  • the minimum value seen so far
  • the maximum value seen so far
  • the count of the number of values seen so far
  • k moments, and
  • k log moments

The k sample moments are given by \frac{\sum x}{n}, \frac{\sum x^2}{n}, \frac{\sum x^3}{n} ... \frac{\sum x^k}{n}. In the sketch data structure we can just store n once and maintain the unscaled values \sum x, \sum x^2, ... \sum x^k.

For long-tailed datasets where values can vary over several orders of magnitude we can recover better quantile estimates using log-moments.

In general, when updating a moments sketch in a streaming manner or when maintaining multiple moments sketches in a distributed setting, we cannot know a priori whether standard moments or log moments are more appropriate for the given data set. Therefore, our default approach is to store both sets of moments up to the same order k.

The functions to initialise an empty sketch, accumulate a new point-wise addition, or merge two sketches together are very straightforward given these definitions (the figure below complicates things by showing how to accumulate the scaled moments rather than the unscaled versions).
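As a minimal sketch of those three operations (not the authors' Java implementation), the Python below stores the unscaled power sums and divides by the count only when the moments are read. The class and method names are hypothetical, and it assumes strictly positive inputs such as latencies so that the log moments are always defined.

```python
import math


class MomentsSketch:
    """Toy moments sketch: min, max, count, k power sums and k log power sums."""

    def __init__(self, k):
        self.k = k
        self.min = math.inf
        self.max = -math.inf
        self.count = 0
        self.power_sums = [0.0] * k   # sum of x^1 .. x^k (unscaled)
        self.log_sums = [0.0] * k     # sum of (ln x)^1 .. (ln x)^k (unscaled)

    def add(self, x):
        """Accumulate a single value (assumed > 0 in this sketch)."""
        if x <= 0:
            raise ValueError("this toy version only accepts positive values")
        self.min = min(self.min, x)
        self.max = max(self.max, x)
        self.count += 1
        log_x = math.log(x)
        for i in range(self.k):
            self.power_sums[i] += x ** (i + 1)
            self.log_sums[i] += log_x ** (i + 1)

    def merge(self, other):
        """Fold another sketch of the same order k into this one."""
        if other.k != self.k:
            raise ValueError("sketches must have the same order k")
        self.min = min(self.min, other.min)
        self.max = max(self.max, other.max)
        self.count += other.count
        for i in range(self.k):
            self.power_sums[i] += other.power_sums[i]
            self.log_sums[i] += other.log_sums[i]

    def moments(self):
        """Scaled sample moments: power sums divided by the count."""
        return [s / self.count for s in self.power_sums]


# Roll up two data cube cells, (A, red) and (B, red), into a sketch for 'red'.
a_red, b_red = MomentsSketch(k=4), MomentsSketch(k=4)
for latency in [12.0, 15.0, 250.0]:
    a_red.add(latency)
for latency in [18.0, 11.0]:
    b_red.add(latency)

red = MomentsSketch(k=4)
red.merge(a_red)
red.merge(b_red)
```

Merging is a handful of additions and two comparisons regardless of how many values each sketch has absorbed, which is where the fast roll-up times come from.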

Estimating quantiles given a moment sketch

So it’s easy to maintain the sketch data structures. But the small matter of how to use these data structures to estimate quantiles remains.

… given a moments sketch with minimum x_{min} and maximum x_{max}, we solve for a pdf f(x) supported on [x_{min},x_{max}] with moments equal to the values in the moments sketch… We can then report the quantiles of f(x) as estimates for the quantiles of the dataset.

The differential Shannon entropy H is used to measure the degree of uninformativeness in a distribution ( H[f] = - \int_{x} f(x) \log f(x) dx ) and thus help select the maximum entropy distribution out of the set of possibilities. Given the pdf f(x), quantiles can be estimated using numeric integration and Brent’s method for root finding.

With a little mathematical juggling (§4.2), we can solve the problem of finding the maximum entropy distribution f using Newton’s method. Section 4.3 gives details on making this practical when calculating the second-order derivatives (Hessians) and combating numerical stability.

You also need to be aware of bumping into the limits of floating point stability. In practice data centered at 0 have stable higher moments up to k=16. With double precision floating point moments and data in a range [c-1, c+1] you can get numerically useful values with moments up to

Overall, these optimisations bring quantile estimation overhead down to 1ms or less, sufficient for interactive latencies on single quantile queries.

Optimisations for threshold queries

For threshold queries (e.g. P99 greater than 100ms) we can do a further optimisation.

Instead of computing the quantile on each sub-group directly, we compute a sequence of progressively more precise bounds in a cascade and only use more expensive estimators when necessary… Given the statistics in a moments sketch, we apply a variety of classical inequalities to derive bounds on the quantiles. These provide worst-case error guarantees for quantile estimates, and enable faster query processing for threshold queries over multiple groups.

Two kinds of bounds are used: one based on Markov’s inequality, and a slightly more expensive bound called the RTTBound (see §5.1). The bounds are used to see if the threshold predicate can be resolved immediately; if not, we solve for the maximum entropy distribution as before. It all comes together like this:
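The cascade idea can be sketched in Python under simplifying assumptions. This is not the paper's algorithm: it assumes non-negative data, implements only a range check and a one-sided Markov bound, omits the RTTBound entirely, and leaves the expensive maximum entropy estimator as a caller-supplied function. The dictionary layout and all function names are hypothetical.

```python
def make_sketch(values, k):
    """Build a toy sketch (min, max, count, unscaled power sums) from raw values."""
    return {
        "min": min(values),
        "max": max(values),
        "count": len(values),
        "power_sums": [sum(v ** i for v in values) for i in range(1, k + 1)],
    }


def markov_upper_fraction(sketch, t):
    """Upper bound on the fraction of values >= t, for non-negative data and t > 0.

    Markov's inequality applied to x^i gives P(X >= t) <= E[X^i] / t^i.
    Each stored moment yields one such bound; keep the tightest.
    """
    bound = 1.0
    for i, power_sum in enumerate(sketch["power_sums"], start=1):
        bound = min(bound, (power_sum / sketch["count"]) / t ** i)
    return bound


def quantile_exceeds(sketch, phi, t, estimate_quantile):
    """Decide whether the phi-quantile is greater than t, cheapest checks first."""
    # Stage 0: the quantile always lies between min and max.
    if t >= sketch["max"]:
        return False
    if t < sketch["min"]:
        return True
    # Stage 1: if fewer than (1 - phi) of the values can be >= t,
    # then more than phi of them are below t, so the quantile is below t.
    if t > 0 and markov_upper_fraction(sketch, t) < 1.0 - phi:
        return False
    # Stage 2: the bounds were inconclusive, so pay for the full estimate.
    return estimate_quantile(sketch, phi) > t


def expensive_estimator(sketch, phi):
    # Stand-in for the maximum entropy solve; not needed in this example.
    raise RuntimeError("the cheap bounds should have resolved this query")


# 9,999 requests at 10ms and one outlier at 150ms: is P99 above 100ms?
sketch = make_sketch([10.0] * 9999 + [150.0], k=4)
resolved = quantile_exceeds(sketch, 0.99, 100.0, expensive_estimator)
```

Here the first and second moments are too loose to decide, but the third moment bounds the fraction of values at or above 100ms to well under 1%, so the predicate is rejected without running the estimator.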


The moment sketch (M-Sketch) is evaluated against existing quantile summaries and also integrated into Druid and MacroBase. Six real-valued datasets are used, with statistics as captured in the following table:

Microbenchmarks evaluating query times and accuracy show that M-Sketch offers best-in-class query times for the same accuracy.

The moments sketch gets its advantage from fast merge times, which means the more merges required (the bigger the datacube) the better its relative performance. With 10^4 or more merges, M-Sketch provides the best performance. At lower merge numbers (e.g. 100 or less) estimation times dominate.

When integrated into Druid and compared to Druid’s default S-Hist summary using the Milan dataset with 26 million entries the moments sketch gives a 7x lower query time (note the log scale).

The following figure shows the impact of the bounds checking optimisation for threshold queries. Concentrating on the first four bars we see (a) a baseline calculating quantile sketches directly, (b) the moments sketch with no bounds optimisations, (c) adding in the Markov bounds test, and (d) using both the Markov bound and RTTBound tests.

Low merge overhead allows the moments sketch to outperform comparably accurate existing summaries when queries aggregate more than 10 thousand summaries…. the use of numeric optimizations and cascades keep query times at interactive latencies.

Noria: dynamic, partially-stateful data-flow for high-performance web applications

October 29, 2018

Noria: dynamic, partially-stateful data-flow for high-performance web applications Gjengset, Schwarzkopf et al., OSDI’18

I have way more margin notes for this paper than I typically do, and that’s a reflection of my struggle to figure out what kind of thing we’re dealing with here. Noria doesn’t want to fit neatly into any existing box!

We’ve seen streaming data-flow engines that maintain state and offer SQL interfaces and even transactions (e.g. Apache Flink, and data Artisans’ Streaming Ledger for Flink). The primary model here is data-flow, and SQL is bolted on as an interface to the state. The title of this paper sets me off thinking along those lines, but from the end user perspective, Noria looks and feels more like a database. The SQL interface is primary, not ancillary, and it maintains relational data in base tables (using RocksDB as the storage engine). Noria makes intelligent use of data-flow beneath the SQL interface (i.e., dataflow is not exposed as an end-user programming model) in order to maintain a set of (semi-)materialized views. Noria itself figures out the most efficient data-flows to maintain those views, and how to update the data-flow graphs in the face of schema / query set changes.

The primary use case Noria is designed for is read-heavy web applications with high performance (low latency) requirements. Such applications today normally include some kind of caching layer (e.g., memcached, Redis) to accelerate read performance and lighten database load. A lot of application developer effort can go into maintaining these caches and also denormalised and computed state in the database.

In general, developers must choose between convenient, but slow, ‘natural’ relational queries (e.g., with inline aggregations), and increased performance at the cost of application and deployment complexity (e.g. due to caching).

Noria simplifies application development by keeping data in base tables (roughly, the core persistent data) and maintaining derived views (roughly, the data an application might choose to cache). Any computed information derived from the base tables is kept out of those tables. Programmers don’t need to worry about explicit cache management/invalidation, computing and storing derived values, and keeping those consistent. Noria does all this for them.

At its core, Noria runs a continuous, but dynamically changing, dataflow computation that combines the persistent store, the cache, and elements of application logic. Each write to Noria streams through a joint data-flow graph for the current queries and incrementally updates the cached, eventually-consistent internal state and query results.

(Which is also reminiscent of CQRS, but again, the pattern here is used inside the datastore).

It’s not enough for Noria to maintain just some recent window of state, it needs to store all the persistent state. So state explosion is a potential problem. That’s where the ‘partially-stateful data-flow’ part from the paper title comes in, as Noria has a mechanism for retaining only a subset of records in memory and re-computing any missing values from the upstream operators (and ultimately, base tables) on demand.

The current prototype has some limitations, but it’s also showing a whole lot of promise:

When serving the Lobsters web application on a single Amazon EC2 VM, our prototype outperforms the default MySQL-based backend by 5x while simultaneously simplifying the application. For a representative query, our prototype outperforms the widely-used MySQL/memcached stack and the materialized views of a commercial database by 2-10x. It also scales the query to millions of writes and tens of millions of reads per second on a cluster of EC2 VMs, outperforming a state-of-the-art data-flow system, differential dataflow.

The end-user perspective

A Noria program looks like SQL DDL and includes definitions of base tables, internal views used as shorthands in other expressions, and external views which the application can later query. Data is retrieved via parameterised SQL queries. Data in base tables can be updated with SQL insertions, updates, and deletes. Noria applies these changes to the appropriate base tables and updates dependent views.

Noria also implements the MySQL binary protocol, so existing applications using prepared statements against a MySQL database can work directly on top of Noria with no changes required.

The consistency model is eventual with a guarantee that if writes quiesce, all external views eventually hold results that are the same as if the queries had been executed directly against the base table data. “Many web applications fit this model: they accept the eventual consistency imposed by caches that make common-case reads fast.”

One very nice feature of Noria is that it accepts the fact that application queries and schema evolve over time. Noria plans the changes needed to the data-flow graph to support the changes and transitions the application with no downtime.

Porting a MySQL-based application to Noria typically proceeds in three steps:

  1. Import existing data into Noria from a database dump, and point the application at the Noria MySQL adapter. You should see performance improvements for read queries, especially those that are frequently used.
  2. Create views for computations that the MySQL application currently manually materialises.
  3. Incrementally rewrite the application to rely on these natural views, updating the write path so that the application itself no longer manually updates views and caches.

During the third phase application performance should steadily improve while the code simplifies at the same time.

Data-flows in Noria

Noria creates a directed acyclic data-flow graph of relational operators with base tables at the roots and external views at the leaves.

When an application write arrives, Noria applies it to a durable base table and injects it into the data-flow as an update. Operators process the update and emit derived updates to their children; eventually updates reach and modify the external views. Updates are deltas that can add to, modify, and remove from downstream state.
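The update path can be illustrated with a toy data-flow in Python: one base table, one stateful count operator, and one external view. This is a sketch under strong simplifications (single-threaded, one operator, no durability) and none of the class names come from Noria. It shows updates travelling as signed deltas, with the operator retracting its previous result before asserting the new one.

```python
class ExternalView:
    """Leaf of the graph: holds the rows the application reads."""

    def __init__(self):
        self.rows = {}

    def on_delta(self, sign, key, value):
        if sign > 0:
            self.rows[key] = value
        elif self.rows.get(key) == value:
            del self.rows[key]


class CountByKey:
    """Stateful operator: keeps a count per key and emits derived deltas."""

    def __init__(self, child):
        self.counts = {}
        self.child = child

    def on_delta(self, sign, key, _row):
        old = self.counts.get(key, 0)
        new = old + sign
        self.counts[key] = new
        if old > 0:
            self.child.on_delta(-1, key, old)   # retract the previous result
        if new > 0:
            self.child.on_delta(+1, key, new)   # assert the new result


class BaseTable:
    """Root of the graph: stores the row, then injects it as an update."""

    def __init__(self, child):
        self.rows = []
        self.child = child

    def insert(self, key, row):
        self.rows.append((key, row))
        self.child.on_delta(+1, key, row)

    def delete(self, key, row):
        self.rows.remove((key, row))
        self.child.on_delta(-1, key, row)


# Roughly: SELECT story, COUNT(*) FROM votes GROUP BY story
vote_count = ExternalView()
votes = BaseTable(CountByKey(vote_count))
votes.insert("story-1", "alice")
votes.insert("story-1", "bob")
votes.insert("story-2", "carol")
votes.delete("story-1", "alice")
```

Reads only ever touch `vote_count.rows`, so the cost of the aggregation is paid incrementally on the write path rather than on every query.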

Joins are implemented using upqueries: requests for matching records from stateful ancestors.


To provide its eventual consistency guarantees Noria requires that:

  • operators are deterministic functions over their own state and the inputs from their ancestors;
  • there are no races between updates and upqueries;
  • updates on the same data-flow path are not reordered; and
  • races between related updates that arrive independently at multi-ancestor operators via different data-flow paths are resolved. Noria addresses this by requiring such operators to be commutative. “The standard relational operators Noria supports have this property.”

With respect to ordering, each operator totally orders all updates and upquery requests it receives for an entry, and the downstream dataflow ensures that all updates and upquery responses from the entry are processed by all consumers in that order.

Upqueries require special care since upquery responses don’t commute with each other or with previous updates. Noria handles this by ensuring that no updates are in flight between the upstream stateful operator and the join when a join upquery occurs: each join upquery is scoped to an operator chain processed by a single thread. (Updates on other chains can be processed in parallel).


The partially-stateful data-flow model lets operators maintain only a subset of their state. This concept of partial materialization is well-known for materialized views in databases, but novel to data-flow systems. Partial state reduces memory use, allows eviction of rarely-used state, and relieves operators from maintaining state that is never read… Noria makes state partial whenever it can service upqueries using efficient index lookups. If Noria would have to scan the full state of an upstream operator to satisfy upqueries, Noria disables partial state for that operator.

Partial-state operators start out empty and are gradually and lazily populated by upqueries.

Like a cache, entries can be evicted under memory pressure. Eviction notices flow along the update data-flow path, indicating that some state entries will no longer be updated. If it is later required to read from evicted state Noria recomputes it via recursive upqueries (all the way to the base tables if necessary).
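A toy version of a partially-stateful operator makes the fill, update, and evict cycle easier to see. The Python below is an illustration only: the class is hypothetical, the base table lives inside the operator for brevity, and there is no concurrency, so the care Noria takes over races between updates and upqueries is not modelled.

```python
class PartialCount:
    """Count-per-key operator that only holds state for keys that have been read."""

    def __init__(self):
        self.base_index = {}   # stand-in for an indexed upstream base table
        self.state = {}        # key -> count; an absent key is a hole
        self.upqueries = 0     # how many times we had to go upstream

    def on_insert(self, key, row):
        self.base_index.setdefault(key, []).append(row)
        # Updates for keys with no materialized state are simply dropped here:
        # nobody has asked for them, so there is nothing to keep current.
        if key in self.state:
            self.state[key] += 1

    def read(self, key):
        if key not in self.state:
            # Miss: fill the hole with an upquery, served by an index lookup.
            self.upqueries += 1
            self.state[key] = len(self.base_index.get(key, []))
        return self.state[key]

    def evict(self, key):
        """Drop state under memory pressure; later updates to it are ignored."""
        self.state.pop(key, None)


op = PartialCount()
op.on_insert("story-1", "alice")
op.on_insert("story-1", "bob")
op.on_insert("story-2", "carol")

first_read = op.read("story-1")     # hole, so one upquery fills it
op.on_insert("story-1", "dave")     # state exists, updated incrementally
second_read = op.read("story-1")    # served from state, no upquery

op.evict("story-1")
op.on_insert("story-1", "erin")     # dropped at the operator, kept in the base table
third_read = op.read("story-1")     # recomputed by a second upquery
```

State for "story-2" is never materialized because nothing reads it, which is the memory saving partial state is after.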

For correct handling of joins, once upstream state has been filled in via recursive upqueries, a special join upquery executes within a single operator chain and excludes concurrent updates.

Data-flow transitions

Changes to a Noria program over time (e.g. the set of SQL query expressions) are handled by adapting the data-flow dynamically.

Noria first plans the transition, reusing operators and state of existing expressions where possible. It then incrementally applies these changes to the data-flow, taking care to maintain its correctness invariants. Once both steps complete, the application can use new tables and queries.

See section 5 in the paper for full details.


Noria is 45kloc of Rust and supports both single server and clustered usage. The prototype is evaluated using backend workloads generated from the production Lobsters web application. It is compared against vanilla MySQL (MariaDB), a MySQL/memcached stack, the materialized views of a commercial database, and an idealized cache-only deployment (the latter not offering any persistence, but giving an estimate of the performance when all reads are served from memory).

Here’s how Noria compares to MariaDB on Lobsters, where “Noria achieves both good performance and natural, robust queries.”

Noria’s space overhead is around 3x the base table size for Lobsters.

The rest of the comparisons are done with single server setups and a subset of Lobsters. For read-heavy workloads Noria outperforms all other systems except for the pure memcached at 100-200K requests/sec. With a mixed read-write workload Noria again beats everything except for the (unrealistic) pure memcached solution.

See section 8.2 for an interesting comparison of Noria with DBToaster as well.

Compared to a Differential Dataflow implementation based on Naiad and a 95% read Lobsters subset workload, Noria scales competitively and starts to show advantage from 4 machines onwards.

To achieve truly large scale Noria can shard large base tables and operators with large state across machines. “Efficient resharding and partitioning the data-flow to minimize network transfers are important future work for Noria…”

So let’s return to the question we started with, what kind of thing is Noria? In the authors’ own words:

Noria is a web application backend that delivers high performance while allowing for simplified application logic. Partially-stateful data-flow is essential to achieving this goal: it allows fast reads, restricts Noria’s memory footprint to state that is actually used, and enables live changes to the data-flow.

Noria is available at

RobinHood: tail latency aware caching – dynamic reallocation from cache-rich to cache-poor

October 26, 2018

RobinHood: tail latency aware caching – dynamic reallocation from cache-rich to cache-poor Berger et al., OSDI’18

It’s time to rethink everything you thought you knew about caching! My mental model goes something like this: we have a set of items that probably follow a power-law of popularity.

We have a certain finite cache capacity, and we use it to cache the most frequently requested items, speeding up request processing.

Now, there’s a long tail of less frequently requested items, and if we request one of these that’s not in the cache the request is going to take longer (higher latency). But it makes no sense whatsoever to try and improve the latency for these requests by ‘shifting our cache to the right.’

Hence the received wisdom that unless the full working set fits entirely in the cache, then a caching layer doesn’t address tail latency.

So far we’ve been talking about one uniform cache. But in a typical web application one incoming request might fan out to many back-end service requests processed in parallel. The OneRF page rendering framework at Microsoft (which serves, and among others) relies on more than 20 backend systems for example.



The cache is shared across these back-end requests, either with a static allocation per back-end that has been empirically tuned, or perhaps with dynamic allocation so that more popular back-ends get a bigger share of the cache.

The thing about this common pattern is that we need to wait for all of these back-end requests to complete before returning to the user. So improving the average latency of these requests doesn’t help us one little bit.

Since each request must wait for all of its queries to complete, the overall request latency is defined to be the latency of the request’s slowest query. Even if almost all backends have low tail latencies, the tail latency of the maximum of several queries could be high.

(See ‘The Tail at Scale’).

The user can easily see P99 latency or greater.
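A quick back-of-the-envelope calculation shows why. The Python below assumes each request issues one query to each backend and that query latencies are independent across backends; both are simplifying assumptions, not claims from the paper. The function name is hypothetical.

```python
def prob_request_hits_tail(num_queries, percentile=0.99):
    """Chance that at least one of num_queries independent queries
    is slower than its own backend's given latency percentile."""
    return 1.0 - percentile ** num_queries


# With 20 backends, roughly 18% of requests include at least one
# query from some backend's slowest 1%.
p_20_backends = prob_request_hits_tail(20)
p_single_backend = prob_request_hits_tail(1)
```

So what is a 1-in-100 event for a single backend becomes closer to a 1-in-5 event for the request as a whole.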

Techniques to mitigate tail latencies include making redundant requests, clever use of scheduling, auto-scaling and capacity provisioning, and approximate computing. RobinHood takes a different (complementary) approach: use the cache to improve tail latency!

RobinHood doesn’t necessarily allocate caching resources to the most popular backends. Instead, it allocates caching resources to the backends (currently) responsible for the highest tail latency.

…RobinHood dynamically allocates cache space to those backends responsible for high request tail latency (cache-poor backends), while stealing space from backends that do not affect the request tail latency (cache-rich backends). In doing so, RobinHood makes compromises that may seem counter-intuitive (e.g., significantly increasing the tail latencies of certain backends).

If you’re still not yet a believer that caching can help with tail latencies, the evaluation results should do the trick. RobinHood is evaluated with production traces from a 50-server cluster with 20 different backend systems. It’s able to address tail latency even when working sets are much larger than the cache size.

In the presence of load spikes, RobinHood meets a 150ms P99 goal 99.7% of the time, whereas the next best policy meets this goal only 70% of the time.

Look at that beautiful blue line!

When RobinHood allocates extra cache space to a backend experiencing high tail latency, the hit ratio for that backend typically improves. We get a double benefit:

  • Since backend query latency is highly variable in practice, decreasing the number of queries to a backend will decrease the number of high-latency queries observed, improving the P99 request latency.
  • The backend system will see fewer requests. As we’ve studied before on The Morning Paper, small reductions in resource congestion can have an outsized impact on backend latency once a system has started degrading.

Caching challenges

Why can’t we just figure out which backends contribute the most to tail latency and statically assign more cache space to them? Because the latencies of different backends tend to vary wildly over time: they are complex distributed systems in their own right. The backends are often shared across several customers too (either within the company, or perhaps you’re calling an external service). So the changing demands from other consumers can impact the latency you see.

Most existing cache systems implicitly assume that latency is balanced. They focus on optimizing cache-centric metrics (e.g., hit ratio), which can be a poor representation of overall performance if latency is imbalanced.

Query latency is not correlated with query popularity, but instead reflects a more holistic state of the backend system at some point in time.

An analysis of OneRF traces over a 24 hour period shows that the seventh most queried backend receives only about 0.06x as many queries as the most queried backend, but has 3x the query latency. Yet shared caching systems inherently favour backends with higher query rates (they have more shots at getting something in the cache).

The RobinHood caching system

RobinHood operates in 5 second time windows, repeatedly taxing every backend by reclaiming 1% of its cache space and redistributing the wealth to cache-poor backends. Within each window RobinHood tracks the latency of each request, and chooses a small interval (P98.5 to P99.5) around P99 to focus on, since the goal is to minimise the P99 latency. For each request that falls within this interval, RobinHood tracks the ID of the backend corresponding to the slowest query in the request. At the end of the window RobinHood calculates the request blocking count (RBC) of each backend – the number of times it was responsible for the slowest query.
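The RBC calculation for a single window can be sketched in a few lines. This is a minimal illustration and not the paper's implementation: the input format (a list of `(request_latency, {backend_id: query_latency})` pairs) and the function name are assumptions of mine.

```python
from collections import Counter


def request_blocking_counts(requests, lo_permille=985, hi_permille=995):
    """Count, per backend, how often it served the slowest query of a tail request.

    `requests` is a list of (request_latency, {backend_id: query_latency})
    pairs collected during one time window (a hypothetical input format).
    """
    ordered = sorted(requests, key=lambda r: r[0])
    n = len(ordered)
    # Integer arithmetic avoids float rounding when locating P98.5 and P99.5.
    start = n * lo_permille // 1000
    end = n * hi_permille // 1000
    rbc = Counter()
    for _latency, queries in ordered[start:end]:
        if queries:
            # The backend whose query took longest "blocked" this request.
            slowest = max(queries, key=queries.get)
            rbc[slowest] += 1
    return rbc
```

Note that with very few requests in a window the P98.5 to P99.5 slice is empty and every count is zero; the sketch makes no attempt to handle that case specially.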

Backends with a high RBC are frequently the bottleneck in slow requests. RobinHood thus considers a backend’s RBC as a measure of how cache-poor it is, and distributes the pooled tax to each backend in proportion to its RBC.

RBC neatly encapsulates the dual considerations of how likely a backend is to have high latency, and how many times that backend is queried during request processing.
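The tax-and-redistribute step is simple enough to sketch directly. Again this is an illustration under assumptions: `reallocate` is a hypothetical name, allocations are plain numbers (say, megabytes), and leaving the allocation unchanged when no backend has a non-zero RBC is my choice, not something the paper specifies.

```python
def reallocate(allocation, rbc, tax_rate=0.01):
    """Tax every backend and hand the pool out in proportion to RBC.

    `allocation` maps backend_id -> cache space; `rbc` maps backend_id ->
    request blocking count for the window that just ended.
    """
    total_rbc = sum(rbc.get(backend, 0) for backend in allocation)
    if total_rbc == 0:
        # Assumption: with no signal, leave the allocation as it is.
        return dict(allocation)

    # Every backend pays the same fraction of its space into the pool.
    pool = sum(size * tax_rate for size in allocation.values())
    return {
        backend: size * (1 - tax_rate) + pool * rbc.get(backend, 0) / total_rbc
        for backend, size in allocation.items()
    }


if __name__ == "__main__":
    print(reallocate({"a": 100.0, "b": 100.0}, {"a": 3, "b": 1}))
```

The total cache size is preserved: whatever is taxed is handed back out, so a backend with zero RBC simply shrinks by 1% per window while the bottleneck backends grow.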

Some backends are slow to make use of the additional cache space (e.g., if their hit ratios are already high), so RobinHood monitors the gap between the allocated and used cache capacity for each backend, and temporarily ignores the RBC of any backend with more than a 30% gap.
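A sketch of that filter might look like the following. I am assuming the gap is measured as a fraction of the allocated space, which the summary above does not spell out, and `effective_rbc` is a hypothetical helper name.

```python
def effective_rbc(rbc, allocated, used, max_gap=0.30):
    """Zero out the RBC of backends that are not using their allocation.

    Assumption: gap = (allocated - used) / allocated, per backend.
    """
    result = {}
    for backend, count in rbc.items():
        alloc = allocated.get(backend, 0)
        gap = (alloc - used.get(backend, 0)) / alloc if alloc > 0 else 0.0
        # A backend with too much unused space gets no share this window.
        result[backend] = 0 if gap > max_gap else count
    return result
```

Feeding the filtered counts into the redistribution step means a backend that has not yet filled its cache partition stops receiving extra space until it catches up.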

When load balancing across a set of servers RobinHood makes allocation decisions locally on each server. To avoid divergence of cache allocations over time, RobinHood controllers exchange RBC data. With a time window of 5 seconds, RobinHood caches converge to the average allocation within about 30 minutes.
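The 30 minute figure is plausible from simple arithmetic. Here is a toy simulation under assumptions that are mine rather than the paper's: two servers start with very different allocations, both apply the same 1% tax each window, and both redistribute using the same shared RBC proportions (`shares`). In that setting the difference between the two servers shrinks by a factor of 0.99 per window, and 30 minutes is 360 windows.

```python
def step(allocation, shares, tax_rate=0.01):
    """One window: tax every backend, then redistribute by RBC share."""
    pool = tax_rate * sum(allocation.values())
    return {
        backend: (1 - tax_rate) * size + pool * shares[backend]
        for backend, size in allocation.items()
    }


def simulate(server_a, server_b, shares, windows):
    """Run two servers side by side with identical (shared) RBC shares."""
    for _ in range(windows):
        server_a = step(server_a, shares)
        server_b = step(server_b, shares)
    return server_a, server_b


if __name__ == "__main__":
    windows = 30 * 60 // 5  # 30 minutes of 5-second windows = 360
    a, b = simulate({"x": 90.0, "y": 10.0}, {"x": 10.0, "y": 90.0},
                    {"x": 0.5, "y": 0.5}, windows)
    print(abs(a["x"] - b["x"]))  # initial gap of 80 shrinks by 0.99 ** 360
```

Since 0.99 raised to the power 360 is a little under 3%, an initial disagreement between servers has mostly disappeared after half an hour in this toy model.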

The RobinHood implementation uses off-the-shelf memcached instances to form the caching layer in each application server. A lightweight cache controller at each node implements the RobinHood algorithm and issues resize commands to the local cache partitions. A centralised RBC server is used for exchange of RBC information. RBC components store only soft state (aggregated RBC for the last one million requests, in a ring buffer), so can quickly recover after a crash or restart.
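To make the soft-state idea concrete, here is a minimal sketch of a bounded ring buffer with running per-backend counts. It is not the RBC server's actual code: the class name is hypothetical, and I am assuming each buffer entry holds the ID of the one backend that blocked a request.

```python
from collections import Counter, deque


class RbcWindow:
    """Aggregated RBC over the most recent `capacity` recorded requests."""

    def __init__(self, capacity=1_000_000):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self._ring = deque(maxlen=capacity)
        self._counts = Counter()

    def record(self, backend_id):
        # When the ring is full, the oldest entry is about to be evicted,
        # so remove it from the running counts first.
        if len(self._ring) == self._ring.maxlen:
            oldest = self._ring[0]
            self._counts[oldest] -= 1
            if self._counts[oldest] == 0:
                del self._counts[oldest]
        self._ring.append(backend_id)
        self._counts[backend_id] += 1

    def counts(self):
        return dict(self._counts)
```

Because everything here can be rebuilt from incoming traffic, losing it in a crash costs nothing more than a short warm-up, which is the point of keeping only soft state.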

Key evaluation results

The RobinHood evaluation is based on detailed statistics of production traffic in the OneRF system for several days in 2018. The dataset describes queries to more than 40 distinct backend systems. RobinHood is compared against the existing OneRF policy, the policy from Facebook’s TAO, and three research systems: Cliffhanger, FAIR, and LAMA. Here are the key results:

  • RobinHood brings SLO violations down to 0.3%, compared to 30% SLO violations under the next best policy.
  • For quickly increasing backend load imbalances, RobinHood maintains SLO violations below 1.5%, compared to 38% SLO violations under the next best policy.
  • Under simultaneous latency spikes, RobinHood maintains less than 5% SLO violations, while other policies do significantly worse.
  • Compared to the maximum allocation for each backend under RobinHood, even a perfectly clairvoyant static allocation would need 73% more cache space.
  • RobinHood introduces negligible overhead on network, CPU, and memory usage.

Our evaluation shows that RobinHood can reduce SLO violations from 30% to 0.3% for highly variable workloads such as OneRF. RobinHood is also lightweight, scalable, and can be deployed on top of an off-the-shelf software stack… RobinHood shows that, contrary to popular belief, a properly designed caching layer can be used to reduce higher percentiles of request latency.