Skip to content

ScootR: scaling R dataframes on dataflow systems Kunft et al., SoCC’18

The language of big data is Java ( / Scala). The languages of data science are Python and R. So what do you do when you want to run your data science analysis over large amounts of data?

…programming languages with rich support for data manipulation and statistics, such as R and Python, have become increasingly popular… [but]… they are typically designed for single machine and in-memory usage…. In contrast, parallel dataflow systems, such as Apache Flink and Apache Spark, are able to handle large amounts of data. However, data scientists are often unfamiliar with the systems’ native language and programming abstraction, which is crucial to achieve good performance.

A tempting solution is to embed Python / R support within the dataflow engine. There are two basic approaches to this today:

1. Keep the guest language components in a separate process and use IPC (inter-process communication) to exchange input and output data between the dataflow engine and the guest language process. This approach can support the full power of the guest language, but pays a heavy price in IPC and serialisation costs.
2. Use source-to-source (STS) translation to translate guest language code into the dataflows native API. The translated code can achieve near native performance, but comprehensive source-to-source translation is difficult and so tends to be restricted to a subset of the guest language functions and libraries.

SparkR is of interest here because it supports both STS translation and IPC. It uses STS where possible, and falls back to IPC outside of this. Executing a simple user-defined function via IPC is about 100x slower than native execution after STS translation:

Clearly what we want is the performance of native execution (STS), but with the flexibility to use the full scope of the guest language and libraries. In theory we could just invest in building better and better STS translators (e.g., R to Java in this case), but this entails a huge effort and results in a hard-to-maintain point solution.

When faced with an M:N style integration problem, it often pays to look for a common intermediate representation (IR). If only there was some IR which both Java and R (and Python, Ruby, JavaScript,… ) could compile into, then we could build the inter-operation at the IR level. The JVM has byte code, LLVM has bitcode, and a little bit closer to home, Weld has an IR based on linear types.

In this research, Graal and Truffle provide the common ground (see One VM to rule them all).

Graal, Truffle, and FastR

Truffle is a language implementation framework that supports development of high performance language runtimes through self-optimising AST interpreters. The ASTs collect profiling information at runtime and specialise their structure accordingly. Languages built on top of Truffle can efficiently exchange data and access functions.

Graal is a dynamic compiler that produces highly-optimised machine code as soon as a Truffle AST reaches a stable state. De-optimisations and speculation failures are handled automatically by falling back to the AST.

The GraalVM is a multi-language execution runtime capable of running multiple languages in the same virtual machine instance, with full interoperability between all its supported languages.

GraalVM can execute Java applications on top of the HotSpot Java VM, and can execute other Truffle-based language runtimes such as JavaScript, Ruby, Python, and LLVM.

One of the default languages of the GraalVM is fastR, a high-performance GNU-R compatible R language runtime implemented using Truffle and relying on the Graal dynamic compiler. fastR supports the C API from GNU-R, and so can support many of the R packages that depend on underlying C implementations. Some R packages rely on GNU-R internals which make fastR integration harder, but fastR is continually being enhanced to support these too.

Introducing ScootR

ScootR builds on the capabilities of the GraalVM to expose Flink’s internal data structures to the fastR engine. First ScootR creates an execution plan based on the R source code, and then this plan can be deployed and executed on a Flink cluster. R user-defined functions (UDFs) are executed in parallel by each worker node, and automatically optimised by the Graal JIT compiler.

Here’s an example R application making use of the ScootR dataframe API:

R dataframes are mapped to Flink TupleN dataset types, and invocations to ScootR’s API are mapped to Flink operators via new Truffle AST nodes (_RBuiltinNode_s). For R functions that involve user-defined code, ScootR needs to infer the input and output types. Input types are specified by the user when reading files (e.g., line 7 in the listing above). For output types ScootR just instantiates a temporary tuple, invokes the function, and inspects the output.

For efficient access of Java data types within R and vice-versa, ScootR makes use of Truffle’s language interoperability features. R functions are also rewritten to create and return Flink tuples directly in the R function.

Here’s an example of the execution plan generated for the sample R program above.

Only the apply function includes user-defined code, all other functions are replaced with the corresponding Flink operators during the plan generation phase.

Evaluation

The evaluation is based on two datasets:

• The Airline On-Time Performance Dataset with arrival data for US flights, converted into CSV format. The resulting file size is 9.5GB.
• The Reddit Comments Dataset (four consecutive months’ worth, at about 14GB per month in CSV format).

ScootR is compared against native GNU-R, fastR, and SparkR, as well as against natively coded pipelines in Spark and Flink. Here are the single node and cluster comparisons for an ETL pipeline on the airline data as follows:

Key findings from the evaluation are as follows:

• For non-UDF functions, both ScootR and SparkR provide reliable mapping of R functions to native API calls with overhead below 1.2x.
• With UDFs, ScootRs performance is competitive with SparkR’s STS approach when SparkR is able to use STS, and an order of magnitude faster when SparkR has to fallback to IPC.
• Total overheads in operator pipelines (vs fully native) are up to 1.2x for SparkR with STS, and up to 1.4x for ScootR.
• Both SparkR and ScootR outperform GNU-R and fastR, even for single-threaded execution on a single node.

One possible direction for future work is to integrate other dynamic languages with Truffle support, such as JavasScript or Python.

Overload control for scaling WeChat microservices Zhou et al., SoCC’18

There are two reasons to love this paper. First off, we get some insights into the backend that powers WeChat; and secondly the authors share the design of the battle hardened overload control system DAGOR that has been in production at WeChat for five years. This system has been specifically designed to take into account the peculiarities of microservice architectures. If you’re looking to put a strategy in place for your own microservices, you could do a lot worse than start here.

WeChat

The WeChat backend at this point consists of over 3000 mobile services, including instant messaging, social networking, mobile payment, and third-party authorization. The platform sees between $10^{10} - 10^{11}$ external requests per day. Each such request can triggers many more internal microservice requests, such that the WeChat backend as a whole needs to handle hundreds of millions of requests per second.

WeChat’s microservice system accommodates more than 3000 services running on over 20,000 machines in the WeChat business system, and these numbers keep increasing as WeChat is becoming immensely popular… As WeChat is ever actively evolving, its microservice system has been undergoing fast iteration of service updates. For instance, from March to May in 2018, WeChat’s microservice system experienced almost a thousand changes per day on average.

