
Write-limited sorts and joins for persistent memory

September 30, 2016

Write-limited sorts and joins for persistent memory Viglas, VLDB 2014

This is the second of the two research-for-practice papers for this week. Once more the topic is how database storage algorithms can be optimised for NVM, this time examining the asymmetry between reads and writes on NVM. This is premised on Viglas’ assertion that:

Writes are more than one order of magnitude slower than DRAM, and thus more expensive than reads…

I’ve found it very hard to get consistent data about NVM performance. In yesterday’s paper there is a table showing equivalent read and write latency for RRAM and MRAM, but a 3:1 write:read latency ratio for PCM.

Viglas uses reference points of 10ns for reads and 150ns for writes. 10ns is on the order of an L3 cache read from previous figures I’ve looked at.

Let’s proceed on the assumption that at least in some forms of NVM, there is a material difference between read and write latency. On this basis, Viglas examines algorithms for sorts and joins that can be tuned to vary the ratio of read and write operations. Beyond performance, another reason to reduce writes is to reduce the impact of write degradation over the lifetime of the memory.

All the algorithms work by trading writes for reads. There are two basic approaches:

  • Algorithms that partition their input (with a configurable threshold) between a part that is processed by a regular write-incurring strategy, and a part that is processed by a write-limited strategy.
  • Algorithms that use lazy processing to defer writes for intermediate results until the cost of recreating them (additional reads + computation) exceeds the cost of the write.

Partitioned Sorting

The partitioned sorting algorithm, called segment sort, trades off between a traditional external mergesort and a selection sort.

External mergesort splits the input into chunks that fit in main memory, and sorts each chunk in memory before writing it back to disk (in sorted form) as a run. Runs are then merged in passes to produce the sorted output, the number of passes required being dictated by the available memory.

Selection sort, at a cost of extra reads, writes each element of the input only once, at its final location. On each pass it finds the smallest set of values that will fit in its memory budget of M buffers, maintains them in e.g. a max-heap when sorting in ascending order, and then writes them out as a run at the end of the pass. For an input T buffers in size, this algorithm performs |T|/M read passes over the input (|T| · |T|/M buffer reads in total) and |T| writes.
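A minimal Python sketch can make this read/write trade-off concrete. It is an illustration under simplifying assumptions, not Viglas’ implementation: the memory budget `m` is counted in records rather than buffers, values are assumed numeric (the max-heap is built by negating keys), and the hypothetical `selection_sort_counted` just tallies records read and written.

```python
import heapq


def selection_sort_counted(data, m):
    """Write-limited selection sort (hypothetical sketch).

    Assumptions: m counts records, not buffers; values are numeric.
    Returns (sorted output, records read, records written).
    """
    # Pair each value with its position so duplicates stay distinguishable.
    tagged = [(v, i) for i, v in enumerate(data)]
    output, reads, writes, last = [], 0, 0, None
    while len(output) < len(tagged):
        heap = []  # max-heap (negated keys) of the m smallest unemitted records
        for v, i in tagged:  # every pass rescans the entire input
            reads += 1
            if last is not None and (v, i) <= last:
                continue  # already emitted in an earlier pass
            key = (-v, -i)
            if len(heap) < m:
                heapq.heappush(heap, key)
            elif key > heap[0]:  # smaller than the largest record kept so far
                heapq.heapreplace(heap, key)
        batch = sorted((-nv, -ni) for nv, ni in heap)
        output.extend(v for v, _ in batch)  # each record written exactly once
        writes += len(batch)
        last = batch[-1]
    return output, reads, writes
```

For n records, writes stay at n while reads grow to n · ⌈n/m⌉; for example, sorting `[5, 3, 8, 1, 9, 2, 7]` with `m = 3` takes three passes, 21 reads and 7 writes.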

In segment sort we decide x ∈ (0,1), the fraction of the input that will be sorted using external mergesort (and hence 1-x is sorted using selection sort). Each segment is processed independently using the respective algorithm, and at the end runs are merged using the standard merging phase of external mergesort.
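The structure of segment sort can be sketched in a few lines of Python. This is a hypothetical illustration, not the paper’s code: `segment_sort` is an invented name, `m` is a record budget that also serves as the merge fan-in (so merging can need extra passes over the mergesort runs, each rewriting them), and the read/write tallies follow this simplified model rather than the paper’s cost formulas.

```python
import heapq


def segment_sort(data, m, x):
    """Segment sort sketch (hypothetical). Assumptions: m counts records and
    doubles as the merge fan-in; x is the fraction sorted by external mergesort.
    Returns (sorted output, records read, records written)."""
    assert m >= 2 and 0.0 <= x <= 1.0
    split = int(len(data) * x)
    ems_part, sel_part = data[:split], data[split:]
    reads = writes = 0

    # Segment 1, external mergesort: sort m-record chunks, write each as a run.
    runs = []
    for start in range(0, len(ems_part), m):
        chunk = sorted(ems_part[start:start + m])
        reads += len(chunk)
        writes += len(chunk)
        runs.append(chunk)

    # Segment 2, selection sort: one full scan per pass emits the next m
    # smallest records, so every record in this segment is written once.
    tagged = [(v, i) for i, v in enumerate(sel_part)]
    sel_run, last = [], None
    while len(sel_run) < len(tagged):
        reads += len(tagged)
        batch = heapq.nsmallest(m, (r for r in tagged if last is None or r > last))
        sel_run.extend(v for v, _ in batch)
        writes += len(batch)
        last = batch[-1]

    # Intermediate m-way merge passes over the mergesort runs until they,
    # plus the selection-sort run, fit into a single final merge.
    extra = 1 if sel_run else 0
    while len(runs) + extra > m:
        merged = []
        for g in range(0, len(runs), m):
            group = list(heapq.merge(*runs[g:g + m]))
            reads += len(group)
            writes += len(group)
            merged.append(group)
        runs = merged

    # Final merge phase shared by both segments.
    output = list(heapq.merge(*runs, sel_run))
    reads += len(output)
    writes += len(output)
    return output, reads, writes
```

With `data = [(7 * i) % 32 for i in range(32)]` and `m = 4`, the sketch reports 96 reads and 96 writes at x = 1, 128 reads and 80 writes at x = 0.5, and 288 reads and 64 writes at x = 0: lowering x trades writes for reads, which is the knob the text describes.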

How do you choose the appropriate value of x? Simple!😉

where λ is the write/read cost ratio wcost/rcost.

A variation of segment sort called hybrid sort, which draws inspiration from hybrid hash join, is also introduced. See §2.1.2 for details.

Lazy Sorting

The lazy sorting approach is based on selection sort.

Our write-limited sort algorithms, given a budget of M buffers, continuously scan the input to extract the next set of minimum values to be appended to the output; each scan processes the entire input. An alternative would be to extract not only the set of minimum values from the input, but also the set of values that are necessary to produce the next set of minimum values. This is possible to achieve by the lazySort algorithm of Algorithm 2. The algorithm tracks the current iteration (i.e., the number of full scans it has performed so far), the benefit of not materializing the input for the next scan, and the penalty it has paid by rescanning the input. In each iteration the algorithm compares the cost of materializing the next input to the cost of rescanning the current input. If the rescanning cost exceeds the materialization cost, then the algorithm materializes the next input; else it proceeds as before.
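To illustrate the laziness decision, here is a Python sketch built on a deliberately simplified cost model; it is an assumption for illustration and not Viglas’ Algorithm 2. Reads cost 1 and writes cost `lam` (λ); rescanning records that were already emitted accumulates a penalty, and once that penalty exceeds the cost of writing the still-unsorted records, they are materialised as the next, smaller input. The name `lazy_sort` is hypothetical.

```python
import heapq


def lazy_sort(data, m, lam):
    """Lazy write-limited selection sort (hypothetical sketch).

    m   -- memory budget, counted in records for simplicity
    lam -- write/read cost ratio (wcost / rcost), with rcost normalised to 1
    Simplified cost model (an assumption, not the paper's exact algorithm).
    Returns (output, reads, writes, number of materialisations).
    """
    n = len(data)
    cur = [(v, i) for i, v in enumerate(data)]  # input currently being rescanned
    output, last = [], None
    reads = writes = materialisations = 0
    penalty = 0.0
    while len(output) < n:
        live = n - len(output)  # records still to be emitted
        dead = len(cur) - live  # already-emitted records this scan reads again
        penalty += dead         # accumulated cost of useless rescans
        reads += len(cur)       # one full scan of the current input
        if penalty > lam * live:
            # Rescanning has cost more than writing the live records would:
            # materialise them as the next input (assumed done during this scan).
            cur = [r for r in cur if last is None or r > last]
            writes += len(cur)
            materialisations += 1
            penalty = 0.0
        batch = heapq.nsmallest(m, (r for r in cur if last is None or r > last))
        output.extend(v for v, _ in batch)  # next set of minima, written once
        writes += len(batch)
        last = batch[-1]
    return output, reads, writes, materialisations
```

Sorting 20 records with `m = 4`, a very large λ never materialises (100 reads, 20 writes), λ = 1 materialises once (88 reads, 28 writes), and λ = 0 materialises on every pass after the first (76 reads, 60 writes).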

Partitioned Joins

For equi-joins, Viglas trades off between Grace join and standard nested loops joins. The Grace join component issues writes, while the nested loops component is read-only. If we’re joining T and V, we need to decide the percentage x of T and the percentage y of V that will be processed using the Grace join.

Viglas offers the following heuristics for setting x and y:

If the inputs are similarly sized and the medium is not too inefficient then we are better off using large values for x and y, i.e. employing Grace join; this is intuitive as Grace join is more efficient than nested loops. If the inputs have similar sizes then the decisive factor is λ, the write to read ratio of the medium. As λ grows the advantage shifts to nested loops. On the other hand, as the ratio between input sizes changes, we can start gradually employing nested loops as the evaluation algorithm.

Segmented Grace join divides the input into partitions and operates at a partition level. It first scans both inputs and offloads k partitions where k is the size of the first input divided by the size of the memory budget available, |T|/M. Given these k partitions, we process x of them with Grace join, and k-x with standard nested loops.
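The partition-level split can be sketched in Python as follows. This is a hypothetical illustration and our reading of the description rather than the paper’s exact algorithm: `segmented_grace_join` is an invented name, only the x Grace partitions are offloaded, the remaining k - x partitions are handled read-only by rescanning the inputs, and `m` is a record budget (partition skew is ignored).

```python
import math
from collections import defaultdict


def segmented_grace_join(t, v, m, x):
    """Equi-join sketch (hypothetical) of t and v, lists of (key, payload).

    k = ceil(|t| / m) hash partitions; partitions 0..x-1 use Grace join
    (offloaded, then joined in memory), partitions x..k-1 use a read-only
    block nested loops pass. Returns (sorted results, reads, writes).
    """
    k = max(1, math.ceil(len(t) / m))

    def part(key):
        return hash(key) % k

    out, reads, writes = [], 0, 0

    if x > 0:
        # Grace part: one scan of each input offloads the Grace partitions.
        spilled_t, spilled_v = defaultdict(list), defaultdict(list)
        for rel, spill in ((t, spilled_t), (v, spilled_v)):
            for key, payload in rel:
                reads += 1
                if part(key) < x:
                    spill[part(key)].append((key, payload))
                    writes += 1
        for p in range(min(x, k)):
            table = defaultdict(list)
            for key, a in spilled_t[p]:  # build
                reads += 1
                table[key].append(a)
            for key, b in spilled_v[p]:  # probe
                reads += 1
                out.extend((key, a, b) for a in table[key])

    # Nested loops part: no writes, but t and v are rescanned per partition.
    for p in range(x, k):
        block = [(key, a) for key, a in t if part(key) == p]  # assumed to fit in m
        reads += len(t)
        for key, b in v:
            reads += 1
            if part(key) == p:
                out.extend((key, a, b) for tk, a in block if tk == key)
    return sorted(out), reads, writes
```

Every value of x yields the same join result; what changes is the I/O mix. With 10 records in `t`, 5 in `v` and `m = 4` (so k = 3), x = 0 gives 45 reads and no writes, while x = 3 (pure Grace) gives 30 reads and 15 writes.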

We can be guided by the following equation to choose x, which tells us the conditions under which Grace join is the better choice:

Regardless of outperforming (plain) Grace join, the choice of x is a knob by which we can alter the write intensity of the algorithm.

Lazy hash joins

A standard hash join with two inputs T and V where |T| < |V| computes a join in k = |T|/M iterations by partitioning the inputs into k partitions.

During iteration i the algorithm scans T and hashes each t ∈ T to identify its partition. If t belongs to partition i, the algorithm puts it in an in-memory hash table. If t belongs to any other partition it offloads it to the backing store. The algorithm then scans V and hashes each v ∈ V to identify its partition. If v belongs to partition i it is used to probe the in-memory hash table; any matches are propagated to the output. If v does not belong to partition i, it is offloaded to the backing store. The algorithm iterates as above until both inputs are exhausted. Thus, M buffers from T and M_V = |V|/k buffers from V are eliminated in each iteration.

The lazy version does not write back records it encounters that do not belong to the partition currently being processed. Instead, it pays the penalty of rescanning the input during the next iteration. As before, it will act lazily until the cost of doing so exceeds the cost of the writes, at which point it materializes an intermediate input and then reverts to being lazy again. Lazy hash join needs to materialize these intermediate results every n = k/(λ + 1) iterations.
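A Python sketch of the iterative hash join, with laziness controlled by a materialisation interval, may help. It is a hypothetical illustration, not the paper’s code: `lazy_hash_join` is an invented name, `m` counts records, and the interval `n` is taken as a parameter (n = 1 is the eager, standard algorithm); plugging in n = k/(λ + 1) from the text, rounded to a positive integer, is left to the caller.

```python
import math
from collections import defaultdict


def lazy_hash_join(t, v, m, n):
    """Iterative hash join sketch (hypothetical) of t and v, lists of
    (key, payload), processing partition i in iteration i. Records of later
    partitions are not offloaded every iteration; the inputs are rescanned
    instead and the still-needed records are materialised every n iterations.
    Returns (sorted results, reads, writes)."""
    k = max(1, math.ceil(len(t) / m))

    def part(key):
        return hash(key) % k

    cur_t, cur_v = list(t), list(v)
    out, reads, writes = [], 0, 0
    for i in range(k):
        table = defaultdict(list)
        reads += len(cur_t)
        for key, a in cur_t:  # build from partition i of t
            if part(key) == i:
                table[key].append(a)
        reads += len(cur_v)
        for key, b in cur_v:  # probe with partition i of v
            if part(key) == i:
                out.extend((key, a, b) for a in table[key])
        if (i + 1) % n == 0:
            # Materialise: write out only records of partitions not yet processed.
            cur_t = [r for r in cur_t if part(r[0]) > i]
            cur_v = [r for r in cur_v if part(r[0]) > i]
            writes += len(cur_t) + len(cur_v)
    return sorted(out), reads, writes
```

With 12 records in `t`, 6 in `v` and `m = 4` (k = 3), the eager variant (n = 1) costs 36 reads and 18 writes, n = 2 costs 42 reads and 6 writes, and the fully lazy variant (n = 3) costs 54 reads and no writes.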

Persistent memory strategies

A salient decision to make when incorporating persistent memory into the programming stack is whether to treat it as part of the filesystem, or as part of the memory subsystem. The first option fully addresses the persistence aspects, but implies the traditional boundary between main memory and secondary storage. The second option makes persistent memory part of the memory hierarchy treated as volatile; thus the system itself must guarantee persistence. Our goal is not to answer the question of which option is better. Rather, it is to showcase the performance of our algorithms under each option.

It may not be a goal to answer the question of which option is better, but in the evaluation a clear winner does emerge for these algorithms: blocked memory. In this model the persistent collections implementation, on top of which the storage algorithms are layered, offers the interface of a dynamic array with byte addressability, but uses a linked list of memory blocks under the covers. Memory is allocated one block at a time.
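The blocked-memory layout can be sketched as a Python class. Python has no persistent memory, so this only models the data structure; `BlockedArray` and the block size are hypothetical. The point it shows is that growth allocates one new block and never copies existing elements, unlike a doubling dynamic array, which rewrites everything on expansion.

```python
class _Block:
    """One fixed-size block in the chain (hypothetical layout)."""

    __slots__ = ("items", "next")

    def __init__(self, size):
        self.items = [None] * size
        self.next = None


class BlockedArray:
    """Array interface over a linked list of fixed-size blocks (sketch).
    Appending allocates at most one new block; no element is ever copied."""

    def __init__(self, block_size=4):
        self.block_size = block_size
        self.head = self.tail = None
        self.length = 0
        self.blocks_allocated = 0

    def append(self, value):
        if self.length % self.block_size == 0:  # no block yet, or tail is full
            block = _Block(self.block_size)
            if self.tail is None:
                self.head = self.tail = block
            else:
                self.tail.next = block
                self.tail = block
            self.blocks_allocated += 1
        self.tail.items[self.length % self.block_size] = value
        self.length += 1

    def _locate(self, index):
        if not 0 <= index < self.length:
            raise IndexError(index)
        block = self.head
        for _ in range(index // self.block_size):  # walk the block chain
            block = block.next
        return block, index % self.block_size

    def __getitem__(self, index):
        block, offset = self._locate(index)
        return block.items[offset]

    def __setitem__(self, index, value):
        block, offset = self._locate(index)
        block.items[offset] = value

    def __len__(self):
        return self.length
```

The cost of this design is that random access walks the chain (index // block_size hops), which trades some reads for avoiding the copy-on-expansion writes.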

While array expansion in a main memory setting bears a one-off cost that is dwarfed by the benefits of improved locality, this is no longer the case for persistent memory and its asymmetric write/read costs. Thus, an implementation optimized for main memory is not the best choice for persistent memory. Treating persistent memory as block-addressable storage albeit mounted in main memory is not the best option either as it introduces significant overhead. A persistent collection implementation based on blocked memory shows the true potential of the hardware and the algorithms as it effectively bears zero overhead apart from the unavoidable penalties due to the write/read cost asymmetry.

Performance analysis

The analysis compares four sort variations and six join variations, identified in the graphs as:


  1. ExMS – standard external mergesort
  2. SegS – segment sort (which trades between external mergesort and selection sort)
  3. HybS – the hybrid sort variation of segment sort
  4. LaS – lazy sort


  1. GJ – standard Grace join
  2. HS – simple hash join
  3. NLJ – nested loops join
  4. HybJ – Hybrid Grace-nested-loops join
  5. SegJ – segmented Grace join
  6. LaJ – lazy join


The results affirm that choosing algorithms or implementations when incorporating persistent memory into the I/O stack is not straightforward. It is a combination of various parameters and it comes down to what we want to optimize for. The algorithms do well in introducing write intensity and giving the developer, or the runtime, a knob by which they can select whether to minimize writes; or minimize response time; or both. It is also important that the majority of algorithms converges to I/O-minimal behavior at a low write intensity; e.g., SegS and SegJ approximate or outperform their counterparts optimized for symmetric I/O from a 20% write intensity onwards. This confirms that one can have write-limited algorithms without compromising performance.

Let’s talk about storage and recovery methods for non-volatile memory database systems

September 29, 2016

Let’s talk about storage and recovery methods for non-volatile memory database systems Arulraj et al., SIGMOD 2015

Update: fixed a bunch of broken links.

I can’t believe I only just found out about this paper! It’s exactly what I’ve been looking for in terms of an analysis of the impacts of NVM on data storage systems, and the potential benefits of adapting storage algorithms to better exploit the technology. See my “All change please” post from earlier this year for a summary of NVM itself. Today’s paper also contains some background on that, but I’m going to focus on the heart of the material – in which the authors implement three of the common DBMS storage engine strategies and run them on an NVM system to see how they fare, before seeing what improvements can be made when the algorithms are then adapted to be optimised for NVM.

Is NVM going to make a big difference, and will we need to rearchitect to get the most out of it? Yes (although it’s not quite an order-of-magnitude difference):

We then present NVM-aware variants of these architectures that leverage the persistence and byte-addressability properties of NVM in their storage and recovery methods. Our experimental evaluation on an NVM hardware emulator shows that these engines achieve up to 5.5x higher throughput than their traditional counterparts while reducing the amount of wear due to write operations by up to 2x.

A very short primer on NVM

NVM provides low latency reads and writes on the same order of magnitude as DRAM, coupled with persistent writes and a storage capacity similar to SSDs.

NVM can scale beyond DRAM, and uses less power (DRAM consumes about 40% of the overall power consumed by a server). Flash-based SSDs use less power but are slower and only support block-based access.

Although the advantages of NVM are obvious, making full use of them in an OLTP DBMS is non-trivial…

Traditional DBMS engines are designed to cope with orders-of-magnitude differences in latency between volatile and non-volatile storage, and to optimise for block-level granularity with non-volatile storage.  Many of the coping strategies employed are unnecessary in an NVM-only system.

The authors perform their tests using an Intel Labs hardware emulator which is able to simulate varying hardware profiles expected from different NVM devices.

The DBMS testbed

The authors study systems that are NVM-only (not two-level or greater storage hierarchies). They developed a single lightweight DBMS testbed into which multiple storage engines can be plugged. Using this consistent harness for all tests means that differences in performance will be due solely to the storage engines themselves. They develop traditional implementations of three storage engines, each using different approaches for supporting durable updates: (i) an in-place updates engine, (ii) a copy-on-write updates engine, and (iii) a log-structured updates engine. After taking a performance baseline for these engines using both YCSB and TPC-C workloads, they then develop NVM-aware derivatives of each of the engines and evaluate those for comparison. We get to see what changes are made, and the difference that they make.

In-Place updates engine

The in-place updates engine keeps only a single version of each tuple at all times. New values are written directly on top of old ones. The design of the in-place engine used in the study was based on VoltDB. A Write-Ahead Log (WAL) is used to assist in recovery from crashes and power failures, using a variant of ARIES adapted for main-memory DBMSs with byte-addressable storage engines. (See also MARS).

The standard in-place update engine has a high rate of data duplication – recording updates both in the WAL and in the table storage area. The logging infrastructure is designed on the assumption that the durable storage is much slower than memory, and thus batches updates (which increases response latency).

Given this, we designed the NVM-InP engine to avoid these issues. Now when a transaction inserts a tuple, rather than copying the tuple to the WAL, the NVM-InP engine only records a non-volatile pointer to the tuple in the WAL. This is sufficient because both the pointer and the tuple referred to by the pointer are stored on NVM. Thus, the engine can use the pointer to access the tuple after the system restarts without needing to re-apply changes in the WAL. It also stores indexes as non-volatile B+trees that can be accessed immediately when the system restarts without rebuilding.

There is no need to replay the log during recovery as committed transactions are durable immediately a transaction commits. The effects of uncommitted transactions that may be present in the database do need to be undone though.
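A toy Python model can show why pointer-only logging removes the redo pass. Everything here is a hypothetical sketch: `NvmInPlaceSketch` is an invented name, Python dicts stand in for NVM, only inserts are modelled (updates and deletes would need more information in the log), and the flushing and ordering of writes to NVM, which a real engine must handle, are ignored.

```python
class NvmInPlaceSketch:
    """Toy model (hypothetical) of a pointer-only WAL, inserts only."""

    def __init__(self):
        self.heap = {}  # slot id -> tuple; stands in for NVM-resident storage
        self.wal = []   # (op, txn, slot) entries; also assumed to live on NVM
        self.next_slot = 0

    def insert(self, txn, tup):
        slot = self.next_slot
        self.next_slot += 1
        self.heap[slot] = tup                   # tuple persisted once, in place
        self.wal.append(("insert", txn, slot))  # WAL stores a pointer, not a copy
        return slot

    def commit(self, txn):
        self.wal.append(("commit", txn, None))

    def recover(self):
        """No redo: committed tuples are already in the heap. Undo inserts of
        transactions without a commit record, newest first."""
        committed = {txn for op, txn, _ in self.wal if op == "commit"}
        for op, txn, slot in reversed(self.wal):
            if op == "insert" and txn not in committed:
                del self.heap[slot]
        self.wal.clear()  # truncated once consistent (a simplification)
```

After inserting a committed tuple and an uncommitted one, `recover()` leaves only the committed tuple in place, and at no point does the log hold a copy of either tuple.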

Copy-on-write updates engine

Instead of modifying the original tuple, a CoW engine creates a copy and then modifies that. It uses different look-up directories for accessing versions of tuples (aka shadow paging). There is no need for a WAL for recovery under this scheme. When a transaction commits, the engine updates the master record atomically to point to the new version of a tuple. In the study, the CoW engine uses LMDB’s copy-on-write B-trees, storing directories on the filesystem with tuples in an HDD/SSD optimized format with all fields inlined.
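The shadow-paging idea fits in a few lines of Python. This is a hypothetical toy (`CowStoreSketch` is an invented name): a transaction writes into a private copy of the directory, and commit is a single reference swap that stands in for the atomic master-record update. Copying the whole directory also mirrors the overhead discussed next; a copy-on-write B-tree copies only the path from the modified leaf to the root.

```python
class CowStoreSketch:
    """Toy shadow-paging store (hypothetical): the master record points at
    the committed directory; transactions write into a shadow copy."""

    def __init__(self):
        self.master = {}  # committed directory: key -> tuple version

    def begin(self):
        return dict(self.master)  # dirty (shadow) directory

    def write(self, dirty, key, tup):
        dirty[key] = tup  # new version; the committed version is untouched

    def commit(self, dirty):
        self.master = dirty  # one reference swap models the atomic master update
```

Because nothing is changed in place, an abandoned transaction (e.g. a crash before `commit`) needs no undo and no WAL: the master record still points at the last committed directory.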

The CoW engine incurs a high overhead in propagating changes to the dirty directory – even if a transaction only modifies one tuple a whole block is copied to the filesystem.

The NVM-CoW engine employs three optimizations to reduce these overheads. First, it uses a non-volatile copy-on-write B+tree that it maintains using the allocator interface. Second, the NVM-CoW engine directly persists the tuple copies and only records non-volatile tuple pointers in the dirty directory. Lastly, it uses the lightweight durability mechanism of the allocator interface to persist changes in the copy-on-write B+tree.

It thus avoids the transformation and copying costs incurred by the original engine.

Log-structured updates engine

The log-structured engine employs Log-structured merge (LSM) trees. Each tree consists of a collection of runs of data; each run is an ordered set of entries recording changes made to tuples. Runs reside either in volatile memory or in stable storage, with changes batched in memory and periodically cascaded out to stable storage. The contents of the memory table are lost on restart, so the engine maintains a WAL.

The NVM version uses a WAL stored on NVM. It avoids data duplication in the memory table and WAL by recording only non-volatile pointers to tuple modifications in the WAL. Instead of periodically flushing memory tables to stable storage tables (in a stable-storage-optimised format), memory tables are simply marked as immutable.

A few performance comparisons

Throughput comparisons at different latencies across the six engines for YCSB:


On YCSB read-only workloads, NVM-InP is no faster than InP, but NVM-CoW is 1.9-2.1x faster than straight CoW, and NVM-Log is 2.8x faster than Log. Under balanced and write-heavy workloads the NVM variants do much better, the NVM-CoW engine being 4.3-5.5x faster than straight CoW. Under TPC-C the NVM engines are 1.8-2.1x faster than the traditional engines.

And for TPC-C:

Example of the difference in number of reads and writes (TPC-C):

On YCSB the NVM-aware engines perform up to 53% fewer loads, and 17-48% fewer stores on write-heavy workloads. For TPC-C, the NVM-aware engines perform 31-42% fewer writes.

Total storage footprints:

NVM-InP and NVM-Log use 17-21% less storage on YCSB, and NVM-CoW uses 25% less. The NVM engines use 31-38% less storage on TPC-C – the space savings are more significant due to the write-intensive workload with long-running transactions.

The takeaway

NVM access latency is the most significant factor in determining the runtime performance of the engines. The NVM aware variants achieve better absolute throughput (up to 5.5x), and perform fewer store operations (less than half for write-intensive workloads) which helps to extend device lifetime. They also use less storage (17-38% depending on workload) overall, which is important because early NVM products are expected to be relatively expensive.

Overall, we find that the NVM-aware In-place engine performs the best across a wide set of workload mixtures and skew settings for all NVM latency configurations…. It achieved the best throughput among all the engines with the least amount of wear on the NVM device.

Distributed consensus and the implications of NVM on database management systems

September 28, 2016

Distributed consensus and the implications of NVM on database management systems Fournier, Arulraj, & Pavlo ACM Queue Vol 14, issue 3

As you may recall, Peter Bailis and ACM Queue have started a “Research for Practice” series introducing “expert curated guides to the best of CS research.” Aka, reading lists for The Morning Paper! I previously covered the papers from the first edition (blog entries dated June 14th-21st, 2016). Today we’re turning our attention to the second edition:

I am thrilled to introduce our second instalment of Research for Practice, which provides highlights from two critical areas in storage and large-scale services: distributed consensus and non-volatile memory.

Distributed consensus

The first topic area is Distributed Consensus, with papers selected by Camille Fournier. “The three papers included in this selection address the real world of consensus systems: Why are they needed? Why are they difficult to understand? What happens when you try to implement them? Is there an easier way, something that more developers can understand and therefore implement?”

Fournier’s three choices are:

All of which will be familiar to regular readers of The Morning Paper😉 (Links above are to my write-ups). If you want more of this kind of thing, I did a two-week mini-series on consensus back in March of last year.

Here are three additional picks of my own:

Implications of NVM on database management systems

Joy Arulraj and Andrew Pavlo introduce a selection of three papers looking at the implications of NVM for database management systems:

The advent of non-volatile memory (NVM) will fundamentally change the dichotomy between memory and durable storage in a database management systems (DBMS).

This is a topic area that really caught my attention earlier this year, and I wrote a short post entitled “All change please” summarizing some of the hardware advances hitting our data centers, including NVM. On the subject of NVM itself and its implications, the papers I’ve covered so far can be found by searching on the blog for the keyword ‘NVM’.

The first of Arulraj and Pavlo’s picks is

Which looks at the classic ARIES recovery protocol and how it can be optimized for NVM.

Their second and third paper choices are ones that I haven’t covered before, so we’ll be looking at those papers in the next two days. The links below will go live as each day’s post goes up.

As Arulraj and Pavlo say,

The common theme for these papers is that you cannot just run an existing DBMS on NVM and expect it to leverage its unique set of properties. The only way to achieve that is to come up with novel architectures, protocols, and algorithms that are tailor-made for NVM.

The third edition of Research for Practice must be due out soon – I’m very much looking forward to seeing where it goes next!

Flexible Paxos: Quorum intersection revisited

September 27, 2016

Flexible Paxos: Quorum intersection revisited Howard et al., 2016

Paxos has been around for 18 (26) years now, and extensively studied. (For some background, see the 2 week mini-series on consensus that I put together last year). In this paper, Howard et al. make a simple(?) observation that has significant consequences for improving the fault-tolerance and throughput of Paxos-based systems. After so much time and so much study, you can’t help but admire that.

Paxos proceeds in two phases: in the first phase a proposer is chosen (through a quorum), known as the leader; in the second phase the leader proposes a value to be agreed upon by the participants (acceptors) and brings a quorum to agreement. Paxos (like many other consensus systems) uses majority agreement for its quorums (i.e., at least N/2 + 1 out of N acceptors), which guarantees an intersection between the participants in phase one and phase two. Since we normally want to reach agreement over a series of values (referred to in the literature as slots), we need to run a distinct instance of Paxos each time. Multi-Paxos recognises that the first phase (leader election) is independent of the proposal phase, so why not do that once and then retain that leader through a whole series of phase two agreements?

Here’s the key new insight: Paxos is more conservative than necessary in requiring a majority quorum in each phase. All we actually need to guarantee is that the phase one and phase two quorums intersect. We don’t need to guarantee that phase one quorums across successive rounds intersect, nor that phase two quorums intersect. If Q1 represents the participants in the first phase quorum, and Q2 the participants in the second phase quorum, then |Q1| + |Q2| > N (for N overall participants) will provide the needed guarantee. And that gets really interesting when you consider that (hopefully) leader elections happen much less frequently than phase 2 slot-decisions. So you can trade off e.g. requiring a larger quorum when you do need to elect a new leader for having a smaller quorum requirement during regular operations. The authors call this variant Flexible Paxos (FPaxos). If you’re like me, you’ll want to see some kind of proof or justification for the quorum intersection claim – I spent the first 8 pages of the paper hungry for it! It’s in section 5 starting on page 9. There’s also a model checked formal specification in TLA+ in the appendix.
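For simple quorums (any acceptors, counted equally), the |Q1| + |Q2| > N condition is just the pigeonhole principle, and a brute-force check in Python confirms it for small N. The function name `quorums_intersect` is hypothetical, and the check is exponential, so it is only meant for tiny clusters.

```python
from itertools import combinations


def quorums_intersect(n, q1, q2):
    """Hypothetical brute-force check: does every phase-1 quorum of size q1
    share at least one acceptor with every phase-2 quorum of size q2, out of
    n acceptors? Exponential in n, so only for small examples."""
    acceptors = range(n)
    return all(
        set(a) & set(b)  # a non-empty intersection is truthy
        for a in combinations(acceptors, q1)
        for b in combinations(acceptors, q2)
    )
```

With five acceptors, |Q1| = 4 and |Q2| = 2 always intersect, while two phase-2 quorums of size 2 need not (`quorums_intersect(5, 2, 2)` is `False`), which is exactly the relaxation FPaxos permits.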

In this paper, we demonstrate that Paxos, which lies at the foundation of many production systems, is conservative. Specifically, we observe that each of the phases of Paxos may use non-intersecting quorums. Majority quorums are not necessary as intersection is required only across phases.

It has previously been noted that Paxos could be generalized to replace majority quorums with any quorum system guaranteeing any two quorums will have a non-empty intersection. FPaxos takes this generalisation one step further by relaxing the conditions under which such guarantees are needed in the first place.

Howard et al. (to the best of their knowledge) are the first to prove and implement the generalization. While preparing their publication Sougoumarane independently made the same observation on which the work is based, and released a blog post summarising it last month.

Why does all this matter?

The fundamental theorem of quorum intersection states that [the resilience of a quorum system] is inversely proportional to the load on (hence the throughput of) the participants. Therefore, with Paxos and its intersecting quorums, one can only hope to increase throughput by reducing the resilience, or vice versa…. by weakening the quorum intersection requirement, we can break away from the inherent trade off between resilience and performance.

Howard et al. go on to illustrate the practical implications of the relaxed quorum requirement for FPaxos using majority quorums, simple quorums, and grid quorums. These however can be considered “naive” quorum systems, and FPaxos actually opens up a whole new world:

There already exists an extensive literature on quorum systems from the fields of databases and data replication, which can now be more efficiently applied to the field of consensus. Interesting example systems include weighted voting, hierarchies, and crumbling walls.

[And that’s another addition to my known unknowns list😉 ].

Majority quorums

If we stick with majority quorums, FPaxos allows us to make a simple improvement in the case when the number of acceptors n is even. Instead of needing n/2 +1 participants in Q2, we can reduce this to n/2.

Such a change would be trivial to implement and by reducing the number of acceptors required to participate in replication, we can reduce latency and improve throughput. Furthermore, we have also improved the fault tolerance of the system. As with Paxos, if at most n/2 - 1 failures occur then we are guaranteed to be able to make progress. However, unlike with Paxos, if exactly n/2 acceptors fail and the leader is still up then we are able to continue to make progress and suffer no loss of availability.

Simple quorums

Simple quorums are for me the most natural way of understanding the benefits of FPaxos. “We will use the term simple quorums to refer to a quorum system where any acceptor is able to participate in a quorum and each acceptor’s participation is counted equally. Simple quorums are a straightforward generalization of majority quorums.”

Since we require only that |Q1| + |Q2| > N, and we know that phase 2 happens a lot more often than phase one, we can reduce the quorum size requirement in phase 2 and make a corresponding increase in phase 1. Take a classic Paxos deployment with 5 replicas. Whereas traditionally each phase requires at least three nodes (acceptors) to be up, we can tweak this to require four acceptors in a leader election quorum, but only two acceptors for Q2.

FPaxos will always be able to handle up to |Q2| – 1 failures. However, if between |Q2| and N – |Q2| failures occur, we can continue replication until a new leader is required.

If we choose 5 replicas with |Q1| = 4 and |Q2| = 2, we will therefore always be able to handle any single failure, and we’ll be able to continue replication with the loss of up to three replicas, so long as we don’t need a new leader.
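The failure arithmetic for simple quorums can be captured in a small Python helper. The name `fpaxos_fault_tolerance` is hypothetical; it assumes any surviving acceptors can form a quorum, so progress with leader election needs max(|Q1|, |Q2|) live acceptors, while steady-state replication under an existing leader needs only |Q2|.

```python
def fpaxos_fault_tolerance(n, q1, q2):
    """Hypothetical helper for simple quorums over n acceptors. Returns
    (failures always survivable, failures survivable while the leader is up)."""
    if q1 + q2 <= n:
        raise ValueError("phase-1 and phase-2 quorums must intersect: q1 + q2 > n")
    always = n - max(q1, q2)  # enough acceptors left for election and replication
    steady_state = n - q2     # replication only needs phase-2 quorums
    return always, steady_state
```

This reproduces the numbers above: classic majority Paxos on 5 nodes gives (2, 2); |Q1| = 4, |Q2| = 2 gives (1, 3); and the even-n majority tweak with n = 4, |Q1| = 3, |Q2| = 2 gives (1, 2), i.e. n/2 - 1 and n/2.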

Here are some results from the evaluation showing how the latency and throughput of FPaxos compares to regular Paxos with varying Q2 quorum sizes:

Grid quorums

Grid quorum schemes arrange the N nodes into a matrix of N1 columns by N2 rows, where N1 × N2 = N and quorums are composed of rows and columns. As with many other quorum systems, grid quorums restrict which combinations of acceptors can form valid quorums. This restriction allows us to reduce the size of quorums whilst still ensuring that they intersect.

Whereas with simple quorums any reduction in |Q2| must be paid for by a corresponding increase in |Q1|, with grid quorums we can make different trade-offs between quorum size, quorum selection flexibility, and fault tolerance.

Since Paxos requires all quorums to intersect, one suitable scheme would be to require one row and one column to form a quorum: (a) in the figure above.

In FPaxos we can safely reduce our quorums to one row of size N1 for Q1, and one column of size N2 for Q2. This construction is interesting as quorums from the same phase will never intersect, and may be useful in practice for evenly distributing the load of FPaxos across a group of acceptors.
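A short Python sketch makes the grid construction checkable. The function `grid_quorums` and the acceptor numbering are hypothetical: acceptors 0..N1·N2 - 1 are laid out in N2 rows of N1 columns, Q1 is any row and Q2 is any column.

```python
from itertools import combinations


def grid_quorums(n1, n2):
    """Hypothetical layout: acceptors 0..n1*n2-1 in n2 rows by n1 columns.
    FPaxos grid quorums: Q1 = any row (size n1), Q2 = any column (size n2)."""
    rows = [[r * n1 + c for c in range(n1)] for r in range(n2)]
    cols = [[r * n1 + c for r in range(n2)] for c in range(n1)]
    return rows, cols


rows, cols = grid_quorums(3, 2)
# Every row meets every column in exactly one acceptor (cross-phase intersection).
print(all(len(set(r) & set(c)) == 1 for r in rows for c in cols))
# Quorums of the same phase are disjoint, so load spreads across acceptors.
print(all(not set(a) & set(b) for a, b in combinations(rows, 2)))
```

Note that here |Q1| + |Q2| = 3 + 2 = 5 ≤ N = 6, yet every Q1 still meets every Q2: restricting which acceptor combinations count as quorums is what lets grids go below the simple-quorum bound.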

A further enhancement

FPaxos only requires that a given Q1 will intersect with all Q2’s with lower proposal numbers. Given a mechanism to learn which Q2s have participated in lower numbered proposals, we have further flexibility in choosing a Q1 quorum.

The implications of this enhancement can be far reaching. For example, in a system of N = 100 f nodes, a leader may start by announcing a fixed Q2 of size f+1 and all higher proposal numbers (and readers) will need to intersect with only this Q2. This allows us to tolerate N-f failures…

(And many other interesting variations are possible).

The bottom line

Generalizing existing systems to use FPaxos should be quite straightforward. Exposing replication (phase 2) quorum size to developers would allow them to choose their own trade-off between failure tolerance and steady state latency… [Secondly], by no longer requiring replication quorums to intersect, we have removed an important limit on scalability. Through smart quorum construction and pragmatic system design, we believe a new breed of scalable, resilient, and performant consensus algorithms is now possible.

Time evolving graph processing at scale

September 26, 2016

Time evolving graph processing at scale Iyer et al., GRADES 2016

Here’s a new (June 2016) paper from the distinguished AMPlab group at Berkeley that really gave me cause to reflect. The work addresses the problem of performing graph computations on graphs that are constantly changing (because updates flow in, such as a new follower in a social graph). Many graphs of interest have this property of constantly evolving. In part, that’s what makes them interesting. You could always take a snapshot of e.g. the graph as it was at the end of the previous day and compute on that, but some applications need more up to date results (e.g. detecting traffic hotspots in cellular networks), and many applications would benefit from real-time results. GraphTau is a solution to this problem, implemented on top of GraphX which is in turn implemented on top of Spark’s RDDs. It’s a convergence of stream processing and graph processing.

I’m seeing a lot of streaming recently, and a lot of convergence. That topic probably warrants a separate post. In the meantime…

Graph-structured data is on the rise, in size, complexity and the dynamism they exhibit. From social networks to telecommunication networks, applications that generate graph-structured data are ubiquitous… the dynamic nature of these datasets gives them a unique characteristic – the graph-structure underlying the data evolves over time. Unbounded, real-time data is fast becoming the norm, and thus it is important to process these time-evolving graph-structured datasets efficiently.

(Aside: applications generating graph-structured data certainly are ubiquitous – pretty much any relational database has graph structure the minute you introduce foreign keys. It’s applications generating graph-structured data and that require extensive traversals or graph-specific computations that we’re really interested in here).

For time-evolving graph-structured datasets, the authors identify three core requirements:

  1. The ability to execute iterative graph algorithms in real-time
  2. The ability to combine graph-structured data with unstructured and tabular data
  3. The ability to run analytics over windows of input data

While some specialized systems for evolving-graph processing exist, these do not support the second and third requirements.  GraphTau is “the first time-evolving graph processing system, to our knowledge, built on a general purpose dataflow framework.” GraphTau is built on top of GraphX, which maintains graphs internally as a pair of RDDs: a vertex collection and an edge collection.

(Note that Apache Flink has Gelly, which builds graph processing on top of a streaming dataflow core, but does not support iterative processing over evolving graphs to the best of my knowledge.)

The main idea in GraphTau is to treat time-evolving graphs as a series of consistent graph snapshots, and dynamic graph computations as a series of deterministic batch computations on discrete time intervals. A graph snapshot is simply a regular graph, stored as two RDDs, the vertex RDD and the edge RDD. We define GraphStream as a sequence of immutable, partitioned datasets (graphs represented as two RDDs) that can be acted on by deterministic operators. User programs manipulate GraphStreams to produce new GraphStreams, as well as intermediate state in the form of RDDs or graphs.

A DeltaRDD is an RDD whose elements are updates that need to be applied to a graph. A stream of such updates is called a DeltaDStream.  GraphStreams can be built from a DeltaDStream or directly from a vertex DStream and an edge DStream.
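To make the snapshot-and-delta model concrete, here is a minimal Python sketch. It is not GraphTau's API (which is Scala on top of Spark); `Snapshot`, `apply_delta`, `graph_stream` and the update tuple format are hypothetical names chosen for illustration. A snapshot is an immutable pair of vertex and edge sets, each batch of updates (the role a DeltaRDD plays) yields a new snapshot, and the resulting sequence plays the role of a GraphStream.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Iterator, List, Tuple

Edge = Tuple[int, int]
# Hypothetical update format: ("add_vertex", v), ("add_edge", (u, v)), ("remove_edge", (u, v))
Update = Tuple[str, object]


@dataclass(frozen=True)
class Snapshot:
    """An immutable graph snapshot: a vertex collection and an edge collection."""
    vertices: FrozenSet[int]
    edges: FrozenSet[Edge]


def apply_delta(snap: Snapshot, delta: Iterable[Update]) -> Snapshot:
    """Apply one batch of updates, returning a new snapshot (the old one is untouched)."""
    vertices, edges = set(snap.vertices), set(snap.edges)
    for kind, payload in delta:
        if kind == "add_vertex":
            vertices.add(payload)
        elif kind == "add_edge":
            u, v = payload
            vertices.update((u, v))  # adding an edge implicitly adds its endpoints
            edges.add((u, v))
        elif kind == "remove_edge":
            edges.discard(payload)
        else:
            raise ValueError(f"unknown update kind: {kind}")
    return Snapshot(frozenset(vertices), frozenset(edges))


def graph_stream(initial: Snapshot, deltas: Iterable[List[Update]]) -> Iterator[Snapshot]:
    """Yield one consistent snapshot per batch of updates."""
    snap = initial
    for delta in deltas:
        snap = apply_delta(snap, delta)
        yield snap


# Usage: two batches of updates produce two snapshots.
empty = Snapshot(frozenset(), frozenset())
snaps = list(graph_stream(empty, [[("add_edge", (1, 2))],
                                  [("add_edge", (2, 3)), ("remove_edge", (1, 2))]]))
```

In this sketch, because every snapshot is immutable, a computation running over one snapshot is never disturbed by the arrival of the next batch.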

There are two supported computational models, called pause-shift-resume and online rectification.


Pause-shift-resume

Some classes of graph algorithms can cope with the graph being modified while the algorithm is still converging. For example, if a graph changes during an evaluation of PageRank you’ll still get an answer, which studies have shown will be within a reasonable error of the answer you’d get if you started the algorithm again from scratch with the now current graph.

Under these conditions, the pause-shift-resume (PSR) model is appropriate.

In this model, GraphTau starts running a graph algorithm as soon as the first snapshot of a graph is available. Upon the availability of a new snapshot, it pauses the computation on the current graph, shifts the algorithm specific data to the new snapshot, and resumes the computation on the new graph.
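The pause-shift-resume idea can be sketched for PageRank in a few lines of Python. This is a simplified single-machine illustration under our own assumptions, not GraphTau code: ranks computed so far on the old snapshot are carried over to the new one, vertices that did not exist before start at 1/n (an arbitrary choice), the ranks are renormalised, and iteration resumes. Since PageRank with damping has a unique fixed point, the resumed run converges to the same ranks as a fresh run on the new graph; the hope is that it gets there in fewer iterations.

```python
from typing import Dict, List, Tuple

Graph = Dict[str, List[str]]  # vertex -> out-neighbours (every neighbour is also a key)
Ranks = Dict[str, float]


def pagerank_step(graph: Graph, ranks: Ranks, d: float = 0.85) -> Ranks:
    """One power-iteration step; rank held by dangling vertices is spread uniformly."""
    n = len(graph)
    new = {v: (1 - d) / n for v in graph}
    dangling = sum(ranks[v] for v, outs in graph.items() if not outs)
    for v, outs in graph.items():
        for w in outs:
            new[w] += d * ranks[v] / len(outs)
    for v in graph:
        new[v] += d * dangling / n
    return new


def run(graph: Graph, ranks: Ranks, max_iters: int, tol: float = 1e-10) -> Tuple[Ranks, int]:
    """Iterate until the L1 change drops below tol, or pause after max_iters steps."""
    for i in range(1, max_iters + 1):
        new = pagerank_step(graph, ranks)
        change = sum(abs(new[v] - ranks[v]) for v in graph)
        ranks = new
        if change < tol:
            return ranks, i
    return ranks, max_iters


def shift(ranks: Ranks, new_graph: Graph) -> Ranks:
    """Carry ranks over to the new snapshot; unseen vertices start at 1/n; renormalise to sum to 1."""
    n = len(new_graph)
    carried = {v: ranks.get(v, 1.0 / n) for v in new_graph}
    total = sum(carried.values())
    return {v: r / total for v, r in carried.items()}


# Usage: pause on g1 after 10 iterations, shift to g2 (one new vertex, one new edge), resume.
g1 = {"a": ["b"], "b": ["c"], "c": ["a"]}
g2 = {"a": ["b"], "b": ["c"], "c": ["a", "d"], "d": ["a"]}
paused, _ = run(g1, {v: 1 / 3 for v in g1}, max_iters=10)
resumed, steps = run(g2, shift(paused, g2), max_iters=1000)
```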

Online rectification

Algorithms such as connected-components will produce incorrect results under the PSR model (consider an edge or vertex that is removed during processing).

For such algorithms, GraphTau proposes the online rectification model. In this model, GraphTau rectifies the errors caused by the underlying graph modifications in an online fashion using minimal state.

In the connected component example, it is necessary for every vertex to keep track of its component id over time. The approach works for any algorithm based on label propagation, at the expense of needing to keep algorithm-specific state.
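Here is a minimal Python sketch of why connected components needs rectification, and one simple way to do it. Components are computed by min-label propagation. Edge additions can only merge components, so propagation can simply resume from the current labels. A deletion can split a component, and resuming would leave stale labels behind (labels only ever decrease). As a deliberately simple rectification, assumed here rather than taken from the paper, the sketch resets every vertex that carried the affected component's label and re-propagates; GraphTau instead has each vertex track its component id over time, as described above.

```python
from typing import Dict, Set

Adj = Dict[int, Set[int]]
Labels = Dict[int, int]


def propagate(adj: Adj, labels: Labels) -> Labels:
    """Min-label propagation until no label changes (smallest vertex id per component)."""
    labels = dict(labels)
    changed = True
    while changed:
        changed = False
        for v, nbrs in adj.items():
            best = min([labels[v]] + [labels[u] for u in nbrs])
            if best < labels[v]:
                labels[v] = best
                changed = True
    return labels


def add_edge(adj: Adj, labels: Labels, u: int, v: int) -> Labels:
    """Additions can only merge components, so propagation resumes from the current labels."""
    adj.setdefault(u, set()).add(v)
    adj.setdefault(v, set()).add(u)
    labels = {**labels, u: labels.get(u, u), v: labels.get(v, v)}
    return propagate(adj, labels)


def remove_edge(adj: Adj, labels: Labels, u: int, v: int) -> Labels:
    """Deletions may split a component: reset its vertices to their own ids, then re-propagate."""
    adj[u].discard(v)
    adj[v].discard(u)
    stale = labels[u]  # before the deletion, u and v were in one component labelled `stale`
    reset = {w: (w if lbl == stale else lbl) for w, lbl in labels.items()}
    return propagate(adj, reset)


# Usage: build {1, 2, 3} and {4, 5}, then delete the edge (2, 3).
adj: Adj = {}
labels: Labels = {}
for a, b in [(1, 2), (2, 3), (4, 5)]:
    labels = add_edge(adj, labels, a, b)
rectified = remove_edge(adj, labels, 2, 3)  # adj no longer contains edge (2, 3)
resumed_only = propagate(adj, labels)       # PSR-style: keep propagating from the old labels
```

After the deletion, `rectified` correctly puts vertex 3 in its own component, while `resumed_only` still labels it 1: exactly the kind of error the online rectification model exists to correct.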

The question of time

GraphStream splits time into non-overlapping intervals, and stores all the inputs received during these intervals in batches (worker nodes are synchronized using NTP). Such intervals are based on receive time; there is also an option to process based on external timestamps (event time), which requires either the introduction of limited slack time to wait for late events, or application-specific code to correct for late records.
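The two batching modes can be illustrated with a small Python sketch. The slack rule below (interval k closes once an event time of at least (k + 1) * interval + slack has been seen, and anything arriving for a closed interval is reported as late for application code to handle) is an assumption for illustration, not GraphTau's documented policy; clock synchronisation across workers is out of scope.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[float, str]  # (timestamp in seconds, payload)


def batch_by_receive_time(records: Iterable[Record], interval: float) -> Dict[int, List[str]]:
    """Receive-time batching: each record goes into the interval in which it arrived."""
    batches: Dict[int, List[str]] = defaultdict(list)
    for received_at, payload in records:
        batches[int(received_at // interval)].append(payload)
    return dict(batches)


def batch_by_event_time(records: Iterable[Record], interval: float,
                        slack: float) -> Tuple[Dict[int, List[str]], List[str]]:
    """Event-time batching with slack; returns (closed batches, late payloads)."""
    open_batches: Dict[int, List[str]] = defaultdict(list)
    closed: Dict[int, List[str]] = {}
    late: List[str] = []
    max_seen = float("-inf")
    for event_time, payload in records:  # records arrive in receive order
        k = int(event_time // interval)
        if (k + 1) * interval + slack <= max_seen:
            late.append(payload)  # interval k has already been closed
        else:
            open_batches[k].append(payload)
        max_seen = max(max_seen, event_time)
        for j in sorted(open_batches):  # close every interval whose slack has expired
            if (j + 1) * interval + slack <= max_seen:
                closed[j] = open_batches.pop(j)
    closed.update(open_batches)  # end of input: flush whatever is still open
    return closed, late


# Usage: "c" is out of order but within the slack; "e" arrives after interval 0 has closed.
events = [(0.2, "a"), (1.1, "b"), (0.9, "c"), (1.6, "d"), (0.7, "e"), (2.0, "f")]
batches, late = batch_by_event_time(events, interval=1.0, slack=0.5)
```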

Each interval’s updates reflect all of the input received until then. This is despite the fact that the DeltaRDD and its updated graph snapshot are distributed across nodes. As long as we process the whole batch consistently (e.g. ordered by timestamps), we will get a consistent snapshot. This makes distributed state much easier to reason about and is the same as “exactly once” processing of the graph updates even with faults or stragglers.

GraphStream inherits its recovery mechanisms from GraphX and its RDDs: parallel recovery of lost state and speculative execution.

Programming with GraphTau

The GraphStream interface supports transform, merge, streamingBSP, and forEachGraph operations as well as an updateLocalState operator to allow for event processing and state tracking.

  • mergeByWindow merges all graphs from a sliding window of past time intervals into one graph
  • forEachGraph applies a function to each graph generated from the GraphStream
  • transformWith combines two graph streams with various join and cogroup operators.
  • the streamingBSP operator supports differential computation
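As an illustration of the windowed operator, here is a minimal Python sketch of a mergeByWindow-style merge. The function name `merge_by_window` and the choice to merge by taking the union of edge sets are assumptions; the real operator works on GraphX graphs and may combine vertex and edge attributes differently.

```python
from collections import deque
from typing import Deque, FrozenSet, Iterable, Iterator, Set, Tuple

Edge = Tuple[str, str]


def merge_by_window(graphs: Iterable[Set[Edge]], window: int) -> Iterator[FrozenSet[Edge]]:
    """For each new per-interval graph, yield the union of the edges in the last `window` graphs."""
    recent: Deque[Set[Edge]] = deque(maxlen=window)
    for g in graphs:
        recent.append(g)  # once full, the oldest graph drops out of the window automatically
        merged: Set[Edge] = set()
        for past in recent:
            merged |= past
        yield frozenset(merged)


# Usage: a window of two intervals over three per-interval graphs.
merged = list(merge_by_window([{("a", "b")}, {("b", "c")}, {("c", "d")}], window=2))
```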

This [streamingBSP] operator enables efficient implementation of a large class of incremental algorithms on time-evolving graphs. We signal the availability of the new graph snapshot using a variable in the driver program. In each iteration of Pregel, we check whether a new graph is available. If so, we do not proceed to the next iteration on the current graph. Instead, we resume computation on the new graph reusing the result, where only vertices in the new active set continue message passing. The new active set is a function of the old active set and the changes between the new graph and the old graph. For a large class of algorithms (e.g. incremental PageRank), the new active set includes vertices from the old set, any new vertices, and vertices with edge additions and deletions.
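The active-set rule in the quote above can be written down directly. Here is a minimal Python sketch, assuming snapshots are given as plain vertex and edge sets; the function name is hypothetical, dropping vertices that no longer exist is our own assumption, and the driver loop that checks for a new snapshot between Pregel iterations is not shown.

```python
from typing import Set, Tuple

Edge = Tuple[int, int]


def next_active_set(old_active: Set[int], old_vertices: Set[int], new_vertices: Set[int],
                    old_edges: Set[Edge], new_edges: Set[Edge]) -> Set[int]:
    """Vertices that should keep passing messages after switching to the new snapshot."""
    added_vertices = new_vertices - old_vertices
    changed_edges = (new_edges - old_edges) | (old_edges - new_edges)  # additions and deletions
    touched = {v for edge in changed_edges for v in edge}
    # Assumption: vertices absent from the new snapshot drop out of the active set.
    return (old_active | added_vertices | touched) & new_vertices


# Usage: the old computation had converged (empty active set); one vertex and one edge arrive.
active = next_active_set(set(), {1, 2, 3, 4, 5}, {1, 2, 3, 4, 5, 6},
                         {(1, 2), (2, 3), (4, 5)}, {(1, 2), (2, 3), (4, 5), (5, 6)})
```

Only vertices 5 and 6 restart message passing here; the rest of the graph reuses its previous result until messages reach it.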

Here’s what the Page Rank example looks like:

Even on a simple six-node graph where one edge is added after 10 iterations, this saves 13/34 iterations overall.

Here’s another example GraphTau program, showing the ability to unify data and graph stream processing.

This example computes top users in terms of triangle counts from a Twitter attention graph. A DStream ds is created from the external Twitter feed, and then a GraphStream is built from it. Triangle count is applied to each graph snapshot, and then we compute the number of times a user is a top user over a sliding window of ten seconds, outputting results every second.
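The shape of that computation can be sketched in plain Python, with an in-memory list of edge sets standing in for the Twitter DStream and the GraphStream built from it. The names, the choice of k, and measuring the window in snapshots rather than seconds are assumptions; with one-second intervals, a ten-second window would correspond to `window=10`.

```python
from collections import Counter, deque
from itertools import combinations
from typing import Deque, Dict, Iterable, Iterator, List, Set, Tuple

Edge = Tuple[str, str]


def triangle_counts(edges: Set[Edge]) -> Dict[str, int]:
    """Number of triangles each user belongs to (edges treated as undirected)."""
    adj: Dict[str, Set[str]] = {}
    for u, v in edges:
        if u != v:
            adj.setdefault(u, set()).add(v)
            adj.setdefault(v, set()).add(u)
    counts = {v: 0 for v in adj}
    for v, nbrs in adj.items():
        # each adjacent pair of neighbours closes exactly one triangle through v
        for a, b in combinations(sorted(nbrs), 2):
            if b in adj[a]:
                counts[v] += 1
    return counts


def top_users(counts: Dict[str, int], k: int) -> List[str]:
    """The k users with the most triangles (ties broken by name, for determinism)."""
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [user for user, _ in ranked[:k]]


def top_user_frequency(snapshots: Iterable[Set[Edge]], k: int, window: int) -> Iterator[Counter]:
    """After each snapshot, how often each user was a top-k user over the last `window` snapshots."""
    recent: Deque[List[str]] = deque(maxlen=window)
    for edges in snapshots:
        recent.append(top_users(triangle_counts(edges), k))
        yield Counter(user for tops in recent for user in tops)


# Usage: three snapshots, top-1 user, window of two snapshots.
s1 = {("a", "b"), ("b", "c"), ("a", "c"), ("c", "d")}
s2 = {("b", "c"), ("c", "d"), ("b", "d"), ("a", "b")}
results = list(top_user_frequency([s1, s2, s2], k=1, window=2))
```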

Preliminary evaluation shows that GraphTau’s performance matches or exceeds that of specialized systems on a streaming connected components task based on a cellular dataset.

The last word…

In this paper, we presented GraphTau, a time-evolving graph processing system built on a data flow framework that addresses this demand. GraphTau represents time-evolving graphs as a series of consistent graph snapshots. On these snapshots, GraphTau enables two computational models, the Pause-Shift-Resume model and the Online Rectification model, which allow the application of differential computation on a wide variety of graph algorithms. These techniques enable GraphTau to achieve significant performance improvements.

Texture networks: feed-forward synthesis of textures and stylized images

September 23, 2016

Texture Networks: Feed-forward synthesis of textures and stylized images Ulyanov et al., arXiv, March 2016

During the summer break I mostly stayed away from news feeds and twitter, which induces terrible FOMO (Fear Of Missing Out) to start with. What great research was published / discussed that I missed? Was there a major industry announcement I’m completely ignorant of? One thing I’m glad I didn’t miss was the Prisma app that produces quite beautiful stylized versions of photos from your smartphone. It’s a great example of deep technology behind a simple interface, and also of the rapid packaging and exploitation of research results – today’s choice is the paper describing the technology breakthrough that makes Prisma possible, and it was released to arXiv in March 2016. The source code and models described in the paper can also be found on GitHub.

Gatys et al. recently (2015) showed that deep networks can generate beautiful textures and stylized images from a single texture example. If you want to style a lot of images though (to provide styling-as-a-service for example), you’ll find that their technique is slow and uses a lot of memory. To generate images of equivalent quality, an implementation of Gatys et al. required about 10 seconds and 1.1GB of memory, whereas the approach described by Ulyanov et al. in this paper requires about 20ms and only 170MB of memory. Significantly faster and cheaper therefore, and although the algorithm doesn’t quite match the results of Gatys et al. for all images, it’s still very good.

Just in case you haven’t seen it, here are some examples. First, generating textures in the style of a sample image:

And combining a style image with a content image:

If you download the app, you can play with examples using your own photos.

One of the things I’m personally excited about is the opportunity the image creation speed opens up for applying the technique to movies. I like the images, but when I saw the movies created by Ruder et al. using an extension of the Gatys technique I was really blown away [paper, explanation and video]. Update: I just learned about the Artisto app that does this for you!

High-level approach

In general, one may look at the process of generating an image x as the problem of drawing a sample from a certain distribution p(x). In texture synthesis, the distribution is induced by an example texture instance x0 such that we can write x ~ p(x|x0). In style transfer, the distribution is induced by an image x0 representative of the visual style (e.g. an impressionist painting) and a second image x1 representative of the visual content (e.g. a boat), such that x ~ p(x|x0,x1).

Gatys et al. cast this as an optimisation problem looking to minimise the difference between certain image statistics of the generated image, and the statistics of the example image(s). They use an iterative optimisation procedure with back propagation to gradually change the values of the pixels in the generated image until the desired statistics are achieved.

In contrast, in the texture networks approach a feed-forward generation network produces the image, which requires only a single evaluation of the network and does not incur the cost of backpropagation.

A separate generator network is trained for each texture or style and, once trained, it can synthesize an arbitrary number of images of arbitrary size in an efficient feed-forward manner.

The loss function used in training the generator network is derived from Gatys et al. and compares image statistics extracted from a fixed pre-trained descriptor CNN. This is used to measure the mismatch between the prototype texture and the generated image. The texture loss function compares feature activations across all spatial locations. A similar content loss function compares feature activations at corresponding spatial locations, and therefore preserves spatial information.

Analogously to Gatys et al. we use the texture loss alone when training a generator network for texture synthesis, and we use a weighted combination of the texture loss and the content loss when training a generator network for stylization.
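To see the difference between the two losses, here is a small pure-Python sketch. It takes the feature activations as given (in the paper they come from a fixed pre-trained descriptor CNN, which we do not model here), follows Gatys et al. in using Gram matrices of channel correlations as the texture statistics, and omits the per-layer normalisation and weighting constants, so the numbers are illustrative only. The names `alpha` and `beta` for the combination weights are our own.

```python
from typing import List

Features = List[List[float]]  # C channels x N spatial positions (flattened)


def gram(f: Features) -> List[List[float]]:
    """Channel-by-channel correlations, summed over all spatial positions."""
    n = len(f[0])
    return [[sum(a * b for a, b in zip(fi, fj)) / n for fj in f] for fi in f]


def texture_loss(f: Features, f0: Features) -> float:
    """Compares statistics pooled over all locations, so spatial arrangement is ignored."""
    g, g0 = gram(f), gram(f0)
    return sum((a - b) ** 2 for row, row0 in zip(g, g0) for a, b in zip(row, row0))


def content_loss(f: Features, fy: Features) -> float:
    """Compares activations at corresponding locations, so spatial arrangement matters."""
    return sum((a - b) ** 2 for ch, ch_y in zip(f, fy) for a, b in zip(ch, ch_y))


def stylization_loss(f: Features, f0: Features, fy: Features,
                     alpha: float = 1.0, beta: float = 1.0) -> float:
    """Weighted combination used for stylization; texture synthesis uses texture_loss alone."""
    return alpha * content_loss(f, fy) + beta * texture_loss(f, f0)


# Usage: the same activations with their spatial positions reversed.
f = [[1.0, 2.0, 3.0, 4.0], [0.0, 1.0, 0.0, 1.0]]
shuffled = [[4.0, 3.0, 2.0, 1.0], [1.0, 0.0, 1.0, 0.0]]
```

Rearranging the spatial positions leaves `texture_loss(shuffled, f)` at zero but makes `content_loss(shuffled, f)` positive, which is the "all spatial locations" versus "corresponding spatial locations" distinction.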


A texture generator network is trained to transform a noise vector sampled from a certain distribution into texture samples that match, according to the texture loss function, a certain prototype texture x0, a three colour channel tensor.

We experimented with several architectures for the generator network g… we found that multi-scale architectures result in images with small texture loss and better perceptual quality while using fewer parameters and training faster. […] Each random noise tensor is first processed by a sequence of convolutional and non-linear activation layers, then upsampled by a factor of two, and finally concatenated as additional feature channels to the partially processed tensor from the scale below.


Each convolutional block contains three convolutional layers containing respectively 3×3, 3×3, and 1×1 filters applied using circular convolution to remove boundary effects. Each convolutional layer is followed by a ReLU activation layer.
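A single-channel sketch of circular convolution in Python shows what removing boundary effects means: indices wrap around, so pixels near one edge see pixels from the opposite edge rather than zero padding, and the output keeps the input's size. Real layers are multi-channel with learned filters; the `conv_block` helper mirrors the 3×3, 3×3, 1×1 plus ReLU structure described above, but its kernels are whatever you pass in.

```python
from typing import List

Image = List[List[float]]


def circular_conv2d(img: Image, kernel: Image) -> Image:
    """Same-size 2-D convolution with wrap-around indexing. As in most deep learning
    frameworks, this is really cross-correlation (the kernel is not flipped)."""
    h, w = len(img), len(img[0])
    kh, kw = len(kernel), len(kernel[0])
    ch, cw = kh // 2, kw // 2  # kernel centre
    out = [[0.0] * w for _ in range(h)]
    for i in range(h):
        for j in range(w):
            acc = 0.0
            for a in range(kh):
                for b in range(kw):
                    # the modulo makes the image behave like a torus: no borders, no padding
                    acc += kernel[a][b] * img[(i + a - ch) % h][(j + b - cw) % w]
            out[i][j] = acc
    return out


def relu(img: Image) -> Image:
    return [[max(0.0, x) for x in row] for row in img]


def conv_block(img: Image, k1: Image, k2: Image, k3: Image) -> Image:
    """Three circular convolutions (e.g. 3x3, 3x3, 1x1), each followed by a ReLU."""
    x = relu(circular_conv2d(img, k1))
    x = relu(circular_conv2d(x, k2))
    return relu(circular_conv2d(x, k3))


# Usage: a single bright pixel in the top-left corner, blurred by a 3x3 box filter.
img = [[0.0] * 4 for _ in range(4)]
img[0][0] = 1.0
ones = [[1.0] * 3 for _ in range(3)]
out = circular_conv2d(img, ones)
```

With zero padding, the bottom-right output `out[3][3]` would be 0; with circular convolution it picks up the top-left pixel, so no position in the output is treated as being at a border.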

When learning using stochastic gradient descent each iteration draws a mini-batch of noise vectors, performs forward evaluation of the generator network to obtain the corresponding images, and computes the loss vs x0.

… After that, the gradient of the texture loss with respect to the generator network parameters θ is computed using backpropagation, and the gradient is used to update the parameters.


For stylized image generator networks, the network is modified to take as input, in addition to the noise vector z, the image y to which the noise should be applied.

The generator network is then trained to output an image x that is close in content to y and in texture/style to a reference texture x0.

The architecture is the same as that used for texture synthesis, with the important difference that the noise tensors at the K scales are concatenated (as additional feature channels) with downsampled versions of the input image y. The learning objective is to minimize the combination of the content and texture loss.

In practice, we found that learning is surprisingly resilient to overfitting and that it suffices to approximate the distribution on natural images with a very small pool of images (e.g. 16).

Broader applicability

The success of this approach highlights the suitability of feed-forward networks for complex data generation and for solving complex tasks in general. The key to this success is the use of complex loss functions that involve different feed-forward architectures serving as “experts” assessing the performance of the feed-forward generator.

Why should I trust you? Explaining the predictions of any classifier

September 22, 2016

“Why Should I Trust You?”: Explaining the Predictions of Any Classifier Ribeiro et al., KDD 2016

You’ve trained a classifier and it’s performing well on the validation set – but does the model exhibit sound judgement or is it making decisions based on spurious criteria? Can we trust the model in the real world? And can we trust a prediction (classification) it makes well enough to act on it? Can we explain why the model made the decision it did, even if the inner workings of the model are not easily understandable by humans? These are the questions that Ribeiro et al. pose in this paper, and they answer them by building LIME – an algorithm to explain the predictions of any classifier, and SP-LIME, a method for building trust in the predictions of a model overall. Another really nice result is that by explaining to a human how the model made a certain prediction, the human is able to give feedback on whether the reasoning is ‘sound’ and suggest features to remove from the model – this leads to classifiers that generalize much better to real world data.

Consider two classifiers (Algorithm 1 and Algorithm 2 in the figure below) both trained to determine whether a document is about Christianity or atheism. Algorithm 2 performs much better in hold-out tests, but when we see why it is making its decisions, we realise it is actually much worse…

Magenta words are those contributing to the atheism class, green for Christianity. The second algorithm is basing its decision on “Posting”, “Host”, “Re” and “nntp” – words that have no connection to either Christianity or atheism, but happen to feature heavily in the headers of newsgroup postings about atheism in the training set.

What makes a good explanation?

It must be easily understandable by a human!

For example, if hundreds or thousands of features significantly contribute to a prediction, it is not reasonable to expect any user to comprehend why the prediction was made, even if individual weights can be inspected.

And it must meaningfully connect input variables to the response:

...which is not necessarily true of the features used by the model, and thus the “input variables” in the explanation may need to be different from the features.

Furthermore, an explanation must have local fidelity: it should correspond to how the model behaves in the vicinity of the instance being predicted.

The ideal explainer should also be able to explain any model, and thus be model-agnostic.

A key insight – local interpretation

Creating a globally faithful interpreter of a model’s decisions might require a complete description of the model itself. But to explain an individual decision we only need to understand how it behaves in a small local region. The idea reminds me a little bit of differentiation – overall the shape of the curve may be very complex, but if we look at just a small part we can figure out the gradient in that region.

Here’s a toy example from the paper – the true decision boundary in the model is represented by the blue/pink background. In the immediate vicinity of the decision (the bold red cross) though we can learn a much simpler explanation that is locally faithful even if not globally faithful.

The LIME algorithm produces Local Interpretable Model-agnostic Explanations.

The overall goal of LIME is to identify an interpretable model over the interpretable representation that is locally faithful to the classifier.

For text classification, an interpretable representation could be a vector indicating the presence or absence of a word, even though the classifier may use more complex word embeddings. For image classification, an interpretable representation might be a binary vector indicating the ‘presence’ or ‘absence’ of a contiguous patch of similar pixels.

LIME works by drawing samples in the vicinity of the input to be explained and learning a linear classifier using locally weighted square loss, with a limit K set on the number of interpretable features.
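Here is a minimal, self-contained Python sketch of that procedure for a binary interpretable representation (1 = feature present, as in the word-presence example above). Several details are assumptions: the black box `f` is assumed to accept the interpretable vector directly (in practice you would map each sample back to an input, e.g. the text with some words removed), the distance is the fraction of features switched off, and the K features are chosen by keeping the largest coefficients of a full weighted fit and refitting on them, a simpler stand-in for the Lasso-based selection step in the paper.

```python
import math
import random
from typing import Callable, List, Sequence, Tuple


def solve(a: List[List[float]], b: List[float]) -> List[float]:
    """Solve a x = b by Gauss-Jordan elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [rhs] for row, rhs in zip(a, b)]
    for col in range(n):
        piv = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[piv] = m[piv], m[col]
        for r in range(n):
            if r != col:
                factor = m[r][col] / m[col][col]
                for c in range(col, n + 1):
                    m[r][c] -= factor * m[col][c]
    return [m[i][n] / m[i][i] for i in range(n)]


def weighted_fit(xs: Sequence[Sequence[float]], ys: Sequence[float], ws: Sequence[float],
                 ridge: float = 1e-6) -> List[float]:
    """Locally weighted square loss: minimise sum_i w_i (y_i - b0 - x_i . b)^2 (tiny ridge for stability)."""
    rows = [[1.0] + list(x) for x in xs]  # leading 1.0 is the intercept column
    d = len(rows[0])
    ata = [[sum(w * r[i] * r[j] for r, w in zip(rows, ws)) + (ridge if i == j else 0.0)
            for j in range(d)] for i in range(d)]
    aty = [sum(w * r[i] * y for r, y, w in zip(rows, ys, ws)) for i in range(d)]
    return solve(ata, aty)


def explain(f: Callable[[List[int]], float], d: int, k: int, n_samples: int = 500,
            width: float = 0.75, seed: int = 0) -> List[Tuple[int, float]]:
    """Explain f at the instance whose interpretable representation is all ones."""
    rng = random.Random(seed)
    zs, ys, ws = [], [], []
    for _ in range(n_samples):
        z = [1 if rng.random() < 0.5 else 0 for _ in range(d)]  # switch features off at random
        dist = 1 - sum(z) / d                                    # fraction of features removed
        zs.append(z)
        ys.append(f(z))
        ws.append(math.exp(-dist ** 2 / width ** 2))             # nearby samples count more
    coef = weighted_fit(zs, ys, ws)[1:]
    top = sorted(range(d), key=lambda j: -abs(coef[j]))[:k]      # keep at most K features
    refit = weighted_fit([[z[j] for j in top] for z in zs], ys, ws)[1:]
    return sorted(zip(top, refit), key=lambda t: -abs(t[1]))


# Usage: a toy black box over 5 interpretable features, explained with K = 2.
black_box = lambda z: 0.8 * z[0] - 0.5 * z[2] + 0.1 * z[3]
explanation = explain(black_box, d=5, k=2)
```

With a toy black box that is itself linear, the explanation recovers its two most influential features and approximately their weights; for a real classifier, the fit is only faithful near the instance being explained.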

Since [the algorithm] produces an explanation for an individual prediction, its complexity does not depend on the size of the dataset, but instead on time to compute f(x) [a model prediction] and on the number of samples N. In practice, explaining random forests with 1000 trees using scikit-learn on a laptop with N = 5000 takes under 3 seconds without any optimizations such as using gpus or parallelization. Explaining each prediction of the Inception network for image classification takes around 10 minutes.

From local explanation to model trust

The central idea here is that if we understand and trust the reasoning behind an individual prediction, and we repeat this process for a number of predictions that give good coverage of the input space, then we can start to build global trust in the model itself.

We propose to give a global understanding of the model by explaining a set of individual instances. This approach is still model agnostic, and is complementary to computing summary statistics such as held-out accuracy. Even though explanations of multiple instances can be insightful, these instances need to be selected judiciously, since users may not have the time to examine a large number of explanations. We represent the time/patience that humans have by a budget B that denotes the number of explanations they are willing to look at in order to understand a model. Given a set of instances X, we define the pick step as the task of selecting B instances for the user to inspect.

Examining the instances X, we know which features are locally important in making the prediction for each instance. Features that are locally important for many instances are globally important. The B instances are picked so as to cover the globally important features first, and to avoid redundancy in explanation between them.
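The pick step can be sketched as greedy maximisation of a coverage function. In this minimal Python sketch, following the paper's description as we understand it, W[i][j] is the weight of feature j in the explanation of instance i, a feature's global importance is the square root of its summed absolute weights, and a set of instances covers the total importance of the features appearing in at least one of their explanations. Greedy selection is the standard approach for this kind of monotone submodular objective.

```python
import math
from typing import List

Matrix = List[List[float]]  # W[i][j]: weight of feature j in the explanation of instance i


def global_importance(W: Matrix) -> List[float]:
    """Features that matter locally for many instances get high global importance."""
    d = len(W[0])
    return [math.sqrt(sum(abs(row[j]) for row in W)) for j in range(d)]


def coverage(W: Matrix, chosen: List[int], imp: List[float]) -> float:
    """Total importance of features used by at least one chosen explanation."""
    return sum(imp[j] for j in range(len(imp)) if any(W[i][j] != 0 for i in chosen))


def submodular_pick(W: Matrix, budget: int) -> List[int]:
    """Greedily add the instance with the largest coverage gain, up to the budget B."""
    imp = global_importance(W)
    chosen: List[int] = []
    for _ in range(min(budget, len(W))):
        candidates = [i for i in range(len(W)) if i not in chosen]
        best = max(candidates, key=lambda i: coverage(W, chosen + [i], imp))
        chosen.append(best)
    return chosen


# Usage: instances 0 and 1 have identical explanations; B = 2.
W = [[1.0, 1.0, 0.0, 0.0],
     [1.0, 1.0, 0.0, 0.0],
     [0.0, 0.0, 1.0, 0.0],
     [0.0, 0.0, 0.0, 0.5]]
picked = submodular_pick(W, budget=2)
```

Instance 1 adds nothing once instance 0 is chosen, so the greedy pick skips it in favour of instance 2, which covers a new feature: redundancy is avoided without any explicit de-duplication.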

With a little help from my friends

Using human subjects recruited via Amazon Mechanical Turk – by no means machine learning experts, but with a basic knowledge of religion – the team provided explanations for the predictions of two different models classifying documents as atheist or Christian and asked the subjects which would generalize better (perform the best in the real world). Using LIME coupled with the mechanism just described to create representative instances, the human subjects were able to choose the correct model 89% of the time.

A second experiment asked Amazon Mechanical Turk users to identify which words from the explanations should be removed from subsequent training, for the worst classifier.

If one notes that a classifier is untrustworthy, a common task in machine learning is feature engineering, i.e. modifying the set of features and retraining in order to improve generalization. Explanations can aid in this process by presenting the important features, particularly for removing features that the users feel do not generalize.

The users are not ML experts, and don’t know anything about the dataset. Starting with 10 users, 10 classifiers are trained (one for each subject, with their suggested words removed). These are presented to five users each, resulting in another 50 classifiers. Each of these is presented to five users, giving 250 final models.

It is clear… that the crowd workers are able to improve the model by removing features they deem unimportant for the task… Each subject took an average of 3.6 minutes per round of cleaning, resulting in just under 11 minutes to produce a classifier that generalizes much better to real world data.

High agreement among users on the words to be removed indicated that users are converging to similar correct models. “This evaluation is an example of how explanations make it easy to improve an untrustworthy classifier – in this case easy enough that machine learning knowledge is not required.”