
Akka.Persistence: most common misconceptions

Akka.Persistence is one of the plugins that introduce event-sourced persistence mechanics into Akka.NET. However, I've often seen people having problems with it, since neither actors nor event sourcing are part of mainstream .NET development. Many of these are general problems of event-sourced systems, so I've decided to write some more about them and present potential solutions from the perspective of an Akka.Persistence user.

This post describes the current state of things, which may change in the future, as the Akka.NET team wants to address some of these issues.

Control your own data

There are two ways in which Akka.NET event journals may store your data:

  1. Some of them may define totally custom serialization mechanics, e.g.:
    • MongoDB persistence uses a BSON serializer, which requires you to define bindings for all serialized message types.
    • The PostgreSQL plugin has a configuration switch that uses JSON.NET to store your data in a JSON/JSONB column (which is well understood by this database). In the future the rest of the SQL databases will get that support as well.
  2. At the moment, however, most of them use the default Akka serialization mechanics: the same ones that are used to pass messages between cluster nodes.

One of the first mistakes everyone makes is giving their data up into the hands of the default serializer. First of all, you're handing the ability to encode/decode your data over to a third-party dependency (sic!). And most importantly, the default serializer is built for remote message exchange; it may NOT be a good way to store immutable, everlasting data structures.

Even the Newtonsoft.Json serializer (which was initially chosen as the Akka.NET default serializer) is not a good option. If you were thinking "it's just JSON, what could go wrong", the story is a little more complicated. Its default configuration encodes a full type name as part of the payload in order to support polymorphic deserialization. This means that once you change your event's qualified type description (namespace, type name or assembly) - something you shouldn't do anyway - your old data is lost, as the serializer will try to deserialize it into a type that can no longer be found.
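To see the problem concretely, here is a minimal, standalone sketch using Json.NET directly (outside of Akka) - OrderPlaced is a hypothetical event type invented for this example:

using System;
using Newtonsoft.Json;

public class OrderPlaced
{
    public string OrderId { get; set; }
}

public static class TypeNameDemo
{
    public static void Main()
    {
        // TypeNameHandling.All embeds the qualified type name into the JSON
        var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
        var json = JsonConvert.SerializeObject(new OrderPlaced { OrderId = "42" }, settings);

        // prints something like: {"$type":"OrderPlaced, MyAssembly","OrderId":"42"}
        // rename the type, namespace or assembly and this payload can no
        // longer be deserialized
        Console.WriteLine(json);
    }
}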

So what to choose?

  1. Some people use schema-less formats, as they allow for a certain degree of version tolerance without much work. This includes JSON, just not with the settings used by the Akka.NET remote serializer.
  2. Others (including me) prefer schema-based serializers, e.g. Google Protocol Buffers or Microsoft Bond, as they keep the event schema/contract explicit and well defined.

Why have I linked Google.Protobuf and not protobuf-net? I think that keeping the schema in .proto files makes things more explicit and prevents accidental schema changes.

Configuring a custom serializer

Writing a custom serializer is rather easy. All you need is a custom class inheriting from the Serializer base class. But how do you bind it to a specific event type?

Akka.NET serialization bindings are based on types. This means that you need to specify an event type ⇒ serializer pair. The type provided here doesn't actually have to be a concrete class - Akka will investigate the inheritance chain and all implementations in order to resolve a binding. This means that usually we create an empty marker interface and implement it in all events that we want to serialize with the target serializer.

An example HOCON config may look like this:

akka.actor {  
  serializers.my-custom = "MyNamespace.MySerializer, MyAssembly"
  serialization-bindings {
    "MyNamespace.IDomainEvent, MyAssembly" = my-custom
  }
}
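A minimal sketch of the serializer itself might look like the code below - the JSON-based implementation is just a stand-in for whatever format you actually pick, and the identifier is an arbitrary number above the range used by Akka's built-in serializers:

using System;
using System.Text;
using Akka.Actor;
using Akka.Serialization;
using Newtonsoft.Json;

public class MySerializer : Serializer
{
    public MySerializer(ExtendedActorSystem system) : base(system) { }

    // unique id of this serializer; low numbers are used by Akka itself
    public override int Identifier => 101;

    // ask Akka to store the type manifest, so FromBinary receives the type
    public override bool IncludeManifest => true;

    public override byte[] ToBinary(object obj) =>
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(obj));

    public override object FromBinary(byte[] bytes, Type type) =>
        JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes), type);
}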

Have an event versioning strategy

As events are immutable by definition, we need to prepare an approach for how to behave when we want to change them as we develop our system. This is what we call event versioning.

If you want a wider perspective on that problem, you should probably read Versioning in an Event Sourced System by Greg Young.

Most people see and understand the rationale behind having a migration strategy for their database. However, far fewer of them have an idea of how to approach schema evolution in event-sourced systems.

Event adapters

One of the features of Akka.Persistence is event adapters. They are middleware components that you can plug into the pipeline between an event journal and a persistent actor, allowing you to modify deserialized events before they reach their destination.

To create a custom event adapter, you need a class that implements either IReadEventAdapter (to intercept events received from the journal), IWriteEventAdapter (to intercept events sent to the journal), or just IEventAdapter, which combines both.

Event adapters can be used to upgrade old event versions to newer ones or even to deconstruct a single event into multiple ones. When is that useful? Example: if you made a mistake in the past and defined an event that actually had more than a single responsibility (e.g. OrderedAndConfirmed instead of separate Ordered/Confirmed events), an adapter can split it on read, as in the sketch below.
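Here's a hedged sketch of such an adapter - the V1.OrderedAndConfirmed, Ordered and Confirmed types are hypothetical, invented for this example:

using Akka.Persistence.Journal;

namespace V1
{
    // the legacy "fat" event we want to split on read
    public class OrderedAndConfirmed
    {
        public string OrderId { get; set; }
    }
}

public class Ordered
{
    public string OrderId { get; }
    public Ordered(string orderId) { OrderId = orderId; }
}

public class Confirmed
{
    public string OrderId { get; }
    public Confirmed(string orderId) { OrderId = orderId; }
}

public class MyEventAdapter : IReadEventAdapter
{
    public IEventSequence FromJournal(object evt, string manifest)
    {
        if (evt is V1.OrderedAndConfirmed legacy)
        {
            // deconstruct the single legacy event into two fine-grained ones
            return EventSequence.Create(
                new Ordered(legacy.OrderId),
                new Confirmed(legacy.OrderId));
        }

        // everything else passes through unchanged
        return EventSequence.Single(evt);
    }
}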

But how do we bind an adapter to be used for some event in the scope of a journal?

# we will use SqlServer journal for this example
akka.persistence.journal.sql-server {  
  event-adapters {
    custom-adapter = "MyNamespace.MyEventAdapter, MyAssembly"
  }
  event-adapter-bindings {
    "MyNamespace.V1.IDomainEvent, MyAssembly" = custom-adapter
  }
}

Don't use snapshots without events

This is a popular problem for people starting their journey with Akka.Persistence. Quite often they see persistence snapshots and think: hey, I don't need event sourcing, let me just use one of these. However, the sole role of Akka.Persistence snapshots is to optimize streams of events.

If you're into snapshot-based persistence, Akka.Persistence is definitely not for you. Persistent actors are built to support different needs, just like journals and even the snapshots themselves - they have fields used to correlate their position in a stream of events. They also don't map well onto multi-table/multi-document database hierarchies.

Another thing: when you're persisting events with a PersistentActor, the next command won't be handled until the event has been persisted successfully and its callback has been called. This is not the case with SaveSnapshot, which doesn't stop the actor from picking up the next message before confirming that the snapshot has been saved.

This is not a problem in event-sourced systems - if the system dies before saving the snapshot, we replay from the last successfully stored snapshot and the events that came afterward - but without events you may process tens of messages without any confirmation and lose that data upon actor system shutdown.

Optimizing event journals

In this post I want to talk a little about some small optimization techniques we've applied to some of the journals used in Akka.NET - those working with SQL backends. Before you read further: this post is written with the assumption that you're familiar with the basics of event-sourcing architectures and their shape in Akka.NET. If you're not, you can read more about it here.

Old ways

The existing approach was quite simple and naive. Once a persistent actor had something to do with a persistent backend, it sent a request to an EventJournal actor. In the case of SQL, the journal created a new DbConnection each time in order to execute every single request. This was not supposed to be as bad as it sounds - ADO.NET handles connection pooling automatically.

However, reality revealed some problems:

First, the frequent creation and reopening of the connection/command/transaction chain turned out to be not as cheap as we would have wanted.

More importantly, once the pressure kicks in, things start to get nasty. Imagine tens of thousands of actors trying to persist their events at once. Eventually we'll saturate all of the connections from the pool - potentially even starving other parts of the application if our actor system is embedded into some bigger application (see: noisy neighbor problem). If this situation continues, after some more time timeouts will start to kick in, causing persist requests to throw and bubbling errors up to higher layers of the application. The definitions of slow, unresponsive and dead database requests get blurry at this point.

Is there anything we can do about it?

A new approach

Because nowadays, in order to present any new technique, we must give it some cheesy name, I've decided to describe this one as horizontal batching.

Initially we create a buffer for incoming requests and a counter for the free connections remaining to be used.

In the happy case, when we are able to persist incoming events as they come, this new batching journal implementation works very similarly to the old one, decrementing the connection counter. Once a connection gets released, we increment the counter back.

However, when requests start to pile up, we may run out of free connections (the counter goes down to 0). In this case we start buffering them. Now, when a connection is released and can be reused, we take a page of requests from the buffer and execute them within a single connection/command. There are a few things we can configure here (see the sketch after this list):

  • One is the maximum number of operations allowed to execute concurrently (the initial counter value). This way we may decide to give only a subset of our connection pool into the event journal's supervision.
  • Another one is the maximum size of a single batch. As the buffered requests can potentially grow big, we might decide to execute only a subset of them within a single connection.
  • The last important thing here is the maximum number of stacked messages. In cases when requests come in faster than they can be processed (even in batches), we could eventually run out of memory to even store them. With this setting, once a specified threshold is surpassed, an OnBufferOverflow method will be called. By default it rejects a request, but it can also be overridden to apply a custom backpressure strategy.
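To make the mechanism more tangible, here's a conceptual sketch of that loop. This is not the actual journal code - ExecuteBatch stands in for the real database call, and plain objects stand in for the persistence protocol messages:

using System;
using System.Collections.Generic;

public class HorizontalBatcher
{
    private const int MaxConcurrentOperations = 8;   // initial counter value
    private const int MaxBatchSize = 100;            // requests per connection
    private const int MaxBufferSize = 10000;         // stacked messages limit

    private readonly Queue<object> buffer = new Queue<object>();
    private int freeConnections = MaxConcurrentOperations;

    public void Enqueue(object request)
    {
        if (buffer.Count >= MaxBufferSize)
        {
            OnBufferOverflow(request);               // backpressure hook
            return;
        }
        buffer.Enqueue(request);
        TryExecute();
    }

    private void TryExecute()
    {
        if (freeConnections == 0 || buffer.Count == 0) return;
        freeConnections--;                           // claim a connection

        // take a page of buffered requests for a single connection/command
        var batch = new List<object>();
        while (batch.Count < MaxBatchSize && buffer.Count > 0)
            batch.Add(buffer.Dequeue());

        ExecuteBatch(batch);
        freeConnections++;                           // release the connection
        TryExecute();                                // more work may be waiting
    }

    // by default a request over the threshold is rejected; override this
    // method to apply a custom backpressure strategy instead
    protected virtual void OnBufferOverflow(object request) =>
        throw new InvalidOperationException("Journal request buffer overflow.");

    private void ExecuteBatch(List<object> batch)
    {
        // execute the whole batch within one connection/command here
    }
}

In the real journal this logic lives inside an actor, so it runs single-threaded and needs no locks.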

How fast does it work? So far we've managed to process over 12 000 command→event roundtrips per second (we're talking about the full event-sourced actor protocol here), with both the application and the SQL Server database running on default settings on a standard dev laptop (8 logical cores, 16GB RAM).

There are some more improvements still waiting their turn. While the new journal implementation is still experimental and needs some more use, it has been designed to work as a drop-in replacement for the existing one. That means it should soon come as the standard for the existing SQL implementations.

Don't Ask, Tell

Today I wanted to focus on one of the popular anti-patterns many Akka.NET newcomers tend to use: communicating between actors with the Ask method. I want to take a closer look into what Ask really does underneath and why it's not a good choice for communicating between actors... most of the time.

What is Ask and how does it work?

Ask is one of the extension methods for the IActorRef interface, allowing communication between actors. Unlike Tell - which simply sends a message with fire-and-forget semantics - Ask is a request/response-based communication pattern. It sends a message to another actor, expecting it to respond with another message, and returns a task that asynchronously completes when the response comes back.

To understand why Ask is usually a bad pattern for inter-actor communication, you first need to know how it works.

As you may remember, actor communication is message-based. Additionally, to buffer possible message bursts, in Akka each actor has its own mailbox. The sender of a message always appends it at the end of the recipient's mailbox, while the recipient processes messages synchronously, taking them off the mailbox one by one. What's also important here: it won't take any new message until the current one has been processed.

While most of the time this is a good approach (and can perform better than e.g. SEDA-based solutions), it's kind of problematic for the request/response pattern. Why? Since we need to correlate a request with its response, after sending the request we need to either:

  1. Scan the entire mailbox queue, looking for the response. The longer the mailbox (so the higher the congestion), the longer it takes to find a response. Also, in this case the mailbox must be able to pick a message from the middle of the queue, which would impose limitations on the mailbox implementation itself.
  2. Create a temporary listener able to receive the response. Moreover, since the communication may proceed over the wire, the listener must encapsulate the location transparency feature of actors.

Akka follows the second approach. This means that each Ask call actually allocates a special kind of lightweight listener-actor. Such a design makes Ask a lot heavier - and we're talking dozens of times heavier - than a simple Tell. The good part is that thanks to this choice, Tell can still be fast as hell.

Continuous blocking

As emphasized earlier, one of an actor's attributes is synchronous message processing. It's a great feature, since we no longer need to worry about things like race conditions, locks or blocking... except that request/response is an inherently blocking communication pattern. To be clear, I'm not talking about blocking a thread here, as we're able to define async receive methods for actors. What I'm talking about is blocking the actor itself, which cannot pick up any new messages until the response arrives and its processing finishes.

Imagine the following example:

class MyActor : ReceiveActor
{
    public MyActor(IActorRef other)
    {
        // async handlers require ReceiveAsync rather than Receive
        ReceiveAsync<Request>(async request => {
            // this is really BAD practice
            var reply = await other.Ask<Reply>(request, TimeSpan.FromSeconds(30));
            Sender.Tell(reply);
        });
    }
}

Under normal conditions, a single Akka.NET actor (v1.0.7) is able to receive up to 3.3 million messages/sec. Even given the bad characteristics of Ask, this would still be a fairly high number (hundreds of thousands msg/sec with local affinity). But what if the asked actor never receives the message (suppose it was lost, or the receiver died for some reason)? Well, the asking actor becomes idle until the timeout triggers. In the case above this means around 30 sec of unresponsiveness... for anyone sending a message to that actor, not only the asker alone. This could be changed by making actors reentrant... which would reintroduce race conditions in parallel access to the actor's state, which is what actors were built to solve in the first place :)

I know that at this point some of you may say "But I really need this response to proceed with my work". Well, as practice shows... most of the time you don't ;) Most programmers are used to RPC because it's an inherent part of the HTTP protocol. However, in message-based systems it's only one of many communication patterns. Use your senses, take a look at the world around you, and you will see that most information exchange in real life is not based on request/response. Believe me, once you change your mindset, you'll start to see the bigger picture.

NOTE: Another flaw of this particular piece of code is that in case of a failure on the asked actor's side, the failure is propagated back to the requester - this may ultimately cause a cascading failure chain. While crashing an actor usually is not a big deal, don't think of it as a resource-free operation.

If not Ask, then what?

I want to describe a few cases where we usually think in terms of RPC and how to turn them into more message-embracing solutions. Take into account that each case is different and there is no silver bullet here.

Push based events

Let's think about the following scenario:

We have a multiplayer game represented as an actor system. There is a single actor representing the state of the world and multiple client-actors - each one working locally on part of that state. Before a client proceeds with some action, it must ensure it has the actual state of the world. Therefore it asks the world-actor for the current state at the beginning of every operation.

While this example is set in a multiplayer game scenario, it shares many traits with standard business applications (world state ⇒ database). So how can we do that without using the request/response model?

Now, change your mindset a little - instead of thinking about asking for the current state, let the world-actor notify every interested client of a state change once it occurs. As a result we have a living actor system.
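A minimal sketch of that idea - Subscribe and StateChanged are hypothetical message types invented for this example:

using System.Collections.Generic;
using Akka.Actor;

public class Subscribe { }

public class StateChanged
{
    // details of the world state change go here
}

public class WorldActor : ReceiveActor
{
    private readonly HashSet<IActorRef> subscribers = new HashSet<IActorRef>();

    public WorldActor()
    {
        Receive<Subscribe>(_ => subscribers.Add(Sender));
        Receive<StateChanged>(changed =>
        {
            // apply the change to the world state, then push it to every
            // interested client - nobody ever has to ask for it
            foreach (var client in subscribers)
                client.Tell(changed);
        });
    }
}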

Need response? Use Forward + Tell back

Sometimes we may want to send the result of a computation back to the caller. But what if that computation is a request chain between multiple different actors?

Imagine that we have 4 actors: A, B, C and D. In our scenario A is the requester that needs to be replied to. It sends a message to B, which does some work, then asks C to do some more, which in turn asks D for the same. Once D finishes, the reply is passed down the call chain back to A. With Ask as the message pattern, we can model that scenario using the following image:

Ask pipeline

Now we've ended up with 3 temporary allocations (remember that Ask allocates), 6 message passes, and 2 blocked actors (B and C) until the whole request completes... even though they don't need to participate in it after doing their part.

How can we change that? Consider the following scenario:

Forward pipeline

As you can see, we replaced most of the Asks with Forward, but what does it actually do? Forward is a special version of Tell which propagates the original message sender. Therefore A will be recognized as the message sender as long as the actors pass the message using Forward. Thanks to that, D can send its reply directly to A, as the other actors are no longer necessary.

Moreover, because actors B and C don't need to wait for the reply, they are free to process other messages in the meantime.
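A minimal sketch of the forward pipeline - Request and Reply are hypothetical message types, and the two classes play the roles from the diagram:

using Akka.Actor;

public class Request { }
public class Reply { }

// plays the role of B or C in the diagram
public class ForwardingActor : ReceiveActor
{
    public ForwardingActor(IActorRef next)
    {
        Receive<Request>(request =>
        {
            // do this actor's part of the work here, then pass the message
            // on - Forward keeps the original sender (A) intact
            next.Forward(request);
        });
    }
}

// plays the role of D in the diagram
public class FinalActor : ReceiveActor
{
    public FinalActor()
    {
        // thanks to the forwards along the chain, Sender here is A, not C
        Receive<Request>(request => Sender.Tell(new Reply()));
    }
}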

Aggregator pattern

Another case you may think of is a scatter/gather scenario. We have some group of actors to be requested; then we want to gather all the replies and send the aggregated result back.

I've seen people trying to solve this case with a combination of multiple Ask requests + Task.WhenAll. However, because of the weight of Ask (and in more extreme cases, the weight of the Tasks themselves), this is not a wise choice performance-wise. So what should we do?

We can create a dedicated Aggregator actor, responsible for collecting all results from the requested actors and sending them back to the requester. There are many variants taking different cases into account; however, I think one of the most common ones is to:

  • Init an aggregator actor with the collection of actors to ask. It'll be responsible for gathering the responses for a particular request.
  • Propagate the request to all actors through the aggregator.
  • When a reply arrives, remove its sender from the list of actors remaining to answer.
  • Apply a receive timeout for safety (so we don't wait for responses infinitely).
  • Once all actors have responded or the timeout has occurred, send the aggregated replies back to the original sender and stop the aggregator actor.

This scenario can be represented just like in the code below and abstracted into a generic solution:

class Aggregator<T> : ReceiveActor  
{
    private IActorRef originalSender;
    private ISet<IActorRef> refs;

    public Aggregator(ISet<IActorRef> refs)
    {
        this.refs = refs;
        // this operation will finish after 30 sec of inactivity
        // (when no new message arrived)
        Context.SetReceiveTimeout(TimeSpan.FromSeconds(30));
        ReceiveAny(x =>
        {
            originalSender = Sender;
            foreach (var aref in refs) aref.Tell(x);
            Become(Aggregating);
        });
    }

    private void Aggregating()
    {
        var replies = new List<T>();
        // when timeout occurred, we reply with what we've got so far
        Receive<ReceiveTimeout>(_ => ReplyAndStop(replies));
        Receive<T>(x =>
        {
            if (refs.Remove(Sender)) replies.Add(x);
            if (refs.Count == 0) ReplyAndStop(replies);
        });
    }

    private void ReplyAndStop(List<T> replies)
    {
        originalSender.Tell(new AggregatedReply<T>(replies));
        Context.Stop(Self);
    }
}
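Example usage from inside some actor (so there is a sender to reply to); here `workers` is the set of actors to query, and AggregatedReply<T> is assumed to be a simple wrapper around the collected replies:

// spawn a fresh aggregator per request - it stops itself when done
var aggregator = Context.ActorOf(Props.Create(() => new Aggregator<Reply>(workers)));

// the aggregator propagates the request to all workers, gathers their
// replies and eventually sends an AggregatedReply<Reply> back to us
aggregator.Tell(request);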

When to use Ask?

After spending so much time writing about how bad the usage of Ask can be, the time has come to answer the question: when should I use Ask?

  • First of all, Ask is great for maintaining communication with external systems willing to communicate with Akka.NET. This is a natural fit, as quite often such systems are in fact RESTful web services (see the sketch after this list).
  • The other case I can think of is a specific version of the scatter/gather scenario, when the number of potentially participating actors varies depending on message parameters and cannot be cached. In that case it's hard to set the aggregator's stop condition upfront, so falling back to the listener-based option seems justified.
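For the first case, a hedged sketch of what that boundary might look like - a hypothetical HTTP handler bridging a request/response protocol with the actor system:

using System;
using System.Threading.Tasks;
using Akka.Actor;

public class ApiHandler
{
    private readonly IActorRef worker; // some actor inside the system

    public ApiHandler(IActorRef worker)
    {
        this.worker = worker;
    }

    // Ask is fine at the system boundary: the HTTP caller already
    // expects a request/response round trip anyway
    public async Task<string> Handle(string payload)
    {
        var reply = await worker.Ask<string>(payload, TimeSpan.FromSeconds(5));
        return reply;
    }
}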

Akka.NET underestimated features - Akka.IO

Today I want to talk about one of the Akka.NET features I think deserves a lot more attention - Akka.IO. What it gives you is the ability to connect your actors directly to the OS socket layer. This way you can handle any TCP/UDP-based protocol directly in your actor model. Just imagine the possibilities :)

But let's start with the basics. What would you say to a simple telnet listener? We'll create a simple TCP server which will listen for incoming connections and print their input on the console.

To do so, we'll need two types of actors:

  1. A listener, which will bind itself to the given endpoint and accept incoming connections.
  2. A connection handler, which will be delegated to serve a particular client once it connects to our server.

The nice thing here is that the whole TCP model uses the same ordinary actors you can create yourself. The whole process is pretty simple:

  • Create your TCP listener actor.
  • Bind it to some endpoint.
  • Handle the connection event by creating a connection handler actor and registering it for the incoming connection.
  • Inside the connection handler, handle all received data, print it, and stop once the connection gets closed.

The first part is pretty simple. To bind an actor to a specific endpoint, you can use the so-called TCP manager, which is part of the Akka.NET extension. It's just an actor, and you can get it by calling the Context.System.Tcp() method. Then just send it a new Tcp.Bind(Self, endpoint) message if you want to register the current actor as a listener for the defined endpoint. Everything is message-based.

The second part is making our listener aware of incoming connections. They come in the form of Tcp.Connected messages. Once you receive one, the Sender associated with the message is an actor reference serving as an abstraction over the provided connection. What's great here is that it honors all attributes of actors, including location transparency. This means we can handle incoming connections with actors materialized on different machines, out of the box!

In this case we'll simply create a new actor and register it for this connection by sending a new Tcp.Register(connectionHandler) message. To make this all clear and visible, you can see the whole actor definition below:

public class TelnetListener : ReceiveActor  
{
    public TelnetListener(EndPoint endpoint)
    {
        Context.System.Tcp().Tell(new Tcp.Bind(Self, endpoint));
        Receive<Tcp.Connected>(connected =>
        {
            var connection = Sender;
            var connectionHandler = Context.ActorOf(Props.Create(() => new TelnetHandler(connection)));
            connection.Tell(new Tcp.Register(connectionHandler));
        });
    }
}

Once you've got this ready, it's time for our connection handler actor. As we're going to provide it a reference to our connection - which, as you may remember, is also an actor - we also want to link its lifetime directly to the connection itself. In this context, having an actor working on a dead connection is unwanted. Instead we're going to watch the connection using Context.Watch(connection) and stop the current actor once it receives either a Terminated or a Tcp.ConnectionClosed message.

Then all that is left is to handle the incoming bytes from the socket, transform them into a string and display it on the console. They come in the form of Tcp.Received messages. The whole definition can be seen below:

public class TelnetHandler : ReceiveActor  
{
    public TelnetHandler(IActorRef connection)
    {
        Context.Watch(connection);
        Receive<Tcp.Received>(received =>
        {
            var text = Encoding.UTF8.GetString(received.Data.ToArray()).Trim();
            Console.Write(text);
        });
        Receive<Tcp.ConnectionClosed>(closed => Context.Stop(Self));
        Receive<Terminated>(terminated => Context.Stop(Self));
    }
}

What I think is worth mentioning here is the Data property of the Tcp.Received message. It's not a usual byte array; it's an instance of the ByteString type instead. One of the problems with byte arrays is that they may be heavily used in socket communication, causing a lot of allocations and deallocations and increasing the pressure on the .NET garbage collector. To avoid this we can use ByteStrings, which are fragments of a preallocated buffer. Once they are no longer necessary, they are sent back to the buffer to be reused later. This way we can avoid unnecessary garbage collection cycles.

To test the whole example, simply initialize the TelnetListener actor under any free port and connect to it using telnet <ip> <port> from your terminal. After that, you should see any text you write in the command line appear on the application console.
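A minimal host for the example might look like this (the port number is an arbitrary free one picked for the sketch):

using System;
using System.Net;
using Akka.Actor;

public static class Program
{
    public static void Main()
    {
        using (var system = ActorSystem.Create("telnet-demo"))
        {
            var endpoint = new IPEndPoint(IPAddress.Loopback, 9110);
            system.ActorOf(Props.Create(() => new TelnetListener(endpoint)));

            // keep the process alive; try `telnet 127.0.0.1 9110` now
            Console.ReadLine();
        }
    }
}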

How does Akka.NET persistence work?

In this post I'll explain how the event sourcing concepts have been used by the Akka.NET persistence plugin to create stateful actors that are persisted and work in a reliable way. My major goal is to give you an introduction to the basic Akka.Persistence primitives and to describe how state recovery works for persistent actors.

State persistence and recovery

The most notable novelty Akka.Persistence offers is a new actor type you can inherit from - the PersistentActor. The major difference compared to standard Akka actors is that its state can be persisted into a durable data store and later recovered safely in case of an actor restart. There is already a variety of persistent backends supported for that purpose.

After persisting its state, an actor may recover it at any time once it gets recreated.

Persisting actor's state

There are two ways of storing a state. The major (required) one is event sourcing - saving the list of all state changes as soon as they occur. The second one is state snapshotting - serializing the whole state of an actor into a single object and storing it. What is very important: snapshotting is only an optimization technique, not a reliable way of achieving persistence within in-memory systems. The reason is that while you may ensure an actor's state is stored in case of its failure, you're not able to ensure state persistence in case of external problems (infrastructure crashes, outages etc.).

The basic persist mechanism consists of several steps:

  1. After receiving a command, the actor tries to persist an event, with a callback to be invoked once persistence has been confirmed. In order to do so, the actor should invoke the Persist(event, callback) method. There is also another version called PersistAsync, which may take advantage of event batching - useful when frequent state updates are expected.
  2. The event is sent to a dedicated actor - the event journal - which stores it in the persistent backend.
  3. If the message has been persisted successfully, the journal sends a confirmation message back to the actor, which updates its state using the previously defined callback. All state updates that should be persisted must happen only inside the persist callback. If you try to alter persistent state outside of the callback, you're risking potential state corruption.
  4. In case the journal encounters a problem - usually due to problems with, or lack of response from, the backend provider - a failure message will be sent. In that case the actor will restart itself. You may consider using the Backoff Supervision pattern in order to apply an exponential backoff mechanism and reduce the recovery footprint in case of cascading failures.

In the case of snapshotting there is no need for callbacks - the whole mechanism is fully asynchronous. Use the SaveSnapshot method for this purpose.
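Putting those steps together, a hedged sketch of a persistent actor - Counter, Increment and CounterChanged are hypothetical types invented for this example:

using Akka.Persistence;

public class Increment
{
    public int Delta { get; }
    public Increment(int delta) { Delta = delta; }
}

public class CounterChanged
{
    public int Delta { get; }
    public CounterChanged(int delta) { Delta = delta; }
}

public class Counter : ReceivePersistentActor
{
    private int state;

    // identifies this actor's event stream in the journal
    public override string PersistenceId => "counter-1";

    public Counter()
    {
        Command<Increment>(cmd =>
        {
            var evt = new CounterChanged(cmd.Delta);
            // the callback runs only after the journal confirmed the write;
            // persisted state must be mutated here and nowhere else
            Persist(evt, e =>
            {
                state += e.Delta;
                // optional optimization: snapshot every 100 events;
                // fire-and-forget, there is no confirmation callback
                if (LastSequenceNr % 100 == 0) SaveSnapshot(state);
            });
        });

        // recovery handlers - explained in the State recovering section below
        Recover<SnapshotOffer>(offer => state = (int)offer.Snapshot);
        Recover<CounterChanged>(e => state += e.Delta);
    }
}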

You can see the whole process in the animation below:

Akka persistence writing

  • C - a message (command) to be handled by the persistent actor, which may result in producing zero or more events.
  • E - sequenced events used to trigger actor state changes.
  • S - the persistent actor's state, which may be snapshotted. Remember: snapshotting is optional here.

State recovering

Actor state recovery logic is encapsulated in a separate method (called OnRecover or ReceiveRecover). What is important here: this method should always produce deterministic behavior - during recovery, an actor should neither try to persist any events nor communicate with other actors.

Here's how it works:

  1. The actor asks the journal for the latest known snapshot of its state. The state is returned inside a SnapshotOffer message; if no snapshot has been found, no offer arrives at all. Warning: don't rely on snapshot versioning, as a recovered snapshot may have an earlier version than the one you sent before recovering - remember what I said about the asynchronous nature of this operation?
  2. The second step is to replay all events stored since the received snapshot's creation. If no snapshot was received, the whole event stream is replayed. Event messages are received in the same order as they were produced.
  3. Once the actor has fully recovered, it invokes the OnReplaySuccess method. If any exception occurred, OnReplayFailure will be invoked instead. Override them if you need a custom action once the persistent actor is ready.
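Mapped onto the Counter sketch from the previous section, the recovery half of its constructor corresponds to these steps like this (RecoveryCompleted is the built-in marker message delivered once replay is done):

public Counter()
{
    // step 1: the latest snapshot, if any, arrives first
    Recover<SnapshotOffer>(offer => state = (int)offer.Snapshot);

    // step 2: every event stored after that snapshot, replayed in the
    // original order - keep this deterministic: no Persist calls, no
    // messages to other actors
    Recover<CounterChanged>(e => state += e.Delta);

    // signaled once the whole stream has been replayed
    Recover<RecoveryCompleted>(_ =>
    {
        // a good place to restore the correct behavior (e.g. Become)
        // for the recovered state
    });
}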

What I think is worth noticing is that the persist/recovery mechanism is not aware of the current actor behavior. Therefore, if you're using state machines in your actor (i.e. via Become), you're in charge of returning to the correct behavior after recovery.

One of the tradeoffs of persistent actors is that they lean heavily on the CP (Consistency) side of the CAP theorem ;) What does that actually mean?

In terms of command-side evaluation, each persistent actor is guaranteed to maintain a consistent memory model - use the Defer method when you need to execute some logic after the actor has updated its state from all stored events. Events are stored in a linear fashion - each one is uniquely identified by a PersistenceId/SequenceNr pair. PersistenceId defines the stream inside the event log related to a particular persistent actor, while SequenceNr is a monotonically increasing number used for ordering inside an event stream.

The negative side of that approach is that a persistent actor's event stream cannot be globally replicated, as this may result in an inconsistent stream of updates. For the same reason your persistent actors should be singletons - there should never be more than one instance of an actor using the same PersistenceId at a time.

Persistent Views

Sometimes you may decide that the in-memory model stored by a persistent actor is not exactly what you want, and you'd like to present the data associated with it in a different schema. In this case Akka.Persistence gives you the ability to create so-called Persistent Views. The major difference is that while a persistent actor may write events directly to an event journal, views work in read-only mode.

You may associate a persistent view with a particular actor by joining them using the PersistenceId property. Additionally, since one persistent actor may have many views associated with it (each showing the same stream of events related to a particular entity from a different perspective), each view has its own unique persistent identifier called ViewId. It's used for view snapshotting - because views can have different states (built from the same stream of events), their snapshots must be unique for each particular view. Also, because views don't alter the event stream, you can have multiple instances of each view actor.

How do persistent views update their state when a new event is emitted? Akka.Persistence provides two methods by default:

  1. You may explicitly send Update messages through a view's ref to provoke it to read events from the journal since the last update.
  2. All views use scheduled interval signals (ScheduledUpdate messages) to automatically update their internal state when the signal is received. You may configure these by setting the akka.persistence.view.auto-update (on by default) and akka.persistence.view.auto-update-interval (5s by default) keys in your actor system HOCON configuration.

You've probably already noticed that views are eventually consistent with the state produced by their persistent actor. That means they won't reflect changes right away, as an update delay may occur.

NOTE: This part of the Akka.Persistence plugin is the most likely to change in the future, once reactive streams are fully implemented in Akka.NET.

AtLeastOnceDelivery

The last part of Akka.Persistence is the AtLeastOnceDeliveryActor. What's so special about it? By default, Akka's messaging uses at-most-once delivery semantics. That means each message is sent with best effort... but it also means that it's possible for a message to never reach its recipient.

In short, at-least-once delivery means that message sending will be retried until confirmed or until the provided retry limit has been reached. In order to send a message with at-least-once-delivery semantics, you must post it with the Deliver method instead of Tell. This way messages are resent if not confirmed within the specified timeout. You can adjust that timeout by setting akka.persistence.at-least-once-delivery.redeliver-interval in your HOCON configuration - by default it's 5 seconds. It's up to the sending actor to decide when a message counts as confirmed: it marks a delivery as confirmed using the ConfirmDelivery method, typically upon receiving an acknowledgement from the recipient.
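A hedged sketch of such an actor - Msg and Confirm are hypothetical message types, and for brevity this example skips persisting the delivery state (a real actor should persist "sent" and "confirmed" events, so pending deliveries survive restarts):

using Akka.Actor;
using Akka.Persistence;

public class Msg
{
    public long DeliveryId { get; }
    public string Payload { get; }
    public Msg(long deliveryId, string payload)
    {
        DeliveryId = deliveryId;
        Payload = payload;
    }
}

public class Confirm
{
    public long DeliveryId { get; }
    public Confirm(long deliveryId) { DeliveryId = deliveryId; }
}

public class ReliableSender : AtLeastOnceDeliveryReceiveActor
{
    public override string PersistenceId => "reliable-sender-1";

    public ReliableSender(ActorPath destination)
    {
        Command<string>(payload =>
        {
            // Deliver keeps resending until ConfirmDelivery is called;
            // the deliveryId it hands us correlates the confirmation
            Deliver(destination, deliveryId => new Msg(deliveryId, payload));
        });

        // the recipient replies with Confirm(deliveryId) to stop redelivery
        Command<Confirm>(confirm => ConfirmDelivery(confirm.DeliveryId));
    }
}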

What if our actor is trying to deliver a message, but no acknowledgement is being received? In that case we have 2 thresholds:

  • At some point your actor will see that a message has not been confirmed after a particular number of attempts. When that occurs, it will receive an UnconfirmedWarning message with the list of unconfirmed messages, and you may decide how to handle that situation. You may specify how many retries are allowed before the warning is received by setting the akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts key in the HOCON config (it's 5 by default).
  • Since each unconfirmed message is stored inside the current actor, in order to prevent burning through all of the program's memory, after reaching the specified unconfirmed messages limit the actor will actively refuse to Deliver any more messages by throwing MaxUnconfirmedMessagesExceededException. You can set this threshold using the akka.persistence.at-least-once-delivery.max-unconfirmed-messages key (it's 100 000 by default). The limit is set per actor using at-least-once-delivery semantics.

All unconfirmed messages are stored in an internal list. If you want, you may snapshot them using the GetDeliverySnapshot method and set them back on actor recovery using SetDeliverySnapshot. Remember that these methods are meant to be used for snapshotting only and won't guarantee true persistence without event sourcing, for the reasons described above in this post.

Be cautious when deciding where to use these semantics in your code. First, they're not good for performance. Second, they require the recipient to behave idempotently, since the same message may be received more than once.

Journals and Snapshot Stores

The last part of Akka.Persistence are the journals and snapshot stores that make all the persistence possible. Both of them are essentially actors working as proxies around the underlying persistence mechanism. You can decide which concrete implementation of the underlying backend to use, or you can create your own - if you decide to do so, see my previous post for some more details.

In order to set a default persistence backend, you must set akka.persistence.journal.plugin or akka.persistence.snapshot-store.plugin to point at specific HOCON configuration paths. Each plugin should describe its configuration path; however, it may also require some custom configuration to be applied first.

It's also possible for different actor types to use different persistence providers. This is done with the persistent actor's JournalPluginId and SnapshotPluginId properties, which should return HOCON configuration paths to a particular journal/snapshot store.
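A short sketch - this hypothetical actor would use the SqlServer journal and the local snapshot store regardless of the system-wide defaults:

using Akka.Persistence;

public class OrderActor : ReceivePersistentActor
{
    public override string PersistenceId => "order-1";

    // point this actor at specific plugins instead of the defaults
    public override string JournalPluginId => "akka.persistence.journal.sql-server";
    public override string SnapshotPluginId => "akka.persistence.snapshot-store.local";
}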