WeChat classify their microservices as “Entry leap” services (front-end services receiving external requests), “Shared leap” services (middle-tier orchestration services), and “Basic services” (services that don’t fan out to any other services, and thus act as sinks for requests).

On a typical day, peak request rate is about 3x the daily average. At certain times of year (e.g. around the Chinese Lunar New Year) peak workload can rise up to 10x the daily average.

Challenges of overload control for large-scale microservice-based platforms

Overload control… is essential for large-scale online applications that need to enforce 24×7 service availability despite any unpredictable load surge.

Traditional overload control mechanisms were designed for a world with a small number of service components, a relatively narrow ‘front-door,’ and trivial dependencies.

… modern online services are becoming increasingly complex in their architecture and dependencies, far beyond what traditional overload control was designed for.

• With no single entry point for service requests sent to the WeChat backend, the conventional approach of centralized load monitoring at a global entry point (gateway) is not applicable.
• The service invocation graph for a particular request may depend on request-specific data and service parameters, even for requests of the same type. So when a particular service becomes overload it is very difficult to determine what types of requests should be dropped to mitigate the situation.
• Excessive request aborts (especially when deeper in the call graph or later in the request processing) waste computational resources and affect user experience due to high latency.
• Since the service DAG is extremely complex and continuously evolving, the maintenance cost and system overhead for effective cross-service coordination is too high.

Since one service may make multiple requests to a service it depends on, and may also make requests to multiple backend services, we have to take extra care with overload controls. The authors coin the term subsequent overload for the cases where more than one overloaded service is invoked, or a single overloaded service is invoked multiple times.

Subsequent overload raises challenges for effective overload control. Intuitively, performing load shedding at random when a service becomes overloaded can sustain the system with a saturated throughput. However, subsequent overload may greatly degrade system throughput beyond that intended…

Consider a simple scenario where service A invokes service B twice. If B starts rejecting half of all incoming requests, A’s probability of success drops to 0.25.

DAGOR overview

WeChat’s overload control system is called DAGOR. It aims to provide overload control to all services and thus is designed to be service agnostic. Overload control runs at the granularity of an individual machine, since centralised global coordination is too expensive. However, it does incorporate a lightweight collaborative inter-machine protocol which is needed to handle subsequent overload situations. Finally, DAGOR should sustain the best-effort success rate of a service when load shedding becomes inevitable due to overload. Computational resources (e.g. CPU, I/O) spent on failed service tasks should be minimised.

We have two basic tasks to address: detecting an overload situation, and deciding what to do about it once detected.

Overload detection

For overload detection, DAGOR uses the average waiting time of requests in the pending queue (i.e., queuing time). Queuing time has the advantage of negating the impact of delays lower down in the call-graph (compared to e.g. request processing time). Request processing time can increase even when the local server itself is not overloaded. DAGOR uses window-based monitoring, where a window is one second or 2000 requests, whichever comes first. WeChat clearly run a tight ship:

For overload detection, given the default timeout of each service task being 500ms in WeChat, the threshold of the average request queuing time to indicate server overload is set to 20ms. Such empirical configurations have been applied in the WeChat business system for more than five years with its effectiveness proven by the system robustness with respect to WeChat business activities.

Admission control

Once overload is detected, we have to decide what to do about it. Or to put things more bluntly, which requests we’re going to drop. The first observation is that not all requests are equal:

The operation log of WeChat shows that when WeChat Pay and Instant Messaging experience a similar period of service unavailability, user complaints against the WeChat Pay service are 100x those against the Instant Messaging service.

To deal with this in a service agnostic way, every request is assigned a business priority when it first enters the system. This priority flows with all downstream requests. Business priority for a user request is determined by the type of action requested. Although there are hundreds of entry points, only a few tens have explicit priority, all the others having a default (lower) priority. The priorities are maintained in a replicated hashtable.

When overload control is set to business priority level n, all requests from levels n+1 will be dropped. That’s great for mixed workloads, but suppose we have a flood of Payment requests, all at the same priority (e.g. p). The system will become overloaded, and hence move the overload threshold to p-1, when it will become lightly loaded again. Once light load is detected, the overload threshold is incremented to p again, and once more we are in overload. To stop this flip-flipping when overloaded with requests at the same priority level, we need a level of granularity finer than business priority.

WeChat has a neat solution to this. It adds a second layer of admission control based on user-id.

User priority is dynamically generated by the entry service through a hash function that takes the user ID as an argument. Each entry service changes its hash function every hour. As a consequence, requests from the same user are likely to be assigned to the same user priority within one hour, but different user priorities across hours.

This provides fairness while also giving an individual user a consistent experience across a relatively long period of time. It also helps with the subsequent overload problem since requests from a user assigned high priority are more likely to be honoured all the way through the call graph.

Combining business priority and user priority gives a compound admission level with 128 levels of user priority per business priority level.

With each admission level of business priority having 128 levels of user priority, the resulting number of compound admission levels is in the tens of thousands. Adjustment of the compound admission level is at the granule of user priority.

There’s a nice sidebar on why using session ID instead of user ID doesn’t work: you end up training users to log out and then log back in again when they’re experiencing poor service, and now you have a login storm on top of your original overload problem!

DAGOR maintains a histogram of requests at each server to track the approximate distribution of requests over admission priorities. When overload is detected in a window period, DAGOR moves to the first bucket that will decrease expected load by 5%. With no overload, it moves to the first bucket that will increase expected load by 1%.

A server piggy-backs its current admission level on each response message sent to upstream servers. In this way an upstream server learns the current admission control setting of a downstream service, and can perform local admission control on the request before even sending it.

End-to-end therefore, the DAGOR overload control system looks like this:

Experiments

The best testimony to the design of DAGOR is that it’s been working well in production at WeChat for five years. That doesn’t provide the requisite graphs for an academic paper though, so we also get a set of simulation experiments. The following chart highlights the benefits of overload control based on queuing time rather than response time. The benefits are most pronounced in situations of subsequent overload (chart (b)).

Compared to CoDel and SEDA, DAGOR exhibits a 50% higher request success rate with subsequent overloading when making one subsequent call. The benefits are greater the higher the number of subsequent requests:

