Flow: Actor-based Concurrency with C++

Engineering challenges

FoundationDB began with ambitious goals for both high performance per node and scalability. We knew that to achieve these goals we would face serious engineering challenges while developing the FoundationDB core. We'd need to implement efficient asynchronous communicating processes of the sort supported by Erlang or the Async library in .NET, but we'd also need the raw speed and I/O efficiency of C++. Finally, we'd need to perform extensive simulation to engineer for reliability and fault tolerance on large clusters.

To meet these challenges, we developed several new tools, the first of which is Flow, a new programming language that brings actor-based concurrency to C++11. To add this capability, Flow introduces a number of new keywords and control-flow primitives for managing concurrency. Flow is implemented as a compiler which analyzes an asynchronous function (actor) and rewrites it as an object with many different sub-functions that use callbacks to avoid blocking (see streamlinejs for a similar concept using JavaScript). The Flow compiler's output is normal C++11 code, which is then compiled to a binary using traditional tools. Flow also provides input to our simulation tool, Lithium, which conducts deterministic simulations of the entire system, including its physical interfaces and failure modes. In short, Flow allows efficient concurrency within C++ in a maintainable and extensible manner, achieving all three of the engineering goals described above.

A first look

Actors in Flow receive asynchronous messages from each other using a data type called a future. When an actor requires a data value to continue computation, it waits for it without blocking other actors. The following simple actor performs asynchronous addition. It takes a future integer and a normal integer as an offset, waits on the future integer, and returns the sum of the value and the offset:

ACTOR Future<int> asyncAdd(Future<int> f, int offset) {
  int value = wait( f );
  return value + offset;
}

Flow performance

We're sometimes asked if we really needed to implement our own programming language extension. Aren't there enough programming languages and libraries in the world already? We carefully evaluated a number of languages before implementing Flow. Having done that evaluation, we can confirm that Flow provides a combination of performance, concurrency, and simulation support that isn't readily available elsewhere.

Let's look a bit more at performance and concurrency using the ring benchmark proposed by Joe Armstrong in his book Programming Erlang: Software for a Concurrent World. Armstrong describes the benchmark as follows:

Write a ring benchmark. Create N processes in a ring. Send a message round the ring M times so that a total of N * M messages get sent. Time how long this takes for different values of N and M. Write a similar program in some other programming language you are familiar with. Compare the results. Write a blog, and publish the results on the internet!

Here are the results for N = 1000 and M = 1000 (Caveat: These benchmarks were found around the Internet ca. 2010. They are not from identical computers, so the results are only roughly comparable.)

Programming language           Time (seconds)
Ruby (using threads)           1990
Ruby (using queues)             360
Objective-C (using threads)      26
Java (using threads)             12
Stackless Python                  1.68
Erlang                            1.09
Go                                0.87
Flow                              0.075

On this benchmark, Flow is about an order of magnitude faster than its closest competitors (Go, Erlang, and Stackless Python) and over two orders of magnitude faster than languages using conventional multithreading. We've found that our investment in Flow has paid off very well in the performance gains we've received.

Flow features

Flow's new keywords and control-flow primitives support the capability to pass messages asynchronously between components. Here's a brief overview.

Promise and Future

The data types that connect asynchronous senders and receivers are Promise<T> and Future<T> for some C++ type T. When a sender holds a Promise<T>, it represents a promise to deliver a value of type T at some point in the future to the holder of the Future<T>. Conversely, a receiver holding a Future<T> can asynchronously continue computation until the point at which it actually needs the T.

Promises and futures can be used within a single process, but their real strength in a distributed system is that they can traverse the network. For example, one computer could create a promise/future pair, then send the promise to another computer over the network. The promise and future will still be connected, and when the promise is fulfilled by the remote computer, the original holder of the future will see the value appear.

wait()

At the point when a receiver holding a Future<T> needs the T to continue computation, it invokes the wait() statement with the Future<T> as its parameter. The wait() statement allows the calling actor to pause execution until the value of the future is set, returning a value of type T. During the wait, other actors can continue execution, providing asynchronous concurrency within a single process.

ACTOR

Only functions labeled with the ACTOR tag can call wait(). Actors are the essential unit of asynchronous work and can be composed to create complex message-passing systems. By composing actors, futures can be chained together so that the result of one depends on the output of another.

An actor is declared as returning a Future<T>, where T may be Void if the actor's return value is used only for signaling. Each actor is preprocessed into a C++11 class with internal callbacks and supporting functions.

State

The state keyword is used to scope a variable so that it is visible across multiple wait() statements within an actor. The use of a state variable is illustrated in the example actor below.

PromiseStream, FutureStream

When a component wants to work with a stream of asynchronous messages rather than a single message, it can use PromiseStream<T> and FutureStream<T>. These constructs allow for two important features: multiplexing and reliable delivery of messages. They also play an important role in Flow design patterns. For example, many of the servers in FoundationDB expose their interfaces as a struct of promise streams—one for each request type.

waitNext()

waitNext() is the counterpart of wait() for streams. It pauses the calling actor's execution until the next value in a FutureStream arrives. If a value is already waiting in the stream, execution continues without delay.

choose . . . when

The choose and when constructs allow an actor to wait for multiple futures at once in an ordered and predictable way.

Example: A Server Interface

Below is an actor that runs on a single server and communicates over the network. Its job is to maintain a count in response to asynchronous messages from other actors. Its interface is implemented as a loop containing a choose statement with a when for each request type. Each when uses waitNext() to asynchronously wait for the next request in its stream. The add and subtract requests modify the count itself, stored in a state variable. The get request carries a Promise<int> rather than just an int so that the server can send the reply back to the client.

To write the equivalent code directly in C++, a developer would have to implement a complex set of callbacks with exception-handling, requiring far more engineering effort. Flow makes it much easier to implement this sort of asynchronous coordination, with no loss of performance.

ACTOR void serveCountingServerInterface(
           CountingServerInterface csi) {
    state int count = 0;
    while (1) {
        choose {
            when (int x = waitNext(csi.addCount.getFuture())){
                count += x;
            }
            when (int x = waitNext(csi.subtractCount.getFuture())){
                count -= x;
            }
            when (Promise<int> r = waitNext(csi.getCount.getFuture())){
                r.send( count ); // goes to client
            }
        }
    }
}