## Debugging distributed systems with why-across-time provenance

Debugging distributed systems with why-across-time provenance Whittaker et al., *SoCC’18*

This value is 17 here, and it shouldn’t be. Why did the get request return 17?

Sometimes the simplest questions can be the hardest to answer. As the opening sentence of this paper states:

Debugging distributed systems is hard.

The kind of why questions we’re interested in for this paper are questions of *provenance*. What are the *causes* of this output? Provenance has been studied in the context of relational databases and dataflow systems, but here we’re interested in general distributed systems. (Strictly, those where the behaviour of each node can be modelled by a deterministic state machine: non-deterministic behaviour is left to future work).

### Why why-provenance doesn’t work

Relational databases have why-provenance, which sounds on the surface exactly like what we’re looking for.

Given a relational database, a query issued against the database, and a tuple in the output of the query, why-provenance explains why the output tuple was produced. That is, why-provenance produces the input tuples that, if passed through the relational operators of the query, would produce the output tuple in question.

One reason that won’t work in our distributed systems setting is that the state of the system is not relational, and the operations can be much more complex and arbitrary than the well-defined set of relational operators why-provenance works with.

There’s a second, deeper reason why why-provenance doesn’t work here as well:

Why-provenance makes the critical assumption that the underlying relational database is static. It cannot handle the time-varying nature of stateful distributed systems.

### Why causal history buries the cause

We do have one tool in our distributed systems toolbox that can handle this issue of the state changing over time. Indeed, it was designed explicitly to capture *potential* causes: the happens-before relationship and the notion of causality. The difficulty here is that the causal history of an event is a crude over-approximation, encompassing everything that was seen before. It tells us which events *could* potentially have contributed to the cause, but not which ones actually did. That can be a lot of extraneous noise to deal with when trying to debug a distributed system.

*Wat* should we do instead?

… causality lacks a notion of data dependence, and data provenance lacks a notion of time. In this paper, we present wat-provenance (why-across-time provenance): a novel form of data provenance that unifies ideas from the two. Wat-provenance generalizes the why-provenance from the domain of relational queries issued against a static database to the domain of arbitrary time-varying state machines in a distributed system.

Consider a server in a distributed system as a deterministic state machine that repeatedly receives requests, updates its state accordingly, and sends replies. Wat-provenance is defined in terms of the *traces* of such a system.

- The *trace* T of a system is the ordered sequence of inputs received by the state machine.
- A *subtrace* T’ of T is an order-preserving sub-sequence of T (but the events in the sub-sequence don’t have to be contiguous).
- Given a subtrace T’ of T, a *supertrace* of T’ is a subtrace of T containing every element of T’.

For example:

Wat-provenance aims to formalize an intuitive notion of *why* the state machine M produces output *o* when given input *i*.

Since the state machines are deterministic, we can start with the notion that the cause of some output *o* must be contained in a subtrace of the input. E.g. if we have a KVS node with trace `T = set(x,1);set(y,2)` and a request *i* = `get(x)` then the subtrace T’ = `set(x,1)` is sufficient to explain the output 1. Such a subtrace is called a *witness* of *o*.

In this particular example, both T and T’ are witnesses of *o*, but note that T contains some additional inputs (`set(y,2)`) that are not needed to explain the output. So we’d like a *minimal* subtrace.

If we’re not careful about what we leave out though, we can end up creating false witnesses. Consider a server maintaining a set of boolean-valued variables all initialised to false. We have a trace `T = set(a); set(b); set(c)` and a request *i* producing output `(a && !b) || c`. The reason this outputs `true` is that `c` is true. So `set(c)` is a genuine witness. But if we considered just the subtrace `set(a)` (so we’re in the state a = true, b = c = false) then `set(a)` would also appear to be a witness, even though it isn’t. To avoid false witnesses, we add the rule that *every supertrace of T’ in T must also be a witness of o*. In such an instance we say that the witness T’ of *o* is *closed under supertrace* in T. Since the supertrace `set(a);set(b)` is not a witness of *o*, we exclude the subtrace `set(a)` through this rule.

Sticking with the boolean server, suppose we have a simpler scenario where *i* produces an output *o* equal to `(a && d) || (b && c)`. In this case a trace `T = set(a);set(b);set(c);set(d)` contains *two* subtraces that can both be a cause of the `true` output: `set(a);set(d)` and `set(b);set(c)`. Thus we notice that the cause of an output can be a *set* of witnesses.

And so we arrive at:

[wat-provenance] consists of every witness T’ of *o* such that (1) T’ is closed under supertrace in T, and (2) no proper subtrace of T’ is also a witness of *o* that satisfies (1).

When computing wat-provenance, it’s important that we first compute the set of witnesses closed under supertrace in T, and only then remove the non-minimal elements. If you try to remove non-minimal elements first it’s possible to over-prune. (See the worked example in §3.5 of the paper).
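The definition can be checked by brute force on small traces. The following Python sketch is only an illustration under stated assumptions: it enumerates every subtrace (exponential in the trace length), and the names `wat_provenance` and `boolean_server`, along with the encoding of requests as predicates, are hypothetical rather than taken from the paper. It computes the witnesses closed under supertrace first and only then drops the non-minimal ones.

```python
from itertools import combinations

def subtraces(n):
    """Yield every subtrace of a length-n trace as a frozenset of indices."""
    for size in range(n + 1):
        for idxs in combinations(range(n), size):
            yield frozenset(idxs)

def wat_provenance(machine, trace, request):
    """Brute-force wat-provenance of machine(trace, request)."""
    n = len(trace)
    replay = lambda idxs: machine([trace[i] for i in sorted(idxs)], request)
    target = replay(range(n))
    # Witnesses: subtraces that reproduce the output of the full trace.
    witnesses = {s for s in subtraces(n) if replay(s) == target}
    # Step 1: keep witnesses whose every supertrace in T is also a witness.
    closed = {s for s in witnesses
              if all(t in witnesses for t in subtraces(n) if s <= t)}
    # Step 2: only now discard the non-minimal elements.
    minimal = [s for s in closed if not any(t < s for t in closed)]
    return sorted([trace[i] for i in sorted(s)] for s in minimal)

def boolean_server(trace, request):
    """Variables start false; each input names a variable to set to true."""
    true_vars = set(trace)
    return request(lambda name: name in true_vars)

first = wat_provenance(
    boolean_server, ["a", "b", "c"],
    lambda v: (v("a") and not v("b")) or v("c"))
second = wat_provenance(
    boolean_server, ["a", "b", "c", "d"],
    lambda v: (v("a") and v("d")) or (v("b") and v("c")))
```

For the two boolean examples above, `first` is `[["c"]]` (the false witness `set(a)` is rejected because `set(a);set(b)` is not a witness) and `second` is `[["a", "d"], ["b", "c"]]`.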

Wat-provenance refines causality, and subsumes why-provenance.

### Black box, simple interface

Automatically computing the wat-provenance for an arbitrary distributed system component, which we dub a black box, is often intractable and sometimes impossible…. (But) … we can take advantage of the fact that many real-world black boxes are far from arbitrary. Many black boxes have complex implementations but are designed with very simple interfaces.

Instead of trying to infer wat-provenance by inspecting an implementation, we can *specify* it directly from an interface. A *wat-provenance specification* is a function that given a trace T and a request *i*, returns the wat-provenance Wat(M, T, i) for a black box modeled as state machine M.

For example, for Redis (~50K LOC) the wat-provenance specification for a get request to a key just needs to consider the most recent set to that key, and all subsequent modifying operations (e.g. `incr`, `decr`). It takes surprisingly few lines of code to write these wat-provenance specifications for interesting subsets of real-world systems:

Not every system is so amenable though. The authors give as an example a state machine implementing some machine learning model whereby clients can either submit training data (for online model updating) or submit an input for classification. Here a wat-provenance specification would likely be just as complex as the system itself.

### Watermelon

Watermelon is a prototype debugging framework using wat-provenance and wat-provenance specifications. It acts as a proxy intercepting all communication with the service and recording traces in a relational database. When messages are sent between Watermelon’d processes, Watermelon can connect the send and the receive such that multiple black boxes can be integrated into the same Watermelon system.

With the proxy shim in place, a developer can write wat-provenance specifications either in SQL or in Python.

To find the cause of a particular black box output, we invoke the black box’s wat-provenance specification. The specification returns the set of witnesses that cause the output. Then, we can trace a request in a witness back to the black box that sent it and repeat the process, invoking the sender’s wat-provenance specification to get a new set of witnesses.

### Evaluation

The evaluation is currently the weakest part of the paper. Wat-provenance specifications are written for 20 real-world APIs across Redis, the POSIX file system, Amazon S3, and Zookeeper (see table 1 above). Debugging ease is then qualitatively evaluated against using printf statements and against SPADE (a framework for collecting provenance information from a variety of sources including OS audit logs, network artifacts, LLVM instrumentation, and so on). This leads to the following table:

What it would be lovely to see is an evaluation of using wat-provenance to debug real-world problems. Future work perhaps?

### Wat-logs

A final thought from me is that rather than using a proxy, it’s pretty common (and not an unreasonable request if not) for a server process to log incoming requests and their responses. In that case, given a suitable logging system, it ought to be possible to ask wat-provenance questions directly of the logs. That would fit quite well with existing troubleshooting workflows that often start with something that doesn’t look quite right in the logs and explore why that is from there.

## ApproxJoin: approximate distributed joins

ApproxJoin: approximate distributed joins Le Quoc et al., *SoCC’18*

GitHub: https://ApproxJoin.github.io

The join is a fundamental data processing operation and has been heavily optimised in relational databases. When you’re working with large volumes of unstructured data though, say with a data processing framework such as Flink or Spark, joins become distributed and much more expensive. One of the reasons for this is the amount of data that needs to be moved over the network. In many use cases, *approximate* results would be acceptable, and as we’ve seen before, likely much faster and cheaper to compute. Approximate computing with joins is tricky though: if you sample datasets *before* the join you reduce data movement, but also sacrifice up to an order of magnitude in accuracy; if you sample results *after* the join you don’t save on any data movement and the process is slow.

This paper introduces an approximate distributed join technique, *ApproxJoin*, which is able to sample before data shuffling without loss of end result accuracy. Compared to unmodified Spark joins with the same sampling ratio it achieves a speedup of 9x while reducing the shuffled data volume by 82x.

The following charts show ApproxJoin’s latency and accuracy characteristics compared to Spark sampling before the join (ApproxJoin has much better accuracy and similar latency) and to Spark sampling after the join (ApproxJoin has similar accuracy and much lower latency).

ApproxJoin works in two phases. First it uses one of the oldest tricks in the book, a Bloom filter, to eliminate redundant data shuffling. A nice twist here is that ApproxJoin directly supports multi-way joins so we don’t need to chain a series of pairwise joins together. In the second phase ApproxJoin uses *stratified* sampling to produce an answer approximating the result of an aggregation over the complete join result.

At a high level, ApproxJoin makes use of a combination of sketching and sampling to select a subset of input datasets based on the user-specified query budget. Thereafter, ApproxJoin aggregates over the subset of input data.

User queries must contain an algebraic aggregation function (e.g. SUM, AVG, COUNT, STDEV), and as is usual for approximate compute frameworks, can specify either a time bound or an error bound for the query.

### High level overview

Overall, the ApproxJoin system looks like this:

Filtering using a multi-way Bloom filter happens in parallel at each node storing partitions of the input, and simultaneously across all input tables.

The sampling phase makes use of stratified sampling *within* the join process: datasets are sampled while the cross product is being computed. *Stratified* sampling in this case means that tuples with distinct join keys are sampled independently (with simple random sampling). Thus the final sample will contain all join keys— even those with few data items. A *cost function* is used to compute an optimal sampling rate according to the query budget. There’s a one-off upfront cost to compute the standard deviation for a join key (‘stratum’), which is stored and reused in subsequent queries. It’s not clear whether or not this cost is included in the evaluation (so best guess it isn’t 😉 ).

### Filtering

The filtering step is very straightforward. First a Bloom filter is created for each input dataset: each worker with a partition of the dataset creates a local Bloom filter, and then these are combined using OR. Once we have the merged Bloom filter for each input dataset, we can simply combine the filters across datasets using AND. (Think about hashing a given key: only if the corresponding bits are set in each of the input Bloom filters can that key possibly exist in all inputs). In the implementation, Bloom filters are merged using a *treeReduce* scheme to prevent the Spark driver becoming a bottleneck.
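The OR-then-AND construction can be sketched in a few lines of Python. This is a toy, single-process illustration rather than ApproxJoin's Spark implementation: the filter size `M`, hash count `K`, the SHA-256 based hashing, and all function names are assumptions, and a plain `reduce` stands in for treeReduce.

```python
import hashlib
from functools import reduce

M = 1 << 12   # number of bits per filter (assumed)
K = 3         # number of hash functions (assumed)

def positions(key):
    """Bit positions for a key, derived from K salted SHA-256 digests."""
    for salt in range(K):
        digest = hashlib.sha256(f"{salt}:{key}".encode()).digest()
        yield int.from_bytes(digest[:8], "big") % M

def build(keys):
    """Build a Bloom filter, stored as a Python int used as a bit set."""
    bits = 0
    for key in keys:
        for p in positions(key):
            bits |= 1 << p
    return bits

def might_contain(bits, key):
    return all((bits >> p) & 1 for p in positions(key))

def dataset_filter(partitions):
    """OR together the local filters built on each partition of one dataset."""
    return reduce(lambda a, b: a | b, (build(p) for p in partitions), 0)

def join_filter(datasets):
    """AND the per-dataset filters: only keys possibly in every input pass."""
    return reduce(lambda a, b: a & b, (dataset_filter(d) for d in datasets))

orders = [["k1", "k2"], ["k3"]]          # one dataset, two partitions
users = [["k2"], ["k3", "k9"]]
jf = join_filter([orders, users])
survivors = [k for part in orders for k in part if might_contain(jf, k)]
```

Keys present in every input (`k2`, `k3`) always survive, since a Bloom filter has no false negatives; keys such as `k1` are dropped before the shuffle unless they happen to be false positives.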

Clearly, the greater the overlap between the input datasets the more data we need to shuffle, and hence the less benefit the Bloom-filter based join can add.

After the filtering stage, it may be that the overlap fraction between the datasets is small enough that a full join can now be performed within the latency requirements of the user. If this is not the case, we proceed to the approximation…

### Determining the cost function

ApproxJoin makes use of latency and error-bound cost functions to convert the join requirements specified by a user into sampling rates.

For the latency cost function we need to combine the cost of filtering and transferring the join data items with the cost of computing the cross products. There’s no need to estimate the cost of filtering and transferring: we have to do this regardless, so we can just time it. The remaining latency budget is then simply the target time specified by the user, minus the time we spent in the first phase! The cost function for the cross product phase is simply a weighting of the number of cross products we need to do. The weighting (scale) factor depends on the computation capacity of the compute cluster, which is profiled once offline to calibrate. (That is, once in the lifetime of the cluster, not once per query).
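As a back-of-the-envelope sketch of that budget arithmetic, assuming the cross-product cost is linear in the number of cross products: the parameter names, including the per-product scale factor `beta_ms_per_product`, are hypothetical and the paper's cost function may carry additional terms.

```python
import math

def cross_product_budget(desired_ms, filter_ms, beta_ms_per_product):
    """Number of cross products affordable in the time left after filtering."""
    remaining_ms = desired_ms - filter_ms   # budget left for the second phase
    if remaining_ms <= 0:
        return 0                            # filtering already used the budget
    return math.floor(remaining_ms / beta_ms_per_product)

budget = cross_product_budget(desired_ms=10_000, filter_ms=4_000,
                              beta_ms_per_product=0.5)
```

With a 10 s target, 4 s measured for filtering and transfer, and 0.5 ms per cross product, `budget` is 12000 cross products, which then has to be divided among the join keys.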

If a user specified an error bound, we need to calculate how many samples to take to satisfy the requirement. For stratum *i*, the number of samples turns out to be governed by the following equation at 95% confidence level:

The standard deviation of the stratum is computed and stored on the first execution of a query and subsequently reused.
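As a hedged sketch of where a bound of this kind comes from, assume the usual Central Limit Theorem margin of error for a sample mean; the paper's exact notation and constants may differ. Writing $\sigma_i$ for the standard deviation of stratum *i*, $b_i$ for its sample size, and $err$ for the desired error bound (symbol names assumed here), requiring the margin of error to stay within the bound gives:

$$
z_{\alpha/2}\,\frac{\sigma_i}{\sqrt{b_i}} \le err
\quad\Longrightarrow\quad
b_i \ge \left(\frac{z_{\alpha/2}\,\sigma_i}{err}\right)^{2}
$$

At a 95% confidence level $z_{\alpha/2} = 1.96$, so $b_i \ge 3.84\,(\sigma_i / err)^2$. This is why the per-stratum standard deviation has to be known before the sampling rate can be chosen.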

### Sampling

To preserve the statistical properties of the exact join output, we combine our technique with stratified sampling. Stratified sampling ensures that no join key is overlooked; for each join key, we perform simple random selection over data items independently. This method selects data items fairly from different join keys. The (preceding) filtering stage guarantees that this selection is executed only from data items participating in the join.

Random sampling on data items having the same join key is equivalent to performing edge sampling on a complete bipartite graph modelling the relation.

To include an edge in the sample, ApproxJoin randomly selects one endpoint vertex from each side, and then yields the edge connecting them. For a sample of size *b* this process is repeated *b* times.
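The edge-sampling step can be sketched in Python as follows. This is a single-process illustration under simplifying assumptions: every join key gets the same sample size `b_per_key` (whereas ApproxJoin derives a per-key size from its cost function), and the function names and dictionary layout are hypothetical.

```python
import random

def sample_edges(left_items, right_items, b, rng):
    """Draw b edges, with replacement, from the complete bipartite graph
    whose two sides are the items sharing one join key."""
    return [(rng.choice(left_items), rng.choice(right_items))
            for _ in range(b)]

def stratified_join_sample(left_by_key, right_by_key, b_per_key, seed=0):
    """Sample each join key (stratum) independently of the others."""
    rng = random.Random(seed)
    common_keys = sorted(left_by_key.keys() & right_by_key.keys())
    return {key: sample_edges(left_by_key[key], right_by_key[key],
                              b_per_key, rng)
            for key in common_keys}

left = {"k1": [10, 11, 12], "k2": [20]}
right = {"k1": [1.5, 2.5], "k2": [3.0, 4.0], "k3": [9.9]}
sample = stratified_join_sample(left, right, b_per_key=4)
```

Each sampled pair is one tuple of the join output for that key, so `k2`, with a single item on the left, is still represented; `k3` has no partner and never appears.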

In a distributed setting, data items are distributed to worker nodes based on the join keys (e.g. using a hash-based partitioner), and each worker performs the sampling process in parallel to sample the join output and execute the query.

### Query execution

After sampling, each node executes the input query on the sample to produce a partial query result. These results are merged at the master node, which also produces error bound estimations.

### Estimating errors

The sampling algorithm can produce an output with duplicate edges. This acts as a random sampling with replacement, and the Central Limit Theorem can be used to estimate the error bounds. An alternative error estimation mechanism is also described in which duplicate edges are not allowed in the sample, and a Horvitz-Thompson estimator can be used. I couldn’t determine which of these two mechanisms is actually used for the results reported in the evaluation.

### Evaluation

Focusing just on the filtering stage to start with, we see that with two-way joins ApproxJoin is 6.1x faster than a native Spark join, and shuffles 12x less data. The gains are even better with 3-way and 4-way joins:

The benefits do depend on the overlap percentage though. The figures above were all with overlap fractions below 1%, and the ApproxJoin advantage disappears by the time we get to around 8%.

Turning to the sampling stage, the following figure compares scalability, latency, and accuracy of ApproxJoin sampling vs Spark joins. Here ApproxJoin can deliver order of magnitude speed-ups.

ApproxJoin is also evaluated end-to-end on two real-world datasets: CAIDA network traces, and the Netflix prize dataset. For the network trace dataset ApproxJoin is 1.5-1.7x faster, and reduces shuffled data by 300x. For the Netflix dataset ApproxJoin is 1.27-2x faster, and shuffles 1.7-3x less data.

By performing sampling during the join operation, we achieve low latency as well as high accuracy… Our evaluation shows that ApproxJoin significantly reduces query response time as well as the data shuffled through the network, without losing the accuracy of the query results compared with the state-of-the-art systems.

## ASAP: fast, approximate graph pattern mining at scale

ASAP: fast, approximate graph pattern mining at scale Iyer et al., *OSDI’18*

I have a real soft spot for approximate computations. In general, we waste a lot of resources on overly accurate analyses when understanding the trends and / or the neighbourhood is quite good enough (do you really need to know it’s 78.763895% vs 78 ± 1%?). You can always drill in with more accuracy if the approximate results hint at something interesting or unexpected.

Approximate analytics is an area that has gathered attention in big data analytics, where the goal is to let the user trade-off accuracy for much faster results.

(See e.g. ApproxHadoop which we covered on The Morning Paper a while back).

In the realm of graph processing, *graph pattern mining* algorithms, which discover structural patterns in a graph, can reveal very interesting things in our data but struggle to scale to larger graphs. This is in contrast to *graph analysis* algorithms such as PageRank which typically compute properties of a graph using neighbourhood information.

Today, a deluge of graph processing frameworks exist, both in academia and open-source… a vast majority of the existing graph processing frameworks however have focused on graph analysis algorithms.

Arabesque explicitly targets the graph mining problem using distributed processing, but even then large graphs (1 billion edges) can take hours (e.g. 10 hours) to mine.

In this paper, we present *A Swift Approximate Pattern-miner* (ASAP), a system that enables both *fast* and *scalable* pattern mining. ASAP is motivated by one key observation: in many pattern mining tasks, it is often not necessary to output the exact answer. For instance, in frequent sub-graph mining (FSM) the task is to find the frequency of subgraphs with an end goal of ordering them by occurrences… our conversations with a social network firm revealed that their application for social graph similarity uses a count of similar graphlets. Another company’s fraud detection system similarly counts the frequency of pattern occurrences. In both cases, an approximate count is good enough.

ASAP outperforms Arabesque by up to 77x on the LiveJournal graph while incurring less than 5% error. It can scale to graphs with billions of edges (e.g. Twitter at 1.5B edges) and produce results in minutes rather than hours.

One major challenge is that the approximation methods used in big data analytics don’t carry over to graph pattern mining. There, the core idea is to sample the input, run the analysis over the sample, and extrapolate based on an assumption that the *sample size relates to the error in the output*. If we take this same idea and apply it to a foundational graph mining problem, counting triangles, it quickly becomes apparent that there is no clear relationship we can rely on between the size of the sample and the error or the speedup.

We conclude that the existing approximation approach of running the exact algorithm on one or more samples of the input is incompatible with graph pattern mining. Thus, in this paper, we propose a new approach.

### Graph pattern mining and sampling

Pattern mining is the problem of finding instances of a given pattern e.g. triangles, 3-chains, cliques, motifs, in a graph. (A *motif* query looks for patterns involving a certain number of vertices, e.g. a 3-motif query looks for triangles and 3-chains).

A common approach is to iterate over all possible embeddings in the graph starting with a vertex or edge, filtering out those that cannot possibly match. The remaining *candidate* embeddings are then expanded by adding one more vertex/edge and the process repeats.

The obvious challenge in graph pattern mining, as opposed to graph analysis, is the exponentially large candidate set that needs to be checked.

*Neighbourhood sampling* has been proposed in the context of a specific graph pattern, triangle counting. The central idea is to sample one edge from an edge stream, and then gradually add more edges until either they form a triangle or it becomes impossible to form the pattern. Given a graph with *m* edges, it proceeds as follows. To perform one trial:

- Uniformly sample one edge ($l_0$) from the graph, with sampling probability $1/m$.
- Uniformly sample one of $l_0$’s adjacent edges ($l_1$) from the graph. (*Note that neighbourhood sampling depends on the ordering of edges in the stream, and $l_1$ appears after $l_0$ here*). If $l_0$ has *c* neighbours appearing in the stream after it, then the sampling probability for $l_1$ is $1/c$.
- Find an edge $l_2$ appearing in the stream after $l_1$ that completes the triangle, if possible. If we do find such an edge, the sampling probability is $1/(mc)$.

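If a trial successfully samples a triangle, we can estimate the number of triangles in the full graph as $e_i = mc$, the inverse of the sampling probability $1/(mc)$; an unsuccessful trial gives $e_i = 0$. After conducting *r* trials we have an approximate result given by $\frac{1}{r}\sum_{i=1}^{r} e_i$.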
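A minimal Python sketch of one trial and the averaged estimate, assuming a simple undirected graph (no self-loops or duplicate edges) supplied as an ordered list of edges. The function names and the six-edge example stream are illustrative choices, not taken from the paper or from ASAP's code.

```python
import random

def neighbourhood_trial(stream, rng):
    """Run one trial; return m*c if a triangle is sampled, otherwise 0."""
    m = len(stream)
    i0 = rng.randrange(m)                    # first edge, probability 1/m
    l0 = set(stream[i0])
    # Edges adjacent to the first edge that arrive later in the stream.
    later = [j for j in range(i0 + 1, m) if l0 & set(stream[j])]
    c = len(later)
    if c == 0:
        return 0
    i1 = rng.choice(later)                   # second edge, probability 1/c
    l1 = set(stream[i1])
    closing = l0 ^ l1                        # the two endpoints not shared
    found = any(set(stream[j]) == closing for j in range(i1 + 1, m))
    return m * c if found else 0

def estimate_triangles(stream, trials, seed=0):
    """Average the per-trial estimates over the requested number of trials."""
    rng = random.Random(seed)
    total = sum(neighbourhood_trial(stream, rng) for _ in range(trials))
    return total / trials

# Five vertices, two triangles: {0, 1, 2} and {2, 3, 4}.
EDGES = [(0, 1), (1, 2), (0, 2), (2, 3), (3, 4), (2, 4)]
estimate = estimate_triangles(EDGES, trials=20000, seed=1)
```

Each triangle can only be sampled through its first two edges in stream order, so it is counted once, and the average settles near the true count of 2 as the number of trials grows. Individual trials are very noisy (each returns either 0 or 12 on this graph), which is why many estimators are needed.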

Here’s an example using a graph with five nodes:

### Introducing ASAP

ASAP generalises the neighbourhood sampling approach outlined above to work with a broader set of patterns. It comes with a standard library of implementations for common patterns, and users can also write their own. It is designed for a distributed setting and also includes mining over property graphs (which requires predicate matching).

Users of ASAP can explicitly trade off compute time and accuracy, specifying either a time budget (best result you can do in this time) or an error budget (give me a result as quickly as you can, with at least this accuracy). Before running the algorithm, ASAP returns an estimate of the time or error bounds it can achieve. If the user approves, the algorithm is run and the presented result includes a count, confidence level, and actual run time. Users may then optionally ask to output the actual embeddings of the found pattern.

To find out how many estimators it needs to run for the given bounds ASAP builds an *error-latency profile* (ELP). Building an ELP is fast and can be done online. For the error profile, the process begins with uniform sampling to reduce the graph to a size where nearly 100% accurate estimates can be produced. A loose upper bound for the number of estimators required is then produced using Chernoff bounds and scaled to the larger graph (see section 5.2 for details and for how the time profile is computed). For evolving graphs, the ELP algorithm can be re-run after a certain number of changes to the graph (e.g., when 10% of edges have changed).

### Approximate pattern mining in ASAP

ASAP generalises neighbourhood sampling to a two phase process: a *sampling* phase followed by a *closing* phase.

In the sampling phase, we select an edge in one of two ways by treating the graph as an ordered stream of edges: (a) sample an edge randomly; (b) sample an edge that is adjacent to any previously sampled edges, from the remainder of the stream. In the closing phase, we wait for one or more specific edges to complete the pattern.

(So in the triangle counting example above, sampling the first two edges forms the sampling phase, and waiting for the third edge that completes the triangle is the closing phase).

The probability of sampling a pattern can be computed from these two phases….

For a k-node pattern, the probability of detecting a pattern *p* depends on *k* and the different ways to sample using the neighbourhood sampling technique. There are four different cases for *k*, and up to three different types of sampling possible within each case. The probability formulas for all of these are enumerated in section 4.1.1. E.g., when k=2 the probability is 1/mc as we saw previously.

When applied in a distributed setting ASAP parallelises the sampling process and then combines the outputs (MapReduce). Vertices in the graph are partitioned across machines, and several copies of the estimator task are scheduled on each machine. The graph’s edges and vertices need to be seen in the same order on each machine, but any order will do (‘*thus, ASAP uses a random ordering which is fast and requires no pre-processing of the graph*’). The output from each machine is a partial count, making the reduce step a trivial sum operation.

For predicate matching on property graphs ASAP supports both ‘at least one’ and ‘all’ predicates on the pattern’s vertices and edges. For ‘all’ predicates, ASAP introduces a filtering phase before the execution of the pattern mining task. For ‘at least one’ predicates a first pass produces a matching list of edges matching the predicate. Then in the second pass when running the sampling phase of the algorithm every estimator picks its first edge randomly from within the matching list.

There’s a performance optimisation for motif queries (finding all patterns with a certain number of vertices) whereby common building blocks from k-node patterns can be reused.

If a user first explores the graph with a low accuracy answer (e.g. using 1 million estimators), and then wants to drill in to higher accuracy (requiring e.g. 3 million estimators) then ASAP only needs to launch another 2 million estimators and can reuse the first 1 million.

### Key evaluation results

ASAP is evaluated using a number of graphs:

Compared to Arabesque, ASAP shows up to 77x performance improvement with a 5% loss of accuracy when counting 3-motifs and 4-motifs on modest sized graphs:

On larger graphs ASAP’s advantage extends up to 258x.

The final part of the evaluation evaluates mining for 5-motifs (21 individual patterns), based on conversations with industry partners who use similar patterns in their production systems. Results for two of those patterns are shown in the table below, and demonstrate that ASAP can handle them easily (using a cluster of 16 machines each with 16 cores).

Our evaluation shows that not only does ASAP outperform state-of-the-art exact solutions by more than an order of magnitude, but it also scales to large graphs while being low on resource demands.

## Sharding the shards: managing datastore locality at scale with Akkio

Sharding the shards: managing datastore locality at scale with Akkio Annamalai et al., *OSDI’18*

In Harry Potter, the Accio Summoning Charm summons an object to the caster of the spell, sometimes transporting it over a significant distance. In Facebook, Akkio summons data to a datacenter with the goal of improving data access locality for clients. Central to Akkio is the notion of microshards (μ-shards), units of data much smaller than a typical shard. μ-shards are defined by the client application, and should exhibit strong access locality (i.e., the application tends to read/write the data in a μ-shard together in a small window of time). Sitting as a layer between client applications and underlying datastores, Akkio has been in production at Facebook since 2014, where it manages around 100PB of data.

Measurements from our production environment show that Akkio reduces latencies by up to 50%, cross-datacenter traffic by up to 50%, and storage footprint by up to 40% compared to reasonable alternatives.

Akkio can support trillions of μ-shards and many 10s of millions of data access requests per second.

### Motivation

Our work in this area was initially motivated by our aim to reduce service response times and resource usage in our cloud environment which operates globally and at scale… Managing data access locality in geo-distributed systems is important because doing so can significantly improve data access latencies, given that intra-datacenter communication latencies are two orders of magnitude smaller than cross-datacenter communication latencies: e.g. 1ms vs 100ms.

At scale, requests issued on behalf of one end-user are likely to be processed by multiple distinct datacenters over time. For example, the user may travel from one location to another, or service workload may be shifted from one location to another.

One way to get data locality for accesses is to fully replicate all data across all datacenters. This gets expensive very quickly (the authors estimated $2 million per 100PBs per month for storage, plus costly WAN cross-datacenter bandwidth usage on top).

We could cap those costs by setting a caching budget, but for many of the workloads important to Facebook distributed caching is ineffective. For acceptable hit rates we still need to dedicate large amounts of hardware resources, and these workloads have low read-write ratios. Beyond the cost and latency implications there’s one further issue: “*many of the datasets accessed by our services need strong consistency… it is notable that the widely popular distributed caching systems that are scalable, such as Memcached or Redis, do not offer strong consistency. And for good reason.*”

### Why not just use shards?

Don’t datastores already partition their data using shards though (e.g. through key ranges or hashing)? There are two issues with the existing sharding mechanism from an Akkio perspective:

- Shard sizes are set by administrators to balance shard overhead, load balancing, and failure recovery. They tend to be on the order of a few tens of gigabytes.
- Shards are primarily a datastore concern, used as the unit for replication, failure recovery, and load balancing.

At Facebook, because the working set size of accessed data tends to be less than 1MB, migrating an entire shard (1-10GB) would be ineffective.

### μ-shards

Making shards smaller (and thus having many more of them) interferes with the datastore operations, and moving shards as a unit is too heavyweight. So Akkio introduces a new layer on top of shards, μ-shards.

μ-shards are more than just smaller shards; a key difference is that the *application itself* assigns data to μ-shards with high expectation of access locality. μ-shard migration has an overhead an order of magnitude lower than shard migration and its utility is far higher.

Each μ-shard is defined to contain related data that exhibits some degree of access locality with client applications. It is the application that determines which data is assigned to which μ-shard. At Facebook, μ-shard sizes typically vary from a few hundred bytes to a few megabytes in size, and a μ-shard (typically) contains multiple key-value pairs or database table rows. Each μ-shard is assigned (by Akkio) to a unique shard in that a μ-shard never spans multiple shards.

Examples of applications that fit well with μ-shards include:

### The design of Akkio

Akkio is implemented as a layer in between client applications and an underlying datastore. The paper focuses on Akkio’s integration with Facebook’s ZippyDB, but its design enables use with multiple backend datastores. The Akkio client library is embedded within the database client library, exposing an application-managed μ-shard id on all requests. It is up to the application to establish its own μ-shard naming scheme.

The Akkio *location service* maintains a location database that the client can use to map μ-shards to storage servers so that it knows where to direct requests. An *access counter service* is used to track all accesses, so that μ-shard placement and migration decisions can be made. It is the responsibility of the *data placement service* (DPS) to decide where to place each μ-shard.

The Akkio Client Library asynchronously notifies the DPS that a μ-shard placement may be suboptimal whenever a data access request needs to be directed to a remote datacenter. The DPS re-evaluates the placement of a μ-shard only when it receives such a notification.

Location information is configured with an eventually consistent replica at every datacenter: the dataset is relatively small, on the order of a few hundred GB. If a client reads a stale location and gets a miss, it will query a second time requesting that the cache be bypassed (and subsequently updated). Storage required for access counters is also low, less than 200GB per datacenter at Facebook.
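The stale-location retry described above can be sketched in a few lines. This is a minimal illustration, not Akkio's client library: `LocationClient`, `read`, and the dictionary-backed stores are hypothetical names invented for the example.

```python
class LocationClient:
    """Hypothetical client-side view of the location service."""

    def __init__(self, authoritative, local_replica):
        # Both map a μ-shard id to the replica set collection holding it.
        self.authoritative = authoritative
        self.local_replica = local_replica  # may lag behind `authoritative`

    def lookup(self, ushard_id, bypass_cache=False):
        if bypass_cache:
            # Go to the source of truth and refresh the local copy.
            location = self.authoritative[ushard_id]
            self.local_replica[ushard_id] = location
            return location
        return self.local_replica[ushard_id]


def read(client, stores, ushard_id, key):
    """Read a key, retrying once with the cache bypassed on a miss."""
    location = client.lookup(ushard_id)
    value = stores[location].get((ushard_id, key))
    if value is None:
        # A miss may mean the μ-shard has migrated: ask again, bypassing the cache.
        location = client.lookup(ushard_id, bypass_cache=True)
        value = stores[location].get((ushard_id, key))
    return value
```

In this sketch a genuine miss costs one extra lookup, which is the price of keeping the common path on the local, eventually consistent replica.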

All the main action happens in the data placement service, which is tailored for each backend datastore:

There is one DPS per Akkio-supported backend datastore system that is shared among all of the application services using instances of the same datastore system. It is implemented as a distributed service with a presence in every datacenter.

The `createUShard()` operation is called when a new μ-shard is being created and DPS must decide where to initially place it. `evaluatePlacement()` is invoked asynchronously by the Akkio client library whenever a data access request needs to be directed to a remote datacenter. DPS first checks its policies to see if initiating a migration is permissible (details of the policy mechanism are mostly out of scope for this paper), and whether a migration is already in process. If migration is permissible it determines the optimal placement for the μ-shard and starts the migration.

### μ-shard placement

To understand placement in the context of ZippyDB, we first need to look briefly at how ZippyDB manages shards and replicas. ZippyDB partitions data horizontally, with each partition assigned to a different shard. Shards can be configured to have multiple replicas which form a *shard replica set* and participate in a shard-specific Paxos group. The replication configuration of a shard identifies not only the number of replicas, but also how they are to be distributed over datacenters, clusters, and racks, and the degree of consistency required. Replica sets that all have the same configuration form a *replica set collection*.

When running on ZippyDB, Akkio places μ-shards on, and migrates μ-shards between different such replica set collections.

ZippyDB’s *Shard Manager* assigns each shard replica to a specific ZippyDB server conforming to the policy. The assignments are registered in a directory service.

For initial placement of a μ-shard, the typical strategy is to select a replica set collection with primary replica local to the requesting client and secondary replica(s) in one of the more lightly loaded datacenters. For migration, a scoring system is used to select the target replica set collection from those available. First, datacenters are scored based on the number of times the μ-shard was accessed from that datacenter over the last X days, with stronger weighting for more recent accesses. Any ties are then broken by scoring datacenters based on the amount of available resources within them.
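A minimal sketch of that two-step scoring might look like the following. The exponential decay, the ten-day window, and the names `score_datacenters` and `choose_target` are assumptions made for illustration: the paper only says that recent accesses are weighted more strongly and that ties are broken on available resources.

```python
from collections import defaultdict


def score_datacenters(accesses, now_day, window_days=10, half_life_days=3.0):
    """Score datacenters from (datacenter, day) access records for one μ-shard.

    Assumption: recency weighting is an exponential decay with the given half-life.
    """
    scores = defaultdict(float)
    for datacenter, day in accesses:
        age = now_day - day
        if 0 <= age < window_days:  # only count accesses inside the window
            scores[datacenter] += 0.5 ** (age / half_life_days)
    return dict(scores)


def choose_target(accesses, free_capacity, now_day, **kwargs):
    """Pick the best-scoring datacenter, breaking ties on available resources."""
    scores = score_datacenters(accesses, now_day, **kwargs)
    # Tuples compare element by element: access score first, then free capacity.
    return max(free_capacity, key=lambda dc: (scores.get(dc, 0.0), free_capacity[dc]))
```

For example, with one recent access each from `east` and `west`, the datacenter with more free capacity wins; an additional older access from `east` is enough to tip the decision the other way.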

When working on top of ZippyDB, Akkio makes use of ZippyDB’s access control lists and transactions to implement migrations. The move algorithm looks like this:

On top of Facebook’s Cassandra variant (which doesn’t support ACLs, though the open-source Cassandra does), a different migration strategy is needed:

Now, that’s all well and good but here’s the part I still don’t understand. The underlying sharding system of the datastore controls placement. Data items are assigned to shards by some partitioning scheme (e.g. by range partitioning, or by hash partitioning). So when we move a μ-shard from one shard (replica set collection) to another, what is really happening with respect to those keys? Are they being finagled in the client so that e.g. they end up in the right range? (Have the right hash value????!). I feel like I must be missing something really obvious here, but I can’t see it described in the paper. Akkio does allow for some changes to the underlying datastore to accommodate μ-shard migrations, but these changes are minimal:

### Akkio in production

The evaluation section contains multiple examples of Akkio in use at Facebook, which we don’t have space to cover in detail. The before-and-after comparisons (in live production deployments) make compelling reading though:

- ViewState data stores a history of content previously shown to a user. Replica set collections are configured with two replicas in one local datacenter, and a third in a nearby datacenter. Akkio is configured to migrate μ-shards aggressively.

Originally, ViewState data was fully replicated across six datacenters. Using Akkio with the setup described above led to a 40% smaller storage footprint, a 50% reduction of cross-datacenter traffic, and about a 60% reduction in read and write latencies compared to the original non-Akkio setup.

- AccessState stores information about user actions taken in response to displayed content. Akkio once more decreased storage footprint by 40%, and cross-datacenter traffic by 50%. Read latency was unaffected, but write latency reduced by 60%.
- For Instagram ‘Connection-Info’ Akkio enabled Instagram to expand into a second continent keeping both read and write latencies lower than 50ms. “*This service would not have expanded into the second continent without Akkio*.”
- For the Instagram Direct messaging application, Akkio reduced end-to-end message delivery latency by 90ms at P90 and 150ms at P99. (The performance boost in turn delivered improvements in key user engagement metrics).

Up next: migrating more applications to Akkio, and supporting more datastore systems on the backend (including MySQL). In addition…

… work has started using Akkio (i) to migrate data between hot and cold storage, and (ii) to migrate data more gracefully onto newly created shards when resharding is required to accommodate (many) new nodes.

## The FuzzyLog: a partially ordered shared log

The FuzzyLog: a partially ordered shared log Lockerman et al., *OSDI’18*

If you want to build a distributed system then having a distributed shared log as an abstraction to build upon — one that gives you an agreed upon total order for all events — is such a big help that it’s practically cheating! (See the “Can’t we all just agree” mini-series of posts for some of the background on consensus).

Services built over a shared log are simple, compact layers that map a high-level API to append/read operations on the shared log, which acts as the source of strong consistency, durability, failure atomicity, and transactional isolation. For example, a shared log version of ZooKeeper uses 1K lines of code, an order of magnitude lower than the original system.

There’s a catch of course. System-wide total orders are expensive to maintain. Sometimes it may be impossible (e.g. in the event of a network partition). But perhaps we don’t always need a *total* ordering. Oftentimes for example causal consistency is strong enough. *FuzzyLog* aims to provide the simplicity of a shared log without imposing a total order: it provides *partial ordering* instead. It’s designed for a world of sharded, geo-replicated systems. The log is actually a directed acyclic graph made up of interlinked chains. A chain contains updates originating in a single geographic region. Nodes within the chains are *coloured*, with each colour representing updates to a single application level data shard.

(Note: when this paper talks about *nodes*, the authors mean nodes in the Log DAG, not system nodes (servers)).

Our evaluation shows that [applications built on top of FuzzyLog] are compact, fast, and flexible: they retain the simplicity (100s of lines of code) and strong semantics (durability and failure atomicity) of a shared log design while exploiting the partial order of the FuzzyLog for linear scalability, flexible consistency guarantees (e.g., causal+ consistency), and network partition tolerance. On a 6-node Dapple deployment [Dapple is a FuzzyLog implementation], our FuzzyLog-based ZooKeeper supports 3M/sec single-key writes, and 150K/sec atomic cross-shard renames.

### The FuzzyLog API

Let’s take a closer look at the abstraction provided by FuzzyLog through its API. FuzzyLog is designed to support partitioning state into logical data shards, with concurrent processing of updates against different shards. It also supports geo-replication with concurrent updates across regions (even to the same logical partition).

- Each node in the DAG is tagged with one or more colours, which divide an application’s state into logical shards. Nodes tagged with a colour correspond to updates against the corresponding logical shard.
- Each colour is a set of totally ordered chains, one per region, with cross-edges between them that indicate causality. Every region has a full (but potentially stale) copy of each colour.

Clients interact with their own local region via the FuzzyLog API:

Operations to a single color across regions are causally consistent. In other words, two append operations to the same color issued by clients in different regions are only ordered if the node introduced by one of them has already been seen by the client issuing the other one…. Operations within a single region are serializable. All append and sync operations issued by clients within a region execute in a manner consistent with some serial execution. This serialization order is linearizable if the operations are to a single color within the region (i.e., on a single chain).

A client uses the `sync` call to synchronise its state. `sync` takes a snapshot of the set of nodes currently present at the local region’s copy of a color, and ‘plays’ all new nodes since the last sync invocation. (I.e., sync is moving clients from watermark to watermark in the totally ordered chain for the given colour in the given region).

Here’s an example of sync at work:

### Building applications on top of FuzzyLog

Given the FuzzyLog abstraction, the authors give highlights of building a series of different systems on top of it. We are left to infer that this process is simple based on the very small number of lines of code required.

First up is a *LogMap*, which uses a single colour and a single region. Each server has a local in-memory copy of the map and continuously executes `sync` to keep its local view up to date. LogMap effectively runs over a single totally ordered shared log. It is implemented in just 193 lines of code, but its reliance on total order comes at the cost of scalability, performance, and availability.

To scale within a single region, we can add sharding (colours). ShardedMap is a trivial change to LogMap, requiring just that the colour parameter be set correctly on calls to the FuzzyLog.
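To make the LogMap idea concrete, here is a toy single-region sketch. It is not the FuzzyLog API: `ToyFuzzyLog`, its `append`/`snapshot`/`read` methods, and this `LogMap` class are hypothetical stand-ins that model each colour as one in-memory, totally ordered chain, with no replication or cross-region edges.

```python
class ToyFuzzyLog:
    """In-memory stand-in for one region: one totally ordered chain per colour."""

    def __init__(self):
        self.chains = {}

    def append(self, colour, payload):
        self.chains.setdefault(colour, []).append(payload)

    def snapshot(self, colour):
        # The current end of the chain, i.e. the watermark a sync will play up to.
        return len(self.chains.get(colour, []))

    def read(self, colour, start, end):
        return self.chains.get(colour, [])[start:end]


class LogMap:
    """A map replica whose state is derived purely by replaying the log."""

    def __init__(self, log, colour):
        self.log = log
        self.colour = colour
        self.state = {}
        self.played = 0  # watermark reached by the last sync

    def put(self, key, value):
        # Writes go to the log only; local state changes when the entry is played.
        self.log.append(self.colour, ("put", key, value))

    def sync(self):
        end = self.log.snapshot(self.colour)
        for op, key, value in self.log.read(self.colour, self.played, end):
            if op == "put":
                self.state[key] = value
        self.played = end

    def get(self, key):
        self.sync()
        return self.state.get(key)
```

Two `LogMap` instances created over the same log and colour converge on the same contents after a sync. A ShardedMap in this toy model would simply route each key to a `LogMap` with a different colour, so that updates to different shards never share a chain.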

If we need atomic operations across shards (with blind multi-put operations that don’t return a value), the AtomicMap (201 lines of code) can do this. It supports serializable (but not linearizable or strictly serializable) appends.

For full read/write transactions, TxMap at 417 lines of code can do the business. It tracks read-sets and buffers write-sets, and plays out a commit protocol through nodes in the log. TxMap provides strict serializability.

If we want to go across regions, then CRDTMap (284 lines of code) provides causal+ consistency based on the Observed-Remove set CRDT. We can make it transactional (TxCRDTMap) by changing 80 LOC.

CRDTMap sacrifices consistency even when there is no partition in the system. CAPMap (424 LOC) provides strong consistency in the absence of network partitions, and causal consistency during them.

A ZooKeeper clone supporting linear scaling across shards and atomic cross-shard renames is 1881 LOC.

### Introducing Dapple, a FuzzyLog implementation

Dapple is a distributed implementation of the FuzzyLog abstraction. It uses a collection of storage servers called chainservers (they store the per-colour chains, and also happen to use chain replication). Each chainserver stores multiple in-memory log-structured address spaces. The FuzzyLog state is partitioned across chainservers with each colour on a single partition. Persistence is achieved using the ‘lots of little batteries’ strategy (battery-backed DRAM).

If we stick to a single colour for a moment, then each region will have a single totally ordered chain. In addition to this latest copy of its own local chain, it also has potentially stale copies of the other regions’ chains (shadow logs). Clients write to the local log, while shadow logs are asynchronously replicated from other regions. Log appends use chain replication and are acknowledged by the tail. Snapshot requests are sent directly to the tail, and reads can be satisfied by any replica in the chain.

The FuzzyLog API supports appending a node to multiple colours, which in Dapple equates to atomically appending a node to multiple logs, one log per colour:

…Dapple uses a classical total ordering protocol called Skeen’s algorithm (which is unpublished but described verbatim in other papers…) to consistently order appends… We add three fault-tolerance mechanisms —leases, fencing, and write-ahead logging— to produce a variant of Skeen’s that completes in two phases in a failure-free ‘fast’ path, but can safely recover if the origin client crashes.

Details of the protocol are given in §5.2 of the paper.

### Evaluation

Tango and vCorfu are based on shared logs using a centralised sequencer. Compared to Tango, Dapple offers near linear scaling with 0% multi-colour appends, and holds its advantage over Tango until about 10% multi-colour appends. At 100% multi-colour appends Tango wins because we’re back in the total order situation which Tango provides more efficiently.

When using multi-shard operations with AtomicMap, scalability and absolute throughput degrade gracefully as the percentage of multi-colour appends is steadily increased:

A ZooKeeper clone, DappleZK, is implemented in 1881 lines of Rust code. It partitions a namespace across a set of servers, each acting as a Dapple client, storing a partition of the namespace in in-memory data-structures backed by a FuzzyLog colour. Each DappleZK server is responsible for an independent shard of the ZooKeeper namespace. Rename operations move files from one DappleZK shard to another in a distributed transaction. A ZooKeeper deployment maintaining its state in DRAM is included for comparison.

We include the ZooKeeper comparison for completeness; we expect the FuzzyLog single-partition case to outperform ZooKeeper largely due to the different languages used (Rust vs Java) and the difference between prototype and production-quality code.

(Here the assumption is that production quality code is slower due to all the checks-and-balances – versus being faster because it has been optimised).

### The last word

The FuzzyLog abstraction— and its implementation in Dapple— extends the shared log approach to partial orders, allowing applications to scale linearly without sacrificing transactional guarantees, and switch seamlessly between these guarantees when the network partitions and heals. Crucially, applications can achieve these capabilities in hundreds of lines of code via simple, data-centric operations on the FuzzyLog, retaining the core simplicity of the shared log approach.

## Moment-based quantile sketches for efficient high cardinality aggregation queries

Moment-based quantile sketches for efficient high cardinality aggregation queries Gan et al., *VLDB’18*

Today we’re temporarily pausing our tour through some of the OSDI’18 papers in order to look at a great sketch-based data structure for quantile queries over high-cardinality aggregates.

That’s a bit of a mouthful so let’s jump straight into an example of the problem at hand. Say you have telemetry data from millions of heterogeneous mobile devices running your app. Each device tracks multiple metrics such as request latency and memory usage, and is associated with dimensional metadata (categorical variables) such as application version and hardware model.

In applications such as A/B testing, exploratory data analysis, and operations monitoring, analysts perform aggregation queries to understand how specific user cohorts, device types, and feature flags are behaving.

We want to be able to ask questions like “*what’s the 99%-ile latency over the last two weeks for v8.2 of the app?*”

SELECT percentile(latency, 99) FROM requests WHERE time > date_sub(curdate(), 2 WEEK) AND app_version = "v8.2"

As well as *threshold* queries such as “*what combinations of app version and hardware platform have a 99th percentile latency exceeding 100ms?*”

SELECT app_version, hw_model, PERCENTILE(latency, 99) as p99 FROM requests GROUP BY app_version, hw_model HAVING p99 > 100

Instead of starting from raw data every time when answering this type of query, OLAP engines can reduce query time and memory usage by maintaining a *data cube* of pre-aggregated summaries for each tuple of dimension values. The ultimate query performance then depends on just how quickly we can merge those summaries to compute quantile roll-ups over the requested dimensions.

Let’s take a very simple example. Suppose I have two dimensions, letter (with values A and B), and colour (with values red and green), and I have request latency data from log messages including these attributes. Then I will have four summary sketches, one accumulating latency values for (A, red) one for (A, green), one for (B, red) and one for (B, green).

If a query wants to know the P99 latency for ‘red’ requests, we can add together the (A, red) and (B, red) sketches to get a complete sketch for red.

In this paper, we enable interactive quantile queries over high-cardinality aggregates by introducing a compact and efficiently mergeable quantile sketch and associated quantile estimation routines.

The data structure that makes all this possible is called a *moments sketch* (named after the *method of moments* statistical technique). It’s easy to construct, but a bit more difficult to interpret. It’s worth the effort though, as the evaluation shows:

- The moments sketch supports 15-50x faster query times than comparably accurate summaries on quantile aggregations
- The moments sketch gives good accuracy across a range of real-world datasets using less than 200 bytes of storage
- Integration of the moments sketch in Druid provides 7x faster quantile queries than the default quantile summary in Druid workloads.

There’s a Java implementation available at https://github.com/stanford-futuredata/msketch.

### Mergeable quantile summaries

Intuitively, a summary is mergeable if there is no accuracy cost to combining pre-computed summaries compared with constructing a summary on the raw data. Thus, mergeable summaries are algebraic aggregate functions in the data cube literature… Mergeable summaries can be incorporated naturally into a variety of distributed systems [e.g. MapReduce].

The moments sketch mergeable summary is based on the method of moments, “*a well established statistical technique for estimating the parameters of probability distributions*.” Given a sketch recording moments (we’ll look at those in just a minute), the method of moments helps us to construct a distribution with moments matching those in the sketch. There could be many such distributions meeting those criteria. The *principle of maximum entropy* is used to select the distribution that encodes the *least* additional information about the data beyond that captured by the given moment constraints (i.e., just enough to fit the facts, and no more).

### Maintaining and merging moments sketches

A moments sketch is a set of floating point values containing:

- the minimum value seen so far
- the maximum value seen so far
- the count of the number of values seen so far
- *k* moments, and *k* log moments

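The *k* sample moments are given by $\frac{1}{n}\sum x, \frac{1}{n}\sum x^2, \ldots, \frac{1}{n}\sum x^k$, where $n$ is the count of values seen. In the sketch data structure we can just store $n$ once and maintain the unscaled values $\sum x, \sum x^2, \ldots, \sum x^k$.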

For long-tailed datasets where values can vary over several orders of magnitude we can recover better quantile estimates using *log-moments*.

In general, when updating a moments sketch in a streaming manner or when maintaining multiple moments sketches in a distributed setting, we cannot know a priori whether standard moments or log moments are more appropriate for the given data set. Therefore, our default approach is to store both sets of moments up to the same order *k*.

The functions to initialise an empty sketch, accumulate a new point-wise addition, or merge two sketches together are very straightforward given these definitions (the figure below complicates things by showing how to accumulate the scaled moments rather than the unscaled versions).
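As a rough illustration of how little is involved, here is a minimal Python sketch of the three operations using the unscaled power sums. The class and method names are invented for this example, and it assumes strictly positive inputs so that the log moments are defined; the authors' Java implementation will differ in detail.

```python
import math


class MomentsSketch:
    """Min, max, count, plus k unscaled power sums and k unscaled log-power sums."""

    def __init__(self, k):
        self.k = k
        self.min = math.inf
        self.max = -math.inf
        self.count = 0
        self.power_sums = [0.0] * k  # sum of x^1 .. x^k
        self.log_sums = [0.0] * k    # sum of log(x)^1 .. log(x)^k

    def add(self, x):
        """Accumulate one value. Assumes x > 0 so that log(x) is defined."""
        self.min = min(self.min, x)
        self.max = max(self.max, x)
        self.count += 1
        log_x = math.log(x)
        for i in range(self.k):
            self.power_sums[i] += x ** (i + 1)
            self.log_sums[i] += log_x ** (i + 1)

    def merge(self, other):
        """Return a new sketch equivalent to one built over both inputs."""
        assert self.k == other.k
        merged = MomentsSketch(self.k)
        merged.min = min(self.min, other.min)
        merged.max = max(self.max, other.max)
        merged.count = self.count + other.count
        merged.power_sums = [a + b for a, b in zip(self.power_sums, other.power_sums)]
        merged.log_sums = [a + b for a, b in zip(self.log_sums, other.log_sums)]
        return merged

    def moments(self):
        """Scaled sample moments: divide each power sum by the count."""
        return [s / self.count for s in self.power_sums]
```

Merging is just element-wise addition plus a min and a max, which is why rolling up the (A, red) and (B, red) sketches from the earlier example into a single ‘red’ sketch is so cheap.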

### Estimating quantiles given a moment sketch

So it’s easy to maintain the sketch data structures. But the small matter of how to use these data structures to estimate quantiles remains.

… given a moments sketch with minimum $x_{min}$ and maximum $x_{max}$, we solve for a pdf $f(x)$ supported on $[x_{min}, x_{max}]$ with moments equal to the values in the moments sketch… We can then report the quantiles of $f(x)$ as estimates for the quantiles of the dataset.

The differential Shannon entropy *H* is used to measure the degree of *uninformativeness* in a distribution ($H[f] = -\int f(x) \log f(x)\,dx$) and thus help select the maximum entropy distribution out of the set of possibilities. Given the pdf $f(x)$, quantiles can be estimated using numeric integration and Brent’s method for root finding.

With a little mathematical juggling (§4.2), we can solve the problem of finding the maximum entropy distribution using Newton’s method. Section 4.3 gives details on making this practical when calculating the second-order derivatives (Hessians) and combating numerical instability.
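For orientation, one standard way to write the underlying optimisation problem is sketched below. It uses only the standard moments $\mu_i$ taken from the sketch (with $\mu_0 = 1$ for normalisation) and leaves out the log moments and the numerical-stability machinery of §4.3; the symbols $\theta_i$ and $L$ are notation chosen for this sketch rather than taken from the post.

$$
\max_{f} \; H[f] = -\int_{x_{min}}^{x_{max}} f(x) \log f(x)\,dx
\quad \text{subject to} \quad
\int_{x_{min}}^{x_{max}} x^i f(x)\,dx = \mu_i, \quad i = 0, \ldots, k
$$

Setting the derivative of the Lagrangian to zero shows that the solution has an exponential form, so only $k+1$ parameters need to be found:

$$
f(x; \theta) = \exp\left( \sum_{i=0}^{k} \theta_i x^i \right)
$$

The parameters can then be found by minimising a convex function whose gradient vanishes exactly when the moments match, which is the kind of problem Newton’s method suits well:

$$
L(\theta) = \int_{x_{min}}^{x_{max}} \exp\left( \sum_{i=0}^{k} \theta_i x^i \right) dx - \sum_{i=0}^{k} \theta_i \mu_i,
\qquad
\frac{\partial L}{\partial \theta_i} = \int_{x_{min}}^{x_{max}} x^i f(x; \theta)\,dx - \mu_i,
\qquad
\frac{\partial^2 L}{\partial \theta_i \, \partial \theta_j} = \int_{x_{min}}^{x_{max}} x^{i+j} f(x; \theta)\,dx
$$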

You also need to be aware of bumping into the limits of floating point stability. In practice data centered at 0 have stable higher moments up to k=16. With double precision floating point moments and data in a range [c-1, c+1], the highest numerically useful moment order shrinks as |c| grows (the paper gives the exact bound).

Overall, these optimisations bring quantile estimation overhead down to 1ms or less, sufficient for interactive latencies on single quantile queries.

### Optimisations for threshold queries

For *threshold* queries (e.g. P99 greater than 100ms) we can do a further optimisation.

Instead of computing the quantile on each sub-group directly, we compute a sequence of progressively more precise bounds in a *cascade* and only use more expensive estimators when necessary… Given the statistics in a moments sketch, we apply a variety of classical inequalities to derive bounds on the quantiles. These provide worst-case error guarantees for quantile estimates, and enable faster query processing for threshold queries over multiple groups.

Two kinds of bounds are used: one based on Markov’s inequality, and a slightly more expensive bound called the RTTBound (see §5.1). The bounds are used to see if the threshold predicate can be resolved immediately. If not, we solve for the maximum entropy distribution as before. It all comes together like this:
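The first stage of such a cascade can be sketched with Markov’s inequality alone. The function below is an illustration under stated assumptions, not the paper’s algorithm: it assumes non-negative data, takes scaled sample moments, ignores the RTTBound entirely, and the name `markov_threshold_check` is made up for this example. It returns `None` when the bounds are inconclusive, which is where a real system would fall through to the more expensive estimators.

```python
from math import comb


def markov_threshold_check(moments, x_min, x_max, phi, t):
    """Try to decide 'the phi-quantile exceeds t' from moments alone.

    moments[i - 1] is the i-th sample moment of non-negative data.
    Returns True or False when a bound settles the question, else None.
    """
    if t >= x_max:
        return False  # no value exceeds t, so no quantile can
    if t < x_min:
        return True   # every value exceeds t
    mu = [1.0] + list(moments)  # mu[0] = 1 by convention
    k = len(moments)

    # Markov on X: the fraction of values >= t is at most E[X^i] / t^i.
    if t > 0:
        frac_at_least_t = min(mu[i] / t ** i for i in range(1, k + 1))
        if frac_at_least_t < 1 - phi:
            return False

    # Markov on (x_max - X): the fraction of values <= t is at most
    # E[(x_max - X)^i] / (x_max - t)^i, expanded with the binomial theorem.
    gap = x_max - t
    frac_at_most_t = 1.0
    for i in range(1, k + 1):
        shifted = sum(comb(i, j) * x_max ** (i - j) * (-1) ** j * mu[j]
                      for j in range(i + 1))
        frac_at_most_t = min(frac_at_most_t, shifted / gap ** i)
    if frac_at_most_t < phi:
        return True

    return None  # undecided: fall back to a more precise estimator
```

The checks are ordered from cheapest to most expensive, so groups that are obviously above or below the threshold never pay for the maximum entropy solve.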

### Evaluation

The moment sketch (M-Sketch) is evaluated against existing quantile summaries and also integrated into Druid and MacroBase. Six real-valued datasets are used, with statistics as captured in the following table:

Microbenchmarks evaluating query times and accuracy show that M-Sketch offers best-in-class query times for the same accuracy.

The moments sketch gets its advantage from fast merge times, which means the more merges required (the bigger the datacube) the better its relative performance. With 10,000 or more merges, M-Sketch provides the best performance. At lower merge numbers (e.g. 100 or less) estimation times dominate.

When integrated into Druid and compared to Druid’s default S-Hist summary using the Milan dataset with 26 million entries the moments sketch gives a 7x lower query time (note the log scale).

The following figure shows the impact of the bounds checking optimisation for threshold queries. Concentrating on the first four bars we see (a) a baseline calculating quantile sketches directly, (b) the moments sketch with no bounds optimisations, (c) adding in the Markov bounds test, and (d) using both the Markov bound and RTTBound tests.

Low merge overhead allows the moments sketch to outperform comparably accurate existing summaries when queries aggregate more than 10 thousand summaries…. the use of numeric optimizations and cascades keep query times at interactive latencies.