To the best of our knowledge, this is the first paper to formulate and solve the problem of joining multiple streams continuously under these system constraints: exactly-once semantics, fault-tolerance at datacenter-level, high scalability, low latency, unordered streams, and delayed primary stream.
It’s interesting not just because of the stream joining technique employed, but also because this is a critically important system for Google: it powers the revenue from their advertising business. Reading the paper, we get to see how real-world constraints and trade-offs influence the system design choices – a luxury sometimes missing from systems evaluated in a purely academic context.
Photon is a geographically distributed system for joining multiple continuously flowing streams of data in real-time with high scalability and low latency, where the streams may be unordered or delayed.
Photon is deployed within Google Advertising System to join data streams such as web search queries and user clicks on advertisements. It produces joined logs that are used to derive key business metrics, including billing for advertisers. Our production deployment processes millions of events per minute at peak with an average end-to-end latency of less than 10 seconds.
(Impressive, but not quite the message rates and latency targets that we saw the finance industry working to in yesterday’s paper write-up…).
Photon is the system that “enables advertisers to fine-tune their bids, budgets and campaigns, and vary the parameters in real-time to suit changing user behaviour.” The essential task of Photon is joining search query events – that generate responses including adverts – with subsequent ad clicks.
- A user issues a search query. Google sends ads along with the search result, and stores information about the event to multiple logs-datacenters where the data is stored in GFS. Every query event is assigned a unique query_id.
After receiving the search results a user may (some time later) click on an ad. This click goes back to Google’s server where it is redirected. The click event is logged and stored in multiple logs_datacenters. The event is assigned a unique click_id, and the logged data includes the query_id and information about the ad that was clicked.
Photon joins the click event with its corresponding query event. It reads from log files on GFS and writes joined events to a new log files on GFS across multiple data centers. “Joined logs are used to generate rich statistical data, combining the information from queries, ads and clicks.
Google really, really, cares about the accuracy of this log processing:
The output produced by Photon is used for billing advertisers, reporting revenue to investors in Wall Street, and so on. Photon must guarantee that a single click is never processed twice, since this leads to double-charging the advertisers. On the other hand, Google loses money for every click which is not processed by Photon. In practice, to meet the business needs, we require Photon to join 99.9999% events within a few seconds, and 100% (of) events within a few hours. These requirements imply that Photon must provide: a) at-most-once semantics at any point of time, b) near-exact semantics in real-time, and c) exactly-once semantics eventually.
Latency is also important: “having Photon perform the join within a few seconds significantly improves the effectiveness of (these) business processes.” The query stream is approximately ordered by timestamp, but the ad clicks can be delayed by arbitrary amounts relative to queries. “This makes it very hard to apply window join algorithms proposed in the literature.”
Due to the multi-data center nature of the solution, and the realities of eventual consistency, you can also sometimes see a click event before the corresponding query event has arrived at the data center in question.
Logically, the query event always occurs before the corresponding click. However, the servers generating click events and query events are distributed throughout the world (to minimize end-user latency), and click and query logs are shipped independently to logs datacenters. The volume of query logs is orders of magnitude more than the volume of click logs, thus, it is not uncommon for a subset of query logs to be delayed relative to the corresponding click logs.
The system design also has to be able to tolerate loss of an entire data center. This is done by running Photon instances in parallel in multiple data centers.
Photon workers in multiple datacenters will attempt to join the same input event, but workers must coordinate their output to guarantee that each input event is joined at-most-once.
The set of click ids that have already been processed forms critical shared state therefore, and it is maintained in a synchronously replicated Paxos-based key-value store called the IdRegistry. The system design is informed by a pragmatic business decision here: only N days of events are stored in this system. Any event older than N days must therefore be thrown away (lost revenue).
The constant N is determined by evaluating the trade-off between the cost of storage, and the cost of dropping events that are delayed by more than N days. Note that the system should always discard events delayed by N days as there is no way to detect duplicates among such events, and our goal is to avoid double-charging the advertisers.
To get the high scalability needed from the Paxos-based registry Photon uses event batching (combining multiple event-level commits into one bigger commit) and sharding. Rebalancing (for example, upon increasing the number of shards) must avoid any risk of duplicate events during key movement. A timestamp-based approach that relies on a TrueTime server is used to achieve this.
Within each data center the processing pipeline is as follows:
- A dispatcher consumes click events from the logs as they come in, and looks up the event id in the IdRegistry. If the id is found, the event has already been processed and is skipped.
The dispatchers sends the click event to a joiner. If the joiner cannot join the click (due to e.g. network problems, or a missing query event), the dispatcher will keep trying with exponential backoff.
The joiner extracts the query id from the click, looks it up in the EventStore to find the corresponding query.
If the query isn’t found, the joiner sends a failure response to the dispatcher so that it can retry
If the joiner is able to register the click-id into the IdRegistry (it hasn’t been processed elsewhere) the joiner writes the joined event to the joined click logs.
If the joiner still fails to join a click after a certain number of retries, and the click is older than a threshold, the joiner will mark the click as unjoinable and return success to the dispatcher. In practice, unjoinable events represent a very small fraction of all the events.
Throttling mechanisms are implemented to cope with surges (for example, due to recovery after a failed data center comes back online).
When a joiner receives an event from the dispatcher, it first checks if there are already too many requests in flight. If so, the request will be rejected and the dispatcher will have to retry it later. Such throttling is essential to maintain the smooth flow of processing of events through the joiner.
We use appropriate throttling of outstanding requests to the IdRegistry
to avoid overloading it, and ensure adequate performance during catch-up.
The astute reader may have noticed the potential for the loss of joined events in step 5 above – the IdRegistry can record a click id, and if the joiner then times out due to slow network, or the IdRegistry response is lost in the network, then the joined event will not be written out.
In our early test deployment, we observed a noticeable number of clicks missing from the joined output logs due to this reason.
This problem was addressed by having the joiner assign a globally unique token to every click_id it processes. This is sent to the IdRegistry along with the click id. If the IdRegistry receives a subsequent request to store the same id and with the same token, it returns success. This enables the joiner to write the joined event to the log but still protects from duplicate processing by other joiners.
In our production deployment of Photon, this technique reduced the number of missing clicks in joined output logs by two orders of magnitude.
To minimize losses due to a crashing joiner process, and upper limit is placed on the number of in-flight requests between the joiner and the IdRegistry.
The EventStore reference in step 3 above is a two-level store with an in-memory cache and an on-disk Log EventStore. The in-memory cache is able to kep several minutes worth of query events in memory, and serves about 90% of all lookups.
The authors share some design lessons learned from building Photon:
- Keep the critical state managed by the system in Paxos as small as possible
- Ensure that dynamic resharding of a running system is a first-class citizen in the design
- Communicate via RPC rather than using disk. Put throttling mechanisms in place for asynchronous RPCs to avoid overloading servers
- Isolate the specialized work to a separate pool of workers to make the system more scalable. (Sounds like micro-services to me!)
- Carefully consider batching
There’s a lot in this paper that I didn’t have space to cover so I encourage you (as always) to go and read the original. It would be very interesting to see a TLA+ style formal analysis of the system as we recently learned AWS do for some of their services.
Exercise for the reader: how does the Photon design stack up against Stonebraker’s 8 requirements for real-time stream processing?