How to create an Akka.NET cluster in F#

Today I'm going to show you how to set up your own Akka.NET cluster using F# code. This is a newbie-friendly guide, directed at people unfamiliar with clustering. If you don't plan to use Akka with F# but understand the language syntax, don't worry. You won't miss a thing, since most of the concepts are common for the .NET (and JVM) environments.

In this example I'm going to use Akkling - my fork of the existing Akka.FSharp library. While its API is mostly compatible with the official F# Akka API, one of its features is that actor refs are statically typed on the message types they handle. If you're using F#, you'll probably find it handy. Both packages can be installed from NuGet:

install-package Akka.Cluster -pre  
install-package Akkling -pre  

or using Paket

paket add nuget Akka.Cluster  
paket add nuget Akkling  

Almost all of the cluster initialization can be done through configuration. If you're not yet familiar with the whole concept, the official Akka and Akka.NET documentation provide both a high-level and a detailed overview of it.

Cluster setup

While I'll focus on the differences below, the basic configuration, common for all nodes, may look like this:

akka {  
  actor {
    provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    helios.tcp {
      hostname = "127.0.0.1"
      port = 2551        
    }
  }
  cluster {
    roles = ["seed"]  # custom node roles
    seed-nodes = ["akka.tcp://cluster-system@127.0.0.1:2551"]
    # when a node cannot be reached within 10 sec, mark it as down
    auto-down-unreachable-after = 10s
  }
}

With this configuration in place, our cluster will be able to manage itself as new nodes join and leave it. No additional code is necessary.

Some explanations:

  • Since, from the outside, a cluster looks like a single actor system, all nodes that are part of it must share the same actor system name.
  • Just like in the Remote Actor Deployment example, we override the default actor provider. In this case we use ClusterActorRefProvider.
  • While the cluster is able to manage joining nodes on its own, each of them must know at least one node (access point) that is already part of the cluster. For this reason we define so-called seed nodes under the akka.cluster.seed-nodes config key. While in this example only one is defined, in practice you'd want at least 2-3 of them in case some go down for any reason.

The basic difference between a seed node and the joining ones is that a seed must listen on a well-defined address so it can be easily reached. Because we have already defined the seed node address on port 2551, our seed node must have akka.remote.helios.tcp.port set to that port.

What we are going to achieve is a fully operative cluster environment with an actor aware of its presence inside it. What I mean by that is that we want the actor to be able to react to incoming cluster events. For this let's use the Cluster extension, which gives us many useful features when working with the cluster from within an actor.

let aref =  
    spawn system "listener"
    <| fun mailbox ->
        // subscribe for cluster events at actor start 
        // and unsubscribe from them when the actor stops
        let cluster = Cluster.Get (mailbox.Context.System)
        cluster.Subscribe (mailbox.Self, [| typeof<ClusterEvent.IMemberEvent> |])
        mailbox.Defer <| fun () -> cluster.Unsubscribe (mailbox.Self)
        printfn "Created an actor on node [%A] with roles [%s]" cluster.SelfAddress (String.Join(",", cluster.SelfRoles))
        let rec seed () = 
            actor {
                let! (msg: obj) = mailbox.Receive ()
                match msg with
                | :? ClusterEvent.IMemberEvent -> printfn "Cluster event %A" msg
                | _ -> printfn "Received: %A" msg
                return! seed () }
        seed ()

At this moment we will be able to track all of the control events going through the cluster.

A few more properties worth explaining are:

  • cluster.SelfAddress returns the Akka address of the current node (with host:port included). That allows us to quickly recognize the current node among the others.
  • cluster.ReadView contains a cluster overview, including things such as health check monitors or the collection of all cluster member nodes. Members are useful when you need to communicate between actors placed on different nodes, as they contain the necessary location addresses.
  • cluster.SelfRoles returns the collection of so-called roles attached to the current node. You may specify them using the akka.cluster.roles configuration key. One of their purposes is to group the nodes inside a cluster and limit responsibilities of your system to a particular subset of nodes using common abstract discriminators. Example: if you want, you may run an Akka system on a web server or even a mobile device as part of the cluster, but ensure that no resource-intensive work will ever run there.

Sending messages between nodes

The last thing I want to show in this post is how to pass messages between two actors placed on different nodes.

With the first node already set up, configuration of a second node is very similar, except that in this case akka.remote.helios.tcp.port may be set to 0 (dynamic port resolution) and akka.cluster.roles shouldn't contain the seed role.
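
For reference, a second node configuration following those rules might look more or less like this (same actor system name and seed-nodes list, dynamic port, no seed role):

akka {
  actor {
    provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
  }
  remote {
    helios.tcp {
      hostname = "127.0.0.1"
      port = 0          # 0 = pick any free port dynamically
    }
  }
  cluster {
    # note: no "seed" role here
    seed-nodes = ["akka.tcp://cluster-system@127.0.0.1:2551"]
    auto-down-unreachable-after = 10s
  }
}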

Here's the actor code:

let aref =  
    spawn system "greeter"
    <| fun mailbox ->
        let cluster = Cluster.Get (mailbox.Context.System)
        cluster.Subscribe (mailbox.Self, [| typeof<ClusterEvent.MemberUp> |])
        mailbox.Defer <| fun () -> cluster.Unsubscribe (mailbox.Self)
        let rec loop () = 
            actor {
                let! (msg: obj) = mailbox.Receive ()
                match msg with
                // wait for member up message from seed
                | :? ClusterEvent.MemberUp as up when up.Member.HasRole "seed" -> 
                    let sref = select (up.Member.Address.ToString() + "/user/listener") mailbox
                    sref <! "Hello"
                | _ -> printfn "Received: %A" msg
                return! loop () }
        loop ()

Because in a cluster environment node addresses are usually dynamic, we can't use static configuration to resolve them. In this sample we wait for a node to join the cluster and receive a MemberUp event. Then, if the member passed with the event is marked with the seed role, we send a message to its listener actor.

After running both nodes we should see a Hello message on the seed node each time a new process with the joining node code is started - with the dynamic port allocation config value (0), you can run it as a separate process multiple times, and the seed will receive the message each time.

Akka.NET application logging in your browser

In this post I want to share a fairly simple trick - binding the Akka.NET logging bus directly to the browser console output. While this is mostly a form of exercise, you may find it useful when developing a system that combines Akka with web sockets.

Let's start with creating our custom logger actor. Here's the starter code.

public class BrowserLogger : ReceiveActor  
{
    public BrowserLogger()
    {
        Receive<Debug>(e => Log(LogLevel.Debug, e.ToString()));
        Receive<Info>(e => Log(LogLevel.Info, e.ToString()));
        Receive<Warning>(e => Log(LogLevel.Warning, e.ToString()));
        Receive<Error>(e => Log(LogLevel.Error, e.ToString()));
        Receive<InitializeLogger>(_ => Sender.Tell(new LoggerInitialized()));
    }

    // the Log(LogLevel, string) helper is defined later, once SignalR hubs come into play
}

BTW: Congratulations! If you've done that, you've actually created a fully functional logger library :)

Unfortunately, the devil is in the details. Now we need to integrate our logging actor with SignalR hubs. For this, let's create a dedicated Hub class which we can use to forward log messages back to the client.

public class LoggerHub : Hub  
{
    public override Task OnConnected()
    {
        Bootstrap.Register(this);
        return base.OnConnected();
    }

    public void Log(LogLevel level, string message)
    {
        Clients.All.log((int)level, message);
    }

    public override Task OnDisconnected(bool stopCalled)
    {
        Bootstrap.Unregister(this);
        return base.OnDisconnected(stopCalled);
    }
}

Don't worry if you don't know what the Bootstrap class is all about. We're going to cover it later.

Now, once our SignalR hub is ready, it's time to enrich our logger actor to use it for forwarding messages back to the client. The idea is simply to keep a list of hubs to be informed, and to provide additional register/unregister message handlers.

// message used to register a hub
public class RegisterHub  
{
    public readonly LoggerHub Hub;
    public RegisterHub(LoggerHub hub)
    {
        Hub = hub;
    }
}

// message used to unregister a hub
public sealed class UnregisterHub  
{
    public readonly LoggerHub Hub;
    public UnregisterHub(LoggerHub hub)
    {
        Hub = hub;
    }
}

public class BrowserLogger : ReceiveActor  
{
    private readonly ISet<LoggerHub> _hubs = new HashSet<LoggerHub>();

    public BrowserLogger()
    {
        Receive<RegisterHub>(register => _hubs.Add(register.Hub));
        Receive<UnregisterHub>(unregister => _hubs.Remove(unregister.Hub));

        ...
    }

    private void Log(LogLevel level, string message)
    {
        foreach (var hub in _hubs) hub.Log(level, message);
    }
}

Note: logging may be a risky operation, since invoking a hub method may throw an exception and restart the actor. IMO the best solution in that case is to catch the exception, wrap it in our own custom exception ignored by the current logger (for example with the following statement: Receive<Error>(e => !(e.Cause is BrowserLoggingException), e => Log(LogLevel.Error, e.ToString()));) and propagate it through the standard Akka logging mechanism to the other loggers in our actor system (you don't want to lose these exceptions, right?).
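
A rough sketch of how the Log helper could apply that advice (BrowserLoggingException is the hypothetical custom exception type mentioned above; the rest only uses the standard Akka.NET event stream API):

private void Log(LogLevel level, string message)
{
    foreach (var hub in _hubs)
    {
        try
        {
            hub.Log(level, message);
        }
        catch (Exception cause)
        {
            // republish the failure to the logging bus, wrapped in our marker exception,
            // so this logger can filter it out while the other loggers still record it
            Context.System.EventStream.Publish(
                new Error(new BrowserLoggingException("SignalR hub call failed", cause),
                    Self.Path.ToString(), GetType(), message));
        }
    }
}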

In one of the previous code snippets you may have noticed that we're using the Bootstrap.Register and Bootstrap.Unregister methods, which haven't been defined yet. For this we create a static class used as an access point to the actor system and its functionality.

public static class Bootstrap  
{
    private static ActorSystem _system;
    private static ActorSelection _logger;

    public static ActorSystem System { get { return _system; } }

    // call this method in your Startup.cs or Global.asax when application starts
    public static void Init()
    {
        _system = ActorSystem.Create("logging-system", 
            ConfigurationFactory.ParseString(@"akka.loggers = [""MyNamespace.BrowserLogger, MyAssembly""]"));
        _logger = _system.ActorSelection("/system/log*");
    }

    public static void Register(LoggerHub hub)
    {
        _logger.Tell(new RegisterHub(hub));
    }

    public static void Unregister(LoggerHub hub)
    {
        _logger.Tell(new UnregisterHub(hub));
    }
}

I think one thing worth noticing is the way we refer to the logger actor in the code above. Loggers are created internally by the actor system on its start and live on their own. Therefore it's hard to get any reference to them. The /system/log* path may be confusing at the beginning, here's why:

  • The logger actor is part of the actor system internals (Akka.NET not only exposes actors to external users, but also uses them to manage itself). For this reason, unlike the usual /user prefix, here we have the /system guardian used for internal actors.
  • Logger names are generated and usually take the form log-<inc>-<logger_type_name>, where inc is an auto-incremented number. Therefore it's hard to predict the exact name of our logger. To work around this issue we use the wildcard operator to access all actors with names prefixed with log (basically all loggers). It's not a problem, since only our BrowserLogger is able to respond to the Register/Unregister messages.

The final step is to register our hub on the browser side. Assuming that you already have SignalR configured in your application, this is pretty straightforward code:

 var logLevel = {
     DEBUG: 0,
     INFO: 1,
     WARNING: 2,
     ERROR: 3
 };
 var log = $.connection.loggerHub;
 log.client.log = function (level, message) {
     switch (level) {
         case logLevel.DEBUG: console.debug(message); break;
         case logLevel.INFO: console.info(message); break;
         case logLevel.WARNING: console.warn(message); break;
         case logLevel.ERROR: console.error(message); break;
     }
 };

 $.connection.hub.start().done(function() {
     console.log('connection initialized');
 });

Final notes

The above example can be used only in a local actor system scope. In cluster-wide scenarios you should ensure that hub registration doesn't leave the local system scope, since SignalR hubs are not simple POCOs and cannot be freely serialized/deserialized.

Additionally, remember that the logging bus, just like any other Akka event stream, works only in local scope. To publish messages in cluster-wide environments you'll need to provide your own event propagation across node boundaries (it's not that hard) or use the Akka.Cluster.Tools plugin (which was still a work in progress at the moment of writing this post).

Create your own Akka.NET persistence plugin

The subject of this post is one of the Akka.NET plugins, Akka.Persistence. Its major task is to give your actors the ability to store their internal state and recover it from persistent storage after an actor is created or the underlying actor system crashes. While the rules driving the whole persistence mechanism deserve a separate article, for the purposes of this post I'll cover only the topic of event journals and snapshot stores.

Introduction

If you're already familiar with the idea of Eventsourcing, you probably know everything that's needed by now. If not, here is a simplified model – stateful actors don't store their state directly, but in the form of events. An event is a single immutable fact describing a change made over the actor's state at a point in time. Later, when any form of recovery is needed, the actor's state is recreated by sending it the series of persisted events, from which it can rebuild itself. The storage responsible for persisting those events is called the Event Journal. In Akka.Persistence by default it works only in memory, which does not make it truly persistent.

Since the stream of events to be replayed on an actor may take a long time to fully recover, its internal state may be serialized at any point in time and stored separately. In that case, by default, the actor will first try to get the latest intermediate state (a snapshot) and then recover only from those events which have been stored from that point in time onward. The store used to persist those snapshots is called a Snapshot Store, and in Akka.NET by default it uses the local file system as the persistence layer.

The Akka.Persistence plugin is designed in a modular fashion and allows you to create your own plugins and integrate them with any underlying persistence provider.

Journal

Just like most of the other entities in Akka, journals and snapshot stores are actors. To create your own journal, you can derive from either AsyncWriteJournal or SyncWriteJournal. The difference between these two is that the first one returns a Task continuation from all of its operations (reads, writes and deletions – as said earlier, journals don't perform updates), while the second does so only for reads, expecting writes and deletions to be completed in a synchronous manner. For this example I'm going to use the sync journal implementation.

Let's start with the following skeleton of our class:

public class MyJournal : SyncWriteJournal  
{
    private static readonly object Ack = new object();
    private readonly ConcurrentDictionary<string, ISet<IPersistentRepresentation>> _eventStreams =
              new ConcurrentDictionary<string, ISet<IPersistentRepresentation>>();

    public override Task ReplayMessagesAsync(string pid, long from, long to, long max, Action<IPersistentRepresentation> replay) ...
    public override Task<long> ReadHighestSequenceNrAsync(string pid, long from) ...
    protected override void WriteMessages(IEnumerable<IPersistentRepresentation> messages) ...
    protected override void DeleteMessagesTo(string pid, long to, bool isPermanent) ...
}

For this example all persistent actors' events are stored in memory. The journal is able to recognize which event stream refers to which actor thanks to the PersistenceId – a unique identifier for each PersistentActor, which identifies its state across many actor incarnations. Each event inside the stream has its own sequence number – a number unique in the scope of the event stream, increasing monotonically for every separate persistent actor.

This is how the WriteMessages method may look:

protected override void WriteMessages(IEnumerable<IPersistentRepresentation> messages)  
{
    foreach (var message in messages)
    {
        var list = _eventStreams.GetOrAdd(message.PersistenceId,
            new SortedSet<IPersistentRepresentation>(PersistentComparer.Instance));
        list.Add(message);
    }
}

This is a straightforward operation of finding an event stream for each message's persistence id (or creating a new one if it was not found) and appending the message to it. PersistentComparer is simply an IComparer used to order messages by their sequence numbers. IPersistentRepresentation wraps a user-defined event and contains all the data necessary for the persistence plugin to perform its magic.
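
The PersistentComparer used above can be as simple as the following sketch:

// orders persistent representations by their sequence numbers
public class PersistentComparer : IComparer<IPersistentRepresentation>
{
    public static readonly PersistentComparer Instance = new PersistentComparer();

    public int Compare(IPersistentRepresentation x, IPersistentRepresentation y)
    {
        return x.SequenceNr.CompareTo(y.SequenceNr);
    }
}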

To be able to replay stored messages, we must now define the ReplayMessagesAsync logic:

public override Task ReplayMessagesAsync(string pid, long from, long to,  
      long max, Action<IPersistentRepresentation> replay)
{
    var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
    var promise = new TaskCompletionSource<object>();
    dispatcher.Schedule(() =>
    {
        try
        {
            Replay(pid, from, to, max, replay);
            promise.SetResult(Ack);
        }
        catch (Exception e)
        {
            promise.SetException(e);
        }
    });

    return promise.Task;
}

Because this method is already prepared to work in an asynchronous manner – which is the most probable solution for third-party provider integrations nowadays – here we delegate the work through our system's dispatcher. Because the API of the underlying dispatcher may work in various ways (Akka is not bound to TPL), its Schedule method doesn't expose any task to be returned. Here I've introduced a little trick using TaskCompletionSource to create similar behavior.

The Replay method itself is pretty straightforward:

private void Replay(string pid, long from, long to,  
      long max, Action<IPersistentRepresentation> replay)
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        var s = eventStream as IEnumerable<IPersistentRepresentation>;

        if (from > 0) s = s.Where(p => p.SequenceNr >= from);
        if (to < int.MaxValue) s = s.Where(p => p.SequenceNr <= to);
        if (max < eventStream.Count) s = s.Take((int)max);

        foreach (var persistent in s)
        {
            replay(persistent);
        }
    }
}

What's worth noticing here is that the from/to sequence number range provided is expected to be inclusive on both sides.

Next in line is ReadHighestSequenceNrAsync – it's used to retrieve the highest sequence number for the event stream associated with a particular persistent actor. While in your solution you may find it appropriate to use some async API or the dispatcher task delegation described above, I've used a simple synchronous solution here:

public override Task<long> ReadHighestSequenceNrAsync(string pid, long from)  
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        var last = eventStream.LastOrDefault();
        var seqNr = last == null ? 0L : last.SequenceNr;
        return Task.FromResult(seqNr);
    }

    return Task.FromResult(0L);
}

The last method remaining is message deletion. While this is not a usual way of working, e.g. in the event sourcing model (it introduces data loss), it's still a viable option.

Akka.Persistence recognizes two types of deletion:

  • Logical, when messages are marked as deleted (the IPersistentRepresentation.IsDeleted flag) but still reside in the event journal.
  • Physical, when messages are physically removed from the journal and cannot be restored.

A DeleteMessagesTo implementation handling both cases may look like this:

protected override void DeleteMessagesTo(string pid, long to, bool isPermanent)  
{
    ISet<IPersistentRepresentation> eventStream;
    if (_eventStreams.TryGetValue(pid, out eventStream))
    {
        foreach (var message in eventStream.ToArray())
        {
            if (message.SequenceNr <= to)
            {
                eventStream.Remove(message);
                if(!isPermanent)
                {
                    // copy message with IsDeleted flag set
                    var copy = message.Update(message.SequenceNr,
                    message.PersistenceId, isDeleted: true, sender: message.Sender);
                    eventStream.Add(copy);
                }
            }
            // messages are already stored by their sequence number order
            else break;
        }
    }
}

Test specification fulfillment

When your custom persistence plugin is ready, it's good to check whether its behavior is valid from the Akka.Persistence perspective. In order to do this, you may use the existing test suite in the form of the Akka.Persistence.TestKit, available as a NuGet package – it's a set of tests checking whether a custom persistence plugin fulfills the expected journal (and snapshot store) behavior. It uses the Akka.NET TestKit along with xUnit in order to run.
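
The package can be installed just like the others:

install-package Akka.Persistence.TestKit -pre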

The only thing needed is your own spec class inheriting from JournalSpec, with the configuration set to use your plugin as the default one. The same configuration may be used to inject your persistence provider into a real working actor system.

public class MyJournalSpec : JournalSpec  
{
    private static readonly Config SpecConfig =
    ConfigurationFactory.ParseString(@"
       akka.persistence {
           publish-plugin-commands = on
           journal {
               plugin = ""akka.persistence.journal.my""
               my {
                   class = ""MyModule.MyJournal, MyModule""
                   plugin-dispatcher = ""akka.actor.default-dispatcher""
               }
           }
       }");

    public MyJournalSpec() : base(SpecConfig) 
    {
        Initialize();
    }
}

Snapshot store

A custom snapshot store implementation seems to be an easier task. Here I'm again going to use a store based on in-memory snapshots, which still is not a valid persistence layer, but works well for training purposes.

Actor state snapshots are identified by two entities: SnapshotMetadata and the Snapshot itself. The first covers the data necessary to correctly place a snapshot instance in the lifetime of a specific persistent actor, while the second one is a wrapper over the actor's state itself.

For the purposes of this in-memory plugin, I'll wrap both of them in a single object defined like this:

public class SnapshotBucket  
{
    public readonly SnapshotMetadata Metadata;
    public readonly Snapshot Snapshot;

    public SnapshotBucket(SnapshotMetadata metadata, Snapshot snapshot)
    {
        Metadata = metadata;
        Snapshot = snapshot;
    }
}

This way we may describe the snapshot store in the following manner:

public class MySnapshotStore : SnapshotStore  
{
    private static readonly object Ack = new object();
    private readonly LinkedList<SnapshotMetadata> _saving = new LinkedList<SnapshotMetadata>();
    private readonly List<SnapshotBucket> _buckets = new List<SnapshotBucket>();

    protected override Task<SelectedSnapshot> LoadAsync(string pid, SnapshotSelectionCriteria criteria) ...
    protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) ...
    protected override void Saved(SnapshotMetadata metadata) ...
    protected override void Delete(SnapshotMetadata metadata) ...
    protected override void Delete(string pid, SnapshotSelectionCriteria criteria) ...
}

To begin with, let's abstract away the dispatcher behavior described before. Remember, this is only a trick – nothing stops you from simply using Task.Run or delegating the work to child actors. In a real life scenario you'd probably use the built-in async behavior of the underlying provider.

private Task<T> Dispatch<T>(Func<T> fn)  
{
    var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
    var promise = new TaskCompletionSource<T>();
    dispatcher.Schedule(() =>
    {
        try
        {
            var result = fn();
            promise.SetResult(result);
        }
        catch (Exception e)
        {
            promise.SetException(e);
        }
    });
    return promise.Task;
}

Next, let's take the saving operation. In the case of snapshot stores it's divided into two separate methods:

  • SaveAsync – the asynchronous operation of saving the snapshot, returning a Task continuation.
  • Saved – called only after the snapshot has been successfully saved.

They may be combined to, for example, keep a log of snapshots that are currently being saved but not yet confirmed, as I've presented below.

protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot)  
{
    return Dispatch(() =>
    {
        var bucket = new SnapshotBucket(metadata, new Snapshot(snapshot));
        _saving.AddLast(metadata);
        _buckets.Add(bucket);

        return Ack;
    });
}

protected override void Saved(SnapshotMetadata metadata)  
{
    _saving.Remove(metadata);
}

When it comes to loading snapshots, the snapshot store is expected to return only one snapshot – the latest one matching the provided criteria. Just like any other range in the persistence plugin, this one is expected to have inclusive boundaries.

protected override Task<SelectedSnapshot> LoadAsync(string pid, SnapshotSelectionCriteria criteria)  
{
    return Dispatch(() =>
    {
        var limited = _buckets.Where(bucket => bucket.Metadata.PersistenceId == pid
            && bucket.Metadata.SequenceNr <= criteria.MaxSequenceNr
            && bucket.Metadata.Timestamp <= criteria.MaxTimeStamp);

        // take the latest matching snapshot, or return null when nothing matches
        var latest = limited.OrderBy(bucket => bucket.Metadata.Timestamp).LastOrDefault();

        return latest != null
            ? new SelectedSnapshot(latest.Metadata, latest.Snapshot.Data)
            : null;
    });
}

The last behavior is snapshot deletion, represented in this case by two methods – one used for a single snapshot and one able to operate on a specific range.

protected override void Delete(SnapshotMetadata metadata)  
{
    var found = _buckets.SingleOrDefault(bucket => bucket.Metadata.Equals(metadata));
    if (found != null)
    {
        _buckets.Remove(found);
    }
}

protected override void Delete(string pid, SnapshotSelectionCriteria criteria)  
{
    _buckets.RemoveAll(bucket =>
       bucket.Metadata.PersistenceId == pid
        && bucket.Metadata.SequenceNr <= criteria.MaxSequenceNr
        && bucket.Metadata.Timestamp <= criteria.MaxTimeStamp);
}

Test specification fulfillment

Just like in the case of the journal spec, you may verify your store's behavior simply by creating a custom spec. While the configuration string below is a minimal one, nothing stops you from extending it with your custom settings.

public class MySnapshotStoreSpec : SnapshotStoreSpec  
{
    private static readonly Config SpecConfig =
      ConfigurationFactory.ParseString(@"
        akka.persistence.snapshot-store {
            plugin = ""akka.persistence.snapshot-store.my""
            my {
                class = ""TestPersistencePlugin.MySnapshotStore, TestPersistencePlugin""
                plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher""
            }
        }
    ");

    public MySnapshotStoreSpec() : base(SpecConfig)
    {
        Initialize();
    }
}

Another difference from the event journal spec is that for most test cases the snapshot store spec requires an existing collection of snapshot metadata to work. It is initialized by the Initialize() call in the constructor in the code above.

Final notes

  1. The examples I've presented work on predefined constructs – they derive directly from SnapshotStore and SyncWriteJournal. However, this is not strictly necessary, as you may promote any actor type to work as a snapshot store or an event journal, as long as it exposes the necessary behavior and is able to respond to the required set of persistence messages.
  2. While the persistence store specs may be used to verify your implementations, remember that they don't check things such as data races etc.
  3. Remember that Akka.Persistence is still in a prerelease version. Therefore some breaking changes may occur in the upcoming versions.

Design patterns: Circuit Breaker

Today I want to introduce the Circuit Breaker – one of the reactive design patterns, especially useful in areas such as web service interop. To give you a better understanding of its concepts, let's consider the following scenario:

You've built a successful web service which makes use of other, external services in order to handle user requests. During a big churn of end users requesting your site, one of those external services became overloaded and started to respond with increasing delays. Ultimately, while trying to satisfy all incoming requests, it exhausted all of its resources and went down, taking your service with it.

I think it's a well-known example and a good one to show the nature of the problem. Because of their RPC nature, such calls have characteristics that differ from in-proc calls:

  • They operate on a separate resource pool and might be used by more than one remote caller at a time.
  • Their internal state, available resources and performance may be hard to predict. In most cases, they can neither be controlled nor monitored by the caller.
  • Their life cycle is not bound to your local host. They may be reset or shut down while your service is still running and serving requests.

What do reactive applications have to do with that? One of the key principles of the Reactive Manifesto is quick responsiveness of the system, no matter if the response is positive or not. Nowadays users are impatient. When they take an action, they want a response, and they want it now.

This is where Circuit Breakers kick in. Their main role is to act as a decorator around your code to ensure that you can quickly respond to any reliability problems:

  • First, a Circuit Breaker defines a safe time delay given to the action to respond. A service which doesn't fit in the specified timeout is considered to be unresponsive or working incorrectly. It's better to inform your user about possible problems after a reasonable delay than to show him/her a smoke screen and a spinner for the next 2 minutes.
  • Second, it's able to cut the service off at any sign of problems and give it time to repair itself. If you already know that the external service is not working properly, what is the point of calling it anyway?

Most circuit breakers are realized as state machines. A simple one has the following three states:

  • Closed – this is the initial state. While closed, the Circuit Breaker simply passes any request to the underlying function. However, it also checks whether the response meets the specified timeout criteria or ends with a failure, e.g. due to external service overload or crash. Any of these will trigger the Circuit Breaker to switch into the open state.
  • Open – while in this state, any request sent through the Circuit Breaker follows fail-fast semantics. Instead of forwarding the request to the underlying function, the CB immediately throws an exception informing the caller that it's open. The open state is not permanent – the CB should automatically switch to the half-open state after a specified time delay.
  • HalfOpen – this is a kind of probing state, signalling that the CB is checking whether the underlying function is responsive again. While in it, the CB passes the first request call as if it was in the Closed state, to eventually get a response. All subsequent calls fail fast just like in the Open state. If the request fails again or the response doesn't come within the specified timeout, the CB switches back to the Open state. If the response was successful, the CB becomes Closed.

I've created a simplistic implementation of the CircuitBreaker pattern in C# (source code). It allows you to perform multiple asynchronous calls through one CB instance in a non-blocking, thread-safe manner. It also has a complete test suite describing its behavior attached. Feel free to use it and modify it as you wish.
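
I won't inline the full implementation here, but a rough, simplified sketch of the state machine described above (not the linked source code, and with much cruder error handling) could look like this:

public class CircuitBreakerOpenException : Exception { }

// Closed --(failure/timeout)--> Open --(open timeout elapses)--> HalfOpen --(success)--> Closed
public class CircuitBreaker<TReq, TRep>
{
    private const int Closed = 0, Open = 1, HalfOpen = 2;

    private readonly Func<TReq, Task<TRep>> _body;
    private readonly TimeSpan _callTimeout;
    private readonly TimeSpan _openTimeout;
    private int _state = Closed;
    private long _openedAtTicks;

    public CircuitBreaker(Func<TReq, Task<TRep>> body, TimeSpan callTimeout, TimeSpan openTimeout)
    {
        _body = body;
        _callTimeout = callTimeout;
        _openTimeout = openTimeout;
    }

    public async Task<TRep> Call(TReq request)
    {
        if (_state == Open)
        {
            // fail fast until the open timeout elapses, then let a single probing call through
            var openedAt = new DateTime(Interlocked.Read(ref _openedAtTicks), DateTimeKind.Utc);
            if (DateTime.UtcNow - openedAt < _openTimeout ||
                Interlocked.CompareExchange(ref _state, HalfOpen, Open) != Open)
                throw new CircuitBreakerOpenException();
        }
        else if (_state == HalfOpen)
        {
            // a probing call is already in flight - everyone else fails fast
            throw new CircuitBreakerOpenException();
        }

        try
        {
            var call = _body(request);
            var finished = await Task.WhenAny(call, Task.Delay(_callTimeout));
            if (finished != call) throw new TimeoutException();

            var reply = await call;                     // propagates the original exception, if any
            Interlocked.Exchange(ref _state, Closed);   // success closes the circuit again
            return reply;
        }
        catch
        {
            Interlocked.Exchange(ref _openedAtTicks, DateTime.UtcNow.Ticks);
            Interlocked.Exchange(ref _state, Open);     // failure or timeout (re)opens the circuit
            throw;
        }
    }
}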

Example usage:

    // this service could be a singleton, and could be called concurrently
    public class CurrencyExchangeProxy : ICurrencyExchangeProxy
    {
      private readonly CircuitBreaker<ExchangeCurrencies, ExchangeRate> exchangeRates;

      public CurrencyExchangeProxy(Configuration config)
      {
          exchangeRates = new CircuitBreaker<ExchangeCurrencies, ExchangeRate>(
              req => HttpGetAsync<ExchangeRate>(config.ExchangeServiceUrl, req),   // async function the CB should take care of
              config.GetServiceTimeout(),         // time given to the function to finish
              config.OpenTimeout());              // time given to the dependent component to recover in case of failure
      }

      public async Task<ExchangeRate> GetExchangeRate(Currency from, Currency to)
      {
          return await exchangeRates.Call(new ExchangeCurrencies(from, to));
      }
    }

    public class InvoiceController : Controller
    {
      private readonly ICurrencyExchangeProxy currencyExchange;

      [HandleError(ExceptionType=typeof(CircuitBreakerOpenException), View="UnresponsiveExternalService")]
      public async Task<ActionResult> DisplayInvoice(int id)
      {
          ...
          var exchangeRate = await currencyExchange.GetExchangeRate(invoice.Balance.Currency, localization.Currency);
          ...
      }
    }

[C#] A different look at service design

Today I want to present a different point of view on C# application design. If you are programming in that language most of the time, it's probably not what you're used to seeing. However, I found it interesting, because it allows you to achieve a few important goals:

  • Decompose various complex interfaces into a few simpler, well-built components.
  • Seriously reduce the risk of common OO anti-patterns in your application, e.g. the God Object.
  • Make your code shorter and your methods simpler.
  • Replace a lot of external assembly dependencies in your code with a few simple design patterns.

If you're ready, prepare to step down into the Wonderland. Let the Caterpillar be our guide.

The basics

Step 1 – define an interface

Let's start by twisting our mindsets a little bit. Take the following example:

public interface IAuthService  
{
   Task<User> SignUp(string email, string password);
   Task<User> SignIn(string email, string password);
   Task ResetPassword(string email);
}

This is our new trendy-looking, non-blocking service interface. What's also important here is that it follows the Single Responsibility Principle.

Step 2 – separate responsibilities

Since SRP has no well-defined bounds and is a matter of personal taste, we could stretch it a little further:

public interface ISignInService  
{
    Task<User> Handle(string email, string password);
}

public interface ISignUpService  
{
    Task<User> Handle(string email, string password);
}

public interface IResetPasswordService  
{
    Task Handle(string email);
}

Now, instead of one service responsible for three possible operations, we have three atomic services, each having only one operation to take care of. If you're thinking it's overly complicated, you're probably right. But right now we're still in the Object Oriented world and the Wonderland awaits ahead.

Step 3 – more simplification

Now, when we have three single-member services, take a look into the mirror. If you do, you'll notice an important characteristic – an interface with a single method is almost indistinguishable from a delegate. It looks better now, see?:

public delegate Task<User> SignUpService(string email, string password);  
public delegate Task<User> SignInService(string email, string password);  
public delegate Task ResetPasswordService(string email);  

At this point you may want to turn back. This design is problematic. For example, it isn't as easily composable with Dependency Injection frameworks as interfaces are. Don't worry. We will cover this later. Now step in.

Step 4 – final generalization

Our service signatures are now almost ready. But to reach behind the mirror, we must apply two more rules:

  • Each service takes no more than one parameter.
  • Each service should always return a value.

Why does it matter? If each service satisfies the one input / one output rule, we may compose them together with ease.

public delegate Task<User> SignUpService(SignUpRequest request);  
public delegate Task<User> SignInService(SignInRequest request);  
public delegate Task<object> ResetPasswordService(string email);  

Now, all of our delegates could be abstracted away to a single generic definition:

public delegate Task<TRep> Service<in TReq, TRep>(TReq request);  

where:

  • TReq is type of request object passed to service.
  • TRep is type of service reply.

What we end up with is a highly abstracted – in Wonderland things look different – and universal service signature.
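
For example, the sign-up service from the previous step is just a specialization of this generic signature (CreateUserAsync and the SignUpRequest members below are hypothetical):

// SignUpService expressed through the generic signature
Service<SignUpRequest, User> signUpService = request => CreateUserAsync(request.Email, request.Password);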

Dependency Injection vs partial application

Walking down the object oriented road, you have often passed a view similar to this one:

public class TransportationService  
{
    private readonly ILocalizationService _localizationService;
    private readonly IOrdersRepository _ordersRepository;
    private readonly ICustomersRepository _customersRepository;
    private readonly IConfigurationProvider _configuration;

    public TransportationService (
        ILocalizationService localizationService,
        IOrdersRepository ordersRepository,
        ICustomersRepository customersRepository,
        IConfigurationProvider configuration)
    {
        _localizationService = localizationService;
        _ordersRepository = ordersRepository;
        _customersRepository = customersRepository;
        _configuration = configuration;
    }

    public async Task<TransportationDetails> PrepareTransportation(int orderId)
    {
        ...
    }
}

But now when you’re on the other side of the mirror, it looks more like:

public static class TransportationServices  
{
    public static Service<int, TransportationDetails> PrepareTransportationService (
        ILocalizationService localizationService,
        IOrdersRepository ordersRepository,
        ICustomersRepository customersRepository,
        IConfigurationProvider configuration)
        {
            return async orderId => { ... };
        }
}

Here we simply return an asynchronous lambda. And because it's nested inside the method, it can use all of the provided parameters directly in its scope.
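
Using it is then just a matter of calling the factory once at composition time and invoking the resulting delegate later on (the dependency instances below are assumed to be created somewhere at application startup):

// hand-made composition root - no DI container involved
var prepareTransportation = TransportationServices.PrepareTransportationService(
    localizationService, ordersRepository, customersRepository, configuration);

// somewhere in the request handling code
TransportationDetails details = await prepareTransportation(orderId);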

Of course, there is still the matter of lifetime scopes. In the case of singleton scope, we may simply pass a shared instance directly. But when more universal lifetimes are required, we can slide down the road along with the delegates to reach even higher abstractions – it's twisted, but we're in Wonderland, remember?

public static Service<LocalizationRequest, LocalizationReply> LocalizationService() { ... }

public static Service<MyRequest, MyReply> MyService(Func<Service<LocalizationRequest, LocalizationReply>> localizationServiceProvider) { ... }

// transient scope
var myService = MyService(() => LocalizationService());  
// or even shorter
var myService2 = MyService(LocalizationService);

// singleton scope
var localizator = LocalizationService();  
var myService = MyService(() => localizator);  
var myService2 = MyService(() => localizator);  

There is one simple but powerful idea visible in the example above – the input parameter type has been changed from an interface to another delegate. Now it's delegates all the way down. This way we may start from the most atomic services and combine them into more complex ones without limitations.

Instead of complex, reflection-based DI frameworks we have one simple universal abstraction. You may find this more verbose, but it's actually a simpler, faster and more error-proof solution than any of the IoC libraries. You don't need to learn the next DI framework or use a StackOverflow guide to travel through the Wonderland.

There are other problems solved automatically:

  • No need to worry about cyclic references.
  • There is no risk that our DI framework won't know how to bind parameters to construct an object. You'll never get runtime errors of that kind when walking this way.
  • Your application won't inject a bad interface implementation by mistake.

Repositories and testing

Another popular OO design pattern is the repository, and its most corrupted (and misunderstood) version – the generic repository.

public interface IGenericRepository<T>  
{
    Task<IEnumerable<T>> GetAll();
    Task<T> GetById(int id);
    Task<T> Create(T entity);
    Task<T> Update(T entity);
    Task<bool> Delete(T entity);
}
public interface IUserRepository : IGenericRepository<User> {  
    Task<User> GetByEmail(string email);
}

Let's be honest – you'll probably never use all of those methods at once. If you think it's a good abstraction, it's not. It isn't a good SRP example either. After we stepped through the mirror, we surely took something from our world with us. So let's take one of the things we've hidden in the pocket – changing a user's password.

public class UserService  
{
    private readonly IUserRepository _userRepository;
    public UserService(IUserRepository userRepository)
    {
        _userRepository = userRepository;
    }

    public async Task<PasswordChanged> ChangePassword(ChangePassword request)
    {
        var user = await _userRepository.GetByEmail(request.Email);
        if(IsValid(user, request.OldPassword))
        {
            user.PasswordHash = ComputePasswordHash(request.NewPassword);
            await _userRepository.Update(user);
            return new PasswordChanged();
        }
        else throw new UnauthorizedException();
    }

    private byte[] ComputePasswordHash(string password) { ... }
    private bool IsValid(User user, string password) { ... }
}

Basically, changing the password requires only two out of the six methods provided by IUserRepository – finding the user and saving his/her state after the change. Now smoke the Caterpillar's hookah and take a look again:

public static Service<ChangePassword, PasswordChanged> ChangePasswordService(Func<Service<string, User>> userFinderProvider, Func<Service<User, User>> userUpdaterProvider)  
{
    var userFinder = userFinderProvider();
    var userUpdater = userUpdaterProvider();

    return async request => {
        var user = await userFinder(request.Email);
        if (IsValid(user, request.OldPassword))
        {
            user.PasswordHash = ComputePasswordHash(request.NewPassword);
            await userUpdater(user);
            return new PasswordChanged();
        }
        else throw new UnauthorizedException();
    };
}

private static byte[] ComputePasswordHash(string password) { ... }  
private static bool IsValid(User user, string password) { ... }  

We got rid of the repository interface entirely, introducing the two service providers presented before.

Hint:

You can turn ComputePasswordHash (or even IsValid) into a service of its own. All hail the modularity!
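
A hypothetical sketch of such a hashing service could be as trivial as:

// password hashing exposed as a service of its own
public static Service<string, byte[]> PasswordHashService()
{
    return password => Task.FromResult(ComputePasswordHash(password));
}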

From testing perspective …

In the traditional OO world, to test this feature, you'd probably include some mocking library, mock the repository's interface and check whether the correct method was called with the correct arguments. You may also mock the underlying database directly with something like Effort.

In Wonderland mocklibs are quite rare creatures. They are even harder to catch. We must find another way. Can we simply test it without any mocking library? Let's see:

// Arrange:
// if you abstracted ComputePasswordHash earlier, it'll be easier at this point
var testUser = new User(email, passwordHash);  
var db = new Dictionary<string, User> { { testUser.Email, testUser } };  
Service<string, User> userFinder = async email => {  
    User user;
    return db.TryGetValue(email, out user) ? user : null;
};
Service<User, User> userUpdater = user => Task.FromResult(user);  
var changePasswordService = ChangePasswordService(() => userFinder, () => userUpdater);

// Act:
await changePasswordService(new ChangePassword(email, oldPassword, newPassword));

// Assert:
Assert.NotEqual(passwordHash, testUser.PasswordHash);  

Actually, that’s all. Including mock initialization and result verification. No mocklib was harmed in the making of this test.

Aspect Oriented Programming vs lambda combinators

If you're still interested in the miracles of the Wonderland, we can go further. Next question: how can we bend reflection-based Aspect Oriented Programming to our will? There are possibly many ways, but I'll focus on only one.

Just like we used services to replace all of our interfaces, we replace aspects / attributes with filters:

public delegate Task<TOut> Filter<in TIn, TOut, out TReq, TRep>(TIn request, Service<TReq, TRep> service);  

where:

  • TIn is the filter's input type.
  • TOut is the filter's output type.
  • TReq and TRep are the request and reply types of the underlying service being wrapped.

A filter is actually a wrapper around a specific service, which may apply additional code before or after it, modify its input or output, or even decide not to call it at all. In the case when a filter doesn't change the service's in/out parameters, its signature is basically equal to public delegate Task<TRep> Filter<TReq, TRep>(TReq request, Service<TReq, TRep> service);.

Now, having those two types defined, let's create some utility methods which can make our travel easier. Let's start with the filter composer.

public static Filter<TIn, TOut, TReq2, TRep2> Then<TIn, TOut, TReq1, TRep1, TReq2, TRep2>(  
    this Filter<TIn, TOut, TReq1, TRep1> self,
    Filter<TReq1, TRep1, TReq2, TRep2> next)
{
    return (request, service) => self(request, req => next(req, service));
}

If we omit the method signature, this method is a pretty straightforward one-liner which combines two filters together and gives another filter in return. As you can guess, with this trick we can join a whole chain of filters into one piece.

There is a second part of the puzzle missing. We still don't have a well-defined method to work with both filters and services. This is the one:

public static Service<TReqIn, TRepOut> ApplyTo<TReqIn, TRepOut, TReqOut, TRepIn>(  
    this Filter<TReqIn, TRepOut, TReqOut, TRepIn> filter,
    Service<TReqOut, TRepIn> service)
{
    return request => filter(request, service);
}

Just like we were combining filters, now we combine a filter with a service to get a service in return.

With these two methods (and basically two lines of code!) we can easily interop between filters and services. How does this weapon fare against aspects?

// logging example
public static Filter<TReq, TRep, TReq, TRep> LogFilter<TReq, TRep>(Action<string> logger)  
{
    return async (request, service) =>
    {
        logger("Log before: " + request);
        var reply = await service(request);
        logger("Log after: " + reply);
        return reply;
    };
}

// authorization example
public static Filter<UnauthorizedRequest, TReply, AuthorizedRequest, TReply> AuthorizeFilter<TReply>(  
    Func<Service<UnauthorizedRequest, Identity>> authorizationServiceProvider)
{
    return async (request, service) =>
    {
        var authService = authorizationServiceProvider();
        var authorizedIdentity = await authService(request);
        if (authorizedIdentity != null)
        {
            return await service(new AuthorizedRequest(authorizedIdentity, request));
        }
        else throw new UnauthorizedAccessException();
    };
}

// combine various filters together
var filters = AuthorizeFilter(AuthorizationService)  
    .Then(LogFilter(Console.WriteLine))
    .Then(MonitorFilter(MonitorService))
    .Then(UnitOfWorkFilter());

// apply all filters at once to the service
var enhancedService = filters.ApplyTo(myService);  

Again, thanks to the power of composability we can join multiple filters into one and apply them to any number of services we wish. Basically two simple types and two lines of code compiled upfront!

We ripped out (or seriously reduced) runtime uncertainties, AOP, DI and mock libraries from our code, as well as a whole bunch of interfaces and abstraction layers. Did you enjoy the travel? Welcome to the Wonderland. The land of functional programming.

PS: the presented concept is heavily inspired by the Finagle library, originally written in Scala for Twitter's purposes.