Finally, in terms of fairness CoDel can be seen to favour services with smaller fan-out to overloaded services when under stress, whereas DAGOR manifests roughly the same success rate across a variety of requests.

Three lessons for your own systems

Even if you don’t use DAGOR exactly as described, the authors conclude with three valuable lessons to take into consideration:

• Overload control in a large-scale microservice architecture must be decentralized and autonomous in each service
• Overload control should take into account a variety of feedback mechanisms (e.g. DAGOR’s collaborative admission control) rather than relying solely on open-loop heuristics
• Overload control design should be informed by profiling the processing behaviour of your actual workloads.

Unikernels as processes Williams et al., SoCC’18

Ah, unikernels. Small size, fast booting, tiny attack surface, resource efficient, hard to deploy on existing cloud platforms, and undebuggable in production. There’s no shortage of strong claims on both sides of the fence.

See for example:

In today’s paper choice, Williams et al. give us an intriguing new option in the design space: running unikernels as processes. Yes, that’s initially confusing to get your head around! That means you still have a full-fat OS underneath the process, and you don’t get to take advantage of the strong isolation afforded by VMs. But through a clever use of seccomp, unikernels as processes still have strong isolation as well as increased throughput, reduced startup time, and increased memory density. Most importantly though, with unikernels as processes we can reuse standard infrastructure and tools:

We believe that running unikernels as processes is an important step towards running them in production, because, as processes, they can reuse lighter-weight process or container tooling, be debugged with standard process debugging tools, and run in already virtualized infrastructure.

So instead of a battle between containers and unikernels, we might be able to run unikernels inside containers!

Unikernels today

Unikernels consist of an application linked against just those parts of a library OS that it needs to run directly on top of a virtual hardware abstraction. Thus they appear as VMs to the underlying infrastructure.

In the case of Linux and KVM, the ukvm monitor process handles the running of unikernels on top of KVM. (Like a version of QEMU, but specialised for unikernels). The ukvm monitor is used by several unikernel ecosystem including MirageOS, IncludeOS, and Rumprun.

The userspace ukvm monitor process handles setup (e.g. allocating memory and virtual CPU, opening file descriptors) and exit. During execution, the unikernel exits to the monitor via hypercalls, usually to perform I/O.

When using virtualization technology, isolation is derived from the interface between the unikernel and the monitor process. This interface is this: the unikernel can exit to ukvm via at most 10 hypercalls.

The argument for enhanced security (isolation) comes from the narrowness of this interface. If an attacker did break through the interface into the monitor process, then of course they would then be able to launch attacks across the entire Linux system call interface. For comparison, Linux has over 300 system calls, and Xen has about 20 hypercalls.

Unikernels as described above have some drawbacks though: there is no guest OS, so no familiar debugging tools. Memory density can suffer as all guests typically perform file caching independently; and every hypercall involves a context switch, doubling the cycles consumed compared to a direct function call. Finally, when an existing infrastructure-as-a-service offering is used as a base on which to build higher-level offerings (e.g. serverless platforms), then unikernels can’t be deployed without nested virtualisation, which is itself difficult and often not supported.

Unikernels as processes

When using a unikernel monitor as described above, unikernels are already very similar to applications.

A running unikernel is conceptually a single process that runs the same code for its lifetime, so there is no need for it to manage page tables after setup. Unikernels use cooperative scheduling and event loops with a blocking call like poll to the underlying system for asynchronous I/O, so they do not even need to install interrupt handlers (nor use interrupts for I/O).

If we think of a unikernel in this way as a specialised application processes, then maybe we can get the same narrow interface afforded by the exposed hypercalls of ukvm in some other way….

Many modern operating systems have a notion of system call whitelisting allowing processes to transition into a mode with a more restricted system call interface available to them. For example, Linux has seccomp.

The traditional difficulty with seccomp is figuring out the set of system calls that should be allowed. E.g., Docker runs containers under a default policy that allows them to perform more than 250 system calls. When we have a unikernel as our process though, we can lock the set of calls right down.

So that’s how unikernels as processes work. In place of the ukvm monitor there’s a component the authors call a tender. The tender is responsible for setup and exit handling as the ukvm monitor is. However, once file descriptors etc. are setup, the tender dynamically loads the unikernel code into it’s own address space. Then it configures seccomp filters to only allow system calls corresponding to the unikernel exits for I/O, and makes a one-way transition to this new mode. Finally it calls the entry point of the loaded unikernel.

… the unikernel executes as usual, but instead of invoking hypercalls when necessary, the unikernel simply does a normal procedure call to the hypercall implementation in the tender. The hypercall implementation in the tender is identical to the monitor implementation in the virtualization case; the tender will likely perform a system call to Linux, then return the result to the unikernel.

The table below shows the mapping from hypercalls to system calls for ukvm-based unikernels.

Isolation properties come from the interface between the tender and the host, since the tender becomes the unikernel when it first jumps to guest code. Seccomp filters allow the tender to specify file descriptor associations at setup time, and other checks such as constraints on the arguments to blkwrite can also be specified.

We therefore consider unikernels as processes to exhibit an equal degree of isolation to unikernels as VMs.

By being processes, unikernels as processes can also take advantage of ASLR, common debugging tools, memory sharing, and architecture independence for free.

Introducing nabla

Solo5 provides a unikernel base layer upon which various library OSes run such as MirageOS, IncludeOS, and Rumprun. ukvm is distributed with Solo5. The authors extend Solo5 and ukvm by adding a new backend to ukvm enabling it to operate as a tender, and changing the Solo5 ukvm binding to eliminate some hardware specific setup and use a function call mechanism rather than a hypercall mechanism to access the tender. The resulting prototype system is called nabla. With that, we’re good to go…

Evaluation

Using a variety of workloads, the authors explore the isolation and performance characteristics of nabla. The evaluation compares unikernels running under ukvm, nabla, and vanilla processes.

Nabla runs the systems using 8-14 times fewer system calls than the corresponding native process:

It also accesses about half of the number of kernel functions accessed by ukvm unikernels (mostly due to nabla avoiding virtualization related functions).

Using a fuzzer, the authors also explored how much of the underlying kernel is reachable through the nabla interface. Compared to a seccomp policy that accepts everything, nabla reduces the amount of kernel function accessible by 98%.

