Global Sequence Protocol

Global Sequence Protocol: A Robust Abstraction for Replicated Shared State – Burckhardt et al. 2015

This is the ECOOP ’15 paper that we’ve been building up to so far this week. The problem domain is the familiar desire to support replicated shared data across nodes (mobile devices here) with eventual consistency. In the mobile context, this enables disconnected operation and reduces demand on battery life. The authors make a confident assertion about the contribution of the paper:

Overall, our work marks a big step forward towards a credible programming model for automatically replicated, weakly consistent shared data on devices, by providing both an understandable high-level system and data model, and a robust implementation containing powerful and interesting optimizations.

The Global Sequence Protocol is a general protocol on top of which many eventually consistent data types can be built. In this manner it reminds me of Viewstamped Replication. This differentiates the GSP from prior work on replicated data types:

Just like our work, replicated data types and in particular CRDTs provide optimized distributed protocols for certain data types. However, CRDTs are not easy to customize and compose, since the consistency protocol is not cleanly separated from the data model as in GSP, but specialized for a particular, fixed data type.

The authors implemented the cloud types model that we looked at yesterday but found that it was lacking in a few areas:

  • client programmers still find it difficult to reason about the behaviour of eventually consistent programs
  • there is a need to mix eventual and synchronous reads and updates in the same program for some applications
  • implementations need to be robust, handling failures and recovery gracefully

Especially to help programmers reason about eventually consistent program behaviour, the authors introduce the Global Sequence Protocol (GSP): “an operational model describing the system behavior precisely, yet abstractly enough to be suitable as a simple mental model of the replicated store.” The basic GSP is then extended to support transactions and judicious use of synchronization. Finally, we see how to create an efficient implementation of the protocol based on streams of updates.

The Core Global Sequence Protocol

To write correct programs, we need a simple yet precise mental model of the store… In our experience, the key mental shift required to understand replicated data is to understand program behavior as a sequence of updates, rather than states. To this end, we characterize the shared data by its set of updates and queries, and represent a state by the sequence of updates that have led to it.

The GSP can work with many different data models. Similar to the QUA we saw yesterday, a data type is abstracted by a set of Read operations, a set of Update operations, and the set of Values returned by read operations. In addition, a read value (rvalue) function is defined that takes a Read operation and a sequence of Updates and returns the value that results from applying all of the updates in the sequence to the initial state of the data. (Reminiscent of append-only logs and event stores).

For example, we could define a simple counter as follows:

Update = { inc }
Read = { rd }
rvalue (rd,s) = s.length

(a read simply counts the number of updates).
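To make this concrete in executable form, here is a minimal TypeScript rendering of the counter (the Update/Read/rvalue names follow the paper; everything else is my own sketch):

// A TypeScript rendering of the counter data type (my own sketch; the
// paper uses the abstract notation above).
type Update = { kind: "inc" };   // the only update operation
type ReadOp = { kind: "rd" };    // the only read operation
type Value = number;             // values returned by reads

// rvalue answers a read against the sequence of updates applied so far;
// for the counter, the answer is simply the length of that sequence.
function rvalue(_r: ReadOp, s: Update[]): Value {
  return s.length;
}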

The (core GSP) protocol specifies the behavior of a finite, but unbounded number of clients, by defining the state of each client, and transitions that fire in reaction to external stimuli. The transitions fall into two categories: the interface to the client program (from where update and read operations arrive), and the interface to the network (from where messages arrive). The clients communicate using reliable total order broadcast (RTOB), a group communication primitive that guarantees that all messages are reliably delivered to all clients, and in the same total order. RTOB has been well studied in the literature on distributed systems, and is often used to build replicated state machines. It can be implemented on various topologies (such as client-server or peer-to-peer) and for various degrees of fault-tolerance. We describe one particular such implementation and important optimizations in Section 6.

A Round is an update authored by a particular client:

class Round {
    origin : Client,
    number : N,
    update : Update
}

Clients maintain three pieces of state (a code sketch follows the list):

  • the round number of the client’s most recently submitted Round
  • a sequence of Rounds, known, representing the client’s known prefix of the global update sequence
  • a sequence of Rounds, pending, representing rounds that have been sent by the client but not yet confirmed in the known global update sequence
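In TypeScript, assuming the Round class above, the client state might look like this (a sketch, not the paper's code):

// Per-client state in core GSP (field names follow the paper's model).
class GspClientState {
  round = 0;              // number of the most recently submitted round
  known: Round[] = [];    // known prefix of the global update sequence
  pending: Round[] = [];  // rounds broadcast but not yet confirmed in `known`
}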

When the client program issues an update, the client (1) wraps the update in a Round object, which records the origin and a per-client sequence number, (2) appends this round to its sequence of pending rounds, and (3) broadcasts the round:

update(u : Update) {
    var r = new Round{origin = this, number = round++, update = u};
    pending := pending + r;
    RTOB_submit(r);
}

(where + above represents sequence concatenation).

The value returned by a read is given by applying rvalue to the concatenation of the known global prefix with the client’s own pending updates (thus giving read-your-writes semantics):

read(r : Read) {
    var composite_log = known + pending;
    return rvalue(r, updates(composite_log));
}

(the updates function above gives the sequence of Updates from the Round sequence).

All that remains is to specify the behaviour when a Round is received over the network interface:

onReceive(r : Round) {
    known := known + r;
    if (r.origin = this) {
        assert( r = pending[0]);
        pending := pending[1..];
    }
}
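Putting the three transitions together, here is a toy single-process simulation, entirely my own sketch: RTOB is modelled as immediate in-order delivery to every client, so it demonstrates the mechanics rather than the asynchrony of a real deployment, and it repeats the counter definitions so the block is self-contained.

// Toy simulation of core GSP for the counter (my own sketch). RTOB is a
// synchronous in-memory loop here; the paper's Section 6 describes a real
// fault-tolerant implementation.
type Update = { kind: "inc" };
type ReadOp = { kind: "rd" };
const rvalue = (_r: ReadOp, s: Update[]) => s.length;

class Round {
  constructor(public origin: GspClient, public number: number, public update: Update) {}
}

class Network {
  clients: GspClient[] = [];
  // Reliable total order broadcast: every round reaches every client,
  // in the same order, including the round's origin.
  submit(r: Round) { for (const c of this.clients) c.onReceive(r); }
}

class GspClient {
  round = 0;
  known: Round[] = [];
  pending: Round[] = [];
  constructor(private net: Network) { net.clients.push(this); }

  update(u: Update) {
    const r = new Round(this, this.round++, u);
    this.pending.push(r);    // immediately visible to local reads
    this.net.submit(r);      // broadcast via RTOB
  }

  read(q: ReadOp) {
    const composite = [...this.known, ...this.pending]; // read-your-writes
    return rvalue(q, composite.map(r => r.update));
  }

  onReceive(r: Round) {
    this.known.push(r);                            // extend the known prefix
    if (r.origin === this) this.pending.shift();   // our round is confirmed
  }
}

const net = new Network();
const a = new GspClient(net);
const b = new GspClient(net);
a.update({ kind: "inc" });
b.update({ kind: "inc" });
console.log(a.read({ kind: "rd" }), b.read({ kind: "rd" })); // 2 2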

By design, Core GSP is not strongly consistent: updates are asynchronous and take effect with a delay. Programmers who are not aware of this can easily run into trouble…

One workaround is to introduce synchronization where needed; a more elegant alternative is to use a richer data model – for example, exposing an ‘add’ operation for a counter rather than a ‘set’ operation.
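A tiny illustration of why the richer model helps (hypothetical code, over the same update-sequence abstraction): with a set-based register, two clients that each read 0 and write back 1 lose an increment, whereas with inc both increments survive arbitration.

// Register vs counter over the update-sequence abstraction (my sketch).
type RegUpdate = { kind: "set"; value: number };
type CtrUpdate = { kind: "inc" };

// Register read: the last set in the arbitration order wins.
const regRead = (s: RegUpdate[]) => (s.length === 0 ? 0 : s[s.length - 1].value);
// Counter read: every inc contributes.
const ctrRead = (s: CtrUpdate[]) => s.length;

// Two clients each saw 0 and tried to "increment":
console.log(regRead([{ kind: "set", value: 1 }, { kind: "set", value: 1 }])); // 1 -- one increment lost
console.log(ctrRead([{ kind: "inc" }, { kind: "inc" }]));                     // 2 -- both preserved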

Luckily, it turns out we can quite easily define higher-level data types on top of the data model abstraction. In particular, we can implement full Cloud Types as proposed by previous work. Cloud types allow users to define and compose all of the data type examples given earlier (registers, counters, key-value stores), plus tables, which support dynamic creation and deletion of storage.

Core GSP is:

  • quiescently consistent, since when updates stop, clients converge to the same known prefix with an empty pending queue.
  • eventually consistent, since all updates eventually become visible to all clients, and are ordered in the same arbitration order.
  • causally consistent, because an update U by some client cannot become visible to other clients before all of the updates that were visible to the initiating client at the time it issued U.

Transactions and Synchronization

Transactional GSP adds push and pull synchronization operations, and the synchronization query confirmed. When a client makes several updates that are intended to take effect atomically, they cannot be released one at a time, otherwise other clients could observe partial results.

To solve this problem, GSP uses a transaction buffer. Updates performed by the client program go into this buffer. All updates in the buffer are broadcast in a single round when the client program calls push, and only then. Updates in the transaction buffer are included in the composite log and hence immediately available to local reads.

To provide read stability, GSP uses a receiveBuffer:

Received rounds are stored in this buffer. All rounds in the receive buffer are processed when the client program calls pull, and only then. Thus, the client program can rely on read stability – the visible state can change only when issuing pull, or when performing local updates.

The confirmed operation returns true iff there are no local updates awaiting confirmation – it can thus be used to find out if an update has been committed into the global known sequence.
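Here is how the two buffers might slot into the client, sketched in TypeScript (the names tbuf/rbuf and the details are my own rendering of the model; note that in transactional GSP a round carries the whole sequence of updates pushed together, which is what makes them atomic to other clients):

// Sketch of a transactional GSP client (my own rendering of the model).
type Update = { kind: "inc" };
type ReadOp = { kind: "rd" };
const rvalue = (_r: ReadOp, s: Update[]) => s.length;

// A round now carries a sequence of updates: one atomic transaction.
class TxRound {
  constructor(public origin: TxClient, public number: number, public updates: Update[]) {}
}

declare function rtobSubmit(r: TxRound): void; // assumed broadcast primitive

class TxClient {
  round = 0;
  known: TxRound[] = [];
  pending: TxRound[] = [];
  tbuf: Update[] = [];   // transaction buffer: updates not yet pushed
  rbuf: TxRound[] = [];  // receive buffer: rounds delivered but not yet pulled

  update(u: Update) { this.tbuf.push(u); }      // buffer only; nothing is sent

  push() {                                      // release the buffer atomically
    if (this.tbuf.length === 0) return;
    const r = new TxRound(this, this.round++, this.tbuf);
    this.tbuf = [];
    this.pending.push(r);
    rtobSubmit(r);
  }

  onReceive(r: TxRound) { this.rbuf.push(r); }  // held back for read stability

  pull() {                                      // incorporate received rounds
    for (const r of this.rbuf) {
      this.known.push(r);
      if (r.origin === this) this.pending.shift();
    }
    this.rbuf = [];
  }

  confirmed() {                                 // no local updates awaiting commit
    return this.tbuf.length === 0 && this.pending.length === 0;
  }

  read(q: ReadOp) {                             // known + pending + tbuf
    const us = [
      ...this.known.flatMap(r => r.updates),
      ...this.pending.flatMap(r => r.updates),
      ...this.tbuf,
    ];
    return rvalue(q, us);
  }
}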

GSP is sufficiently expressive to allow client programs to recover strong consistency when desired. To this end, we can write a flush operation that waits for all pending updates to commit (and receives any other updates in the meantime):

flush() {
    push();
    while (!confirmed()) { pull(); }
}

A synchronous update can be achieved by calling update() followed by flush(). And a synchronous read is flush() followed by read().
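In code, against the sketch above (the helper names are mine; in a real client, pull() would block on network delivery rather than spin):

// Synchronous variants built from the paper's flush() (my own helper names).
function flush(c: TxClient) {
  c.push();
  while (!c.confirmed()) c.pull();
}

function syncUpdate(c: TxClient, u: Update) {
  c.update(u);
  flush(c);    // returns only once u is committed to the global sequence
}

function syncRead(c: TxClient, q: ReadOp) {
  flush(c);    // commit our updates and catch up with everyone else's
  return c.read(q);
}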

In the reactive web applications written by the authors, the following yield() operation is automatically called between event handlers and when the event queue is empty:

yield() {
    push();
    pull();
}

Our update transactions are different from conventional transactions (read-committed, serializable, snapshot isolation, or parallel snapshot isolation) since they do not check for any read or write conflicts. In particular, they never fail. The advantage is that they are highly available, i.e. progress is not hampered by network partitions. The disadvantage is that unlike serializable transactions (but like read-committed, snapshot, or parallel snapshot transactions), they do not preserve all data invariants.

Efficient Implementation

The model above is easy to understand, but is not necessarily conducive to an efficient and robust implementation. The authors show a streaming implementation and prove by refinement that it implements the GSP model faithfully.

In this section, we show that all of these issues are fixable without needing to change the abstract GSP protocol. Specifically, we describe a robust streaming server-client implementation of GSP. It explicitly models communication using sockets (duplex streams) and contains explicit transitions to model failures of the server, clients, and the network. Moreover, it eliminates all update sequences, and instead stores current server state and deltas in reduced form. Importantly, it is robust in the following sense:

  • client programs never need to wait for operations to complete, regardless of failures
  • connections can fail at any time, on any end
  • the server can crash and recover, losing soft state but preserving persistent state. The persisted state comprises a state snapshot and the round number of the last round committed by each client. There are no stored logs.
  • clients can crash silently or temporarily stop executing for unbounded amounts of time, and are always able to reconnect.

The streaming model does not store any update sequences (neither in the client, nor on the server). Instead, it eagerly reduces such sequences, and stores them in reduced form, either as state objects (if the sequence is a prefix of the global update sequence) or as delta objects (if the sequence is a segment of the global update sequence).
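For the counter, a reduced representation might look like this (my own illustration; the paper treats reduction generally):

// Reduced forms for the counter (my own illustration). Rather than
// storing update sequences, store their effect:
type State = { count: number };  // reduction of a prefix of the global sequence
type Delta = { count: number };  // reduction of a segment of the global sequence

const applyUpdate = (s: State, _u: { kind: "inc" }): State => ({ count: s.count + 1 });
const composeDelta = (s: State, d: Delta): State => ({ count: s.count + d.count });

// The server then persists just a State snapshot plus each client's last
// committed round number -- no logs -- matching the robustness claims above.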

See the full paper for details.