Data provenance at internet scale: architecture, experiences, and the road ahead

January 24, 2017

Data provenance at Internet scale: Architecture, experiences, and the road ahead Chen et al., CIDR 2017

Provenance within the context of a single database has been reasonably well studied. In this paper, though, Chen et al. explore what happens when you try to trace provenance in a distributed setting and at larger scale. The context for their work is provenance in computer networks, but many of the lessons apply more broadly to distributed systems in general.

What is provenance? Chen et al. provide a good definition we can work with:

Provenance is, in essence, a way to answer “why” questions about computations. It works by linking each effect, such as a computed value, to its direct causes, such as the corresponding inputs and the operation that was performed on them. Thus, it becomes possible to recursively “explain” a particular result by showing its direct causes, and then their own causes, and so on, until a set of basic inputs is reached. This explanation is the provenance of the result: it is a tree whose vertexes represent computations or data and whose edges indicate direct causal connections.
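As a much simplified, purely illustrative sketch of that recursive structure (not ExSPAN's actual data model): each vertex links a result to the operation and direct inputs that produced it, so "explaining" a result is just a walk back to the base inputs.

```python
class Node:
    """A vertex in a toy provenance tree."""
    def __init__(self, label, op=None, inputs=()):
        self.label = label        # e.g. a tuple like "route(s1,s2)"
        self.op = op              # operation that produced it (None for base inputs)
        self.inputs = list(inputs)

def explain(node, depth=0):
    """Return the provenance tree as indented lines, result first."""
    head = f"{node.label} <- {node.op}" if node.op else node.label
    lines = ["  " * depth + head]
    for inp in node.inputs:
        lines.extend(explain(inp, depth + 1))
    return lines

# base inputs
a = Node("link(s1,s2)")
b = Node("config(s1)")
# a derived result, explained recursively down to its base inputs
r = Node("route(s1,s2)", op="routing-rule", inputs=[a, b])
for line in explain(r):
    print(line)
```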

Note carefully that tracing provenance means being able to trace both the original inputs and the operations (i.e., code in some form or other) that process them. So we’re back to the kind of giant distributed graphs with versioning information for both code and data that we saw yesterday with Ground.

Here’s an interesting juxtaposition: this is a 2017 research paper looking at what might be necessary to explain computation results in a distributed setting. The GDPR (sorry to keep going on about it!), which becomes law in the EU in May 2018, makes it a legal requirement to be able to do so when the inputs relate to people and the computed value to be explained is some decision that affects them.

Some of the key challenges and opportunities in extending provenance to a distributed setting that the authors explore include:

  • partitioning provenance information across nodes
  • adding temporal information so that questions can be asked about past states
  • protecting provenance information against attacks (which it can otherwise be used to help diagnose and defend against)
  • extending provenance to cover not only data but also code
  • using provenance to ask ‘why not?’ questions (something expected did not happen)
  • using provenance differences between correct and faulty output to help establish cause

The basic architecture of the ExSPAN system is as follows:

The graph recorder captures provenance from an application running on a given node and records it in a local log. The (optional) commitment module adds cryptographic commitments to outgoing messages, and verifies incoming messages. The audit responder sends excerpts from the log to the query processor when they are needed to respond to a query. Note that this architecture seems to rely on a fairly static set of nodes, and on reliable persistence of their local logs – just as with regular exhaust logs, we could imagine replacing this with a more sophisticated distributed storage mechanism that survives independently of any one node.

The graph recorder itself has three options for extracting provenance information: in the lucky case, the application is written in a declarative language (e.g., NDlog) that permits extraction of provenance directly through its runtime. With source code available, the application can be instrumented to make calls to the recorder; if only a binary is available, ‘provenance can still be extracted through an external specification’. I can think of another couple of options: the application could emit provenance events that the recorder subscribes to, and some provenance could be recovered through exhaust logs as in The Mystery Machine or lprof.

The provenance graph

ExSPAN is built for provenance in computer networks, so at the core it has vertices representing network events and rules, and edges between them to show the flow of packets. This graph is partitioned and distributed across many nodes. ExSPAN uses incremental view maintenance techniques on the provenance information to make queries efficient.

Extending the provenance graph with temporal information to be able to answer historical questions required the graph to move to an append-only model, and the addition of several new vertex types (EXIST, INSERT, DELETE, APPEAR, DISAPPEAR, DERIVE, and UNDERIVE). The meaning and usage of these vertices is fully explained in an earlier paper. At this point, provenance information starts to expand in volume. To keep this in check, one option is to record only inputs and non-deterministic events, deriving everything else upon query. The familiar approach of recording snapshot states (as you might do when reconstructing state from an event log) to speed up this reconstruction can then be used.
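A hedged sketch of that idea, with invented names rather than ExSPAN's actual vertex types: log only INSERT/DELETE events in an append-only store, reconstruct any past state by replay, and take periodic snapshots to bound the replay cost.

```python
class TemporalStore:
    """Append-only event log with snapshot-accelerated historical queries."""
    def __init__(self, snapshot_every=100):
        self.log = []               # (time, op, tuple_) appended in time order
        self.snapshots = {}         # log index -> frozen copy of the state
        self.snapshot_every = snapshot_every

    def record(self, time, op, tuple_):
        assert op in ("INSERT", "DELETE")
        self.log.append((time, op, tuple_))
        if len(self.log) % self.snapshot_every == 0:
            # snapshot the state as of this event to speed up later replays
            self.snapshots[len(self.log)] = frozenset(self.state_at(time))

    def state_at(self, time):
        """Replay the log (from the nearest usable snapshot) up to `time`."""
        start, state = 0, set()
        for idx in sorted(self.snapshots):
            if self.log[idx - 1][0] <= time:
                start, state = idx, set(self.snapshots[idx])
        for t, op, tup in self.log[start:]:
            if t > time:
                break
            if op == "INSERT":
                state.add(tup)
            else:
                state.discard(tup)
        return state
```

Only inputs and non-deterministic events need to be logged this way; fully deterministic derivations could be recomputed at query time, which is exactly the volume/query-cost trade-off the paper describes.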

We also have a need for secure forensics, since today’s networks and computers are targets of malicious attacks. In an adversarial setting, compromised nodes can tamper with their local data and/or lie to other nodes… therefore when performing forensics, it is important to validate the integrity of the provenance data.

To address this need, SEND, RECEIVE, and BELIEVE vertex types are added to the graph. BELIEVE tuples are added when a node receives a tuple from another node, recording the belief that the tuple was indeed seen on the other node. These are stored in a tamper-evident log so a node can always prove details of its past interactions. The model (again, fully described in an earlier paper) provides two guarantees:

a) if any behavior is observable by at least one correct node, then the provenance of that behavior will be correct, and b) if a correct node detects a misbehavior, it can tell which part of the provenance is affected, and attribute the misbehavior to at least one faulty node. Our prototype shows that this model works well for BGP routing, Chord DHT, and Hadoop MapReduce applications.
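The tamper-evident log can be sketched with a simple hash chain (this is the general PeerReview-style construction, not necessarily the paper's exact scheme): each entry commits to its predecessor, so any rewrite of history invalidates every later hash.

```python
import hashlib

class TamperEvidentLog:
    """Hash-chained log: each entry's hash covers all entries before it."""
    GENESIS = "0" * 64

    def __init__(self):
        self.entries = []           # (payload, chained_hash)

    def append(self, payload):
        prev = self.entries[-1][1] if self.entries else self.GENESIS
        h = hashlib.sha256((prev + payload).encode()).hexdigest()
        self.entries.append((payload, h))
        return h                    # an authenticator a node could send to peers

    def verify(self):
        """Recompute the chain; any tampering breaks a link."""
        prev = self.GENESIS
        for payload, h in self.entries:
            if hashlib.sha256((prev + payload).encode()).hexdigest() != h:
                return False
            prev = h
        return True
```

A node that has handed out authenticators for its SEND/RECEIVE entries can later be forced to produce a log consistent with them, which is what makes misbehaviour attributable.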

To be able to answer why something good failed to happen, the graph can be further extended with negative vertices enabling counterfactual reasoning. The approach reminds me of Molly:

At a high level, negative provenance finds all possible ways in which a missing event could have occurred, and the reason why each of them did not happen. As a result, this enhancement creates a ‘negative twin’ for each vertex type (except for BELIEVE).

Here’s an example of a provenance graph tracing a missing HTTP request:

Differential provenance

By reasoning about the differences between the provenance trees for working and non-working instances, it becomes much easier to identify the root cause of problems. The team call this approach differential provenance.

One challenge in networks is that a small, initial difference can lead to wildly different network executions afterwards – for instance, a different routing decision may send packets down an entirely different path. Therefore, the differences between working and non-working provenance trees can be much larger than one might expect. Differential provenance addresses this with a counterfactual approach: it “rolls back” the network execution to a point where the provenance trees start to diverge; then, it changes the mismatched tuple(s) on the non-working provenance tree to the correct version, and “rolls forward” the execution until the two trees are aligned.
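Setting the roll-back/roll-forward machinery aside, the core diffing step can be sketched as a joint walk of the two trees that reports where they first diverge (the tree encoding and labels here are invented for illustration):

```python
def diverge(good, bad, path=()):
    """Trees are (label, [children]); returns the path to the first mismatch,
    or None if the trees are identical."""
    if good[0] != bad[0]:
        return path + (f"{good[0]} != {bad[0]}",)
    for g, b in zip(good[1], bad[1]):
        found = diverge(g, b, path + (good[0],))
        if found:
            return found
    if len(good[1]) != len(bad[1]):
        return path + (good[0], "child count differs")
    return None

# working vs. non-working provenance for the same forwarded packet
good = ("fwd(p1)", [("route(a,b)", []), ("pkt(p1)", [])])
bad  = ("fwd(p1)", [("route(a,c)", []), ("pkt(p1)", [])])
print(diverge(good, bad))   # points straight at the mismatched routing tuple
```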

Meta provenance

Software defined networking means that you can have bugs in software as well as in hardware! Thus you might want to include not just data but also code in your provenance graph. I think of the code as a first class part of provenance just like the data, but in ExSPAN the extension to also reason about program code goes under the name ‘meta provenance.’

Meta provenance has a set of meta tuples, which represent the syntactic elements of the program itself, and a set of meta rules, which describe the operational semantics of the programming language.

I guess the ‘meta’ prefix is here because these are rules that describe how the regular data tuples are generated / manipulated.

The use case for meta provenance that the authors give is automated network repair, where changes to the program and/or configuration data can be automatically explored to find what it takes to make a desired outcome become true. This of course requires your program to be expressed in a suitable language… “we have validated this approach by finding high-quality repairs for moderately complex SDN programs written in NDlog, Trema, and Pyretic.”

Provenance and privacy

Collecting provenance information while preserving privacy is still an open problem!

Using provenance to diagnose network problems that span multiple trust domains remains an open problem. Provenance is easier to collect within a single trust domain, e.g., in a centralized database or an enterprise network; but collecting and analyzing provenance while providing strong privacy guarantees seems to be a challenging problem, e.g., when performing diagnostics in a multi-tenant cloud.

Possible pieces of the puzzle may include differential privacy, secure multi-party computation, and/or trusted hardware platforms.

Ground: A data context service

January 23, 2017

Ground: A Data Context Service Hellerstein et al., CIDR 2017

An unfortunate consequence of the disaggregated nature of contemporary data systems is the lack of a standard mechanism to assemble a collective understanding of the origin, scope, and usage of the data they manage.

Put more bluntly, many organisations have only a fuzzy picture at best of what data they have, where it came from, who uses it, and what results depend on it. These are all issues relating to metadata. And the metadata problem is now hitting a crisis point for two key reasons:

  1. Poor organisational productivity – valuable data is hard to find and human effort is routinely duplicated. Google’s GOODs and machine learning lineage systems [1], [2] that we looked at last year provide ample evidence of the productivity enhancements (near necessity) of metadata systems at scale.
  2. Governance risk. This is becoming an increasingly serious issue as regulators clamp down on the use and transfer of data.

Data management necessarily entails tracking or controlling who accesses data, what they do with it, where they put it, and how it gets consumed downstream. In the absence of a standard place to store metadata and answer these questions, it is impossible to enforce policies and/or audit behaviour.

With e.g., the forthcoming GDPR regulations, if you can’t do that and your data contains information about people, then you could be liable for fines of up to €20M or 4% of global annual turnover, whichever is higher.

Data context services represent an opportunity for database technology innovation, and an urgent requirement for the field.

Hellerstein et al. argue that in addition to the three V’s of big data, we should be worrying about the ‘ABCs’ of data context: Applications, Behaviour, and Change.

A is for Applications

Application context describes how data gets interpreted for use: this could include encodings, schemas, ontologies, statistical models and so on. The definition is broad:

All of the artifacts involved – wrangling scripts, view definitions, model parameters, training sets, etc., – are critical aspects of application context.

B is for Behaviour

Behavioural context captures how data is created and used over time. Note that it therefore spans multiple services, applications, and so on.

Not only must we track upstream lineage – the data sets and code that led to the creation of the data object – we must also track the downstream lineage, including data products derived from this data object.

In addition to lineage, the behavioural context bucket also includes logs of usage. “As a result, behavioural context metadata can often be larger than the data itself!”

C is for Change

The change context includes the version history of data, code and associated information.

By tracking the version history of all objects spanning code, data, and entire analytics pipelines, we can simplify debugging and enable auditing and counterfactual analysis.

P is for People and Purpose?

Adding a ‘P’ rather messes up the nice ‘ABC’ scheme, but in relation to privacy regulations (e.g., the aforementioned GDPR), I think I would be tempted to also call out People and perhaps Purpose as top-level parts of the model. (In Ground, these would be subsumed as part of the application context). An organisation needs to be able to track all information it has relating to a given person, and every system that processes that information. Moreover, under the GDPR, it will also need to track the purposes of use for which consent was given when the data was collected and be able to demonstrate that the data is not used outside of those purposes.

G is for Ground

The authors are building a data context service called Ground, aiming to be a single point of access for information about data and its usage. Ground has a shared metamodel used to describe application, behaviour and change contextual information. This metamodel is the ‘common ground’ on which everything comes together. ‘Above ground’ services build on top of this metadata (see examples in the figure below). ‘Below ground’ services are concerned with capturing and maintaining the information in the metamodel.

Information can get into Ground via both above ground and below ground services: via crawling (as in GOODs), via REST APIs, in batches via a message bus or indeed several other means. As metadata arrives, Ground publishes notifications which can be consumed by above ground services that may for example enrich it.

For example, an application can subscribe for file crawl events, hand off the files to an entity extraction system like OpenCalais or Alchemy API, and subsequently tag the corresponding Common Ground metadata objects with the extracted entities.

Ground’s storage engine must be flexible enough to efficiently store and retrieve rich metadata including version management information, model graphs, and lineage storage. Both metadata feature extraction and metadata storage are noted as active research areas.

Having ingested, enriched, and stored the metadata, we probably also ought to have some kind of search or query interface to make use of it. Metadata use cases will need combinations of search-style indexing (e.g., for querying over tags), support for analytical workloads (e.g., over usage aka exhaust data), and graph style queries supporting transitive closures etc.

… various open-source solutions can address these workloads at some level, but there is significant opportunity for research here.

G is also for Graph

Each kind of contextual data (Application, Behavioural, and Change) is captured by a different kind of graph. (Note that as with dependency-driven analytics that we looked at last week, graphs seem to be the most natural structure for the metadata layer that sits on top of all the data within an organisation. This is a major future scale-out use case for graph databases).

Application context is represented by model graphs which capture entities and the relationships between them.

Change context is represented by version graphs which form the base of the model, and capture versions of everything above them. There’s a cute definition of a “Schrödinger” version for items such as Google Docs whose metadata is managed outside of Ground – every time the metadata is accessed via Ground’s APIs, Ground fetches the external object and generates a new external version, thus ‘each time we observe an ExternalVersion it changes.’ Using this strategy Ground can track the history of an external object as it was perceived by Ground-enabled applications.
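A sketch of how such ‘Schrödinger’ versioning might look (names and the content-hash deduplication here are illustrative, not Ground's real API): every access fetches the external object and records a new ExternalVersion when what we observe has changed.

```python
import hashlib

class ExternalObject:
    """Tracks the history of an external object as perceived on access."""
    def __init__(self, fetch):
        self.fetch = fetch          # callable returning the current content
        self.versions = []          # (version_id, content_hash)

    def access(self):
        """Fetch the object; record a new ExternalVersion if it changed."""
        content = self.fetch()
        h = hashlib.sha256(content.encode()).hexdigest()
        if not self.versions or self.versions[-1][1] != h:
            self.versions.append((len(self.versions), h))
        return content

# a stand-in for e.g. a Google Doc managed outside Ground
doc = {"text": "draft 1"}
obj = ExternalObject(lambda: doc["text"])
obj.access()
doc["text"] = "draft 2"             # edited outside Ground's view
obj.access()                        # observed again -> new ExternalVersion
```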

Behavioural context is represented by lineage graphs which relate principals (actors that can work with data such as users, groups and roles) and workflows which represent things that can be invoked to process data. Ground captures lineage as a relationship between versions. Usage data may often be generated by analyzing log files. We looked at a number of systems for extracting causal chains from log files in October of 2015 (e.g. lprof, The Mystery Machine).

An example is given of the ‘Grit’ aboveground service which tracks git repositories, their commits and commit messages, the files within them and their versions, users and the actions they performed. Grit is a good indication of the breadth of vision for Ground in capturing all metadata relating to anything that touches data within an organisation.

Building Ground

An initial version of Ground, called Ground Zero, has been built, mostly to serve as an exploratory platform to see how well suited today’s technology building blocks are to creating such a system. When storing the metamodel information in a relational database (in this case PostgreSQL), the team were unable to get query performance within an order of magnitude of graph processing systems. An in-memory graph built using JGraphT on top of Cassandra (with the time to build the graph not included in the figures below) performed better. Modelling with Neo4j was straightforward and querying was fast, but both JGraphT and Neo4j can only scale to a single node. The TitanDB scale-out graph database did not perform competitively in the analysis.

… there are no clear scalable open source “winners” in either log or graph processing. Leading log processing solutions like Splunk and SumoLogic are closed-source and the area is not well studied in research. [AC: what about the ELK stack?]. There are many research papers and active workshops on graph databases, but we found the leading systems lacking.

What next?

The Ground project is only just getting started. Future work includes:

  • further investigation (and possible design of something new) for data context data storage and query
  • integration of schema and entity extraction technologies up to and including knowledge base extraction with systems such as DeepDive
  • capturing exhaust from data-centric tools (wrangling and integration)
  • using the usage relationships uncovered between users, applications and datasets to provide insights into the way an organisation functions
  • improving governance (see below)
  • and connecting commonly used modelling frameworks such as scikit-learn and TensorFlow to Ground

Simple assurances like enforcing access control or auditing usage become extremely complex for organizations that deploy networks of complex software across multiple sites and sub-organizations. This is hard for well-intentioned organizations, and opaque for the broader community. Improvements to this state of practice would be welcome on all fronts. To begin, contextual information needs to be easy to capture in a common infrastructure. Ground is an effort to enable that beginning, but there is much more to be done in terms of capturing and authenticating sufficient data lineage for governance—whether in legacy or de novo systems.

Dependency-driven analytics: a compass for uncharted data oceans

January 20, 2017

Dependency-driven analytics: a compass for uncharted data oceans Mavlyutov et al. CIDR 2017

Like yesterday’s paper, today’s paper considers what to do when you simply have too much data to be able to process it all. Forget data lakes, we’re in data ocean territory now. This is a problem Microsoft faced with their large clusters of up to 50K nodes each producing petabytes of ‘exhaust’ logs daily. In those logs is all the information you need to be able to answer many interesting questions, but getting at it poses two key problems:

  1. The log data is the collected output of many disparate systems. It’s hard for humans to understand the intricacies of all the different log formats to parse and analyse them. Plus, they keep on changing anyway!
  2. The computational costs for scanning, filtering, and joining the log data in its raw form are prohibitive.

If only there were some kind of cost-effective pre-processing we could do on the raw data, that would make (the majority of) most subsequent queries easier to express and much faster to execute…

Log entries relate to entities (machines, jobs, files and so on). Guider is the Microsoft system that extracts those entities and builds a graph structure with entities as nodes, and edges representing relationships between them.

The result is a declarative, directed and informed access to the logs that saves users hours of development time, reduces the computational cost of common analyses by orders of magnitude, and enables interactive exploration of the logs.

This idea of building a compact graph representation on top of the underlying raw data is coined by the authors ‘Dependency-Driven Analytics’:

Dependency-Driven Analytics (DDA) is a new pattern in data analytics in which massive volumes of largely unstructured data are accessed through a compact and semantically-rich overlay structure: the dependency graph. The dependency graph fulfils two roles: it serves as 1) a conceptual map of the domain being modelled as well as 2) an index for some of the underlying raw data.

Where do graph structures really win over relational or other data structures? When you need to navigate lots of relationships. And this is precisely how they help in DDA use cases:

Instead of sifting through massive volumes of shapeless data, the user of a DDA system can navigate this conceptual map, and quickly correlate items which are very far apart in the data, but directly or indirectly connected through the dependency graph. Our experience indicates that this drastically lowers the cognitive cost of accessing the unstructured data.

Let’s look at some examples of where Guider has been used in Microsoft to make this idea a little more concrete:

  • A global job ranking system that uses Guider to traverse the dependencies between jobs. The key query here is the “job failure blast radius” query which aims to calculate the number of CPU-hours of downstream jobs of some job j that will be impacted if j fails. With Guider this transitive query can be answered simply using a graph query language, but is very difficult to answer from the raw logs.
  • Finding the root cause of a job failure (which is quite similar in spirit to the blast radius query, only you need to seek upstream, not downstream).
  • The impressive Morpheus automatic SLO discovery and management framework that we looked at last year uses Guider to access the dependency graph as its main inference tool.
  • Planning a data center migration using Guider to create a projection of the graph at tenant level, where edges store the amount of reads/writes shared by two tenants.
  • An auditing and compliance system that operates over a DDA dependency graph and a list of compliance rules. This is a really important use case with all the forthcoming privacy regulations in the GDPR, so I hope you’ll forgive me for an extended quotation here:

Current storage and access technologies guarantee data integrity and protect against unauthorized access, but cannot enforce legal regulations or business rules. A number of important regulations prevent data to flow across geographic regions, like European Union regulations prohibiting the copy of “personal data” beyond jurisdictional boundaries. There are also laws and business rules for data at rest, for example requirements to delete personal data identifying users access to online services after a given period of time (e.g., 18 months), or business agreements restricting 3rd-party data access to certain scenarios. Current data stores are unable to identify policy violations in that context (e.g., based on flexible business rules). At Microsoft, we have deployed a monitoring system leveraging DDA for that purpose.

Before the DDA system, auditing was manual and substantially more costly…
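The ‘job failure blast radius’ query mentioned above can be sketched as a transitive traversal of the dependency graph (the job names and CPU-hour figures here are invented):

```python
def blast_radius(deps, cpu_hours, job):
    """Sum CPU-hours of every job transitively downstream of `job`.
    `deps` maps each job to the jobs that consume its output."""
    seen, stack, total = set(), [job], 0.0
    while stack:
        j = stack.pop()
        for d in deps.get(j, []):
            if d not in seen:          # visit each downstream job once
                seen.add(d)
                total += cpu_hours[d]
                stack.append(d)
    return total

deps = {"etl": ["report", "train"], "train": ["score"]}
cpu_hours = {"etl": 10, "report": 2, "train": 40, "score": 5}
print(blast_radius(deps, cpu_hours, "etl"))   # downstream cost if "etl" fails
```

In a graph database this is a one-line transitive path query; against the raw logs it would require repeated scans and joins, which is exactly the point the authors make.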

We’ve established therefore that traversing the graph of relationships between entities can be highly useful when understanding a big dataset. That should come as no surprise. The team would like to be able to run large scale graph queries over the resulting graph, but currently they are limited to loading and querying small subsets of the graph via an ETL process. Neo4j was the only graph engine Microsoft found that was robust enough to handle the workload, but is restricted to a single instance.

We considered several other graph technologies, including TitanDB, Spark GraphFrames, and Tinkerpop/Tinkergraph, but at the current state of stability/development they were not sufficiently capable for our setting.

None of the approaches, including the limited Neo4j solution, is really satisfactory (‘creating, storing and manipulating the graph at scale is a major technical issue’), and the authors are pretty clear about what they’d like to see here:

This experience indicates that industry is in desperate need for a fully capable, scale-out graph solution that can handle complex, large, and structured graphs, as well as a unified surface language for graph, relational and unstructured data handling.

If you’re interested in large-scale graph processing, we spent a couple of weeks looking at the topic back in May 2015 on The Morning Paper and continuing into June.

The research agenda for Guider has two main strands. The first is to find an optimized structure for storing and querying DDA graphs. In particular, it may be possible to take advantage of certain properties of the dependency graph that general purpose graph-engines cannot depend upon:

The fact that the structure we operate on is a causal, temporal graph allows us to prune all nodes/edges that “precede” the source node JobAi chronologically. This pruning can be pushed down to the storage of the graph (like we do in our time-partitioned DFS copy of the graph) to dramatically reduce IO. The fact that nodes and edges are typed can also be used for pruning, further reducing the cardinality of scans and joins. We advocate for a system that automatically derives such complex constraints (as part of the dependency definition process), and leverages them during query optimization.

The second strand of future research is to see if entities and their relationships can be automatically extracted from log entries, rather than the fragile manual log mining scripts the team depend on today. In a fast-moving ecosystem, these require constant maintenance. As an example, Hadoop alone has 11K log statements spread over 1.8M lines of code, 300kloc of which changed during 2015 alone. The proposed direction is to take advantage of information in the source code itself. Although the authors don’t cite the paper, the sketch they present is very similar to the approach used by lprof, so I’m going to refer you to my write-up of lprof’s log file and source code analysis rather than repeat it here.

Then there will just remain one final challenge: 😉

To further complicate this task, our next challenge is to achieve all this in near real-time as data items are produced across hundreds of thousands of machines. At Microsoft, we have built solutions to ingest logs at such a massive scale, but stream-oriented entity extraction and linking at this scale remains an open problem.

Prioritizing attention in fast data: principles and promise

January 19, 2017

Prioritizing attention in fast data: principles and promise Bailis et al., CIDR 2017

Today it’s two for the price of one as we get a life lesson in addition to a wonderfully thought-provoking piece of research.

I’m sure you’d all agree that we’re drowning in information – so much content being pumped out all of the time! (And hey, this blog has to take a share of the blame there too 🙂 ). Can you read all of the articles, keep up with all of the tweets, watch all of the latest YouTube videos, and so on?

Back in 1971, Herbert Simon wrote a piece entitled “Designing organizations for an information rich world” (they really had no idea just how much of an information flood was coming back then though!). In it Simon says:

In an information-rich world, the wealth of information means a dearth of something else: a scarcity of whatever it is that information consumes. What information consumes is rather obvious: it consumes the attention of its recipients.

We have a limited capacity for what we can actually pay attention to – only the tiniest portion of all the available information. You could try uniform random sampling, but a better strategy is to prioritize where you choose to spend your attention budget (why? – not all content has equal merit). Here’s a personal example – I watch very little tv (what I do watch is largely selected live sporting events) and don’t follow the news (I find very little is actionable, and the really important stuff has a way of creeping into my consciousness anyway). I use the freed-up attention budget to learn, and a lot of it goes into reading and writing up research papers.

Imagine that those streams of incoming tweets, articles, videos and so on are instead events streaming into a big data system. The volume and velocity of the incoming data is such that we have exactly the same problem: we can’t meaningfully pay attention to it all – we have to prioritise.

… we believe that the impending era of fast data will be marked by an overabundance of data and a relative scarcity of resources to process and interpret it.

This prioritisation has two stages: first we have to prioritize the attention of our machines (where we choose to spend our computational budget in processing the incoming data), and then we really have to prioritize what information gets flagged up to humans as requiring attention. Human ability to process information has sadly failed to follow Moore’s law ;). You end up with a picture that looks like this:

As an example, consider a web service generating millions of events per second. There’s a limit to how much processing you can do as those events fly by – if you fall behind you’re never going to catch up again. Strictly in fact, there are multiple limits (or budgets) – a response time budget, a computational resources budget, and a space budget. Let’s say we’re looking for anomalous patterns in the data – flagging too many of these to human operators is next to useless – they’ll just develop alert-blindness and the really important issues will go unaddressed.

The ‘fast data manifesto‘ the authors propose is based on three principles for designing fast data: prioritize output, prioritize iteration, and prioritize computation.

Prioritize Output: The design must deliver more information using less output. Each result provided to the user costs attention. A few general results are better than many specific results… Fast data systems can prioritize attention by producing summaries and aggregates that contextualize and highlight key behaviors within and across records; for example, instead of reporting all 128K problematic records produced by device 1337, the system can simply return the device ID and a count of records.
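The device-1337 example from the quote can be sketched in a few (entirely illustrative) lines:

```python
from collections import Counter

def summarize(problem_records):
    """Collapse per-record output into (device_id, count) summaries,
    most problematic devices first."""
    counts = Counter(device for device, _ in problem_records)
    return sorted(counts.items(), key=lambda kv: -kv[1])

# 128K problematic records from device 1337, plus one from device 42,
# become two summary rows instead of 128,001 individual alerts
records = [("1337", f"err-{i}") for i in range(128_000)] + [("42", "err-x")]
print(summarize(records))   # [('1337', 128000), ('42', 1)]
```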

As another example of the same phenomenon, one of the companies I’m involved with, Atomist, greatly improved the utility of notifications in their engineering Slack channel by aggregating and contextualising notifications related to builds, deploys, and commits.

Prioritize Iteration: The design should allow iterative feedback-driven development. Give useful defaults, and make it easy to tune analysis pipelines and routines. It is slightly better to be flexible than perfectly precise.

The scarce resource we’re prioritizing the use of here is the human domain experts and the time it takes them to develop high-quality analyses. Iteration is key to this, so the shorter we can make the feedback cycles, the more productive the team will be.

Prioritize Computation: The design must prioritize computation on inputs that most affect its output. The fastest way to compute is to avoid computation. Avoid computation on inputs that contribute less to output.

When you unpack “prioritizing computation on inputs that most affect outputs” you get to something pretty exciting. Most stream processing systems leave the design of complex analysis routines to their end users, and most ML and statistics methods have historically optimized for prediction quality.

As a result, the trade-offs between quality and speed in a given domain are often unexplored.

(Approximate query processing is one area of CS that has explicitly explored some of these trade-offs; so have some asynchronous machine learning algorithms.)

If you accept that not all inputs will contribute equally to the outputs, then you arrive at a conclusion that has a lot of similarities to the use of backpropagation in machine learning…

Fast data systems should start from the output and work backwards towards the input, doing as little work as needed on each piece of data, prioritizing computation over data that matters most. Gluing together black-box functions that are unaware of the final output will miss critical opportunities for fast compute; if a component is unaware of the final output, it is likely to waste time and resources. By quickly identifying inputs that most contribute to the output, we can aggressively prune the amount of computation required.

(See also Weld that we looked at earlier this week for another example of the value of understanding the desired end output when optimising).


MacroBase is a fast data system built to explore the fast data principles above. We looked at an early version of MacroBase on The Morning Paper last March. MacroBase has since generalized from the single analytics workflow used as the basis for the initial exploration.

MacroBase provides a set of composable, streaming dataflow operators designed to prioritize attention. These operators perform tasks including feature extraction, supervised and unsupervised classification, explanation and summarization.
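To give a rough feel for what composable streaming dataflow operators might look like, here’s my own generator-based sketch; it is not MacroBase’s actual (Java) API, and the fixed-threshold rule merely stands in for its real classifiers:

```python
from collections import Counter

def extract(records):
    # Feature extraction: project each raw record to the fields the detector needs.
    for r in records:
        yield {"device": r["device"], "score": abs(r["value"])}

def classify(features, threshold=10.0):
    # A fixed-threshold rule standing in for MacroBase's (un)supervised classifiers.
    for f in features:
        f["outlier"] = f["score"] > threshold
        yield f

def summarize(labelled):
    # Explanation/summarization: aggregate outliers by attribute instead of listing them.
    return Counter(f["device"] for f in labelled if f["outlier"])

stream = [{"device": d, "value": v}
          for d, v in [(1, 3.0), (1, 50.0), (2, 99.0), (2, 1.0), (2, 30.0)]]

# Operators compose by plugging one generator into the next.
explanation = summarize(classify(extract(stream)))
```

The point of the composition style is that each stage streams records to the next, so stages can be swapped or reconfigured without touching their neighbours.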

MacroBase is based on a dataflow model, and takes advantage of underlying dataflow engines instead of reinventing the wheel there.

Although MacroBase’s core computational primitives continue to evolve, we have found that expressing them via a dataflow computation model is useful. Many developers of recent specialized analytics engines expended considerable effort devising new techniques for scheduling, partitioning, and scaling their specialized analytics only to see most performance gains recouped (or even surpassed) by traditional dataflow-oriented engines.

When it comes to prioritizing output, MacroBase allows users to highlight fields of interest within inputs as either key performance metrics, or as important explanatory attributes. MacroBase then outputs a set of explanations regarding correlated behaviors within the selected metrics. There are some similarities here to DBSherlock, which we looked at last year, although that starts with the user highlighting an anomalous region of interest.

Looking forward, we believe current practice in high-volume error explanation and diagnosis is especially ripe for improvement.

MacroBase supports fast iteration by allowing users to interact at three different levels. A point-and-click interface allows a user to specify a source from which to ingest data, identify the key attributes as above, and see explanations in the form of combinations of attributes correlated with abnormal metrics.

At the next level down, users can use Java to configure custom pipelines and incorporate domain-specific logic. Finally, at the lowest level of the system users can implement new analytics operators.

The prioritized computation pieces of MacroBase look to be especially exciting, though what we mostly get here is a teaser for information to be expounded in more depth in a forthcoming SIGMOD 2017 paper.

Bailis et al. found that the most accurate techniques were often too slow for MacroBase use cases – for example, an initial prototype explanation operator became too slow even at 100K point datasets. Drawing on their database systems pedigree, the authors are not afraid to challenge the status quo:

Applying classical systems techniques – including predicate pushdown, incremental memoization, partial materialization, cardinality estimation, approximate query processing, and shared scans – has accelerated common operations in MacroBase. Per the principle of prioritizing computation, by focusing on the end results of the computation, we can avoid a great deal of unnecessary work… Stacked end-to-end, the several order-of-magnitude speedups we have achieved via the above techniques enable analysis over larger datasets (e.g., response times reduced from hours to seconds).

There’s plenty more interesting material in the paper itself, but space precludes me from covering it all. If this has piqued your interest at all, it’s well worth reading the full paper. I’ll leave you with the following concluding remark from the authors:

We view MacroBase as a concrete instantiation of a larger trend towards systems that not only train models but also enable end-to-end data product development and model deployment; solely addressing model training ignores both the end-to-end needs of users and opportunities for optimization throughout the analysis pipeline.

SnappyData: A unified cluster for streaming, transactions, and interactive analytics

January 18, 2017

SnappyData: A unified cluster for streaming, transactions, and interactive analytics Mozafari et al., CIDR 2017

Update: fixed broken paper link, thanks Zteve.

On Monday we looked at Weld, which showed how to combine disparate data processing and analytic frameworks using a common underlying IR. Yesterday we looked at Peloton, which adapts to mixed OLTP and OLAP workloads on the fly. Today it’s the turn of SnappyData, which throws one more ingredient into the mix: what if we need OLTP + OLAP + streaming (which, let’s face it, many applications do)? I should add a disclaimer today that several of the authors of this paper are friends and ex-colleagues from my days at Pivotal and VMware.

Here’s a motivating example: consider a real-time market surveillance system ingesting trade streams and detecting abusive trading patterns. We need to join the incoming trade streams with historical records, financial reference data, and other streams of information. Triggered alerts may result in further analytical queries. You can build systems like this by aggregating a stream processing engine, online data store, and an OLAP system but there are disadvantages to running multiple systems:

  • Significantly increased operational complexity
  • Inefficiencies and reduced performance caused by data movement and/or replication across clusters
  • It’s very hard to reason about overall consistency across systems when each has its own consistency guarantees and recovery models.

[SnappyData] aims to offer streaming, transaction processing, and interactive analytics in a single cluster, with better performance, fewer resources, and far less complexity than today’s solutions.

It does this by fusing together Apache Spark with Apache Geode (aka GemFire) – a distributed in-memory transactional store.

The SnappyData programming model is a pure extension of the Spark API (also encompassing Spark SQL and Spark Streaming). The hybrid storage layer is primarily in-memory, and can manage data in row, column, or probabilistic stores. The column format is derived from Spark’s RDDs; the row-oriented tables come from GemFire’s table structures (bringing support for indexing).

I’m particularly interested to see the probabilistic store for approximate query processing (AQP). My personal sense is that AQP is significantly under-utilised in practice when compared to its potential (we previously looked at e.g., ApproxHadoop, and BlinkDB is well worth learning about too. I think there’s even an old talk by me on probabilistic data structures kicking around on the internet somewhere!).

… SnappyData can also summarize data in probabilistic data structures, such as stratified samples and other forms of synopses. SnappyData’s query engine has built-in support for approximate query processing (AQP) which can exploit the probabilistic structures. This allows applications to trade accuracy for interactive-speed analytics on streams or massive datasets.

The supported probabilistic data structures include uniform samples, stratified samples, and sketches. Uniform sampling simply means that every element has an equal chance of being included in the sample set, with samples being chosen at random. Stratified sampling is the process of dividing a population into homogeneous subgroups (strata) before sampling. Per wikipedia, there are a couple of other considerations that need to be taken into account:

The strata should be mutually exclusive: every element in the population must be assigned to only one stratum. The strata should also be collectively exhaustive: no population element can be excluded

Sketches are wonderful data structures for maintaining approximate counts, membership tests, most frequent items and so on.
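For readers who haven’t met these structures before, here’s a small Python illustration (my own, not SnappyData’s implementation) of the difference between uniform and stratified sampling over a skewed population:

```python
import random

random.seed(0)
# Skewed population: 10% of rows are "EU", 90% are "US".
population = [{"region": "EU" if i % 10 == 0 else "US", "bid": i}
              for i in range(1000)]

# Uniform sample: every element equally likely; the rare EU stratum
# may be under-represented in any given draw.
uniform = random.sample(population, 50)

# Stratified sample: partition into mutually exclusive, collectively
# exhaustive strata, then draw a fixed quota from each, guaranteeing
# that rare groups are represented.
strata = {}
for row in population:
    strata.setdefault(row["region"], []).append(row)
stratified = [row for group in strata.values() for row in random.sample(group, 25)]
```

With equal per-stratum quotas, queries grouped by region get usable sample sizes for both regions, which a uniform sample cannot guarantee.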

From these data structures, SnappyData uses known algorithms for error estimation when processing queries (see ‘The analytical bootstrap…’ and ‘Knowing when you’re wrong…’ – we should really look at those papers in more detail in future editions of The Morning Paper). The innovation is in the way that SnappyData builds and maintains the structures in the first place.

Unlike uniform samples, choosing which stratified samples to build is a non-trivial problem. The key question is which sets of columns to build a stratified sample on. Prior work has used skewness, popularity and storage cost as the criteria for choosing column-sets. SnappyData extends these criteria as follows: for any declared or foreign-key join, the join key is included in a stratified sample in at least one of the participating relations (tables or streams).

The WorkloadMiner tool, integrated into CliffGuard can be used to generate statistics to help users choose the right strata.

Tuples arriving in a stream are batched into groups of (by default) 1M tuples, and samples for each of the observed stratified columns are captured in memory. Once a batch is complete it can then be safely stored in a compressed columnar format and archived to disk if desired.

A quick word on tables

Tables follow GemFire’s lead and can be managed either in Java heap memory or off-heap. Tables can be co-partitioned (so that frequently joined tables are partitioned together) or replicated. A core use case for replicated tables is the support for star schemas with large (partitioned) fact tables related to multiple smaller dimension tables. The dimension tables can be replicated to optimise joins.

Behind the scenes, Spark’s column store is extended to support mutability:

Updating row tables is trivial. When records are written to column tables, they first arrive in a delta row buffer that is capable of high write rates and then age into a columnar form. The delta row buffer is merely a partitioned row table that uses the same partitioning strategy as its base column table. This buffer table is backed by a conflating queue that periodically empties itself as a new batch into the column table. Here, conflation means that consecutive updates to the same record result in only the final state getting transferred to the column store.
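A toy Python model of the conflation behaviour described above (my sketch; GemFire’s actual queue is of course more sophisticated):

```python
class ConflatingQueue:
    """Keyed buffer in which consecutive updates to the same record collapse,
    so only each record's final state is transferred to the column store."""
    def __init__(self):
        self._latest = {}          # key -> most recent value (insertion-ordered)

    def put(self, key, value):
        self._latest[key] = value  # overwriting conflates any earlier update

    def drain(self):
        """Empty the buffer as one batch destined for the column table."""
        batch = list(self._latest.items())
        self._latest.clear()
        return batch

q = ConflatingQueue()
q.put("row1", {"qty": 5})
q.put("row2", {"qty": 7})
q.put("row1", {"qty": 9})      # conflated: supersedes the earlier row1 update
batch = q.drain()              # two entries, with row1 at its final state
```

The pay-off is that a hot row updated many times between drains costs only one transfer into the (more expensive to mutate) columnar form.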

Performance evaluation

The main intended benefit of SnappyData is not raw performance, but a reduction in the operational complexity of managing multiple disparate systems. However, we do get a comparison of SnappyData’s performance against Spark+Cassandra and Spark+MemSQL baselines. The test workload is based on advertising analytics, and includes streaming, transactional, and analytical components. The streaming component receives a stream of impression events, aggregates them by publisher and geographical region, and computes the average bid, number of impressions, and number of uniques every few seconds. The analytical queries execute over the full history and return (1) the top 20 ads by number of impressions for each geography, (2) the top 20 ads receiving the most bids by geography, and (3) the top 20 publishers receiving the most bids overall.

We can see how the systems fared in the results below. The analytical queries were run after ingesting 300M records.

SnappyData ingests data faster than either of the other two systems (the text says up to 2x, but the chart reads like 3x), and also processes transactions marginally faster, but it’s in the analytical queries where it really seems to shine.

Spark+Cassandra pays a heavy price for serializing and copying data to the Spark cluster for processing. The Spark+MemSQL connector does push as much of the query processing as possible down to MemSQL, giving it performance close to that of native MemSQL. Why, then, does SnappyData perform so much better than MemSQL on this particular use case?

SnappyData embeds its column store alongside Spark executors, providing by-reference data access to rows (instead of by-copy). SnappyData also ensures that each partition at the storage layer uses its parent’s partitioning method. Thus, each update becomes a local write (i.e., no shuffles). When queried, SnappyData’s data is column-compressed and formatted in the same format as Spark’s, leading to significantly lower latencies.

Let’s finish with a reminder of the power of AQP. After ingesting 2 billion ad impressions approximate queries with an error tolerance of 0.05 were up to 7x faster than their exact SnappyData equivalents.
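The basic AQP trade is easy to demonstrate: estimate an aggregate from a uniform sample and scale up. A hypothetical sketch (mine, not SnappyData’s engine, which additionally reports proper error estimates):

```python
import random

random.seed(1)
# Stand-in for a much larger table of ad impression values.
impressions = [random.randint(0, 100) for _ in range(200_000)]

# The exact aggregate scans everything; the approximate one scans a 5%
# uniform sample and scales the result up, trading a small (and
# quantifiable) error for a proportional reduction in work.
exact_total = sum(impressions)
sample = random.sample(impressions, 10_000)
estimate = sum(sample) * (len(impressions) / len(sample))

relative_error = abs(estimate - exact_total) / exact_total
```

With a 5% sample the estimate here lands comfortably inside the 0.05 error tolerance used in the paper’s experiment, while touching a twentieth of the data.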

Self-driving database management systems

January 17, 2017

Self-driving database management systems Pavlo et al., CIDR 2017

We’ve previously seen many papers looking into how distributed and database systems technologies can support machine learning workloads. Today’s paper choice explores what happens when you do it the other way round – i.e., embed machine learning into a DBMS in order to continuously optimise its runtime performance. In the spirit of CIDR’s innovation charter it’s a directional paper with only preliminary results – but no less interesting for that. I find that format very stimulating in this case because it encourages the reader to engage with the material and ask a lot of “what if?” and “have you thought about?” questions that don’t yet have answers in the evaluation.

Tuning a modern DBMS has become a black art – it requires DBAs who are expert in the internals of the system, and often the use of tools that analyse the effects of possible changes on offline copies of the database using workload traces.

In this paper, we make the case that self-driving database systems are now achievable. We begin by discussing the key challenges with such a system. We then present the architecture of Peloton, the first DBMS that is designed for autonomous operation. We conclude with some initial results on using Peloton’s deep learning framework for workload forecasting and action deployment.

You can find PelotonDB online at

What should/could a self-driving DBMS do?

At the core of the job is the need to understand an application’s workload type. For example, is it mostly an OLTP or OLAP application? OLTP workloads are best served by row-oriented layouts, and OLAP by column-oriented. HTAP (hybrid transaction-analytical processing) systems combine both within one DBMS. A self-driving DBMS should be able to automatically determine the appropriate OLTP or OLAP optimisations for different database segments.

A self-driving DBMS must also have a forecasting model that lets it predict what is about to happen, and thus prepare itself to best meet anticipated demand. The possible actions a self-driving database might take are shown in the following table. Each action has a cost to implement (change the database), and a potential benefit. These costs and benefits may themselves need to be part of some estimation model.

Finally, it must be possible for the DBMS to implement its plans of action efficiently without incurring large overheads. For example,…

… if the system is only able to apply changes once a week, then it is too difficult for it to plan how to correct itself. Hence, what is needed is a flexible, in-memory DBMS architecture that can incrementally apply optimisations with no perceptible impact to the applications during their deployment.

Peloton system architecture

To get the degree of fine-grained control over the DBMS runtime required, and to avoid e.g. the need to restart the system in order to effect changes, the authors decided to build their own DBMS. Two key design points are the use of a multi-version concurrency control mechanism that can interleave OLTP in amongst OLAP queries without blocking them, and an in-memory store manager that allows for fast execution of HTAP workloads. (The preliminary results so far are all centred around switching between row and column-oriented table layouts at runtime.)

What we’re all here for though, is to understand how the ‘self-driving’ pieces fit into the architecture:

There are three major components:

  1. Workload classification
  2. Workload forecasting
  3. An action planning and execution engine

The workload classification component uses clustering to group queries into clusters. These clusters then become the units of forecasting. Features for clustering can include both features of the query itself, and also runtime metrics from query execution. The former have the advantage that they are stable across different optimisations which may (we hope!) impact the runtime metrics. Peloton currently uses the DBSCAN algorithm for query clustering.
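For the curious, here is a miniature hand-rolled DBSCAN over made-up query feature vectors. This is purely illustrative: Peloton’s feature set and parameter choices will differ, and a production system would use a library implementation.

```python
def dbscan(points, eps=1.0, min_pts=2):
    """Tiny density-based clustering: core points with >= min_pts neighbours
    within eps seed clusters; unreachable points are labelled noise (-1)."""
    dist = lambda a, b: sum((x - y) ** 2 for x, y in zip(a, b)) ** 0.5
    labels, cluster = {}, 0
    for i, p in enumerate(points):
        if i in labels:
            continue
        neighbours = [j for j, q in enumerate(points) if dist(p, q) <= eps]
        if len(neighbours) < min_pts:
            labels[i] = -1                 # noise (may be claimed by a cluster later)
            continue
        cluster += 1
        frontier = list(neighbours)
        while frontier:                    # expand the cluster from core points
            j = frontier.pop()
            if labels.get(j, -1) == -1:
                labels[j] = cluster
                more = [k for k, q in enumerate(points) if dist(points[j], q) <= eps]
                if len(more) >= min_pts:
                    frontier.extend(k for k in more if k not in labels)
    return labels

# Query feature vectors, e.g. (num_joins, num_predicates) -- features of the
# query text itself, which stay stable across physical optimisations.
features = [(0.0, 1.0), (0.1, 1.1), (0.2, 0.9), (5.0, 8.0), (5.1, 8.2)]
labels = dbscan(features, eps=0.5, min_pts=2)
```

The two dense groups of queries end up in two clusters, each of which then gets its own arrival-rate forecast.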

The workload forecasting component contains models to predict the arrival rate of queries for each workload cluster.

After the DBMS executes a query, it tags each query with its cluster identifier and then populates a histogram that tracks the number of queries that arrive per cluster within a time period. Peloton uses this data to train the forecast models that estimate the number of queries per cluster that the application will execute in the future. The DBMS also constructs similar models for the other DBMS/OS metrics in the event stream.
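The histogram bookkeeping the quote describes is straightforward; something along these lines (my sketch, with hypothetical cluster IDs):

```python
from collections import defaultdict

# Hypothetical post-execution event stream: (minute, cluster_id) per completed query.
events = [(0, "c1"), (0, "c1"), (0, "c2"),
          (1, "c1"), (1, "c2"), (1, "c2"), (1, "c2")]

# Per-cluster histogram of arrivals per time period -- this becomes the
# training data for the per-cluster forecast models.
histogram = defaultdict(lambda: defaultdict(int))
for minute, cluster in events:
    histogram[cluster][minute] += 1

c1_series = [histogram["c1"][m] for m in (0, 1)]   # arrival-rate series for cluster c1
```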

Obviously we need to use deep learning for this ;). I know that you can do time series analysis with LSTMs, but I don’t know enough to understand when that approach makes sense vs more classical methods for separating periodic and underlying trends. See e.g., ‘time series decomposition‘ or even good old DFT. I bet some of The Morning Paper readers know though: if that’s you, please leave a comment to enlighten the rest of us! What I do know, is that the Peloton team chose to address the problem of learning the periodicity and repeating trends in a time-series using LSTMs. Let’s just roll with it for now…

To deal with periodicity at different levels of granularity (e.g. hourly, daily, weekly, monthly, quarterly, yearly) the authors train multiple models, each at a different time horizon / interval granularity. It strikes me that this fits very well with models for aggregating time-series data in blocks, whereby we maintain e.g. by-the-minute data for several hours, which then gets rolled up into hourly aggregates at the next level, which get rolled up into daily aggregates, and so on. The further back in time you go, the more coarse-grained the data that is retained.

Combining multiple RNNs (LSTMs) allows the DBMS to handle immediate problems where accuracy is more important as well as to accommodate longer term planning where the estimates can be broad.

The control framework continuously monitors the system performance and plans and executes optimisations to improve performance. It’s like a classical AI agent that observes its environment and makes plans to meet its goals by selecting out of a set of possible actions at each time step. This is also the domain of reinforcement learning therefore, and if we choose to go deep, of architectures such as DQNs. Peloton maintains a catalog of actions available to it, together with a history of what happened when it invoked them in the past.

[From the set of available actions] the DBMS chooses which one to deploy based on its forecasts, the current database configuration, and the objective function (latency minimisation). Control theory offers an effective methodology for tackling this problem. One particular approach, known as the receding-horizon control model (RHCM) is used to manage complex systems like self-driving cars.

The basic idea is quite straightforward. At each time step the agent looks ahead for some finite time horizon using its forecasts, and searches for a sequence of actions to minimise the objective function. It then takes only the first action in that sequence, and waits for this deployment to complete before re-planning what to do next (c.f., STRAW which we looked at a couple of weeks ago, that learns when to stick with multi-action plans, and when to replan).

Under RHCM, the planning process is modelled as a tree where each level contains every action that the DBMS can invoke at that moment. The system explores the tree by estimating the cost-benefit of actions and chooses an action sequence with the best outcome.

Forecasts are weighted by their time horizon so that nearer-in forecasts have greater weight in the cost-benefit analysis.
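A heavily simplified sketch of RHCM-style planning, with a hypothetical action catalogue of my own invention. I’ve also collapsed costs and benefits into a per-step net value, whereas in reality deployment costs are one-off and benefits recur:

```python
from itertools import product

# Hypothetical action catalogue: name -> (cost, benefit) per step.
actions = {
    "noop":          (0.0, 0.0),
    "add_index":     (5.0, 3.0),    # net negative under this toy model
    "row_to_column": (8.0, 12.0),   # net positive under this toy model
}

def plan_first_action(horizon=3, discount=0.8):
    """Score every action sequence of length `horizon`, weighting nearer
    steps more heavily (forecasts are more trustworthy there), then commit
    to only the FIRST action of the best sequence -- replanning follows."""
    def score(seq):
        return sum((discount ** t) * (actions[name][1] - actions[name][0])
                   for t, name in enumerate(seq))
    best = max(product(actions, repeat=horizon), key=score)
    return best[0]

first = plan_first_action()
```

After deploying `first`, the planner would observe the outcome and search the tree again from the new state, which is what makes the horizon "receding".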

Initial results

The authors integrated TensorFlow inside Peloton and trained two RNNs on 52 million queries from one month of traffic for a popular site. The first model predicts the number of queries arriving over the next hour at one-minute granularity. It takes as input a 120-element vector with the arrival rates for the past two hours, and outputs a 60-element vector with predictions for the next hour. The second model uses a 24-hour horizon with one-hour granularity. It takes as input a 24-element vector for the previous day, and outputs a scalar prediction for the next day.
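Whatever model sits on top, the data shaping implied by the first model is a sliding window over the arrival-rate series. A plain-Python sketch (mine) of building (120-minute input, 60-minute target) training pairs:

```python
def make_windows(series, n_in=120, n_out=60):
    """Slide over a per-minute arrival-rate series, emitting
    (past n_in minutes, next n_out minutes) training pairs."""
    pairs = []
    for start in range(len(series) - n_in - n_out + 1):
        x = series[start : start + n_in]            # model input: last 2 hours
        y = series[start + n_in : start + n_in + n_out]  # target: next hour
        pairs.append((x, y))
    return pairs

series = list(range(300))   # stand-in for real per-minute arrival rates
pairs = make_windows(series)
```

The second (daily) model is the same shape with `n_in=24` and a single-element target, just at a coarser granularity.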

Using these models, we then enable the data optimization actions in Peloton where it migrates tables to different layouts based on the type of queries that access them…

The RNNs produce accurate forecasts as can be seen below:

Using these forecasts, the optimiser is able to reduce overall latency by switching layouts between daytime OLTP and night time OLAP queries.

These early results are promising: (1) RNNs accurately predict the expected arrival rate of queries. (2) hardware-accelerated training has a minor impact on the DBMS’s CPU and memory resources, and (3) the system deploys actions without slowing down the application. The next step is to validate our approach using more diverse database workloads and support for additional actions.

Machine learning for system optimisation

It’s not just tuning of DBMSs that has become a black art. One of the companies I’m involved with, Skipjaq, has been studying the opportunities to reduce latencies through automated tuning of cloud instance type, OS, JVM, and framework settings. Optimising the settings across all of those variables for a given workload is also a black art – and one that most teams simply don’t have the resources or skills to undertake. Skipjaq uses machine learning techniques to perform the optimisation and the results with customer workloads so far show that there are significant performance gains to be had for nearly all applications as a result of the process. You can find out more and even give it a test drive at Tell them I sent you ;).

Weld: A common runtime for high performance data analytics

January 16, 2017

Weld: A common runtime for high performance data analytics Palkar et al. CIDR 2017

This is the first in a series of posts looking at papers from CIDR 2017. See yesterday’s post for my conference overview.

We have a proliferation of data and analytics libraries and frameworks – for example, Spark, TensorFlow, MxNet, Numpy, Pandas, and so on. As modern applications increasingly embrace machine learning and graph analytics in addition to relational processing, we also see diverse mixes of frameworks within an application.

Unfortunately, this increased diversity has also made it harder to achieve high performance. In the past, an application could push all of the data processing work to an RDBMS, which would optimize the entire application. Today, in contrast, no single system understands the whole application.

Each individual framework may optimise its individual operations, but data movement across the functions can dominate the execution time. This is the case even when all of the movement is done within memory. Moreover, as we’ll see, the function boundaries prevent important optimisations that could otherwise fuse loops and operators.

The cost of data movement through memory is already too high to ignore in current workloads, and will likely get worse with the growing gap between processor and memory speeds. The traditional method of composing libraries, through functions that pass pointers to in-memory data, will be unacceptably slow.

So what are our options? One approach would be to design a single data processing system that encompasses all modes and algorithms, and heavily optimise within it. That has a couple of drawbacks: firstly, the diversity and pace of development of data processing makes it very hard to keep up – trying to integrate all new developments into a single framework will be slower to market at best, and likely quite inefficient. Secondly, a one-size-fits-all universal framework, at least when it comes to extracting the best performance, is probably a fool’s errand, as one of my favourite papers from 2015, Musketeer (part 1), demonstrated.

If we’re going to continue to use a variety of tools and frameworks then we’ll need another approach. Musketeer (part 2) gives us a hint: it defines a DAG-based intermediate representation (IR) that sits above any one store and can map a computation into the most appropriate runtime. Weld also defines an IR, but this time situates it underneath existing libraries and frameworks – application developers continue to use the familiar framework APIs. Weld integrations in the frameworks map the operations to Weld IR, and then optimisations can be applied at the IR level. The IR is designed in such a way that cross-function (and hence also cross-framework) optimisations can be automatically applied.

The benefits of IR and its optimisations turn out to be significant even when using a single framework and only a single core.

… we have integrated Weld into Spark SQL, NumPy, Pandas, and TensorFlow. Without changing the user APIs of these frameworks, Weld provides speedups of up to 6x for Spark SQL, 32x for TensorFlow, and 30x for Pandas. Moreover, in applications that combine these libraries, cross-library optimization enables speedups of up to 31x.

Needless to say, a 31x speed-up is a very significant result!

High level overview

Weld provides a runtime API that allows libraries to implement parts of their computation as Weld IR fragments. Integrating Weld into TensorFlow, Spark SQL, Pandas, and NumPy required on the order of a few days of effort and about 500 lines of code per framework. From that point, each individual operator ported to Weld (e.g., a Weld implementation of numpy.sum) required about 50-100 lines of code. As an example, Spark SQL already generates Java code, so all that had to be done there was to provide an alternative implementation that generates Weld IR instead.

As applications call Weld-enabled functions, the system builds a DAG of such fragments, but only executes them lazily when libraries force an evaluation. This lets the system optimize across different fragments.
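Lazy evaluation is the key enabling trick here. A toy Python rendition (my sketch; Weld’s real runtime builds IR fragments and compiles them, not Python closures):

```python
class Lazy:
    """Toy lazy node: constructing one records a DAG edge; nothing executes
    until something forces evaluation, so an optimiser could see the whole
    graph of fragments first."""
    def __init__(self, fn, *deps):
        self.fn, self.deps = fn, deps

    def evaluate(self):
        return self.fn(*[d.evaluate() if isinstance(d, Lazy) else d
                         for d in self.deps])

trace = []
def source():
    trace.append("scan")          # record when the data is actually read
    return [1, -2, 3, -4]

data     = Lazy(source)
filtered = Lazy(lambda xs: [x for x in xs if x > 0], data)
total    = Lazy(sum, filtered)

assert trace == []                # building the DAG ran nothing
result = total.evaluate()         # forcing the value runs the whole DAG once
```

Because nothing runs until the force, the scan happens exactly once and an optimiser sitting between DAG construction and evaluation can fuse or reorder fragments across library boundaries.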

Optimisations are done as Weld transformations (Weld -> Weld). A compiler backend maps the final, combined Weld IR program to efficient multi-threaded code. This backend is implemented using LLVM.

Because the Weld IR is explicitly parallel, our backend automatically implements multithreading and vectorization using Intel AVX2.

The Weld IR

The design of Weld IR dictates what workloads can run on Weld, and what optimizations can easily be applied. It supports composition and nesting so that fragments from multiple libraries can easily be combined, and explicit parallelism in its operators.

Our IR is based on parallel loops and a construct for merging results called “builders.” Parallel loops can be nested arbitrarily, which allows complex composition of functions. Within these loops, the program can update various types of builders, which are declarative data types for constructing the results in parallel (e.g. computing a sum or adding items to a list). Multiple builders can be updated in the same loop, making it easy to express optimizations such as loop fusion or tiling, which change the order of execution but produce the same result.

Here are some very basic examples of using builders to give a feel for the IR:
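(The examples appear as code figures in the paper. As a stand-in, here is how the two builder types described in the quote above might be emulated in Python; the semantics are my reading of the paper’s description, not Weld syntax.)

```python
class Merger:
    """Builder that combines merged values with a binary operator
    (e.g. computing a sum)."""
    def __init__(self, zero, op):
        self.value, self.op = zero, op
    def merge(self, x):
        self.value = self.op(self.value, x)

class Appender:
    """Builder that collects merged items into a list."""
    def __init__(self):
        self.items = []
    def merge(self, x):
        self.items.append(x)

total, evens = Merger(0, lambda a, b: a + b), Appender()
for x in [1, 2, 3, 4]:      # one loop updating two builders at once
    total.merge(x)
    if x % 2 == 0:
        evens.merge(x)
```

Updating several builders in a single loop is what lets the optimiser express fusions that compute multiple results in one pass over the data.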

Weld’s loops and builders can be used to implement a wide range of programming abstractions – all of the functional operators in systems like Spark for example, as well as all of the physical operators needed for relational algebra.

As a small worked example, consider this Python code using both Pandas and Numpy:
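(The snippet is shown as a figure in the post; a representative stand-in, not the paper’s exact code, might be:)

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"price": [1.0, 7.0, 3.0, 9.0]})

filtered = df[df["price"] > 2.0]    # Pandas: element-wise comparison + filter
total = np.sum(filtered["price"])   # NumPy: reduction over the column
```

Evaluated eagerly, the comparison, the filter, and the sum each make their own pass over (or copy of) the data; this is the redundancy Weld removes.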

We need Weld implementations of  > for Pandas DataFrames, and sum for NumPy, which are trivially expressed as:

With this is place Weld can save one whole data scan and fuse the Pandas and NumPy loops into one. The optimised Weld program looks like this:

One limitation of the current version of the IR is that it is fully deterministic, so it cannot express asynchronous algorithms where threads race to update a result, such as Hogwild! We plan to investigate adding such primitives in a future version of the IR.

Experimental results

To test the raw speed of the code Weld generates, Weld implementations of a TPC-H workload for SQL, PageRank for graph analytics, and Word2Vec for ML were hand-written and compared against both a hand-optimized C++ version, and also an existing high-performance framework. As an example, here are the TPC-H results for four queries:

The result show that…

Weld can generate parallel code competitive with state-of-the-art systems across at least three domains.

The next stage of the evaluation was to see what benefits Weld can bring when used with just a single framework. Here are the results of using Weld with (a) TensorFlow, (b) Spark SQL, and (c) Pandas:

The TensorFlow application is a binary logistic classifier trained on MNIST, the Spark SQL example is running TPC-H queries, and the Pandas example is based on an online data science tutorial for cleaning a dataset of zipcodes.

Weld’s runtime API also enables a substantial optimization across libraries. We illustrate this using a Spark SQL query that calls a User-Defined Function (UDF) written in Scala, as well as a Python data science workload that combines Pandas and NumPy.

On the Spark Scala workload we see a 14x speedup. On the data science workload we see a 31x speedup even on a single core….

With multiple cores, we see a further 8x speedup over single-threaded Weld execution, for an overall 250x speedup over the Pandas baseline.

Future work for Weld includes extending the set of supported back-ends to target GPUs and FPGAs in addition to CPUs. “In many domains, these platforms have become essential for performance…”