IronFleet: Proving Practical Distributed Systems Correct

IronFleet: Proving Practical Distributed Systems Correct – Hawblitzel et al. (Microsoft Research) 2015

Every so often a paper comes along that makes you re-evaluate your world view. I happily would have told you that full formal verification of non-trivial systems (especially distributed systems) in a practical manner (i.e. something you could consider using for real projects) was a long way off. Turns out it’s a lot closer than I thought….

This paper presents IronFleet, the first methodology for automated machine-checked verification of the safety and liveness of non-trivial distributed system implementations. The IronFleet methodology is practical: it supports complex, feature-rich implementations with reasonable performance and a tolerable proof burden.

The authors both developed the methodology and built two distributed systems – a replicated state machine library and a key-value store – in a total of 3.7 person-years. (It would be nice to see that broken down, i.e., given the methodology, how long did it actually take per system?) The complex distributed system implementations worked the first time!

In exchange for this effort, IronFleet produces a provably correct implementation with desirable liveness properties. Indeed, except for unverified components like our C# client, both IronRSL (including replication, view changes, log truncation, batching, etc.) as well as IronKV (including delegation and reliable delivery) worked the first time we ran them.

That’s impressive because distributed systems are notoriously hard to get right….

Distributed systems are notoriously hard to get right. Protocol designers struggle to reason about concurrent execution on multiple machines, which leads to subtle errors. Engineers implementing such protocols face the same subtleties and, worse, must improvise to fill in gaps between abstract protocol descriptions and practical constraints, e.g., that real logs cannot grow without bound. Thorough testing is considered best practice, but its efficacy is limited by distributed systems’ combinatorially large state spaces.

It’s interesting to juxtapose the approach in this paper with Colin Scott’s recently published blog post on Fuzzing Raft for fun and publication, in which he tests a Raft implementation and finds numerous correctness bugs. Raft is of course designed for understandability and ease of implementation, and has been widely studied…

The IronFleet methodology supports proving both safety and liveness properties of distributed system implementations. A safety property says that the system cannot perform incorrect actions; e.g., replicated-state-machine linearizability says that clients never see inconsistent results. A liveness property says that the system eventually performs a useful action, e.g., that it eventually responds to each client request. In large-scale deployments, ensuring liveness is critical, since a liveness bug may render the entire system unavailable… Our proofs reason all the way down to the bytes of the UDP packets sent on the network, guaranteeing correctness despite packet drops, reorderings, or duplications. Regarding liveness, IronFleet breaks new ground: to our knowledge, IronFleet is the first system to mechanically verify liveness properties of a practical protocol, let alone an implementation.

To pull off this feat, IronFleet breaks the verification problem into distinct layers, using refinement to prove the connections at each stage, and the appropriate tool for the job in each layer.

A high-level specification is written as a state machine. Everything that follows has the objective of showing that the eventual implementation faithfully does what the spec says it should do. Therefore keeping this top-level specification succinct and easy to follow is important. For IronRSL (the replicated state machine library) the specification is 85 lines, and for IronKV (the key-value store) it is only 34.

At the next level of refinement comes an abstract distributed protocol layer. This introduces the concept of individual hosts that communicate only via network messages. To keep the complexity in this layer manageable, the state model uses unbounded integers, sequences of values, and immutable types. The network model deals in high-level structured packets. The protocol layer specifies only what must happen at each step – not how to achieve it – and furthermore assumes that steps are atomic.

Of course, you need to show that the distributed protocol layer is a refinement of the top-level specification, and for this TLA-style techniques are used, embodied in the Dafny language.

The final stage is to produce an implementation of the protocol. Here the developer writes single-threaded imperative code to run on each host, using the Dafny language. Once more, it is necessary to show that the implementation is a faithful refinement of the protocol. The first step is to prove that the host implementation refines the host state machine in the distributed protocol layer, and the second step is to show that a distributed system comprising N host implementations refines the distributed protocol of N hosts.

To avoid complex reasoning about interleaved execution of low-level operations at multiple hosts, we use a concurrency containment strategy: the proofs above assume that every implementation step performs an atomic protocol step. Since the real implementation’s execution is not atomic, we use a reduction argument (§3.6) to show that a proof assuming atomicity is equally valid as a proof for the real system. This argument requires a mechanically verified property of the implementation, as well as a small paper-only proof about the implications of the property.

So far we have a methodology for proving safety properties of the system (it must not deviate from the high-level state machine specification). But we also want to prove liveness – that is, that the system will make forward progress and eventually do what it is supposed to. We want to prove the absence of livelock and deadlock.

Liveness properties are much harder to verify than safety properties. Safety proofs need only reason about two system states at a time: if each step between two states preserves the system’s safety invariants, then we can inductively conclude that all behaviors are safe. Liveness, in contrast, requires reasoning about infinite series of system states. Such reasoning creates challenges for automated theorem provers (§2.4), often causing the prover to time out rather than return a successful verification or a useful error message.

IronFleet addresses this by embedding TLA in Dafny in a way that hides detail from the prover except where it is truly necessary.

We then use our TLA embedding to build a library of fundamental TLA proof rules verified from first principles. This library is a useful artifact for proving liveness properties of arbitrary distributed systems: its rules allow both the human developer and Dafny to operate at a high level by taking large proof steps with a single call to a lemma from the library. Finally, by structuring our protocols with always-enabled actions, we significantly simplify the task of proving liveness properties.
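
To get a feel for what such an embedding might look like, here’s a minimal sketch in Dafny of the core idea: a behaviour is an infinite sequence of states, modelled as an imap from step index to state, and temporal operators become ordinary predicates over a behaviour and a position in it. The names below (Complete, Always, Eventually) are illustrative – they are not the paper’s actual library.

    // Sketch only: a behaviour assigns a state to every step index.
    // For simplicity it is defined over all integer indices.
    predicate Complete<S>(b:imap<int,S>) {
      forall k :: k in b
    }
    
    // []P : P holds at every step from index i onwards.
    predicate Always<S>(b:imap<int,S>, i:int, P:S -> bool)
      requires Complete(b)
    {
      forall j :: j >= i ==> P(b[j])
    }
    
    // <>P : P holds at some step at or after index i.
    predicate Eventually<S>(b:imap<int,S>, i:int, P:S -> bool)
      requires Complete(b)
    {
      exists j :: j >= i && P(b[j])
    }

Proof rules of the form “if P always leads to Q and P happens infinitely often, then Q happens infinitely often” can then be stated and proved once as lemmas over definitions like these, and reused across protocols – which is exactly the role of the library the quote above describes.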

Top-Level Spec

With IronFleet, the developer writes the system’s spec as a state machine: starting with some initial state, the spec succinctly describes how that state can be transformed. The spec defines the state machine via three predicates, i.e., functions that return true or false. SpecInit describes acceptable starting states, SpecNext describes acceptable ways to move from an old to a new state, and SpecRelation describes the required conditions on the relation between an implementation state and its corresponding abstract state.

Here’s an example spec. for a distributed lock, which can only be held by one host at any one point in time:


    datatype SpecState = SpecState(history : seq<HostId>)
    
    predicate SpecInit(ss:SpecState){ 
       |ss.history|==1 && ss.history[0] in AllHostIds() 
    }
    
    predicate SpecNext(ss_old:SpecState,ss_new:SpecState){ 
      exists new_holder :: new_holder in AllHostIds() &&
        ss_new.history == ss_old.history + [new_holder] 
    }
    
    // an impl is consistent with the spec. if all msgs in epoch n
    // come from the nth host in the history
    predicate SpecRelation(is:ImplState,ss:SpecState){ 
      forall p :: p in is.sentPackets && p.msg.lock? ==> 
        p.src == ss.history[p.msg.epoch] 
    }

See the IronRSL host specification and the IronKV host specification in the project’s GitHub repository.

Abstract Protocol

We formally specify, in Dafny (§2.2), a distributed system state machine. This state machine consists of N host state machines and a collection of network packets. In each step of the distributed system state machine, one host’s state machine takes a step, allowing it to atomically read messages from the network, update its state, and send messages to the network; §3.6 relaxes this atomicity assumption. The developer must specify each host’s state machine: the structure of the host’s local state, how that state is initialized (HostInit), and how it is updated (HostNext).

Here’s an example of an abstract protocol specification for the distributed lock:


    datatype Host = Host(held:bool,epoch:int)  
    
    predicate HostInit(s:Host,id:HostId,held:bool) { 
      s.held==held && s.epoch==0
    }
    
    predicate HostGrant(s_old:Host,s_new:Host,spkt:Packet) {
      s_old.held && 
      !s_new.held && 
      spkt.msg.transfer? && 
      spkt.msg.epoch == s_old.epoch+1 
    }
    
    predicate HostAccept(s_old:Host,s_new:Host, rpkt:Packet,spkt:Packet){ 
      !s_old.held && 
      s_new.held && 
      rpkt.msg.transfer? && 
      s_new.epoch == rpkt.msg.epoch == spkt.msg.epoch && 
      spkt.msg.lock?
    }
    
    predicate HostNext(s_old:Host,s_new:Host, rpkt:Packet,spkt:Packet) { 
      HostGrant(s_old,s_new,spkt) ||
      HostAccept(s_old,s_new,rpkt,spkt)
    }

The challenge of proving the protocol-to-spec theorem comes from reasoning about global properties of the distributed system. One key tool is to establish invariants: predicates that should hold throughout the execution of the distributed protocol… Identifying the right invariants for a given protocol requires a deep understanding of the protocol, but it is a skill one develops with experience.
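
For the toy lock protocol, for example, a candidate invariant (illustrative only – not necessarily the one the authors use) is that two distinct hosts never hold the lock in the same epoch. Over the DistState sketched above, the invariant and its preservation lemma take roughly this shape:

    predicate EpochsUnique(s:DistState) {
      forall i, j :: 0 <= i < |s.hosts| && 0 <= j < |s.hosts| &&
        s.hosts[i].held && s.hosts[j].held &&
        s.hosts[i].epoch == s.hosts[j].epoch ==> i == j
    }
    
    // Established at initialisation and preserved by every step;
    // the proof body is omitted in this sketch.
    lemma InvariantPreserved(s_old:DistState, s_new:DistState)
      requires EpochsUnique(s_old) && DistNext(s_old, s_new)
      ensures  EpochsUnique(s_new)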

The Dafny language is used for the abstract protocol specification (and also for the implementation). This reduces but does not eliminate the human proof effort required. Having both the protocol specification and the implementation written in Dafny avoids any semantic gaps between the implementation’s view of the protocol and the protocol that is actually proved correct.

See the IronRSL protocol specification and the IronKV protocol specification in GitHub.

Implementation

In the implementation layer the developer writes single-threaded, imperative code to run on each host. This code must cope with all of the ugly practicalities we abstracted away in the protocol layer. For instance, it must handle real-world constraints on how hosts interact: since network packets must be bounded-sized byte arrays, we need to prove the correctness of our routines for marshalling high-level data structures into bytes and for parsing those bytes. We also write the implementation with performance in mind, e.g., using mutable arrays instead of immutable sequences and using uint64s instead of infinite-precision integers. The latter requires us to prove the system correct despite the potential for integer overflow.
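
As a small illustration of the last point (my sketch, not IronFleet’s code): machine integers can be modelled as a Dafny newtype, so that any arithmetic which could overflow carries a precondition the verifier forces you to discharge at every call site.

    newtype uint64 = i:int | 0 <= i < 0x1_0000_0000_0000_0000
    
    method AddChecked(a:uint64, b:uint64) returns (c:uint64)
      requires a as int + b as int < 0x1_0000_0000_0000_0000  // caller rules out overflow
      ensures  c as int == a as int + b as int
    {
      c := a + b;  // verifies only because the precondition excludes overflow
    }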

Dafny does not natively support networking (which would be limiting for building distributed systems 😉 ), so the language is extended with a trusted UDP specification.

There’s an interesting observation regarding functional vs imperative programming styles here too:

Verifying imperative code is challenging compared with verifying purely functional code, even when using a state-of-the-art tool like Dafny that is designed for imperative programs (§2.2). Thus, we found it profitable to implement the system in two stages. First, we develop an implementation using immutable value (functional) types and show that it refines the protocol layer. Avoiding heap reasoning simplifies the refinement proof, but it produces a slow implementation, since it cannot exploit the performance of heap references. In the second stage, we replace the value types with mutable heap types, improving performance while solving only a narrow verification problem.
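
Roughly speaking (my illustration, with made-up names), the first stage might represent a log as an immutable sequence, while the second stage swaps in a mutable array together with an abstraction function mapping it back to the protocol-level view:

    datatype LogV1 = LogV1(entries:seq<int>)   // stage 1: immutable value type
    
    class LogV2 {                              // stage 2: mutable heap type
      var entries: array<int>
      var count: nat
    
      // Abstraction function back to the stage-1 / protocol-level view.
      function View() : seq<int>
        requires count <= entries.Length
        reads this, entries
      {
        entries[..count]
      }
    }

The second-stage refinement proof then only needs to show that each mutable operation preserves this abstract view, rather than redoing the protocol-level reasoning.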

Once a program verifies, Dafny compiles it to C# and has the .NET compiler produce an executable.

Refinement

The refinement proofs follow the same basic strategy at each layer.

  • First, a refinement function is defined that maps a state in the lower-level specification (e.g. the protocol spec) to a corresponding state in the higher-level spec (e.g. the top-level spec).
  • Then it is proved that applying this refinement function to the initial state of the lower-level specification produces a state that satisfies the initial conditions of the higher-level spec.
  • Finally, it is proved that if a step in the lower-level protocol takes the state from A to B, then there is a valid sequence of higher-level spec. steps that goes from refine(A) to refine(B) – sketched in Dafny below.
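
In Dafny, the obligations for one layer look roughly like the following (my sketch, with hypothetical names ProtocolState, ProtocolInit and ProtocolNext standing in for the lower layer; in general the step obligation may require a sequence of spec steps rather than the single step or stutter shown here):

    function RefineToSpec(ps:ProtocolState) : SpecState
    
    // Obligation 1: refined initial states satisfy SpecInit.
    lemma RefinementOfInit(ps:ProtocolState)
      requires ProtocolInit(ps)
      ensures  SpecInit(RefineToSpec(ps))
    
    // Obligation 2: every protocol step maps to a spec step, or to a
    // stuttering step in which the refined state is unchanged.
    // Proof bodies are omitted in this sketch.
    lemma RefinementOfNext(ps_old:ProtocolState, ps_new:ProtocolState)
      requires ProtocolNext(ps_old, ps_new)
      ensures  SpecNext(RefineToSpec(ps_old), RefineToSpec(ps_new))
            || RefineToSpec(ps_old) == RefineToSpec(ps_new)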

Evaluation

The authors built a replicated state machine library and a key-value store, without degrading functionality to make the specification and proof work easier!

IronRSL replicates a deterministic application on multiple machines to make that application fault-tolerant. Such replication is commonly used for services, like Chubby and Zookeeper, on which many other services depend. Due to these dependencies, correctness bugs in replication can lead to cascading problems, and liveness bugs can lead to widespread outages of all dependent services. IronRSL guarantees safety and liveness without sacrificing complex implementation features necessary to run real workloads. For instance, it uses batching to amortize the cost of consensus across multiple requests, log truncation to constrain memory usage, responsive view-change timeouts to avoid hard-coded assumptions about timing, state transfer to let nodes recover from extended network disconnection, and a reply cache to avoid unnecessary work.

In the development of IronRSL and IronKV the authors also wrote and verified several generic libraries useful for distributed systems. The performance of IronRSL is compared to the Go implementation of Multi-Paxos in the EPaxos codebase, and found to be within 2.4x. IronKV’s performance is competitive with Redis! I’ll leave you with a reminder that both programs worked correctly the very first time they were run! What would be really interesting here is to see what would happen if Colin Scott were to fuzz IronRSL for fun and publication…

See the IronClad project page for links to all the tools and repos used in the paper.