Build your own actors

What the actor model is all about?

While actors are very broad term, in Software Engineering we comonly refer to them when speaking about languages (like Erlang or Pony) or frameworks (like Akka or Orleans).

Here however we'll try to distil this term into description of a more common pattern, that is used on daily basis in many frameworks.

It's all about state

So, what problem actors aim to solve? In their core, all boils down to a way to manage and share the application state in highly concurrent systems.

In the past the common way of state management was to use locks/mutexes/semaphores in order to get an exclusive access to a piece of state. The issue with that approach is that it doesn't scale well with the number of cores and can seriously hurt performance.

Actors offer different approach: we split our state into small pieces, isolated and "owned" by a dedicated processing units (actors). Nobody except actor can modify or access its state - therefore all updates must be issued via actor itself. If we need this state (or part of it) to be used elsewhere, we always get either a copy or a frozen (immutable) version of it. It's reasonable nowadays, as memory is not that much of an issue, as it was, when mutexes first came to live. While necessary, this solution alone still doesn't guarantee safe state access.

One could think about sending actor update requests from two parallel threads. In order to make it safe, we must add another rule: actor always processes requests in sequential manner, one at the time. That means, that even when we have machine with multiple CPUs, a single actor will always use at most one at the time. This means, that in order to make efficient use of all cores, we need to guarantee a fair work split between operating actors.

It also brings the next question - if actors are processing at most one request at any given time, what happens with other requests arriving in the meantime from parallel threads?

Queues

In order to provide a settlement between sequential request processing and parallel request sending, we can introduce a middleware layer in form of queues: basically every operation to be executed by an actor is going to be stored in form or a data structure (a message) inside a queue. Since this queue is going to be used by multiple threads in parallel, an additional requirement is that its implementation must guarantee a thread safe enqueue/dequeue operations.

Now, there are two philosophies of combining actors with queues:

  1. The approach originally used in Erlang (and adopted in Akka) is to give each actor its own private MPSC (Multi-Producer/Single-Consumer) queue.
  2. Some frameworks (like Microsoft Orleans) are using a predefined set of shared queues, that hold requests targeting many actors. Keep in mind, that all messages for a given actor still will be held in a single queue - this way we can easily ensure that actor won't process more than one message at the time.

If you're interested in building your own actor-based library, choosing a particular option is usually a result of many considerations. Example: actors in Akka/Erlang have specified lifecycles, meaning that they can be explicitly created and terminated. In case of termination, all actor's messages are disposed or land in a "special" dead-letter queue. Having a dedicated queue per actor makes this process very simple. However virtual actors - like MS Orleans Grains - are "eternal", meaning that they will be created ad-hoc on first incoming request, and are never officially dead, making dead letters unnecessary.

For purposes of our example implementation, we'll be using a dedicated queue per each actor. In fact, we'll be using two of them: in order to make our actors complete, well offer a way to send them a special system messages with higher priority. This way we'll be able to interrupt standard message processing in case of situations like actor shutdown. It's also a potential way to inject additional features like actor state/occupation monitoring.

Implementation

As there are many possible concepts related to the different actor implementations, we'll set some basic ruleset for purposes of this toy project:

  1. No configurability
  2. No parent-child hierarchies: while this is common option in frameworks like Erlang and Akka (and also coroutine libraries like Task Parallel Library), it's not necessary requirement. We'll keep our hierarchy tree flat.
  3. No networking - many popular actor implementations offer network layer integration (a.k.a. location transparency) in various forms. This however is beyond scope of this blog post.

Given all of these, let's first write the skeleton of our actor:

type Actor<'state, 'msg>(initState: 'state, handler: 'state -> 'msg -> 'state) =
    let mutable status : int = Status.Idle
    let systemMessages = ConcurrentQueue<SystemMessage>()
    let userMessages = ConcurrentQueue<'msg>()
    let mutable state = initState
    member this.Post(systemMessage: SystemMessage): unit =
        systemMessages.Enqueue systemMessage
        this.Schedule() // schedule actor on a thread pool if it's not there already
    member this.Post(message: 'msg): unit =
        userMessages.Enqueue message
        this.Schedule()

As told previously, we use two queues (I'm using here a ConcurrentQueue from System.Collections.Concurrent because of its thread-safe operations). We can also communicate with our actor simply via Post methods.

Some actor model implementations (like Akka), have dozens of system messages to manage various common scenarios for actor lifetime and interactions. Here however we'll define just few, simply to present the concept:

[<Struct>]
type SystemMessage =
    | Die // stop an actor
    | Restart of exn // restart actor state

Actor lifecycle

In the core model, we need to differentiate between three different states:

  1. Idle when actor has nothing to do and is awaiting for messages.
  2. Occupied which happens when actor was scheduled to process its messages.
  3. Stopped when an actor has been irrecoverably killed and is no longer able to process any messages.
module Status =
    let [<Literal>] Idle = 0
    let [<Literal>] Occupied = 1
    let [<Literal>] Stopped = 2

Actor will initially start with an Idle status and will transition back and forth between Idle<->Occupied as the new messages will be posted and processed by it. Once it reaches Stopped status, we no longer consider it available. Now we need to create methods that will allow us to transition between those states. We'll start with simple stopping/disposing an actor.

type Actor<'state, 'msg>(...) =
    // ... rest of the implementation
    let stop () =
        Interlocked.Exchange(&status, Status.Stopped) |> ignore
    interface IDisposable with
        member __.Dispose() = stop()

This is pretty trivial implementation - we simply set actor's status to Stopped. To make that thread safe, we do so using atomic Interlocked.Exchange operation (it allows us to make thread-safe field access without need to establishing expensive locking mechanisms).

Now, let's handle scheduling of an actor to do some work. For simplicity I'm using here a good old ThreadPool.QueueUserWorkItem method, but feel free to use any scheduling API you want (maybe even create your own :) ).

type Actor<'state, 'msg>(...) =
    // ... rest of the implementation
    static let callback: WaitCallback = new WaitCallback(fun o ->
        let actor = o :?> Actor<'state, 'msg>
        actor.Execute())
    member private this.Schedule() =
        if Interlocked.CompareExchange(&status, Status.Occupied, Status.Idle) = Status.Idle
        then ThreadPool.QueueUserWorkItem(callback, this) |> ignore
    member private this.Execute () = ???

Again, we use atomic operations to do a conditional swap of actor's processing state. This way we can ensure that we won't schedule processing of the same actor more than once. Remember our original requirements (actors must process messages one at the time)?

Side note: .NET Core 3.0 offers so called IThreadPoolWorkItem interface, which when implemented by an actor directly, can be used together with ThreadPool.UnsafeQueueWorkItem(this, false) to schedule your actors without having any need for WaitCallback delegate.

Since the Execute method itself is quite big, we'll split its body into core parts:

member private this.Execute () =
    let rec loop iterations = ???
    try
        let status' = loop 300
        if status' <> Status.Stopped then
        Interlocked.Exchange(&status, Status.Idle) |> ignore
        if systemMessages.Count <> 0 || userMessages.Count <> 0 then
            // if there were any messages left, reschedule an actor
            this.Schedule()
    with err ->
        Interlocked.Exchange(&status, Status.Idle) |> ignore
        this.Post(Restart err)

First we'll start with processing loop wrapping: we're going to process up to 300 messages in one quant of the processor. Why?

The rationale behind that is pretty simple - in many cases, most of the synchronous message handlers are working for very short amount of time. If we'd simply yield current thread after processing a single message, we'll trigger a context switching, which may be more expensive than processing a message itself.

Therefore, we're taking advantage of buffering capabilities of our actor's queues, and process a batch of messages before yielding the thread. Keep in mind that the bigger batch we're going to process, we'll spend less CPU on doing context switching (and doing actual work instead), but also making our actors less responsive. It's a classic tradeoff between throughput and latency.

Now how the single actor's iterration could look like?

let rec loop iterations = 
	if Volatile.Read(&status) <> Status.Stopped then
		if iterations <> 0 then
        	let ok, sysMsg = systemMessages.TryDequeue()
            if ok then
            	match sysMsg with
                | Die ->
                	stop() // stop an actor
                    Status.Stopped
                | Restart error -> 
                    state <- initState // reset actor's state
                    loop (iterations-1)
            else
            	let ok, msg = userMessages.TryDequeue()
                if ok then
                   	state <- handler state msg
                    loop (iterations-1)
                else Status.Idle
        else Status.Idle
    else Status.Stopped

Unless our actor has already been stopped, we're going to try to dequeue system messages first (as we've gave them higher priority for things like work interruption) and execute corresponding behavior depending on system message type. Once there are no more of them, we'll continue to process user-defined messages.

Here the processing logic is extremely simple (expressed as a single state <- handler state msg line), but if you want, you can do all kinds of crazy stuff over there, like dynamic behavior switching, asynchronous message handling with Async/Task etc.

Once actor finished its processing loop, it goes back into an Idle state (if there were any messages left, we simply reschedule an actor).

Dead letters

The last part, I wanted to cover in this blog post, is an answer for the question: what happens with our mailbox after we die? Once our actor gets terminated, it could of course just throw/forget about all pending messages. However, this would make our debugging sessions very interesting - just not in a funny way.

There are different reactions for actor termination event, depending on the communication protocol, eg. the etiquette of request-response pattern would encourage us to send failure response for all pending requests.

Here, we'll just reuse a dead letters mentioned in this post already - a dedicated "subscribable" channel, where all unread user messages will land eventually.

static let deadLetters = Event<'msg>()

As you can see, here I'm using a simple static event handler (which is F# object combining C# event pattern with IObservable publisher). Now all what's left to do is to modify our stop function to redirect user messages to dead letters:

let stop () =
    Interlocked.Exchange(&status, Status.Stopped) |> ignore
    for msg in userMessages do
        deadLetters.Trigger msg

Summary

That's pretty much it. We covered how to create an actor, that will be able to process messages send within the same process, one at the time. We also shortly described queue theory in a scope necessary by our toy actor implementation.

As you may see, there's a ton of things we didn't mention about like supervised error handling, location transparency etc., but I didn't want to stretch this article too far. I've decided to cover most important aspects of actors and show points, which could be extended in the future.