PSync: A Partially Synchronous Language for Fault-Tolerant Distributed Algorithms

PSync: A Partially Synchronous Language for Fault-Tolerant Distributed Algorithms – Drăgoi et al. 2016

Last month we looked at the RAMCloud team’s design pattern for building distributed, concurrent, fault-tolerant modules. Today’s paper goes one step beyond a pattern, and introduces a domain-specific language called PSync with the goal of unifying the modeling, programming, and verification of fault-tolerant distributed algorithms. PSync is implemented as an embedding in Scala, and the implementation is available at https://github.com/dzufferey/psync.

Despite the importance of fault-tolerant algorithms, no widely accepted programming model has emerged for these systems. The algorithms are often published with English descriptions, or, in the best case, pseudo-code. Moreover, fault-tolerant algorithms are rarely implemented as published, but modified to fit the constraints and requirements of the system in which they are incorporated. General purpose programming languages lack the primitives to implement these algorithms in a simple way.

PSync is built on top of the HeardOf (HO) model. In the HO model, communication proceeds in rounds. Rounds are ‘communication closed’, which means that each message is either delivered in the round in which it is sent, or it is dropped. To model the asynchronous and faulty behaviours of the network, each process is associated with a ‘Heard-Of’ set under the control of the environment. In a given round, process p receives a message sent to it by process q iff q ∈ HO(p). “The network’s degree of synchrony and the type of faults correspond to assumptions on how the environment assigns the HO-sets.”
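To make the ‘iff q ∈ HO(p)’ rule concrete, here is a minimal sketch in plain Scala. It does not use the PSync library; `Msg`, `deliver`, and the use of `Int` as a process id are names invented for this illustration.

```scala
object HeardOfSketch {
  type ProcessId = Int

  // One message in flight during a round: sender, receiver, payload.
  final case class Msg[A](from: ProcessId, to: ProcessId, payload: A)

  /** Mailbox of process `p` for one round. A message addressed to `p`
    * survives only if its sender is in HO(p); everything else is dropped. */
  def deliver[A](
      p: ProcessId,
      sent: Seq[Msg[A]],
      ho: Map[ProcessId, Set[ProcessId]]
  ): Map[ProcessId, A] = {
    val heard = ho.getOrElse(p, Set.empty[ProcessId])
    sent
      .filter(m => m.to == p && heard.contains(m.from))
      .map(m => m.from -> m.payload)
      .toMap
  }
}
```

A crash, a lost packet, and a message that arrives too late all look the same in this model: the sender is simply missing from the receiver's HO-set for that round.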

The round structure together with the HO-sets define an abstract notion of time. Using communication-closed rounds, PSYNC introduces a high-level control structure that allows the programmer to focus on the data computation performed by each process to ensure progress towards solving the considered problem, as opposed to spending time on the programming language constructs, e.g., incrementing message counters, and setting timers.

PSync programs are therefore structured as a sequence of rounds with lockstep semantics, whereby all processes execute the same round. In each round processes first send messages, and then receive messages and update their local state accordingly. The implementation is based on asynchronous messages, which raises the question ‘how do you know when to close a round?’ (i.e., how long do you wait for all messages in the round to be delivered?). In PSync this is based on (configurable) timeouts – after invoking the send() callback, PSync will accumulate messages for the duration of the timeout, and only then call the process’ update() method. An extension allows the timeout duration to be adjusted over time based on the number of ‘out of round’ messages received.
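A rough sketch of what closing a round by timeout could look like, in plain Scala on top of a `java.util.concurrent` queue. This is not PSync's runtime code: the `Tagged` wire format, the `collectRound` name, and the choice to simply drop and count out-of-round messages are all assumptions made for illustration.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

object RoundTimeoutSketch {
  // Hypothetical wire format: each message carries the round it was sent in.
  final case class Tagged[A](round: Int, from: Int, payload: A)

  /** Accumulate messages for `round` until `timeoutMs` has elapsed.
    * Returns the mailbox plus the number of out-of-round messages seen. */
  def collectRound[A](
      inbox: LinkedBlockingQueue[Tagged[A]],
      round: Int,
      timeoutMs: Long
  ): (Map[Int, A], Int) = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    val mbox = mutable.Map.empty[Int, A]
    var outOfRound = 0
    var remaining = deadline - System.nanoTime()
    while (remaining > 0) {
      // poll returns null if nothing arrives before the deadline
      val m = inbox.poll(remaining, TimeUnit.NANOSECONDS)
      if (m != null) {
        if (m.round == round) mbox(m.from) = m.payload
        else outOfRound += 1 // communication-closed: not delivered, only counted
      }
      remaining = deadline - System.nanoTime()
    }
    (mbox.toMap, outOfRound)
  }
}
```

The out-of-round count is the kind of signal an adaptive policy could use to lengthen or shorten the next timeout; the actual adjustment rule is not sketched here.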

An example helps to make this clearer. The LastVoting algorithm is an adaptation of Paxos to the HeardOf model. In PSync it looks like this…

Each process defines a module in which a candidate value can be submitted via an init function, and a value is returned via out:

interface
    init(v: Int); out(v: Int)

The concurrent module may have some internal state:

variable x: Int; ts: Int; vote: Int; ready: Boolean;
         commit: Boolean; decided: Boolean;
         decision: Int;

And helper functions, here to decide on a rotating coordinator:

def coord(phi: Int): ProcessId = 
    new ProcessId((phi/phase.length) % n)
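The arithmetic in coord is easy to check by hand. Below is a small plain-Scala sketch, assuming four rounds per phase (the length of the phase array shown further down) and plain `Int` process ids; `roundsPerPhase` and the extra `n` parameter are names introduced for this illustration.

```scala
object CoordSketch {
  // Assumed: one phase = Collect, Candidate, Quorum, Accept.
  val roundsPerPhase = 4

  /** Rotating coordinator: the same process coordinates all rounds of a
    * phase, and the role moves to the next process when a new phase starts. */
  def coord(r: Int, n: Int): Int = (r / roundsPerPhase) % n
}
```

With n = 3, rounds 0 to 3 are coordinated by process 0, rounds 4 to 7 by process 1, rounds 8 to 11 by process 2, and round 12 wraps back to process 0.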

The implementation of init is used to initialize the module:

def init(v: Int) =
    x := v
    ts := -1
    ready := false
    commit := false
    decided := false

And the heart of the module is the definition of the communication rounds in the algorithm:

val phase = Array[Round] (
    Round /* Collect */ {
        ...
    },
    Round /* Candidate */ {
        ...
    },
    Round /* Quorum */ {
        ...
    },
    Round /* Accept */ {
        ...
    }
)

Each round is defined by two callback functions, send and update. All processes execute in lockstep in the same round. First the send function is invoked, and it should return the set of messages the process wishes to send in this round. Messages are returned in a map from process id to payload. PSync then gathers incoming messages until the timeout expires, then it invokes update with the received set of messages.
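The send-then-update discipline can be simulated in a few lines of plain Scala. This sketch is not the PSync API: the `Round` trait, its explicit `id` and `state` parameters, and the `step` function are hypothetical stand-ins, and timeouts are replaced by an explicit map of HO-sets.

```scala
object LockstepSketch {
  type ProcessId = Int

  // Hypothetical stand-in for a round: what to send, then how to react.
  trait Round[S, A] {
    def send(id: ProcessId, state: S): Map[ProcessId, A]
    def update(id: ProcessId, state: S, mbox: Map[ProcessId, A]): S
  }

  /** Run one round for all processes in lockstep: every send happens
    * before any update, and HO(p) decides what p actually receives. */
  def step[S, A](
      round: Round[S, A],
      states: Map[ProcessId, S],
      ho: Map[ProcessId, Set[ProcessId]]
  ): Map[ProcessId, S] = {
    // Phase 1: gather every process's outgoing messages, keyed by sender.
    val outgoing: Map[ProcessId, Map[ProcessId, A]] =
      states.map { case (p, s) => p -> round.send(p, s) }
    // Phase 2: build each mailbox from the heard-of set, then update.
    states.map { case (p, s) =>
      val mbox = for {
        q <- ho.getOrElse(p, Set.empty[ProcessId]).toSeq
        payload <- outgoing.get(q).flatMap(_.get(p))
      } yield q -> payload
      p -> round.update(p, s, mbox.toMap)
    }
  }
}
```

Because all sends are computed from the states at the start of the round, the order in which processes are updated cannot affect the result, which is the property that keeps the number of interleavings small.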

In the first round, for example, each process sends the coordinator its current value and timestamp. In update, each process checks whether it is the coordinator and, if it has heard from a majority, picks the value it should try to impose:

    Round /* Collect */ {
        def send() : Map[ProcessId, (Int,Int)] = 
            return MapOf(coord(r) -> (x,ts))

        def update(mbox: Map[ProcessId, (Int,Int)]) =
            if (id == coord(r) && mbox.size > n/2)
                vote := mbox.valWithMaxTS
                commit := true
    }
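For readers who want to run the Collect logic, here is a plain-Scala rendering of the update step. It is a sketch rather than the code from the repository: `State` is a cut-down record, `valWithMaxTS` is assumed to mean ‘the value paired with the largest timestamp’, and the coordinator id and n are passed in explicitly.

```scala
object CollectSketch {
  type ProcessId = Int

  // Only the fields the Collect round touches (names follow the post).
  final case class State(x: Int, ts: Int, vote: Int = 0, commit: Boolean = false)

  // Assumed meaning of valWithMaxTS: the value with the largest timestamp.
  // Must only be called on a non-empty mailbox.
  def valWithMaxTS(mbox: Map[ProcessId, (Int, Int)]): Int =
    mbox.values.maxBy(_._2)._1

  /** Collect-round update: only the coordinator acts, and only when it
    * has heard from a strict majority of the n processes. */
  def update(
      id: ProcessId,
      coord: ProcessId,
      n: Int,
      s: State,
      mbox: Map[ProcessId, (Int, Int)]
  ): State =
    if (id == coord && mbox.size > n / 2)
      s.copy(vote = valWithMaxTS(mbox), commit = true)
    else s
}
```

The majority check also guarantees the mailbox is non-empty before `maxBy` is called.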

Given this structure, the authors implemented a state-based verification engine for PSync that checks safety and liveness properties – PSync’s round structure and lockstep semantics lead to a smaller number of interleavings and simpler inductive characterizations of the set of reachable states.

The engine assumes that the program is annotated with inductive invariants and ranking functions, and proves the validity of those annotations and the fact that they imply the specification. In general annotating a program with inductive invariants is a hard task even for an expert. The advantage of PSYNC lies in the simplicity of the required inductive invariants. Compared with asynchronous programming models, the round structure allows looking at the system’s invariants at the boundary between rounds where the communication channels are empty.

Specifications include both safety and liveness properties written in linear temporal logic, and state properties in the logic CL (a first-order logic over sets of program states).

I found myself very much in need of an example while reading the paper. For this see LastVoting.scala in the PSync GitHub repo.

Here’s an example of a liveness predicate from that file:

    val livenessPredicate = List[Formula](
        P.exists( p => P.forall( q => p == coord &&
                                 p.HO.contains(q) &&
                                 p.HO.size > n/2) )
    )
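Read as a condition on a single round, the predicate above says that the coordinator hears from every process. The sketch below evaluates the same shape of formula over a concrete HO assignment in plain Scala; `goodRound` is a name made up here, and it assumes `ho` has an entry for every process.

```scala
object LivenessSketch {
  type ProcessId = Int

  /** True if some process p is the coordinator, hears from every
    * process q, and |HO(p)| > n/2, mirroring the formula's structure. */
  def goodRound(coord: ProcessId, ho: Map[ProcessId, Set[ProcessId]]): Boolean = {
    val procs = ho.keySet
    val n = procs.size
    procs.exists { p =>
      val heard = ho(p)
      procs.forall(q => p == coord && heard.contains(q) && heard.size > n / 2)
    }
  }
}
```

Note that what the other processes hear does not matter for this particular predicate, only the coordinator's HO-set.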

This is how invariants might be built up:

    val noDecision : Formula = ...
    val majority: Formula = ...
    val keepInit: Formula = ...
    val safetyInv = And(keepInit, Or(noDecision,majority))

    val invariants = List[Formula](
        safetyInv,
        P.exists( j => P.forall( 
            i => i.decided && i.decision == init(j.x)) )
    )

For each round, the pre- and post-conditions can be specified:

    override val roundInvariants = List(
        /* Collect */
        List[Formula](
            true,
            P.exists( i => i.commit )
        ),
        /* Candidate */
        List[Formula](
            ...
        ),
        ...
    )

State properties are specified like so:

    val properties = List[(String,Formula)](
      ("Termination",    P.forall( i => i.decided) ),
      ("Agreement",      P.forall( i => ...) ),
      ("Validity",       P.forall( i => ...) ),
      ("Integrity",      P.exists( j => ...) ),
      ("Irrevocability", P.forall( i => ...) )
    )

(See LastVoting.scala for the full definitions.) It took me a while to get used to seeing these kinds of statements expressed this way in a source file.
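One way to build intuition for these properties is to evaluate them over a concrete snapshot of process states. The sketch below does that in plain Scala for two of them. Termination follows the formula shown above; agreement uses the textbook consensus definition, which is an assumption here and not a transcription of the elided formula in LastVoting.scala. `Proc` is a hypothetical record.

```scala
object ConsensusPropsSketch {
  // Hypothetical snapshot of the two fields these properties talk about.
  final case class Proc(decided: Boolean, decision: Int)

  // Termination as written in the post: every process has decided.
  def termination(ps: Seq[Proc]): Boolean = ps.forall(_.decided)

  // Textbook agreement (assumed, not taken from the file):
  // no two decided processes hold different decisions.
  def agreement(ps: Seq[Proc]): Boolean =
    ps.filter(_.decided).map(_.decision).distinct.size <= 1
}
```

The verifier proves such formulas for all reachable states; a check like this only tests one state, which is the difference between testing and verification.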

The verifier checks the validity of the invariants and that they imply the specification by generating verification conditions that can be discharged using an SMT solver.
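For the safety part, the textbook shape of such verification conditions is shown below. This is the standard inductive-invariant scheme, given as a sketch; the symbols Init, TR (one round's transition relation), Inv, and Safe are introduced here, and the exact conditions PSync generates may be structured differently.

```latex
\begin{align*}
\mathit{Init}(s) &\Rightarrow \mathit{Inv}(s) && \text{(initiation)} \\
\mathit{Inv}(s) \land \mathit{TR}(s, s') &\Rightarrow \mathit{Inv}(s') && \text{(consecution)} \\
\mathit{Inv}(s) &\Rightarrow \mathit{Safe}(s) && \text{(the invariant implies the property)}
\end{align*}
```

Each implication is checked by asking the SMT solver whether its negation is satisfiable; an unsatisfiable answer means the condition is valid.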

Verification of course requires a semantics for PSync programs, and this is defined in terms of labeled transition systems. See sections 3 and 4 in the full paper for details.

As an indication of the expressiveness of PSync, the authors compare Paxos implementations (using LastVoting for PSync) across different programming and specification languages; the comparison table is in the full paper.

We have implemented PSYNC as an embedding in the Scala programming language. The runtime of PSYNC is built on top of the Netty framework. For the transport layer, we use UDP. The serialization of messages uses the pickling library. The implementation is available at https://github.com/dzufferey/psync under an Apache 2.0 license.