Nabla also achieves higher throughput than ukvm in all cases:

Running unikernels as processes increases throughput by up to 245%, reduces startup times by up to 73%, and increases memory density by 20%. It all adds up to an exciting new set of options for unikernel projects and deployments.

The last word

In this paper, we have shown that running unikernels as processes can improve isolation over VMs, cutting their access to kernel functions in half. At the same time, running unikernels as processes moves them into the mainstream; unikernels as process inherent many process-specific characteristics— including high memory density, performance, startup time, etc— and tooling that were previously thought to be necessary sacrifices for isolation. Going forward, we believe this work provides insight into how to more generally architect processes and containers to be isolated in the cloud.

Debugging distributed systems with why-across-time provenance Whittaker et al., SoCC’18

This value is 17 here, and it shouldn’t be. Why did the get request return 17?

Sometimes the simplest questions can be the hardest to answer. As the opening sentence of this paper states:

Debugging distributed systems is hard.

The kind of why questions we’re interested in for this paper are questions of provenance. What are the causes of this output? Provenance has been studied in the context of relational databases and dataflow systems, but here we’re interested in general distributed systems. (Strictly, those where the behaviour of each node can be modelled by a deterministic state machine: non-deterministic behaviour is left to future work).

Why why-provenance doesn’t work

Relational databases have why-provenance, which sounds on the surface exactly like what we’re looking for.

Given a relational database, a query issued against the database, and a tuple in the output of the query, why-provenance explains why the output tuple was produced. That is, why -provenance produces the input tuples that, if passed through the relational operators of the query, would produce the output tuple in question.

One reason that won’t work in our distributed systems setting is that the state of the system is not relational, and the operations can be much more complex and arbitrary than the well-defined set of relational operators why-provenance works with.

There’s a second, deeper reason why why-provenance doesn’t work here as well:

Why-provenance makes the critical assumption that the underlying relational database is static. It cannot handle the time-varying nature of stateful distributed systems.

Why causal history buries the cause

We do have one tool in our distributed systems toolbox that can handle this issue of the state changing over time. Indeed, was designed explicitly to handle potential causes: the happens-before relationship and the notion of causality. The difficulty here is that the causal history of an event is a crude over-approximation, encompassing everything that was seen before. It tells us which events could potentially have contributed to the cause, but not which ones actually did. That can be a lot of extraneous noise to deal with when trying to debug a distributed system.

Wat should we do instead?

… causality lacks a notion of data dependence, and data provenance lacks a notion of time. In this paper, we present wat-provenance (why-across-time provenance): a novel form of data provenance that unifies ideas from the two. Wat-provenance generalizes the why-provenance from the domain of relational queries issued against a static database to the domain of arbitrary time-varying state machines in a distributed system.

Consider a server in a distributed system as a deterministic state machine that repeatedly receives requests, updates its state accordingly, and sends replies. Wat-provenance is defined in terms of the traces of such a system.

• The trace T of a system is the ordered sequence of inputs received by the state machine
• A subtrace T’ of T is an order-preserving sub-sequence of T (but the events in the sub-sequence don’t have to be contiguous).
• Given a subtrace T’ of T, then a supertrace of T’ is a substrace of T containing every element of T’.

For example:

Wat-provenance aims to formalize an intuitive notion of why the state machine M produces output o when given input i.

Since the state machines are deterministic, we can start with the notion that the cause of some output o must be contained in a subtrace of the input. E.g. if we have a KVS node with trace T = set(x,1);set(y,2) and a request i = get(x) then the subtrace T’ =set(x,1) is sufficient to explain the output 1. Such a subtrace is called a witness of o.

In this particular example, both T and T’ are witnesses of o, but note that T contains some additional inputs (set(y,2)) that are not needed to explain the output. So we’d like a minimal subtrace.

If we’re not careful about what we leave out though, we can end up creating false witnesses. Consider a server maintaining a set of boolean-valued variables all initialised to false. We have a trace T = set(a); set(b); set(c) and a request i producing output (a &amp;&amp; !b) || c. The reason this outputs true is that c is true. So set(c) is a genuine witness. But if we considered just the subtrace set(a) (so we’re in the state a= true, b = c = false) then set(a) would also appear to be a witness, even though it isn’t. To avoid false witnesses, we add the rule that every supertrace of T’ in T must also be a witness of o. In such an instance we say that the witness T’ of o is closed under supertrace in T. Since the supertrace set(a);set(b); is not a witness of o, we exclude the subtrace set(a) through this rule.

Sticking with the boolean server, suppose we have a simpler scenario where i produces an output o equal to (a &amp;&amp; d) || (b &amp;&amp; c). In this case a trace T = set(a);set(b);set(c);set(d) contains two subtraces that can both be a cause of the true output: set(a);set(d) and set(b);set(c). Thus we notice that the cause of an output can be a set of witnesses.

And so we arrive at:

[wat-provenance] consists of every witness T’ of o such that (1) T’ is closed under supertrace in T, and (2) no proper subtrace of T’ is also a witness of o that satisfies (1).

When computing wat-provenance, it’s important that we first compute the set of witnesses closed under supertrace in T, and only then remove the non-minimal elements. If you try to remove non-minimal elements first it’s possible to over-prune. (See the worked example in §3.5 of the paper).

Wat-provenance refines causality, and subsumes why-provenance.

Black box, simple interface

Automatically computing the wat-provenance for an arbitrary distributed system component, which we dub a black box, is often intractable and sometimes impossible…. (But) … we can take advantage of the fact that many real-world black boxes are far from arbitrary. Many black boxes have complex implementations but are designed with very simple interfaces.

Instead of trying to infer wat-provenance by inspecting an implementation, we can specify it directly from an interface. A wat-provenance specification is a function that given a trace T and a request i, returns the wat-provenance Wat(M, T, i) for a black box modeled as state machine M.

For example, for Redis (~50K LOC) the wat-provenance specification for a get request to a key just needs to consider the most recent set to that key, and all subsequent modifying operations (e.g. incr, derc). It takes surprisingly few lines of code to write these wat-provenance specifications for interesting subsets of real-world systems:

Not every system is so amenable though. The authors give as an example a state machine implementing some machine learning model whereby clients can either submit training data (for online model updating) or submit an input for classification. Here a wat-provenance specification would likely be just as complex as the system itself.

