Akka.NET application logging in your browser

In this post I want to share a fairly simple trick: binding the Akka.NET logging bus directly to the browser console output. While this is mostly an exercise, you may find it useful when developing a system that combines Akka.NET with WebSockets.

Let's start by creating our custom logger actor. Here's the starter code.

public class BrowserLogger : ReceiveActor  
{
    public BrowserLogger()
    {
        Receive<Debug>(e => Log(LogLevel.Debug, e.ToString()));
        Receive<Info>(e => Log(LogLevel.Info, e.ToString()));
        Receive<Warning>(e => Log(LogLevel.Warning, e.ToString()));
        Receive<Error>(e => Log(LogLevel.Error, e.ToString()));
        Receive<InitializeLogger>(_ => Sender.Tell(new LoggerInitialized()));
    }
}
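
Note that the Log method referenced above isn't defined yet; we'll implement it when we wire in SignalR below. Until then, a trivial placeholder like the one here (my stand-in, not part of the final sample) keeps the snippet compiling:

private void Log(LogLevel level, string message)
{
    // temporary stand-in; replaced later by the hub-forwarding version
    Console.WriteLine("[{0}] {1}", level, message);
}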

BTW: congratulations! Having done that, you've actually created a fully functional logger library :)

Unfortunately, the devil is in the details. Now we need to integrate our logging actor with SignalR hubs. For this, let's create a dedicated Hub class, which we can use to forward log messages back to the client.

public class LoggerHub : Hub  
{
    public override Task OnConnected()
    {
        Bootstrap.Register(this);
        return base.OnConnected();
    }

    public void Log(LogLevel level, string message)
    {
        Clients.All.log((int)level, message);
    }

    public override Task OnDisconnected(bool stopCalled)
    {
        Bootstrap.Unregister(this);
        return base.OnDisconnected(stopCalled);
    }
}

Don't worry if you don't know what the Bootstrap class is all about; we'll cover it later.

Now, once our SignalR hub is ready, it's time to enrich our logger actor to use it for forwarding messages back to the client. The idea is simply to keep a list of hubs to be informed, and to handle additional register/unregister messages there.

// message used to register a hub
public class RegisterHub  
{
    public readonly LoggerHub Hub;
    public RegisterHub(LoggerHub hub)
    {
        Hub = hub;
    }
}

// message used to unregister a hub
public sealed class UnregisterHub  
{
    public readonly LoggerHub Hub;
    public UnregisterHub(LoggerHub hub)
    {
        Hub = hub;
    }
}

public class BrowserLogger : ReceiveActor  
{
    private readonly ISet<LoggerHub> _hubs = new HashSet<LoggerHub>();

    public BrowserLogger()
    {
        Receive<RegisterHub>(register => _hubs.Add(register.Hub));
        Receive<UnregisterHub>(unregister => _hubs.Remove(unregister.Hub));

        ...
    }

    private void Log(LogLevel level, string message)
    {
        foreach (var hub in _hubs) hub.Log(level, message);
    }
}

Note: logging may be a risky operation, since invoking a hub method may throw an exception and restart the actor. IMO the best solution in that case is to catch the exception, wrap it in a custom exception type ignored by the current logger (for example with the following statement: Receive<Error>(e => !(e.Cause is BrowserLoggingException), e => Log(LogLevel.Error, e.ToString()));) and propagate it through the standard Akka logging mechanism to the other loggers in the actor system (you don't want to lose these exceptions, right?).
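
Here's a minimal sketch of that idea (BrowserLoggingException is our own type, not part of Akka.NET):

public sealed class BrowserLoggingException : Exception
{
    public BrowserLoggingException(string message, Exception inner)
        : base(message, inner) { }
}

// inside BrowserLogger:
private void Log(LogLevel level, string message)
{
    foreach (var hub in _hubs)
    {
        try
        {
            hub.Log(level, message);
        }
        catch (Exception e)
        {
            // Wrapped and pushed back through the standard logging pipeline:
            // the predicate on our Error handler keeps BrowserLogger itself
            // from processing it, while other attached loggers still record it.
            Context.GetLogger().Error(
                new BrowserLoggingException("SignalR hub call failed", e),
                "Failed to forward a log message to the browser");
        }
    }
}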

In one of the previous code snippets you might have noticed that we're using the Bootstrap.Register and Bootstrap.Unregister methods, which haven't been defined yet. For this we create a static class that serves as an access point to the actor system and its functionality.

public static class Bootstrap  
{
    private static ActorSystem _system;
    private static ActorSelection _logger;

    public static ActorSystem System { get { return _system; } }

    // call this method in your Startup.cs or Global.asax when application starts
    public static void Init()
    {
        _system = ActorSystem.Create("logging-system", 
            ConfigurationFactory.ParseString(@"akka.loggers = [""MyNamespace.BrowserLogger, MyAssembly""]"));
        _logger = _system.ActorSelection("/system/log*");
    }

    public static void Register(LoggerHub hub)
    {
        _logger.Tell(new RegisterHub(hub));
    }

    public static void Unregister(LoggerHub hub)
    {
        _logger.Tell(new UnregisterHub(hub));
    }
}

One thing worth noticing is the way we refer to the logger actor in the code above. Loggers are created internally by the actor system at startup and live in their own right, so it's hard to get a reference to them. The /system/log* path may be confusing at first; here's why:

  • The logger actor is part of the actor system internals (Akka.NET not only exposes actors to external users, but also uses them to manage itself). For this reason, instead of the usual /user prefix, the path starts with the /system guardian used for internal actors.
  • Logger names are generated and usually take the form log-<inc>-<logger_type_name>, where inc is an auto-incremented number, so it's hard to predict the exact name of our logger. To get around this we use the wildcard operator to access all actors whose names start with log (basically all loggers). That's not a problem, since only our BrowserLogger responds to the Register/Unregister messages.

The final step is to register our hub on the browser side. Assuming you already have SignalR configured in your application, the code is pretty straightforward:

 var logLevel = {
     DEBUG: 0,
     INFO: 1,
     WARNING: 2,
     ERROR: 3
 };
 var log = $.connection.loggerHub;
 log.client.log = function (level, message) {
     switch (level) {
         case logLevel.DEBUG: console.debug(message); break;
         case logLevel.INFO: console.info(message); break;
         case logLevel.WARNING: console.warn(message); break;
         case logLevel.ERROR: console.error(message); break;
     }
 };

 $.connection.hub.start().done(function() {
     console.log('connection initialized');
 });

Final notes

The example above works only within the scope of a local actor system. In cluster-wide scenarios you should ensure that hub registration never leaves the local system, since SignalR hubs are not simple POCOs and cannot be freely serialized/deserialized.

Additionally, remember that the logging bus, just like any other Akka event stream, works only in local scope. To publish messages in a cluster-wide environment you'll need to provide your own event propagation across node boundaries (it's not that hard) or use the Akka.Cluster.Tools plugin (still a work in progress at the time of writing).

Create your own Akka.NET persistence plugin

The subject of this post is one of the Akka.NET plugins, Akka.Persistence. Its major task is to give your actors the ability to store their internal state and recover it from persistent storage after an actor is created or the underlying actor system crashes. While the rules driving the whole persistence mechanism deserve a separate article, for the purposes of this post I'll cover only event journals and snapshot stores.

Introduction

If you're already familiar with the idea of event sourcing, you probably know everything that's needed by now. If not, here is a simplified model: stateful actors don't store their state directly, but in the form of events. An event is a single immutable fact describing a change made at some point in time to the actor's state. Later, when any form of recovery is needed, the actor's state is recreated by replaying the series of persisted events, from which it can rebuild itself. The storage responsible for persisting those events is called the Event Journal. In Akka.Persistence it works only in memory by default, which does not make it truly persistent.

Since the stream of events to be replayed on an actor may take a long time to recover fully, its internal state may be serialized at any point in time and stored separately. In that case, by default, the actor will first try to fetch the latest intermediate state (a snapshot) and then recover only from the events stored from that point in time onward. The store used to persist those snapshots is called a Snapshot Store, and in Akka.NET it uses the local file system as a persistence layer by default.
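
To make the recovery flow concrete, here's a minimal user-side sketch of a persistent actor (the CounterActor name and int-typed events are mine, purely for illustration):

public class CounterActor : PersistentActor
{
    private int _state;

    public override string PersistenceId { get { return "counter-1"; } }

    protected override bool ReceiveRecover(object message)
    {
        if (message is int) _state += (int)message;          // replayed event
        else if (message is SnapshotOffer)
            _state = (int)((SnapshotOffer)message).Snapshot; // stored snapshot
        else return false;
        return true;
    }

    protected override bool ReceiveCommand(object message)
    {
        if (message is int)
        {
            // persist the event first, then apply it to the internal state
            Persist((int)message, e => _state += e);
            return true;
        }
        return false;
    }
}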

The Akka.Persistence plugin is designed in a modular fashion, allowing you to create your own plugins and integrate them with any underlying persistence provider.

Journal

Just like most other entities in Akka, journals and snapshot stores are actors. To create your own journal, you can derive from either AsyncWriteJournal or SyncWriteJournal. The difference between the two is that the first returns a Task continuation from all of its operations (reads, writes and deletions – as said earlier, journals don't perform updates), while the second does so only for reads, expecting writes and deletions to complete synchronously. For this example I'm going to use the sync journal implementation.

Let's start with the following skeleton of our class:

public class MyJournal : SyncWriteJournal  
{
    private static readonly object Ack = new object();
    private readonly ConcurrentDictionary<string, ISet<IPersistentRepresentation>> _eventStreams =
              new ConcurrentDictionary<string, ISet<IPersistentRepresentation>>();

    Task ReplayMessagesAsync(string pid, long from, long to, long max, Action<IPersistentRepresentation> replay) ...
    Task<long> ReadHighestSequenceNrAsync(string pid, long from) ...
    void WriteMessages(IEnumerable<IPersistentRepresentation> messages) ...
    void DeleteMessagesTo(string pid, long to, bool isPermanent) ...
}

For this example, all persistent actors' events are stored in memory. The journal recognizes which event stream refers to which actor thanks to the PersistenceId – a unique identifier for each PersistentActor that identifies its state across many actor incarnations. Each event inside the stream has its own sequence number – a number unique within the scope of the event stream, increasing monotonically for every separate persistent actor.

This is what the WriteMessages method may look like:

protected override void WriteMessages(IEnumerable<IPersistentRepresentation> messages)  
{
    foreach (var message in messages)
    {
        var list = _eventStreams.GetOrAdd(message.PersistenceId,
            new SortedSet<IPersistentRepresentation>(PersistentComparer.Instance));
        list.Add(message);
    }
}

This is a straightforward operation: find the event stream for each message's persistence id (or create a new one if it was not found) and append the message to it. PersistentComparer is simply an IComparer<IPersistentRepresentation> used to order messages by their sequence number. IPersistentRepresentation wraps a user-defined event and contains all the data the persistence plugin needs to perform its magic.
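
Since the comparer itself isn't shown anywhere in this post, here's a minimal sketch of what PersistentComparer may look like:

public sealed class PersistentComparer : IComparer<IPersistentRepresentation>
{
    public static readonly PersistentComparer Instance = new PersistentComparer();

    // order persistent representations by their monotonically increasing sequence number
    public int Compare(IPersistentRepresentation x, IPersistentRepresentation y)
    {
        return x.SequenceNr.CompareTo(y.SequenceNr);
    }
}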

To be able to replay stored messages, we must now define the ReplayMessagesAsync logic:

public override Task ReplayMessagesAsync(string pid, long from, long to,  
      long max, Action<IPersistentRepresentation> replay)
{
    var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
    var promise = new TaskCompletionSource<object>();
    dispatcher.Schedule(() =>
    {
        try
        {
            Replay(pid, from, to, max, replay);
            promise.SetResult(Ack);
        }
        catch (Exception e)
        {
            promise.SetException(e);
        }
    });

    return promise.Task;
}

Because this method is already meant to work asynchronously – the most likely scenario for third-party provider integrations nowadays – we delegate the work to our system's dispatcher. Since the API of the underlying dispatcher may work in various ways (Akka is not bound to TPL), its Schedule method doesn't return a task. Instead I've introduced a little trick using TaskCompletionSource to create similar behavior.

The Replay method itself is pretty straightforward:

private void Replay(string pid, long from, long to,  
      long max, Action<IPersistentRepresentation> replay)
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        var s = eventStream as IEnumerable<IPersistentRepresentation>;

        if (from > 0) s = s.Where(p => p.SequenceNr >= from);
        if (to < int.MaxValue) s = s.Where(p => p.SequenceNr <= to);
        if (max < eventStream.Count) s = s.Take((int)max);

        foreach (var persistent in s)
        {
            replay(persistent);
        }
    }
}

It's worth noticing that the provided from/to sequence number range is expected to be inclusive on both sides.

Next in line is ReadHighestSequenceNrAsync, used to retrieve the highest sequence number of the event stream associated with a particular persistent actor. While in your solution you may find it appropriate to use an async API or the dispatcher task delegation described above, I've used a simple synchronous approach here:

public override Task<long> ReadHighestSequenceNrAsync(string pid, long from)  
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        var last = eventStream.LastOrDefault();
        var seqNr = last == null ? 0L : last.SequenceNr;
        return Task.FromResult(seqNr);
    }

    return Task.FromResult(0L);
}

The last method remaining is message deletion. While deletion is not a usual way of working e.g. in the event sourcing model (it introduces data loss), it's still a viable option.

Akka.Persistence recognizes two types of deletion:

  • Logical, when messages are marked as deleted (the IPersistentRepresentation.IsDeleted flag) but still reside in the event journal.
  • Physical, when messages are physically removed from the journal and cannot be restored.

The DeleteMessagesTo implementation below covers both cases:
protected override void DeleteMessagesTo(string pid, long to, bool isPermanent)  
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        foreach (var message in eventStream.ToArray())
        {
            if (message.SequenceNr <= to)
            {
                eventStream.Remove(message);
                if(!isPermanent)
                {
                    // copy message with IsDeleted flag set
                    var copy = message.Update(message.SequenceNr,
                    message.PersistenceId, isDeleted: true, sender: message.Sender);
                    eventStream.Add(copy);
                }
            }
            // messages are already stored by their sequence number order
            else break;
        }
    }
}

Test specification fulfillment

When your custom persistence plugin is ready, it's good to check whether its behavior is valid from the Akka.Persistence perspective. For this you may use the existing test suite, Akka.Persistence.TestKit, available as a NuGet package: a set of tests verifying that a custom persistence plugin fulfills the expected journal (and snapshot store) behavior. It uses the Akka.NET TestKit along with xUnit to run.

The only thing needed is your own spec class inheriting from JournalSpec, with configuration that sets your plugin as the default one. The same configuration may be used to inject your persistence provider into a real working actor system.

public class MyJournalSpec : JournalSpec  
{
    private static readonly Config SpecConfig =
    ConfigurationFactory.ParseString(@"
       akka.persistence {
           publish-plugin-commands = on
           journal {
               plugin = ""akka.persistence.journal.my""
               my {
                   class = ""MyModule.MyJournal, MyModule""
                   plugin-dispatcher = ""akka.actor.default-dispatcher""
               }
           }
       }");

    public MyJournalSpec() : base(SpecConfig) 
    {
        Initialize();
    }
}

Snapshot store

A custom snapshot store implementation is an easier task. Here I'm again going to keep snapshots in memory, which still is not a real persistence layer, but works well for training purposes.

Actor state snapshots are identified by two entities: SnapshotMetadata and the Snapshot itself. The first covers the data necessary to correctly place a snapshot instance in the lifetime of a specific persistent actor, while the second is a wrapper around the actor's state itself.

For the purposes of this in-memory plugin, I'll wrap both of them in a single object defined like this:

public class SnapshotBucket  
{
    public readonly SnapshotMetadata Metadata;
    public readonly Snapshot Snapshot;

    public SnapshotBucket(SnapshotMetadata metadata, Snapshot snapshot)
    {
        Metadata = metadata;
        Snapshot = snapshot;
    }
}

This way we may describe the snapshot store in the following manner:

public class MySnapshotStore : SnapshotStore
{
    private static readonly object Ack = new object();
    private readonly LinkedList<SnapshotMetadata> _saving = new LinkedList<SnapshotMetadata>();
    private readonly List<SnapshotBucket> _buckets = new List<SnapshotBucket>();

    Task<SelectedSnapshot> LoadAsync(string pid, SnapshotSelectionCriteria criteria) ...
    Task SaveAsync(SnapshotMetadata metadata, object snapshot) ...
    void Saved(SnapshotMetadata metadata) ...
    void Delete(SnapshotMetadata metadata) ...
    void Delete(string pid, SnapshotSelectionCriteria criteria) ...
}

To begin with, let's abstract the dispatcher behavior described before. Remember, this is only a trick – nothing stops you from simply using Task.Run or delegating work to child actors. In a real-life scenario you'd probably use the built-in async behavior of the underlying provider.

private Task<T> Dispatch<T>(Func<T> fn)  
{
    var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
    var promise = new TaskCompletionSource<T>();
    dispatcher.Schedule(() =>
    {
        try
        {
            var result = fn();
            promise.SetResult(result);
        }
        catch (Exception e)
        {
            promise.SetException(e);
        }
    });
    return promise.Task;
}

Next, let's look at the save operation. In the case of snapshot stores it's divided into two separate methods:

  • SaveAsync – the asynchronous operation of saving the snapshot, returning a Task continuation.
  • Saved – called only after the snapshot has been successfully saved.

They may be combined to e.g. keep a log of snapshots that are being saved but are not yet confirmed, as presented below.

protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot)  
{
    return Dispatch(() =>
    {
        var bucket = new SnapshotBucket(metadata, new Snapshot(snapshot));
        _saving.AddLast(metadata);
        _buckets.Add(bucket);

        return Ack;
    });
}

protected override void Saved(SnapshotMetadata metadata)  
{
    _saving.Remove(metadata);
}

When it comes to loading snapshots, the snapshot store is expected to return only one snapshot: the latest one from the provided range. Just like any other range in the persistence plugin, this one has inclusive boundaries.

protected override Task<SelectedSnapshot> LoadAsync(string pid, SnapshotSelectionCriteria criteria)  
{
    return Dispatch(() =>
    {
        var limited = _buckets.Where(bucket => bucket.Metadata.PersistenceId == pid
            && bucket.Metadata.SequenceNr <= criteria.MaxSequenceNr
            && bucket.Metadata.Timestamp <= criteria.MaxTimeStamp);

        // LastOrDefault instead of Last: return null when nothing matches the criteria
        var latest = limited.OrderBy(bucket => bucket.Metadata.Timestamp).LastOrDefault();

        return latest == null ? null : new SelectedSnapshot(latest.Metadata, latest.Snapshot.Data);
    });
}

The last behavior is snapshot deletion, represented in this case by two methods: one used for a single snapshot and one operating on a specific range.

protected override void Delete(SnapshotMetadata metadata)  
{
    var found = _buckets.SingleOrDefault(bucket => bucket.Metadata.Equals(metadata));
    if (found != null)
    {
        _buckets.Remove(found);
    }
}

protected override void Delete(string pid, SnapshotSelectionCriteria criteria)  
{
    _buckets.RemoveAll(bucket =>
       bucket.Metadata.PersistenceId == pid
        && bucket.Metadata.SequenceNr <= criteria.MaxSequenceNr
        && bucket.Metadata.Timestamp <= criteria.MaxTimeStamp);
}

Test specification fulfillment

Just like with the journal spec, you may verify your store's behavior simply by creating a custom spec. A minimal configuration string is shown below; nothing stops you from extending it with your custom settings.

public class MySnapshotStoreSpec : SnapshotStoreSpec  
{
    private static readonly Config SpecConfig =
      ConfigurationFactory.ParseString(@"
        akka.persistence.snapshot-store {
            plugin = ""akka.persistence.snapshot-store.my""
            my {
                class = ""TestPersistencePlugin.MySnapshotStore, TestPersistencePlugin""
                plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher""
            }
        }
    ");

    public MySnapshotStoreSpec() : base(SpecConfig)
    {
        Initialize();
    }
}

Another difference from the event journal spec is that for most test cases the snapshot store spec requires an existing collection of snapshot metadata to work with. It is set up by the Initialize() call in the constructor above.

Final notes

  1. The examples I've presented build on predefined constructs: they derive directly from SnapshotStore and SyncWriteJournal. This isn't strictly necessary, as you may promote any actor type to work as a snapshot store or an event journal, as long as it exposes the necessary behavior and responds to the required set of persistence messages.
  2. While the persistence spec suites may be used to verify your implementations, remember that they don't check for problems such as data races.
  3. Remember that Akka.Persistence is still prerelease, so breaking changes may occur in upcoming versions.

Akka.NET remote deployment with F#

Today I want to present how Akka.NET can be used to leverage distributed computing, and discuss some of the changes in the latest F# package (Akka.FSharp 0.7.1). If you are not interested in an explanation of how Akka.NET internals allow us to create distributed actor-based applications, just skip the next part and dive directly into the examples.

How Akka.NET remote deployment works

When a new actor is to be created, its actor system deployer has to figure out whether deployment should occur on the local machine or on a remote node. The second option requires a network connection between the local and remote nodes participating in the communication. For this, each participating node needs to know the location of the others; in Akka, this is done using actor paths.

An actor path allows us to identify an actor within its system's actor hierarchy as well as locate it over the network. There are two formats of actor paths, both following the standard URI convention:

  1. Local paths (e.g. akka://local-system/user/parent/child) identify actors deployed (remotely or not) by the local actor system we are referring to. In this example, the path refers to an actor named child, whose parent/supervisor actor is called parent and resides under the user guardian (a specialized actor supervisor living inside the actor system kernel). All of them exist in an actor system named local-system.
  2. Remote paths (e.g. akka.tcp://remote-system@localhost:9001/) are similar to local ones, with a few differences. First, the akka protocol becomes either akka.tcp or akka.udp, depending on the kind of underlying network connection we want to target. Second, we suffix the actor system name with its location, providing the address and port the remote node is expected to listen on.

The image below shows how actors refer to each other in a remote deployment scenario.

When an actor is deployed remotely, the remote node is responsible for creating its instances, but we still refer to it through its actor reference and our local system context. Local/remote coordination is done by the remote daemon, a specialized system actor residing directly under the /remote path of the system root. In the example above we may refer to the remotely deployed child using the akka.tcp://sys@A:2552/user/parent/child address, while its true hierarchy lives under the akka.tcp://sys@B:2552/remote/sys@A:2552/user/parent/child path. This preserves location transparency, hiding the true underlying actor. References returned this way are almost indistinguishable from their local counterparts, allowing the use of features such as monitoring and supervision.
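
In code, location transparency means we only ever use the logical path. With the F# API's select function (introduced later in this post), referring to the child from the example might look like this:

// the logical path on node A resolves to the actor physically created on node B
let child = select "akka.tcp://sys@A:2552/user/parent/child" system
child <! "hello"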

Remote deployment

Hocon configuration

To start playing with remote deployment, we should configure both endpoints so they can maintain network communication. This is done through HOCON configuration, the default format for both the JVM and .NET Akka implementations. While you may be a little irritated by the need to learn a new format just to use Akka, remember:

  1. It’s actually very easy to learn
  2. It’s not XML ;)

Before continuing, I encourage you to familiarize yourself with it.

A minimal Akka.NET configuration enabling the remote deployment feature looks like this:

akka {  
    actor {
        provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"
    }
    remote.helios.tcp {
        port = 9001
        hostname = localhost
    }
}

First, every configuration node resides under a common root called akka. Since remoting is not a default feature of Akka.NET, we have to tell our actor system how to couple remotely deployed actors with the local one. This is done through the RemoteActorRefProvider defined under the akka.actor.provider config node; it allows us to associate references to actors spawned on both local and remote systems.

Next is the remote node, which defines configuration specific to the remoting feature. There we describe how our system will communicate with others. At present this is handled by default by Helios, a lightweight, highly concurrent TCP/UDP server developed by Aaron Stannard, one of the Akka.NET core developers (don't confuse it with Microsoft Helios, a totally different project). Akka.Remote uses it automatically under the hood; the only thing we have to define is the address and port on which it will listen for incoming messages. If you want the port to be resolved automatically, just set the port number to 0.

Apart from differences in node locations, this configuration string may be shared by both the local and the remote node. You can use it simply by parsing it with the Configuration.parse function.

Let’s get to the point

Unlike the C# API, the F# API is able to construct actors from expressions compiled at runtime (while still providing the type safety F# programmers are used to). This is achieved by leveraging F# quotations, which can be serialized and compiled on demand on another machine.

For the sample we don't need two separate machines; instead we can mimic them by running two Akka applications. For the most basic example, both of them have to share at least the Akka.FSharp and Akka.Remote assemblies as well as all of their dependencies.

install-package Akka.FSharp -version 0.7.1  
install-package Akka.Remote -version 0.7.1  

Remote node

The only job of the remote system is to listen for incoming actor deployment requests and execute them. Therefore the implementation is very simple:

let config =  
    Configuration.parse
        @"akka {
            actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
            remote.helios.tcp {
                hostname = localhost
                port = 9001
            }
        }"

[<EntryPoint>]
let main args =  
    use system = System.create "remote-system" config
    System.Console.ReadLine() |> ignore
    0

After running it, our remote system should be listening on localhost port 9001, ready to instantiate remotely deployed actors.

Local actor system instance

The second application will be used to define actors and send deployment requests to the remote node. To do so, it has to know the remote node's address.

To deploy our actors remotely, let's build some helper functions. To begin with, we need some logic to inform our local system that deployment should occur on a remote machine. Remember that we must provide the full address of the remote system, including its network location and the protocol type used for communication.

open Akka.Actor  
// return Deploy instance able to operate in remote scope
let deployRemotely address = Deploy(RemoteScope (Address.Parse address))  

Remote deployment in the Akka F# API is done through the spawne function, which requires the deployed code to be wrapped in an F# quotation.

let spawnRemote systemOrContext remoteSystemAddress actorName expr =  
    spawne systemOrContext actorName expr [SpawnOption.Deploy (deployRemotely remoteSystemAddress)]

Quotations give us a few nice features, but also have some limitations:

  1. Unlike the C# approach, we don't have to define actors in shared libraries bound to both endpoints. Actor logic is compiled at runtime, while the remote actor system is operating. That means there is no need to stop your remote nodes to reload shared actor assemblies when they're updated.
  2. Code embedded inside a quotation may use only functions, types and variables known to both endpoints. There are limited ways to define functions inside a quotation expression (and no way to define types), so in most cases it's better to define them in a separate library shared between the nodes.

The last argument of the spawne function is a list of options used to configure the actor. We used SpawnOption.Deploy to specify the deployment details. Other options may describe things such as message mailboxes, actor routers or failure-handling strategies.

Because the Akka actor system has to negotiate deployment specifics with external nodes, a local instance must be created even though we deploy our actors on remote machines.

Finally, when everything is set, we can run our example (remember that the remote node must be up and running):

let system = System.create "local-system" config  
let aref =  
    spawnRemote system "akka.tcp://remote-system@localhost:9001/" "hello"
       // actorOf wraps custom handling function with message receiver logic
       <@ actorOf (fun msg -> printfn "received '%s'" msg) @>

// send example message to remotely deployed actor
aref <! "Hello world"

// thanks to location transparency, we can select 
// remote actors as if they were existing on local node
let sref = select "akka://local-system/user/hello" system  
sref <! "Hello again"

// we can still create actors in local system context
let lref = spawn system "local" (actorOf (fun msg -> printfn "local '%s'" msg))  
// this message should be printed in local application console
lref <! "Hello locally"  

As a result, you should see two messages printed in the remote application's console and one locally. See?

Final thoughts

Remember that Akka.NET is still in beta and all of the F# API functions are subject to change. If you have concepts or interesting ideas, or want to help and become part of the family, you can share them in the Akka.NET discussion group or directly on GitHub.

Appendix A: set your configuration string in application config file

While you may define configuration strings in code, it's better to store them in .config files. To do so, we simply extend the configuration file with a custom Akka config section:

<configSections>  
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
</configSections>  

Next, we can embed our HOCON-formatted configuration string directly in the configuration file inside a <![CDATA[]]> section (directly under the main <configuration> root node):

<akka>  
    <hocon>
        <![CDATA[
          ... paste your config string here
        ]]>
    </hocon>
</akka>  

To load the configuration, simply call the Akka.Configuration.ConfigurationFactory.Load() method.

Appendix B: Share-nothing

Since keeping assemblies in sync across all remote nodes may feel cumbersome, let me show a little trick that can replace some of the complex data structures in your F# code. Let's look at how the data structure problem is solved in Akka's predecessor, Erlang:

go() ->  
  Pid = spawn(RemoteNode, loop),
  Pid ! {hello, "world"},
  Pid ! {hi}.

loop() ->  
  receive
      {hello, Msg} ->
          io:fwrite("hello ~p~n", [Msg]),
          loop();
      {hi} ->
          io:fwrite("hi~n"),
          loop()
  end.

Based on your knowledge of functional programming, most of this should be at least familiar if not self-explanatory. The crux is the {hello, Msg} and {hi} syntax: these are tuples (nested inside the pattern-matched message receiver construct). Standard Erlang convention calls them tagged tuples, where the first element is an atom, also known as a symbol in other languages. Since Erlang is a dynamic language, this is how tuples of the same size are differentiated from each other.

The example below shows how Erlang's tagged tuples could be carried over to F#. The big difference is that F# has no equivalent of Erlang atoms, so they have been replaced by F# integer literals, which give us human-readable tags while still counting as plain integers inside quotation expressions.

[<Literal>]let Hello = 1
[<Literal>]let Hi = 2

let pid = spawne system "remote" (<@ fun mailbox ->  
        let rec loop() : Cont<obj, obj> = actor {
            let! msg = mailbox.Receive()
            match msg with
            | (Hello, str) -> printfn "hello %A" str
            | (Hi) -> printfn "hi"
            | _ -> mailbox.Unhandled()
            return! loop()
        }
        loop() @>) [SpawnOption.Deploy (Deploy(RemoteScope(Address.Parse remoteAddr)))]

pid <! (Hello, "world")  
pid <! (Hi)  

Hipsterize your backend for The Greater Good with Akka.NET, F# and some DDD flavor

Initially this post was supposed to cover the concepts of implementing an example DDD (Domain Driven Design) architecture using F# and Akka.NET. But the more I wrote, the more I realized the whole idea wouldn't fit into a single blog post. Therefore I've decided for now to focus on a basic example covering command/event handlers. To make this more interesting, I want to show how Akka.NET and F# features can be used to leverage a design established this way.

Before we begin…

I want to note that all examples in this post exist mostly for learning purposes. Some things are missing, but I've left them out since they have no direct impact on the overall design or the current state of the application. If I continue this thread, I will probably extend them and provide more meaningful context. So please don't treat this code as a real-life reference DDD implementation.

What is clearly missing in the example below? I didn't describe aggregate roots, because the bounded context is very simplistic; to make this post clearer I've focused on the command-handler-event pattern. The sample doesn't enforce any specific database, using a simple dictionary as an abstraction. It also doesn't provide any form of data validation or bindings to existing HTTP server frameworks. Those are outside the scope of this post but are very simple to integrate.

Step 1: Define our domain data structures

Let's start with the domain model. As a bounded context I've chosen one of the most generic ones: the user account space. Our simplified business logic will cover two actions, user registration and sign-in. Users will be identified by their email and will use only the most basic security features.

But before we move on, let's use some of the features F# gives us to shape the type system to our advantage.

type Email = string  
type PasswordHash = byte array  

These are examples of type aliases. You may consider using them when the usage of a primitive type in a given context forms a subtype/supertype relation. In this example, Email can be considered a specialization of string: every email is a string, but not every string is a valid email. This kind of granularity can be stretched quite far; just think about any primary/foreign key field in any entity as its own unique type. Keep in mind, though, that plain aliases are erased at compile time and only document intent. To make the compiler actually reject mistakes, such as assigning a UserId value to a ProductId field, wrap the primitive in a single-case discriminated union. F# expressiveness promotes this style.
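
A minimal sketch of such compiler-enforced keys with single-case unions:

type UserId = UserId of int
type ProductId = ProductId of int

let uid = UserId 42
// let pid : ProductId = uid  // does not compile: UserId is not a ProductId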

Now let's use these types to define our User entity. It's very straightforward:

type User =  
    { email : Email
      phash : PasswordHash
      salt  : byte array }
    static member empty =  { email = ""; phash = [||]; salt = [||] }

Next I'm going to describe the data models for our domain commands and events. It's important to understand the purpose of both. Commands describe tasks to be executed by the system and contain all the data necessary to perform them. Events describe diffs in the overall application state resulting from performed tasks. Worth mentioning: events should be described in a way that allows us to recreate the current application state by replaying the whole event stream. And because events describe changes that have already occurred, they should be immutable by nature – don't try to change your past if you don't wish to mess with your future.

type UserCommand =  
    | RegisterUser of email : Email * password : string
    | LoginUser of email : Email * password : string

NOTE: If you want to use F# records to describe your commands/events, that’s perfectly fine. I’ve chosen discriminated unions for their expressiveness.

type UserEvent =  
    | UserRegistered of email : Email * salt: byte array * phash : byte array * timestamp : DateTime
    | UserLogged of email : Email * timestamp : DateTime

As you can see, there are a few things worth emphasizing:

  • We didn't store the user password provided by the command, using a salt + password hash instead. The reason is that in DDD events are usually serialized and stored in some kind of event database, so it's not secure to put any sensitive data in them. Nevertheless, events should contain enough information to let us recreate the current database state from the event stream, assuming it's complete.
  • Each of our events contains a timestamp field. Events give us a historical changeset of the application, and because they contain only diffs between database states rather than snapshots of entities, they have to be ordered by creation date. Without that ordering we couldn't replay them correctly.

What other fields may be useful when working with events? Consider attaching the current API version number; this will let you survive breaking changes in the code responsible for handling events. Another handy field is a unique event id, useful when you need to generate cascading event chains in response to other events: in that case, some kind of pointer to the originating event helps.
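
As a sketch, such metadata could be layered on with a small wrapper type like the one below (purely illustrative; it isn't used in the rest of this example):

type EventEnvelope<'e> =
    { Event : 'e                // the domain event itself
      ApiVersion : int          // version of the code that produced the event
      EventId : Guid            // unique identifier of this event
      OriginId : Guid option }  // pointer to the originating event, if any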

Step 2: describe application behavior in a functional manner

This may be a controversial statement, but I must say it: exceptions are bad. They leave leaks in your type system and mess with your application's logic flow in ways that many programmers and almost all compilers cannot follow.

The alternative concept of error handling is old and well-known: returning error values. To start with, if you already know Rust, this may look familiar:

type Result<'ok, 'err> =  
    | Ok of 'ok
    | Error of 'err

You may ask: why the hell do we return error values instead of just throwing exceptions? Is this some archaeological excavation taking us back to the C era? Actually, no. There is a very good reason behind it.

Think about exceptions as side effects of your application's behavior. You cannot be 100% sure about the flow of your application process. Since information about exceptions is not part of any function/method signature, they effectively act as dynamically typed, cascading return values anyway – both the compiler and programmers ignore them until they occur at runtime.

This time, instead of C's cryptic error codes, we use F#'s power to make this solution type-safe and expressive. If you're interested in more ways to utilize return-based error handling with higher-level concepts, I recommend the Railway Oriented Programming blog post and the video presented at NDC Oslo 2014. Here I'll use a greatly simplified version, exposing an operator which chains two functions together, calling the second one only if the result of the first was Ok and forwarding the Error result otherwise:

let (>=>) f1 f2 arg =  
    match f1 arg with
    | Ok data -> f2 data
    | Error e -> Error e

I've also decided to use more discriminated unions to describe each specific business logic error within the current bounded context. I've found that returning string-based messages to programmers in error logs is in no way better than creating a specialized error type for each case. The only disadvantage is the amount of code needed to declare a new type, but again, that's not a problem in F#.

type UserError =  
    | UserAlreadyExists of userEmail : Email
    | UserNotFound of userEmail : Email
    | WrongPassword of userEmail : Email * hashedInput : PasswordHash

Step 3: handle user registration

How does F# command handling differ from its more object-oriented counterpart? Since we're dealing with a functionally oriented language, most of our logic is composed of functions instead of objects. Where you used to execute your logic with services, here you've got a function. Want some dependency injection? Here, have some currying.

I'll describe our service as a handle function which takes a UserCommand as input and produces a Result<UserEvent, UserError> as output:

let handle clock findUser saveUser =  
    function
    | RegisterUser(email, password) ->
        match findUser email with
        | Some _ -> Error(UserAlreadyExists email)
        | None ->
            let salt, phash = defaultGeneratePassword password
            let user =
                { email = email
                  salt = salt
                  phash = phash }
            saveUser user
            Ok (UserRegistered(email, salt, phash, clock()))

As you've seen, this function actually takes four parameters (clock, findUser, saveUser and the command, which is the implicit argument of the function keyword). But as I've said, our handle function should take only one input argument. That's because we'll provide the first three arguments through partial application later; for now, think of them as constructor arguments injected by your IoC container when resolving a new service.

While most of the parameters are self-explanatory, one may be confusing: the clock parameter. Its only job is to provide the current DateTime value. Why didn't I just use DateTime.Now directly inside the code? In this example it's not that important, but it exposes a simple problem: things such as date/configuration/environment providers or random number generators make our handler non-deterministic. If we call this function twice with the same input, we can't count on the same output. That's a problem for predictability, and it's especially cumbersome when writing tests to verify application behavior. Therefore I think it's better to separate such things out as injectable arguments.
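
For instance, a test can freeze the clock and stub out the storage functions to get fully repeatable results (a sketch; the stub lambdas below are mine):

let fixedClock () = DateTime(2015, 1, 1)
let testHandle = handle fixedClock (fun _ -> None) (fun _ -> ())
// always yields Ok (UserRegistered(..., timestamp = 2015-01-01))
testHandle (RegisterUser("j.doe@testmail.co", "pass"))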

Password hashing and generation

While you can find good libraries providing real-life password hashing out of the box, I've decided to keep my example free of unnecessary dependencies and use the standard library instead.

let private saltWith salt (p : string) =  
    let passBytes = System.Text.Encoding.UTF8.GetBytes p
    Array.append passBytes salt

let sha512 (bytes : byte array) =  
    use sha = System.Security.Cryptography.SHA512.Create()
    sha.ComputeHash bytes

let hashPassword hashFn salt password = hashFn (saltWith salt password)

let generatePassword hashFn saltSize password =  
    use saltGen = System.Security.Cryptography.RandomNumberGenerator.Create()
    let salt = Array.zeroCreate saltSize
    saltGen.GetBytes salt
    (salt, hashPassword hashFn salt password)

let inline defaultGeneratePassword pass = generatePassword sha512 64 pass  
let inline defaultHashPassword salt pass = hashPassword sha512 salt pass  

I hope this code doesn’t need any explanations.

Step 4: handle user login

Having already described the nuances of function-based services, this one shouldn't be a problem. This case exists to give our example more diversity.

let handle clock findUser saveUser =  
    function
    | RegisterUser(email, password) -> ...
    | LoginUser(email, password) ->
        match findUser email with
        | None -> Error(UserNotFound email)
        | Some user ->
            let computedPasswordHash = defaultHashPassword (user.salt) password
            if computedPasswordHash = user.phash then
                Ok (UserLogged(user.email, clock()))
            else Error(WrongPassword(user.email, computedPasswordHash))

Mock database access

As I said in the introduction, I won't use any real-life database provider, mocking it with a concurrent dictionary instead. However, if you want to use a normal database, nothing stands in your way.

let userStore = ConcurrentDictionary<string, User>()

let inline flip f x y = f y x  
let findInUserStore email (store : ConcurrentDictionary<string, User>) =  
    match store.TryGetValue email with
    | (false, _) -> None
    | (true, user) -> Some user

let saveInUserStore user (store : ConcurrentDictionary<string, User>) =  
    store.AddOrUpdate(user.email, user, System.Func<_, _, _>(fun key old -> user)) |> ignore

F# is heavily function-oriented, and this extends to argument order. While in OOP the subject (this) usually precedes the method invocation, in functional languages it's better passed as the last argument of a function. The reasoning is currying and partial application, which let us define functions with only part of the necessary arguments provided. The more specific the argument, the later it should be applied; hence the common convention of putting the most specific parameters at the end of the parameter list. It's a very useful feature, especially when combined with pipeline operators.

On the other hand, the flip function may be used to reverse argument order when, for example, we want to partially apply the second argument without providing the first one (because it may still be unknown at that moment). This will be shown later.

Handle event subscribers

One of the nice things about Akka is that it provides a publish/subscribe pattern out of the box. In an actor-based framework, the only thing needed is to subscribe the ActorRef of the desired observer directly to the Akka system's event stream.

As a lazy person, I don't want to be forced to explicitly create a new actor each time I want to subscribe some behavior reacting to events produced somewhere else. Therefore I've created a simple subscribe helper which handles subscriber actor creation for me.

// automatic unique concurrent name generator
let mutable subscriptionNr = 0  
let inline getSubNr() = System.Threading.Interlocked.Increment(&subscriptionNr)

let subscribe system (fn : 'a -> unit) =  
    let subId = getSubNr()
    let subscriber = spawn system ("subscriber-" + (string subId)) <| actorOf fn
    system.EventStream.Subscribe(subscriber, typeof<'a>) |> ignore
    subscriber

let publish (bus : Akka.Event.EventStream) evt =  
    bus.Publish evt
    Ok evt

Bind everything together

Now that all the logic has been defined, it's time to show it in some example code. It initializes a new Akka system, sets up console logging and demonstrates both successful and unsuccessful registration and sign-in.

let inline defaultClock() = DateTime.Now  
// display any errors returned by command handler
let handleErrors = function  
    | Ok _ -> ()
    | Error data -> printf "User error: %A\n" data

let system = System.create "system" <| Akka.Configuration.ConfigurationFactory.Default()  
let userHandler =  
    // inject "dependencies" into handle function
    handle
    <| defaultClock
    <| flip findInUserStore userStore
    <| flip saveInUserStore userStore
let subscriber = subscribe system <| printf "User event: %A\n"  
let userActor = spawn system "users" <| actorOf (userHandler >=> (publish system.EventStream) >> handleErrors)

userActor <! RegisterUser("j.doe@testmail.co", "pass")  
userActor <! RegisterUser("j.doe@testmail.co", "pass")  // UserAlreadyExists error  
System.Threading.Thread.Sleep 100

userActor <! LoginUser("j.doe@testmail.co", "pass")  
userActor <! LoginUser("j.doe@testmail.co", "fails")     // WrongPassword error

System.Threading.Thread.Sleep 100  
system.Shutdown()  

That's all. The whole example took about 130 lines of F# code. I hope this gives you some insight into embedding your business logic in the F# function composition model.

PS: If you're interested in more details about DDD and F# programming, I can recommend following the work of other developers, such as Lev Gorodinski (@eulerfx) and his blog, for more advanced concepts.

Actor supervisors in Akka.NET FSharp API

This post describes changes that will affect the F# API for Akka.NET in upcoming versions of the framework (>= 0.62), mostly those concerning supervision and manipulation of actor hierarchies.

I don't want to overload this post with an explanation of what Akka supervision is all about, so I'll just recommend a very good article, which can be found here.

While previous versions of the Akka F# API introduced some nice features, such as function-based actor definition and creation (the spawn function), there was still no simple way to handle some core framework concepts. Remember that the march towards Akka.NET 1.0 is still in progress and more and more features are being implemented, so the F# API will most likely also be subject to change.

To better describe the new features, let's see an example:

let strategy =  
    Strategy.oneForOne (fun e ->
        match e with
        | :? ArithmeticException -> Directive.Resume
        | :? ArgumentException -> Directive.Stop
        | _ -> Directive.Escalate)

let supervisor =  
    spawnOpt system "master" (fun mailbox ->
        // by using spawn we may create child actors without leaving the F# functional API
        let worker = spawn mailbox "worker" workerFun

        let rec loop() =
            actor {
                let! msg = mailbox.Receive()
                match msg with
                | Respond -> worker.Tell(msg, mailbox.Sender())
                | _ -> worker <! msg
                return! loop()
            }
        loop()) [ SupervisorStrategy(strategy) ]

The whole example is available in the Akka.NET project source code, along with other examples.

Unlike the standard spawn, the spawnOpt function takes a list of options used to configure the spawned actor's internals. SupervisorStrategy is one of them. The F# API provides a Strategy module to quickly create a corresponding strategy instance. It supports two types of strategy creators:

  1. Strategy.oneForOne – the strategy will affect only the exception thrower.
  2. Strategy.allForOne – the strategy result will propagate to all children of the actor implementing it.

Both functions come in two variants. The first takes only one argument: a decider function, which determines the expected behavior based on the type of the exception thrown. The second (Strategy.oneForOne2 and Strategy.allForOne2) precedes the decider argument with a timeout and a maximum number of retries. All of them correspond directly to the standard Akka strategy API, but are wrapped to fit the F# functional approach instead of method overrides.
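
Following the parameter order described above, using one of the extended variants may look like the sketch below (an assumption on my part; check the exact signature in your Akka.FSharp version):

let strategy2 =
    Strategy.oneForOne2
        (System.TimeSpan.FromSeconds 10.)  // time window for the retry counter
        3                                  // maximum number of retries in that window
        (fun e ->
            match e with
            | :? ArithmeticException -> Directive.Resume
            | _ -> Directive.Escalate)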