In the previous blog post I wrote about building thread-safe state access using the actor programming model (as opposed to the common practice of guarding state with locks and synchronization barriers) without compromising the performance that comes with running our programs on multi-CPU machines.

As part of that post, you could have implemented a very simple actor on your own. Today we'll push that a little bit further: we're going to implement our own general-purpose thread pool with very similar thread-safety features baked into it.

We'll use a concept known as affinity: we'll bind particular execution units - and their state - to particular OS threads. This way we make sure that the same state is always processed by the same thread, and is therefore safe from multi-threaded access.

Other advantages

While the initial motivation to build a custom thread pool was to provide thread-safe state access without locks, there are also other low-level advantages of this model. We'll discuss them shortly.

Modern CPU architecture

If we look at a diagram of present-day CPUs, we'll see that they can be complicated beasts. Each CPU has its own:

  • Memory cache (L1 and L2), used to quickly access the most frequently read data.
  • Instruction cache, used to store the operations that the CPU is going to execute. Remember that opcodes also have their size!

All of that can be presented using the following diagram (keep in mind the memory access latencies):

CPU-architecture

This implies that when an executing thread moves from one core to another, we need to pay the cost of a context switch and all of the cache refresh/invalidation associated with it. If that weren't enough, there's also a concept known as NUMA (Non-Uniform Memory Access), which puts CPU cores into groups connected by buses.

NUMA-architecture

If you're interested in more information about that topic, please follow this StackOverflow thread.

This adds another layer of complexity - while it's (relatively) expensive to move data from the cache of one CPU to another, it's even more expensive when those CPUs live in different groups.

In general, to achieve the best performance, the same piece of code running over the same piece of data should be executed on the same CPU core whenever possible.

It all comes with a price...

You could ask: if this approach is so good, why isn't it commonly used? The answer is simple: this approach prevents our system from splitting work fairly among the cores and effectively rules out work stealing: a situation where a thread with free CPU cycles can take over work items scheduled for another, more occupied thread.

Most thread pools are used in general-purpose libraries - which means we cannot assume any specific work patterns in our programs. If that's the case, the default strategy - fair splitting with work stealing - seems to be the better option.

Building an affinity-based thread pool

The conceptual diagram of our thread pool will look like the following:

Thread-pool

We create a number of agents equal to the number of CPU cores we have at our disposal. Each agent will run its own dedicated thread. This is a standard way to make optimal use of machine resources without running into excessive thread context switches - which even nowadays can take hundreds of processor cycles.

In reality there's no guarantee that our threads will always be executed on the same CPU cores as before. This however can be enforced with so-called affinity masks, which tell the operating system which CPU cores your thread is allowed (or preferred) to run on.
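For illustration only, here's a minimal sketch of what pinning the calling thread to a single core could look like on Windows, using ProcessThread.ProcessorAffinity. The pinCurrentThreadToCore helper and the kernel32 interop are my own example, not part of the thread pool we build below:

open System.Diagnostics
open System.Runtime.InteropServices
open System.Threading

[<DllImport("kernel32.dll")>]
extern uint32 GetCurrentThreadId()

// illustrative helper: pin the calling OS thread to a single CPU core (Windows only)
let pinCurrentThreadToCore (coreId: int) =
    // keep the managed thread attached to its current OS thread
    Thread.BeginThreadAffinity()
    let osThreadId = int (GetCurrentThreadId())
    Process.GetCurrentProcess().Threads
    |> Seq.cast<ProcessThread>
    |> Seq.tryFind (fun t -> t.Id = osThreadId)
    // the affinity mask has a single bit set - the core we want to run on
    |> Option.iter (fun t -> t.ProcessorAffinity <- nativeint (1 <<< coreId))

A worker could call such a helper as the first thing it does on its dedicated thread.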

Next, each WorkerAgent will have its own private queue, which we can use to assign work items to be executed only by this agent (and by extension its thread). We'll use a ConcurrentQueue as the queue implementation to keep things simple, but keep in mind that the work pattern here (Multi-Producer/Single-Consumer) leaves room for a more specialized structure.
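As a side note, one possible direction for such a specialized structure is System.Threading.Channels, where the single-reader assumption can be declared up front. This is just a sketch - the implementation below sticks with ConcurrentQueue:

open System.Threading
open System.Threading.Channels

// declare the access pattern explicitly: many producers, a single consumer (the owning thread)
let options = UnboundedChannelOptions(SingleReader = true, SingleWriter = false)
let personalChannel = Channel.CreateUnbounded<IThreadPoolWorkItem>(options)

// producers enqueue items through the writer...
let schedule (item: IThreadPoolWorkItem) =
    personalChannel.Writer.TryWrite(item) |> ignore

// ...while the owning thread is the only one draining the reader
let tryRunOne () =
    let mutable item = Unchecked.defaultof<IThreadPoolWorkItem>
    if personalChannel.Reader.TryRead(&item) then
        item.Execute()
        true
    else false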

As an escape hatch to be able to use work stealing as well, we'll also provide a second (Multi-Producer/Multi-Consumer) queue used for this specific purpose. With all of this in mind, our WorkerAgent can look like this:

type WorkerAgent(shared: ConcurrentQueue<IThreadPoolWorkItem>) =
    let personal = ConcurrentQueue<IThreadPoolWorkItem>()
    let loop() = ???
    let thread = new Thread(ThreadStart(loop))
    member __.Start() = thread.Start()
    member __.Dispose() = thread.Abort()
    member this.Schedule(item) = ???
    interface IDisposable with member this.Dispose() = this.Dispose()

We left a few methods unimplemented, but don't worry - we'll cover them right away. Let's start with the loop function, which will run continuously inside our worker thread. The algorithm is pretty simple:

  1. Try to dequeue a work item from the private queue (if one exists) and execute it.
  2. Try to dequeue a work item from the shared queue (if one exists) and execute it.
  3. If there are no items left, put the thread to sleep: after all, we don't want a hot-running loop wastefully burning machine resources.

To achieve the last point, we'll use ManualResetEventSlim - a synchronization primitive which allows us to put the current thread to sleep as well as wake it up on demand.

One of the nice properties of ManualResetEventSlim is that it lets us optimize for frequent sleep/wake-up switches (which are expensive, as they are executed in kernel mode) by spinning in a hot loop for a finite number of cycles before falling back to a kernel wait.
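The spin count can be passed explicitly when the event is created. This is how the resetEvent used in the code below could be declared (the value of 100 iterations is just an arbitrary example):

// spin ~100 times in user mode before falling back to an expensive kernel-mode wait
let resetEvent = new ManualResetEventSlim(false, spinCount = 100)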

With all of that in place, our WorkerAgent will get few extra lines:

// remember to dispose `resetEvent` in the WorkerAgent.Dispose method
let swap (l: byref<'t>, r: byref<'t>) =
    let tmp = l
    l <- r
    r <- tmp

let loop() =
    let mutable first = personal
    let mutable second = shared
    let mutable counter = 0
    while true do
        let mutable item = null
        if first.TryDequeue(&item) || second.TryDequeue(&item)
        then item.Execute()
        else
            resetEvent.Wait() // put the thread to sleep until another component wakes it up
            resetEvent.Reset()

        // every 32 iterations swap the order in which the two queues are polled
        counter <- (counter + 1) % 32
        if counter = 0 then swap(&first, &second)

You may ask: what's the purpose of the swap function here? We use it to periodically change the order in which we dequeue from the agent-owned and shared work queues. This lets us avoid starvation: a situation where work items from the second queue would never be executed because of a constant stream of work items arriving on the first one.

The last missing parts are how to schedule a work item on a target agent and how to wake up the corresponding thread. Given what we've already done, this is a pretty simple task:

type WorkerAgent(shared: ConcurrentQueue<IThreadPoolWorkItem>) =
    // rest of the code...
    member this.Schedule(item) = 
        personal.Enqueue(item)
        this.WakeUp()
    member __.WakeUp() = 
        if not resetEvent.IsSet then resetEvent.Set()

PS: An important piece here is that while the worker puts itself to sleep once it's out of work, it must be awoken by another component, either by a Schedule call or by an explicit WakeUp call.

Once our WorkerAgent is ready, let's compose many of them into a single ThreadPool implementation. Here I'm going to keep it relatively simple and loosely aligned with the canonical .NET ThreadPool.

In short, we want to be able to create a thread pool with a fixed number of agents and methods that will allow us to submit functions or objects to be executed on them:

type ThreadPool(size: int) =
    // counter used to pick which agent to wake up (round-robin)
    let mutable i: int = 0
    // queue used for work stealing
    let sharedQ = ConcurrentQueue<IThreadPoolWorkItem>()
    let agents: WorkerAgent[] = Array.init size <| fun _ -> new WorkerAgent(sharedQ)
    do for a in agents do a.Start()

    member __.UnsafeQueueUserWorkItem(item) = ???
    member this.UnsafeQueueUserWorkItem(affinityId, item) = ???

    member this.Queue(fn: unit -> unit) =
        this.UnsafeQueueUserWorkItem
            { new IThreadPoolWorkItem with member __.Execute() = fn () }

    member this.Queue(affinityId, fn: unit -> unit) =
        this.UnsafeQueueUserWorkItem (affinityId,
            { new IThreadPoolWorkItem with member __.Execute() = fn () })

    member __.Dispose() = for a in agents do a.Dispose()
    interface IDisposable with member this.Dispose() = this.Dispose()

Here, our thread pool skeleton is pretty simple. The only missing parts are:

  • UnsafeQueueUserWorkItem(affinityId, item), used to specify where a given item is supposed to be executed. We want to guarantee that items scheduled with the same affinityId always run on the same thread, and on that thread only.
  • UnsafeQueueUserWorkItem(item), which we can use to schedule items that are subject to work stealing - this way any WorkerAgent with spare processing capacity will be able to pick them up.

The work stealing variant in this case is pretty simple:

type ThreadPool(size) =
    //...
    member __.UnsafeQueueUserWorkItem(item) =
        sharedQ.Enqueue item
        // pick the next agent to wake up in round-robin fashion; mask out the sign bit
        // so the index stays valid even after the counter overflows
        let next = (Interlocked.Increment(&i) &&& Int32.MaxValue) % size
        agents.[next].WakeUp()

We simply enqueue the work item on the shared queue and wake up worker agents in round-robin fashion. While this is not strictly necessary, it ensures that when a huge number of work items starts to arrive, we'll be able to process them using as many threads/cores as necessary.

The "affine" variant is not much different:

type ThreadPool(size) =
    // ...
    member this.UnsafeQueueUserWorkItem(affinityId, item) =
        // mask out the sign bit so that negative affinity ids (e.g. hash codes)
        // still map onto a valid agent index
        agents.[(affinityId &&& Int32.MaxValue) % size].Schedule(item)

We use a hash/modulo technique to pick which worker agent should execute a given item. This way we can ensure that items marked with the same affinityId will always run on the same worker.

The last part I wanted to add is a global variant of our ThreadPool, so that we could use it in a similar fashion to the .NET ThreadPool we know and love:

type ThreadPool(size) =
    // make it lazy to avoid initialization when it's not necessary
    static let shared = lazy (new ThreadPool(Environment.ProcessorCount))
    static member Global with get() = shared.Value
    // ... 

... and to create a method that will let us use the thread pool with a custom state object, similar to the canonical ThreadPool.QueueUserWorkItem(callback, state):

type ThreadPool(size) =
    // ...
    member tp.QueueUserWorkItem(fn: 's -> unit, s: 's) =
        // the state's hash code becomes its affinity id
        let affinityId = s.GetHashCode()
        tp.UnsafeQueueUserWorkItem(affinityId,
           { new IThreadPoolWorkItem with member __.Execute() = fn s })

Here, we take advantage of GetHashCode - which allows us to identify an object - as its affinity ID, which lets us manage access to that object in a thread-safe way, now wrapped in a plain old idiomatic .NET API.
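As a quick, made-up illustration (the Counter type below is purely hypothetical), scheduling work against the same state object now gives us lock-free, single-threaded access to it:

// hypothetical mutable state, protected only by thread affinity - no locks involved
type Counter() =
    let mutable value = 0
    member __.Increment() = value <- value + 1
    member __.Value = value

let counter = Counter()
// every item scheduled with `counter` as its state hashes to the same worker,
// so the mutable field is only ever touched from a single thread
for _ in 1 .. 100 do
    ThreadPool.Global.QueueUserWorkItem((fun (c: Counter) -> c.Increment()), counter)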

Summary

With all of that set up, we now have a functioning thread pool implementation which we can use to schedule our code:

let call () =
    let threadId = Thread.CurrentThread.ManagedThreadId
    printfn "Calling from thread %i" threadId

// schedule all calls on the same thread
let affinityId = 1
for i = 0 to 100 do
    ThreadPool.Global.Queue(affinityId, call)

// schedule without specific thread requirements
for i = 0 to 100 do
    ThreadPool.Global.Queue(call)

With this implementation of ThreadPool we can easily guarantee thread-safe data access, as long as all work over a given piece of state is issued via the same affinity identifier. If you had a problem along the way, check out the sample implementation available in this snippet.