Watermelon

Watermelon is a prototype debugging framework using wat-provenance and wat-provenance specifications. It acts as a proxy intercepting all communication with the service and recording traces in a relational database. When messages are sent between Watermelon’d processes, Watermelon can connect the send and the receive such that multiple black boxes can be integrated into the same Watermelon system.

With the proxy shim in place, a developer can write wat-provenance specifications either in SQL or in Python.

To find the cause of a particular black box output, we invoke the black box’s wat-provenance specification. The specification returns the set of witnesses that cause the output. Then, we can trace a request in a witness back to the black box that sent it and repeat the process, invoking the sender’s wat-provenance specification to get a new set of witnesses.

Evaluation

The evaluation is currently the weakest part of the paper. Wat-provenance specifications are written for 20 real-world APIs across Redis, the POSIX file system, Amazon S3, and Zookeeper (see table 1 above). Debugging ease is then qualitatively evaluated against using printf statements and against SPADE (a framework for collecting provenance information from a variety of sources including OS audit logs, network artifacts, LLVM instrumentation, and so on). This leads to the following table:

What it would be lovely to see is an evaluation of using wat-provenance to debug real-world problems. Future work perhaps?

Wat-logs

A final thought from me is that rather than a proxy, it’s pretty common (and not unreasonable a request if not) for a server process to log incoming requests and their responses. In which case given a suitable logging system it ought to be possible to ask wat-provenance questions directly of the logs. That would fit quite well with existing troubleshooting workflows that often start with something that doesn’t look quite right in the logs and explore why that is from there.

ApproxJoin: approximate distributed joins Le Quoc et al., SoCC’18

GitHub: https://ApproxJoin.github.io

The join is a fundamental data processing operation and has been heavily optimised in relational databases. When you’re working with large volumes of unstructured data though, say with a data processing framework such as Flink or Spark, joins become distributed and much more expensive. One of the reasons for this is the amount of data that needs to be moved over the network. In many use cases, approximate results would be acceptable, and as we’ve seen before, likely much faster and cheaper to compute. Approximate computing with joins is tricky though: if you sample datasets before the join you reduce data movement, but also sacrifice up to an order of magnitude in accuracy; if you sample results after the join you don’t save on any data movement and the process is slow.

This paper introduces an approximate distributed join technique, ApproxJoin, which is able to sample before data shuffling without loss of end result accuracy. Compared to unmodified Spark joins with the same sampling ratio it achieves a speedup of 9x while reducing the shuffled data volume by 82x.

The following charts show ApproxJoin’s latency and accuracy characteristics compared to Spark sampling before the join (ApproxJoin has much better accuracy and similar latency) and to Spark sampling after the join (ApproxJoin has similar accuracy and much lower latency).

ApproxJoin works in two phases. First it uses one of the oldest tricks in the book, a Bloom filter, to eliminate redundant data shuffling. A nice twist here is that ApproxJoin directly supports multi-way joins so we don’t need to chain a series of pairwise joins together. In the second phase ApproxJoin uses stratified sampling to produce an answer approximating the result of an aggregation over the complete join result.

At a high level, ApproxJoin makes use of a combination of sketching and sampling to select a subset of input datasets based on the user-specified query budget. Thereafter, ApproxJoin aggregates over the subset of input data.

User queries must contain an algebraic aggregation function (e.g. SUM, AVG, COUNT, STDEV), and as is usual for approximate compute frameworks, can specify either a time bound or an error bound for the query.

High level overview

Overall, the ApproxJoin system looks like this:

Filtering using a multi-way Bloom filter happens in parallel at each node storing partitions of the input, and simultaneously across all input tables.

The sampling phase makes use of stratified sampling within the join process: datasets are sampled while the cross product is being computed. Stratified sampling in this case means that tuples with distinct join keys are sampled independently (with simple random sampling). Thus the final sample will contain all join keys— even those with few data items. A cost function is used to compute an optimal sampling rate according to the query budget. There’s a one-off upfront cost to compute the standard deviation for a join key (‘stratum’), which is stored and reused in subsequent queries. It’s not clear whether or not this cost is included in the evaluation (so best guess it isn’t 😉 ).

Filtering

The filtering step is very straightforward. First a Bloom filter is created for each input dataset: each worker with a partition of the dataset creates a local bloom filter, and then these are combined using OR. Once we have the merged Bloom filter for each input dataset, we can simply combine the filters across datasets using AND. (Think about hashing a given key, only if the corresponding bits are set in each of the input Bloom filters can that key possibly exist in all inputs). In the implementation, Bloom filters are merged using a treeReduce scheme to prevent the Spark driver becoming a bottleneck.

Clearly, the greater the overlap between the input datasets the more data we need to shuffle, and hence the less benefit the Bloom-filter based join can add.

After the filtering stage, it may be that the overlap fraction between the datasets is small enough that full join can now be performed within the latency requirements of the user. If this is not the case, we proceed to the approximation…

Determining the cost function

ApproxJoin makes use of latency and error-bound cost functions to convert the join requirements specified by a user into sampling rates.

For the latency cost function we need to combine the cost of filtering and transferring the data join items, with the cost of computing the cross products. There’s no need to estimate the cost of filtering and transferring— we have to do this regardless so we can just time it. The remaining latency budget is simply then the target time specified by the user, minus the time we spent in the first phase! The cost function for the cross product phase is simply a weighting of the number of cross products we need to do. The weighting (scale) factor depends on the computation capacity of the compute cluster, which is profiled once offline to calibrate. (That is, once in the lifetime of the cluster, not once per query).

If a user specified an error bound, we need to calculate how many samples to take to satisfy the requirement. For stratum i, the number of samples $b_i$ turns out to be governed by the following equation at 95% confidence level:

The standard deviation $\sigma_i$ of the stratum is computed and stored on the first execution of a query and subsequently reused.

Sampling

To preserve the statistical properties of the exact join output, we combine our technique with stratified sampling. Stratified sampling ensures that no join key is overlooked; for each join key, we perform simple random selection over data items independently. This method selects data items fairly from different join keys. The (preceding) filtering stage guarantees that this selection is executed only from data items participating in the join.

Random sampling on data items having the same join key is equivalent to perform edge sampling on a complete bipartite graph modelling the relation.

