# The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

With thanks to William Vambenepe for suggesting this paper via Twitter.

Google Cloud Dataflow reached GA last week, and the team behind Cloud Dataflow have a paper accepted at VLDB’15 and available online. The lead author, Tyler Akidau, has also written a very readable overview of the streaming domain over at O’Reilly which is a good accompaniment to this paper, “The world beyond batch: Streaming 101.”

Akidau et al. set out a strong manifesto for modern data processing, based on the notion of accepting uncertainty and incompleteness:

We propose that a fundamental shift of approach is necessary to deal with these evolved requirements in modern data processing. We as a field must stop trying to groom unbounded datasets into finite pools of information that eventually become complete, and instead live and breathe under the assumption that we will never know if or when we have seen all of our data, only that new data will arrive, old data may be retracted, and the only way to make this problem tractable is via principled abstractions that allow the practitioner the choice of appropriate tradeoffs along the axes of interest: correctness, latency, and cost.

Three beliefs and five principles shape the work on Dataflow:

• Expecting data to become complete is a fundamentally flawed approach:

We believe a major shortcoming of all the models and systems mentioned above (with exception given to CEDR and Trill), is that they focus on input data (unbounded or otherwise) as something which will at some point become complete. We believe this approach is fundamentally flawed when the realities of today’s enormous, highly disordered datasets clash with the semantics and timeliness demanded by consumers.

We also believe that any approach that is to have broad practical value across such a diverse and varied set of use cases as those that exist today (not to mention those lingering on the horizon) must provide simple, but powerful, tools for balancing the amount of correctness, latency, and cost appropriate for the specific use case at hand.

• Execution engines should not dictate system semantics:

Lastly, we believe it is time to move beyond the prevailing mindset of an execution engine dictating system semantics; properly designed and built batch, micro-batch, and streaming systems can all provide equal levels of correctness, and all three see widespread use in unbounded data processing today. Abstracted away beneath a model of sufficient generality and flexibility, we believe the choice of execution engine can become one based solely on the practical underlying differences between them: those of latency and resource cost.

And the five key principles are:

1. Never rely on any notion of completeness
2. Be flexible, to accommodate the diversity of known use cases, and those to come in the future
3. Not only make sense, but also add value, in the context of each of the envisioned execution engines
4. Encourage clarity of implementation
5. Support robust analysis of data in the context in which they occurred

The Dataflow model arose out of real-world experiences building streaming systems with FlumeJava and MillWheel at Google, including:

• log joining pipelines
• billing pipelines
• aggregate statistics calculations
• abuse detection pipelines
• recommendation generation
• anomaly detection

Rather than talk about streaming (a term which has become bound up with a certain style of execution engine), the authors prefer to talk about processing of unbounded data.

The future of data processing is unbounded data. Though bounded data will always have an important and useful place, it is semantically subsumed by its unbounded counterpart. Furthermore, the proliferation of unbounded data sets across modern business is staggering. At the same time, consumers of processed data grow savvier by the day, demanding powerful constructs like event-time ordering and unaligned windows. The models and systems that exist today serve as an excellent foundation on which to build the data processing tools of tomorrow, but we firmly believe that a shift in overall mindset is necessary to enable those tools to comprehensively address the needs of consumers of unbounded data.

Enough with the rhetoric! Let’s take a look at the Dataflow Model itself. First we need to define a few background terms (see the aforementioned O’Reilly article for a more extensive treatment):

• Datasets may be bounded or unbounded.
• Windows are a way of slicing up datasets into finite chunks. There are many windowing approaches, for example: fixed windows (e.g. hourly, daily); sliding windows, defined by a period (e.g. every minute) and a size (e.g. an hour); and sessions, which are windows that capture some period of activity over a subset of the data, typically defined by a timeout gap.
• It is important to consider both event time and processing time. Event time is the time at which the event itself actually occurred. Processing time is the time at which an event is observed at any given point during processing within the pipeline.
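
To make the fixed and sliding cases concrete, here is a minimal sketch in plain Java (no Dataflow SDK involved) that maps an event timestamp to the window or windows it belongs to. The class name `WindowAssignSketch`, the `Window` record, and the choice of half-open intervals in arbitrary time units are all assumptions made for illustration; they are not taken from the paper.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignSketch {
    /** Half-open event-time interval [start, end), in arbitrary time units (illustrative). */
    record Window(long start, long end) {}

    /** Fixed windows: every timestamp falls into exactly one window of length `size`. */
    static Window fixed(long eventTime, long size) {
        long start = eventTime - Math.floorMod(eventTime, size);
        return new Window(start, start + size);
    }

    /** Sliding windows: a new window of length `size` starts every `period`. */
    static List<Window> sliding(long eventTime, long size, long period) {
        List<Window> windows = new ArrayList<>();
        // Latest window start at or before the event, then walk back one period at a time
        // for as long as the window still covers the event.
        long lastStart = eventTime - Math.floorMod(eventTime, period);
        for (long start = lastStart; start > eventTime - size; start -= period) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(fixed(125, 60));        // one window
        System.out.println(sliding(125, 60, 30));  // size / period = two windows
    }
}
```

Note that a sliding window whose period equals its size degenerates to the fixed case, which is one reason it is convenient to think of fixed windows as a special case of sliding ones.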

In the ideal world, event time and processing time would be the same, but unfortunately this is not the case in real-world systems:

During processing, the realities of the systems in use (communication delays, scheduling algorithms, time spent processing, pipeline serialization, etc.) result in an inherent and dynamically changing amount of skew between the two domains. Global progress metrics, such as punctuations or watermarks, provide a good way to visualize this skew. For our purposes, we’ll consider something like MillWheel’s watermark, which is a lower bound (often heuristically established) on event times that have been processed by the pipeline.

Here’s how this is illustrated in Akidau’s O’Reilly article:

https://d3ansictanv2wj.cloudfront.net/post01_fig01_timedomains-ea86183a3a9d99179a6e6f5c521ffdbf.jpg
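
The idea of a heuristically established watermark can also be sketched in a few lines of code. The version below is an assumption for illustration only, not MillWheel's algorithm: it estimates the watermark as the largest event time seen so far minus a fixed allowance (`allowedSkew`, a hypothetical parameter), and flags any element whose event time falls behind that estimate as late.

```java
final class WatermarkSketch {
    private final long allowedSkew;              // hypothetical tuning knob
    private long maxEventTime = Long.MIN_VALUE;  // largest event time observed so far

    WatermarkSketch(long allowedSkew) {
        this.allowedSkew = allowedSkew;
    }

    /** Heuristic estimate: elements with an earlier event time are no longer expected. */
    long watermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - allowedSkew;
    }

    /** Records an element and reports whether it arrived behind the current watermark. */
    boolean observe(long eventTime) {
        boolean late = eventTime < watermark();
        maxEventTime = Math.max(maxEventTime, eventTime);
        return late;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(10);
        for (long t : new long[] {100, 95, 80, 120}) {
            System.out.println("event " + t + " late=" + w.observe(t) + " watermark=" + w.watermark());
        }
    }
}
```

Because the estimate is only a heuristic, an element can still show up behind it, which is exactly why a watermark alone cannot tell a pipeline that a window is complete.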

The Dataflow model can work with unbounded, unordered data sources. It enables separation of a pipeline across four dimensions: what results are being computed; where in event time they are being computed; when in processing time they are materialized; and how earlier results relate to later refinements (embracing the notion that we must give up on ‘completeness’). Dataflow also “Separates the logical notion of data processing from the underlying physical implementation, allowing the choice of batch, micro-batch, or streaming engine to become one of simply correctness, latency, and cost.” This latter goal is very reminiscent of the excellent work on Musketeer (part 1, part 2) by the Cambridge Systems at Scale team.

At the core of the Dataflow model are the ParDo and GroupByKey operations. ParDo is for parallel processing in which each input element is passed to a user-defined function that can produce zero or more output elements. GroupByKey groups key-value pairs. Since it needs to collect ‘all’ data for a given key before sending the reduction downstream, it needs a way to know when to end a group. For this purpose Dataflow supports windowing.
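
As a rough illustration of these two primitives, the sketch below implements them over in-memory lists in plain Java. The names `CoreOpsSketch`, `parDo`, and `groupByKey` are hypothetical helpers, not the Dataflow SDK API, and the input here is bounded, which is precisely what lets `groupByKey` collect "all" values for a key without any windowing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class CoreOpsSketch {
    /** ParDo: apply fn to every element; each call may produce zero or more outputs. */
    static <I, O> List<O> parDo(List<I> input, Function<I, List<O>> fn) {
        List<O> out = new ArrayList<>();
        for (I element : input) {
            out.addAll(fn.apply(element));
        }
        return out;
    }

    /** GroupByKey: collect every value that shares a key (only possible on bounded input). */
    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
        Map<K, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a=1", "b=2", "", "a=3");
        // The user function drops empty lines (zero outputs) and parses the rest (one output).
        List<Map.Entry<String, Integer>> pairs = parDo(lines, line -> {
            if (line.isEmpty()) {
                return List.of();
            }
            String[] kv = line.split("=");
            return List.of(Map.entry(kv[0], Integer.parseInt(kv[1])));
        });
        System.out.println(groupByKey(pairs));
    }
}
```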

Our primary contribution here is support for unaligned windows, for which there are two key insights. The first is that it is simpler to treat all windowing strategies as unaligned from the perspective of the model, and allow underlying implementations to apply optimizations relevant to the aligned cases where applicable. The second is that windowing can be broken apart into two related operations: AssignWindows assigns an element to zero or more windows, and MergeWindows merges windows at grouping time.

(An unaligned window is one which applies only across subsets of the data – for example, per key – whereas an aligned window applies across all data for the window of time in question).

To support event-time windowing natively, key-value pairs are passed as 4-tuples: (key, value, event_time, window).

Since windows are associated directly with the elements to which they belong, this means window assignment can happen anywhere in the pipeline before grouping is applied. This is important, as the grouping operation may be buried somewhere downstream inside a composite transformation (e.g. Sum.integersPerKey()).

Window merging (which happens when grouping) is a five-part operation:

1. Timestamps are dropped since only the window is relevant from this point.
2. (value, window) tuples are grouped by key
3. The set of currently buffered windows for a key is merged (the merge logic being defined by the windowing strategy). For example, a sessions windowing strategy may merge windows into a new, larger session.
4. Post-merging, the values for each key are grouped by their resulting window
5. Each per-key, per-window group of values is expanded into (key, value, event_time, window) where event_time is a new per-window timestamp – for example, the end of the window.
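
The five steps can be sketched for session windows in plain Java as follows. This is an illustrative approximation, not the paper's implementation: the record names, the `assign` helper, and the use of integer time units are assumptions, and the sketch emits each per-key, per-window group as a single record carrying the list of values, stamped with the end of the merged window.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SessionGroupingSketch {
    record Window(long start, long end) {}
    record Element(String key, int value, long eventTime, Window window) {}
    record Group(String key, List<Integer> values, long eventTime, Window window) {}

    /** Session assignment: every element starts in its own proto-session [t, t + gap). */
    static Element assign(String key, int value, long eventTime, long gap) {
        return new Element(key, value, eventTime, new Window(eventTime, eventTime + gap));
    }

    static List<Group> groupByKeyAndWindow(List<Element> input) {
        // Steps 1-2: group by key; the element's own event time is not consulted again.
        Map<String, List<Element>> byKey = new TreeMap<>();
        for (Element e : input) {
            byKey.computeIfAbsent(e.key(), k -> new ArrayList<>()).add(e);
        }
        List<Group> out = new ArrayList<>();
        for (Map.Entry<String, List<Element>> entry : byKey.entrySet()) {
            List<Element> elems = new ArrayList<>(entry.getValue());
            elems.sort(Comparator.comparingLong((Element el) -> el.window().start()));
            // Steps 3-4: merge overlapping windows and collect values per merged window.
            Window merged = null;
            List<Integer> values = new ArrayList<>();
            for (Element e : elems) {
                if (merged != null && e.window().start() < merged.end()) {
                    merged = new Window(merged.start(), Math.max(merged.end(), e.window().end()));
                } else {
                    if (merged != null) {
                        // Step 5: emit the finished group, timestamped at the window end.
                        out.add(new Group(entry.getKey(), values, merged.end(), merged));
                    }
                    merged = e.window();
                    values = new ArrayList<>();
                }
                values.add(e.value());
            }
            if (merged != null) {
                out.add(new Group(entry.getKey(), values, merged.end(), merged));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
            assign("k1", 1, 2, 30), assign("k2", 2, 14, 30),
            assign("k1", 3, 57, 30), assign("k1", 4, 20, 30));
        groupByKeyAndWindow(input).forEach(System.out::println);
    }
}
```

With a gap of 30, the two `k1` elements at times 2 and 20 merge into one session, while the `k1` element at time 57 starts a new one.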

Here’s an example of calculating keyed integer sums with 30-minute session windows:

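    // Read the keyed integer input; the source details are elided.
    PCollection<KV<String, Integer>> input = IO.read(...);
    PCollection<KV<String, Integer>> output = input
        // Assign each element to a session window with a 30-minute gap.
        .apply(Window.into(Sessions.withGapDuration(
            Duration.standardMinutes(30))))
        // Sum the integer values per key and per (merged) session window.
        .apply(Sum.integersPerKey());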


We still need a way of knowing when to emit results for a window. Watermarks are a good approximation, but they can never be more than that. Dataflow introduces triggers for this purpose. Windowing tells us where in event time data are grouped together for processing, and triggers determine when in processing time the results of groupings are emitted as panes.

Our systems provide predefined trigger implementations for triggering at completion estimates (e.g. watermarks, including percentile watermarks, which provide useful semantics for dealing with stragglers in both batch and streaming execution engines when you care more about processing a minimum percentage of the input data quickly than processing every last piece of it), at points in processing time, and in response to data arriving (counts, bytes, data punctuations, pattern matching, etc.). We also support composing triggers into logical combinations (and, or, etc.), loops, sequences, and other such constructions. In addition, users may define their own triggers utilizing both the underlying primitives of the execution runtime (e.g. watermark timers, processing-time timers, data arrival, composition support) and any other relevant external signals (data injection requests, external progress metrics, RPC completion callbacks, etc.).
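
One way to picture composable triggers is as predicates over what the runtime knows about a window. The sketch below is a hypothetical model in plain Java; the `Trigger` interface, the `WindowState` record, and the factory names are assumptions for illustration and do not correspond to the Dataflow SDK's trigger API.

```java
final class TriggerSketch {
    /** Snapshot of one window's state when the runtime asks whether to emit a pane. */
    record WindowState(long windowEnd, long watermark,
                       int elementsSinceLastFire, long processingTimeSinceLastFire) {}

    /** A trigger decides, in processing time, whether the window's results are emitted now. */
    interface Trigger {
        boolean shouldFire(WindowState state);

        /** Logical "or": fire as soon as either trigger would fire. */
        default Trigger or(Trigger other) {
            return s -> shouldFire(s) || other.shouldFire(s);
        }

        /** Logical "and": fire only when both triggers would fire. */
        default Trigger and(Trigger other) {
            return s -> shouldFire(s) && other.shouldFire(s);
        }
    }

    /** Completion estimate: fire once the watermark passes the end of the window. */
    static Trigger atWatermark() {
        return s -> s.watermark() >= s.windowEnd();
    }

    /** Data-driven: fire after at least n new elements have arrived. */
    static Trigger afterCount(int n) {
        return s -> s.elementsSinceLastFire() >= n;
    }

    /** Processing-time driven: fire after the given delay since the last pane. */
    static Trigger afterProcessingTime(long delay) {
        return s -> s.processingTimeSinceLastFire() >= delay;
    }

    public static void main(String[] args) {
        // Early results every 100 elements, plus a pane when the watermark says "done".
        Trigger trigger = atWatermark().or(afterCount(100));
        System.out.println(trigger.shouldFire(new WindowState(60, 30, 100, 0)));
        System.out.println(trigger.shouldFire(new WindowState(60, 30, 5, 0)));
    }
}
```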

If a window can have multiple panes, we also need to know how those panes relate to each other. Dataflow provides three modes for this: discarding throws away any previous results for a window upon triggering; accumulating leaves window contents intact in persistent state, so that later results become a refinement of previous results; accumulating-and-retracting behaves like accumulating, but also issues a retraction for the previous value before emitting the new one.
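
The difference between the three modes is easiest to see on a running sum. In the sketch below (plain Java, with hypothetical names and made-up input values), each inner list holds the values that arrived for one window between two trigger firings, and `emit` returns what would be sent downstream under each mode.

```java
import java.util.ArrayList;
import java.util.List;

final class AccumulationSketch {
    enum Mode { DISCARDING, ACCUMULATING, ACCUMULATING_AND_RETRACTING }

    /** One emitted record: a sum, flagged when it retracts an earlier emission. */
    record Output(int sum, boolean retraction) {}

    static List<Output> emit(List<List<Integer>> panes, Mode mode) {
        List<Output> out = new ArrayList<>();
        int state = 0;                 // per-window state kept between firings
        boolean emittedBefore = false;
        for (List<Integer> pane : panes) {
            int previous = state;
            for (int v : pane) {
                state += v;
            }
            if (mode == Mode.ACCUMULATING_AND_RETRACTING && emittedBefore) {
                out.add(new Output(previous, true));   // undo the previous pane first
            }
            out.add(new Output(state, false));
            emittedBefore = true;
            if (mode == Mode.DISCARDING) {
                state = 0;             // forget everything already emitted
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> panes = List.of(List.of(3), List.of(8, 1));
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + emit(panes, mode));
        }
    }
}
```

For these inputs, discarding emits 3 and then 9, accumulating emits 3 and then 12, and accumulating-and-retracting emits 3, a retraction of 3, and then 12.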

Retractions are necessary in pipelines with multiple serial GroupByKeyAndWindow operations, since the multiple results generated by a single window over subsequent trigger fires may end up on separate keys when grouped downstream. In that case, the second grouping operation will generate incorrect results for those keys unless it is informed via a retraction that the effects of the original output should be reversed.
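
A small, made-up scenario shows the problem. Suppose a first grouping emits a per-user total, and a second grouping counts how many users have each total. When a user's total is refined from 3 to 12, the two panes land on different downstream keys. The sketch below (plain Java, hypothetical names and data) compares a second stage that ignores retractions with one that honors them.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RetractionSketch {
    /** Output of the first grouping: a per-user total, possibly retracting an earlier one. */
    record Update(String user, int total, boolean retraction) {}

    /** Second grouping: number of users per total. Retractions subtract, others add. */
    static Map<Integer, Integer> usersPerTotal(List<Update> updates, boolean honorRetractions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Update u : updates) {
            if (u.retraction() && !honorRetractions) {
                continue;  // a stage unaware of retractions simply never sees them
            }
            counts.merge(u.total(), u.retraction() ? -1 : 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Update> updates = List.of(
            new Update("alice", 3, false),   // first pane
            new Update("alice", 3, true),    // retraction of the first pane
            new Update("alice", 12, false)); // refined result
        System.out.println("without retractions: " + usersPerTotal(updates, false));
        System.out.println("with retractions:    " + usersPerTotal(updates, true));
    }
}
```

Without retractions the single user is counted under both totals; with them, the count under the stale total drops back to zero.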

Section 2.4 of the paper gives a series of examples, with very clear figures illustrating the windows and panes that result. It’s too much to repeat here, but well worth checking out in the original paper (link at the top of this post).