
Akka.Persistence: most common misconceptions

Akka.Persistence is a plugin that introduces eventsourced persistence mechanics into Akka.NET. However, I've often seen people having problems with it, since neither actors nor eventsourcing are part of mainstream .NET development. Many of these are general problems of eventsourced systems, so I've decided to write a bit more about them and present potential solutions from the perspective of an Akka.Persistence user.

This post describes the current state of things. It may change in the future, as the Akka.NET team wants to address some of these issues.

Control your own data

There are two ways in which Akka.NET event journals may store your data:

  1. Some of them define totally custom serialization mechanics, e.g.:
    • MongoDB persistence uses a BSON serializer, which requires you to define bindings for all serialized message types.
    • The PostgreSQL plugin has a configuration switch which will use JSON.NET to store your data in a JSON/JSONB column (a format well understood by this database). In the future, the rest of the SQL databases will get that support as well.
  2. At the moment, however, most of them use the default Akka serialization mechanics: the same ones used to pass messages between cluster nodes.

One of the first mistakes everyone makes is giving up their data into the hands of the default serializer. First of all, you're handing the ability to encode/decode your data over to a 3rd party dependency (sic!). And most importantly, the default serializer is built for remote message exchange; it may NOT be a good way to store immutable, everlasting data structures.

Even the Newtonsoft.Json serializer (which was initially chosen as the Akka.NET default) is not a good option. If you were thinking "it's just JSON, what could go wrong", the story is a little more complicated. Its default configuration encodes a full type name as part of the payload in order to support polymorphic deserialization. This means that once you change your event's qualified type description (namespace, type name or assembly) - something you shouldn't do anyway - your old data is lost, as the serializer will try to deserialize it into a type that can no longer be found.
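
To see why, consider what such polymorphic settings actually write into the stored payload. A minimal illustration using Json.NET directly (the event type and names are made up):

using System;
using Newtonsoft.Json;

public class UserRegistered
{
    public string UserId { get; set; }
}

public static class TypeNameDemo
{
    public static void Main()
    {
        // TypeNameHandling.All is the kind of setting used for polymorphic deserialization.
        var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
        var json = JsonConvert.SerializeObject(new UserRegistered { UserId = "42" }, settings);
        Console.WriteLine(json);
        // Prints something like: {"$type":"UserRegistered, MyAssembly","UserId":"42"}
        // Rename or move UserRegistered and the old payloads can no longer be deserialized.
    }
}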

So what to choose?

  1. Some people use schema-less formats, as they allow for a certain degree of version tolerance without much work. This includes JSON, just not with the settings used by the Akka.NET remote serializer.
  2. Others (including me) prefer schema-based serializers, i.e. Google Protocol Buffers or Microsoft Bond, as they keep the event schema/contract explicit and well defined.

Why have I linked Google.Protobuf and not protobuf-net? I think that keeping the schema in .proto files makes things more explicit and prevents accidental schema changes.

Configuring a custom serializer

Writing a custom serializer is rather easy. All you need is a custom class inheriting from the Serializer base class. But how do you bind it to a specific event type?

Akka.NET serialization bindings are based on types. This means that you need to specify an event type → serializer pair. The type provided here doesn't have to be a concrete class - Akka will investigate the inheritance chain and all implementations in order to resolve a binding. In practice this means we usually create an empty marker interface and implement it in all events that we want to serialize with the target serializer.
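
A minimal sketch of such a serializer could look like the one below. The base class members come from Akka.Serialization; the JSON-based body is only an illustrative placeholder for whatever format you actually control.

using System;
using System.Text;
using Akka.Actor;
using Akka.Serialization;
using Newtonsoft.Json;

namespace MyNamespace
{
    // Marker interface implemented by every event handled by this serializer.
    public interface IDomainEvent { }

    public class MySerializer : Serializer
    {
        public MySerializer(ExtendedActorSystem system) : base(system) { }

        // The target type is passed back to FromBinary, so no extra manifest is needed here.
        public override bool IncludesManifest => false;

        public override byte[] ToBinary(object obj) =>
            Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(obj));

        public override object FromBinary(byte[] bytes, Type type) =>
            JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes), type);
    }
}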

An example HOCON config may look like this:

akka.actor {  
  serializers.my-custom = "MyNamespace.MySerializer, MyAssembly"
  serialization-bindings {
    "MyNamespace.IDomainEvent, MyAssembly" = my-custom
  }
}

Have an event versioning strategy

As events are immutable by definition, we need to prepare some approach for how to behave if we want to be able to change them as we develop our system. This is what we call event versioning.

If you want to have a wider perspective on that problem, you probably should read Versioning in an Eventsourced System by Greg Young.

Most people see and understand the rationale behind having a migration strategy for their database. However, far fewer have an idea of how to approach schema evolution in eventsourced systems.

Event adapters

One of the features of Akka.Persistence is event adapters. They are middleware components that you can plug into the pipeline between the event journal and a persistent actor, allowing you to modify deserialized events before they reach their destination.

To create a custom event adapter, you need a class that implements either IReadEventAdapter (to intercept events received from the journal), IWriteEventAdapter (to intercept events sent to the journal), or just IEventAdapter, which combines both.

Event adapters can be used to upgrade old event versions to newer ones, or even to deconstruct a single event into multiple ones. When is that useful? Example: if you made a mistake in the past and defined an event that actually had more than a single responsibility (i.e. OrderedAndConfirmed instead of separate Ordered/Confirmed events), as in the sketch below.
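
A sketch of such an adapter might look like this (the adapter contract comes from the Akka.Persistence.Journal namespace; the event types here are hypothetical):

using Akka.Persistence.Journal;

namespace MyNamespace
{
    // Hypothetical events: the old "fat" event and the two it should have been.
    public class OrderedAndConfirmed { public string OrderId { get; set; } }
    public class Ordered { public string OrderId { get; set; } }
    public class Confirmed { public string OrderId { get; set; } }

    public class MyEventAdapter : IReadEventAdapter
    {
        // Called for every event replayed from the journal, before it reaches the actor.
        public IEventSequence FromJournal(object evt, string manifest)
        {
            if (evt is OrderedAndConfirmed oac)
            {
                // Deconstruct the legacy event into two separate ones.
                return EventSequence.Create(
                    new Ordered { OrderId = oac.OrderId },
                    new Confirmed { OrderId = oac.OrderId });
            }

            return EventSequence.Single(evt);
        }
    }
}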

But how do we bind an adapter to be used for some event in the scope of the journal?

# we will use SqlServer journal for this example
akka.persistence.journal.sql-server {  
  event-adapters {
    custom-adapter = "MyNamespace.MyEventAdapter, MyAssembly"
  }
  event-adapter-bindings {
    "MyNamespace.V1.IDomainEvent, MyAssembly" = custom-adapter
  }
}

Don't use snapshots without events

This is a popular problem for people starting their journey with Akka.Persistence. Quite often they see persistence snapshots and think: hey, I don't need eventsourcing, let me just use one of these. However, the sole role of Akka.Persistence snapshots is to optimize streams of events.

If you're into snapshot-based persistence, Akka.Persistence is definitely not for you. Persistent actors are built to support different needs, just like journals and even the snapshots themselves - which carry fields used to correlate them with their position in a stream of events. They also don't map well onto multi-table/multi-document database hierarchies.

Another thing: when you're persisting events with PersistentActor, the next command won't be handled until the event has been persisted successfully and its callback has been called. This is not the case with SaveSnapshot, which doesn't stop the actor from picking up the next message before confirming that the snapshot has been saved.

This is not a problem in eventsourced systems - if the system dies before saving the snapshot, we replay from the last successfully stored snapshot and the events that came afterwards - but without events you may process tens of messages without any confirmation and lose that data upon actor system shutdown.
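
To make the intended relationship concrete, here is a minimal sketch of a persistent actor that treats events as the source of truth and snapshots purely as a replay optimization (the names are illustrative):

using Akka.Persistence;

public class Increment { }
public class Incremented { }

public class CounterActor : ReceivePersistentActor
{
    private int _state;

    public override string PersistenceId => "counter-1";

    public CounterActor()
    {
        // Recovery: start from the latest snapshot, then replay the events stored after it.
        Recover<SnapshotOffer>(offer => _state = (int)offer.Snapshot);
        Recover<Incremented>(_ => _state++);

        Command<Increment>(_ =>
        {
            // The event is the source of truth...
            Persist(new Incremented(), evt =>
            {
                _state++;
                // ...while the snapshot is only an optimization, taken every 100 events.
                if (LastSequenceNr % 100 == 0)
                    SaveSnapshot(_state);
            });
        });
    }
}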

[C#] Using gRPC with custom serializers

I've decided to write this post because I've noticed a lack of gRPC-related knowledge in the .NET community. For those of you who are not familiar with it: gRPC is a standard protocol for defining high-performance RPC services, built by Google on top of HTTP/2.

Since most of the manuals present gRPC services from the perspective of code generation tools, I'll show how to tweak the API itself and adapt it to our needs.

Since I'm not a great fan of code gen - my mindset here is more or less avoid doing things that should belong to the compiler - I've decided to drop the dependency on service definitions for gRPC and .proto files for Google Protocol Buffers. But how can we define custom services and use any serializer we want underneath?

Building a custom gRPC service

The core component of gRPC is a method descriptor, which informs both communicating parties about details such as what types of messages are sent/received on each side, how they should be serialized, and which message passing pattern they follow.

First, the descriptor needs to specify a communication pattern. gRPC allows one of 4 different patterns:

  • Unary, which is simply the equivalent of standard request/response communication between two participants.
  • ServerStreaming, when a client sends a single request and gets an asynchronous stream of messages back from the server.
  • ClientStreaming, when the client is the side sending an asynchronous stream of messages, and the server responds with a single response once the stream completes.
  • DuplexStreaming, when both client and server share two simultaneous async streams of messages working in both directions. A nice thing about the read streams is that they implement the IAsyncEnumerator<> interface - an IEnumerator<> equivalent that allows for asynchronous element iteration (a minimal read loop is sketched right after this list). Who knows, maybe in the future it will be integrated into C# syntax like for loops ¯\_(ツ)_/¯.
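
For reference, this is roughly what consuming such a stream by hand looks like; the ForEachAsync helper used later in this post is a convenience wrapper over the same MoveNext/Current pair:

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;

public static class StreamReading
{
    // Drains a gRPC read stream using the MoveNext/Current pair exposed by IAsyncStreamReader<T>.
    public static async Task ReadAll<T>(IAsyncStreamReader<T> stream, Action<T> handle)
    {
        while (await stream.MoveNext(CancellationToken.None))
        {
            handle(stream.Current);
        }
    }
}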

While the communication styles may differ, the protocol always describes a single message type responsible for handling requests inside the provided method, and a single type for response messages. The method descriptor requires you to describe marshallers for each of them.

A marshaller is basically a pair of functions used to serialize data back and forth to an array of bytes. This is my biggest complaint about the current design of the gRPC API for .NET. IMHO there are better binary containers (ArraySegment<byte>, for example) for a protocol designed for high throughput / low latency systems.

Given all of this, an example descriptor may look like this:

using Grpc.Core;
using Grpc.Core.Utils; // ForEachAsync extension methods used in the later snippets

private static readonly Method<CustomRequest, CustomResponse> CustomMethod =  
    new Method<CustomRequest, CustomResponse>(
        type: MethodType.DuplexStreaming,
        serviceName: "CustomService",
        name: "CustomMethod",
        requestMarshaller: Marshallers.Create(
            serializer: request => SerializeIntoBytes(request),
            deserializer: bytes => DeserializeFromBytes(bytes)),
        responseMarshaller: Marshallers.Create(
            serializer: response => SerializeIntoBytes(response),
            deserializer: bytes => DeserializeFromBytes(bytes)));
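
The SerializeIntoBytes/DeserializeFromBytes placeholders can be backed by any serializer that round-trips a message to a byte array. One possible shape is sketched below, with Json.NET as an arbitrary choice and hypothetical message types matching the descriptor above; with generic helpers like these, the deserializer lambdas in the descriptor would call them with an explicit type argument, e.g. DeserializeFromBytes<CustomRequest>(bytes).

using System.Text;
using Newtonsoft.Json;

// Hypothetical message types used by the CustomMethod descriptor above.
public class CustomRequest  { public int Payload { get; set; } }
public class CustomResponse { public int Payload { get; set; } }

public static class CustomMarshalling
{
    public static byte[] SerializeIntoBytes<T>(T message) =>
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

    public static T DeserializeFromBytes<T>(byte[] bytes) =>
        JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(bytes));
}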

I must admit, I like this approach to serialization. It's way more composable to provide a pair of functions than to build a custom interface implementation each time a new library wants an integration point.

Once we have that, let's move on to the server definition. The server needs an instance of ServerServiceDefinition, a descriptor of the service itself (so far we've only created a method descriptor). Thankfully, we can use existing builders to quickly construct that for us.

Depending on the MethodType we've chosen, we need a different version of the lambda handler. Here I'm using duplex streaming, but keep in mind that due to the API's construction, the compiler won't detect a mismatch between the method type and the actual delegate handler used.

var server = new Server  
{
    Ports = { { "127.0.0.1", 5000, ServerCredentials.Insecure } },
    Services =
    {
        ServerServiceDefinition.CreateBuilder()
        .AddMethod(CustomMethod, async (requestStream, responseStream, context) =>
            {
                await requestStream.ForEachAsync(async request =>
                {
                    // handle incoming request
                    // push response into stream
                    await responseStream.WriteAsync(new CustomResponse {Payload = request.Payload});
                });
            })
        .Build()
    }
};

Here the context describes all the data characteristic of the connection itself.

Keep in mind that pulling data from the request stream and pushing new data into the response stream can happen at any pace. There is no need to keep a 1-for-1 rule here.

For a simple duplex streaming ping-pong service, our server is now ready to start. Let's move to the client. For this, we first need to establish a Channel. What's easy to miss here: when creating a channel, we need to provide the endpoint of the server we wish to connect to, not the client itself.

Once we have our channel, we need a call invoker. We have a few options here, e.g. for the case when you want to inject your own interceptors. Here, however, we'll go with the default one.

Given the channel and call invoker, we need to make an actual call, providing the method descriptor we defined earlier. We can pass some additional parameters (since gRPC uses HTTP/2, we can e.g. set some HTTP headers here). Again, just like in the case of the server handler, make sure that the call method you're using reflects the correct method type.

var channel = new Channel("127.0.0.1", 5000, ChannelCredentials.Insecure);  
var invoker = new DefaultCallInvoker(channel);  
using (var call = invoker.AsyncDuplexStreamingCall(CustomMethod, null, new CallOptions()))
{
    var responseCompleted = call.ResponseStream
        .ForEachAsync(async response =>
        {
            // handle response
        });

    for (int i = 0; i < Iterations; i++)
    {
        await call.RequestStream.WriteAsync(new CustomRequest {Payload = i});
    }

    await call.RequestStream.CompleteAsync();
    await responseCompleted;
}

EDIT (2017.03.29): if you want to try it, you may check a simple hello world example, available now in this Github repository.

Optimizing event journals

In this post I want to talk a little about some small optimization techniques we've applied to some of the journals used in Akka.NET - those working with SQL backends. Before you read further: this post is written with the assumption that you're familiar with the basics of eventsourcing architectures and their shape in Akka.NET. If you're not, you can read more about it here.

Old ways

The existing approach was quite simple and naive. Once a persistent actor had something to do with a persistent backend, it was sending a request to an EventJournal actor. In the case of SQL, the journal was creating a new DbConnection each time in order to execute every single request. This is not as bad as it sounds - ADO.NET handles connection pooling automatically.

However, in practice some problems emerged:

First, the frequent creation and reopening of the connection/command/transaction chain turned out to be not as cheap as we might have wanted.

But more importantly, once the pressure kicks in, things start to get nasty. Imagine tens of thousands of actors trying to persist their events at once. Eventually we'll saturate all of the connections from the pool - potentially even starving other parts of the application if our actor system is embedded in some bigger application (see: noisy neighbor problem). If this situation continues, after some more time timeouts will start to kick in, causing persist requests to throw and bubble errors up to higher layers of the application. The definitions of slow, unresponsive and dead database requests get blurry at this point.

Is there anything we can do about it?

A new approach

Because nowadays, in order to present any new technique, we must give it some cheesy name, I've decided to describe this one as horizontal batching.

Initially we create a buffer for incoming requests and a counter for the free connections remaining to be used.

In the happy case, when we are able to persist incoming events as they come, this new batching journal implementation works very similarly to the old one, decrementing the connection counter. Once a connection gets released, we increment the counter back.

However, when requests start to pile up, we may run out of free connections (the counter goes down to 0). In this case we start buffering them. Now, when a connection is released and can be reused, we take a page of requests from the buffer and execute them within a single connection/command. There are a few things we can configure here (a simplified sketch of this flow follows the list):

  • One is the maximum number of operations allowed to execute concurrently (the initial counter value). This way we may decide to put only a subset of our connection pool under the supervision of the event journal.
  • Another one is the maximum size of a single batch. As the buffered requests can potentially grow big, we might decide to execute only a subset of them within a single connection.
  • The last important thing here is the maximum number of stacked messages. In case requests come in faster than they can be processed (even in batches), we could eventually run out of memory just to store them. With this setting, once a specified threshold is surpassed, an OnBufferOverflow method will be called. By default it will reject the request, but it can also be overridden to apply a custom backpressure strategy.
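
This is not the actual journal code, but a rough sketch of the decision flow described above (hypothetical names, no real database access or failure handling):

using System.Collections.Generic;

public class HorizontalBatchingSketch
{
    private readonly int _maxBatchSize;    // max requests executed within a single connection
    private readonly int _maxBufferSize;   // threshold triggering OnBufferOverflow
    private int _remainingConnections;     // the counter: free connections left for the journal
    private readonly Queue<object> _buffer = new Queue<object>();

    public HorizontalBatchingSketch(int maxConcurrentOperations, int maxBatchSize, int maxBufferSize)
    {
        _remainingConnections = maxConcurrentOperations;
        _maxBatchSize = maxBatchSize;
        _maxBufferSize = maxBufferSize;
    }

    public void Handle(object writeRequest)
    {
        if (_remainingConnections > 0)
        {
            // Happy path: a connection is free, execute immediately.
            _remainingConnections--;
            Execute(new List<object> { writeRequest });
        }
        else if (_buffer.Count < _maxBufferSize)
        {
            // No free connection: buffer the request for later.
            _buffer.Enqueue(writeRequest);
        }
        else
        {
            // Too many stacked messages: apply the backpressure strategy.
            OnBufferOverflow(writeRequest);
        }
    }

    // Called when a previously used connection is given back.
    private void OnConnectionReleased()
    {
        if (_buffer.Count == 0)
        {
            _remainingConnections++;
            return;
        }

        // Reuse the connection for a whole page of buffered requests at once.
        var batch = new List<object>();
        while (batch.Count < _maxBatchSize && _buffer.Count > 0)
            batch.Add(_buffer.Dequeue());

        Execute(batch);
    }

    protected virtual void OnBufferOverflow(object request)
    {
        // Default: reject the request.
    }

    private void Execute(List<object> batch)
    {
        // In the real journal: run the whole batch within a single connection/command,
        // then call OnConnectionReleased() once it completes.
    }
}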

How fast does it work? So far we've managed to process over 12 000 command→event roundtrips per second (we're talking about the full eventsourced actor protocol here) with both the application and the SQL Server database running on default settings on a standard dev laptop (8 logical cores, 16GB RAM).

There are some more improvements still waiting their turn. While the new journal implementation is still experimental and needs some more use, it has been designed to work as a drop-in replacement for the existing one. That means it should soon become the standard for the existing SQL implementations.

Quest for optionally asynchronous APIs

In this blog post I want to share some of the improvements we've made while working on the FSharp.Data.GraphQL library. To start with, I'm going to describe our use case, what issues we observed and how we solved them. If you're not familiar with GraphQL, please get acquainted with it first.

Introduction

GraphQL is an application-level query language that can be integrated with application logic on the server side through one of the libraries of your choice. FSharp.Data.GraphQL is one of those.

What most of the server implementations allow you to do is define custom resolution logic for any of the schema object's fields. We can define them as:

Define.Object("User", [  
    Define.Field("firstName", String, fun ctx user -> user.FirstName)
])

However, sometimes we would like to have some more advanced logic. Some of the fields may not refer to a particular entity instance; they may be obtained as part of an asynchronous call. Therefore we have introduced another creation method, which allows us to define asynchronous operations:

Define.AsyncField("picture", String, fun ctx user -> async { return! getPic user.AvatarId })  

GraphQL allows us to define queries that fetch only the necessary fields. Queries are highly dynamic constructs - we never know which fields will be expected. This also means that if we have synchronous and asynchronous fields in our API, any combination of them may be expected in the output. This is where the problem begins.

Problem

It turned out that the case above - resolving both synchronous and asynchronous values - was quite problematic for the underlying GraphQL execution engine we were working on. Since F# is a statically typed language, we needed some uniform way to work with both synchronously and asynchronously resolved fields.

We started by modeling them all in terms of F# Async computations. However, Async introduces a constant overhead - both in terms of CPU and memory - which now applies to every resolved field. The bad part: as practice shows, field resolvers are synchronous ~99% of the time. This means introducing a heavy overhead by default where it is not needed in most cases.

In case you think you're free of that with the C# Task Parallel Library - you're not. As I said, since the combination of fields requested by a query is dynamic and runtime-dependent, the compiler is not able to determine at compile time whether or not async methods can be optimized.

Solution

We needed another kind of abstraction - something that would allow us to work with Async computations, but would also respect the mostly-synchronous nature of our problem.

If you're familiar with the list of changes planned for future versions of the C# language, you'll notice an idea called ValueTask - shortly speaking, it's a lightweight value type that is going to conform to the async/await API and allows both immediately returned values and TPL Tasks to be used in the same way. Exactly what we needed here.
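
In C# terms the idea looks roughly like the sketch below (assuming the ValueTask<T> type as eventually shipped in the System.Threading.Tasks.Extensions package; the names are illustrative):

using System.Collections.Generic;
using System.Threading.Tasks;

public class PictureResolver
{
    private readonly Dictionary<string, string> _cache = new Dictionary<string, string>();

    // Returns synchronously when the value is already at hand,
    // falls back to a real Task only when asynchronous work is needed.
    public ValueTask<string> GetPictureUrl(string avatarId)
    {
        if (_cache.TryGetValue(avatarId, out var url))
            return new ValueTask<string>(url);                       // no Task allocation

        return new ValueTask<string>(LoadFromStorageAsync(avatarId)); // wraps an actual Task
    }

    private async Task<string> LoadFromStorageAsync(string avatarId)
    {
        await Task.Delay(10);                                         // stands in for real I/O
        var result = "https://example.com/avatars/" + avatarId;
        _cache[avatarId] = result;
        return result;
    }
}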

However, ValueTask still belongs to the future. Besides, we're building an F# library and we needed something that would feel natural for F# devs, since F# has its own Async primitive.

This is why we created our own AsyncVal type - it behaves similarly to Async, but is able to use an optimized path for synchronously resolved values. To make it easier to work with, we've also created an asyncVal { ... } computation expression and interop methods for the async { ... } builder. With it we are free to express things such as:

let random = System.Random()

let result = asyncVal {  
    let rand = random.Next()
    if rand % 1000 = 1 
    then return! async { return! someAsyncOperation () }
    else return rand
} |> AsyncVal.get

... and get both support for asynchronous execution and optimal performance for the happy case.

How fast is it? While this implementation is still in flux, we've made some initial benchmarks (remember: benchmarks lie, and the actual performance growth for our case was not so optimistic), comparing AsyncVal against async.Return. It turned out to be almost 3000 times faster with no heap allocations (it's a struct type, and introduces only 4 bytes of overhead on 32-bit and 8 bytes on 64-bit machines). For truly async computations, it introduces a minimal overhead over the existing Async implementation. You can see the actual benchmarks in the following PR.

This allowed us to optimize the most common cases without losing the ability to work with some higher-level abstractions.

Summary

Right now AsyncVal is part of the FSharp.Data.GraphQL library and will probably require some more polishing, as it was created to solve our specific problems and is not ready for general-purpose use - i.e. error management isn't exactly uniform at the moment.

However, this PoC already proves its usefulness and may be used as a foundation for something greater. Maybe in the future it will deserve its own package.

On GraphQL issues and how we're going to solve them

Facebook's GraphQL is one of the emerging (web) technologies, shedding new light on the future of web APIs. I'm not going to introduce it here - there are plenty of articles on that subject, starting from the official site. What I'm going to write about is one specific issue of GraphQL's design that we've met while working on its F# implementation. But let's start from the beginning.

Designing with capabilities

The most important difference between REST APIs and GraphQL endpoints is the approach to exposing data and available actions to the API consumers.

In RESTful APIs we describe them quite vaguely, as there is no standard approach here, and REST itself is more a set of guidelines than strict rules. Of course, some approaches to this have already been created (i.e. Swagger).

In general, we expose our API as a set of URLs (routes) which - when called with the right set of input arguments and HTTP methods - reply with the right data in the response.

The problem here is that this eventually leads to an epidemic growth of exposed routes, as we often need a few more (underfetching) or a few less (overfetching) fields than provided by the endpoints exposed so far, but we don't want to end up making additional calls to complete the missing data.

On the other hand, with GraphQL over- and under-fetching is almost never a problem. The reason is simple: instead of exposing endpoints aiming to reply to one particular query, we create a single endpoint and a schema, which describes all data - with all possible relationships between entities - that is potentially accessible from the corresponding service. Therefore you have all the capabilities of your web service in one place.

So, where's the catch?

While all of that sounds simple and fun, the story behind it is a little more grim.

Let's consider a web service with some ORM and a SQL-backed database - a quite common example nowadays. One of the well-known problems with ORMs is their leaky abstraction. The one I'm going to write about here is known as N+1 SELECTs.

For those of you who haven't heard of it: this is a common problem with almost all advanced ORMs, which happens when you haven't explicitly mentioned all the fields you're going to use when sending a query to the database. Once the query has been executed and you try to access a property which has not been fetched from the database, most "smart" ORMs will automagically send a subsequent query to retrieve the missing data. If you're iterating over a collection of those data objects, a new query will be sent each time a not-yet-fetched field is accessed.
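
For illustration, here is how the trap typically looks with an Entity Framework-style context (the model and context below are hypothetical):

using System;
using System.Collections.Generic;
using System.Data.Entity; // EF6-style API, used here only for illustration
using System.Linq;

public class ShopContext : DbContext
{
    public DbSet<User> Users { get; set; }
}

public class User
{
    public int Id { get; set; }
    public string Name { get; set; }
    // virtual navigation property => loaded lazily on first access
    public virtual ICollection<Order> Orders { get; set; }
}

public class Order
{
    public int Id { get; set; }
    public int UserId { get; set; }
}

public static class NPlusOneDemo
{
    public static void Run()
    {
        using (var db = new ShopContext())
        {
            var users = db.Users.ToList();                // 1 query: SELECT ... FROM Users
            foreach (var user in users)
            {
                // Orders was never fetched, so lazy loading fires one extra
                // SELECT per user here: N additional queries in total.
                Console.WriteLine($"{user.Name}: {user.Orders.Count} orders");
            }

            // When the shape of the result is known up front, the fix is to fetch eagerly:
            var eager = db.Users.Include(u => u.Orders).ToList();   // a single joined query
        }
    }
}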

How do we solve this problem in a RESTful API? Because we know exactly what is going to be returned from a route, we can simply tailor the logic to retrieve all necessary data in a minimal number of queries.

And how about GraphQL? Well... you're fucked. There is no easy way to "tailor" a SQL query to match a specific request, as the expected result set is highly dynamic, depending on the incoming GraphQL query string.

Of course, there are multiple approaches to solving this problem among the many different implementations in multiple languages. Let's take a look at some of them.

Dedicated GraphQL-to-SQL library

Existing examples I know about:

  • postgraph (node.js), which exposes a PostgreSQL database schema as GraphQL and is able to compile GraphQL queries into SQL.
  • GraphQL.NET (.NET), which translates GraphQL queries into LINQ.

Both of them share the same problems:

  • They specialize in translating one query language into another. This solves only a subset of problems. What if you need to merge a response from the database with a call to/from another web service?
  • They usually introduce tight coupling between your Data Access Layer (or even the database schema itself) and the model exposed on the endpoint.
  • They are separate libraries, usually hardly compatible with other, more general GraphQL implementations.

Throttling and batching queries

The second kind - introduced in the Ruby implementation - digs into the backend of the ORM itself. What it does is split DB query execution into small timeframes. Instead of immediate SQL query execution, we batch all requests within, let's say, a 10ms time frame, and execute them at once.

I won't cover that idea in detail, as it's more a feature of the underlying database provider than of the application server implementation itself. Besides, it feels like a bit of a hacky solution too ;)

Can we do more?

One of the major problems here is the interpretation of the incoming GraphQL query. While most implementations expose information such as the AST of the executed query and the schema itself, usually that data is not correlated in any way.

In FSharp.Data.GraphQL, we have created something called an execution plan - an intermediate representation of a GraphQL query, which includes things like inlining query fragments' fields and correlating them with the schema and type system.

To show this with an example, let's say we have the following type system described in our schema:

schema { query: Query }

type Query {  
    users: [User!]
}

type User {  
    id: ID!
    info: UserInfo!
    contacts: [UserInfo!]
}

type UserInfo {  
    firstName: String
    lastName: String
}

And we want to execute a query which looks like this:

query Example {  
    users {
        id
        ...fname
        contacts {
            ...fname
        }
    }
}

fragment fname on UserInfo {  
    firstName
}

What does an execution plan for that query look like?

ResolveCollection: users of [User!]  
    SelectFields: users of User!
        ResolveValue: id of ID!
        SelectFields: info of UserInfo!
            ResolveValue: firstName of String
        ResolveCollection: contacts of [UserInfo!]
            SelectFields: contacts of UserInfo!
                ResolveValue: firstName of String

A short description:

  • ResolveValue is a leaf of the execution plan tree. It marks a scalar or enum coercion.
  • SelectFields is used to mark the retrieval of a set of fields from object types.
  • ResolveCollection marks a node that is going to return a list of results.
  • ResolveAbstraction (not present in the example) is a map of object type ⇒ SelectFields set, used for abstract types (unions and interfaces).

As we can see, information about the query fragment has been erased (the fragment's fields have been inlined), and every field carries type information. What we have as a result is a fully declarative representation of our query. Where can we go from here?

  • The standard execution path is trivial - we simply traverse the tree and execute every field's resolve function along the way.
  • We can decide to use an interpreter to create another representation from it (i.e. a LINQ/SQL query).
  • Since the execution plan is built as a tree, we could "manually" add/remove nodes from it, or even split it in two and then interpret each part separately. Having set operations (such as unions and intersections) over them is especially promising.
  • We could cache the execution plans, reducing the number of operations necessary when executing each query. This can be a big advantage in the future - especially when combined with a publish/subscribe pattern.

While the presence of an intermediate representation doesn't solve all problems, I think it's a step in the right direction, as it allows us to compose our server logic in a more declarative fashion without headaches about performance issues.