GraphX: Graph Processing in a Distributed Dataflow Framework
GraphX: Graph Processing in a Distributed Dataflow Framework – Gonzalez et al. 2014
This is the second of two weeks dedicated to graph processing. So far in this mini-series we’ve looked at what we know about networks of complex systems and graphs that model the real world; Google’s Pregel, which led to a whole set of derivative graph-processing systems; and the evolution of the specialised graph processing system GraphLab, with its extensions for Distributed GraphLab and the introduction of vertex-cuts with PowerGraph to better handle real-world graphs following a power law. We also previously looked at Facebook’s TAO – a system designed to handle simpler graph use cases at scale (Facebook also use Apache Giraph) – and an RDMA-based alternative implementation of the TAO model with FaRM.
One of the interesting things in the TAO paper is that all the graph data is actually stored in good old MySQL. There are a couple of reasons given: (i) there’s so much data that keeping a dedicated copy of it in a separate graph-processing system would be prohibitively expensive (not to mention the added complexity), and (ii) they can still take advantage of all of the other features of MySQL (e.g. for backups, replication, bulk import, …) without having to rebuild these in a separate system. And of course, they can run SQL-based reports etc. The authors of GraphX had a similar insight – there are lots of advantages to doing graph processing on top of a general purpose data processing platform, if only it can be made to perform…
Graphs are only part of the larger analytics process which often combines graphs with unstructured and tabular data. Consequently, analytics pipelines are forced to compose multiple systems which increases complexity and leads to unnecessary data movement and duplication. Furthermore, in pursuit of performance, graph processing systems often abandon fault tolerance in favor of snapshot recovery. Finally, as specialized systems, graph processing frameworks do not generally enjoy the broad support of distributed dataflow frameworks.
The first author of this paper, Gonzalez, is also an author of the GraphLab, Distributed GraphLab, and PowerGraph papers and so is no stranger to specialised graph processing systems. Why the change of heart?
First, the early emphasis on single stage computation and on-disk processing in distributed dataflow frameworks (e.g., MapReduce) limited their applicability to iterative graph algorithms which repeatedly and randomly access subsets of the graph. Second, early distributed dataflow frameworks did not expose fine-grained control over the data partitioning, hindering the application of graph partitioning techniques. However, new in-memory distributed dataflow frameworks (e.g., Spark and Naiad) expose control over data partitioning and in-memory representation, addressing some of these limitations.
GraphX is built on top of Spark, in an impressive 2,500 lines of code for the runtime, plus an additional 34 lines of code for the API. No changes to the Spark runtime were required. It is available as an open-source project at Apache. The authors believe the same approach could be used with other distributed dataflow frameworks.
We argue that by identifying the essential dataflow patterns in graph computation and recasting optimizations in graph processing systems as dataflow optimizations we can recover the advantages of specialized graph processing systems… Unlike existing graph processing systems, the GraphX API enables the composition of graphs with unstructured and tabular data and permits the same physical data to be viewed both as a graph and as collections without data movement or duplication.
The goal is not to beat the performance of specialized graph processing engines, but to get to performance parity – at which point the benefits of the underlying general purpose dataflow platform make it an attractive option. GraphX is evaluated against Apache Giraph (Pregel-inspired), and the PowerGraph version of GraphLab:
We emphasize that we are not claiming GraphX is fundamentally faster than GraphLab or Giraph; these systems could in theory implement the same optimizations as GraphX. Instead, we aim to show that it is possible to achieve comparable performance to specialized graph processing systems using a general dataflow engine while gaining common dataflow features such as fault tolerance.
How is it done? The graph-parallel abstraction and key optimisations are mapped onto a distributed dataflow model with corresponding optimisations. The data model itself, as with TAO, is very straightforward: a graph has a vertex collection, and an edge collection. The secret lies in making graph computation over those collections efficient.
The Graph Parallel model
At this point in the series you should be pretty familiar with the graph parallel model:
In the graph-parallel abstraction, a user-defined vertex program is instantiated concurrently for each vertex and interacts with adjacent vertex programs through messages (e.g., Pregel) or shared state (e.g., PowerGraph). Each vertex program can read and modify its vertex property and in some cases adjacent vertex properties. When all vertex programs vote to halt the program terminates.
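To make the abstraction concrete, here is a minimal single-machine sketch in Python of a message-passing vertex program run in synchronous supersteps. It is not code from Pregel or GraphX: the function name `propagate_max` and the choice of algorithm (every vertex ends up with the largest value that can reach it) are assumptions made purely for illustration. A vertex that receives nothing new simply sends no messages, which plays the role of voting to halt.

```python
from collections import defaultdict

def propagate_max(values, edges, max_supersteps=100):
    """Hypothetical vertex program: spread the maximum value along edges.

    values: dict of vertex id -> initial value
    edges:  list of (src, dst) pairs; messages flow from src to dst
    """
    out_neighbors = defaultdict(list)
    for src, dst in edges:
        out_neighbors[src].append(dst)

    values = dict(values)  # work on a copy, leave the input untouched

    # Superstep 0: every vertex is active and sends its value to its neighbors.
    inbox = defaultdict(list)
    for v, val in values.items():
        for n in out_neighbors[v]:
            inbox[n].append(val)

    steps = 0
    while inbox and steps < max_supersteps:
        outbox = defaultdict(list)
        for v, messages in inbox.items():
            best = max(messages)
            if best > values[v]:
                # The vertex updates its own property and stays active.
                values[v] = best
                for n in out_neighbors[v]:
                    outbox[n].append(best)
            # Otherwise the vertex sends nothing: it has voted to halt.
        inbox = outbox  # barrier: messages are only visible in the next superstep
        steps += 1
    return values

ring = [(1, 2), (2, 3), (3, 4), (4, 1)]
result = propagate_max({1: 3, 2: 6, 3: 2, 4: 1}, ring)
```

The loop ends when no messages are in flight, which is the point at which every vertex has voted to halt.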
As we’ve also seen, there are synchronous and asynchronous scheduling options, the synchronous form being based on the Bulk Synchronous Parallel (BSP) model. GraphX only supports the synchronous model:
…the gains due to an asynchronous programming model are often offset by the additional complexity and so we focus on the bulk-synchronous model and rely on system level techniques (e.g., pipelining and speculation) to address stragglers.
Two years previously, the same lead author (Gonzalez) said, when describing PowerGraph:
…the frequent barriers and inability to operate on the most recent data can lead to an inefficient distributed execution and slow algorithm convergence. To address these limitations PowerGraph also supports asynchronous execution… For example, the greedy graph-coloring algorithm in Fig. 3 will not converge when executed synchronously but converges quickly when executed asynchronously. The merits of asynchronous computation have been studied extensively in the context of numerical algorithms. In [18, 19, 29] we demonstrated that asynchronous computation can lead to both theoretical and empirical gains in algorithm and system performance for a range of important MLDM applications.
Which suggests the motivation for an asynchronous model is deeper than just dealing with stragglers – it’s a fundamental property of some MLDM algorithms that they perform better under it.
Important graph parallel optimisations that lead to efficiency include vertex-cut partitioning, mirror vertices that aggregate messages to be sent to a single remote machine, and the ability to track active vertices (vertex programs within a graph converge at different rates, leading to rapidly shrinking working sets).
Distributed Dataflow and Spark
The graph-parallel model needs to be mapped onto the distributed dataflow model which has the following characteristics: a data model based on typed collections; a data-parallel programming model based on collection transformations (e.g. map, group-by, join); a scheduler that breaks jobs into a DAG of tasks each running on a partition of data; and a runtime that can tolerate stragglers and partial failures.
The features of Spark that make it an attractive base for GraphX include support for a rich set of dataflow operators and the ability to execute tasks with multiple layers of dependencies. In addition:
- Spark’s RDDs enable applications to keep data in memory, which is essential for iterative graph algorithms.
- RDDs permit user-defined partitioning, and the execution engine can exploit this.
- The lineage graph and optional in-memory distributed replication reduce failure recovery costs.
The GraphX model
The property graph … can be logically represented as a pair of vertex and edge property collections. The vertex collection contains the vertex properties uniquely keyed by the vertex identifier. In the GraphX system, vertex identifiers are 64-bit integers which may be derived externally (e.g., user ids) or by applying a hash function to the vertex property (e.g., page URL). The edge collection contains the edge properties keyed by the source and destination vertex identifiers.
Graph-parallel computation is expressed as a series of join stages and group-by stages linked by map operations. The join stage creates triplets out of the vertex and edge collections as a triplets view. This joins each edge with its corresponding source and destination vertices. In SQL:
CREATE VIEW triplets AS
SELECT source.Id, dest.Id, source.Props, edge.Props, dest.Props
FROM edges AS edge
JOIN vertices AS source JOIN vertices AS dest
ON edge.srcId = source.Id AND edge.destId = dest.Id
In the group-by stage, the triplets are grouped by source or destination vertex to construct the neighborhood of each vertex and compute aggregates.
Compared to the Gather, Apply, Scatter model:
The group-by stage gathers messages destined to the same vertex, an intervening map operation applies the message sum to update the vertex property, and the join stage scatters the new vertex property to all adjacent vertices.
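The three stages can be sketched in a few lines of plain Python over in-memory dictionaries. This is only an illustration of the dataflow shape, not GraphX code: the function names `join_stage`, `group_by_stage` and `apply_stage`, and the toy message function, are all made up for the example.

```python
def join_stage(vertices, edges):
    """Build the triplets view: each edge joined with its source and destination."""
    return [(src, dst, vertices[src], prop, vertices[dst])
            for (src, dst), prop in edges.items()]

def group_by_stage(triplets, message_fn, combine):
    """Gather: group messages by destination vertex and fold them with `combine`."""
    sums = {}
    for src, dst, src_prop, edge_prop, dst_prop in triplets:
        msg = message_fn(src_prop, edge_prop, dst_prop)
        sums[dst] = combine(sums[dst], msg) if dst in sums else msg
    return sums

def apply_stage(vertices, sums, update):
    """Apply: map over the vertices, using the message sum where one exists."""
    return {v: update(p, sums[v]) if v in sums else p for v, p in vertices.items()}

# Vertex collection keyed by id; edge collection keyed by (source id, destination id).
vertices = {1: 10.0, 2: 20.0, 3: 30.0}
edges = {(1, 2): 0.5, (1, 3): 0.5, (2, 3): 1.0}

triplets = join_stage(vertices, edges)
sums = group_by_stage(
    triplets,
    message_fn=lambda src_prop, edge_prop, dst_prop: src_prop * edge_prop,
    combine=lambda a, b: a + b,
)
updated = apply_stage(vertices, sums, update=lambda old, total: total)
# Running join_stage(updated, edges) again "scatters" the new properties.
```

Because `combine` is applied pairwise in whatever order the triplets arrive, it needs to be commutative and associative for the result to be well defined.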
GraphX introduces a small set of specialized graph operators on top of the Spark dataflow operators – for example, a triplets operator, and a mrTriplets (map-reduce triplets) operator that encodes the two-stage process of graph-parallel computation. It is possible to model the Pregel abstraction in GraphX too:
We begin by initializing the vertex properties with an additional field to track active vertices (those that have not voted to halt). Then, while there are active vertices, messages are computed using the mrTriplets operator and the vertex program is applied to the resulting message sums. By expressing message computation as an edge-parallel map operation followed by a commutative associative aggregation, we leverage the GAS decomposition to mitigate the cost of high-degree vertices. Furthermore, by exposing the entire triplet to the message computation we can simplify algorithms like connected components.
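As a rough illustration of that loop, here is a single-machine Python sketch of connected components written in the same style: an edge-parallel map over triplets, a commutative and associative reduce (`min`), and an active set that shrinks as labels settle. The function name and data layout are assumptions for the example, and nothing here is the GraphX API. Note how having both endpoints of the triplet available lets a message travel in whichever direction lowers a label.

```python
def connected_components(vertex_ids, edges):
    """Label every vertex with the smallest vertex id in its component.

    vertex_ids: iterable of integer ids
    edges:      list of (src, dst) pairs, treated as undirected
    """
    labels = {v: v for v in vertex_ids}
    active = set(labels)  # vertices whose label changed in the last round

    while active:
        # Message computation: an edge-parallel map over the triplets,
        # reduced per target vertex with min.
        messages = {}
        for src, dst in edges:
            if src not in active and dst not in active:
                continue  # neither endpoint changed, nothing new to say
            if labels[src] < labels[dst]:
                messages[dst] = min(messages.get(dst, labels[src]), labels[src])
            elif labels[dst] < labels[src]:
                messages[src] = min(messages.get(src, labels[dst]), labels[dst])

        # Vertex program: adopt the smaller label and stay active if it changed.
        active = set()
        for v, msg in messages.items():
            if msg < labels[v]:
                labels[v] = msg
                active.add(v)
    return labels

components = connected_components(range(1, 7), [(1, 2), (2, 3), (4, 5)])
```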
System Design and Optimisations
The vertex collection is hash-partitioned by vertex ids. The edge collection is horizontally partitioned by a user-defined function, supporting vertex-cut partitioning. GraphX includes a range of built-in partitioning functions. A routing table is co-partitioned with the vertex collection.
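A small Python sketch may help picture this layout. It assumes the routing table records, for each vertex, which edge partitions hold at least one of its edges. The helper names and the toy partitioning function are hypothetical and are not one of GraphX's built-in partitioners. For simplicity the routing table is a single dictionary here, where a real system would split it the same way as the vertex collection.

```python
from collections import defaultdict

def partition_graph(vertex_ids, edges, num_parts, edge_partitioner):
    """Split vertices by id and edges by a user-supplied function."""
    vertex_parts = defaultdict(set)
    for v in vertex_ids:
        vertex_parts[v % num_parts].add(v)  # stand-in for hashing the vertex id

    edge_parts = defaultdict(list)
    routing = defaultdict(set)  # vertex id -> edge partitions with adjacent edges
    for src, dst in edges:
        p = edge_partitioner(src, dst, num_parts)
        edge_parts[p].append((src, dst))
        routing[src].add(p)
        routing[dst].add(p)
    return vertex_parts, edge_parts, routing

def by_both_endpoints(src, dst, num_parts):
    """Toy partitioner (an assumption for this example): mix both endpoint ids."""
    return (src * 31 + dst) % num_parts

# A hub vertex 0 connected to six spokes: the hub's edges are cut across
# all three edge partitions, while each spoke lives in exactly one.
hub_edges = [(0, spoke) for spoke in range(1, 7)]
vertex_parts, edge_parts, routing = partition_graph(
    range(7), hub_edges, num_parts=3, edge_partitioner=by_both_endpoints)
```

This is the essence of a vertex-cut: edges are never split, but a high-degree vertex such as the hub spans several partitions.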
GraphX inherits the immutability of Spark and therefore all graph operators logically create new collections rather than destructively modifying existing ones. As a result, derived vertex and edge collections can often share indices to reduce memory overhead and accelerate local graph operations.
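One way to picture index sharing is the following Python sketch, which is an illustration under assumed names rather than GraphX's actual data structures. Each operation returns a new `VertexPartition` object, but all of them point at the same id-to-slot index. A filtered view additionally reuses the value array and records membership in an integer bitmask.

```python
class VertexPartition:
    """Hypothetical vertex partition: a shared index plus values stored by slot."""

    def __init__(self, index, values, mask=None):
        self.index = index    # dict: vertex id -> slot, never mutated
        self.values = values  # list: slot -> property
        # Bit i of the mask is 1 when slot i belongs to this collection.
        self.mask = (1 << len(values)) - 1 if mask is None else mask

    def map_values(self, fn):
        # New value array, same index object, same membership.
        return VertexPartition(self.index, [fn(v) for v in self.values], self.mask)

    def filter(self, pred):
        # Same index and same values; only the bitmask differs.
        mask = 0
        for vid, slot in self.index.items():
            if (self.mask >> slot) & 1 and pred(vid, self.values[slot]):
                mask |= 1 << slot
        return VertexPartition(self.index, self.values, mask)

    def to_dict(self):
        return {vid: self.values[slot]
                for vid, slot in self.index.items() if (self.mask >> slot) & 1}

base = VertexPartition({10: 0, 20: 1, 30: 2}, [1, 2, 3])
doubled = base.map_values(lambda v: v * 2)
big = doubled.filter(lambda vid, v: v >= 4)
```

Since the index is built once and only read afterwards, a lookup by vertex id costs the same in `base`, `doubled` and `big`, and the original collection is left unchanged.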
For maximal index reuse, subgraph operations produce subgraphs that share the full graph indices, and use bitmasks to indicate which elements are included. For joins, GraphX moves vertex data to edge partitions rather than the other way around:
Because the vertex and edge property collections are partitioned independently, the join requires data movement. GraphX performs the three-way join by shipping the vertex properties across the network to the edges, thus setting the edge partitions as the join sites. This approach substantially reduces communication for two reasons. First, real-world graphs commonly have orders of magnitude more edges than vertices. Second, a single vertex may have many edges in the same partition, enabling substantial reuse of the vertex property.
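A back-of-the-envelope count shows why the direction of shipping matters. The Python sketch below counts one record per vertex for every edge partition that holds one of its edges, and compares that with a naive alternative, assumed here only for comparison, in which every edge is sent to the partitions of both its endpoints. The function name and the toy partitioning rule are made up for the example.

```python
def shipping_costs(edges, edge_partition_of):
    """Return (records moved shipping vertices to edges, records moved the other way)."""
    sites = {}  # vertex id -> set of edge partitions holding an adjacent edge
    for src, dst in edges:
        p = edge_partition_of(src, dst)
        sites.setdefault(src, set()).add(p)
        sites.setdefault(dst, set()).add(p)

    # One copy of a vertex property per edge partition that needs it.
    vertices_to_edges = sum(len(parts) for parts in sites.values())
    # Assumed baseline: each edge is copied once for each of its two endpoints.
    edges_to_vertices = 2 * len(edges)
    return vertices_to_edges, edges_to_vertices

# A dense toy graph: 20 vertices, every ordered pair connected, 4 edge partitions.
dense_edges = [(s, d) for s in range(20) for d in range(20) if s != d]
costs = shipping_costs(dense_edges, lambda s, d: (s + d) % 4)
```

Here each vertex is needed in at most four partitions however many edges it has, so 80 vertex records are moved instead of 760 edge records.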
Joins are eliminated altogether where possible: GraphX uses JVM bytecode analysis to determine which properties a user-defined function accesses. If the triplets view has not yet been materialized and the function accesses only one of the two vertex properties, GraphX uses a two-way join instead of the three-way join. If the function accesses no vertex properties, GraphX eliminates the join completely.
When joining, GraphX further ensures that each vertex property is sent only to the edge partitions that contain adjacent edges, based on information in the routing table. Incremental view maintenance further reduces data movement:
Iterative graph algorithms often modify only a subset of the vertex properties in each iteration. We therefore apply incremental view maintenance to the triplets view to avoid unnecessary movement of unchanged data. After each graph operation, we track which vertex properties have changed since the triplets view was last constructed. When the triplets view is next accessed, only the changed vertices are re-routed to their edge-partition join sites and the local mirrored values of the unchanged vertices are reused.
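The bookkeeping involved can be sketched in Python as follows. This is a simplified single-process model with hypothetical names (`TripletsView`, `shipped`), not the GraphX implementation: each edge partition keeps a local mirror of the vertex values it needs, and a refresh re-sends only the vertices marked as changed.

```python
class TripletsView:
    """Hypothetical triplets view with per-partition mirrors of vertex values."""

    def __init__(self, vertices, edge_parts):
        self.vertices = dict(vertices)  # vertex id -> property
        self.edge_parts = edge_parts    # list of edge partitions, each a list of (src, dst)
        self.routing = {}               # vertex id -> partitions with adjacent edges
        for p, part in enumerate(edge_parts):
            for src, dst in part:
                self.routing.setdefault(src, set()).add(p)
                self.routing.setdefault(dst, set()).add(p)
        self.mirrors = [dict() for _ in edge_parts]
        self.changed = set(self.vertices)  # everything is shipped the first time
        self.shipped = 0                   # counts vertex records moved

    def update_vertex(self, vid, value):
        if self.vertices[vid] != value:
            self.vertices[vid] = value
            self.changed.add(vid)

    def _refresh(self):
        # Re-route only the changed vertices to their join sites.
        for vid in self.changed:
            for p in self.routing.get(vid, ()):
                self.mirrors[p][vid] = self.vertices[vid]
                self.shipped += 1
        self.changed.clear()

    def triplets(self):
        self._refresh()
        return [(src, dst, self.mirrors[p][src], self.mirrors[p][dst])
                for p, part in enumerate(self.edge_parts)
                for src, dst in part]

view = TripletsView({1: "a", 2: "b", 3: "c"}, [[(1, 2)], [(2, 3)]])
first = view.triplets()          # ships 4 records: vertex 2 is needed twice
shipped_after_first = view.shipped
view.update_vertex(3, "z")
second = view.triplets()         # ships just the one changed vertex
shipped_after_second = view.shipped
```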
As the graph algorithm progresses, it is common for fewer and fewer of the vertices to remain active, so a full scan of all triplets during a map-reduce becomes increasingly wasteful. “For example, in the last iteration of connected components on the Twitter graph, only a few of the vertices are still active. However, to execute mrTriplets we still must sequentially scan 1.5 billion edges and check whether their vertices are in the active set.”
To address this problem, we introduced an indexed scan for the triplets view. The application expresses the current active set by restricting the graph using the subgraph operator. The vertex predicate is pushed to the edge partitions, where it can be used to filter the triplets using the CSR index on the source vertex id.
(CSR = Compressed Sparse Row format.)
GraphX switches between a regular scan and an indexed scan based on the degree of selectivity.
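For readers who have not met CSR before, the Python sketch below builds a CSR index over source vertex ids and uses it to visit only the edges of active sources, falling back to a sequential scan when most vertices are active. The function names are hypothetical, vertex ids are assumed to be the dense integers 0 to n-1, and the 0.8 threshold is an arbitrary value chosen for the example rather than a figure from the paper.

```python
def build_csr(num_vertices, edges):
    """Return (offsets, dsts): edges of source s are dsts[offsets[s]:offsets[s + 1]]."""
    offsets = [0] * (num_vertices + 1)
    for src, _ in edges:
        offsets[src + 1] += 1
    for i in range(num_vertices):  # prefix sum turns counts into start positions
        offsets[i + 1] += offsets[i]

    dsts = [0] * len(edges)
    cursor = offsets[:-1]          # next free slot for each source (a copy)
    for src, dst in edges:
        dsts[cursor[src]] = dst
        cursor[src] += 1
    return offsets, dsts

def active_edges(offsets, dsts, active, threshold=0.8):
    """Edges whose source is active, via an indexed or a sequential scan."""
    num_vertices = len(offsets) - 1
    if len(active) < threshold * num_vertices:
        # Indexed scan: jump straight to each active source's edge range.
        return [(s, dsts[i])
                for s in sorted(active)
                for i in range(offsets[s], offsets[s + 1])]
    # Sequential scan: walk every edge and test membership in the active set.
    return [(s, dsts[i])
            for s in range(num_vertices)
            for i in range(offsets[s], offsets[s + 1])
            if s in active]

offsets, dsts = build_csr(4, [(0, 1), (0, 2), (2, 0), (3, 1)])
few = active_edges(offsets, dsts, {2})
everything = active_edges(offsets, dsts, {0, 1, 2, 3})
```

Both paths return the same edges in the same order. The only difference is how much of the edge array is touched to find them.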