To include an edge in the sample, ApproxJoin randomly selects one endpoint vertex from each side, and then yields the edge connecting them. For a sample of size b this process is repeated b times.

In a distributed settings, data items are distributed to worker nodes based on the join keys (e.g. using a hash-based partitioner), and each worker performs the sampling process in parallel to sample the join output and execute the query.

Query execution

After sampling, each node executes the input query on the sample to produce a partial query result. These results are merged at the master node, which also produces error bound estimations.

Estimating errors

The sampling algorithm can produce an output with duplicate edges. This acts as a random sampling with replacement, and the Central Limit Theorem can be used to estimate the error bounds. An alternative error estimation mechanism is also described in which duplicate edges are not allowed in the sample, and a Horvitz-Thompson estimator can be used. I couldn’t determine which of these two mechanisms is actually used for the results reported in the evaluation.

Evaluation

Focusing just on the filtering stage to start with, we see that with two-way joins ApproxJoin is 6.1x faster than a native Spark join, and shuffles 12x less data. The gains are even better with 3-way and 4-way joins:

The benefits do depend on the overlap percentage though. The figures above were all with overlap fractions below 1%, and the ApproxJoin advantage disappears by the time we get to around 8%.

Turning to the sampling stage, the following figure compares scalability, latency, and accuracy of ApproxJoin sampling vs Spark joins. Here ApproxJoin can deliver order of magnitude speed-ups.

ApproxJoin is also evaluated end-to-end on two real-world datasets: CAIDA network traces, and the Netflix prize dataset. For the network trace dataset ApproxJoin is 1.5-1.7x faster, and reduces shuffled data by 300x. For the Netflix dataset ApproxJoin is 1.27-2x faster, and shuffles 1.7-3x less data.

By performing sampling during the join operation, we achieve low latency as well as high accuracy… Our evaluation shows that ApproxJoin significantly reduces query response time as well as the data shuffled through the network, without losing the accuracy of the query results compared with the state-of-the-art systems.

ASAP: fast, approximate graph pattern mining at scale Iyer et al., OSDI’18

I have a real soft spot for approximate computations. In general, we waste a lot of resources on overly accurate analyses when understanding the trends and / or the neighbourhood is quite good enough (do you really need to know it’s 78.763895% vs 78 ± 1%?). You can always drill in with more accuracy if the approximate results hint at something interesting or unexpected.

Approximate analytics is an area that has gathered attention in big data analytics, where the goal is to let the user trade-off accuracy for much faster results.

(See e.g. ApproxHadoop which we covered on The Morning Paper a while back).

In the realm of graph processing, graph pattern mining algorithms, which discover structural patterns in a graph, can reveal very interesting things in our data but struggle to scale to larger graphs. This is in contrast to graph analysis algorithms such as PageRank which typically compute properties of a graph using neighbourhood information.

Today, a deluge of graph processing frameworks exist, both in academia and open-source… a vast majority of the existing graph processing frameworks however have focused on graph analysis algorithms.

Arabesque explicitly targets the graph mining problem using distributed processing, but even then large graphs (1 billion edges) can take hours (e.g. 10 hours) to mine.

In this paper, we present A Swift Approximate Pattern-miner (ASAP), a system that enables both fast and scalable pattern mining. ASAP is motivated by one key observation: in many pattern mining tasks, it is often not necessary to output the exact answer. For instance, in frequent sub-graph mining (FSM) the task is to find the frequency of subgraphs with an end goal of ordering them by occurrences… our conversations with a social network firm revealed that their application for social graph similarity uses a count of similar graphlets. Another company’s fraud detection system similarly counts the frequency of pattern occurrences. In both cases, an approximate count is good enough.

ASAP outperforms Arabesque by up to 77x on the LiveJournal graph while incurring less than 5% error. It can scale to graphs with billions of edges (e.g. Twitter at 1.5B edges) and produce results in minutes rather than hours.

One major challenge is that the approximation methods used in big data analytics don’t carry over to graph pattern mining. There, the core idea is to sample the input, run the analysis over the sample, and extrapolate based on an assumption that the sample size relates to the error in the output. If we take this same idea and apply it to a foundational graph mining problem, counting triangles, it quickly becomes apparent that there is no clear relationship we can rely on between the size of the sample and the error or the speedup.

We conclude that the existing approximation approach of running the exact algorithm on one or more samples of the input is incompatible with graph pattern mining. Thus, in this paper, we propose a new approach.

Graph pattern mining and sampling

Pattern mining is the problem of finding instances of a given pattern e.g. triangles, 3-chains, cliques, motifs, in a graph. (A motif query looks for patterns involving a certain number of vertices, e.g. a 3-motif query looks for triangles and 3-chains).

A common approach is to iterate over all possible embeddings in the graph starting with a vertex or edge, filtering out those that cannot possibly match. The remaining candidate embeddings are then expanded by adding one more vertex/edge and the process repeats.

The obvious challenge in graph pattern mining, as opposed to graph analysis, is the exponentially large candidate set that needs to be checked.

Neighbourhood sampling has been proposed in the context of a specific graph pattern, triangle counting. The central idea is to sample one edge from an edge stream, and the gradually add more edges until either they form a triangle or it becomes impossible to form the pattern. Given a graph with m edges, it proceeds as follows. To perform one trial:

• Uniformly sample one edge ($l_0$) from the graph, with sampling probability 1/m.
• Uniformly sample one of l0’s adjacent edges ($l_1$) from the graph. (Note that neighbourhood sampling depends on the ordering of edges in the stream, and $l_1$ appears after $l_0$ here). If $_l0$ has c neighbours appearing in the stream after it, then the sampling probability for $l_1$ is 1/c.
• Find an edge $l_2$ appearing in the stream after $l_1$ that completes the triangle, if possible. If we do find such an edge, the sampling probability is $1/mc$.

If a trial successfully samples a triangle, we can estimate $e_i = mc$ triangles in the full graph. After conduction r trials we have an approximate result given by $\frac{1}{r}\sum_{r}e_i$.

Here’s an example using a graph with five nodes:

Introducing ASAP

ASAP generalises the neighbouring sampling approach outlined above to work with a broader set of patterns. It comes with a standard library of implementations for common patterns, and users can also write their own. It is designed for a distributed setting and also includes mining over property graphs (which requires predicate matching).

