Optimizing event journals

In this post I want to talk a little about some optimization techniques we've applied to some of the journals used in Akka.NET - those working with SQL backends. Before you read further - this post is written with the assumption that you're familiar with the basics of eventsourcing architectures and their shape in Akka.NET. If you're not, you can read more about it here.

Old ways

The existing approach was quite simple and naive. Whenever a persistent actor had something to do with the persistence backend, it sent a request to an EventJournal actor. In the case of SQL, the journal created a new DbConnection for every single request. This is not as bad as it sounds - ADO.NET handles connection pooling automatically.
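
In a simplified form, the per-request flow looked roughly like this (just a sketch - the insertEventSql statement and Serialize helper are made up, this is not the actual journal source):

private async Task WriteEventAsync(IPersistentRepresentation write)
{
    using (var connection = new SqlConnection(connectionString))
    using (var command = new SqlCommand(insertEventSql, connection))
    {
        await connection.OpenAsync(); // pooled by ADO.NET, so opening is usually cheap
        command.Parameters.AddWithValue("@PersistenceId", write.PersistenceId);
        command.Parameters.AddWithValue("@SequenceNr", write.SequenceNr);
        command.Parameters.AddWithValue("@Payload", Serialize(write.Payload));
        await command.ExecuteNonQueryAsync();
    } // disposing returns the connection to the pool
}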

However, in practice this caused some problems:

First, the frequent creation and reopening of the connection/command/transaction chain turned out to be not as cheap as we would have wanted.

More importantly though, once the pressure kicks in, things start to get nasty. Imagine tens of thousands of actors trying to persist their events at once. Eventually we'll saturate all of the connections in the pool - potentially even starving other parts of the application if our actor system is embedded in some bigger application (see: the noisy neighbor problem). If this situation continues, timeouts will eventually kick in, causing persist requests to throw and bubble errors up to the higher layers of the application. The definitions of slow, unresponsive and dead database requests get blurry at this point.

Is there anything we can do about it?

A new approach

Because nowadays, in order to present any new technique, we must give it some cheesy name, I've decided to describe this one as horizontal batching.

Initially we create a buffer for incoming requests and a counter of free connections remaining to be used.

In the happy case, when we are able to persist incoming events as they come, this new batching journal implementation works very similarly to the old one, decrementing the connection counter with each write. Once a connection gets released, we increment the counter back.

However, when requests start to pile up, we may run out of free connections (the counter drops to 0). In that case we start buffering the requests. When a connection is released and can be reused, we take a page of requests from the buffer and execute them within a single connection/command. There are a few things we can configure here (see the sketch after this list):

  • One is the maximum number of operations allowed to execute concurrently (the initial counter value). This way we may decide to put only a subset of our connection pool under the event journal's supervision.
  • Another one is the maximum size of a single batch. As the buffered requests can potentially grow big, we might decide to execute only a subset of them within a single connection.
  • The last important thing here is the maximum number of stacked messages. If requests come in faster than they can be processed (even in batches), we could eventually run out of memory to even store them. With this setting, once the specified threshold is surpassed, an OnBufferOverflow method will be called. By default it rejects the request, but it can also be overridden to apply a custom backpressure strategy.
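
To give a feeling of the mechanics, here is a minimal sketch of that counter/buffer logic (names are hypothetical and error handling is omitted - this is not the actual journal source):

private void TryExecute(WriteRequest request)
{
    if (freeConnections > 0)
    {
        freeConnections--;
        ExecuteBatch(new[] { request });      // happy case: execute immediately
    }
    else if (buffer.Count >= maxBufferSize)
    {
        OnBufferOverflow(request);            // threshold surpassed: reject by default
    }
    else buffer.Enqueue(request);             // no free connections: buffer the request
}

private void OnConnectionReleased()
{
    if (buffer.Count == 0) freeConnections++; // nothing pending, free the slot back
    else
    {
        var batch = new List<WriteRequest>();
        while (batch.Count < maxBatchSize && buffer.Count > 0)
            batch.Add(buffer.Dequeue());
        ExecuteBatch(batch);                  // reuse the slot for a whole page of requests
    }
}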

How fast does it work? So far we've managed to process over 12 000 command→events per second (we're talking about full eventsourced actor protocol roundtrips here), running both the application and the SQL Server database with default settings on a standard dev laptop (8 logical cores, 16GB RAM).

There are some more improvements still waiting their turn. While the new journal implementation is still experimental and needs some more use, it has been designed to work as a drop-in replacement for the existing one. That means it should soon become the standard for the existing SQL implementations.

Quest for optionally asynchronous APIs

In this blog post I wanted to share some of the improvements we've made while working on the FSharp.Data.GraphQL library. To start with, I'm going to describe our use case, what issues we observed and how we solved them. If you're not familiar with GraphQL, please get acquainted with it first.

Introduction

GraphQL is an application-level query language that can be integrated with application logic on the server side through one of the libraries of your choice. FSharp.Data.GraphQL is one of those.

What most of the server implementations allow you to do is define custom resolution logic for any of the schema object's fields. We can define them like this:

Define.Object("User", [  
    Define.Field("firstName", String, fun ctx user -> user.FirstName)
])

However, sometimes we would like to have some more advanced logic. Some of the fields may not refer to a particular entity instance, or they may be obtained as part of an asynchronous call. Therefore we have introduced another creation method, which allows us to define asynchronous operations:

Define.AsyncField("picture", String, fun ctx user -> async { return! getPic user.AvatarId })  

GraphQL allows us to define queries that fetch only the necessary fields. Queries are highly dynamic constructs - we never know which fields will be expected. This also means that if we have both synchronous and asynchronous fields in our API, any combination of them may be expected in the output. This is where the problem begins.

Problem

It turned out that the case above - resolving a mix of synchronous and asynchronous values - was quite problematic for the underlying GraphQL execution engine we were working on. Since F# is a statically typed language, we needed some uniform way to work with both synchronously and asynchronously resolved fields.

We started by modeling them all in terms of F# Async computations. However, Async introduces a constant overhead - both in terms of CPU and memory - which now applies to every resolved field. The bad part: as practice shows, ~99% of the time field resolvers are synchronous. This means introducing a heavy overhead by default, where it was not needed in most cases.

In case you think you're free of that with the C# Task Parallel Library - you're not. As I said, since the combination of fields requested by a query is dynamic and runtime-dependent, the compiler is not able to determine at compile time when to optimize async methods.
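
To illustrate (the resolver signatures below are hypothetical): with Tasks, even a field whose value is already at hand pays the price of a heap-allocated wrapper.

// a value that's already at hand still costs a heap-allocated Task<T>
Task<string> ResolveFirstName(User user) =>
    Task.FromResult(user.FirstName);   // allocation on every call, despite no async work

// only this one actually deserves a Task
Task<string> ResolvePicture(User user) =>
    GetPictureAsync(user.AvatarId);    // genuinely asynchronous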

Solution

We needed another kind of abstraction - something that would allow us to work with Async computations, but would also respect the mostly-synchronous nature of our problem.

If you're familiar with the list of changes planned for future versions of the C# language, you'll notice an idea called ValueTask - shortly speaking, it's a lightweight value type that is going to conform to the async/await API and allows both immediately returned values and TPL Tasks to be used in the same way. Exactly what we needed here.
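
The core idea can be sketched as a struct holding either a plain value or a task (a simplification for illustration, not the actual BCL definition):

public struct ValueOrTask<T>
{
    private readonly T value;        // set when the result is already known
    private readonly Task<T> task;   // set only for truly asynchronous work

    public ValueOrTask(T value) { this.value = value; this.task = null; }
    public ValueOrTask(Task<T> task) { this.value = default(T); this.task = task; }

    public bool IsCompleted => task == null || task.IsCompleted;

    // the synchronous path hands the value back without ever touching the heap
    public T Result => task == null ? value : task.Result;
}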

However, ValueTask still belongs to the future. Besides, we're building an F# library and we needed something that would feel natural for F# devs, as F# contains its own Async primitive.

This is why we created our own AsyncVal type - it behaves similarly to Async, but is able to use an optimized path for synchronously resolved values. To make it easier to work with, we've also created an asyncVal { ... } computation expression and interop methods for the async { ... } builder. With it we are free to express things such as:

let result = asyncVal {  
    let rand = random.Next()
    if rand % 1000 = 1 
    then return! async { return! someAsyncOperation () }
    else return rand
} |> AsyncVal.get

... and get both support for asynchronous execution and optimal performance in the happy case.

How fast is it? While this implementation is still in flux, we've made some initial benchmarks (remember: benchmarks lie, and the actual performance gain in our case was not so optimistic), comparing AsyncVal with async.Return. It turned out to be almost 3000 times faster with no heap allocations (it's a struct type and introduces only 4 bytes of overhead on 32-bit and 8 bytes on 64-bit machines). For truly async computations, it introduces a minimal overhead over the existing Async implementation. You can see the actual benchmarks in the following PR.

This allowed us to optimize the most common cases without losing the potential to work with some higher-level abstractions.

Summary

Right now AsyncVal is part of the FSharp.Data.GraphQL library and will probably require some more polishing, as it was created to solve our specific problems and is not yet ready for general-purpose use - i.e. error management isn't exactly uniform at the moment.

However, this PoC already proves its usefulness and may be used as a foundation for something greater. Maybe in the future it will deserve its own package.

On GraphQL issues and how we're going to solve them

Facebook's GraphQL is one of the emerging (web) technologies shedding new light on the future of web APIs. I'm not going to introduce it here - there are plenty of articles on that subject, starting from the official site. What I'm going to write about is one specific issue of GraphQL's design that we've met when working on its F# implementation. But let's start from the beginning.

Designing with capabilities

The most important difference between REST APIs and GraphQL endpoints is the approach to exposing data and available actions to the API consumers.

In RESTful APIs we describe them quite vaguely, as there is no standard approach here, and REST itself is more a set of guidelines than actual rules. Of course, some approaches to that have already been created (i.e. Swagger).

In general we expose our API as a set of URLs (routes) which - when called with the right set of input arguments and HTTP methods - will reply with the right data in the response.

The problem here is that eventually this always leads to an epidemic growth of exposed routes, as we often need a little more (underfetching) or a little less (overfetching) than the fields provided by the endpoints exposed so far, but we don't want to end up making additional calls to complete the missing data.

On the other hand, with GraphQL over- and under-fetching is almost never a problem. The reason is simple: instead of exposing endpoints aiming to reply to one particular query, we create a single endpoint and a schema which describes all the data - with all possible relationships between entities - that is potentially accessible from the corresponding service. Therefore you have all the capabilities of your web service in one place.

So, where's the catch?

While all of that sounds simple and fun, the story behind it is a little more grim.

Let's consider a web service with some ORM and a SQL-backed database - a quite common example nowadays. One of the well-known problems with ORMs is their leaky abstraction. The one I'm going to write about here is known as N+1 SELECTs.

For those of you who haven't heard of it: this is a common problem with almost all advanced ORMs, which happens when you haven't explicitly mentioned all the fields you're going to use when sending a query to the database. Once the query has been executed and you try to access a property which has not been fetched from the database, most "smart" ORMs will automagically send a subsequent query to retrieve the missing data. If you're iterating over a collection of those data objects, a new query will be sent each time a not-yet-fetched field is accessed.
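
For example (hypothetical ORM-style code):

// the classic N+1 shape
var users = db.Users.ToList();              // query #1: SELECT ... FROM Users
foreach (var user in users)
{
    // lazy-loaded navigation property: each access fires another SELECT,
    // so we end up with 1 + N round trips to the database
    Console.WriteLine(user.Contacts.Count);
}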

How do we solve this problem in a RESTful API? Because we know exactly what is going to be returned from a route, we can simply tailor logic able to retrieve all the necessary data in a minimal number of queries.

And how about GraphQL? Well... you're fucked. There is no easy way to "tailor" a SQL query to match a specific request, as the expected result set is highly dynamic, depending on the incoming GraphQL query string.

Of course, there are multiple approaches to solving this problem among the many different implementations in multiple languages. Let's take a look at some of them.

Dedicated GraphQL-to-SQL library

Existing examples I know about:

  • postgraph (node.js), which exposes a PostgreSQL database schema as GraphQL and is able to compile GraphQL queries into SQL.
  • GraphQL.NET (.NET), which translates GraphQL queries into LINQ.

Both of them share the same problems:

  • They specialize in translating one query language into another. This solves only a subset of problems. What if you need to merge a response from the database with a call to/from another web service?
  • They usually introduce tight coupling between your Data Access Layer (or even the database schema itself) and the model exposed on the endpoint.
  • They are separate libraries, usually hardly compatible with other, more general GraphQL implementations.

Throttling and batching queries

The second kind - introduced in the Ruby implementation - digs into the backend of the ORM itself. What it does is split DB query execution into small timeframes. Instead of executing a SQL query immediately, we batch all requests within, let's say, a 10ms time frame, and execute them at once.

I won't cover that idea in detail, as it's more a feature of the underlying database provider than of the application server implementation itself. Besides, it feels like a bit of a hacky solution too ;)

Can we do more?

One of the major problems here is the interpretation of the incoming GraphQL query. While most of the implementations expose info about things such as the AST of the executed query and the schema itself, usually that data is not correlated in any way.

In FSharp.Data.GraphQL we have created something called an execution plan - an intermediate representation of a GraphQL query, which includes things like inlining query fragments' fields and correlating them with the schema and type system.

To show this with an example - let's say we have the following type system described in our schema:

schema { query: Query }

type Query {  
    users: [User!]
}

type User {  
    id: ID!
    info: UserInfo!
    contacts: [UserInfo!]
}

type UserInfo {  
    firstName: String
    lastName: String
}

And we want to execute a query which looks like this:

query Example {  
    users {
        id
        ...fname
        contacts {
            ...fname
        }
    }
}

fragment fname on UserInfo {  
    firstName
}

What does an execution plan for that query look like?

ResolveCollection: users of [User!]  
    SelectFields: users of User!
        ResolveValue: id of ID!
        SelectFields: info of UserInfo!
            ResolveValue: firstName of String
        ResolveCollection: contacts of [UserInfo!]
            SelectFields: contacts of UserInfo!
                ResolveValue: firstName of String

A little description:

  • ResolveValue is a leaf of the execution plan tree. It marks a scalar or enum coercion.
  • SelectFields is used to mark retrieval of a set of fields from object types.
  • ResolveCollection marks a node that is going to return a list of results.
  • ResolveAbstraction (not present in the example) is a map of object type ⇒ SelectFields set, used for abstract types (unions and interfaces).

As we can see, information about the query fragment has been erased (the fragment's fields have been inlined), and every field carries its type information. What we have as a result is a fully declarative representation of our query. What can we do from here?

  • The standard execution path is trivial - we simply traverse the tree and execute every field's resolve function on the way (see the sketch after this list).
  • We can decide to use an interpreter to create another representation from it (i.e. a LINQ/SQL query).
  • Since the execution plan is built as a tree, we could "manually" add/remove nodes to/from it, or even split it in two and then interpret each part separately. Having set operations (such as unions and intersections) over them is especially promising.
  • We could cache the execution plans, reducing the number of operations necessary when executing each query. This can be a big advantage in the future - especially when combined with a publish/subscribe pattern.
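
To illustrate the first point, a naive traversal could look like this (sketched in C#; the types are hypothetical, not FSharp.Data.GraphQL's internals):

object Execute(PlanNode node, object value)
{
    switch (node)
    {
        case ResolveValue leaf:
            return leaf.Resolve(value);                    // coerce a scalar/enum
        case SelectFields selection:
            return selection.Fields.ToDictionary(
                field => field.Name,
                field => Execute(field.Plan, field.Resolve(value)));
        case ResolveCollection collection:
            return ((IEnumerable<object>)value)
                .Select(item => Execute(collection.Nested, item))
                .ToList();
        default:
            throw new NotSupportedException();
    }
}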

While the presence of an intermediate representation doesn't solve all the problems, I think it's a step in the right direction, as it allows us to compose our server logic in a more declarative fashion without headaches about performance issues.

Don't Ask, Tell

Today I wanted to focus on one of the popular anti-patterns many Akka.NET newcomers tend to use: the Ask method for communicating between actors. I want to take a closer look at what Ask really does underneath and why it's not a good choice for communicating between actors... most of the time.

What is Ask and how does it work?

Ask is one of the extension methods for the IActorRef interface, allowing communication between actors. Unlike Tell - which simply sends a message with fire-and-forget semantics - Ask is a request/response-based communication pattern. It sends a message to another actor, expecting it to respond with another message, and returns a task that asynchronously completes when the response comes back.

To understand why Ask is usually a bad pattern for inter-actor communication, you first need to know how it works.

As you may remember, actor communication is message-based. Additionally, to buffer possible message bursts, in Akka each actor has its own mailbox. The sender of a message always appends it at the end of the recipient's mailbox, while the recipient processes messages synchronously, taking them off the mailbox one by one. What's also important here: it won't take any new message until the current one has been processed.

While most of the time this is a good approach (and can perform better than i.e. SEDA-based solutions), it's kind of problematic for the request/response pattern. Why? Since we need to correlate the request with the response, after sending the request we need to either:

  1. Scan the entire mailbox queue, looking for the response. The longer the mailbox (so the higher the congestion), the longer it takes to find the response. Also, in this case the mailbox must be able to pick a message from the middle of the queue, which would not come without consequences for the mailbox implementation itself.
  2. Create a temporary listener able to receive the response. Moreover, since the communication may proceed over the wire, the listener must support the location transparency feature of actors.

Akka follows the second approach. This means that each Ask call actually allocates a special kind of lightweight listener-actor. This design makes Ask a lot heavier an operation than a simple Tell - and we're talking dozens of times heavier. The good part is that, thanks to this choice, Tell can still be fast as hell.

Continuous blocking

As emphasized earlier, one of an actor's attributes is synchronous message processing. It's a great feature, since we no longer need to worry about things like race conditions, locks or blocking... except that request/response is an inherently blocking communication pattern. To be clear, I'm not talking here about blocking a thread, as we're able to define async receive methods for actors. What I'm talking about is blocking the actor itself, which cannot pick up any new messages until the response arrives and its processing finishes.

Imagine the following example:

class MyActor : ReceiveActor  
{
    public MyActor(IActorRef other)
    {
        ReceiveAsync<Request>(async request => {
            // this is really BAD practice
            var reply = await other.Ask<Reply>(request, TimeSpan.FromSeconds(30));
            Sender.Tell(reply);
        });
    }
}

In normal conditions, a single Akka.NET actor (v1.0.7) is able to receive up to 3.3 mln messages/sec. OK, even given the bad characteristics of Ask, this would still be a fairly high number (hundreds of thousands of msg/sec with local affinity). But what if the asked actor never receives the message (suppose it has been lost, or the receiver died for some reason)? Well, our actor will stay idle until the timeout triggers. In the case above this means around 30 sec of unresponsiveness... for anyone sending a message to that actor, not just the asker alone. This could be changed by making actors reentrant... which again would reintroduce race conditions in parallel access to the actor's state - the very thing actors were built to solve in the first place :)

I know that at this point some of you may say "But I really need this response to proceed with my work". Well, as practice shows... most of the time you don't ;) Most programmers are used to RPC, because it's an inherent part of the HTTP protocol. However, in message-based systems it's only one of many communication patterns. Use your senses, take a look at the world around you, and you will see that most information exchange in real life is not based on request/response. Believe me, once you change your mindset, you'll start to see things in a bigger picture.

NOTE: Another flaw of this particular piece of code is that in case of a failure on the asked actor's side, the failure is propagated back to the requester - this may ultimately cause a cascading failure chain. While crashing an actor is usually not a big deal, don't think of it as a resource-free operation.

If not Ask, then what?

I wanted to describe a few cases where we usually think in terms of RPC, and show how to change them into more message-embracing solutions. Take into account that each case is different and there is no silver bullet here.

Push based events

Let's think about the following scenario:

We have a multiplayer game, represented as an actor system. We have a single actor representing the state of the world, and multiple client-actors - each one working locally on a part of that state. Before a client proceeds with some action, it must ensure it has the actual state of the world. Therefore it asks the world-actor for the current state at the beginning of every operation.

While this example is set in a multiplayer game scenario, it shares many traits with standard business applications (world state ⇒ database). So how can we do that without using the request/response model?

Now, change your mindset a little - instead of thinking about asking for the current state, let the world-actor notify every interested client about a state change once it occurs. As a result we have a living actor system.
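
A sketch of that push-based flow (the message and actor names here are made up for illustration):

class WorldActor : ReceiveActor  
{
    private readonly HashSet<IActorRef> subscribers = new HashSet<IActorRef>();
    private WorldState state = WorldState.Initial;

    public WorldActor()
    {
        // clients register once, instead of asking before every operation
        Receive<Subscribe>(_ => subscribers.Add(Sender));
        Receive<ChangeState>(change =>
        {
            state = state.Apply(change);
            // push the new state to every interested client - nobody has to ask
            foreach (var subscriber in subscribers)
                subscriber.Tell(new StateChanged(state));
        });
    }
}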

Need response? Use Forward + Tell back

Sometimes we may want to send the result of a computation back to the caller. What if that computation is a request chain between multiple different actors?

Imagine that we have 4 actors: A, B, C and D. In our scenario A is the requester that needs to be replied to. It sends a message to B, which makes some operations, then asks C to make some more operations, which then also asks D to do the same. Once D finishes, the reply is passed down the call chain back to A. Given Ask as the message pattern, we can model that scenario using the following image:

Ask pipeline

Now we've ended up with 3 temporary allocations (remember that Ask allocates), 6 message passes, and 2 actors (B and C) blocked until the whole request completes... even though they don't need to participate in it after doing their part.

How can we change that? Use the following scenario:

Forward pipeline

As you may see, we replaced most of the Asks with Forwards - but what does Forward actually do? Forward is a special version of Tell which propagates the original message sender. Therefore A will be recognized as the message sender as long as the actors pass the message along using Forward. Thanks to that, D can send its reply directly to A, as the other actors are not necessary anymore.

Moreover, because actors B and C don't need to wait for the reply, they are free to process other messages in the meantime.
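
In code, the crucial part could look like this (the message types and the DoMyPart/Finish helpers are hypothetical):

// inside B (and analogously C): do your part, then pass the message on
Receive<Work>(work =>
{
    var processed = DoMyPart(work);
    next.Forward(processed);   // Forward = Tell that preserves the original sender (A)
});

// inside D: Sender still points at A, so the reply skips B and C entirely
Receive<Work>(work => Sender.Tell(Finish(work)));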

Aggregator pattern

Another case you may think of is a scatter/gather scenario. We have some group of actors to be requested; then we want to gather all the replies and send the aggregated result back.

I've seen people trying to solve that case with a combination of multiple Ask requests + Task.WhenAll. However, because of the weight of Ask (and in more extreme cases, the weight of the Tasks themselves), this is not a good option performance-wise. So what should we do?

We can create a dedicated Aggregator actor, responsible for collecting all the results from the requested actors and sending them back to the requester. There are many variants taking different cases into account, but I think one of the most common ones is to:

  • Init an aggregator actor with the collection of actors to ask. It will be responsible for gathering the responses to a particular request.
  • Propagate the request to all actors through the aggregator.
  • When a reply arrives, remove the sender from the list of actors remaining to answer.
  • Apply a receive timeout for safety (so we don't wait for responses infinitely).
  • Once all actors have responded or the timeout has occurred, send the aggregated replies back to the original sender and stop the aggregator actor.

This scenario can be represented just like in the code below and abstracted into a generic solution:

// AggregatedReply<T> is assumed to be a simple message wrapping the collected replies
class Aggregator<T> : ReceiveActor  
{
    private IActorRef originalSender;
    private readonly ISet<IActorRef> refs;

    public Aggregator(ISet<IActorRef> refs)
    {
        this.refs = refs;
        // this operation will finish after 30 sec of inactivity
        // (when no new message arrived)
        Context.SetReceiveTimeout(TimeSpan.FromSeconds(30));
        ReceiveAny(x =>
        {
            originalSender = Sender;
            foreach (var aref in refs) aref.Tell(x);
            Become(Aggregating);
        });
    }

    private void Aggregating()
    {
        var replies = new List<T>();
        // when timeout occurred, we reply with what we've got so far
        Receive<ReceiveTimeout>(_ => ReplyAndStop(replies));
        Receive<T>(x =>
        {
            if (refs.Remove(Sender)) replies.Add(x);
            if (refs.Count == 0) ReplyAndStop(replies);
        });
    }

    private void ReplyAndStop(List<T> replies)
    {
        originalSender.Tell(new AggregatedReply<T>(replies));
        Context.Stop(Self);
    }
}
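
Usage is then as simple as spawning one aggregator per request (the WorkRequest message and workers set below are hypothetical):

var aggregator = Context.ActorOf(Props.Create(() => new Aggregator<WorkerReply>(workers)));
aggregator.Tell(new WorkRequest());
// ...an AggregatedReply<WorkerReply> will eventually come back to this actor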

When to use Ask?

After spending so much time writing about how bad the usage of Ask is, the time has come to answer the question: so when should I use Ask?

  • First of all, Ask is great for maintaining communication with external systems willing to communicate with Akka.NET. This is a natural fit, as quite often such systems are in fact RESTful web services.
  • The other case I can think of is a specific version of the scatter/gather scenario, where the number of potentially participating actors may vary depending on the message parameters and cannot be cached. In that case it's hard to set the aggregator pattern's stop condition upfront, so falling back to the listener-based option seems justified.

About Finite State Machines

One of the problems I see with explaining the usefulness of the Finite State Machine design to web developers is similar to explaining monads - once you get them, you lose the ability to explain them to others. It's sad, as understanding them opens up totally new ways of solving problems. Nonetheless, let's try to do the impossible :)

Example

Let's take the following example:

We're responsible for creating the logic handling cinema seat reservations. Only free seats can be reserved, and before being paid for, a seat needs to be reserved first. Cinema customers can revoke reserved seats. They also have a limited time to pay for a reservation - after that time has passed, the seat becomes automatically freed.

This can be simply modeled using the diagram below:

Seat reservation state diagram

Looks quite easy, right? The actual problem is that this is nowhere close to real life. Quoting Morpheus from the "Matrix" movie:

You've been living in a dream world, Neo. This is the world as it exists today:

void Reserve(SeatSession seat, out CancellationTokenSource cancel)  
{
    if (seat.ReservationDate != null) throw new SeatAlreadyReservedException();

    seat.ReservationDate = DateTime.UtcNow;
    cancel = Scheduler.Schedule(seat.ReservationDate.Value + paymentTimeout, 
        () => RevokeReservation(seat));
    // some more stuff
}

void RevokeReservation(SeatSession seat, CancellationTokenSource cancel = null)  
{
    if (seat.ReservationDate == null) throw new InvalidOperationException();
    if (seat.HasBeenPaid) throw new SeatAlreadyPaidException();

    seat.ReservationDate = null;

    if (cancel != null) cancel.Cancel();
    // some more stuff
}

void ConfirmPayment(SeatSession seat)  
{
    if (seat.ReservationDate == null) throw new InvalidOperationException();
    if (seat.HasBeenPaid) throw new SeatAlreadyPaidException();

    seat.HasBeenPaid = true;
}

This is not yet truly convoluted code, but I think even at this point you can hardly tell what its original intent was. To say which methods are valid to call from the current seat state, you need to go through the whole code. Now try to maintain or develop code written this way. I think most of you already know that pain too well. The original idea has leaked out of this representation. Why? Because most of us, when we think about business logic, want to think in terms of higher abstractions: states and transitions. Boolean flags and ifs are good for machines, not humans.

This is the place where Finite State Machines come in. Instead of using cryptic if/else branches, we may try to represent the original logical structure of states and transitions directly in our code.

In functional languages (like F#), you could express them using mutually recursive functions, as follows:

// lets define all valid state change triggers
type Transition = | Reserve | Revoke | Pay 

// each recursive function defines current state
// while pattern match describes valid transitions from it
let rec free seat = function  
    | Reserve -> 
        let cancel = schedule(DateTime.UtcNow + paymentTimeout, Revoke)
        // some more stuff
        reserved cancel seat
    | other -> 
        unhandled other
        free seat
and reserved cancel seat = function  
    | Revoke -> 
        // some more stuff
        free seat
    | Pay -> 
        cancel()
        // some more stuff
        paid seat
and paid seat = function  
    | other ->
        unhandled other
        paid seat

Looks closer to the original domain design, doesn't it? It's essentially a written representation of our original state diagram, to (and from) which we can go at any point in time.

Now think about modifying the current logic. In the first case we usually end up trying to guess the original idea while singing the "WTF?!" song of our people. In the FSM approach we can easily recreate the mental model directly from the code (no need to look into outdated docs).

PS: While you may think "it's nice, but I'm a C# guy", some frameworks (like Akka.NET) allow you to build dynamic behaviors, which can be utilized to build FSMs on the C# side too.
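
For example, the seat reservation machine could be sketched with ReceiveActor behaviors like this (the message types and timeout value are made up for illustration):

class SeatActor : ReceiveActor  
{
    private static readonly TimeSpan PaymentTimeout = TimeSpan.FromMinutes(15);

    public SeatActor()
    {
        Free();
    }

    private void Free()
    {
        Receive<Reserve>(_ =>
        {
            // schedule an automatic revoke in case payment doesn't arrive in time
            var cancel = Context.System.Scheduler
                .ScheduleTellOnceCancelable(PaymentTimeout, Self, new Revoke(), Self);
            Become(() => Reserved(cancel));
        });
    }

    private void Reserved(ICancelable cancel)
    {
        Receive<Revoke>(_ =>
        {
            cancel.Cancel();
            Become(Free);
        });
        Receive<Pay>(_ =>
        {
            cancel.Cancel();
            Become(Paid);
        });
    }

    private void Paid()
    {
        // terminal state: any further transition is simply unhandled
    }
}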