Scaling Spark in the real world: performance and usability

Scaling Spark in the real world: performance and usability Armbrust et al. VLBD 2015

A short and easy paper from the Databricks team to end the week. Given the pace of development in the Apache Spark world, a paper published in 2015 about enhancements to Spark will of course be a little dated. But this paper nicely captures some of the considerations in the transition from research project to commercial software – we see two years of that journey.

As Spark transitioned from early adopters to a broader audience, we had a chance to see where its functional API worked well in practice, where it could be improved, and what the needs of new users were. This paper describes the major initiatives we have taken at Databricks to improve usability and performance of Spark.

The challenges identified included ease-of-use of the programming model, performance and scalability, and improving visibility into the runtime for troubleshooting.

Programming model

The functional API was hard to understand for non-developers, and could even catch out more experienced users. For example, there’s a big efficiency difference between groupByKey and then sum vs reduceByKey and then sum. The former must send each list of records to one machine, whereas the latter is much faster as it can perform partial aggregation on each node.  As functions are arbitrary Java or Python code, it’s also hard to do much in the way of analysis and optimisation.

Providing higher-level APIs proved to be a double win, first in making Spark accessible to a broader audience, and secondly in improving computation times.

  In most organizations, “big data” needs to be accessible to many other individuals, such as domain experts (e.g., statisticians or data scientists) who are not developers. In addition, for all users, higher-level APIs are important because much data analysis is exploratory: users do not have time to write a fully optimized distributed program. To address these challenges, we have invested substantial effort in providing high-level data science APIs that mirror single-node tools, such as R’s data frames, over Spark.

The DataFrame API provides a standard interface similar to data frames in Python and R, but under the covers the implementation compiles the program using the relational optimizer in SparkSQL.

You get to write simple expressions such as:

means = users.where(users["age"] > 20)

And they run up to 2-5x faster than equivalent computations expressed via the functional API.

  The speedups come from both runtime code generation and algebraic optimizations (e.g., predicate pushdown).

A lovely counter-example to the common wisdom that higher level APIs must be slower.

Performance and scalability

“Big data” comes in a wide range of formats and sizes, requiring careful memory management throughout the engine…

Spark initially assumed for example that the data in each block of a file (128MB in HDFS) could fit in memory all at once. But some highly compressed datasets found in the wild could decompress into 3-4GB!  The growth in cluster sizes in addition to data size (as of 2015, clusters of up to 8000 nodes were known) also stressed the networking and I/O layers.

Memory management enhancements were planned based on a study of user reports. A large fraction of memory exhaustion problems stemmed from processing large joins or aggregations since the original memory manager did not track memory usage here.  A dynamically allocated cap was introduced to track hash tables for joins and aggregations. A third space (the first two being for cached data, and for join and aggregation hash tables) was reserved for the unrolling of blocks read from disk to verify whether the uncompressed data was still small enough to cache.

In all these cases, we check memory usage every 16 records to handle skewed record sizes. With these controls, the engine runs robustly across a wide range of workloads.

Spark’s original networking layer was built directly on Java’s NIO. As of Apach Spark 1.2 this was replaced with Netty (

Netty simplifies networking programming by providing a higher level asynchronous event-driven abstraction.

(There’s that combination of a higher level API coupled with better performance again!).

Using Netty the team were easily able to add support for zero-copy I/O, off-heap network buffer management, and the ability to handle multiple concurrent connections at each node.

We used [the implementation] to set a new record in the Daytona GraySort competition, by sorting 100TB of ondisk data 3x faster than the previous Hadoop-based record using 10x fewer machines.

The paper also describes a then-new project codenamed Tungsten using runtime code generation to bring the performance of DataFrames and SQL to the limit of the underlying hardware. See ‘Efficiently compiling efficient query plans for modern hardware‘ for more details on that and the up-to 10x performance improvements it brought.

Runtime visibility

Distributed programs are inherently hard to debug, even with Spark’s side-effect-free, functional API, because users have to worry about work distribution and skew. We found that the most challenging issues are in performance debugging: users often do not realize that their work is concentrated on a few machines, or that some of their data structures are memory-inefficient.

Maybe something like DB Sherlock would help?

Several extensions were made to the monitoring UI showing per-task metrics with graphing support and a visualization of the operator DAG.

A ‘stack trace’ button which lets you take a trace from any worker turned out to be a useful swiss-army knife for both sample profiling and identifying deadlocks.

In our experience, visibility into the system remains one of the biggest challenges for users of distributed computing.

If you’re interested in this topic, you might also like Big Debug.