Spinning Fast Iterative Dataflows

Spinning Fast Iterative Dataflows – Ewen et al. 2012

Last week we saw how Naiad combines low-latency stream processing with iterative computation, and yesterday we looked in more detail at the Differential Dataflow model for incremental processing (needed for low-latency). The Apache Flink project also combines low-latency stream processing with support for incremental, iterative computation. Today’s paper choice is ‘Spinning Fast Iterative Dataflows’ which is based on the academic work in Stratosphere that led to Flink, and describes the mechanism Flink uses to embed iterative computation in the dataflow. It’s also a great read if you just want a general introduction to the space as the explanations are very clear.

While dataflow systems were originally built for tasks like indexing, filtering, transforming, or aggregating data, their simple interface and powerful abstraction have made them popular for other kinds of applications, like machine learning or graph analysis. Many of these algorithms are of iterative or recursive nature, repeating some computation until a condition is fulfilled. Naturally, these tasks pose a challenge to dataflow systems, as the flow of data is no longer acyclic.

Frameworks such as Spark can efficiently implement a certain class of iterative algorithms that use bulk iteration. In this model, each iteration produces a completely new partial solution from the previous iteration’s partial result. In incremental iteration though, an iteration’s result differs only partially from the previous iteration’s.

Sparse computational dependencies exist between the elements in the partial solution: an update on one element has a direct impact only on a small number of other elements, such that different parts of the solution may converge at different speeds. An example is the Connected Components algorithm, where a change in a vertex’s component membership directly influences only the membership of its neighbors.

Frameworks that only support bulk iteration have to treat incremental iteration as if it were bulk iteration, and hence can be drastically outperformed by specialized systems. Systems that support incremental iteration, on the other hand, can easily support bulk mode.

Fixpoints, Incremental iteration, and the Microstep

A fixpoint iteration applies a step function until its result no longer changes:

while s != f(s) do
  s = f(s);
done

(And for continuous domains, you can check whether the solution stays the same within some tolerance).

Fixpoint iterations are guaranteed to converge if it is possible to define a complete partial order (CPO) ≼ for the data type of s, with a bottom element ⊥. Furthermore, the step function f must guarantee the production of a successor to s when applied: ∀s : f(s) ≽ s. The existence of a supremum and the guaranteed progress towards the supremum result in eventual termination.

For many fixpoint algorithms the partial solution s is a set of data points, and instead of fully computing s on each iteration, the prior s is updated simply by adding or updating some of the data points. If a change to a data point in one iteration (e.g. a vertex) affects only a few other data points in the next iteration (e.g. its neighbours) then we have sparse computational dependencies.

To support such algorithms efficiently, we can express an iteration using two distinct functions u and δ, instead of a single step function f… The δ function computes the working set w = δ(s, f(s)), which is conceptually the set of (candidate) updates that, when applied to s, produce the next partial solution. The update function u combines w with s to build the next partial solution: f(s) = u(s, w). Because the evaluation of f(s) is what we seek to avoid, we vary this pattern to evaluate δ on s_i and w_i to compute the next working set w_{i+1}.

while !w.isEmpty() do
  w' = δ(s,w)
  s = u(s,w)
  w = w'
done

Call each iteration of the incremental algorithm a superstep.
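To make the superstep loop concrete, here is a small Python sketch of Connected Components written with separate `u` and `delta` functions. It is an illustration under assumptions, not the paper's implementation: the graph, the encoding of the working set as (vertex, candidate component id) pairs, and all function names are hypothetical.

```python
# Hypothetical undirected graph as adjacency lists; two components.
GRAPH = {1: [2], 2: [1, 3], 3: [2], 4: [5], 5: [4]}

def u(s, w):
    """Update function: apply every candidate that lowers a vertex's component id."""
    nxt = dict(s)
    for v, cid in w:
        if cid < nxt[v]:
            nxt[v] = cid
    return nxt

def delta(s, w):
    """Working-set function: vertices that will improve propose their new id to neighbours."""
    best = {}
    for v, cid in w:
        if cid < best.get(v, s[v]):
            best[v] = cid
    return {(n, cid) for v, cid in best.items() for n in GRAPH[v]}

def incremental_cc():
    s = {v: v for v in GRAPH}                       # every vertex starts in its own component
    w = {(n, v) for v in GRAPH for n in GRAPH[v]}   # initial candidates from each neighbour
    while w:                                        # while !w.isEmpty()
        w_next = delta(s, w)                        # w' = δ(s, w), computed against the old s
        s = u(s, w)                                 # s  = u(s, w)
        w = w_next                                  # w  = w'
    return s

components = incremental_cc()
```

Only vertices whose label actually changed generate work for the next superstep, which is where the saving over a bulk recomputation comes from.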

In practice, one can frequently obtain an effective and efficient δ function by decomposing the iterations into a series of microsteps, and eliminating the supersteps. A microstep removes a single element d from w and uses it to update s and w, effectively interleaving the updates of the partial solution and the working set. Microstep iterations lead to a modified chain of (partial) solutions where p_{i,j} is the partial solution in iteration i after combining the j-th element from w. The changes introduced by the element d are directly reflected in the partial solution after the microstep.

while !w.isEmpty() do
  d = w.takeOne!()
  s = u(s,d)
  w = w.union(δ(s,d))
done

The iteration state s and the working set w are both incrementally updated by looking at one element d ∈ w at a time. Similar to superstep iterations, microstep iterations are guaranteed to converge, if each individual update to the partial solution leads to a successor state in the CPO. Note that this is a stricter condition than for incremental iterations, where all updates together need to produce a successor state.

The microstep form is amenable to asynchronous execution and thus can enable fine-grained parallelism with no synchronization required to coordinate the iterations/supersteps across parallel instances.
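The microstep loop can be sketched in Python for the same Connected Components problem. This is a single-threaded illustration only, so it shows the one-element-at-a-time interleaving but not the asynchronous, parallel execution the text describes. The graph, the queue-based working set, and the name `microstep_cc` are assumptions made for the example.

```python
from collections import deque

# Hypothetical undirected graph as adjacency lists; two components.
GRAPH = {1: [2], 2: [1, 3], 3: [2], 4: [5], 5: [4]}

def microstep_cc(graph):
    s = {v: v for v in graph}                              # initial partial solution
    w = deque((n, v) for v in graph for n in graph[v])     # initial candidate updates
    while w:                                               # while !w.isEmpty()
        v, cid = w.popleft()                               # d = w.takeOne!()
        if cid < s[v]:
            s[v] = cid                                     # s = u(s, d)
            w.extend((n, cid) for n in graph[v])           # w = w.union(δ(s, d))
    return s

labels = microstep_cc(GRAPH)
```

Every applied update lowers exactly one label, so each individual microstep moves to a successor state, which is the stricter convergence condition mentioned above.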

Embedding iterative computation in a parallel dataflow model

Bulk iteration is embedded into a dataflow like a regular operator:

An iteration is a complex operator defined as a tuple (G, I, O, T). G is a data flow that represents the step function f : S → S, S being the data type of the partial solution. The partial solution corresponds to the pair (I, O), where I is an edge that provides the latest partial solution as input to G. O is the output of G, representing the next partial solution… The iteration is embedded into a dataflow by connecting the operator providing the initial version of the partial solution to I, and the operator consuming the result of the last iteration to O. T, finally, denotes the termination criterion for the iteration. T is an operator integrated into G and is similar to a data sink in that it has only a single input and no output.

Execution is either via loop unrolling or feedback channels. With loop unrolling, a new instance of G is created whenever O receives the first record and T has signalled that another iteration is needed. With feedback channels, G is reused in each iteration, with each operator being reset after it has produced its last record.
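A rough way to picture the (G, I, O, T) tuple with a feedback channel is the Python sketch below, which runs Connected Components as a bulk iteration. It is a simplification under stated assumptions: `G` and `T` are plain functions rather than dataflow operators, `T` is modelled as a comparison of the previous and next partial solution, and the graph and names are hypothetical.

```python
# Hypothetical undirected graph as adjacency lists; two components.
GRAPH = {1: [2], 2: [1, 3], 3: [2], 4: [5], 5: [4]}

def G(partial):
    """Step function: recompute every vertex's label from its own and its neighbours' labels."""
    return {v: min([partial[v]] + [partial[n] for n in GRAPH[v]]) for v in GRAPH}

def T(prev, nxt):
    """Termination criterion (assumed form): another iteration is needed while anything changed."""
    return prev != nxt

def bulk_iterate(initial):
    I = initial                  # input edge: latest partial solution
    iterations = 0
    while True:
        O = G(I)                 # the same G is reused in every iteration
        iterations += 1
        if not T(I, O):
            return O, iterations
        I = O                    # feedback channel: output becomes the next input

result, iterations = bulk_iterate({v: v for v in GRAPH})
```

Note that `G` touches every vertex in every iteration, even those whose label has already converged. That is the cost incremental iteration is designed to avoid.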

…if the dynamic data path contains less than two materializing operators, the feedback channel must also dam the dataflow to prevent the operators from participating in two iterations simultaneously.

Incremental iteration builds on this model but avoids recomputing the full state each time by working with deltas: D = u(S, W), where u is the update function, S is the solution set, and W is the working set.

The delta set D contains all records that will be added to the partial solution and the new versions of the records that should be replaced in the partial solution.

The delta set D is then combined with the solution set to give the new solution set for the next iteration. The union merge gives precedence to elements in D where both S and D contain an element with the same key.

We hence express an update of a record in the partial solution through the replacement of that record. The incremental iterations algorithm becomes:

while !W.isEmpty() do
  D = u(S,W)
  W = δ(D,S,W)
  S = S.addOrReplace(D)
done

(Where W, D, & S represent collections in the dataflow).

Because the update function u and the working set function δ frequently share operations, we combine them both into a single function Δ, for ease of programmability: (D_{i+1}, W_{i+1}) = Δ(S_i, W_i).
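The combined step function and the key-based merge can be sketched in Python as follows, again using Connected Components. This is an illustrative assumption rather than Flink's or Stratosphere's API: the solution set is modelled as a dict keyed by vertex, and `step`, `add_or_replace` and `run` are hypothetical names.

```python
# Hypothetical undirected graph as adjacency lists; two components.
GRAPH = {1: [2], 2: [1, 3], 3: [2], 4: [5], 5: [4]}

def step(S, W):
    """Combined step function Δ: returns the delta set D and the next working set."""
    D = {}
    for v, cid in W:
        if cid < D.get(v, S[v]):     # keep only candidates that improve on the current label
            D[v] = cid
    W_next = [(n, cid) for v, cid in D.items() for n in GRAPH[v]]
    return D, W_next

def add_or_replace(S, D):
    """Union merge by key: records in D take precedence over records in S."""
    merged = dict(S)
    merged.update(D)
    return merged

def run():
    S = {v: v for v in GRAPH}                        # initial solution set
    W = [(n, v) for v in GRAPH for n in GRAPH[v]]    # initial working set
    while W:                                         # while !W.isEmpty()
        D, W = step(S, W)                            # (D, W) = Δ(S, W)
        S = add_or_replace(S, D)                     # S = S.addOrReplace(D)
    return S

solution = run()
```

Computing D and the next W in one pass avoids repeating the "does this candidate improve the label" check in two separate functions, which is the shared work the text refers to.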

Microstep iteration is represented in the same way as incremental iteration. An incremental iteration can be executed in microsteps rather than supersteps if it meets the following conditions:

  • The step function Δ must consist solely of record-at-a-time operations (e.g. Map) such that each record is processed individually.
  • Binary operators in Δ must have at most one input on the dynamic data path.
  • Consequently, the dynamic data path may not have branches. Each operator has only one immediate successor, with the exception of the output that connects to D.
  • To enable parallelism without fine-grained distributed locking, the dataflow between S and D must not cross partition boundaries, which can be inferred from Δ:

The sufficient conditions are that the key field containing k(s) is constant across the path between S and D, and that all operations on that path are either key-less (e.g. Map or Cross) or use k(s) as the key (for example in the case of the Match).

See the full paper for worked examples and a discussion of the approach taken to cost-based optimisation for the creation of execution plans.