
Akka.Persistence: most common misconceptions

Akka.Persistence is one of the plugins that introduce eventsourced persistence mechanics into Akka.NET. However, I've often seen people having problems with it, since neither actors nor eventsourcing are part of mainstream .NET development. Many of these problems are general problems of eventsourced systems, so I've decided to write some more about them and present potential solutions from the perspective of an Akka.Persistence user.

This post describes the current state of things. It may change in the future, as the Akka.NET team wants to address some of these issues.

Control your own data

There are two ways in which Akka.NET event journals may store your data:

  1. Some of them define totally custom serialization mechanics, e.g.:
    • MongoDB persistence uses the BSON serializer, which requires you to define bindings for all serialized message types.
    • The PostgreSQL plugin has a configuration switch that uses JSON.NET to store your data in a JSON/JSONB column (which is well understood by this database). In the future, the rest of the SQL databases will get that support as well.
  2. At the moment, however, most of them use the default Akka serialization mechanics: the same ones used to pass messages between cluster nodes.

One of the first mistakes everyone makes is leaving your data in the hands of the default serializer. First of all, you're handing over the ability to encode/decode your data to a 3rd party dependency (sic!). And most importantly, the default serializer is built for remote message exchange; it may NOT be a good way to store immutable, everlasting data structures.

Even the Newtonsoft.Json serializer (which was initially chosen as the Akka.NET default serializer) is not a good option. If you were thinking "it's just JSON, what could go wrong", the story is a little more complicated. Its default configuration encodes a full type name as part of the payload in order to support polymorphic deserialization. This means that once you change your event's qualified type description (namespace, type name or assembly) - something you shouldn't do anyway - your old data is lost, as the serializer will try to deserialize it to a type that can no longer be found.
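
To see why, here is a standalone Newtonsoft.Json sketch of what embedding type names in the payload looks like (this only illustrates the mechanism, not the exact settings Akka.NET uses; OrderPlaced is a made-up event):

using Newtonsoft.Json;

// With type name handling enabled, the fully qualified CLR type name becomes
// part of the stored payload, coupling old data to current type names.
var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
var json = JsonConvert.SerializeObject(new OrderPlaced("order-1"), settings);
// json now looks roughly like:
// {"$type":"MyApp.Events.OrderPlaced, MyApp","OrderId":"order-1"}
// Rename the class or move it to another assembly and deserialization of the
// old payload will fail, because that type can no longer be resolved.

public sealed class OrderPlaced
{
    public string OrderId { get; }
    public OrderPlaced(string orderId) => OrderId = orderId;
}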

So what to choose?

  1. Some people use schema-less formats, as they allow for a certain degree of version tolerance without much work. This includes JSON, just not with the settings used by the Akka.NET remote serializer.
  2. Others (including me) are more enthusiastic about schema-based serializers, e.g. Google Protocol Buffers or Microsoft Bond, as they keep the event schema/contract explicit and well defined.

Why did I link Google.Protobuf and not protobuf-net? I think that keeping the schema in .proto files makes things more explicit and prevents accidental schema changes.

Configuring a custom serializer

Writing a custom serializer is rather easy. All you need to do is write a custom class inheriting from the Serializer base class. But how do you bind it to a specific event type?

Akka.NET serialization bindings are based on types. This means that you need to specify an event type → serializer pair. The type provided here doesn't have to be a concrete class - Akka will investigate the inheritance chain and all implementations in order to resolve a binding. This means that usually we create an empty marker interface and implement it in all events that we want to serialize with the target serializer.

An example HOCON config may look like this:

akka.actor {  
  serializers.my-custom = "MyNamespace.MySerializer, MyAssembly"
  serialization-bindings {
    "MyNamespace.IDomainEvent, MyAssembly" = my-custom
  }
}
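
A matching serializer skeleton could look more or less like this (a minimal sketch: the JSON-based encoding and the identifier value are just placeholders for whatever format you actually choose; IDomainEvent is the marker interface from the binding above):

using System;
using System.Text;
using Akka.Actor;
using Akka.Serialization;
using Newtonsoft.Json;

// Marker interface implemented by every event we want this serializer to handle.
public interface IDomainEvent { }

public class MySerializer : Serializer
{
    public MySerializer(ExtendedActorSystem system) : base(system) { }

    // Unique serializer id; pick a value that doesn't clash with Akka's built-in serializers.
    public override int Identifier => 1001;

    // We want the event's Type passed back to FromBinary during deserialization.
    public override bool IncludeManifest => true;

    public override byte[] ToBinary(object obj) =>
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(obj));

    public override object FromBinary(byte[] bytes, Type type) =>
        JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes), type);
}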

Have an event versioning strategy

As events are immutable by definition, we need to prepare an approach for how to behave when we want to change them as we develop our system. This is what we call event versioning.

If you want to have a wider perspective on that problem, you probably should read Versioning in an Eventsourced System by Greg Young.

Most people see and understand the rationale behind having a migration strategy for their database. However, far fewer of them have an idea of how to approach schema evolution in eventsourced systems.

Event adapters

One of the features of Akka.Persistence is event adapters. They are middleware components that you can plug into the pipeline between an event journal and a persistent actor, allowing you to modify deserialized events before they reach their destination.

To create a custom event adapter, you need a class that implements either IReadEventAdapter (to intercept events received from the journal), IWriteEventAdapter (to intercept events sent to the journal), or just IEventAdapter, which combines both.

Event adapters can be used to upgrade old event versions to newer ones or even to deconstruct a single event into multiple ones. When is that useful? Example: if you made a mistake in the past and defined an event that actually had more than a single responsibility (i.e. OrderedAndConfirmed instead of separate Ordered/Confirmed kinds of events).
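
A read-side adapter doing exactly that kind of split could look roughly like this (a sketch only; the OrderedAndConfirmed, Ordered and Confirmed types are hypothetical):

using Akka.Persistence.Journal;

// Hypothetical event types, defined here only to make the example compile.
public sealed class OrderedAndConfirmed { public string OrderId { get; set; } }
public sealed class Ordered { public string OrderId { get; set; } }
public sealed class Confirmed { public string OrderId { get; set; } }

public class MyEventAdapter : IReadEventAdapter
{
    public IEventSequence FromJournal(object evt, string manifest)
    {
        // Deconstruct the old "fat" event into two fine-grained ones.
        if (evt is OrderedAndConfirmed oc)
            return EventSequence.Create(
                new Ordered { OrderId = oc.OrderId },
                new Confirmed { OrderId = oc.OrderId });

        // Pass everything else through unchanged.
        return EventSequence.Single(evt);
    }
}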

But how do you bind an adapter to be used for certain events in the scope of a journal?

# we will use SqlServer journal for this example
akka.persistence.journal.sql-server {  
  event-adapters {
    custom-adapter = "MyNamespace.MyEventAdapter, MyAssembly"
  }
  event-adapter-bindings {
    "MyNamespace.V1.IDomainEvent, MyAssembly" = custom-adapter
  }
}

Don't use snapshots without events

This is a popular problem for people starting their journey with Akka.Persistence. Quite often they see persistence snapshots and think: hey, I don't need eventsourcing, let me just use one of these. However, the sole role of Akka.Persistence snapshots is to optimize streams of events.

If you're only after snapshot-based persistence, Akka.Persistence is definitely not for you. Persistent actors are built to support different needs, just like journals and even snapshots themselves - they carry fields used to correlate their position in a stream of events. They also don't map well onto multi-table/multi-document database hierarchies.

Another thing: when you're persisting events with a PersistentActor, the next command won't be handled until the event has been persisted successfully and its callback has been called. This is not the case with SaveSnapshot, which doesn't block the actor from picking up the next message before confirming that the snapshot has been saved.

This is not a problem in eventsourced systems - if the system dies before saving a snapshot, we replay from the last successfully stored snapshot and the events that came afterward - but without events you may process tens of messages without any confirmation and lose that data upon actor system shutdown.
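
To make the difference concrete, here is a minimal sketch of both calls inside one actor (CounterActor, Incremented and the snapshot-every-100-events policy are made up for illustration):

using Akka.Persistence;

public sealed class Incremented { }

public class CounterActor : ReceivePersistentActor
{
    private int _count;

    public override string PersistenceId => "counter-1";

    public CounterActor()
    {
        Recover<Incremented>(_ => _count++);
        Recover<SnapshotOffer>(offer => _count = (int)offer.Snapshot);

        Command<string>(_ =>
        {
            // Persist blocks handling of the next command until the event is
            // stored and this callback has run - confirmed state is never lost.
            Persist(new Incremented(), e =>
            {
                _count++;
                if (_count % 100 == 0)
                    SaveSnapshot(_count); // fire-and-forget: the actor keeps processing
            });
        });

        Command<SaveSnapshotSuccess>(_ =>
        {
            // optionally delete old events/snapshots here
        });
    }
}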

Optimizing event journals

In this post I want to talk a little about some of the optimization techniques we've applied to some of the journals used in Akka.NET - those working with SQL backends. Before you read further: this post is written with the assumption that you're familiar with the basics of eventsourcing architectures and their shape in Akka.NET. If you're not, you can read more about it here.

Old ways

The existing approach was quite simple and naive. Once a persistent actor had something to do with the persistent backend, it was sending a request to an event journal actor. In the case of SQL, the journal was creating a new DbConnection each time in order to execute every single request. This was not supposed to be as bad as it sounds - ADO.NET handles connection pooling automatically.

However, in practice this approach caused some problems:

First, the frequent creation and reopening of the connection/command/transaction chain turned out to be not as cheap as we wanted it to be.

More importantly, once the pressure kicks in, things start to get nasty. Imagine tens of thousands of actors trying to persist their events at once. Eventually we'll saturate all of the connections from the pool - potentially even starving other parts of the application if our actor system is embedded into some bigger application (see: the noisy neighbor problem). If this situation continues, timeouts will eventually kick in, causing persist requests to throw and bubbling errors up to higher layers of the application. The definitions of slow, unresponsive and dead database requests get blurry at this point.

Is there anything we can do about it?

A new approach

Because nowadays, in order to present any new technique, we must give it some cheesy name, I've decided to describe this one as horizontal batching.

Initially we create a buffer for incoming requests and a counter for the free connections remaining to be used.

In the happy case, when we are able to persist incoming events as they come, this new batching journal implementation works very similarly to the old one, decrementing the connection counter as requests are executed. Once a connection gets released, we increment the counter back.

However, when requests start to pile up, we may run out of free connections (the counter goes down to 0). In this case we start buffering them. Now, when a connection is released and can be reused, we take a page of requests from the buffer and execute them within a single connection/command. There are a few things we can configure here (a simplified sketch of this loop follows the list below):

  • One is the maximum number of operations allowed to execute concurrently (the initial counter value). This way we may decide to put only a subset of our connection pool under the supervision of the event journal.
  • Another one is the maximum size of a single batch. As the buffered requests can potentially grow big, we might decide to use only a subset of them to be executed within a single connection.
  • The last important thing here is the maximum number of stacked messages. In case requests come in faster than they can be processed (even in batches), we could eventually run out of memory just to store them. With this setting, once a specified threshold is surpassed, an OnBufferOverflow method will be called. By default it will reject a request. However, it can also be overridden to apply a custom backpressure strategy.
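
Stripped of all journal-specific details, the core loop could be sketched like this (a conceptual illustration only, not the actual journal code; all names are made up):

using System.Collections.Generic;

// Sketch of the "horizontal batching" idea: a counter of free connections
// plus a request buffer, with configurable batch and buffer limits.
public class BatchingJournalSketch
{
    private readonly Queue<object> _buffer = new Queue<object>();
    private readonly int _maxBatchSize;
    private readonly int _maxBufferSize;
    private int _remainingConnections;

    public BatchingJournalSketch(int maxConcurrentOperations, int maxBatchSize, int maxBufferSize)
    {
        _remainingConnections = maxConcurrentOperations;
        _maxBatchSize = maxBatchSize;
        _maxBufferSize = maxBufferSize;
    }

    public void Enqueue(object request)
    {
        if (_buffer.Count >= _maxBufferSize)
        {
            OnBufferOverflow(request);   // backpressure hook: reject by default
            return;
        }
        _buffer.Enqueue(request);
        TryDispatch();
    }

    private void TryDispatch()
    {
        if (_remainingConnections == 0 || _buffer.Count == 0) return;

        _remainingConnections--;         // claim one connection
        var batch = new List<object>();
        while (batch.Count < _maxBatchSize && _buffer.Count > 0)
            batch.Add(_buffer.Dequeue());

        ExecuteBatch(batch);             // one connection/command for the whole page
    }

    private void ExecuteBatch(List<object> batch)
    {
        // ... write all events in a single round trip, then release the connection:
        _remainingConnections++;
        TryDispatch();
    }

    protected virtual void OnBufferOverflow(object request)
    {
        // default: fail fast; override to apply a custom backpressure strategy
    }
}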

How fast does it work? So far we've managed to process over 12 000 command→event roundtrips per second (we're talking about the full eventsourced actor protocol here) using both the application and a SQL Server database with default settings on a standard dev laptop (8 logical cores, 16GB RAM).

There are some more improvements still waiting their turn. While the new journal implementation is still experimental and needs some more use, it has been designed to work as a drop-in replacement for the existing one. That means it should soon become the standard for the existing SQL implementations.