Data processing can often be naturally expressed as a sequence of steps in a pipeline. For example, the unix command line below pipes a file through a series of transforms to ultimately generate some output:
cat Fin.csv | a | b | c | d > Fout.txt
Observe that if a, b, c, and d are pure functions (no side effects) then having Fin.csv and the pipe ‘a | b | c | d’ in our possession is equivalent (ignoring the cost of computation) to having Fout.txt, since we can always generate Fout.txt at any time. The pipeline ends up looking like a function composition or arrow. The classic pipe is set up in a streaming fashion, with the output from one step immediately available to the next. If a step needs to see all of the input before creating any output (e.g. adding a ‘sort’ into a unix pipe) then it can simply buffer the input until it is ready to process it.
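The streaming pipe can be sketched as a composition of pure functions. In the Python sketch below, the transforms a–d are hypothetical stand-ins (not from the paper); note that c, like unix `sort`, must buffer its whole input before producing anything:

```python
from functools import reduce

# Hypothetical pure transforms standing in for the a, b, c, d of the pipe.
def a(lines): return (l.strip() for l in lines)   # streaming: yields as it consumes
def b(lines): return (l.upper() for l in lines)   # streaming
def c(lines): return sorted(lines)                # must buffer all input (like `sort`)
def d(lines): return (l + "\n" for l in lines)    # streaming again

def compose(*steps):
    """Pipe the output of each step into the next, like `a | b | c | d`."""
    return lambda data: reduce(lambda acc, f: f(acc), steps, data)

pipeline = compose(a, b, c, d)

fin = ["banana", "apple", "cherry"]          # stands in for Fin.csv
fout = list(pipeline(fin))                   # stands in for Fout.txt
# Because every step is pure, fout can be regenerated from fin at any time:
assert list(pipeline(fin)) == fout
```

This is exactly the "holding the input plus the pipe is as good as holding the output" observation: the composed function is deterministic, so the output is always recoverable.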
Many big data frameworks don’t work in this streaming way though. Instead, the output of each processing step is written to a file, and the next step reads the data back in from that file. Keeping with the unix analogy, it looks something like this:
a < Fin.csv > T1.dat
b < T1.dat > T2.dat
c < T2.dat > T3.dat
d < T3.dat > Fout.txt
Notice all that extra disk I/O taking place! It looks horribly inefficient doesn’t it?
Because of these limitations and the advancement of in-memory computation frameworks, inter-job data sharing cost often dominates pipeline’s end-to-end latencies for big data workloads.
Once again, if a, b, c, and d are pure functional transforms then if I have Fin.csv, I can recreate Fout.txt on demand (and any of the intermediate stages too). As an aside, note that datalog would be a great basis for implementing the processing steps (as would SQL where applicable).
Sometimes those intermediate files are actually required outputs (i.e. long-lived) in their own right, but oftentimes they are temporary:
From our contacts at Facebook, nowadays, more than 70% data is deleted within a day, without even counting shuffle data.
Moreover, a lot of ‘big data’ workloads will actually fit happily in memory as file sizes follow a Zipf distribution.
As observed previously, working sets are Zipf-distributed. We can therefore store in memory all but the very largest datasets, which we avoid storing in memory altogether. For example, the distribution of input sizes of MapReduce jobs at Facebook is heavy-tailed. Furthermore, 96% of active jobs can have their entire data simultaneously fit in the corresponding clusters’ memory
Put all these pieces together and you have the motivation for Tachyon: we can recompute any output if we lose it so long as we have pure functional transforms and the input data set (a concept known as lineage); we can fit 96% of active jobs in memory; and we probably don’t need many of the temporary files to ever be stored on disk anyway (so long as the next step can still read them).
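The lineage idea can be made concrete with a toy sketch. The names below (`store`, `lineage`, `write`, `read`) are hypothetical, not Tachyon's actual API: instead of replicating each output, we record which pure transform and which inputs produced it, and re-run on loss:

```python
store = {}     # file name -> contents (a stand-in for the in-memory file system)
lineage = {}   # file name -> (transform, input file names)

def write(name, transform, *inputs):
    """Record how `name` was produced, then materialise it."""
    lineage[name] = (transform, inputs)
    store[name] = transform(*(read(i) for i in inputs))
    return store[name]

def read(name):
    if name not in store:                      # lost? recompute from lineage
        transform, inputs = lineage[name]
        store[name] = transform(*(read(i) for i in inputs))
    return store[name]

store["Fin.csv"] = "3,1,2"                     # the durable input data set
write("T1.dat", lambda s: s.split(","), "Fin.csv")
write("Fout.txt", lambda xs: sorted(xs), "T1.dat")

del store["Fout.txt"], store["T1.dat"]         # simulate losing both files
assert read("Fout.txt") == ["1", "2", "3"]     # recovered by re-execution
```

Note that recovery recurses: regenerating Fout.txt first regenerates the lost intermediate T1.dat, walking the lineage back to data that still exists.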
Tachyon is a distributed (in-memory) file system enabling reliable data sharing at memory speed across cluster computing frameworks. While caching today improves read workloads, writes are either network or disk bound, as replication is used for fault-tolerance.
Caching can always improve read performance, but traditionally writing (either to disk, or over the network to other machines) is used for fault tolerance.
Even replicating the data in memory can lead to a significant drop in the write performance, as both the latency and throughput of the network are typically much worse than that of local memory.
But if you have lineage, maybe you can get fault tolerance another way?
Tachyon circumvents the throughput limitations of replication by leveraging the concept of lineage, where lost output is recovered by re-executing the operations (tasks) that created the output. As a result, lineage provides fault-tolerance without the need for replicating the data.
This concept is used in Spark, but Tachyon faces extra challenges because it is a continuously running system: a checkpointing process is needed to ensure that recomputation costs remain bounded. The paper introduces the Edge algorithm, which figures out which files are the most beneficial to checkpoint, and when.
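In very simplified form, the intuition is to checkpoint the leaves (the "edge") of the lineage DAG, so that recomputation after a failure never has to walk back past the last checkpointed frontier. The sketch below shows only that frontier-selection step; the real algorithm also prioritises hot files and bounds recomputation cost, and these names are mine, not the paper's:

```python
def leaves(dag):
    """All files with no downstream consumers.
    `dag` maps each file to the list of files derived from it."""
    files = set(dag) | {c for children in dag.values() for c in children}
    return {f for f in files if not dag.get(f)}

def edge_checkpoint(dag, checkpointed):
    """Pick the uncheckpointed leaves of the current lineage DAG."""
    return leaves(dag) - checkpointed

# A toy lineage DAG: each file maps to the files computed from it.
dag = {
    "Fin.csv": ["T1.dat"],
    "T1.dat":  ["T2.dat"],
    "T2.dat":  ["T3a.dat", "T3b.dat"],
}
checkpointed = set()
assert edge_checkpoint(dag, checkpointed) == {"T3a.dat", "T3b.dat"}
```

Once the edge is checkpointed, any lost file between the edge and the previous checkpoint can be recomputed in bounded time, no matter how long the system keeps running.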
Spark uses lineage information within a single job or shell, all running inside a single JVM. Different queries in Spark cannot share datasets (RDD) in a reliable and high-throughput fashion, because Spark is a computation engine, rather than a storage system. Our integration with Spark substantially improves existing industry workflows of Spark jobs, as they can share datasets reliably through Tachyon. Moreover, Spark can benefit from the asynchronous checkpointing in Tachyon, which enables high-throughput write.
Lineage may become an increasingly important mechanism in the future:
due to the inherent bandwidth limitations of replication, a lineage-based recovery model might be the only way to make cluster storage systems match the speed of in-memory computations in the future.
…and may be applicable to a large fraction of data center workloads:
Recomputation based recovery assumes that input files are immutable (or versioned, c.f., §9) and that the executions of the jobs are deterministic. While these assumptions are not true of all applications, they apply to a large fraction of datacenter workloads (c.f., §2.1), which are deterministic applications (often in a high-level language such as SQL where lineage is simple to capture).
Keeping everything in memory is a great goal (and remember that 96% of Facebook’s jobs would fit simultaneously in memory). But if you do run out of memory, it’s no big deal: just evict some files to disk. An LRU eviction policy is used for this by default, though I suspect ARC would perform even better.
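A minimal LRU file cache in the spirit of that default policy might look like the sketch below (my own illustration, not Tachyon's code; in Tachyon the evicted file is spilled to disk rather than discarded):

```python
from collections import OrderedDict

class LRUFileCache:
    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.files = OrderedDict()   # name -> size, least recently used first
        self.used = 0
        self.evicted = []            # stand-in for "written out to disk"

    def put(self, name, size):
        if name in self.files:
            self.used -= self.files.pop(name)
        while self.files and self.used + size > self.capacity:
            old, old_size = self.files.popitem(last=False)  # evict the LRU file
            self.used -= old_size
            self.evicted.append(old)
        self.files[name] = size
        self.used += size

    def touch(self, name):
        self.files.move_to_end(name)  # a read makes the file most recent

cache = LRUFileCache(capacity_bytes=100)
cache.put("T1.dat", 60)
cache.put("T2.dat", 30)
cache.touch("T1.dat")        # T1 is now the most recently used
cache.put("T3.dat", 40)      # needs room: T2 (least recent) is evicted
assert cache.evicted == ["T2.dat"]
```

ARC would add a second, frequency-aware list on top of this recency-only ordering, which is why it often beats plain LRU on scan-heavy workloads.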
Tachyon is an append-only file system (as is HDFS) that also provides an API for specifying lineage.
As long as a framework, e.g. Spark, integrates with Tachyon, applications on top of the framework take advantage of lineage based fault-tolerance transparently.
These framework integrations take a surprisingly small amount of code: 300 lines-of-code for Spark, and 200 for MapReduce.
This is all well and good, but how well does it work in practice?
Tachyon can write data 110x faster than MemHDFS (in-memory HDFS). It speeds up a realistic multi-job workflow by 4x over MemHDFS. In case of failure, it recovers in around one minute and still finishes 3.8x faster.
(MemHDFS still replicates data in-memory over the network, but avoids writes to disk.) Tachyon gets great raw performance too:
For reads, Tachyon achieves 38GB/sec/node. We optimized HDFS read performance using two of its most recent features, HDFS caching and short-circuit reads. With these features, MemHDFS achieves 17 GB/sec/node. The reason Tachyon performs 2x better is that the HDFS API still requires an extra memory copy due to Java I/O streams.
Likewise Tachyon can speed up even Spark’s native modes by avoiding Java memory management.
So Tachyon is fast, but of course it also reduces a lot of traffic that would otherwise flow on the network:
The Edge algorithm out-performs any fixed checkpointing interval. Analysis shows that Tachyon can reduce replication caused network traffic up to 50%.
And if it all goes wrong, the recomputation based recovery has surprisingly low overhead:
Recomputation has minor impact on other jobs. In addition, recomputation would consume less than 1.6% of cluster resources in traces from Facebook and Bing.
This is another paper full of great insights and information – my copy is heavily marked up and I’ve only been able to share a small percentage of my highlighted passages here. I encourage you to read the full paper from Li et al. to get the most from it.