MillWheel: Fault-Tolerant Stream Processing at Internet Scale

MillWheel: Fault-Tolerant Stream Processing at Internet Scale – Akidau et al. (Google) 2013

Earlier this week we looked at the Google Cloud Dataflow model, which is implemented on top of FlumeJava (for batch) and MillWheel (for streaming):

We have implemented this model internally in FlumeJava, with MillWheel used as the underlying execution engine for streaming mode; additionally, an external reimplementation for Cloud Dataflow is largely complete at the time of writing.

Direct use of MillWheel is being replaced by use of the Cloud Dataflow abstractions, which are better equipped to deal with event-time-based windowing and unbounded data streams:

MillWheel and Spark Streaming are both sufficiently scalable, fault-tolerant, and low-latency to act as reasonable substrates, but lack high-level programming models that make calculating event-time sessions straightforward… we believe a major shortcoming of all the models and systems mentioned above (with exception given to CEDR and Trill), is that they focus on input data (unbounded or otherwise) as something which will at some point become complete.

Nevertheless, many parts of MillWheel’s design remain influential and the MillWheel paper is worth studying. MillWheel seeks to do for streaming what MapReduce did for batch, enabling users to depend on framework-level correctness and fault-tolerance and thus greatly simplify application development:

MillWheel is such a programming model, tailored specifically to streaming, low-latency systems. Users write application logic as individual nodes in a directed compute graph, for which they can define an arbitrary, dynamic topology. Records are delivered continuously along edges in the graph. MillWheel provides fault tolerance at the framework level, where any node or any edge in the topology can fail at any time without affecting the correctness of the result. As part of this fault tolerance, every record in the system is guaranteed to be delivered to its consumers. Furthermore, the API that MillWheel provides for record processing handles each record in an idempotent fashion, making record delivery occur exactly once from the user’s perspective. MillWheel checkpoints its progress at fine granularity, eliminating any need to buffer pending data at external senders for long periods between checkpoints.

MillWheel offers persistent storage, watermarking, and exactly-once delivery of messages. Many ‘revenue-processing customers’ depend on this. MillWheel uses the familiar data flow graph model, and each computation (node processing) can be parallelized across an arbitrary number of machines.

Abstractly, inputs and outputs in MillWheel are represented by (key, value, timestamp) triples. While the key is a metadata field with semantic meaning in the system, the value can be an arbitrary byte string, corresponding to the entire record. The context in which user code runs is scoped to a specific key, and each computation can define the keying for each input source, depending on its logical needs… The timestamps in these triples can be assigned an arbitrary value by the MillWheel user (but they are typically close to wall clock time when the event occurred), and MillWheel will calculate low watermarks according to these values.

(Recall that Google Cloud Dataflow expands this triple to a (key, value, event-time, window) 4-tuple.)

The data flow graph can be dynamically manipulated (adding and removing computations), and record processing is idempotent within the bounds of the framework API. Computations operate in the context of a single key, based on a user-specified key extraction function.
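
To make the record shape and the keying step concrete, here is a minimal sketch in Python. It is not MillWheel's API: the `Record` class, `route_by_key`, and the `query_key` extractor are hypothetical names chosen for illustration, and the tab-separated payload layout is an assumption.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class Record:
    key: str        # metadata field with semantic meaning to the system
    value: bytes    # opaque payload holding the entire record
    timestamp: int  # user-assigned, typically close to the event's wall-clock time


def route_by_key(
    inputs: Iterable[Tuple[bytes, int]],
    key_extractor: Callable[[bytes], str],
) -> Dict[str, List[Record]]:
    """Apply a user-specified key extractor and group records by key."""
    grouped: Dict[str, List[Record]] = defaultdict(list)
    for value, timestamp in inputs:
        key = key_extractor(value)
        grouped[key].append(Record(key, value, timestamp))
    return dict(grouped)


def query_key(value: bytes) -> str:
    """Hypothetical extractor: the first tab-separated field is the key."""
    return value.split(b"\t", 1)[0].decode("utf-8")
```

Each group in the result corresponds to the context in which one computation's user code would run. A different computation could pass a different extractor over the same inputs.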

Computation code is invoked upon receipt of input data, at which point user-defined actions are triggered, including contacting external systems, manipulating other MillWheel primitives, or outputting data. If external systems are contacted, it is up to the user to ensure that the effects of their code on these systems is idempotent.

Access to persistent state is available on a per-key basis. The state itself is an opaque byte string as far as MillWheel is concerned, and it is backed by a replicated, highly available data store (such as Bigtable or Spanner). “Common uses of state include counters aggregated over windows of records and buffered data for a join.”

Low watermarks provide a bound on the timestamps of future records arriving at a computation. The low watermark at a computation C is the lowest timestamp of any unfinished (in-flight, stored or pending-delivery) message at C or upstream of C.
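
The definition above can be read recursively: a computation's low watermark is the minimum of its own oldest unfinished timestamp and the low watermarks of everything that feeds it. The sketch below illustrates that reading only. The `pending` and `upstream` dictionaries are hypothetical stand-ins for system bookkeeping, and the graph is assumed to be acyclic.

```python
import math
from typing import Dict, List


def low_watermark(
    node: str,
    pending: Dict[str, List[float]],
    upstream: Dict[str, List[str]],
) -> float:
    """Lowest timestamp of unfinished work at `node` or anywhere upstream of it.

    pending:  node name -> timestamps of its unfinished records
    upstream: node name -> names of the nodes that send records to it
    Returns math.inf when nothing is pending. Assumes an acyclic graph.
    """
    own_oldest = min(pending.get(node, []), default=math.inf)
    upstream_marks = [
        low_watermark(sender, pending, upstream)
        for sender in upstream.get(node, [])
    ]
    return min([own_oldest] + upstream_marks)
```

For a chain `injector -> parse -> count` where `parse` still holds a record stamped 98 and the injector's oldest record is stamped 105, the low watermark at `count` is 98. Once `parse` finishes that record, it advances to 105.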

Low watermark values are seeded by injectors, which send data into MillWheel from external systems. Measurement of pending work in external systems is often an estimate, so in practice, computations should expect a small rate of late records – records behind the low watermark – from such systems. Zeitgeist [a search-query analysis pipeline] deals with this by dropping such data, while keeping track of how much data was dropped (empirically around 0.001% of records). Other pipelines retroactively correct their aggregates if late records arrive. Though this is not reflected in the above definition, the system guarantees that a computation’s low watermark is monotonic even in the face of late data.

Different types of injectors may have different strategies for injecting low watermark values. For example, a log file ingestor could publish a low watermark value that corresponds to the minimum file creation time among its unfinished files.
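
The log-file strategy is simple enough to sketch directly. The `LogFile` class and its fields are hypothetical, and returning `None` when no file is unfinished is an assumption of this sketch rather than something the paper specifies.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class LogFile:
    path: str
    created_at: float  # file creation time, seconds since the epoch
    finished: bool     # True once every record in the file has been injected


def injector_low_watermark(files: Iterable[LogFile]) -> Optional[float]:
    """Minimum creation time among unfinished files, or None if all are done."""
    unfinished = [f.created_at for f in files if not f.finished]
    return min(unfinished) if unfinished else None
```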

In practice, library implementations exist for common input types at Google (log files, pubsub service feeds, etc.), such that normal users do not need to write their own injectors. If an injector violates the low watermark semantics and sends a late record behind the low watermark, the user’s application code chooses whether to discard the record or incorporate it into an update of an existing aggregate.
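
The two late-record policies mentioned above (drop and count, or fold into an existing aggregate) can be sketched as a single windowed counter with a switch. `WindowedCounter` and its fields are hypothetical names, timestamps are assumed to be integer seconds, and a record is treated as late when its timestamp is strictly behind the current low watermark.

```python
from collections import defaultdict


class WindowedCounter:
    """Counts records per fixed-size window, with a configurable late-data policy."""

    def __init__(self, window_size: int, drop_late: bool):
        self.window_size = window_size
        self.drop_late = drop_late
        self.counts = defaultdict(int)  # window start -> record count
        self.low_watermark = float("-inf")
        self.dropped = 0                # how many late records were discarded

    def advance_watermark(self, value: float) -> None:
        # The watermark only moves forward, even if a smaller value is reported.
        self.low_watermark = max(self.low_watermark, value)

    def process(self, timestamp: int) -> None:
        is_late = timestamp < self.low_watermark
        if is_late and self.drop_late:
            self.dropped += 1
            return
        window_start = (timestamp // self.window_size) * self.window_size
        self.counts[window_start] += 1
```

With `drop_late=True` the counter behaves like the Zeitgeist description (discard, but keep a tally of what was lost). With `drop_late=False` a late record updates the aggregate for a window that may already have been reported, so downstream consumers would need to accept corrections.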

For data consistency, a global view of the low watermark is needed (cf. GVT in the Virtual Time paper):

In order to ensure data consistency, low watermarks must be implemented as a sub-system that is globally available and correct. We have implemented this as a central authority (similar to OOP [19]), which tracks all low watermark values in the system and journals them to persistent state, preventing the reporting of erroneous values in cases of process failure… For scalability, the authority can be sharded across multiple machines, with one or more computations on each worker. Empirically, this can scale to 500,000 key intervals with no loss in performance.

Exactly-once delivery is achieved as follows:

  • An incoming record is checked against deduplication data from previous deliveries and duplicates are discarded.
  • The user computation is run, which may result in pending changes to timers, state, and productions.
  • Pending changes are committed to the backing store.
  • Senders are ACKed.
  • Pending downstream productions are sent.

The system assigns unique IDs to all records at production time. We identify duplicate records by including this unique ID for the record in the same atomic write as the state modification…. Deliveries in MillWheel are retried until they are ACKed in order to meet our at-least-once requirement, which is a prerequisite for exactly-once.
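
The five steps, together with the point about writing the record ID in the same atomic write as the state change, can be sketched as below. This is a single-process toy, not MillWheel's implementation: the `Computation` class, the `running_sum` user function, and the `ack` callback are hypothetical, and swapping one dictionary stands in for an atomic write to the backing store. ACKing a duplicate (so the sender stops retrying) is also an assumption of the sketch.

```python
from typing import Callable, List, Tuple


def running_sum(state: int, value: int) -> Tuple[int, List[int]]:
    """Hypothetical user code: add the value and emit the new total downstream."""
    total = state + value
    return total, [total]


class Computation:
    def __init__(self, user_fn: Callable[[int, int], Tuple[int, List[int]]]):
        self.user_fn = user_fn
        # In-memory stand-in for one key's row in the backing store.
        self.store = {"state": 0, "seen_ids": frozenset(), "pending": []}
        self.sent: List[int] = []  # stand-in for downstream consumers

    def deliver(self, record_id: str, value: int, ack: Callable[[], None]) -> None:
        # 1. Check against deduplication data; discard duplicates.
        #    (Assumption: the duplicate is still ACKed so retries stop.)
        if record_id in self.store["seen_ids"]:
            ack()
            return
        # 2. Run user code, producing pending state changes and productions.
        new_state, productions = self.user_fn(self.store["state"], value)
        # 3. Commit state, record ID, and productions together in one write.
        self.store = {
            "state": new_state,
            "seen_ids": self.store["seen_ids"] | {record_id},
            "pending": self.store["pending"] + productions,
        }
        # 4. ACK the sender only after the commit.
        ack()
        # 5. Send the pending downstream productions.
        self.flush()

    def flush(self) -> None:
        # Send checkpointed productions, then clear them from the store.
        self.sent.extend(self.store["pending"])
        self.store = {**self.store, "pending": []}
```

Delivering record `"r1"` twice (as a sender would after a lost ACK) changes the state once and produces one downstream record, but both deliveries are acknowledged.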

Deduplication is made more efficient by the use of Bloom filters:

Since we cannot necessarily store all duplication data in-memory, we maintain a Bloom filter of known record fingerprints, to provide a fast path for records that we have provably never seen before.
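
A small sketch of that fast path follows. The Bloom filter here is a textbook one built on `hashlib`, and the `Deduplicator` class, its in-memory `durable_ids` set (standing in for the backing store), and the sizing parameters are all illustrative assumptions.

```python
import hashlib
from typing import Iterator


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item: str) -> Iterator[int]:
        # Derive several bit positions by salting one hash function.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item: str) -> None:
        for p in self._positions(item):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, item: str) -> bool:
        # False means "definitely never added"; True means "possibly added".
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(item))


class Deduplicator:
    def __init__(self) -> None:
        self.bloom = BloomFilter()
        self.durable_ids = set()  # stand-in for IDs held in the backing store
        self.store_lookups = 0    # counts slow-path lookups

    def check_and_record(self, record_id: str) -> bool:
        """Return True if record_id is a duplicate; otherwise remember it."""
        if self.bloom.might_contain(record_id):
            # Slow path: the filter cannot rule it out, so consult the store.
            self.store_lookups += 1
            if record_id in self.durable_ids:
                return True
        # Either provably new (fast path) or a Bloom false positive.
        self.bloom.add(record_id)
        self.durable_ids.add(record_id)
        return False
```

The filter never produces false negatives, so a "no" answer lets the system skip the store lookup safely. A "maybe" answer costs one lookup, whether or not the record turns out to be a duplicate.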

The pending changes committed to the backing store (including productions) are held in a system ‘such as Bigtable’:

We use a storage system such as Bigtable, which efficiently implements blind writes (as opposed to read-modify-write operations), making checkpoints mimic the behavior of a log. When a process restarts, the checkpoints are scanned into memory and replayed. Checkpoint data is deleted once these productions are successful.

Users can disable checkpointing at their discretion. The paper calls this the weak production model, in contrast to the strong production model, which checkpoints productions before delivery and so carries the guarantees described above.

For weak productions, rather than checkpointing record productions before delivery, we broadcast downstream deliveries optimistically, prior to persisting state. Empirically, this introduces a new problem, in that the completion times of consecutive stages of the pipeline are now strictly coupled as they wait for downstream ACKs of records… We ameliorate this by checkpointing a small percentage of straggler pending productions, allowing those stages to ACK their senders. By selectively checkpointing in this way, we can both improve end-to-end latency and reduce overall resource consumption.

Given that work may shift between machines, a sequencer token is attached to each write. This token is checked by the mediator of the backing store before a write can go ahead. New workers invalidate any extant sequencers before starting work.
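
One way to picture the sequencer check is as a per-key fencing token: taking ownership issues a fresh token and invalidates the old one, and the store rejects writes that carry a stale token. The sketch below is only an illustration of that idea. `BackingStore`, `acquire`, `write`, and `StaleSequencerError` are hypothetical names, and the monotonically increasing integer token is an assumption.

```python
from typing import Dict


class StaleSequencerError(Exception):
    """Raised when a write carries a sequencer token that has been invalidated."""


class BackingStore:
    def __init__(self) -> None:
        self.current_token: Dict[str, int] = {}  # key -> the only valid token
        self.data: Dict[str, str] = {}

    def acquire(self, key: str) -> int:
        """Called by a new worker taking over a key; invalidates older tokens."""
        token = self.current_token.get(key, 0) + 1
        self.current_token[key] = token
        return token

    def write(self, key: str, token: int, value: str) -> None:
        """Apply the write only if the token is still the valid one for this key."""
        if token != self.current_token.get(key):
            raise StaleSequencerError(f"stale token {token} for key {key!r}")
        self.data[key] = value
```

If an old worker that has lost ownership of a key tries to write after a new worker has called `acquire`, its write is rejected instead of silently overwriting newer state.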

MillWheel is designed to produce low-latency results. This is evaluated using a single-stage pipeline that buckets and sorts numbers. With strong productions and exactly-once delivery both disabled (that is, with weak productions), 95th percentile latency is 30ms. With strong productions and exactly-once delivery enabled, it jumps to 94ms.

Finally, the MillWheel model is sensitive to key distribution and to the progress of low watermarks:

As a distributed system, MillWheel does not perform well on problems that are not easily parallelized between different keys. If 90% of a pipeline’s traffic is assigned a single key, then one machine must handle 90% of the overall system load for that stream, which is clearly inadvisable. Computation authors are advised to avoid keys that are high-traffic enough to bottleneck on a single machine (such as a customer’s language or user-agent string), or build a two-phase aggregator. If a computation is performing an aggregation based on low watermark timers, MillWheel’s performance degrades if data delays hold back low watermarks for large amounts of time.
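
The paper does not spell out the two-phase aggregator, so the following is one common interpretation rather than MillWheel's own design: the first phase spreads a hot key across several sub-keys and computes partial counts, and the second phase merges the partials per original key. The function names and the round-robin shard assignment are assumptions made for illustration.

```python
from collections import Counter
from typing import Iterable, Tuple


def phase_one(keys: Iterable[str], num_shards: int) -> Counter:
    """Partial counts keyed by (key, shard), so no single sub-key sees all traffic."""
    partial: Counter = Counter()
    for i, key in enumerate(keys):
        shard = i % num_shards  # round-robin salt (an assumption of this sketch)
        partial[(key, shard)] += 1
    return partial


def phase_two(partial: Counter) -> Counter:
    """Merge the per-shard partial counts back into one total per original key."""
    totals: Counter = Counter()
    for (key, _shard), count in partial.items():
        totals[key] += count
    return totals
```

With 90 of 100 records keyed `"en"` and four shards, no first-phase sub-key handles more than 23 of them, and the second phase only has to merge four small partial results for that key.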