Map reduce with FSharp and Akka.net

In my previous post, I showed how to create a simple hello world application using Akka.net with its F# API. Today, I'll show a slightly more complex example: a simple Map-Reduce system based on the distributed word count algorithm.

The idea of our algorithm is quite simple: we want to group and count all repetitions of the same words (case-insensitive) in the input text. This is a perfect subject to illustrate both a map-reduce algorithm and Akka.net's task distribution mechanism. To achieve this, we'll need to define three actor types:

  • Master – its main job is to spread the input data equally among the other actors and to forward requests for the result data.
  • Mappers – they tokenize their text fragments into single words and send them to the reducers.
  • Reducers – they are responsible for grouping and counting the words.
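Before involving any actors, the whole computation can be sketched sequentially in plain F#. This is a minimal, single-threaded sketch with no Akka.net involved; the names splitIntoLines, mapPhase, reducePhase and wordCount are hypothetical and only mirror the roles of the master, the mappers and the reducers described above.

```fsharp
// Sequential sketch of the word count map-reduce (no actors, no Akka.net).
// Hypothetical helper names; each stage mirrors one actor role.

// Master role: split the input into chunks (here: one chunk per line)
let splitIntoLines (text: string) : string list =
    text.Split('\n') |> List.ofArray

// Mapper role: turn a chunk into (word, 1) pairs, lower-cased for case-insensitive counting
let mapPhase (line: string) : (string * int) list =
    line.Split([| ' '; '\t'; '\r' |], System.StringSplitOptions.RemoveEmptyEntries)
    |> Array.map (fun word -> (word.ToLowerInvariant(), 1))
    |> List.ofArray

// Reducer role: group the pairs by word and sum their counts
let reducePhase (pairs: (string * int) list) : Map<string, int> =
    pairs
    |> List.groupBy fst
    |> List.map (fun (word, occurrences) -> (word, List.sumBy snd occurrences))
    |> Map.ofList

let wordCount (text: string) : Map<string, int> =
    text |> splitIntoLines |> List.collect mapPhase |> reducePhase

// Usage: same input as the actor example further down
let counts = wordCount "Orange orange apple\ncherry apple orange"
counts |> Map.iter (fun word n -> printfn "%s\t%d" word n)
// prints apple 2, cherry 1, orange 3 (one per line, sorted by key)
```

The actor version distributes exactly these three stages across separate actors, so they can run concurrently.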

To begin, let's define the basic message types used by our system:

  • One to propagate chunks of data to the mapping nodes.
  • The next one to forward processed data to the reducers.
  • The last one to invoke the collection mechanism, which concatenates all reduced data into one piece and sends it back to the originator.

All of them are illustrated by the code below:

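open System
open System.Collections.Concurrent
open Akka.Actor
open Akka.Configuration
open Akka.FSharp

type MRMsg =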
    | Map of string
    | Reduce of (string * int) list
    | Collect

As you may have seen in the previous post, instead of the object-oriented approach of C#/Scala, I've used a tail-recursive function to define an actor's processing routine. Since much of that code would be repeated across all the actor types we want to define, we can wrap the actor behavior using the actorOf and actorOf2 functions.
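To see what such a wrapper saves us from writing, here is a rough analogue built on F#'s own MailboxProcessor rather than Akka.net. It is a sketch under that assumption, not the Akka.FSharp implementation: the hypothetical agentOf function takes a plain message handler and hides the receive-and-recurse loop, which is the same idea actorOf applies to the actor { ... } loop (actorOf2 additionally hands the mailbox to the handler).

```fsharp
// Hypothetical helper, NOT part of Akka.FSharp: wraps a plain handler
// ('Msg -> unit) into a MailboxProcessor whose recursive receive loop is hidden.
let agentOf (handler: 'Msg -> unit) : MailboxProcessor<'Msg> =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            handler msg          // user code only ever sees one message at a time
            return! loop () }    // tail call keeps the agent alive
        loop ())

// Usage: the handler contains only the "what to do with a message" logic
let received = System.Collections.Concurrent.ConcurrentQueue<string>()
let echo = agentOf (fun (s: string) -> received.Enqueue s)
echo.Post "hello"
echo.Post "world"
```

Because the loop lives in one place, every actor definition shrinks to the function that handles a single message, which is exactly how map, reduce and master are written below.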

Let's start defining the map-reduce logic of our application. First, we need to create a map actor. In this example, all it needs to do is chop the provided string into single words and turn them into a simple bag of words, pairing each word with a count of 1. However, we won't sum them yet; that is the job of the reducer actors.

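// split on whitespace (skipping empty entries) and lower-case each word, since the count is case-insensitive
let mapWords (line:string) =
    seq { for word in line.Split([|' '; '\t'; '\r'|], System.StringSplitOptions.RemoveEmptyEntries) -> (word.ToLowerInvariant(), 1) }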

let map reducer (mailbox:Actor<MRMsg>) = function  
    | Map line -> reducer <! Reduce (mapWords line |> List.ofSeq)
    | m -> mailbox.Unhandled m      // mapper won't handle any other messages

As you can see, after finishing its work, the mapper sends the list of words directly to the reducer. To make this possible, we need to pass the reducer reference as one of the function parameters.

The next type is the reduce actor. Its task is to group all incoming words and count their occurrences. In our example we use a shared ConcurrentDictionary to gather data from all of the reducer instances. In a more realistic example we should probably use a more sophisticated mechanism, such as an aggregate actor concatenating the results provided by the individual reducers. Here, for the sake of simplicity, we omit it.

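// add each (word, count) pair to the shared dictionary; AddOrUpdate sums the counts of repeated words in a thread-safe way
let reduceWords (dict:ConcurrentDictionary<string,int>) (pairs:(string * int) list) =
    pairs
    |> List.iter (fun (k, v) -> dict.AddOrUpdate(k, v, System.Func<_,_,_>(fun _ old -> old + v)) |> ignore)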

let reduce (dict:ConcurrentDictionary<string,int>) (mailbox:Actor<MRMsg>) = function  
    | Reduce l -> reduceWords dict l |> ignore
    | Collect -> mailbox.Sender() <! seq { for e in dict -> (e.Key, e.Value) }
    | m -> mailbox.Unhandled m

Since the reducer has direct access to the shared dictionary, it can also respond to the Collect command by sending all the reduced data back to the message sender.
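The aggregate-actor alternative mentioned earlier boils down to each reducer keeping its own private counts and an aggregator merging them on request. Here is a minimal sketch of just the merge step, in plain F# with immutable maps; addCount and mergeCounts are hypothetical names, and the actor wiring around them is left out.

```fsharp
// Hypothetical merge step for an aggregating actor: each reducer owns a
// private Map<string,int>, so no shared ConcurrentDictionary is needed.
let addCount (acc: Map<string, int>) (word: string, n: int) : Map<string, int> =
    match Map.tryFind word acc with
    | Some existing -> Map.add word (existing + n) acc
    | None -> Map.add word n acc

// Fold every partial result into one map, summing the counts of shared words
let mergeCounts (partials: Map<string, int> list) : Map<string, int> =
    partials
    |> List.collect Map.toList
    |> List.fold addCount Map.empty

// Usage: partial results of two reducers
let merged =
    mergeCounts [ Map.ofList [ "orange", 2; "apple", 1 ]
                  Map.ofList [ "cherry", 1; "apple", 1; "orange", 1 ] ]
```

Since each partial map is owned by a single reducer, no state is shared between actors; only immutable results travel in messages.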

The last behavior is the master actor. Its role is to act as a proxy between the application's input/output data and the rest of the actors. We use it for two tasks: 1) sending text line by line to be processed by our Map-Reduce application, and 2) forwarding the request for the result of the MR operation.

let master mapper (reducer:InternalActorRef) (mailbox:Actor<MRMsg>) = function  
    | Map str -> for line in str.Split '\n' do mapper <! Map line
    | Collect -> reducer.Tell(Collect, mailbox.Sender())    // forward the message along with its originator
    | m -> mailbox.Unhandled m
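As written, the master forwards every line to a single mapper, so the work isn't actually spread among several actors yet. Akka.net offers routers (such as round-robin pools) for that purpose; independently of any actor API, the round-robin idea itself can be sketched in plain F#. The assignRoundRobin function and the worker count are hypothetical:

```fsharp
// Hypothetical round-robin assignment: line i goes to worker (i % workerCount),
// so the workers' shares differ by at most one line.
let assignRoundRobin (workerCount: int) (lines: string list) : (int * string) list =
    lines |> List.mapi (fun i line -> (i % workerCount, line))

// Usage: 5 lines spread over 2 workers
let assignments = assignRoundRobin 2 [ "a"; "b"; "c"; "d"; "e" ]
// worker 0 gets "a", "c", "e"; worker 1 gets "b", "d"
```

In an actor version, each (index, line) pair would be sent as a Map message to the mapper at that index.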

Now that we have all of our actors defined, we can initialize the Akka framework to create our MR system, using the helper functions mentioned before for fast and easy actor definition.

let system = System.create "MapReduceSystem" <| ConfigurationFactory.Default()  
let dict = ConcurrentDictionary<string,int>()  
let reducer = spawn system "reduce" <| actorOf2 (reduce dict)  
let mapper = spawn system "map" <| actorOf2 (map reducer)  
let master = spawn system "master" <| actorOf2 (master mapper reducer)  

Finally, we can pass some data to our system to see if it returns the expected results. Until now, we've only used the tell operator <! to send fire-and-forget messages to specific actors, but we've never used a two-way request-response mechanism. To do so, we'll use the Ask method (shortcut operator <?), which sends a request message and gives us an asynchronous computation that completes when the response is returned.

master <! Map "Orange orange apple"  
master <! Map "cherry apple orange"

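// crude wait: give the mapper and the reducer time to process all messages before collecting
System.Threading.Thread.Sleep 500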

// read the result
async {  
    let! res = master <? Collect
    for (k, v) in res :?> (string*int) seq do
        printfn "%s\t%d" k v
} |> Async.RunSynchronously
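F#'s built-in MailboxProcessor has a request-response pattern comparable to Ask, PostAndAsyncReply, which makes the idea easy to try without Akka.net. In this sketch the CounterMsg type and its cases are hypothetical; the reply channel plays the role that mailbox.Sender() plays for the reducer above.

```fsharp
// Hypothetical messages: fire-and-forget Add, and a request carrying a reply channel
type CounterMsg =
    | Add of string
    | GetCounts of AsyncReplyChannel<(string * int) list>

let counter =
    MailboxProcessor.Start(fun inbox ->
        // the agent's state (word counts) lives only in the loop parameter
        let rec loop (counts: Map<string, int>) = async {
            let! msg = inbox.Receive()
            match msg with
            | Add word ->
                let n = defaultArg (Map.tryFind word counts) 0
                return! loop (Map.add word (n + 1) counts)
            | GetCounts reply ->
                reply.Reply(Map.toList counts)   // answer the requester
                return! loop counts }
        loop Map.empty)

counter.Post (Add "orange")
counter.Post (Add "apple")
counter.Post (Add "orange")

// request-response: the reply arrives asynchronously, much like `master <? Collect`
let result =
    counter.PostAndAsyncReply(fun replyChannel -> GetCounts replyChannel)
    |> Async.RunSynchronously
```

Here a single agent processes its mailbox in order, so the request sees all three Add messages and no Sleep is needed. In the Akka version, Collect reaches the reducer straight from the master while the word lists arrive via the mapper, so nothing guarantees they are processed first, which is presumably why the example waits with Sleep.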

FSharp and Akka.net - the functional way

The actor model is one of the most influential paradigms for dealing with highly concurrent environments today. Put simply, it's based on the concept of autonomous, thread-safe computation units (surprisingly called actors) that have no way to directly interfere with each other's work. There are no locks or semaphores: instead of accessing shared resources, actors simply pass messages around, following the saying "Don't communicate by sharing memory; share memory by communicating."
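The "no locks" point can be demonstrated with F#'s built-in MailboxProcessor, used here as a stand-in for an actor (this is a sketch, not Akka.NET code; CountMsg and counterAgent are hypothetical names). Thousands of concurrent senders increment a counter, yet the count is only ever touched by the agent that owns it.

```fsharp
// A counter owned by one agent: its state is never shared, so no lock is needed
type CountMsg =
    | Increment
    | Get of AsyncReplyChannel<int>

let counterAgent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop count = async {
            let! msg = inbox.Receive()
            match msg with
            | Increment -> return! loop (count + 1)
            | Get reply ->
                reply.Reply count
                return! loop count }
        loop 0)

// Many threads post concurrently; the agent handles the messages one at a time
System.Threading.Tasks.Parallel.For(0, 10000, fun (_: int) -> counterAgent.Post Increment) |> ignore
let total = counterAgent.PostAndReply(fun reply -> Get reply)
```

Parallel.For returns only after every Post has been enqueued, and the mailbox is processed in order, so the Get request observes all 10000 increments.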

The first widespread use of the actor model was Erlang, dating back to 1986. While it stayed closed in its own niche for many years, the programming model itself has recently been carried over to the Java Virtual Machine through the Akka framework. So far there are no mature equivalents on the .NET platform. While Microsoft is still developing its own answer in the form of Project Orleans, a group of developers took the initiative of porting Akka to .NET.

I've decided to try out Akka.NET a little. It's still far from complete or production-ready, but since it's based on an existing and mature framework, hopefully the OSS community will patch the missing holes soon.

Hello Akka

You can find the original first steps into Akka.NET with the F# API here. While I found that sample useful, I decided to dig deeper into the Akka.NET source code and its F# API. Below is my example, based on more functional concepts:

open Akka.Configuration
open Akka.FSharp

type ActorMsg =
    | Greet of string
    | Hi

let system = System.create "MySystem" <| ConfigurationFactory.Default()
let greeter =
    spawn system "Greeter"
    <| fun mailbox ->
        let rec loop() = actor {
            let! msg = mailbox.Receive()
            match msg with
            | Greet name -> printfn "Hello %s" name
            | Hi         -> printfn "Hi"
            return! loop() }
        loop()    // start the receive loop

greeter <! Greet "Alex"  
greeter <! Hi  

Here are some explanations:

  • We defined ActorMsg, a discriminated union of the messages we want to respond to.
  • The next step is to create an ActorSystem, analogously to the C# and original Scala versions.
  • We need to define and instantiate an actor. However, unlike the object-oriented approach, which requires a custom actor type inheriting from one of the Akka actor classes and defining its own receive method, here we define a tail-recursive function that uses an actor { ... } computation expression instead of an actor type declaration.
  • We pass that function through a lambda to the spawn function, which attaches the behavior to our system and returns an ActorRef (not to be confused with the Actor instance) that we assign to the greeter variable. This way we can refer to actors and pass messages to them even if they're not present on our local machine (which is also one of Akka's use cases).
  • In the last two lines we simply send two messages to the actor reference using the tell operator <!.

As you can see, this piece of code doesn't translate directly to its C# or even Scala API equivalent. It's much more Erlang-like: we don't have to define any classes or method overrides; instead, we have a tail-recursive function. To me this seems a more natural approach for F#, since it's a functional-first language, while its syntax for object-oriented programming is very verbose and ugly.

Value absence - why is the Scala/F# approach bad?

Handling absent values is one of the most common problems in programming languages. Why do we consider this so important? Because data processing is an essential task of every program ever made, and one of the main causes of system errors and undefined behavior is values that are not present. Most modern programming languages try to equip programmers with a way to handle those cases. Let's look at the most common ways of dealing with them:

  1. Null reference – introduced by Tony Hoare, who later called it his "billion-dollar mistake". It represents the absence of a heap-allocated value (object) as a pointer that does not refer to a valid instance of the represented type. In most mainstream programming languages all references to heap objects are nullable by default, whether you want it or not.
  2. Option/Maybe type – this approach is characteristic of most functional languages. Instead of using a null pointer, it defines a special generic type (e.g. Option), represented by one of its cases (Some as a wrapper for an existing underlying value, or None if the value is not present).

So why do we consider a dedicated Option type a better solution? Because it's explicit, it carries type information about the value's nullability and, most of all, it's not the default. Most of the time we expect to have meaningful references to our data; their absence is not desired and certainly should not be expected behavior. The problem with nullable references is that literally any instance in our code could represent a non-existent value. In functional terms, it's as if every one of our objects were implicitly an Option type.
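A small F# illustration of what "explicit and non-default" means in practice: a function whose result may be missing says so in its return type, and the caller has to unwrap it, typically with a match that covers the None case (the compiler warns about incomplete matches). The tryFindUser function and its data are hypothetical:

```fsharp
// The possible absence is part of the signature: string -> string option
let tryFindUser (id: string) : string option =
    match id with
    | "42" -> Some "Alex"
    | _ -> None

// The caller cannot use the name without deciding what happens when it's missing
let greeting (id: string) : string =
    match tryFindUser id with
    | Some name -> sprintf "Hello %s" name
    | None -> "Hello stranger"
```

Contrast this with a method returning a plain reference, where nothing in the signature tells the caller that a null check is required.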

Today, after decades of living in the shadows, the functional paradigm is gaining more and more ground among mainstream programming languages. This is also reflected in two of the most popular programming environments, the JVM and .NET. Their functional representatives, Scala and F#, claim to combine the best of both paradigms. In doing so, they also introduced and popularized Option types in the object-oriented world.

But something has been missed along the way, and (in my opinion) both Scala and F# have failed when faced with the problem of value absence. Why? Because of their dualism. As functional languages, both of them allow you to create Option types with the full support expected from a functional language. But since they are also object oriented and built on top of object-oriented VMs, they allow you to use and create nullable references. This leads to a false sense of security when using Option types. Let's look at the following example (F#):

let value: string option = Some (doSmthAndReturnString())

How can you be sure that doSmthAndReturnString won't return null? You can't, until you perform an explicit null check. From the runtime's perspective, Some null is a perfectly valid value. Does that sound rational? No. So why is it even possible?
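The problem is easy to reproduce. In this sketch doSmthAndReturnString is stubbed (hypothetically) to return null, the way a .NET API might; wrapping its result with Some produces an option that claims to hold a value. Option.ofObj from FSharp.Core (available since F# 4.0) is one way to normalize such results at the boundary, turning null into None:

```fsharp
// Hypothetical stub standing in for a .NET call that happens to return null
let doSmthAndReturnString () : string = null

// Compiles and runs: an option that says "I have a value" while holding null
let value: string option = Some (doSmthAndReturnString ())

let isSomeButNull =
    match value with
    | Some s -> isNull s      // true: Some null is accepted without complaint
    | None -> false

// Option.ofObj maps null to None and any other reference to Some
let safeValue: string option = Option.ofObj (doSmthAndReturnString ())
```

The conversion only helps where you remember to apply it, which is exactly the kind of discipline the type system was supposed to take over.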

How to combine functional and object oriented worlds?

From my perspective, the best solution to that problem came with the Ceylon language. It's based on the concept of union types, one of the key features of Ceylon. For the tl;dr crowd: the union type X|Y is a supertype of both X and Y. How does this relate to null and options?

In Ceylon, null has its own unique type, Null (just like in Scala). By default all reference types are non-nullable (similar to functional languages). However, a value may be null if it's declared with the union type T|Null (or T? using syntactic sugar). This concept is roughly, though not entirely, equivalent to Scala's Either[T, Null], but without the verbose requirement of wrapping the object. Moreover, the Ceylon compiler is able to compile this down to standard JVM null references, without any overhead.

// Scala - Either
var a: Either[String, Null] = Left("hello")
var b: Either[String, Null] = Right(null)
var c: Either[String, Null] = Left(null)    // still valid

// Scala - Option type
var d: Option[String] = Some("hello")
var e: Option[String] = None
var f: Option[String] = Some(null)          // still valid

// Ceylon
String|Null m = "hello";    // no need for value wrapping
String? n = null;           // String? -> String|Null
String f = null;            // compile time error, references are not nullable by default

As you can see in the example above, this way we combine the advantages of safe, explicitly nullable types with the conciseness and performance of plain null references. In my opinion this is a proper, yet still not widespread, solution to one of the most common problems in programming language theory.