Streams à la carte: Extensible pipelines with object algebras

Streams à la carte: Extensible pipelines with object algebras – Biboudis et al. 2015

Streaming APIs are popping up everywhere, allowing the programmer to express streaming computations such as:

int sum = IntStream.of(v)
  .filter(x -> x % 2 == 0)
  .map(x -> x * x)
  .sum();

On examining the streaming libraries in Java, Scala, and C#, the authors observe two key limitations:

  1. There is no way to alter the semantics of a streaming pipeline (e.g. push vs pull evaluation, adding logging, performing async computation and so on) without changing the library itself.
  2. It is not possible (or at least not easy) to add new operators, due to the way that evaluation is performed:

As discussed above, Java streams introduce push-style iteration by default. This approach would yield semantic differences from pull-style iteration if more operators, such as zip, were added to the library. Furthermore, in some languages the addition of new operators requires editing the library code or using advanced facilities: in Java such addition is only possible by changing the library itself, while in C# one needs to use extension methods, and in Scala one needs to use implicits.

Biboudis et al. introduce a new API design for streaming pipelines, based on the object algebras of Oliveira and Cook, that provides extensibility and configurable semantics (e.g. push vs pull) with no loss of performance.

The goal of our work is to offer extensible streaming libraries. The main axis of extensibility that is not well-supported in past designs is that of pluggable semantics. In existing streaming libraries there is no way to change the evaluation behavior of a pipeline so that it performs, e.g., lazy evaluation, augmented behavior (e.g., logging), operator fusing, etc. Currently, the semantics of a stream pipeline evaluation is hard-coded in the definition of operators supplied by the library. The user has no way to intervene.

What’s the deal with push vs pull? Take the pipeline we composed above, of the form of(…).filter(…).map(…).sum(). With a pull-based approach, sum retrieves each element from map, which applies its lambda to an element it has in turn pulled from filter (and so on upstream to the source). In the unoptimised Java case, this maps onto a call to forEachRemaining that internally calls hasNext() and next(), followed by accept(): three virtual calls per element.

However, stream pipelines, such as the one in our example, can be optimized. For the array-based Spliterator, the forEachRemaining method performs an indexed-based, do-while loop. The entire traversal is then transformed: instead of sum requesting the next element from map, the pipeline operates in the inverse order: map pushes elements through the accept method of its downstream Consumer object, which implements the sum functionality. (A Consumer in Java is an operation that accepts an argument and returns no result.) In this way, the implementation eliminates two virtual calls per step of iteration and effectively uses internal (push-style) iteration, instead of external (pull-style). This also enables further optimizations by the JIT compiler, often resulting in fully fused code.
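To make the contrast concrete, here is a minimal hand-rolled sketch of the example pipeline in both styles. It is not how the JDK or the paper implements things, and IntPull, PushPull and its methods are hypothetical names. In the pull version, sum drives the loop, and its hasNext()/next() calls ripple up through every stage. In the push version, the source owns the loop and each stage makes a single accept() call downstream.

```java
import java.util.NoSuchElementException;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Pull style: the consumer drives iteration by asking each stage for its next element.
interface IntPull {
    boolean hasNext();
    int next();
}

final class PushPull {
    static IntPull pullSource(int[] arr) {
        return new IntPull() {
            private int i = 0;
            public boolean hasNext() { return i < arr.length; }
            public int next() {
                if (i >= arr.length) throw new NoSuchElementException();
                return arr[i++];
            }
        };
    }

    // A pull-style filter has to look ahead until it finds a matching element.
    static IntPull pullFilter(IntPredicate p, IntPull up) {
        return new IntPull() {
            private boolean ready = false;
            private int cur;
            public boolean hasNext() {
                while (!ready && up.hasNext()) {
                    int x = up.next();
                    if (p.test(x)) { cur = x; ready = true; }
                }
                return ready;
            }
            public int next() {
                if (!hasNext()) throw new NoSuchElementException();
                ready = false;
                return cur;
            }
        };
    }

    static IntPull pullMap(IntUnaryOperator f, IntPull up) {
        return new IntPull() {
            public boolean hasNext() { return up.hasNext(); }
            public int next() { return f.applyAsInt(up.next()); }
        };
    }

    // sum drives the loop: it calls hasNext()/next() on map, which calls them on filter, etc.
    static int pullSum(IntPull s) {
        int acc = 0;
        while (s.hasNext()) acc += s.next();
        return acc;
    }

    // Push style: the source owns the loop and pushes each element downstream
    // through a chain of consumers, one accept() call per stage.
    static int pushSum(int[] arr, IntPredicate p, IntUnaryOperator f) {
        int[] acc = {0};
        IntConsumer sumStage = x -> acc[0] += x;
        IntConsumer mapStage = x -> sumStage.accept(f.applyAsInt(x));
        IntConsumer filterStage = x -> { if (p.test(x)) mapStage.accept(x); };
        for (int x : arr) filterStage.accept(x);
        return acc[0];
    }

    public static void main(String[] args) {
        int[] v = {1, 2, 3, 4, 5, 6};
        int pulled = pullSum(pullMap(x -> x * x, pullFilter(x -> x % 2 == 0, pullSource(v))));
        int pushed = pushSum(v, x -> x % 2 == 0, x -> x * x);
        System.out.println(pulled + " " + pushed); // 56 56
    }
}
```

Note the look-ahead state the pull filter has to carry. The push filter simply doesn't forward a rejected element.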

C#, F#, and Scala implement deferred execution over pipelines as pull streams. Java supports push streams by default. Java additionally supports limited pull capabilities through the iterator combinator.
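Java 8's pull escape hatch is the iterator() method on a stream, which for an IntStream returns a PrimitiveIterator.OfInt. Pulling is what an operator like zip needs, because it must advance two sources in lockstep. The sketch below uses a hypothetical zipAdd helper (Java 8 has no built-in zip) that relies only on iterator(), hasNext() and nextInt():

```java
import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.stream.IntStream;

final class IteratorZip {
    // Hypothetical helper: pairs up two IntStreams element by element and adds them,
    // pulling from both iterators in lockstep and stopping at the shorter one.
    static int[] zipAdd(IntStream a, IntStream b) {
        PrimitiveIterator.OfInt ia = a.iterator();
        PrimitiveIterator.OfInt ib = b.iterator();
        IntStream.Builder out = IntStream.builder();
        while (ia.hasNext() && ib.hasNext()) {
            out.add(ia.nextInt() + ib.nextInt());
        }
        return out.build().toArray();
    }

    public static void main(String[] args) {
        int[] r = zipAdd(IntStream.rangeClosed(1, 6).filter(x -> x % 2 == 0), IntStream.of(10, 20, 30));
        System.out.println(Arrays.toString(r)); // [12, 24, 36]
    }
}
```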

An object algebra for stream pipelines

Our extensible, pluggable-semantics design of the library is implemented using an architecture based on object algebras. Object algebras were introduced by Oliveira and Cook [13] as a solution to the expression problem [25]: the need to have fully extensible data abstraction while preserving the modularity of past code and maintaining type safety. The need for extensibility arises in two forms: adding new data variants and adding new operations. Intuitively, an object algebra is an interface that describes method signatures for creating syntax nodes (data variants). An implementation of the algebra offers semantics to such syntax nodes. Thus, new data variants (syntax nodes) are added by extending the algebra, while new operations (semantics) correspond to different implementations of the algebra.

An example helps to make this clearer:

PushFactory alg = new PushFactory();
int sum = Id.prj(
            alg.sum(
              alg.map(x -> x * x, 
                alg.filter(x -> x % 2 == 0, 
                  alg.source(v))))).value;

(Yes, this is nowhere near as elegant as the standard fluent API. The authors were able to ‘recover’ a fluent API on top of the algebra for Scala and C#, but not for Java.)
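To see what sits behind calls like alg.filter(…), here is a deliberately simplified, int-only sketch of the idea. The interface IntStreamAlg<C>, the carrier types and both factories are hypothetical names, not the paper's code. The paper's algebra is also generic in the element type, which is where the typing gets hairy. The same pipeline definition is interpreted once as a push-style computation and once as a pretty-printer, purely by swapping the factory:

```java
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// The algebra: one factory method per syntax node (data variant).
// C is the carrier, i.e. whatever a given semantics uses to represent a stream.
interface IntStreamAlg<C> {
    C source(int[] arr);
    C map(IntUnaryOperator f, C s);
    C filter(IntPredicate p, C s);
}

// Carrier for push semantics: a stream is something that can push its elements to a consumer.
interface IntPush {
    void run(IntConsumer k);
}

// Semantics #1: push-style execution.
final class IntPushAlg implements IntStreamAlg<IntPush> {
    public IntPush source(int[] arr) {
        return k -> { for (int x : arr) k.accept(x); };
    }
    public IntPush map(IntUnaryOperator f, IntPush s) {
        return k -> s.run(x -> k.accept(f.applyAsInt(x)));
    }
    public IntPush filter(IntPredicate p, IntPush s) {
        return k -> s.run(x -> { if (p.test(x)) k.accept(x); });
    }
    // Terminal operation for this semantics: only here does any element flow.
    static int sum(IntPush s) {
        int[] acc = {0};
        s.run(x -> acc[0] += x);
        return acc[0];
    }
}

// Semantics #2: a completely different reading of the same syntax, here a printer.
final class ShowAlg implements IntStreamAlg<String> {
    public String source(int[] arr) { return "source[" + arr.length + "]"; }
    public String map(IntUnaryOperator f, String s) { return s + ".map(f)"; }
    public String filter(IntPredicate p, String s) { return s + ".filter(p)"; }
}

final class AlgebraDemo {
    // The pipeline is written once against the algebra, generic in the semantics.
    static <C> C evenSquares(IntStreamAlg<C> alg, int[] v) {
        return alg.map(x -> x * x, alg.filter(x -> x % 2 == 0, alg.source(v)));
    }

    public static void main(String[] args) {
        int[] v = {1, 2, 3, 4, 5, 6};
        System.out.println(IntPushAlg.sum(evenSquares(new IntPushAlg(), v))); // 56
        System.out.println(evenSquares(new ShowAlg(), v)); // source[6].filter(p).map(f)
    }
}
```

The factories are only involved while the pipeline is being built. Once sum calls run, execution is a plain chain of consumer calls.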

Factories determine the behaviour of the stream pipeline. The authors demonstrate a push factory, a pull factory, a log factory, and a fused factory, which can optimize a pipeline by fusing adjacent operations. A future factory is used to trigger an asynchronous computation. New factories can be added independently of the streaming operators (combinators) themselves.
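A factory is just an implementation of the algebra, so one semantics can be layered on top of another. Below is a minimal sketch of a logging factory in that spirit. It assumes a simplified int-only algebra, and all names, including LogAlg, are hypothetical rather than the paper's code. LogAlg delegates construction to any underlying factory and only wraps the user's lambdas, so every element passing through filter and map is recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

interface IntStreamAlg<C> {
    C source(int[] arr);
    C map(IntUnaryOperator f, C s);
    C filter(IntPredicate p, C s);
}

interface IntPush { void run(IntConsumer k); }

final class IntPushAlg implements IntStreamAlg<IntPush> {
    public IntPush source(int[] arr) { return k -> { for (int x : arr) k.accept(x); }; }
    public IntPush map(IntUnaryOperator f, IntPush s) { return k -> s.run(x -> k.accept(f.applyAsInt(x))); }
    public IntPush filter(IntPredicate p, IntPush s) { return k -> s.run(x -> { if (p.test(x)) k.accept(x); }); }
}

// Logging semantics layered over ANY other semantics: it keeps the delegate's
// carrier type C and only wraps the user-supplied functions to record each call.
final class LogAlg<C> implements IntStreamAlg<C> {
    private final IntStreamAlg<C> inner;
    final List<String> log = new ArrayList<>();

    LogAlg(IntStreamAlg<C> inner) { this.inner = inner; }

    public C source(int[] arr) { return inner.source(arr); }

    public C map(IntUnaryOperator f, C s) {
        return inner.map(x -> {
            int y = f.applyAsInt(x);
            log.add("map " + x + " -> " + y);
            return y;
        }, s);
    }

    public C filter(IntPredicate p, C s) {
        return inner.filter(x -> {
            boolean keep = p.test(x);
            log.add("filter " + x + " -> " + keep);
            return keep;
        }, s);
    }
}

final class LogDemo {
    public static void main(String[] args) {
        LogAlg<IntPush> alg = new LogAlg<>(new IntPushAlg());
        IntPush s = alg.map(x -> x * x, alg.filter(x -> x % 2 == 0, alg.source(new int[]{1, 2, 3})));
        int[] acc = {0};
        s.run(x -> acc[0] += x);       // the log fills up only while the pipeline runs
        System.out.println(acc[0]);    // 4
        // filter 1 -> false, filter 2 -> true, map 2 -> 4, filter 3 -> false
        alg.log.forEach(System.out::println);
    }
}
```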

Our library design also allows adding new combinators without changing the library code.
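In object-algebra terms, a new combinator is a new sub-interface of the algebra, plus an implementation for each semantics that should support it. The existing interface and factories are left untouched. Here is a sketch, again assuming a simplified int-only algebra, in which "user code" adds a hypothetical flatMap combinator for the push semantics:

```java
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Existing "library" code, unchanged below.
interface IntStreamAlg<C> {
    C source(int[] arr);
    C map(IntUnaryOperator f, C s);
    C filter(IntPredicate p, C s);
}

interface IntPush { void run(IntConsumer k); }

class IntPushAlg implements IntStreamAlg<IntPush> {
    public IntPush source(int[] arr) { return k -> { for (int x : arr) k.accept(x); }; }
    public IntPush map(IntUnaryOperator f, IntPush s) { return k -> s.run(x -> k.accept(f.applyAsInt(x))); }
    public IntPush filter(IntPredicate p, IntPush s) { return k -> s.run(x -> { if (p.test(x)) k.accept(x); }); }
    static int sum(IntPush s) { int[] acc = {0}; s.run(x -> acc[0] += x); return acc[0]; }
}

// User code: the new combinator is a new interface extending the algebra...
interface FlatMapAlg<C> extends IntStreamAlg<C> {
    C flatMap(IntFunction<C> f, C s);
}

// ...plus an implementation for the semantics that should support it.
class FlatMapPushAlg extends IntPushAlg implements FlatMapAlg<IntPush> {
    public IntPush flatMap(IntFunction<IntPush> f, IntPush s) {
        // For each outer element x, run the inner stream f(x) into the same consumer.
        return k -> s.run(x -> f.apply(x).run(k));
    }
}

final class FlatMapDemo {
    public static void main(String[] args) {
        FlatMapPushAlg alg = new FlatMapPushAlg();
        // Cartesian-product-style pipeline: every outer x times every inner y.
        IntPush s = alg.flatMap(x -> alg.map(y -> x * y, alg.source(new int[]{1, 2})),
                                alg.source(new int[]{10, 20}));
        System.out.println(IntPushAlg.sum(s)); // 90
    }
}
```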

It’s all pretty heavy on the generic typing, as you might imagine. The presentation of the concepts relies on type-constructor polymorphism, which is available in Scala but not in Java. The authors show how to emulate type-constructor polymorphism in Java using a ‘brand’ class. See the paper for details.
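Roughly, the trick works in three steps. The type application C<T>, which Java can't write when C is a type variable, is represented as an ordinary interface App<C, T>. C is then instantiated with a unique 'brand' class that stands in for the type constructor. Finally, a cast in a prj method recovers the concrete type. The Id.prj call in the earlier example appears to be this kind of projection. The sketch below is a minimal illustration under assumptions: App, ListStream, its brand t, StreamAlg and ListAlg are hypothetical names. ListStream also eagerly materialises lists purely for brevity, which the paper's push and pull representations do not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// App<C, T> stands for the type application C<T>.
interface App<C, T> {}

// A concrete stream representation (an eager list, for brevity only).
final class ListStream<T> implements App<ListStream.t, T> {
    // The brand: a unique tag type standing in for the type constructor ListStream.
    static final class t {}

    final List<T> items;
    ListStream(List<T> items) { this.items = items; }

    // Safe in practice: ListStream is the only class implementing App<ListStream.t, T>.
    @SuppressWarnings("unchecked")
    static <T> ListStream<T> prj(App<ListStream.t, T> app) { return (ListStream<T>) app; }
}

// The algebra can now abstract over the type constructor C and the element type T.
interface StreamAlg<C> {
    <T> App<C, T> source(List<T> xs);
    <T, R> App<C, R> map(Function<T, R> f, App<C, T> s);
    <T> App<C, T> filter(Predicate<T> p, App<C, T> s);
}

final class ListAlg implements StreamAlg<ListStream.t> {
    public <T> App<ListStream.t, T> source(List<T> xs) {
        return new ListStream<>(new ArrayList<>(xs));
    }
    public <T, R> App<ListStream.t, R> map(Function<T, R> f, App<ListStream.t, T> s) {
        List<R> out = new ArrayList<>();
        for (T x : ListStream.prj(s).items) out.add(f.apply(x));
        return new ListStream<>(out);
    }
    public <T> App<ListStream.t, T> filter(Predicate<T> p, App<ListStream.t, T> s) {
        List<T> out = new ArrayList<>();
        for (T x : ListStream.prj(s).items) if (p.test(x)) out.add(x);
        return new ListStream<>(out);
    }
}

final class BrandDemo {
    // Generic in the semantics C, but with real element types.
    static <C> App<C, Integer> evenSquares(StreamAlg<C> alg, List<Integer> v) {
        return alg.map(x -> x * x, alg.filter(x -> x % 2 == 0, alg.source(v)));
    }

    public static void main(String[] args) {
        ListAlg alg = new ListAlg();
        System.out.println(ListStream.prj(evenSquares(alg, Arrays.asList(1, 2, 3, 4, 5, 6))).items); // [4, 16, 36]
        // map can change the element type, which the int-only sketches cannot express.
        App<ListStream.t, String> labels = alg.map(x -> "#" + x, alg.source(Arrays.asList(1, 2, 3)));
        System.out.println(ListStream.prj(labels).items); // [#1, #2, #3]
    }
}
```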

Performance

It is interesting to assess the performance of our approach, compared to the highly optimized Java 8 streams. Since our techniques add an extra layer of abstraction, one may suspect they introduce inefficiency. However, there are excellent reasons why our design can yield high performance:

  • “Object algebras are used merely for pipeline construction and not for execution. Once the data processing loop starts, it should be as efficient as in standard Java streams.”
  • “Our design offers fully pluggable semantics. This is advantageous for performance. We can leverage fusion of combinators, proper pull-style iteration without materialization of full intermediate results, and more.”

Benchmark results are compared against Java 8 streams and fall into four categories:

  • For reduce, filter/reduce, filter/map/reduce, and cart/reduce we see a comparison of equivalent functionality in the stream algebra and Java 8 libraries: “the performance of our push algebra implementation matches or exceeds that of Java 8, validating the claim that our approach does not incur undue overheads.”
  • The fused filters and fused maps benchmarks demonstrate the improvements from fusing semantics: “our fused pull-style semantics yield a successful optimization, outperforming even the efficient, push-style iteration. Due to our design, this optimization is achieved modularly and independently of the rest of the stream implementation.”
  • count, filter/count, and map/filter/count require true pull semantics. Java 8 streams support this by transforming the stream into an iterator, “but this is not equivalent to full pull-style iteration.” The stream algebra version is much faster here.
  • The cart/take/count benchmark contains a pipeline that is pathological for Java 8 streams due to inner loop array allocations. The pull algebra implementation is dramatically better here.
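The fused maps and fused filters results come from a factory that rewrites adjacent operations at construction time. Here is a minimal sketch of that idea, assuming a simplified int-only algebra. The hypothetical FusedAlg builds a small, inspectable pipeline description. It collapses map(f, map(g, s)) into a single map that applies g and then f, and filter(p, filter(q, s)) into a single filter that tests q and then p. The paper's fused semantics is pull-style, but to keep things short this sketch executes push-style.

```java
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

interface IntStreamAlg<C> {
    C source(int[] arr);
    C map(IntUnaryOperator f, C s);
    C filter(IntPredicate p, C s);
}

// Carrier of the fused semantics: a pipeline description the factory can inspect.
abstract class Node {
    abstract void run(IntConsumer k); // push-style execution
    abstract int depth();             // number of stages, to make fusion observable
}

final class Src extends Node {
    final int[] arr;
    Src(int[] arr) { this.arr = arr; }
    void run(IntConsumer k) { for (int x : arr) k.accept(x); }
    int depth() { return 1; }
}

final class MapN extends Node {
    final IntUnaryOperator f; final Node up;
    MapN(IntUnaryOperator f, Node up) { this.f = f; this.up = up; }
    void run(IntConsumer k) { up.run(x -> k.accept(f.applyAsInt(x))); }
    int depth() { return 1 + up.depth(); }
}

final class FilterN extends Node {
    final IntPredicate p; final Node up;
    FilterN(IntPredicate p, Node up) { this.p = p; this.up = up; }
    void run(IntConsumer k) { up.run(x -> { if (p.test(x)) k.accept(x); }); }
    int depth() { return 1 + up.depth(); }
}

final class FusedAlg implements IntStreamAlg<Node> {
    public Node source(int[] arr) { return new Src(arr); }

    public Node map(IntUnaryOperator f, Node s) {
        if (s instanceof MapN) {   // map(f, map(g, up)) == map(g then f, up)
            MapN m = (MapN) s;
            return new MapN(m.f.andThen(f), m.up);
        }
        return new MapN(f, s);
    }

    public Node filter(IntPredicate p, Node s) {
        if (s instanceof FilterN) { // filter(p, filter(q, up)) == filter(q && p, up)
            FilterN fl = (FilterN) s;
            return new FilterN(fl.p.and(p), fl.up);
        }
        return new FilterN(p, s);
    }

    static int sum(Node n) {
        int[] acc = {0};
        n.run(x -> acc[0] += x);
        return acc[0];
    }

    public static void main(String[] args) {
        FusedAlg alg = new FusedAlg();
        Node n = alg.map(x -> x + 1, alg.map(x -> x * 2, alg.map(x -> x * 3, alg.source(new int[]{1, 2, 3}))));
        System.out.println(n.depth() + " " + sum(n)); // 2 39 (three maps fused into one)
    }
}
```

Because fusion happens while the pipeline is being built, nothing else in the implementation needs to know about it. That is the modularity point the quoted benchmark discussion makes.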

Future Work

Given our extensible library design, there are several avenues for further work. The clearest path is towards enriching the current library implementation with shared-memory parallel evaluation semantics, cloud evaluation semantics, distributed pipeline parallelism, GPU processing, and more. Since we expose the streaming pipeline, such additions should be transparent to current evaluation semantics, and can even be performed by third-party programmers.