Users of ASAP can explicitly trade off compute time and accuracy, specifying either a time budget (best result you can do in this time) or an error budget (give me a result as quickly as you can, with at least this accuracy). Before running the algorithm, ASAP returns an estimate of the time or error bounds it can achieve. If the user approves the algorithm is run and the presented result includes a count, confidence level, and actual run time. Users may then optionally ask to output the actual embeddings of the found pattern.

To find out how many estimators it needs to run for the given bounds ASAP builds an error-latency profile (ELP). Building an ELP is fast and can be done online. For the error profile, the process begins with uniform sampling to reduce the graph to a size where nearly 100% accurate estimates can be produced. A loose upper bound for the number of estimators required is then produced using Chernoff bounds and scaled to the larger graph (see section 5.2 for details and for how the time profile is computed). For evolving graphs, the ELP algorithm can be re-run after a certain number of changes to the graph (e.g., when 10% of edges have changed).

Approximate pattern mining in ASAP

ASAP generalises neighbourhood sampling to a two phase process: a sampling phase followed by a closing phase.

In the sampling phase, we select an edge in one of two ways by treating the graph as an ordered stream of edges: (a) sample an edge randomly; (b) sample an edge that is adjacent to any previously sampled edges, from the remainder of the stream. In the closing phase, we wait for one or more specific edges to complete the pattern.

(So in the triangle counting example above, sampling $l_0$ and $l_1$ form the sampling phase, and waiting for $l_2$ is the closing phase).

The probability of sampling a pattern can be computed from these two phases….

For a k-node pattern, the probability of detecting a pattern p depends on k and the different ways to sample using the neighbourhood sampling technique. There are four different cases for $k = 2..5$, and up to three different types of sampling possible within each case. The probability formulas for all of these are enumerated in section 4.1.1. E.g., when k=2 the probability is 1/mc as we saw previously.

When applied in a distributed setting ASAP parallelises the sampling process and then combines the outputs (MapReduce). Vertices in the graph are partitioned across machines, and several copies of the estimator task are scheduled on each machine. The graphs edges and vertices need to seen in the same order on each machine, but any order will do (‘thus, ASAP uses a random ordering which is fast and requires no pre-processing of the graph’). The output from each machine is a partial count, making the reduce step a trivial sum operation.

For predicate matching on property graphs ASAP supports both ‘at least one’ and ‘all’ predicates on the pattern’s vertices and edges. For ‘all’ predicates, ASAP introduces a filtering phase before the execution of the pattern mining task. For ‘at least one’ predicates a first pass produces a matching list of edges matching the predicate. Then in the second pass when running the sampling phase of the algorithm every estimator picks its first edge randomly from within the matching list.

There’s a performance optimisation for motif queries (finding all patterns with a certain number of vertices) whereby common building blocks from k-node patterns can be reused.

If a user first explores the graph with a low accuracy answer (e.g. using 1 million estimators), and then wants to drill in to higher accuracy (requiring e.g. 3 million estimators) then ASAP only needs to launch another 2 million estimators and can reuse the first 1 million.

Key evaluation results

ASAP is evaluated using a number of graphs:

Compared to Arabesque, ASAP shows up to 77x performance improvement with a 5% loss of accuracy when counting 3-motifs and 4-motifs on modest sized graphs:

On larger graphs ASAPs advantage extends up to 258x.

The final part of the evaluation evaluates mining for 5-motifs (21 individual patterns), based on conversations with industry partners who use similar patterns in their production systems. Results for two of those patterns are shown in the table below, and demonstrate that ASAP can handle them easily (using a cluster of 16 machines each with 16 cores).

Our evaluation shows that not only does ASAP outperform state-of-the-art exact solutions by more than an order of magnitude, but it also scales to large graphs while being low on resource demands.

Sharding the shards: managing datastore locality at scale with Akkio Annamalai et al., OSDI’18

In Harry Potter, the Accio Summoning Charm summons an object to the caster of the spell, sometimes transporting it over a significant distance. In Facebook, Akkio summons data to a datacenter with the goal of improving data access locality for clients. Central to Akkio is the notion of microshards (μ-shards), units of data much smaller than a typical shard. μ-shards are defined by the client application, and should exhibit strong access locality (i.e., the application tends to read/write the data in a μ-shard together in a small window of time). Sitting as a layer between client applications and underlying datastores, Akkio has been in production at Facebook since 2014, where it manages around 100PB of data.

Measurements from our production environment show that Akkio reduces latencies by up to 50%, cross-datacenter traffic by up to 50%, and storage footprint by up to 40% compared to reasonable alternatives.

Akkio can support trillions of μ-shards and many 10s of millions of data access requests per second.

Motivation

Our work in this area was initially motivated by our aim to reduce service response times and resource usage in our cloud environment which operates globally and at scale… Managing data access locality in geo-distributed systems is important because doing so can significantly improve data access latencies, given that intra-datacenter communication latencies are two orders of magnitude smaller than cross-datacenter communication latencies: e.g. 1ms vs 100ms.

At scale, requests issued on behalf of one end-user are likely to be processed by multiple distinct datacenters over time. For example, the user may travel from one location to another, or service workload may be shifted from one location to another.

One way to get data locality for accesses is to fully replicate all data across all datacenters. This gets expensive very quickly (the authors estimated \$2 million per 100PBs per month for storage, plus costly WAN cross-datacenter bandwidth usage on top).

We could cap those costs by setting a caching budget, but for many of the workloads important to Facebook distributed caching is ineffective. For acceptable hit rates we still need to dedicate large amounts of hardware resources, and these workloads have low read-write ratios. Beyond the cost and latency implications there’s one further issue: “many of the datasets accessed by our services need strong consistency… it is notable that the widely popular distributed caching systems that are scalable, such as Memcached or Redis, do not offer strong consistency. And for good reason.

Why not just use shards?

Don’t datastores already partition their data using shards though (e.g. through key ranges or hashing)? There are two issues with the existing sharding mechanism from an Akkio perspective:

1. Shard sizes are set by administrators to balance shard overhead, load balancing, and failure recovery. They tend to be on the order of a few tens of gigabytes.
2. Shards are primarily a datastore concern, used as the unit for replication, failure recovery, and load balancing.

