Create your own Akka.NET persistence plugin
The subject of this post is one of the Akka.NET plugins, Akka.Persistence. Its main task is to give your actors the ability to store their internal state and recover it from persistent storage after an actor is created or the underlying actor system crashes. While the rules driving the whole persistence mechanism deserve a separate article, for the purposes of this post I’ll cover only event journals and snapshot stores.
Introduction
If you’re already familiar with the idea of event sourcing, you probably know everything that’s needed by now. If not, here is a simplified model: stateful actors don’t store their state directly, but in the form of events. An event is a single immutable fact describing a change made to the actor’s state at some point in time. Later, when any form of recovery is needed, the actor’s state is recreated by sending it the series of persisted events, from which it can rebuild itself. The storage responsible for persisting those events is called an Event Journal. In Akka.Persistence it works only in memory by default, which does not make it truly persistent.
Since replaying a long stream of events may take a long time before the actor is fully recovered, its internal state may be serialized at any point in time and stored separately. In this case, by default, the actor will first try to get the latest intermediate state (a snapshot) and then recover only from those events which have been stored from that point in time onward. The store used to persist those snapshots is called a Snapshot Store, and in Akka.NET it uses the local file system as a persistence layer by default.
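The recovery order described above (latest snapshot first, then only the newer events) can be sketched without any Akka.NET types. Everything below is a hypothetical stand-in for illustration: CounterEvent, CounterSnapshot and RecoverySketch are not part of Akka.Persistence, and the "state" is just an integer counter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for illustration only; these are not Akka.NET types.
public sealed class CounterEvent
{
    public CounterEvent(long sequenceNr, int delta)
    {
        SequenceNr = sequenceNr;
        Delta = delta;
    }

    public long SequenceNr { get; }
    public int Delta { get; }
}

public sealed class CounterSnapshot
{
    public CounterSnapshot(long sequenceNr, int value)
    {
        SequenceNr = sequenceNr;
        Value = value;
    }

    public long SequenceNr { get; }
    public int Value { get; }
}

public static class RecoverySketch
{
    // Rebuilds the counter: start from the snapshot (if any), then apply
    // only the events stored after the snapshot was taken.
    public static int Recover(CounterSnapshot snapshot, IEnumerable<CounterEvent> journal)
    {
        var value = snapshot == null ? 0 : snapshot.Value;
        var lastApplied = snapshot == null ? 0L : snapshot.SequenceNr;

        var pending = journal
            .Where(e => e.SequenceNr > lastApplied)
            .OrderBy(e => e.SequenceNr);

        foreach (var evt in pending)
        {
            value += evt.Delta;
        }
        return value;
    }
}
```

With events 1..3 carrying deltas 1, 2 and 3, a full replay and a replay starting from a snapshot taken at sequence number 2 (value 3) both end in the same state, which is the point of the snapshot optimization.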
The Akka.Persistence plugin is designed in a modular fashion and allows you to create your own plugins and integrate them with any underlying persistence provider.
Journal
Just like most of the other entities in Akka, journals and snapshot stores are actors. To create your own journal, you can derive from either AsyncWriteJournal or SyncWriteJournal. The difference between the two is that the first returns a Task continuation from all of its operations (reads, writes and deletions; since events are immutable, journals don’t perform updates), while the second does so only for reads, expecting writes and deletions to complete synchronously. For this example I’m going to use the sync journal implementation.
Let’s start with the following outline of our class:
public class MyJournal : SyncWriteJournal
{
    private static readonly object Ack = new object();
    private readonly ConcurrentDictionary<string, ISet<IPersistentRepresentation>> _eventStreams =
        new ConcurrentDictionary<string, ISet<IPersistentRepresentation>>();

    Task ReplayMessagesAsync(string pid, long from, long to, long max, Action<IPersistentRepresentation> replay) ...
    Task<long> ReadHighestSequenceNrAsync(string pid, long from) ...
    void WriteMessages(IEnumerable<IPersistentRepresentation> messages) ...
    void DeleteMessagesTo(string pid, long to, bool isPermanent) ...
}
In this example all persistent actors’ events are stored in memory. The journal can tell which event stream belongs to which actor thanks to the PersistenceId, a unique identifier for each PersistentActor that identifies its state across many actor incarnations. Each event inside the stream has its own sequence number, which is unique within the event stream and increases monotonically for every separate persistent actor.
This is how the WriteMessages method may look:
protected override void WriteMessages(IEnumerable<IPersistentRepresentation> messages)
{
    foreach (var message in messages)
    {
        // the factory overload avoids allocating a new set when the stream already exists
        var stream = _eventStreams.GetOrAdd(message.PersistenceId,
            _ => new SortedSet<IPersistentRepresentation>(PersistentComparer.Instance));
        stream.Add(message);
    }
}
This is a straightforward operation of finding the event stream for each message’s persistence id (or creating a new one if it was not found) and appending the message to it. PersistentComparer is simply an IComparer used to order messages by their sequence number. IPersistentRepresentation wraps the user-defined event and contains all the data necessary for the persistence plugin to perform its magic.
To be able to replay stored messages, we must now define the ReplayMessagesAsync logic:
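The post does not show PersistentComparer itself, so here is a minimal sketch of what such a comparer could look like. To keep it self-contained it works on a hypothetical ISequenced interface instead of IPersistentRepresentation; the names ISequenced, SequencedEvent and SequenceNrComparer are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IPersistentRepresentation, reduced to the one
// property the comparer needs. This is not the Akka.NET interface.
public interface ISequenced
{
    long SequenceNr { get; }
}

public sealed class SequencedEvent : ISequenced
{
    public SequencedEvent(long sequenceNr) { SequenceNr = sequenceNr; }
    public long SequenceNr { get; }
}

// Orders items by ascending sequence number; null sorts first.
public sealed class SequenceNrComparer : IComparer<ISequenced>
{
    public static readonly SequenceNrComparer Instance = new SequenceNrComparer();

    private SequenceNrComparer() { }

    public int Compare(ISequenced x, ISequenced y)
    {
        if (ReferenceEquals(x, y)) return 0;
        if (x == null) return -1;
        if (y == null) return 1;
        return x.SequenceNr.CompareTo(y.SequenceNr);
    }
}
```

One design consequence worth knowing: a SortedSet treats items that compare as equal as duplicates, so adding a second item with an already stored sequence number returns false and leaves the set unchanged.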
public override Task ReplayMessagesAsync(string pid, long from, long to,
    long max, Action<IPersistentRepresentation> replay)
{
    var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
    var promise = new TaskCompletionSource<object>();
    dispatcher.Schedule(() =>
    {
        try
        {
            Replay(pid, from, to, max, replay);
            promise.SetResult(Ack);
        }
        catch (Exception e)
        {
            promise.SetException(e);
        }
    });
    return promise.Task;
}
This method is already designed to work asynchronously, which is the most likely approach when integrating third-party providers nowadays, so here we delegate the work through our system’s dispatcher. Because the API of the underlying dispatcher may work in various ways (Akka is not bound to the TPL), its Schedule method doesn’t return any task. Here I’ve introduced a little trick using TaskCompletionSource to get similar behavior.
The Replay method itself is pretty straightforward:
private void Replay(string pid, long from, long to,
    long max, Action<IPersistentRepresentation> replay)
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        var s = eventStream as IEnumerable<IPersistentRepresentation>;
        if (from > 0) s = s.Where(p => p.SequenceNr >= from);
        // sequence numbers are longs, so compare against long.MaxValue
        if (to < long.MaxValue) s = s.Where(p => p.SequenceNr <= to);
        // the cast is safe: max is smaller than the (int) element count here
        if (max < eventStream.Count) s = s.Take((int)max);
        foreach (var persistent in s)
        {
            replay(persistent);
        }
    }
}
It’s worth noting that the provided from/to sequence number range is expected to be inclusive on both sides.
Next in line is ReadHighestSequenceNrAsync, which is used to retrieve the highest sequence number of the event stream associated with a particular persistent actor. While in your solution you may find it appropriate to use some async API or the dispatcher task delegation described above, I’ve used a simple synchronous solution here:
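To make the inclusive bounds concrete, here is a small self-contained sketch of the same filtering applied to plain sequence numbers. ReplayRangeSketch is a hypothetical helper written for this illustration, not part of Akka.Persistence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReplayRangeSketch
{
    // Selects sequence numbers in [from, to] (both ends inclusive),
    // returning at most max of them in ascending order.
    public static List<long> Select(IEnumerable<long> sequenceNrs, long from, long to, long max)
    {
        var query = sequenceNrs
            .Where(nr => nr >= from && nr <= to)
            .OrderBy(nr => nr);

        // Take expects an int, so clamp very large limits such as long.MaxValue.
        var limit = (int)Math.Min(max, int.MaxValue);
        return query.Take(limit).ToList();
    }
}
```

For a stream holding sequence numbers 1 to 5, a request with from = 2 and to = 4 selects 2, 3 and 4; adding max = 2 cuts that down to 2 and 3.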
public override Task<long> ReadHighestSequenceNrAsync(string pid, long from)
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        var last = eventStream.LastOrDefault();
        var seqNr = last == null ? 0L : last.SequenceNr;
        return Task.FromResult(seqNr);
    }
    return Task.FromResult(0L);
}
The last remaining method is message deletion. While this is not a usual way of working, e.g. in the event sourcing model (it introduces data loss), it’s still a viable option.
Akka.Persistence recognizes two types of deletion:
- Logical, when messages are marked as deleted (IPersistentRepresentation.IsDeleted flag) but still reside in the event journal.
- Physical, when messages are physically removed from the journal and cannot be restored.
protected override void DeleteMessagesTo(string pid, long to, bool isPermanent)
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        foreach (var message in eventStream.ToArray())
        {
            if (message.SequenceNr <= to)
            {
                eventStream.Remove(message);
                if (!isPermanent)
                {
                    // copy message with IsDeleted flag set
                    var copy = message.Update(message.SequenceNr,
                        message.PersistenceId, isDeleted: true, sender: message.Sender);
                    eventStream.Add(copy);
                }
            }
            // messages are already stored by their sequence number order
            else break;
        }
    }
}
Test specification fulfillment
When your custom persistence plugin is ready, it’s good to check whether its behavior is valid from the Akka.Persistence perspective. To do this, you can use the existing test suite, Akka.Persistence.TestKit, available as a NuGet package. It’s a set of tests used to check whether your custom persistence plugin fulfills the expected journal (and snapshot store) behavior. It uses the Akka.NET TestKit along with xUnit in order to run.
The only thing needed is your own spec class inheriting from JournalSpec, with the configuration set to use your plugin as the default one. The same configuration may be used to inject your persistence provider into a real working actor system.
public class MyJournalSpec : JournalSpec
{
    private static readonly Config SpecConfig =
        ConfigurationFactory.ParseString(@"
            akka.persistence {
                publish-plugin-commands = on
                journal {
                    plugin = ""akka.persistence.journal.my""
                    my {
                        class = ""MyModule.MyJournal, MyModule""
                        plugin-dispatcher = ""akka.actor.default-dispatcher""
                    }
                }
            }");

    public MyJournalSpec() : base(SpecConfig)
    {
        Initialize();
    }
}
Snapshot store
A custom snapshot store implementation seems to be an easier task. Here I’m again going to use a store based on in-memory snapshots, which still is not a valid persistence layer, but works well for training purposes.
Actor state snapshots are described by two entities: SnapshotMetadata and the Snapshot itself. The first covers the data necessary to correctly place a snapshot instance in the lifetime of a specific persistent actor, while the second is a wrapper over the actor’s state itself.
For the purposes of this in-memory plugin, I’ll wrap both of them in a single object defined like this:
public class SnapshotBucket
{
    public readonly SnapshotMetadata Metadata;
    public readonly Snapshot Snapshot;

    public SnapshotBucket(SnapshotMetadata metadata, Snapshot snapshot)
    {
        Metadata = metadata;
        Snapshot = snapshot;
    }
}
This way we may describe the snapshot store in the following manner:
public class MySnapshotStore : SnapshotStore
{
    private static readonly object Ack = new object();
    private readonly LinkedList<SnapshotMetadata> _saving = new LinkedList<SnapshotMetadata>();
    private readonly List<SnapshotBucket> _buckets = new List<SnapshotBucket>();

    Task<SelectedSnapshot> LoadAsync(string pid, SnapshotSelectionCriteria criteria) ...
    Task SaveAsync(SnapshotMetadata metadata, object snapshot) ...
    void Saved(SnapshotMetadata metadata) ...
    void Delete(SnapshotMetadata metadata) ...
    void Delete(string pid, SnapshotSelectionCriteria criteria) ...
}
To begin with, let’s abstract the dispatcher behavior described before. Remember, this is only a trick: nothing stops you from simply using Task.Run or delegating work to child actors. In a real-life scenario you’d probably use the built-in async behavior of the underlying provider.
private Task<T> Dispatch<T>(Func<T> fn)
{
    var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
    var promise = new TaskCompletionSource<T>();
    dispatcher.Schedule(() =>
    {
        try
        {
            var result = fn();
            promise.SetResult(result);
        }
        catch (Exception e)
        {
            promise.SetException(e);
        }
    });
    return promise.Task;
}
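For comparison, the Task.Run alternative mentioned earlier could look roughly like this. It is a sketch placed in a hypothetical static class so it can run on its own; inside a snapshot store it would be a private method just like Dispatch.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskRunDispatchSketch
{
    // Runs fn on the thread pool. An exception thrown by fn faults the
    // returned task, which mirrors the SetException branch of the
    // dispatcher-based version.
    public static Task<T> Dispatch<T>(Func<T> fn)
    {
        return Task.Run(fn);
    }
}
```

Either way the delegate runs outside the actor’s own message processing, so any actor fields it touches are accessed from another thread and need to be safe for that.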
Next, let’s look at the saving operation. In the case of snapshot stores it’s divided into two separate methods:
- SaveAsync: an asynchronous operation that saves the snapshot and returns a Task continuation.
- Saved: called only after the snapshot has been successfully saved.
They may be combined to, e.g., keep a log of snapshots that are currently being saved but are not yet saved, as I’ve presented below.
protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot)
{
    return Dispatch(() =>
    {
        var bucket = new SnapshotBucket(metadata, new Snapshot(snapshot));
        _saving.AddLast(metadata);
        _buckets.Add(bucket);
        return Ack;
    });
}

protected override void Saved(SnapshotMetadata metadata)
{
    _saving.Remove(metadata);
}
When it comes to loading snapshots, the snapshot store is expected to return only one snapshot: the latest one from the provided range. Just like any other range in the persistence plugin, this one is expected to have inclusive boundaries.
protected override Task<SelectedSnapshot> LoadAsync(string pid, SnapshotSelectionCriteria criteria)
{
    return Dispatch(() =>
    {
        var limited = _buckets.Where(bucket => bucket.Metadata.PersistenceId == pid
            && bucket.Metadata.SequenceNr <= criteria.MaxSequenceNr
            && bucket.Metadata.Timestamp <= criteria.MaxTimeStamp);
        // LastOrDefault avoids an exception when no snapshot matches the criteria
        var latest = limited.OrderBy(bucket => bucket.Metadata.Timestamp).LastOrDefault();
        return latest == null
            ? null
            : new SelectedSnapshot(latest.Metadata, latest.Snapshot.Data);
    });
}
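The selection rule (inclusive upper bounds, newest match wins, nothing found is a valid outcome) can be exercised in isolation. The sketch below uses a hypothetical SnapshotInfo type in place of SnapshotMetadata, and it breaks ties by sequence number first and timestamp second, which is an assumption of this example rather than something the post prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for SnapshotMetadata; not the Akka.NET type.
public sealed class SnapshotInfo
{
    public SnapshotInfo(string persistenceId, long sequenceNr, DateTime timestamp)
    {
        PersistenceId = persistenceId;
        SequenceNr = sequenceNr;
        Timestamp = timestamp;
    }

    public string PersistenceId { get; }
    public long SequenceNr { get; }
    public DateTime Timestamp { get; }
}

public static class SnapshotSelectionSketch
{
    // Returns the newest snapshot for pid within the inclusive upper bounds,
    // or null when nothing matches.
    public static SnapshotInfo SelectLatest(IEnumerable<SnapshotInfo> all, string pid,
        long maxSequenceNr, DateTime maxTimestamp)
    {
        return all
            .Where(s => s.PersistenceId == pid
                && s.SequenceNr <= maxSequenceNr
                && s.Timestamp <= maxTimestamp)
            .OrderBy(s => s.SequenceNr)
            .ThenBy(s => s.Timestamp)
            .LastOrDefault();
    }
}
```

A caller has to be prepared for the null result, for example when an actor is started for the first time and has no snapshots yet.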
The last behavior is snapshot deletion, represented in this case by two methods: one used for a single snapshot and one able to operate on a specific range.
protected override void Delete(SnapshotMetadata metadata)
{
    var found = _buckets.SingleOrDefault(bucket => bucket.Metadata.Equals(metadata));
    if (found != null)
    {
        _buckets.Remove(found);
    }
}

protected override void Delete(string pid, SnapshotSelectionCriteria criteria)
{
    _buckets.RemoveAll(bucket =>
        bucket.Metadata.PersistenceId == pid
        && bucket.Metadata.SequenceNr <= criteria.MaxSequenceNr
        && bucket.Metadata.Timestamp <= criteria.MaxTimeStamp);
}
Test specification fulfillment
Just like in the case of the journal spec, you may verify your snapshot store’s behavior simply by creating a custom spec. The configuration string below is minimal; nothing stops you from extending it with your own settings.
public class MySnapshotStoreSpec : SnapshotStoreSpec
{
    private static readonly Config SpecConfig =
        ConfigurationFactory.ParseString(@"
            akka.persistence.snapshot-store {
                plugin = ""akka.persistence.snapshot-store.my""
                my {
                    class = ""TestPersistencePlugin.MySnapshotStore, TestPersistencePlugin""
                    plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher""
                }
            }
        ");

    public MySnapshotStoreSpec() : base(SpecConfig)
    {
        Initialize();
    }
}
Another difference from the event journal spec is that for most test cases the snapshot store spec requires an existing collection of snapshot metadata to work. They are initialized in the constructor in the code above.
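For a real actor system, the journal and snapshot store settings from the two specs can live together in one HOCON fragment. The sketch below only restates the values used in this post; the plugin ids, type names and assembly names (MyModule, TestPersistencePlugin) are the placeholders from the specs and would have to match your own assemblies.

```hocon
akka.persistence {
  journal {
    # id of the journal plugin to use by default
    plugin = "akka.persistence.journal.my"
    my {
      class = "MyModule.MyJournal, MyModule"
      plugin-dispatcher = "akka.actor.default-dispatcher"
    }
  }
  snapshot-store {
    # id of the snapshot store plugin to use by default
    plugin = "akka.persistence.snapshot-store.my"
    my {
      class = "TestPersistencePlugin.MySnapshotStore, TestPersistencePlugin"
      plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
    }
  }
}
```

Note that inside a C# verbatim string the quotes have to be doubled, as in the spec classes, while a standalone configuration file uses plain quotes.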
Final notes
- The examples I’ve presented work on predefined constructs: they derive directly from SnapshotStore and SyncWriteJournal. However, this is not necessary, as you may promote any actor type to work as a snapshot store or event journal as long as it exposes the necessary behavior and is able to respond to the required set of persistence messages.
- While the persistence store specs may be used to verify your implementations, remember that they don’t check for things such as data races.
- Remember that Akka.Persistence is still in prerelease. Therefore some breaking changes may occur in upcoming versions.