# Distributed GraphLab: A framework for machine learning and data mining in the cloud

Two years on from the initial GraphLab paper we looked at yesterday comes this extension to support distributed graph processing for larger graphs, including data mining use cases.

In this paper, we extend the GraphLab framework to the substantially more challenging distributed setting while preserving strong data consistency guarantees.

The work is motivated by “the exponential growth in the scale of Machine Learning and Data Mining (MLDM) problems and increasing sophistication of MLDM techniques.” Three representative problems from this space are implemented using GraphLab: Netflix movie recommendations, video co-segmentation, and named-entity recognition. Each experiment was based on large real-world problems and datasets. The principal findings were that:

• On equivalent tasks, GraphLab outperforms Hadoop by 20-60x and performance is comparable to tailored MPI implementations.
• GraphLab’s performance scaling improves with higher computation to communication ratios.
• The GraphLab abstraction more compactly expresses the Netflix, NER and Coseg algorithms than MapReduce or MPI.

This also translates into significant cost-savings: “For the Netflix application, GraphLab is about two orders of magnitude more cost-effective than Hadoop.”

So what did it take to extend GraphLab to a distributed setting?

…we incorporate data versioning to reduce network congestion and pipelined distributed locking to mitigate the effects of network latency. To address the challenges of data locality and ingress we introduce the atom graph for rapidly placing graph structured data in the distributed setting. We also add fault tolerance to the GraphLab framework by adapting the classic Chandy-Lamport snapshot algorithm and demonstrate how it can be easily implemented within the GraphLab abstraction.

I’m going to focus on the distributed aspects since we covered the core GraphLab abstraction yesterday. Even if you’ve read the original GraphLab paper though, it’s well worth you reviewing sections 2 and 3 of this paper which cover some of the limitations of alternatives models and the core GraphLab abstraction. The slightly different slant and new examples and comparisons really bring out the strengths of the GraphLab model.

Because of the inherently random memory access patterns common to dynamic asynchronous graph algorithms, we focus on the distributed in-memory setting, requiring the entire graph and all program state to reside in RAM.

### Partitioning the graph

The first thing to be done is to partition the graph across multiple nodes. As we’ve seen before, for better flexibility in terms of scaling and balancing of load, it’s desirable to have many more partitions than nodes (over-partitioning).

The data graph is initially over-partitioned using domain specific knowledge (e.g., planar embedding), or by using a distributed graph partitioning heuristic (e.g., ParMetis, Random Hashing) into k parts where k is much greater than the number of machines. Each part, called an atom, is stored as a separate file on a distributed storage system (e.g., HDFS, Amazon S3)… If hashed partitioning is used, the construction process is Map-Reduceable where a map is performed over each vertex and edge, and each reducer accumulates an atom file. The atom journal format allows future changes to the graph to be appended without reprocessing all the data.

Atoms also contain information about ghosts, the set of vertices and edges adjacent to the partition boundary.

The connectivity structure and file locations of the k atoms is stored in a atom index file as a meta-graph with k vertices (corresponding to the atoms) and edges encoding the connectivity of the atoms. Distributed loading is accomplished by performing a fast balanced partition of the meta-graph over the number of physical machines.

Ghosts are used as caches for their counterparts across the network, and a versioning scheme is used to eliminate transmission of unchanged data.

### Execution Engines

The engine is responsible for executing update functions and sync operations, maintaining the set of scheduled vertices, and ensuring serializability with respect to the selected consistency model. (If you haven’t already, you’ll need to read about the vertex, edge, and full consistency models supported by GraphLab to understand what follows). Two different engines are supported: a low-overhead chromatic engine, and a more expressive locking engine.

#### Chromatic Engine

The chromatic engine is so-named because it is based on graph colouring. I find the solution rather elegant:

A classic technique to achieve a serializable parallel execution of a set of dependent tasks (represented as vertices in a graph) is to construct a vertex coloring that assigns a color to each vertex such that no adjacent vertices share the same color. Given a vertex coloring of the data graph, we can satisfy the edge consistency model by executing, synchronously, all vertices of the same color in the vertex set T before proceeding to the next color. We use the term color-step, in analogy to the super-step in the BSP model, to describe the process of updating all the vertices within a single color and communicating all changes. The sync operation can then be run safely between color-steps.

By changing the way the colouring is done, the full consistency and vertex consistency models can also be supported. For full consistency, it is necessary to create a colouring in which no vertex shares the same colour as any of its 2-distance neighbours. For the vertex consistency model, all vertices can simply be assigned the same colour.

While optimal graph coloring is NP-hard in general, a reasonable quality coloring can be constructed quickly using graph coloring heuristics (e.g., greedy coloring). Furthermore, many MLDM problems produce graphs with trivial colorings. For example, many optimization problems in MLDM are naturally expressed as bipartite (two-colorable) graphs, while problems based upon template models can be easily colored using the template.

#### Locking Engine

The distributed locking engine is able to avoid the full communication barrier between colour-steps that the chromatic engine requires.

We achieve distributed mutual exclusion by associating a readers-writer lock with each vertex. The different consistency models can then be implemented using different locking protocols. Vertex consistency is achieved by acquiring a write-lock on the central vertex of each requested scope. Edge consistency is achieved by acquiring a write lock on the central vertex, and read locks on adjacent vertices. Finally, full consistency is achieved by acquiring write locks on the central vertex and all adjacent vertices. Deadlocks are avoided by acquiring locks sequentially following a canonical order. We use the ordering induced by machine ID followed by vertex ID (owner(v), v) since this allows all locks on a remote machine to be requested in a single message. Since the graph is partitioned, we restrict each machine to only run updates on local vertices. The ghost vertices/edges ensure that the update have direct memory access to all information in the scope.

Several techniques are used to reduce latency associated with locking. The ghosting system provides caching, and all lock requests and synchronization calls are pipelined. This allows a machine to request locks and data for many scopes simultaneously.

Lock acquisition requests provide a pointer to a callback, that is called once the request is fulfilled. These callbacks are chained into a distributed continuation passing scheme that passes lock requests across machines in sequence. Since lock acquisition follows the total ordering described earlier, deadlock free operation is guaranteed.

### Fault Tolerance

Fault tolerance is achieved via a distributed snapshot algorithm.

Using the GraphLab abstraction we designed and implemented a variant of theChandy-Lamport snapshot specifically tailored to the GraphLab data-graph and execution model. The resulting algorithm is expressed as an update function and guarantees a consistent snapshot under the following conditions:

• Edge consistency is used on all update functions
• Schedule completes before a scope is unlocked
• the snapshot update is prioritized over all other updates.

Chandy-Lamport is asynchronous and can proceed in parallel with graph computation. A simpler fully synchronous snapshot mechanism is also provided that suspends all computation while the snapshot is constructed.

Snapshots are initiated at fixed intervals.

The choice of interval must balance the cost of constructing the checkpoint with the computation lost since the last checkpoint in the event of failure.

A first-order approximation to the ideal checkpoint interval was given by Young et al.:

optimal_interval = sqrt(2*time_to_create_a_checkpoint*MTBF)


For instance, using a cluster of 64 machines, a per machine MTBF of 1 year, and a checkpoint time of 2 min leads to optimal checkpoint intervals of 3 hrs. Therefore, for the deployments considered in our experiments, even taking pessimistic assumptions for TMTBF, leads to checkpoint intervals that far exceed the runtime of our experiments and in fact also exceed the Hadoop experiment runtimes. This brings into question the emphasis on strong fault tolerance in Hadoop. Better performance can be obtained by balancing fault tolerance costs against that of a job restart.