
End of term, and Orders of Magnitude

July 10, 2017

It’s end of term time again. As part of making The Morning Paper habit sustainable I take a few weeks off three times a year to do some more relaxed background reading, recharge my paper queues, and let my mind wander. The Morning Paper will return on Monday 7th August.

Here are a few selections from the last few months to tide you over in case you missed them:

And here’s something a little different which didn’t quite fit in any particular paper review as a fun thought to leave you with for now: developing an intuition for orders of magnitude and some of the numbers you see in CS papers.


When I was looking at forthcoming hardware changes (All Change Please), translating all of the nanoseconds, microseconds, and milliseconds onto a human scale really helped me to understand the massive performance differences. If we set up a ‘human time’ scale where one nanosecond is represented by one second, then we have:

1 nanosecond   = 1 second
1 microsecond  = 16.7 minutes
1 millisecond  = 11.6 days
1 second       = 31.7 years

(Because we jump three orders of magnitude at each step, you can start the scale at any point, e.g., if you equate one microsecond with one second, then one millisecond is about 16.7 minutes).
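If you want to play with the scale yourself, here is a minimal sketch of the conversion. The function name `human_scale` and the choice of units are my own for illustration; it simply treats each nanosecond as one second and picks the largest unit that fits.

```python
def human_scale(nanoseconds: float) -> str:
    """Map a duration onto the 'human time' scale where 1 ns is shown as 1 s."""
    seconds = nanoseconds  # 1 ns -> 1 s, so the numeric value carries over
    units = [
        ("years", 365.25 * 24 * 3600),
        ("days", 24 * 3600),
        ("hours", 3600),
        ("minutes", 60),
    ]
    for name, size in units:
        if seconds >= size:
            return f"{seconds / size:.1f} {name}"
    return f"{seconds:.1f} seconds"


for label, ns in [("1 nanosecond", 1), ("1 microsecond", 1e3),
                  ("1 millisecond", 1e6), ("1 second", 1e9)]:
    print(f"{label:<14} = {human_scale(ns)}")
```

Starting the scale elsewhere is just a matter of dividing the input first, e.g. `human_scale(1e6 / 1e3)` for one millisecond when one microsecond is equated with one second.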


Units of time help when looking at absolute feeds and speeds. But for comparing relative latencies I like to think about ‘how fast does it get there’. Let’s set as a baseline a person walking at a brisk pace of 4 miles per hour. Maybe you need to get from New York to San Francisco for example. Then,

  • One order of magnitude faster is the car that drives past you at 40mph.
  • Two orders of magnitude faster is approximately a commercial airliner. (Thanks to Dan Holle for this analogy)
  • Three orders of magnitude faster is literally faster than a speeding bullet from a high-powered rifle (see: Muzzle Velocity)


To visualise relative throughput I like to think of people crossing a checkpoint at a border (e.g., every traveller’s favourite: the customs and immigration line). Let’s set as the baseline one person getting through every second. Then,

  • One order of magnitude higher throughput is a minibus’ worth of people arriving every second and getting through.
  • Two orders of magnitude higher throughput is two coaches’ worth of people arriving every second and getting through.
  • Three orders of magnitude higher throughput is approximately two Airbus A380s’ worth of people arriving every second and getting through (I wish!).

Do we need specialized graph databases? Benchmarking real-time social networking applications

July 7, 2017

Do we need specialized graph databases? Benchmarking real-time social networking applications Pacaci et al., GRADES’17

Today’s paper comes from the GRADES workshop co-located with SIGMOD. The authors take an established graph data management system benchmark suite (LDBC) and run it across a variety of graph and relational stores. The findings make for very interesting reading, and may make you think twice before you break out the graph store on your next OLTP-like application.

The highly-connected structure of many natural phenomena, such as road, biological, and social networks, makes graphs an obvious choice in modelling. Recently, storage and processing of such graph-structured data have attracted significant interest both from industry and academia and have led to the development of many graph analytics systems and graph databases.

Analytics systems such as Pregel, Giraph, and PowerGraph specialize in OLAP-like batch processing of graph algorithms. Graph databases focus on real-time querying and manipulation of entities, relationships, and the graph structure.

Many studies focus on comparisons between different graph database engines and graph analytics systems. Although there are some studies comparing graph databases with relational models, the real-time aspect of graph applications is mostly ignored and more complex graph traversals are not tested.

The work in this paper fills that gap.

Finding a fair workload for benchmarking

If you want to say “my database is better than your database” then you really also need to specify “for what?”. And if you want to evaluate whether graph databases really do earn their keep as compared to relational databases, you really want to do the comparison on the home turf of the graph databases – the use cases they claim to be good at.

The Linked Data Benchmark Council (LDBC) is a joint effort from academia and industry to establish benchmarking practices for evaluating RDF and graph data management systems, similar to the Transaction Processing Performance Council (TPC).

The LDBC Social Network Benchmark (SNB) models a social graph, and three different workloads based on top of it. Of these, the SNB Interactive workload is an OLTP-like workload designed to simulate real-world interactions in social networking applications.

If you have a joint benchmarking council going to all the trouble of specifying graph based benchmarks, it seems reasonable to assume the council believe these to be representative of the kinds of workloads graph databases are expected to be used for in the wild. So that seems a pretty good choice to use to compare different graph systems and relational systems.

Strictly of course all we can say from the results of this paper is that “for the LDBC SNB Interactive Workload running on a single 32 core node, this is what we found.” There may be other workloads which show graph databases in a stronger light (and vice-versa). But it’s a very good start, and a little extrapolation is justified I feel.

The LDBC SNB Interactive Workload specifies a set of read-only traversals that touch a small portion of the graph and concurrent update transactions that modify the social network’s structure. The majority of the read-only traversals are simple, common social networking operations that involve transitive closure, one-hop neighbourhood, etc. The rest of the read-only traversals are complex operations that are usually beyond the functionality of real-world social network systems due to their online nature.

The primary goal of the benchmark is to evaluate technical choke points in the underlying systems under test.

Systems under test

Representing highly-connected data in the relational model results in a large amount of many-to-many relations, which can produce complex, join-heavy SQL statements for graph queries. By focusing not on the entities but rather the relationships among the entities, graph databases and RDF stores can offer efficient processing of graph operations like reachability queries and pattern matching.

RDF stores represent graphs as collections of triples and use a standardised query language called SPARQL. Graph databases mostly use adjacency lists for storing adjacency information next to each entity. This enables index-free adjacency access with relationships traversed directly by pointers. Most graph databases support a proprietary query API (e.g. Neo4j’s Cypher), and there are also efforts to provide a uniform querying API across stores, most notably Apache TinkerPop3. TinkerPop3 provides the Gremlin Structure API and the Gremlin query language. For TinkerPop3-compliant databases, Gremlin queries are supported through the Gremlin server.
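To make the adjacency-list idea concrete, here is a minimal sketch of a single-pair shortest path query over neighbours stored next to each vertex. The toy `friends` graph and the function name are hypothetical, and a Python dict is only a stand-in for the direct pointers a real graph database would follow; this is not how any of the systems in the paper are implemented.

```python
from collections import deque

# Hypothetical toy graph: each vertex keeps its neighbours next to it,
# so a traversal follows references rather than consulting a global index.
friends = {
    "alice": ["bob", "carol"],
    "bob": ["alice", "dave"],
    "carol": ["alice", "dave"],
    "dave": ["bob", "carol", "erin"],
    "erin": ["dave"],
}


def shortest_path_length(graph, source, target):
    """Breadth-first search; returns the hop count, or None if unreachable."""
    if source == target:
        return 0
    seen = {source}
    queue = deque([(source, 0)])
    while queue:
        vertex, dist = queue.popleft()
        for neighbour in graph.get(vertex, []):
            if neighbour == target:
                return dist + 1
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append((neighbour, dist + 1))
    return None


print(shortest_path_length(friends, "alice", "erin"))
```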

The authors created reference implementations of the LDBC SNB Interactive workload in Gremlin and in SQL as well as using the database proprietary APIs. The SQL implementation maps each vertex type and edge type to separate tables.
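As a rough illustration of what "one table per vertex type and edge type" means for queries, here is a small sketch using SQLite from Python. The `person` and `knows` tables, their columns, and the sample rows are all made up for the example; they are not the LDBC schema or the authors' reference implementation. A two-hop traversal becomes two joins over the edge table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- One table per vertex type and one per edge type (hypothetical schema).
    CREATE TABLE person (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE knows (src INTEGER, dst INTEGER);
    INSERT INTO person VALUES (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave');
    INSERT INTO knows VALUES (1, 2), (2, 3), (2, 4), (3, 4);
""")


def two_hop_names(conn, person_id):
    """Names reachable in exactly two directed 'knows' hops from person_id."""
    rows = conn.execute(
        """
        SELECT DISTINCT p.name
        FROM knows AS k1
        JOIN knows AS k2 ON k2.src = k1.dst
        JOIN person AS p ON p.id = k2.dst
        WHERE k1.src = ? AND k2.dst <> k1.src
        ORDER BY p.name
        """,
        (person_id,),
    ).fetchall()
    return [name for (name,) in rows]


print(two_hop_names(conn, 1))
```

Each extra hop adds another self-join on `knows`, which is where the "join-heavy SQL" concern comes from.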

The full list of evaluated systems is as follows:

  • TitanDB v1.1 with Cassandra (Titan-C) and BerkeleyDB (Titan-B) storage backends. (Gremlin query language)
  • Neo4j 2.3.6 with both Gremlin and Cypher
  • Virtuoso Open Source v7.2.4 – a column store RDBMS with support for RDF processing. Tested with both SQL and SPARQL queries (i.e. RDF, and non-RDF).
  • Postgres v9.5, using native SQL queries
  • Sqlg v1.3.3 – a TinkerPop3 implementation on top of Postgres (Gremlin query language)

The systems were run on a 32x 2.6 GHz core machine with 256 GB RAM, and configured to load the entire dataset into main memory.

Workloads were created using the LDBC data generator at two different scale factors (3 and 10), resulting in the following dataset statistics:


Key results

Examining read-only graph queries first (point lookups, one-hop traversals, two-hop traversals, and single-pair shortest path queries), the authors found that the relational databases provide the lowest query latency across all query types. They also found that Gremlin support comes with high overheads:

For Neo4j, the Gremlin interface introduces up to two orders of magnitude of performance degradation compared to the native Cypher interface. Similarly, Postgres (SQL) significantly outperforms Sqlg (Gremlin) even though both systems have the same underlying data model and storage engine.

The more complex the query, the worse Gremlin seems to do, as shown in the following result tables: one for the scale 3 dataset, and one for scale 10 (highlights mine).

Take a look at the Postgres and Neo4j (Cypher) columns in the scale factor 10 table above. For queries up to two hops (i.e., two joins, not especially stressful for an RDBMS), Postgres outperforms Neo4j by an order of magnitude. For the shortest path query however, Neo4j outperforms Postgres by two orders of magnitude. It’s Virtuoso though that shows the best overall performance across the board, with its SQL interface, even beating Neo4j with Cypher on the shortest path query. Notice that all of my green (best in row) highlights at scale factor 10 are in SQL columns!

Virtuoso (SQL) outperforms Postgres (SQL) in more complex two-hop traversals and single-pair shortest path queries. Virtuoso’s graph-aware engine and optimized transitivity support enable it to execute such complex graph queries efficiently. Virtuoso (SPARQL) has slightly lower performance due to query translation costs, even though the benchmark queries are graph queries.

The second part of the evaluation used the full LDBC SNB Interactive workload to simulate the real-time aspects of online social networking applications (the authors processed updates through a dedicated Kafka queue, modelling a structure commonly seen in industrial deployments). The TinkerPop3 systems couldn’t cope with the relatively large numbers of complex queries in the mix specified by the benchmark. The authors chose to change the mix to include just a two-hop neighbourhood-based complex query and a set of short read-only queries. Even then, Titan-B had to be withdrawn from the experiment as its performance degradation was too severe. Personally I would really like to have also seen the results on the original query mix for the systems that could handle it – the choice made by the authors here seems to bias things in favour of the SQL systems.

For the scale 3 dataset this time, here are the aggregate read and write throughputs obtained:

In general, RDBMSes with a native SQL interface provide the best performance under the real-time interactive workload. While read performance of selected systems is comparable and within a factor of four, Postgres (SQL) and Virtuoso (SQL) exhibit significantly better update performance and maintain up to an order of magnitude faster write throughput compared to their competitors.

Note also the frequent sudden drops in Neo4j’s (Cypher) update performance – attributed to checkpointing.


  • Maybe stay away from Gremlin for now. It’s a very worthy goal, but the performance just doesn’t seem to be there to justify it at the moment.

A more concerning problem relates to the performance of the Gremlin Server… it was unable to handle complex queries under a large number of concurrent clients… causing it to hang and eventually crash. Overall our experience reveals the generally poor state of implementations and suggests that TinkerPop3 and specifically the Gremlin Server are not production ready.

  • If you don’t have lots of complex graph operations, RDBMSs can provide competitive performance under a concurrent transactional workload:

We believe that robust RDBMS technology can deliver competitive performance for OLTP-like online social networking applications, especially in single-node settings… RDBMSes should not be ignored for interactive transactional graph workloads.


Does this mean you shouldn’t use a graph database? I don’t think you can draw that conclusion from this study. On the one hand, the benchmarks used seem to bias the workload towards things which traditional relational databases are very good at, so it’s no surprise they do well. But on the other hand the workload was chosen as representative of an OLTP-like graph workload as selected by an independent benchmarking council. We can conclude that if you have an OLTP-like graph workload it’s well worth checking out the relational alternative before you take the plunge.

You may recall we previously looked at Ground, which captures lineage, model, and version metadata graphs for data. This is a problem most naturally modelled in a graph store (indeed, one of the companies I work with, Atomist, use a graph database for a very similar use case). In the Ground paper we also find some hints about the best kind of store for this workload – they found that relational systems (Postgres) were an order of magnitude slower than graph processing systems for their workload. They also found that even the leading graph databases were lacking in terms of performance at scale.

This theme was also echoed in the ‘Dependency-driven analytics’ paper: “… this experience indicates that industry is in desperate need for a fully capable, scale-out graph solution that can handle complex, large, and structured graphs…”.

Using word embedding to enable semantic queries on relational databases

July 6, 2017

Using word embedding to enable semantic queries in relational databases Bordawekar and Shmueli, DEEM’17

As I’m sure some of you have figured out, I’ve started to work through a collection of papers from SIGMOD’17. Strictly speaking, this paper comes from the DEEM workshop held in conjunction with SIGMOD, but it sparked my imagination and I hope you’ll enjoy it too. Plus, as a bonus it’s only four pages long!

What do you get if you cross word embedding vectors with a relational database? The ability to ask a new class of queries, which the authors term cognitive intelligence (CI) queries, that ask about the semantic relationship between tokens in the database, rather than just syntactic matching as is supported by current queries. It’s a really interesting example of AI infusing everyday systems.

We begin with a simple observation: there is a large amount of untapped latent information within a database relation. This is intuitively clear for columns that contain unstructured text. But even columns that contain different types of data, e.g., strings, numerical values, images, dates, etc., possess significant latent information in the form of inter- and intra-column relationships.

If we understood the meaning of these tokens in the database (at least in some abstract way that was comparable), we could ask queries such as “show me all the rows similar to this.” That’s something you can’t easily do with relational databases today – excepting perhaps for range queries on specific types such as dates. Where can we get comparable abstract representations of meaning though? The answer is already given away in the paper title of course – this is exactly what word embedding vectors do for us!

If you’re not familiar with word embedding vectors, we covered word2vec and GloVe in The Morning Paper a while back. In fact, “The Amazing Power of Word Vectors” continues to be one of the most read pieces on this blog. In short:

The idea of word embedding is to fix a d-dimensional vector space and for each word in a text corpus associate a dimension d vector of real numbers that encodes the meaning of that word… If two words have similar meaning, their word vectors point in very similar directions.

The authors use word2vec in their work, though as they point out they could equally have used GloVe.

How do we get word embedding vectors for database content?

One approach is to use word vectors that have been pre-trained from external sources. You can also learn directly from the database itself. Think of each row as corresponding to a sentence, and a relation as a document.

Word embedding then can extract latent semantic information in terms of word (and in general, token) associations and co-occurrences and encode it in word vectors. Thus, these vectors capture first inter- and intra-attribute relationships within a row (sentence) and then aggregate these relationships across the relation (document) to compute the collective semantic relationships.

In their prototype implementation, the authors first textify (!) the data in a database table (e.g., using a view), and then use a modified version of word2vec to learn vectors for the words (database tokens) in the extracted text. This phase can also use an external source (e.g. Wikipedia articles) for model training.
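Here is a minimal sketch of what the textification step might look like, treating each row as a sentence of tokens. The column-name prefix and the underscore joining are my own assumptions for illustration, not the authors' encoding, and the resulting sentences would still need to be fed to a word2vec-style trainer (not shown).

```python
def textify_row(row: dict) -> list:
    """Turn one row into a 'sentence' of tokens, one token per non-null column."""
    tokens = []
    for column, value in row.items():
        if value is None:
            continue  # skip NULLs rather than inventing a token for them
        # Replace spaces so a multi-word value stays a single token
        # (an assumption made for this sketch).
        tokens.append(f"{column}_{str(value).replace(' ', '_')}")
    return tokens


def textify_table(rows: list) -> list:
    """A relation becomes a 'document': a list of row sentences."""
    return [textify_row(row) for row in rows]


# Hypothetical sales rows.
sales = [
    {"customer": "c1", "item": "peanut butter", "store": "nyc"},
    {"customer": "c2", "item": "jelly", "store": None},
]
for sentence in textify_table(sales):
    print(" ".join(sentence))
```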

We use word as a synonym to token although some tokens may not be valid words in any natural language. Following vector training, the resultant vectors are stored in a relational system table.

At runtime, the system (built on Spark using Spark SQL and the DataFrames API) uses UDFs to fetch trained vectors from the system and answer CI queries.

CI Queries

Broadly, there are two classes of cognitive intelligence queries: similarity and prediction queries… The key characteristic of the CI queries is that these queries are executed, in part, using the vectors in the word embedding model. If the word embedding model is generated using the database being queried, it captures meaning in the context of the associated relational table, as specified by the relational view. If a model is rebuilt using a different relational view, a CI query may return different results for the new model.

It’s time to look at some concrete examples to make all this a bit clearer. Given a similarityUDF that can tell us how similar two sets of word vectors are, we can ask a query such as:

In this case, the vector sets correspond to the items purchased by the corresponding customers. What this query will return is pairs of customers that have similar purchasing histories!

The pattern observed in this query can be applied to other domains as well, e.g., identifying patients that are taking similar drugs, but with different brand names or identifying food items with similar ingredients, or finding mutual funds with similar investment strategies.

The key difference to a traditional query is that we’re matching by semantic similarity, not by values.
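To give a feel for what a similarity UDF over two sets of vectors could compute, here is one possible sketch. Scoring by mean pairwise cosine similarity is an assumption on my part; the excerpt above does not say how the authors' similarityUDF combines the vectors, and the function names and toy vectors are hypothetical.

```python
import math


def cosine(u, v):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def set_similarity(vectors_a, vectors_b):
    """Mean pairwise cosine similarity between two sets of vectors (assumed scoring)."""
    pairs = [(u, v) for u in vectors_a for v in vectors_b]
    if not pairs:
        return 0.0
    return sum(cosine(u, v) for u, v in pairs) / len(pairs)


# Hypothetical 2-d embeddings for the items bought by two customers.
customer_1 = [[1.0, 0.0], [0.9, 0.1]]
customer_2 = [[1.0, 0.1], [0.0, 1.0]]
print(round(set_similarity(customer_1, customer_2), 3))
```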

Recall that word embeddings also support inductive reasoning (e.g., the classic King is to Man as Queen is to ? style queries). You can exploit this capability in CI queries too. In the following toy example, we’re looking for food product pairs that relate to each other as ‘peanut-butter’ relates to ‘jelly’. (For example, the query may return the pair ‘chips’, ‘salsa’).

The analogyUDF computes the differences (peanut butter – jelly) and (p1 – p2) and looks at the cosine similarity of those differences.
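In code, that computation is just two vector subtractions followed by a cosine similarity. The sketch below uses made-up 2-d vectors and my own function names; the real UDF runs inside Spark against trained embeddings.

```python
import math


def cosine(u, v):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def analogy_score(a, b, p1, p2):
    """Cosine similarity between the offsets (a - b) and (p1 - p2)."""
    diff_ab = [x - y for x, y in zip(a, b)]
    diff_p = [x - y for x, y in zip(p1, p2)]
    return cosine(diff_ab, diff_p)


# Hypothetical embeddings: a score near 1 means p1 relates to p2
# roughly as 'peanut butter' relates to 'jelly'.
peanut_butter, jelly = [2.0, 1.0], [1.0, 1.0]
chips, salsa = [5.0, 3.0], [3.0, 3.0]
print(analogy_score(peanut_butter, jelly, chips, salsa))
```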

The analogy capabilities of CI queries have several applications in the enterprise space, e.g., associating customers with either most-common or least-common purchases in a given domain (e.g., books, electronics, etc.).

I understand the analogy query mechanism, but I’m not sure I quite get the example the authors are trying to give above. Neither finding product popularity, nor seeing whether a customer has purchased a low-popularity (high popularity) item seems to need an analogy? Here’s an example of my own – recommendation by analogy: razor is to blade as [product the customer just put in their basket] is to ?. (Probably not about to replace frequent itemset mining anytime soon!)

Our final example shows how embeddings trained using external data can be used in queries. Suppose we trained word embeddings with a data set that reveals information about fruits and their allergenic properties. We would have a relationship between the vector for ‘allergenic’ and the vectors for allergenic fruit names. Now we can ask:

This example demonstrates a very powerful ability of CI queries that enables users to query a database using a token (e.g., allergenic) not present in the database.

The last word

Will all relational databases one day come with CI querying capabilities built-in?

In summary, we believe this work is a step towards empowering database systems with built-in AI capabilities… We believe CI queries are applicable to a broad class of application domains including healthcare, bio-informatics, document searching, retail analysis, and data integration. We are currently working on applying the CI capabilities to some of these domains.

Blockbench: a framework for analyzing private blockchains

July 5, 2017

Blockbench: a framework for analyzing private blockchains Dinh et al., SIGMOD’17

Here’s a paper which delivers way more than you might expect from the title alone. First we get a good discussion of private blockchains and why interest in them is growing rapidly. Then the authors analyse the core layers in a private blockchain, and show how existing systems make different trade-offs in each of these areas. Finally, using the Blockbench tool we get a whole new level of understanding of how well the systems work in practice and their various strengths and weaknesses. Based on their findings, the authors conclude with a set of recommendations for future private blockchain designs. Even the appendices are great! Especially if you’re interested in future applications of blockchain technology beyond cryptocurrencies, it’s well worth a read.

Introducing private blockchains

Interest from the industry has started to drive development of new blockchain platforms that are designed for private settings in which participants are authenticated. Blockchain systems in such environments are called private (or permissioned), as opposed to the early systems operating in public (or permissionless) environments where anyone can join and leave. Applications for security trading and settlement, asset and finance management, banking and insurance are being built and evaluated.

Most of these applications currently run on traditional enterprise databases. Why is blockchain so exciting for these use cases?

  • Blockchain immutability and transparency help reduce human errors and the need for manual intervention due to conflicting data.
  • Blockchain can streamline processes by removing duplicate efforts in data governance.
  • Blockchain provides a basis for cooperation among competing entities that may not fully trust each other.

Goldman Sachs estimated 6 billion saving in current capital markets, and J.P. Morgan forecast that blockchains will start to replace currently redundant infrastructure by 2020.

For private blockchain use cases, we need more complex state than just the digital coins (cryptocurrency) supported in Bitcoin, and we need trusted logic to manipulate that state. A good example is Ethereum with its smart contracts (which we looked at previously).

Ethereum extends Bitcoin to support user-defined and Turing complete state machines. In particular, Ethereum blockchain lets the user define any complex computations in the form of smart contracts.

Ethereum uses proof-of-work (PoW) based consensus, as do nearly all public blockchain systems. In permissioned environments PoW can still be used, but there are more efficient and deterministic approaches where node identities are known.

Recent permissioned blockchains either use PBFT, as in Hyperledger, or develop their own variants, as in Parity, Ripple, and ErisDB… As a result, permissioned blockchains can execute complex applications more efficiently than PoW-based blockchains, while being Byzantine fault tolerant. These properties and the commercial interests from major banking and financial institutions have bestowed on private blockchains the potential to disrupt the current practice in data management.

An abstract model of a blockchain system

The multitude of blockchain based systems all have different design points. As a basis for comparison, the authors use the following abstract model:

Starting from the bottom,

  • the consensus layer is responsible for reaching agreement on when a block is added to the blockchain,
  • the data layer contains the structure, content and operations on the blockchain data itself,
  • the execution engine is the runtime environment supporting blockchain operations, and
  • the application layer includes classes of blockchain applications.

Blockbench, which we’ll come onto shortly, contains benchmarks designed to evaluate the performance of different implementations in each of these layers.


Consensus

Blockchain systems employ a spectrum of Byzantine fault-tolerant protocols. Bitcoin uses a proof of work scheme at a difficulty level achieving a rate of one block every 10 minutes. Ethereum also uses proof of work, but tuned to achieve a rate of one block every 14 seconds.

[PoW] consumes a lot of energy and computing power, as nodes spend their CPU cycles solving puzzles instead of doing otherwise useful work. Worse still, it does not guarantee safety: two nodes may both be selected to append to the blockchain, and both blocks can be accepted. This causes a fork in the blockchain, and most PoW-based systems add additional rules, for example, only blocks on the longest chain are considered accepted.
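For intuition, here is a toy sketch of a proof-of-work puzzle and the longest-chain rule. It is deliberately simplified: the puzzle (a SHA-256 hex digest starting with a given number of zeros), the function names, and the block contents are all assumptions for illustration, not the actual Bitcoin or Ethereum algorithms.

```python
import hashlib


def mine(block_data: str, difficulty: int, max_nonce: int = 10_000_000):
    """Search for a nonce whose SHA-256 digest starts with `difficulty` hex zeros."""
    prefix = "0" * difficulty
    for nonce in range(max_nonce):
        digest = hashlib.sha256(f"{block_data}{nonce}".encode()).hexdigest()
        if digest.startswith(prefix):
            return nonce, digest
    raise RuntimeError("no nonce found within max_nonce attempts")


def is_valid(block_data: str, nonce: int, difficulty: int) -> bool:
    """Verification is a single hash, however long the search took."""
    digest = hashlib.sha256(f"{block_data}{nonce}".encode()).hexdigest()
    return digest.startswith("0" * difficulty)


def choose_chain(forks):
    """Longest-chain rule: when forks exist, accept the chain with the most blocks."""
    return max(forks, key=len)


nonce, digest = mine("block 1: alice pays bob", difficulty=3)
print(nonce, digest)
print(choose_chain([["b0", "b1"], ["b0", "b1x", "b2x"]]))
```

Each extra hex zero of difficulty multiplies the expected search work by 16, which is how a network can tune its block rate.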

At the other end of the spectrum, HyperLedger uses PBFT (Practical Byzantine Fault Tolerance) which has communication overhead in O(N^2), and can tolerate fewer than N/3 failures.

PBFT has been shown to achieve liveness and safety properties in a partially asynchronous model, thus unlike PoW, once the block is appended it is confirmed immediately.

PBFT does require the node identities to be known, and is limited in scale due to the communication overhead.
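A quick back-of-the-envelope sketch shows why scale is a problem. It assumes the usual PBFT requirement of N ≥ 3f + 1 replicas to tolerate f Byzantine faults, and counts the messages in a single all-to-all round as a rough proxy for the O(N^2) overhead; the function names are my own.

```python
def max_faulty(n: int) -> int:
    """Largest f such that n >= 3f + 1."""
    return (n - 1) // 3


def all_to_all_messages(n: int) -> int:
    """Messages in one round where every node sends to every other node."""
    return n * (n - 1)


for n in (4, 16, 64):
    print(f"N={n:>3}: tolerates f={max_faulty(n)}, "
          f"~{all_to_all_messages(n)} messages per all-to-all round")
```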

In-between full PoW and PBFT there are a variety of design points, for example Byzcoin and Elastico use PoW to determine random smaller consensus groups which then run PBFT. Other models include proof-of-stake systems, of which an interesting variant is proof-of-authority: each pre-determined authority is assigned a fixed time-slot within which it can generate blocks.

Data model

In Bitcoin, transactions are first class citizens: they are system states representing digital coins in the network. Private blockchains depart from this model, by focusing on accounts. One immediate benefit is simplicity, especially for applications involving crypto-currencies.

Ethereum smart contract accounts combine executable code and private states. The code can read the state of other non-contract accounts, and can send new transactions. Hyperledger has accounts called chaincode which are like Ethereum’s contracts, except that chaincode can only access its own private storage.

Parity keeps block content in memory. Ethereum and Hyperledger use two-layered data structures with disk-based key-value storage. There are also variations in Merkle trees (e.g. Patricia-Merkle trees supporting efficient update and search, and Bucket-Merkle trees which use hashing to group states into a list of buckets from which a Merkle tree is built).
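As a rough sketch of the bucket idea: hash each state key into one of a fixed number of buckets, then build an ordinary Merkle tree over the bucket contents. Everything here (SHA-256, four buckets, the `key=value` encoding, duplicating the last node on odd levels) is an assumption for illustration and not Hyperledger's actual layout.

```python
import hashlib


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def merkle_root(leaves):
    """Root hash over a list of byte strings; the last node is duplicated on odd levels."""
    if not leaves:
        return h(b"")
    level = [h(leaf) for leaf in leaves]
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


def bucket_merkle_root(state: dict, num_buckets: int = 4) -> bytes:
    """Group key/value states into buckets by key hash, then hash the buckets into a tree."""
    buckets = [[] for _ in range(num_buckets)]
    for key in sorted(state):  # sorted so the root does not depend on insertion order
        index = int.from_bytes(h(key.encode()), "big") % num_buckets
        buckets[index].append(f"{key}={state[key]}".encode())
    return merkle_root([b"|".join(bucket) for bucket in buckets])


before = bucket_merkle_root({"alice": 10, "bob": 5})
after = bucket_merkle_root({"alice": 9, "bob": 6})
print(before.hex() != after.hex())
```

Any change to a single value changes its bucket's hash and therefore the root, so two nodes can compare state by comparing one hash.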

Execution engine

Execution must be fast, since multiple contracts and transactions in a block must all be verified by a node, and it must be deterministic so that unnecessary inconsistency in transaction input and output leading to blocks being aborted can be avoided.

Ethereum develops its own machine language (bytecode) and a virtual machine (called EVM) for executing the code, which is also adopted by Parity.

This enables Ethereum to keep track of gas, the tax on execution. HyperLedger doesn’t have this concept, and simply supports running Docker images. The HyperLedger API exposes only putState and getState (key-value), whereas Ethereum and Parity support richer data types such as map, array, and composite structures.

Application layer

Many applications are being proposed for blockchain, leveraging the latter’s two key properties. First, data in the blockchain is immutable and transparent to the participants, meaning that once a record is appended, it can never be changed. Second, it is resilient to dishonest and malicious participants. Even in permissioned settings, participants can be mutually distrustful. The most popular application, however, is still crypto-currency.

Beyond cryptocurrencies though, new applications which build on the immutability and transparency of blockchains are emerging.

For example, security settlements and insurance processes can be sped up by storing data on the blockchain. Another example is sharing economy applications, such as AirBnB, which can use blockchain to evaluate reputation and trust in a decentralized setting, because historical activities of any users are available and immutable. This also extends to Internet of Things settings, where devices need to establish trust among each other.

Design choices for selected systems

The following table from the appendix shows the choices made at each layer by a variety of blockchain platforms.

Existing blockchain systems under the microscope

Blockbench is an open source benchmarking system developed by the authors to evaluate a variety of different blockchain platforms under a set of workloads designed to exercise them at each layer.

Blockbench is structured such that you can write an adapter to plug in the blockchain platform of your choice.

We selected Ethereum, Parity, and Hyperledger for our study, as they occupy different positions in the blockchain design space, and also for their codebase maturity.

Blockbench measures throughput, latency, scalability, and fault tolerance (by injecting crash failures, network delays, and random responses). It can also measure security, defined as the number of blocks in forks, which represent the window in which an attacker can perform double spending or selfish mining. Blockbench simulates attacks, including BGP hijacking by partitioning the network for a given duration. The security score is the ratio between the total number of blocks included in the main branch, and the total number of blocks confirmed by users.
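The security score as described is a simple ratio, sketched below. The function name and the example numbers are hypothetical, not results from the paper.

```python
def security_score(main_branch_blocks: int, confirmed_blocks: int) -> float:
    """Ratio of blocks on the main branch to blocks confirmed by users."""
    if confirmed_blocks <= 0:
        raise ValueError("confirmed_blocks must be positive")
    return main_branch_blocks / confirmed_blocks


# Hypothetical run: users saw 100 blocks confirmed, but only 90 of them
# ended up on the main branch once the partition healed.
print(security_score(90, 100))
```

A score of 1 means every block users saw confirmed ended up on the main branch; the further below 1, the more blocks were stranded in forks.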

Here are the key findings:

  • Hyperledger performs consistently better than Ethereum (5.5x) and Parity (28x) across the benchmarks, but it fails to scale up to more than 16 nodes. (A new implementation of PBFT has now been released for Hyperledger post this study, which may scale better).


  • Even so, you need to put the Hyperledger performance in context compared to for example the H-Store in-memory database (note the log scale!):

  • Ethereum and Parity are more resilient to node failures, but they are vulnerable to security attacks that fork the blockchain.

  • The main bottlenecks in Hyperledger and Ethereum are the consensus protocols, but for Parity the bottleneck is caused by transaction signing.
  • Ethereum and Parity incur large overhead in terms of memory and disk usage compared to Hyperledger (order of magnitude). Their execution engine is also less efficient than that of Hyperledger.

  • Hyperledger’s data model is low level, but its flexibility enables customised optimisation for analytical queries of the blockchain data.

There is plenty more interesting detail behind these high-level findings, which you can read in section 4 of the paper.


Our experience in working with the three blockchain systems confirms the belief that in their current state blockchains are not yet ready for mass usage [for permissioned use cases / large-scale data processing workloads? – AC]. Both their designs and codebases are still being refined constantly, and there are no other established applications beyond crypto-currency.

The authors advocate four design principles from the world of database systems which they believe can help to address performance bottlenecks:

  1. Decouple the storage, execution engine, and consensus layer from each other and then optimise and scale them independently.
  2. Embrace new hardware primitives to boost performance
  3. Consider how the concept of sharding might be applied (research agenda)
  4. Support declarative languages for smart contracts, opening up opportunities for low-level optimisations that speed up contract execution.

Azure Data Lake Store: a hyperscale distributed file service for big data analytics

July 4, 2017

Azure data lake store: a hyperscale distributed file service for big data analytics Douceur et al., SIGMOD’17

Today’s paper takes us inside Microsoft Azure’s distributed file service called the Azure Data Lake Store (ADLS). ADLS is the successor to an internal file system called Cosmos, and marries Cosmos semantics with HDFS, supporting both Cosmos and Hadoop workloads. Microsoft are in the process of migrating all Cosmos data and workloads onto ADLS.

Virtually all groups across the company, including Ad platforms, Bing, Halo, Office, Skype, Windows and XBOX, store many exabytes of heterogenous data in Cosmos, doing everything from exploratory analysis and stream processing to production workflows.

ADLS is not just used internally of course, it’s a part of the Azure cloud offerings, complementing Azure Data Lake Analytics.

ADLS is the first public PaaS cloud service that is designed to support full filesystem functionality at extreme scale… The largest Hadoop clusters that we are aware of are about 5K nodes; Cosmos clusters exceed 50K nodes each; individual jobs can execute over more than 10K nodes. Every day, we process several hundred petabytes of data, and deliver tens of millions of compute hours to thousands of internal users.

Several aspects of the design stand out as noteworthy to me:

  • ADLS stores files across multiple storage tiers with support for partial overlapping
  • ADLS is architected as a collection of microservices, which themselves need to be scalable, highly available, low-latency, and strongly consistent. “The approach we have taken to solve the hard scalability problem for metadata management differs from typical filesystems in its deep integration of relational database and distributed systems technologies.”
  • ADLS is designed ground up for security.
  • ADLS has a fast path for small appends

Let’s go into each of these areas in more detail.

The structure of a file in ADLS

An ADLS file is referred to by URL and comprises a sequence of extents, each of which is in turn a sequence of blocks. Extents are the units of locality; blocks are the units of append atomicity and parallelism. All extents but the last one are sealed. Only an unsealed last extent may be appended to.

The notion of tiered storage is integral to ADLS. Any part of a file can be in one or more of several storage tiers.

In general, the design supports local tiers (including local SSD and HDD tiers), whose data is distributed across ADLS nodes for easy access during job computation, and remote tiers, whose data is stored outside the ADLS cluster.

ADLS has a concept of a partial file, which is a contiguous sequence of extents. A file is represented internally as an unordered collection of partial files, possibly overlapping, each mapped to a specific storage tier at any given time.

We move partial files between tiers through decoupled copying and deleting. To change the tier of a partial file, a new partial file is created in the target tier, by copying data from the source partial file. For a time, two separate partial files (in two different tiers) are represented in the Partial File Management service, containing identical data. Only then is the source partial file deleted. When a partial file is no longer needed, it is deleted, while ensuring that all extents in it also exist in some other partial file, unless the file itself is being deleted.
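The copy-then-delete protocol described above can be sketched in a few lines. This is a toy model under my own assumptions: `PartialFile`, `PartialFileManager`, and the tier names are hypothetical, extents are plain integers, and the real service is of course replicated and transactional.

```python
from dataclasses import dataclass


@dataclass(eq=False)  # identity semantics: two copies of the same data stay distinct
class PartialFile:
    tier: str
    extents: tuple  # contiguous run of extent ids


class PartialFileManager:
    """Toy stand-in for the Partial File Management service (one file only)."""

    def __init__(self):
        self.partials = []

    def covered_elsewhere(self, victim):
        # True if every extent of `victim` also lives in some other partial file.
        others = {e for p in self.partials if p is not victim for e in p.extents}
        return set(victim.extents) <= others

    def delete(self, victim):
        if not self.covered_elsewhere(victim):
            raise ValueError("refusing to delete: extents would be lost")
        self.partials.remove(victim)

    def change_tier(self, source, target_tier):
        copy = PartialFile(target_tier, source.extents)  # step 1: copy the data
        self.partials.append(copy)                       # both copies now coexist
        self.delete(source)                              # step 2: only then delete
        return copy


pfm = PartialFileManager()
hot = PartialFile("local-ssd", (0, 1, 2))
pfm.partials.append(hot)
cold = pfm.change_tier(hot, "remote")
```

Because the delete is guarded by the coverage check, a crash between the two steps leaves a duplicate rather than a hole.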

ADLS System Architecture and RSL-HK Rings

The big picture view of the ADLS architecture looks like this:


The Secure Store Service (part of the Gateway Cluster) is the point of entry and security boundary between ADLS and applications.

It implements the API end points by orchestrating between metadata services and storage providers, applying lightweight transaction coordination between them when needed, handling failures and timeouts in components by retries and/or aborting client requests as appropriate, and maintaining a consistent internal state throughout and ensuring that a consistent state is always presented to clients.

There is a collection of core microservices, each of which needs to be scalable, highly available, low-latency, and strongly consistent. These core microservices are all built on a foundation the authors call an RSL-HK ring. More on that in a moment. The core microservices are:

  • The Naming Service, which maps mutable hierarchical paths to fixed references to objects in other ADLS metadata services. The Naming Service supports renames and moves of files and folders without copying data.
  • The Extent Management Service tracks the location of every extent of every file in a remote storage provider.
  • The Secret Management Service (SMS) handles all internal secrets needed for the functioning of the core microservices, as well as customer-managed secrets. It is both a secret repository and a broker to access external secret repositories (e.g., the Azure Key Vault). SMS also handles all compression, decompression, encryption, and decryption. The Trusted Software Module component of SMS handles all these data transactions in a separate process sat behind a dedicated proxy. As such, the way is paved to e.g., run the TSM inside a secure enclave in the future if so desired.
  • The Partial File Manager looks after the mapping from file ID to a set of one or more partial files.
  • The Throttling Service collects information from all of the Secure Store Service gateways every 50ms, identifying a list of accounts that should be throttled according to the quotas associated with them. The Secure Store Service uses this list to block indicated accounts for the next 50ms.
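To make the throttling loop concrete, here is a minimal sketch of a single 50ms round. The shape of the gateway reports (a dict of account to request count) and the idea that quotas are expressed per window are assumptions of mine; the paper does not give these details.

```python
from collections import Counter

WINDOW_MS = 50  # collection interval mentioned above


def accounts_to_throttle(gateway_reports, quotas):
    """Aggregate per-gateway usage and return the accounts over quota.

    gateway_reports: iterable of {account: requests seen in the last window}
    quotas: {account: allowed requests per window}; unknown accounts are unlimited
    """
    usage = Counter()
    for report in gateway_reports:
        usage.update(report)  # Counter.update adds counts from a mapping
    return {acct for acct, used in usage.items()
            if used > quotas.get(acct, float("inf"))}


# Hypothetical round: two gateways report, account "a" exceeds its quota in total.
reports = [{"a": 30, "b": 5}, {"a": 25}]
blocked = accounts_to_throttle(reports, {"a": 50, "b": 10})
```

Each gateway would then reject requests from the accounts in `blocked` until the next list arrives.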

All of these microservices (except the throttling service) are themselves replicated systems. These take advantage of a Replicated State Library first built for Cosmos:

… we have long used the Replicated State Machine approach, based on a proprietary Replicated State Library (RSL) that implements the core underlying replication, consensus, checkpointing and recovery mechanisms. RSL implements Viewstamped Replication, with consensus based on Paxos and improvements described in [Tiered Storage. Architectural Note. Microsoft Nov 2012].

An RSL-based service is deployed in a quorum-based ring, usually made up of seven servers distributed across failure domains. RSL proved to be a useful building block, but service state management still required complex and custom code to use the RSL state management facilities efficiently. ADLS builds on the core RSL approach by adding a declarative state management layer based on the in-memory tables from the SQL Server Hekaton engine. This combination of RSL and Hekaton leads to RSL-HK rings.

  • The persistent state of a service is maintained as replicated in-memory Hekaton tables and indices.
  • Metadata (service) operations are written as transactions with ACID semantics via Hekaton optimistic concurrency support
  • In this way, service developers get fault tolerance for all state changes including checkpointing, logging, and recovery.

A service developer defines the structure of the state by specifying the schema of Hekaton tables and any desired indexes on them for performance. The developer also then provides the callback functions for service operations, which execute as transactions on the Hekaton tables.

RSL-HK leverages Hekaton to give service developers a more declarative way to manage state, and transparently provides replication and recovery in an application-agnostic way, thereby freeing service developers to concentrate solely on the service’s external interface and its logic.

RSL-HK achieves high throughput and latencies less than 1ms for read transactions, and 10ms for write transactions. Here you can see the performance of RSL-HK underpinning the naming service:


ADLS is architected for security, encryption, and regulatory compliance to be enforced at scale.

Authentication and role-based access control are performed through an integration with Azure Active Directory based on OAuth tokens from supported identity providers. Tokens are augmented with the user’s security groups, and this information is passed through all the ADLS microservices. Files and folders support POSIX-compliant permissions. Data flowing into and through ADLS is encrypted in transit and at rest.

ADLS provides encryption at rest for all data. Each append block is encrypted separately, using a unique key, ensuring that the amount of cypher text produced using any given key is small. The emphasis that ADLS places on extremely large files makes this especially important. The header for every block contains metadata to allow block-level integrity checks and algorithm identification.

There are three types of keys: a master encryption key (MEK), a data encryption key (DEK) and a block encryption key (BEK).

For user accounts, a user can generate a MEK and store it in the Azure Key Vault; for service-managed keys, the Secret Management Service simply generates one on their behalf. The MEK is used to encrypt all other keys, so deleting the MEK “forgets” all of the data (best not to forget by accident!).

The DEK is generated by the Secret Management Service and is used as the root key for all subsequent file encryption keys. It is encrypted using the MEK and stored in the ADLS cluster.

The BEK is generated for each block using the account’s DEK and the block’s ID, and is used to encrypt the block.
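Here is a small sketch of what deriving a per-block key from the DEK and the block ID might look like. The paper only says the BEK is generated from those two inputs; using HMAC-SHA256 as the derivation function, and the block ID format, are my assumptions for illustration.

```python
import hashlib
import hmac
import os


def derive_block_key(dek: bytes, block_id: str) -> bytes:
    """Derive a block encryption key from the account DEK and the block's ID.

    Assumption: HMAC-SHA256 stands in for whatever derivation ADLS really uses.
    """
    return hmac.new(dek, block_id.encode("utf-8"), hashlib.sha256).digest()


dek = os.urandom(32)  # stand-in for a DEK issued by the Secret Management Service
bek_a = derive_block_key(dek, "extent-7/block-0")  # hypothetical block ids
bek_b = derive_block_key(dek, "extent-7/block-1")
```

Since each block gets its own key, only a block's worth of ciphertext is ever produced under any one key, and no per-block key needs to be stored: it can be re-derived on read.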

Encryption and decryption might be expected to introduce significant overheads. However, in executing a broad mix of U-SQL queries on ADLA, producing a 60/40 read/write mix of data traffic, encryption and decryption only added 0.22% in total execution time (on a baseline of over 3000 hours).

Small appends fast path

ADLS has a Small Append Service (SAS) that supports low-latency small appends (a weakness of HDFS).

Low-latency small appends are critical for transactional systems such as HBase on top of ADLS. HBase Write-Ahead Logging append latency directly impacts HBase transaction rate. The figure below shows the append latencies with and without SAS enabled.

When SAS determines that a file’s append pattern is dominated by small appends, it switches them to the small append path which uses low-latency storage to store the payload. Such appends are durable once acknowledged. (Large appends are passed directly onto the downstream providers). Small appends are then collected asynchronously into larger append blocks before being moved to one of the storage tiers.
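A toy version of the two append paths is sketched below. It simplifies in one important way: the real SAS switches paths per file based on the observed append pattern, whereas this sketch decides per append using a size limit. The class name and the limit are hypothetical.

```python
class AppendRouter:
    """Toy model: small appends go to a fast log, large ones straight to a tier."""

    def __init__(self, small_limit=4096):
        self.small_limit = small_limit
        self.fast_log = []     # stands in for low-latency storage
        self.tier_blocks = []  # stands in for append blocks in a storage tier

    def append(self, payload: bytes) -> str:
        if len(payload) <= self.small_limit:
            self.fast_log.append(payload)  # acknowledged once stored here
            return "small-path"
        self.compact()                     # keep append order before going direct
        self.tier_blocks.append(payload)
        return "direct"

    def compact(self):
        # In ADLS this happens asynchronously; here it is an explicit call.
        if self.fast_log:
            self.tier_blocks.append(b"".join(self.fast_log))
            self.fast_log.clear()


router = AppendRouter(small_limit=4)
first = router.append(b"ab")
router.append(b"cd")
last = router.append(b"0123456789")
```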

Other bits and pieces

There’s plenty more in the paper that I didn’t have space to cover here, including details of some of the other services, how ADLS works with storage providers, and a walkthrough of the overall flows for major operations. If you’re interested in the topic, it’s well worth checking out.

Spanner: becoming a SQL system

July 3, 2017

Spanner: becoming a SQL system Bacon et al., SIGMOD’17

This week we’ll start digging into some of the papers from SIGMOD’17. First up is a terrific ‘update’ paper on Google’s Spanner which brings the story up to date in the five years since the original OSDI’12 paper.

… in many ways, today’s Spanner is very different from what was described there.

If you had to sum up the change in two words, they would probably be these: “more SQL!”

Of course, as well as being one of the most important internal systems at Google, Spanner is also now available for you to use too in ‘Cloud Spanner‘ form as part of GCP. This paper will give you a deeper understanding of what Spanner can do for you.

Why did Spanner evolve?

A prime motivation for this evolution towards a more “database-like” system was driven by the experiences of Google developers trying to build on previous “key-value” storage systems. [E.g., Bigtable].

Bigtable is still widely used at Google, but for OLTP style applications developers struggled without a strong schema system, cross-row transactions, consistent replication, and a powerful query language. Trying to provide these features on top of Bigtable only went so far.

As a result, we decided to turn Spanner into a full featured SQL system, with query execution tightly integrated with the other architectural features of Spanner (such as strong consistency and global replication).

Just because Spanner embraced SQL though, that doesn’t mean it abandoned scalability goals: “In web companies, scalability has never been subject to compromise, hence all Google’s systems, including Spanner, start there.” The two key challenges in supporting scalability cited by the Spanner team are manageability and transactions.

A scalable data management system must address manageability early on to remain viable. For Spanner, that meant transparent failover across clusters and the ability to reshard data easily upon expanding the system… ACID transactions spanning arbitrary rows/keys is the next hardest challenge for scalable data management systems. Transactions are essential for mission-critical applications, in which any inconsistency, even temporary, is unacceptable. Making ACID work at scale is extremely difficult, and required substantial innovation in Spanner, as discussed in [the original OSDI’12 paper].

There’s plenty of great information in this paper, including a nice succinct overview of Spanner’s overall architecture in section 2. I’m mostly going to focus on three key aspects in this write-up:

  • how Spanner supports distributed query execution efficiently
  • how Spanner decides which servers should process a query and how it minimises scanning and locking on those servers, using range extraction
  • why Spanner expends so much effort to support restartable queries

Distributed query execution

The Spanner SQL query compiler represents distribution using explicit operators in the query algebra tree (following a long line going all the way back to Volcano). The fundamental building block is the Distributed Union operator, which ships a subquery to each relevant shard and concatenates the results.

Distributed Union is a fundamental operation in Spanner, especially because the sharding of a table may change during query execution, and after a query restart.

To start with, a Distributed Union operator is inserted immediately above every Spanner table so that global scans are replaced by explicit distributed operations using local scans of table shards. Where possible, Distributed Unions are then pulled up the tree. This has the effect of leaving more of the computation below the distributed union – i.e., work is pushed down so that the maximum amount possible is carried out by the servers responsible for the data shards.

In order for these operator tree transformations to be equivalent, a property we call partitionability must be satisfied for any relational operation F that we want to push down.

This condition states that performing an ordered union of the results of applying F to each shard in table key order gives the same outcome as applying F to the results of a global scan.
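The partitionability condition is easy to check on a toy example. In the sketch below, shards are plain Python lists in table key order, and the two sample operations (`keep_even_keys`, `first_two`) are my own illustrations; the check only tests the property on the given data, it does not prove it in general.

```python
from itertools import chain

# Three shards of one table, each a list of (key, value) rows in key order.
shards = [[(1, "a"), (2, "b")], [(3, "c"), (5, "d")], [(8, "e")]]


def global_scan(shards):
    return list(chain.from_iterable(shards))


def holds_on(f, shards):
    """Does the ordered union of f over each shard equal f over a global scan?"""
    ordered_union = list(chain.from_iterable(f(shard) for shard in shards))
    return ordered_union == f(global_scan(shards))


def keep_even_keys(rows):  # a filter: can be pushed below Distributed Union
    return [row for row in rows if row[0] % 2 == 0]


def first_two(rows):       # a LIMIT-like operation: cannot be pushed as-is
    return rows[:2]


filter_ok = holds_on(keep_even_keys, shards)
limit_ok = holds_on(first_two, shards)
```

The filter passes because each row is judged independently, while the limit fails because each shard would return its own first two rows.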

Being aware of keys or identity columns of the underlying data set, Spanner can push more complex operations such as grouping and sorting to be executed close to the data where the sharding columns are a proper subset of grouping or sorting columns. This is possible because Spanner uses range sharding.

In addition to range sharding, Spanner supports table interleaving which colocates rows from multiple tables sharing a primary key prefix: all ‘child table’ rows are stored physically next to the ‘parent table’ row they join on. This means that joins between such tables can also be pushed down below the Distributed Union for common sharding keys.

Consider the SQL query

It results in the following distributed execution plan, where the distributed unions initially inserted directly above the Customer and Sales scans have been pulled up and unified almost at the very top of the tree:

At runtime, Distributed Union minimizes latency by using the Spanner coprocessor framework to route a subquery request addressed to a shard to one of the nearest replicas that can serve the request. Shard pruning is used to avoid querying irrelevant shards. Shard pruning leverages the range keys of the shards and depends on pushing down conditions on sharding keys of tables to the underlying scans.

Range extraction, described below, extracts a set of ranges guaranteed to fully cover all table rows on which a subquery may yield results, and is the heart of shard pruning.

Subqueries are dispatched to every relevant shard in parallel.

Joins between independently distributed tables could be the subject of a whole other paper. Here the authors focus on one example, the batched apply join which is mostly used to join a secondary index and its independently distributed base table.

Spanner implements a Distributed Apply operator by extending Distributed Union and implementing Apply style join in a batched manner – as two joins, a distributed join that applies batches of rows from the input to remote subquery, and another that applies rows from each batch to the original join’s subquery locally on a shard.

When a client makes a query via the API, Spanner will attempt to route the query directly to the server that owns all or part of the data referenced by the query. The first time the query is submitted, it may go to any server. At this point the query is analysed and a location hint is determined and sent to the client for caching. The location hint enables the client to cheaply compute where to send the query on subsequent executions.

Spanner also has explicit support for the case where the results of a query need to be processed in parallel, using the parallel consumer API. This works in two stages:

The first is to divide the work between the desired number of clients. The API receives a SQL query and the desired degree of parallelism, and returns a set of opaque query partition descriptors. In the second stage, the query is executed on the individual partitions, normally using requests initiated in parallel from separate machines. The parallel-consumer API guarantees that the concatenation of results from all the partitions yields the same unordered set of rows as for the query submitted through the single-consumer API.

Range extraction

Range extraction is the process of analysing a query to determine what portions of tables it references. There are three flavours of range extraction:

  • Distribution range extraction figures out which table shards are referenced by a query.
  • Seek range extraction determines what fragments of a relevant shard to read from the underlying storage stack.
  • Lock range extraction determines what fragments of a table are to be locked (pessimistic txns) or checked for potential pending modifications (snapshot txns).

Our implementation of range extraction in Spanner relies on two main techniques: At compile time, we normalize and rewrite a filtered scan expression into a tree of correlated self-joins that extract the ranges for successive key columns. At runtime, we use a special data structure called a filter tree for both computing the ranges via bottom-up interval arithmetic and for efficient evaluation of post-filtering conditions.

Range computation is in general a conservative approximation as isolating key columns in predicates can be arbitrarily complex and there are diminishing returns. In the worst case, for distribution range extraction, a query may be sent to a shard that ultimately returns no rows.
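To see how extracted ranges drive shard pruning, consider this minimal sketch. It assumes integer keys, half-open `[low, high)` ranges, and a hypothetical three-shard table; the real implementation works over multi-column keys with a filter tree.

```python
def overlaps(a, b):
    """Half-open intervals [low, high) overlap if each starts before the other ends."""
    return a[0] < b[1] and b[0] < a[1]


def prune_shards(shard_ranges, extracted_ranges):
    """Keep only the shards whose key range intersects some extracted range."""
    return [name for name, key_range in shard_ranges.items()
            if any(overlaps(key_range, r) for r in extracted_ranges)]


# Hypothetical table split into three range shards.
shard_ranges = {"s0": (0, 100), "s1": (100, 200), "s2": (200, 300)}

# Ranges a compiler might extract from: key BETWEEN 150 AND 160 OR key = 250
extracted = [(150, 161), (250, 251)]

targets = prune_shards(shard_ranges, extracted)
```

If the extracted ranges are wider than strictly necessary, the result is still correct: a shard may simply be queried and return no rows.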

… the trade-offs of seeks vs scans and granularity of locking involve optimization decisions that can be very tricky and are beyond the scope of this paper.

Restartable queries

Spanner supports automatic query restarts in the event of failures, resharding, and binary rollouts. This is implemented inside the query processor by capturing the distributed state of the query plan being executed using restart tokens. Tokens must be robust to dynamic resharding (ongoing splitting, merging and moving of data). Furthermore, the non-determinism which makes for high performance query execution opportunities complicates restart as results may be returned in a non-repeatable order. Finally, Spanner also supports restarts even across the rollout of new server versions, which involves ensuring backwards compatibility of the restart token wire format, preserving query plans across query restarts, and backwards compatibility in operator behaviours. It’s a lot of work!
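The core idea of a restart token can be shown with a deliberately simple sketch: a single scan over rows sorted by key, where the token is just the last key delivered. Real Spanner tokens capture the state of a whole distributed plan, so everything here (names included) is an illustrative assumption.

```python
class TransientFailure(Exception):
    """Stands in for a network disconnect, process crash, or shard move."""


def scan(rows, after_key=None, fail_at_key=None):
    """Yield (key, value) rows with key > after_key; optionally fail at one key."""
    for key, value in rows:
        if after_key is not None and key <= after_key:
            continue
        if key == fail_at_key:
            raise TransientFailure(key)
        yield key, value


def run_with_restarts(rows, fail_once_at=None):
    results = []
    restart_token = None            # last key already delivered to the client
    pending_failure = fail_once_at  # simulate one transient failure
    while True:
        try:
            for key, value in scan(rows, restart_token, pending_failure):
                results.append(value)
                restart_token = key
            return results
        except TransientFailure:
            pending_failure = None  # the retry resumes after restart_token


rows = [(1, "a"), (2, "b"), (3, "c")]
out = run_with_restarts(rows, fail_once_at=2)
```

The client sees each row exactly once, and only the work after the token is redone.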

Overall, support for restarts came at a considerable engineering cost. Among other things, it required developing a restart token versioning mechanism, processes that force explicit versioning upon incompatible changes, and a framework to catch incompatibilities. Addressing those challenges was hard, but worth it because transparent restarts improve user-perceived system stability and provide important flexibility in other aspects of Spanner’s design.

Some of the benefits delivered by the query restart mechanism include:

  • Hiding transient failures including network disconnects, machine reboots, process crashes, distributed waits, and data movement.
  • A simplified programming model that does not require the programmer to code retry loops. (“Retry loops in database client code is a source of hard to troubleshoot bugs, since writing a retry loop with proper backoff is not trivial.”)
  • Support for long-running queries instead of paging queries (avoiding the need for sorting solely to support pagination)
  • Improved tail latencies for online requests through the minimal amount of work that needs to be redone when restarting queries
  • Forward progress guarantees for long-running queries where the running time is comparable to the mean time to (transient) failure.
  • Support for recurrent rolling upgrades. “For the past few years of widespread Spanner use inside Google there were very few moments when no Spanner zone was being upgraded to a new version, mostly during the brief holiday lulls.”

The ability to gradually upgrade all machines to a new version within a week or so while running a few versions concurrently has been a cornerstone of Spanner’s development agility.

Other interesting bits and pieces from the paper

Google moved to a standard internal SQL dialect (“Standard SQL”) shared by all of their systems (e.g., Spanner, F1, Dremel, BigQuery). To make this work took quite a bit of effort. Several shared components ensure consistency across systems: the compiler front-end, a library of scalar functions, and a shared testing framework and tests.

Spanner now has a new low-level storage format called Ressi, designed from the ground-up for handling SQL queries over large-scale distributed databases with a mix of OLTP and OLAP workloads. Ressi stores a database as an LSM tree, and divides values into an active file containing only the most recent values, and an inactive file which may contain older versions. This helps to more efficiently support Spanner’s time-versioned semantics.

Despite the critique of one-fits-all systems, combining OLTP, OLAP, and full-text search capabilities in a single system remains at the top of customer priorities. Large-scale deployment and monitoring of multiple systems with different availability guarantees, transactional semantics, rollout cycles, and language and API quirks is a major burden on customers. It is our goal to make Spanner perform well and be cost-effective across a broad spectrum of use cases over time.

Dhalion: self-regulating stream processing in Heron

June 30, 2017

Dhalion: Self-regulating stream processing in Heron Floratou et al., VLDB 2017

Dhalion follows on nicely from yesterday’s paper looking at the modular architecture of Heron, and aims to reduce the “complexity of configuring, managing, and deploying” streaming applications. In particular, streaming applications deployed as Heron topologies, although the authors are keen to point out the principles could be applied in other engines too.

Dhalion is a system that essentially allows stream processing frameworks to become self-regulating. Dhalion has been implemented and evaluated on top of Twitter Heron and we are in the process of releasing it to open-source as a contribution to the Heron code base. However, its architecture and basic abstractions are also applicable to other streaming engines as well.

And what does self-regulating mean exactly? Floratou et al. break it down into three components:

  • The system should be able to self-tune: given a topology and a desired SLO, it should be able to automatically tune configuration parameters to achieve the stated objective.
  • The system should then be able to self-stabilise to continue to maintain that SLO in the face of changing workload (e.g., a sudden surge of tweets). (This functionality is more commonly referred to as auto-scaling).
  • The system should be able to detect service degradations (gray failures?), diagnose the internal faults causing them, and take actions to recover from them. (Self-healing).

Let’s pause for a moment and think about what might be necessary to achieve those goals. Clearly there has to be some kind of monitoring to observe the system status (perhaps from multiple perspectives), with alerts/events generated when an abnormal situation is detected or SLOs are not being met. Then we’ll need a way to map from alert conditions back to causes, and finally, given a determined cause (or causes), we’ll need a way to take corrective action to address the situation. While we’re doing all this, we’ll need to be careful about feedback loops too, in case our interventions themselves cause the system to become unstable or to oscillate between configurations, degrading performance.

The first three of those requirements are satisfied by Dhalion’s symptom detectors, diagnosers, and resolvers. Dhalion has only rudimentary mechanisms to control feedback though: actions shown not to help can be blacklisted (“if it hurts, stop doing it!”), and after taking an action, the system waits for a reasonable period of time before considering any further interventions. In the evaluation, these two measures seem to be enough to exhibit desirable behaviours, but it would have been nice to have some reference to control theory, which is a whole branch of science dedicated to achieving stability around a setpoint.

Symptom detectors collect metrics from the underlying streaming system (for example, tuple processing rates, and number of packets pending). From these metrics Dhalion looks for symptoms that may be evidence of a problem – for example, is the system having to apply backpressure, or is there processing skew across the tasks of a particular pipeline stage? A symptom description contains a compact representation of the symptom together with the metric values used to identify it.

Detected symptoms are passed to the diagnosers, which seek to find explanations for the symptoms.

For example, the existence of backpressure can be attributed to various reasons such as resource underprovisioning at a particular stage, slow hosts/machines or data skew. Dhalion produces all the possible diagnoses that can explain the observed symptoms.

In the examples given in the paper, the diagnosers all look to be hand-coded rules, but you could in theory plug in machine learned classifiers or other such trickery.

Given a set of (possible) diagnoses, resolver selection explores possible actions to resolve the situation, which are then carried out by the resolvers. Typically there is a one-to-one mapping between diagnoses and resolvers. Major topology changes such as scaling up and down resources or restarting containers are typically invoked through the Heron Scheduler component.

… after every action is performed, Dhalion evaluates whether the action was able to resolve the problem or brought the system to a healthier state. If an action does not produce the expected outcome then it is blacklisted and it is not repeated again.

(Strictly, Dhalion tracks the ratio of unsuccessful interventions to invocations for a given action and diagnosis, and blacklists the action for that diagnosis when the ratio crosses a given threshold).
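The ratio-based blacklisting just described might look something like the sketch below. The class name, the default threshold of 0.5, and the choice of a strict "greater than" comparison are all assumptions; the paper only says the ratio must cross a given threshold.

```python
from collections import defaultdict


class ActionBlacklist:
    """Track, per (diagnosis, action) pair, how often the action failed to help."""

    def __init__(self, threshold=0.5):
        self.threshold = threshold
        self.invocations = defaultdict(int)
        self.failures = defaultdict(int)

    def record(self, diagnosis, action, helped):
        key = (diagnosis, action)
        self.invocations[key] += 1
        if not helped:
            self.failures[key] += 1

    def is_blacklisted(self, diagnosis, action):
        key = (diagnosis, action)
        invoked = self.invocations.get(key, 0)
        if invoked == 0:
            return False
        return self.failures.get(key, 0) / invoked > self.threshold


blacklist = ActionBlacklist(threshold=0.5)
for helped in (True, True, False):
    blacklist.record("underprovisioned", "scale_up", helped)
early = blacklist.is_blacklisted("underprovisioned", "scale_up")  # 1 of 3 failed
for helped in (False, False):
    blacklist.record("underprovisioned", "scale_up", helped)
late = blacklist.is_blacklisted("underprovisioned", "scale_up")   # 3 of 5 failed
```

Note that the blacklist is keyed by diagnosis as well as action, so an action that fails for one diagnosis can still be tried for another.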

Let’s see how this all comes together for a couple of use cases: dynamic resource provisioning, and SLO maintenance.

Dynamic resource provisioning (autoscaling)

The Dhalion Dynamic Resource Provisioning Policy tries to maximise throughput while also avoiding under-utilisation. There are three different symptom detectors, four diagnosers, and four possible resolvers.


  • The pending packets detector monitors the number of pending packets in Heron instance queues for bolts. It looks to see whether all of the instances have a similar queue size or whether there are outliers.
  • The backpressure detector generates a symptom description whenever backpressure is being applied. This description includes the bolt that is the source of the backpressure and the amount of time input data consumption was suspended during the 300 second measurement period.
  • The processing rate skew detector looks for mismatches in the number of tuples processed by each Heron instance.


  • The resource overprovisioning diagnoser looks at the symptoms from the pending packets and backpressure detectors. If there is no backpressure and the average number of pending packets across instances of a bolt is almost zero, then it is possible the resources assigned to the bolt are over-provisioned.
  • The resource underprovisioning diagnoser considers whether observed backpressure could be the result of underprovisioned resources. It attributes the cause when all the instances of a bolt have similar processing rates and queue sizes.
  • The slow instance diagnoser attributes observed backpressure to a slow instance when the instances initiating backpressure have a much higher number of pending packets than their peers and similar processing rates. (The processing rates are similar because all instances operate at the speed of the slow instance).
  • The data skew diagnoser attributes observed backpressure to data skew when the instances initiating backpressure have a higher processing rate and a higher number of pending packets than their peers.
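The four diagnoser rules above can be read as a small decision procedure. The sketch below is my own condensation: the field names, the "almost zero" cut-off of one pending packet, and returning at most one diagnosis per bolt are assumptions (Dhalion itself produces every diagnosis that could explain the symptoms).

```python
from dataclasses import dataclass


@dataclass
class BoltSymptoms:
    backpressure: bool       # is the bolt initiating backpressure?
    avg_pending: float       # average pending packets across its instances
    pending_outlier: bool    # culprit instances have far more pending packets
    rate_outlier_high: bool  # culprit instances process faster than their peers


def diagnose(s: BoltSymptoms) -> list:
    if not s.backpressure:
        # No backpressure and near-empty queues: possibly too many resources.
        return ["overprovisioned"] if s.avg_pending < 1 else []
    if s.pending_outlier:
        # Long queues plus a higher rate means skew; similar rates, a slow instance.
        return ["data_skew"] if s.rate_outlier_high else ["slow_instance"]
    if not s.rate_outlier_high:
        # Every instance looks alike, so the whole bolt is short of resources.
        return ["underprovisioned"]
    return []


idle = diagnose(BoltSymptoms(False, 0.0, False, False))
slow = diagnose(BoltSymptoms(True, 500.0, True, False))
skew = diagnose(BoltSymptoms(True, 500.0, True, True))
short = diagnose(BoltSymptoms(True, 500.0, False, False))
```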


  • If a resource overprovisioning diagnosis is made, then the bolt scale down resolver decreases the number of Heron instances associated with the bolt. The scale down factor is a configuration option. If the action results in backpressure the operation will be blacklisted and a subsequent scale up operation will bring the topology back to a healthy state.
  • The restart instance resolver moves slow instances to new containers (i.e., kills the slow one and restarts it?).
  • The data skew resolver adjusts the hash function used to distribute data to the bolts.
  • The scale up resolver scales up the resources dedicated to a bolt initiating an underprovisioning diagnosis. “To determine the scale up factor, the Resolver computes the percentage of the total amount of time that the Heron Instances spent suspending the input data over the amount of time where backpressure was not observed.”
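The scale up factor quoted above works out as a simple ratio. In this sketch the 300 second window comes from the backpressure detector described earlier; applying the factor by rounding up the new instance count is my own assumption about how the result is used.

```python
import math


def scale_up_factor(suspended_s: float, window_s: float = 300.0) -> float:
    """Time spent with input suspended, over time with no backpressure."""
    normal_s = window_s - suspended_s
    if normal_s <= 0:
        raise ValueError("input was suspended for the whole window")
    return suspended_s / normal_s


def new_parallelism(current_instances: int, suspended_s: float,
                    window_s: float = 300.0) -> int:
    # Assumption: grow the bolt by the factor and round up to a whole instance.
    factor = scale_up_factor(suspended_s, window_s)
    return math.ceil(current_instances * (1 + factor))


# Hypothetical bolt: 4 instances, input suspended for 60 of the last 300 seconds.
factor = scale_up_factor(60.0)       # 60 / 240
instances = new_parallelism(4, 60.0)
```

With input suspended 20% of the time, the bolt keeps up for only 80% of it, so it needs 0.2 / 0.8 = 25% more capacity, not 20%.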

Here’s an example of dynamic resource provisioning at work while the workload is manipulated:

SLO maintenance

We observe that in a large number of streaming applications, users spend a significant amount of time tuning the topology to meet the requirement of a throughput above a certain threshold.

In their experiment, the authors simply submit a topology with a single Heron instance provisioned for each spout and bolt, and provide an SLO asking for at least 4 million tuples a minute at steady state. Dhalion is able to figure out a configuration that meets this objective as you can see in the figure below – although it does take about 70 minutes to do so!

The Dynamic Resource Provisioning Policy that we previously presented, assumes that the input data rate is given and attempts to allocate resources so that the system can handle the input data load as it varies over time. The Throughput SLO Policy goes a step further by attempting to adjust the input data rate by increasing the number of Heron Instances that belong to the spouts, in order to meet the performance SLO.

It uses an additional symptom detector, diagnoser, and resolver:

  • The emit count detector computes the total rate at which a spout emits data and forwards it to the throughput SLO violation diagnoser.
  • The diagnoser checks whether the topology is in a healthy state. If it is healthy, and the throughput doesn’t meet the required SLO, it emits a diagnosis that is forwarded to the spout scale up resolver.
  • You can probably guess what the spout scale up resolver does: it scales up the resources associated with the spout.

The last word

Note that although Dhalion has been implemented on top of Heron, its architecture and basic policy abstractions can be adopted by other streaming engines as long as they provide a metrics collection API and potentially a scaling API.