At Facebook, because the working set size of accessed data tends to be less than 1MB, migrating an entire shard (1-10GB) would be ineffective.

μ-shards

Making shards smaller (and thus having many more of them) interferes with the datastore operations, and moving shards as a unit is too heavyweight. So Akkio introduces a new layer on top of shards, μ-shards.

μ-shards are more than just smaller shards, a key difference is that the application itself assigns data to μ-shards with high expectation of access locality. μ-shard migration has an overhead an order of magnitude lower than shard migration and its utility is far higher.

Each μ-shard is defined to contain related data that exhibits some degree of access locality with client applications. It is the application that determines which data is assigned to which μ-shard. At Facebook, μ-shard sizes typically vary from a few hundred bytes to a few megabytes in size, and a μ-shard (typically) contains multiple key-value pairs or database table rows. Each μ-shard is assigned (by Akkio) to a unique shard in that a μ-shard never spans multiple shards.

Examples of applications that fit well with μ-shards include:

The design of Akkio

Akkio is implemented as an layer inbetween client applications and an underlying datastore. The paper focuses on Akkio’s integration with Facebook’s ZippyDB, but it’s design enable use with multiple backend datastores. The Akkio client library is embedded within the database client library, exposing an application-managed μ-shard id on all requests. It is up to the application to establish it’s own μ-shard naming scheme.

The Akkio location service maintains a location database that the client can use to map μ-shards to storage servers so that it knows where to direct requests. An access counter service is used to track all accesses, so that μ-shard placement and migration decisions can be made. It is the responsibility of the data placement service (DPS) to decide where to place each μ-shard.

The Akkio Client Library asynchronously notifies the DPS that a μ-shard placement may be suboptimal whenever a data access request needs to be directed to a remote datacenter. The DPS re-evaluates the placement of a μ-shard only when it receives such a notification.

Location information is configured with an eventually consistent replica at every datacenter: the dataset is relatively small, on the order of few hundred GB. If a client reads a stale location and gets a miss it will query a second time requesting that the cache by bypassed (and subsequently updated). Storage required for access counters is also low, less than 200GB per datacenter at Facebook.

All the main action happens in the data placement service, which is tailored for each backend datastore:

There is one DPS per Akkio-supported backend datastore system that is shared among all of the application services using instances of the same datastore system. It is implemented as a distributed service with a presence in every datacenter.

The createUShard() operation is called when a new μ-shard is being created and DPS must decide where to initially place it. evaluatePlacement() is invoked asynchronously by the Akkio client library whenever a data access request needs to be directed to a remote datacenter. DPS first checks its policies to see if initiating a migration is permissible (details of the policy mechanism are mostly out of scope for this paper), and whether a migration is already in process. If migration is permissible it determines the optimal placement for the μ-shard and starts the migration.

μ-shard placement

To understand placement in the context of ZippyDB, we first need to look briefly at how ZippyDB manages shards and replicas. ZippyDB partitions data horizontally, with each partition assigned to a different shard. Shards can be configured to have multiple replicas which form a shard replica set and participate in a shard-specific Paxos group. The replication configuration of a shard identifies not only the number of replicas, but also how they are to be distributed over datacenters, clusters, and racks, and the degree of consistency required. Replica sets that all have the same configuration from a replica set collection.

When running on ZippyDB, Akkio places μ-shards on, and migrates μ-shards between different such replica set collections.

ZippyDBs Shard Manager assigns each shard replica to a specific ZippyDB server conforming to the policy. The assignments are registered in a directory service.

For initial placement of a μ-shard, the typical strategy is to select a replica set collection with primary replica local to the requesting client and secondary replica(s) in one of the more lightly loaded datacenters. For migration a scoring system is used to select the target replica set collection from those available. First datacenters are scored based on the number of times the μ-shard was accessed from that datacenter over the last X-days, with stronger weighting for more recent accesses. Any ties are then broken by scoring datacenters based on the amount of available resources within them.

When working on top of ZippyDB, Akkio makes use of ZippyDB’s access control lists and transactions to implement migrations. The move algorithm looks like this:

On top of Facebook’s Cassandra variant (which doesn’t support ACLs, though the open-source Cassandra does), a different migration strategy is needed:

Now, that’s all well and good but here’s the part I still don’t understand. The underlying sharding system of the datastore controls placement. Data items are assigned to shards by some partitioning scheme (e.g. by range partitioning, or by hash partitioning). So when we move a μ-shard from one shard (replica collection set) to another, what is really happening with respect to those keys? Are they being finagled in the client so that e.g. they end up in the right range? (Have the right hash value????!). I feel like I must be missing something really obvious here, but I can’t see it described in the paper. Akkio does allow for some minimal changes to the underlying datastore to accommodate μ-shard migrations, but these changes are minimal:

Akkio in production

The evaluation section contains multiple examples of Akkio in use at Facebook, which we don’t have space to cover in detail. The before-and-after comparisons (in live production deployments) make compelling reading though:

• ViewState data stores a history of content previously shown to a user. Replica set collections are configured with two replicas in one local datacenter, and third in a nearby datacenter. Akkio is configured to migrate μ-shards aggressively.

Originally, ViewState data was fully replicated across six datacenters. Using Akkio with the setup described above let to a 40% smaller storage footprint, a 50% reduction of cross-datacenter traffic, and about a 60% reduction in read-and write latencies compared to the original non-Akkio setup.

• AccessState stores information about user actions taken in response to displayed content. Akkio once more decreased storage footprint by 40%, and cross-datacenter traffic by 50%. Read latency was unaffected, but write latency reduced by 60%.
• For Instagram ‘Connection-Info’ Akkio enabled Instagram to expand into a second continent keeping both read and write latencies lower than 50ms. “This service would not have expanded into the second continent without Akkio.”
• For the Instagram Direct messaging application, Akkio reduced end-to-end message delivery latency by 90ms at P90 and 150ms at P99. (The performance boost in turn delivered improvements in key user engagement metrics).

Up next: migrating more applications to Akkio, and supporting more datastore systems on the backend (including MySQL). In addition…

… work has started using Akkio (i) to migrate data between hot and cold storage, and (ii) to migrate data more gracefully onto newly created shards when resharding is required to accommodate (many) new nodes.