WANalytics: Analytics for a geo-distributed, data intensive world

WANalytics: analytics for a geo-distributed data intensive world – Vulimiri et al. 2015

…data is born distributed; we only control data replication and distributed execution strategies.

This is true for so many sources of data. Combine this with Dave McCrory’s observation that ‘Data has Gravity’ (i.e. it attracts applications and other data processing workloads to it) and you have the fundamental forces for an inevitably distributed data processing world. Now let’s zoom out: we have ‘warehouse scale computers’ (aka ‘datacenters’), which you can think of as computers with incredibly large processing and storage capacity and fast internal connectivity. We connect a relatively small number of these extremely capable computers together to form the backbone of wide-area distributed systems. Given the immense processing and storage capability of each of these computers, the network bandwidth between them is comparatively very poor indeed. So we need to construct our distributed systems in such a way as to minimize dependencies on this ‘piece of string’ and to make the most efficient use of it when we do need to use it.

Vulimiri et al. look at this problem from the perspective of analytics workloads: “our Hadoop-based prototype delivers 257x reduction in WAN bandwidth on a production workload from Microsoft.” A running example in the paper is consistent with analytic workloads at Microsoft, Yahoo!, Twitter, Facebook, and LinkedIn. There are three sources of data: clickstreams, textual reviews, and a relational sales table with details of purchases. These are processed in a DAG to extract data about user behaviour, sales performance, and item reception. The workflow therefore includes computations processing unstructured data – session extraction and sentiment analysis, as well as machine learning (behaviour modeling).

Many large organizations today have a planetary-scale footprint and operate tens of data centers around the globe. Local data centers ensure low-latency access to users, availability, and regulatory compliance. Massive amounts of data are produced in each data center by logging interactions with users, monitoring compute infrastructures, and tracking business-critical functions. Analyzing all such data globally in a consistent and unified manner provides invaluable insight.

The typical approach to Wide Area Big Data today is to copy data to a central location for analysis. Spot the obvious problem! This is “destined to consume cross-data center bandwidth proportional to the volume of updates/growth in the base data.”

We argue that any centralized solution is on a collision course with current technological and geo-political trends: 1) data is growing much faster than cross-data center bandwidth, and 2) growing privacy and data sovereignty concerns are likely to result in government regulation that threatens the viability of centralized approaches (e.g., German-born data might not be stored in US data centers).

The novel features of the Wide Area Big Data problem are therefore:

  • No control over data partitioning / placement (it is born distributed) – only over replication
  • Arbitrary computation DAGs (vs relational)
  • Cross-data center bandwidth is the most precious resource (CPU and storage are abundant)
  • Heterogeneous bandwidth
  • Data sovereignty constraints

WANalytics devises distributed execution plans in conjunction with data replication strategies in order to optimise for these constraints. Correspondingly it has a runtime layer for executing DAGs across data centers, and a workload analyzer that constantly monitors and optimizes the user workload, and provides physical distributed execution plans for given DAGs. The analyzer runs once every epoch (e.g. once a day) with a goal of progressively improving on current strategies.

The physical plan explicitly specifies where each stage is going to be executed, and how data will be transferred across data centers.


The fundamental strategy for reducing cross data center traffic is based on caching and diffs:

The unique setting we consider, in which each “node” is a full data center with virtually limitless CPU and storage capabilities, and connectivity among nodes is very costly/limited, lends itself to a novel optimization. We cache all intermediate results generated during DAG execution at each data center, and systematically compute diffs to reduce cross-data center bandwidth. Whenever a source data center S sends the result for a computation C to a destination center D, both S and D store a copy of the result in a cache tagged with (signature(C)). The next time S needs to send results for C to D, it evaluates C again, but instead of sending the results afresh, it computes a diff between the old and new result, and sends the smaller between the diff and the new result. D then applies the diff onto its copy of the old result.

(Differential sync again, but on a big scale!) This is like ‘a form of view maintenance, but for arbitrary computations.’ For the workload studied it proved very effective, reducing cross-data center traffic to ~1GB compared to ~250GB.
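
To make the cache-and-diff exchange concrete, here is a minimal sketch in Python. It is an illustration under stated assumptions, not the paper's Hadoop-based implementation: results are modelled as keyed dictionaries, `signature` is a plain hash of a computation name, pickled size stands in for bytes on the wire, and `DataCenter`, `compute_diff`, `apply_diff` and `send_result` are hypothetical names.

```python
import hashlib
import pickle


def signature(computation: str) -> str:
    """Hypothetical cache key: a hash of the computation's description."""
    return hashlib.sha256(computation.encode("utf-8")).hexdigest()


def size(obj) -> int:
    """Stand-in for bytes on the wire (pickled size is an assumption)."""
    return len(pickle.dumps(obj))


def compute_diff(old: dict, new: dict) -> dict:
    """Row-level diff between two keyed result sets."""
    upserts = {k: v for k, v in new.items() if k not in old or old[k] != v}
    deletes = [k for k in old if k not in new]
    return {"upserts": upserts, "deletes": deletes}


def apply_diff(old: dict, diff: dict) -> dict:
    """Rebuild the new result from the cached old result plus a diff."""
    result = dict(old)
    for key in diff["deletes"]:
        del result[key]
    result.update(diff["upserts"])
    return result


class DataCenter:
    def __init__(self, name: str):
        self.name = name
        self.cache = {}  # signature -> last result exchanged


def send_result(src: DataCenter, dst: DataCenter, computation: str, new_result: dict):
    """Ship new_result from src to dst, sending whichever is smaller:
    the diff against the cached copy or the full result.
    Returns (kind, bytes_sent)."""
    sig = signature(computation)
    old = src.cache.get(sig)
    kind, body = "full", new_result
    if old is not None:
        diff = compute_diff(old, new_result)
        if size(diff) < size(new_result):
            kind, body = "diff", diff
    # Destination side: apply the diff onto its own cached copy.
    if kind == "diff":
        dst.cache[sig] = apply_diff(dst.cache[sig], body)
    else:
        dst.cache[sig] = dict(body)
    src.cache[sig] = dict(new_result)
    return kind, size(body)


s, d = DataCenter("S"), DataCenter("D")
first = {f"user{i}": i for i in range(1000)}
second = dict(first)
second["user5"] = -1   # one changed row
del second["user7"]    # one deleted row

kind1, sent1 = send_result(s, d, "sessions_per_user", first)
kind2, sent2 = send_result(s, d, "sessions_per_user", second)
```

The first call has nothing cached and sends the full result; the second sends only the two changed rows, and D ends up holding the same result as S.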

The analyzer component starts with the natural partitioning of the base data, and a set of DAGs.

It then determines the combination of choices for the following three factors that would minimize total bandwidth usage: (1) the physical operator to use for tasks that accept multiple implementations (e.g., hash, broadcast or semi join), (2) the data center where each task is executed (respecting sovereignty constraints), and (3) the set of data centers to which each partition of the base data is replicated.

This is a tough problem:

  • Just finding the best execution strategy for a DAG itself in isolation is non-trivial.
  • The choice of execution strategy for one DAG node can affect choices for other nodes as it may for example impact partitioning and placement of the node’s output data. With k execution strategies per node, and n nodes, this is an O(k^n) search space.
  • The choices made for nodes of different DAGs influence each other as they might for example leverage a shared data replication strategy.

The solution is to use a greedy heuristic algorithm…

The heuristic optimizes each node of each DAG in isolation, proceeding from the source nodes and moving greedily outward in topological order. For each node, we evaluate all strategies compatible with sovereignty constraints, using pseudo-distributed measurement (§2.4) to measure their costs, and greedily pick the lowest cost alternative at that node. In the process, the system also evaluates whether systematically replicating any of the input base tables can help amortize transfer costs among DAGs.

This “performs remarkably well in practice, while exploring only a small subset of the search space.”
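
The node-by-node greedy pass can be sketched as follows. This is a simplified illustration, not the paper's algorithm in full: it ignores the base-table replication decision, and the cost function uses made-up output sizes where the real system uses pseudo-distributed measurements. The names `greedy_plan`, `candidates`, `allowed` and `transfer_cost` are hypothetical. Note that a pass like this evaluates at most k candidates at each of the n nodes, rather than enumerating all k^n combinations.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def greedy_plan(dag, candidates, allowed, cost):
    """dag:        node -> set of predecessor nodes
    candidates: node -> list of (operator, data_center) strategies
    allowed:    (node, data_center) -> bool, the sovereignty check
    cost:       (node, strategy, plan_so_far) -> estimated transfer cost
    """
    plan = {}
    # static_order() yields predecessors before the nodes that depend on them.
    for node in TopologicalSorter(dag).static_order():
        legal = [s for s in candidates[node] if allowed(node, s[1])]
        if not legal:
            raise ValueError(f"no strategy satisfies the constraints for {node!r}")
        # Greedy choice: cheapest legal strategy given upstream decisions.
        plan[node] = min(legal, key=lambda s: cost(node, s, plan))
    return plan


# Toy workload: clicks are born in the EU, sales in the US (made-up sizes).
dag = {"clicks": set(), "sales": set(), "join": {"clicks", "sales"}}
candidates = {
    "clicks": [("scan", "EU")],
    "sales": [("scan", "US")],
    "join": [("hash_join", "US"), ("hash_join", "EU")],
}
output_gb = {"clicks": 100, "sales": 1}


def transfer_cost(node, strategy, plan):
    """GB that must cross data centers to bring inputs to the chosen site."""
    _, site = strategy
    return sum(output_gb[p] for p in dag[node] if plan[p][1] != site)


unconstrained = greedy_plan(dag, candidates, lambda node, dc: True, transfer_cost)
# Hypothetical rule: the join may not run in the EU.
constrained = greedy_plan(
    dag, candidates, lambda node, dc: not (node == "join" and dc == "EU"), transfer_cost
)
```

Without constraints the join runs in the EU, since moving 1 GB of sales is cheaper than moving 100 GB of clicks. With the hypothetical rule in place it falls back to the US.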

The optimizer has to cost each strategy it considers. Yet traditional cardinality estimation techniques are insufficient for the arbitrary computation problem being considered. Costs are instead estimated by running the workload within a single data center, in such a way that an estimate can be made of the likely cost when distributed across data centers.
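
The summary above does not spell out how the measurement works, so the following is only one plausible way to picture it, not the paper's mechanism. The assumption here is that every record carries a label for the data center it would live in, the task runs locally, and we count the bytes that would have crossed a data center boundary. The function name `measure_transfer` and the use of pickled size as a byte count are hypothetical.

```python
import pickle
from collections import defaultdict


def measure_transfer(records, task_site):
    """records:   iterable of (home_data_center, row) pairs
    task_site: data center where the task would run
    Returns {(source, destination): bytes} for rows that would cross sites."""
    moved = defaultdict(int)
    for home, row in records:
        if home != task_site:
            # Pickled size is an assumed proxy for bytes on the wire.
            moved[(home, task_site)] += len(pickle.dumps(row))
    return dict(moved)


records = [("EU", "a" * 100), ("US", "b" * 100), ("EU", "c" * 100)]
run_in_us = measure_transfer(records, "US")  # two EU rows would move
run_in_eu = measure_transfer(records, "EU")  # one US row would move
```

Comparing the totals for each candidate site gives the optimizer a measured cost rather than a cardinality estimate.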

The key learnings from the team’s evaluations are:

  1. The bandwidth consumed by the centralized approach grows linearly with raw data updates/growth.
  2. Controlling base table replication is key to lowering bandwidth consumption for frequently read, rarely updated tables.
  3. At low update rates, centralized can outperform distributed if frequent analytics operate on mostly unchanged data.
  4. At high update rates, distributed outperforms centralized by 3x to 360x.
  5. At low/medium update rates, caching is effective.

The paper concludes:

We believe we have reached a new inflection point where the combination of big and geographically distributed data requires new approaches for geo-distributed analytics processing to minimize wide-area bandwidth costs.

Mesa focuses on geo-replicated data warehousing, and Riak (Enterprise) also provides multi-data center replication. “WANalytics differs from these approaches as it focuses on arbitrary DAGs, and it supports geo-distributed execution of DAGs of computation.”