In this blog post we'll continue exploring the ideas behind optimizing state-based CRDTs. This is the third post in the series. If you haven't read the previous ones and don't feel familiar with CRDTs, I advise you to do so, as we'll often refer to them.

If you get lost in the code snippets described here, you can always play with them using this GitHub repository.

In the previous blog post we discussed problems with the state-based approach to CRDTs. The major issue was that, in order to achieve convergence between replicas living on different machines, we had to serialize and push the entire data structure over the network. With a small number of changes and big collections, this approach quickly becomes suboptimal.

Example: we created a Twitter-like service and used a CRDT-replicated ORSet to represent a list of a user's followers. In order to keep replicas on different machines in sync, we need to copy the entire set over the network - ideally on every update. Imagine replicating all followers of e.g. Elon Musk (22 million people at the moment) every time a new person follows or unfollows him.

One of the solutions we talked about was the introduction of deltas, which contain only the latest changeset. Finally we presented how to implement them, using counters as an example. Here we'll take a rougher ride - we'll do the same with sets and registers.

Inefficiency of Observed-Remove Sets

In the first part of the CRDT series we saw a very naive implementation of Observed-Remove Sets - CRDT collections that are able to track elements added and removed on different replicas without any need for synchronization. We could visualize this data structure using the following image:

non-optimized ORSet

However, this design comes at a huge cost:

  1. We're tracking not only active elements of the set, but all elements that have ever appeared in it - removed elements are stored as tombstones.
  2. We attached a vector clock to keep the "time" of element addition/removal. Since vector clocks are map-like structures, the size of the metadata could even outgrow the actual payload we're interested in.

Can we do something about this? Of course, yes ;) But to do that, we first need to revisit how we understand the notion of time.

Dots

Previously, we learned how to represent time (or rather happened-before relations between events) using vector clocks. But let's think: what could a minimal representation of an operation (addition or removal) look like in eventually consistent systems?

We can use a monotonically incremented sequence number for each operation, just like in traditional SQL databases. However, we cannot use a single global sequence number, as this would require replicas to coordinate the next number with each other. Instead we'll use a separate sequencer for every replica and use a pair of (replica-id, sequence-nr) (also known as a dot) to represent a timestamp of a particular operation made on that replica. A dot has several properties:

  1. It allows us to uniquely identify every single operation made on any replica.
  2. It allows us to track causal relations between events - so while we may not be able to establish a total order of events (fortunately we don't need it), we still know how to order operations made by a single replica.
  3. Unlike standard time, for every operation we increase the sequence number by one. This way we can keep track of "holes" - discontinuous events we haven't yet observed for some reason.

When using standard wall clock time, we're used to representing observed events as markers on some sort of timeline, like this:

linear-timeline

However, with dots we can represent them using 2-dimensional space instead:

dot-context-timeline

Each filled cell represents a discrete event we observed, described as a dot. As you can see, most of the events form a continuous surface and only a few are detached. This reflects a real-life situation: as we replicate our updates among replicas, each replica will try to keep up with incoming changes, filling the gaps it was not aware of before, thus over time filling the space from left to right. This is the place where we can optimize.

We could split the area of our observed dots in two:

  1. A "flattened" version represented by a vector clock, which for every replica contains only the latest dot of the continuous range. E.g. given dots A:1, A:2, A:3, A:5, A:6 it would be A:3, as this is as far as we can go without reaching a gap (A:4 is missing in this case).
  2. A second set of all dots that could not fit inside the vector clock. So for the example above this set would consist of A:5 and A:6, as those dots are detached from the rest. We'll call it a dot cloud.

Together those two form something we'll call a DotContext - a logical representation of all observed events from the perspective of a given replica (we've seen a visual representation of it on the diagram above):

type Dot = ReplicaId * int64
type VectorClock = Map<ReplicaId, int64>
type DotContext = { Clock: VectorClock; DotCloud: Set<Dot> }

Now, we need to know which events have already been acknowledged by our replica, as we want to know which of the incoming events are important for us and which ones are outdated or duplicated. With a dot context it's easy - a dot has been observed if either its sequence number is less than or equal to the sequence number stored for the corresponding replica in the vector clock, or if that dot was found in the cloud of detached dots:

let contains dot ctx =  
    let (r, n) = dot
    match Map.tryFind r ctx.Clock with
    | Some found when found >= n -> true
    | _ -> Set.contains dot ctx.DotCloud
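To make the check concrete, here is a minimal sketch that runs it against the A:1, A:2, A:3, A:5, A:6 example from above. It assumes that ReplicaId is a plain string, which this post does not state, and it repeats the type definitions only to stay self-contained.

```fsharp
// Assumption: replica ids are plain strings; the post does not define ReplicaId.
type ReplicaId = string
type Dot = ReplicaId * int64
type VectorClock = Map<ReplicaId, int64>
type DotContext = { Clock: VectorClock; DotCloud: Set<Dot> }

let contains (dot: Dot) (ctx: DotContext) =
    let (r, n) = dot
    match Map.tryFind r ctx.Clock with
    | Some found when found >= n -> true
    | _ -> Set.contains dot ctx.DotCloud

// Observed dots A:1, A:2, A:3, A:5, A:6: the continuous part is flattened
// into the clock (A:3), the detached part stays in the cloud.
let ctx =
    { Clock = Map.ofList [ ("A", 3L) ]
      DotCloud = Set.ofList [ ("A", 5L); ("A", 6L) ] }

let seenA2 = contains ("A", 2L) ctx   // covered by the vector clock
let seenA4 = contains ("A", 4L) ctx   // the gap, not observed yet
let seenA5 = contains ("A", 5L) ctx   // found in the dot cloud
let seenB1 = contains ("B", 1L) ctx   // replica B is unknown to this context
```

Only A:4 and B:1 come back as unobserved, so an incoming update carrying either of them would be treated as new.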

We can also use DotContext to assign new dots to the events that we want to produce:

let nextDot r (ctx): Dot * DotContext =  
    let newCtx = { ctx with Clock = Helpers.upsert r 1L ((+)1L) ctx.Clock }
    ((r, newCtx.Clock.[r]), newCtx)
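Helpers.upsert is not shown in this post. The sketch below assumes it means "insert the given value when the key is missing, otherwise apply the function to the existing value", and uses a local stand-in named upsert to show how consecutive dots are handed out. ReplicaId as a string is also an assumption.

```fsharp
// Assumption: replica ids are plain strings.
type ReplicaId = string
type Dot = ReplicaId * int64
type VectorClock = Map<ReplicaId, int64>
type DotContext = { Clock: VectorClock; DotCloud: Set<Dot> }

// Hypothetical stand-in for Helpers.upsert: insert `v` when the key is
// missing, otherwise replace the stored value with `f existing`.
let upsert k v f map =
    match Map.tryFind k map with
    | Some existing -> Map.add k (f existing) map
    | None -> Map.add k v map

let nextDot (r: ReplicaId) (ctx: DotContext) : Dot * DotContext =
    let newCtx = { ctx with Clock = upsert r 1L ((+) 1L) ctx.Clock }
    ((r, newCtx.Clock.[r]), newCtx)

let empty = { Clock = Map.empty; DotCloud = Set.empty }
let (dot1, ctx1) = nextDot "A" empty   // first operation on replica A
let (dot2, ctx2) = nextDot "A" ctx1    // second operation on replica A
let (dot3, ctx3) = nextDot "B" ctx2    // first operation on replica B
```

Each replica gets its own sequence: the three calls yield A:1, A:2 and B:1, and the dot cloud is never touched.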

As you can see here, in order to generate the next dot value we're using only the vector clock - we can do so simply because the current replica is always up-to-date with its own updates, so it cannot see any detached events produced by itself.

Now, since we're using a dot context to keep track of the time of updates, we'll need to pass it as metadata along with the update itself, either in the form of a full CRDT merge or using a slimmer version - a delta containing only a dot context with the latest updates.

If you've read the previous blog posts in this series, you probably know what we'll need now - a merge function. Since we have already talked about how to merge sets and vector clocks/counters, we can compose both approaches to merge dot contexts themselves:

let merge a b =  
    let mergeMap x y = 
        y |> Map.fold (fun acc k v -> Helpers.upsert k v (max v) acc) x
    let clock = mergeMap a.Clock b.Clock
    let cloud = a.DotCloud + b.DotCloud
    { Clock = clock; DotCloud = cloud } |> compact

But wait: what is this mysterious compact function at the end? Well, this is when the fun begins...

When cloud grows

As you can imagine, when we merge two different contexts, some of the dots representing detached events can potentially no longer be detached - if we leave them that way we'll eventually end up with small vector clock and a big pile of garbage living in a dot cloud.

This is why we need compaction. The compaction process itself can be represented in following steps:

  • For each dot in the dot cloud:
    1. Check if the dot is no longer detached - if its sequence number is exactly one more than its replica's counterpart in the vector clock, it means that this event is actually continuous, so it can be joined to the vector clock.
    2. Check if the dot's sequence number is less than or equal to its counterpart in the vector clock - if so, it is already represented inside the vector clock itself, so we no longer need it.
    3. If the dot doesn't match case 1 or 2, it remains detached, so it should stay in the dot cloud.

let compact ctx =  
    let (clock, dotsToRemove) = 
        ctx.DotCloud
        |> Set.fold (fun (cc, rem) (r,n) -> 
            let n2 = defaultArg (Map.tryFind r cc) 0L
            // check if dot happens right after the latest one
            // observed in vector clock
            if n = n2 + 1L then (Map.add r n cc, Set.add (r,n) rem)
            // check if dot is represented "inside" vector clock
            elif n <= n2   then (cc, Set.add (r,n) rem)
            // go here if dot remains detached
            else (cc, rem)
        ) (ctx.Clock, Set.empty)
    { Clock = clock; DotCloud = ctx.DotCloud - dotsToRemove}

What is important here is that in F# Set.fold always traverses elements in ascending order, which for (replica-id, sequence-nr) pairs means lexicographic order. This means that we'll always process A:2 before A:3, which matters for this implementation if we want to merge all continuous dots into the vector clock in a single pass.
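The following sketch walks through all three cases of the compaction and then shows how a merge can close a gap. It reuses compact as defined above, while the merge is written without Helpers.upsert to stay self-contained. ReplicaId as a string is an assumption.

```fsharp
// Assumption: replica ids are plain strings.
type ReplicaId = string
type Dot = ReplicaId * int64
type DotContext = { Clock: Map<ReplicaId, int64>; DotCloud: Set<Dot> }

let compact (ctx: DotContext) =
    let (clock, dotsToRemove) =
        ctx.DotCloud
        |> Set.fold (fun (cc, rem) (r, n) ->
            let n2 = defaultArg (Map.tryFind r cc) 0L
            if n = n2 + 1L then (Map.add r n cc, Set.add (r, n) rem)
            elif n <= n2 then (cc, Set.add (r, n) rem)
            else (cc, rem)
        ) (ctx.Clock, Set.empty)
    { Clock = clock; DotCloud = ctx.DotCloud - dotsToRemove }

// Pointwise maximum of both clocks, union of both clouds, then compaction.
let merge (a: DotContext) (b: DotContext) =
    let clock =
        b.Clock
        |> Map.fold (fun acc r n ->
            Map.add r (max n (defaultArg (Map.tryFind r acc) 0L)) acc) a.Clock
    compact { Clock = clock; DotCloud = a.DotCloud + b.DotCloud }

let ctx =
    { Clock = Map.ofList [ ("A", 3L) ]
      DotCloud = Set.ofList [ ("A", 2L); ("A", 5L); ("A", 6L); ("B", 1L) ] }

// A:2 is dropped (already inside the clock), B:1 is absorbed into the clock,
// A:5 and A:6 stay detached because A:4 is still missing.
let compacted = compact ctx

// Another replica has seen A:1..A:4, which closes the gap.
let other = { Clock = Map.ofList [ ("A", 4L) ]; DotCloud = Set.empty }
let merged = merge compacted other
```

After the merge the clock reads A:6, B:1 and the dot cloud is empty again, which is exactly the garbage collection the compaction is meant to provide.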

Kernel

Now that we are able to manage the time space of our observations via DotContext, we could use it to construct our first instance of a delta-aware AWORSet (Add-Wins Observed Remove Set). However, as you'll see later, we can implement several different CRDTs using more or less the same mechanics. Therefore we'll introduce a common abstraction over them, which we'll refer to as DotKernel.

A dot kernel can be imagined as a Map of dots to values with a single dot context used for time keeping.

type DotKernel<'a when 'a: equality> =  
    { Context: DotContext      
      Entries: Map<Dot, 'a> }

What's important here is that the Entries map contains only information about active (not removed) elements. You could ask now: in the original implementation of ORSet<> we needed removed elements as well in order to resolve merge conflicts - how are we going to resolve them now?

There's a simple solution here - we're going to use our dot context to track information about remove operations. As you remember, each operation and its occurrence can be represented by a single dot. Now, when we merge two kernels, all we need to do is check whether an element identified by its dot can be found inside the entries and the dot context of the other kernel.

If the element's dot was found in the dot context, it means that the other kernel already knew about its addition. In that case we look up that element in the other kernel's entries - if the dot is not there, it means that by now the element must have been removed!

// get active values from the kernel
let values k = k.Entries |> Map.toSeq |> Seq.map snd

// merge two kernels together
let merge a b =  
    let active =
        b.Entries
        |> Map.fold (fun acc dot v -> 
            // add unseen elements
            if not <| (Map.containsKey dot a.Entries || DotContext.contains dot a.Context)
            then Map.add dot v acc else acc) a.Entries
    let final = 
        a.Entries
        |> Map.fold (fun acc dot _ ->
            // remove elements visible in dot context but not among Entries
            if DotContext.contains dot b.Context && not <| Map.containsKey dot b.Entries
            then Map.remove dot acc else acc) active
    { Entries = final; Context = DotContext.merge a.Context b.Context }
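To see this rule at work, the sketch below merges two hand-built kernels in both directions. Replica A has added "x" (A:1) and "y" (A:2), while replica B received only "x" and then removed it. To keep the example short, DotContext.merge is simplified to a pointwise maximum without compaction, which is enough here because no dot is detached. ReplicaId as a string is an assumption.

```fsharp
// Assumption: replica ids are plain strings.
type ReplicaId = string
type Dot = ReplicaId * int64
type DotContext = { Clock: Map<ReplicaId, int64>; DotCloud: Set<Dot> }
type DotKernel<'a when 'a: equality> = { Context: DotContext; Entries: Map<Dot, 'a> }

module DotContext =
    let contains (r, n) ctx =
        match Map.tryFind r ctx.Clock with
        | Some found when found >= n -> true
        | _ -> Set.contains (r, n) ctx.DotCloud

    // Simplified for this sketch: no compaction step.
    let merge a b =
        let clock =
            b.Clock
            |> Map.fold (fun acc r n ->
                Map.add r (max n (defaultArg (Map.tryFind r acc) 0L)) acc) a.Clock
        { Clock = clock; DotCloud = a.DotCloud + b.DotCloud }

module DotKernel =
    let values k = k.Entries |> Map.toSeq |> Seq.map snd

    let merge a b =
        let active =
            b.Entries
            |> Map.fold (fun acc dot v ->
                // add entries that `a` has never seen
                if Map.containsKey dot a.Entries || DotContext.contains dot a.Context
                then acc else Map.add dot v acc) a.Entries
        let result =
            a.Entries
            |> Map.fold (fun acc dot _ ->
                // drop entries that `b` has seen but no longer holds
                if DotContext.contains dot b.Context && not (Map.containsKey dot b.Entries)
                then Map.remove dot acc else acc) active
        { Entries = result; Context = DotContext.merge a.Context b.Context }

let a : DotKernel<string> =
    { Context = { Clock = Map.ofList [ ("A", 2L) ]; DotCloud = Set.empty }
      Entries = Map.ofList [ (("A", 1L), "x"); (("A", 2L), "y") ] }

// B knows about A:1 (it is in the context) but holds no entry for it: a removal.
let b : DotKernel<string> =
    { Context = { Clock = Map.ofList [ ("A", 1L) ]; DotCloud = Set.empty }
      Entries = Map.empty }

let ab = DotKernel.merge a b |> DotKernel.values |> List.ofSeq
let ba = DotKernel.merge b a |> DotKernel.values |> List.ofSeq
```

Both merge orders end up with just "y": the removal of "x" propagates without any tombstone, and the addition B had not seen yet survives.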

All that remains now is to implement the addition and removal operations.

For element addition, we start by reserving a new dot for our addition operation, and then we store it together with the element inside the entries and in the kernel's context.

let add replica value (k, d) =  
    let (dot, ctx) = DotContext.nextDot replica k.Context
    let kernel = { k with 
        Entries = Map.add dot value k.Entries; 
        Context = ctx }
    let delta = { d with 
        Entries = Map.add dot value d.Entries; 
        Context = DotContext.add dot d.Context 
                  |> DotContext.compact }
    (kernel, delta)
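DotContext.add is used above without being shown. The sketch below assumes that it simply puts the dot into the dot cloud and leaves it to compact to decide whether the dot fits into the vector clock. It also uses a nextDot written without Helpers.upsert and treats ReplicaId as a string, both of which are assumptions made to keep the example self-contained.

```fsharp
// Assumption: replica ids are plain strings.
type ReplicaId = string
type Dot = ReplicaId * int64
type DotContext = { Clock: Map<ReplicaId, int64>; DotCloud: Set<Dot> }
type DotKernel<'a when 'a: equality> = { Context: DotContext; Entries: Map<Dot, 'a> }

module DotContext =
    let zero = { Clock = Map.empty; DotCloud = Set.empty }

    // Assumed shape of DotContext.add: remember the dot as detached.
    let add dot ctx = { ctx with DotCloud = Set.add dot ctx.DotCloud }

    let nextDot r ctx =
        let n = defaultArg (Map.tryFind r ctx.Clock) 0L + 1L
        ((r, n), { ctx with Clock = Map.add r n ctx.Clock })

    let compact ctx =
        let (clock, absorbed) =
            ctx.DotCloud
            |> Set.fold (fun (cc, rem) (r, n) ->
                let n2 = defaultArg (Map.tryFind r cc) 0L
                if n = n2 + 1L then (Map.add r n cc, Set.add (r, n) rem)
                elif n <= n2 then (cc, Set.add (r, n) rem)
                else (cc, rem)
            ) (ctx.Clock, Set.empty)
        { Clock = clock; DotCloud = ctx.DotCloud - absorbed }

module DotKernel =
    let add replica value (k, d) =
        let (dot, ctx) = DotContext.nextDot replica k.Context
        let kernel = { k with Entries = Map.add dot value k.Entries; Context = ctx }
        let delta =
            { d with
                Entries = Map.add dot value d.Entries
                Context = DotContext.add dot d.Context |> DotContext.compact }
        (kernel, delta)

// The core kernel already holds two elements added by replica A.
let core : DotKernel<string> =
    { Context = { Clock = Map.ofList [ ("A", 2L) ]; DotCloud = Set.empty }
      Entries = Map.ofList [ (("A", 1L), "x"); (("A", 2L), "y") ] }
let emptyDelta : DotKernel<string> = { Context = DotContext.zero; Entries = Map.empty }

let (core2, delta) = DotKernel.add "A" "z" (core, emptyDelta)
```

The core kernel advances its clock to A:3, while the fresh delta carries only the new entry. From the delta's point of view A:3 is a detached dot, so it stays in the delta's dot cloud instead of being flattened into a clock.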

Element removal is even simpler - we don't even need a new dot! We can simply pick the dots that were used so far to mark additions of that element and just drop them from the kernel's entries - after all, we recognize a removal when the dot is acknowledged by the kernel's dot context but not found among its entries, remember?

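let remove value (k, d) =  
    let (entries, deltaEntries, deltaCtx) =
        k.Entries 
        |> Map.fold (fun (e, de, dc) dot v2 ->
            // drop the dot from the kernel and from any entry still pending in
            // the delta, and record it in the delta's context, so that a stale
            // delta entry cannot mask the removal on the receiving side
            if v2 = value 
            then (Map.remove dot e, Map.remove dot de, DotContext.add dot dc)
            else (e, de, dc)
        ) (k.Entries, d.Entries, d.Context)
    let kernel = { k with Entries = entries }
    let delta = { d with Entries = deltaEntries; Context = deltaCtx |> DotContext.compact }
    (kernel, delta)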

We'll return to DotKernel once more in a minute, but right now we have all that we need to implement our ORSet.

Delta-aware Add-Wins Observed Remove Set

Now, finally! We've got everything in place to create our implementation of an efficient AWORSet with delta updates. We can simply represent the entire set, together with its delta, in terms of dot kernels:

type AWORSet<'a when 'a: comparison> =  
    AWORSet of core:DotKernel<'a> * delta:DotKernel<'a> option

With the dot kernel, we can simply recreate all core delta-CRDT operations that we defined for our counters in the previous blog post:

module AWORSet

// zero or empty element
let zero = AWORSet(DotKernel.zero, None)

// An active set of entries
let value (AWORSet(k, _)) =  
    k 
    |> DotKernel.values
    |> Set.ofSeq

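let add r v (AWORSet(k, d)) =  
    // drop dots left by earlier additions of the same value, then add it
    // under a fresh dot; DotKernel.remove takes only the value, no replica id
    let (k2, d2) = DotKernel.remove v (k, defaultArg d DotKernel.zero)
    let (k3, d3) = DotKernel.add r v (k2, d2)
    AWORSet(k3, Some d3)

let rem r v (AWORSet(k, d)) =  
    let (k2, d2) = DotKernel.remove v (k, defaultArg d DotKernel.zero)
    AWORSet(k2, Some d2)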

let merge (AWORSet(ka, da)) (AWORSet(kb, db)) =  
    let dc = Helpers.mergeOption DotKernel.merge da db
    let kc = DotKernel.merge ka kb
    AWORSet(kc, dc)

let mergeDelta (AWORSet(ka, da)) (delta) =  
    let dc = Helpers.mergeOption DotKernel.merge da (Some delta)
    let kc = DotKernel.merge ka delta
    AWORSet(kc, dc)

let split (AWORSet(k, d)) = AWORSet(k, None), d  
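As an end-to-end illustration, the sketch below replicates a set between two replicas using deltas only. It is a condensed, self-contained variant of the code in this post and not the exact implementation: ReplicaId is assumed to be a string, DotKernel.zero is written as a function to avoid value-restriction issues, Helpers.mergeOption is inlined as a match, and remove also drops the removed dots from the delta's entries so that an unsplit delta cannot carry a stale entry.

```fsharp
type ReplicaId = string // assumption: not defined in the post
type Dot = ReplicaId * int64
type DotContext = { Clock: Map<ReplicaId, int64>; DotCloud: Set<Dot> }
type DotKernel<'a when 'a: equality> = { Context: DotContext; Entries: Map<Dot, 'a> }

module DotContext =
    let zero = { Clock = Map.empty; DotCloud = Set.empty }
    let contains (r, n) ctx =
        match Map.tryFind r ctx.Clock with
        | Some found when found >= n -> true
        | _ -> Set.contains (r, n) ctx.DotCloud
    let add dot ctx = { ctx with DotCloud = Set.add dot ctx.DotCloud }
    let nextDot r ctx =
        let n = defaultArg (Map.tryFind r ctx.Clock) 0L + 1L
        ((r, n), { ctx with Clock = Map.add r n ctx.Clock })
    let compact ctx =
        let (clock, absorbed) =
            ctx.DotCloud
            |> Set.fold (fun (cc, rem) (r, n) ->
                let n2 = defaultArg (Map.tryFind r cc) 0L
                if n = n2 + 1L then (Map.add r n cc, Set.add (r, n) rem)
                elif n <= n2 then (cc, Set.add (r, n) rem)
                else (cc, rem)
            ) (ctx.Clock, Set.empty)
        { Clock = clock; DotCloud = ctx.DotCloud - absorbed }
    let merge a b =
        let clock =
            b.Clock
            |> Map.fold (fun acc r n ->
                Map.add r (max n (defaultArg (Map.tryFind r acc) 0L)) acc) a.Clock
        compact { Clock = clock; DotCloud = a.DotCloud + b.DotCloud }

module DotKernel =
    let zero () = { Context = DotContext.zero; Entries = Map.empty }
    let merge a b =
        let active =
            b.Entries
            |> Map.fold (fun acc dot v ->
                if Map.containsKey dot a.Entries || DotContext.contains dot a.Context
                then acc else Map.add dot v acc) a.Entries
        let result =
            a.Entries
            |> Map.fold (fun acc dot _ ->
                if DotContext.contains dot b.Context && not (Map.containsKey dot b.Entries)
                then Map.remove dot acc else acc) active
        { Entries = result; Context = DotContext.merge a.Context b.Context }
    let add replica value (k, d) =
        let (dot, ctx) = DotContext.nextDot replica k.Context
        let kernel = { k with Entries = Map.add dot value k.Entries; Context = ctx }
        let delta =
            { d with
                Entries = Map.add dot value d.Entries
                Context = DotContext.add dot d.Context |> DotContext.compact }
        (kernel, delta)
    let remove value (k, d) =
        // dots currently carrying `value`; they leave both entry maps
        let dots = k.Entries |> Map.filter (fun _ v -> v = value) |> Map.toList |> List.map fst
        let drop entries = dots |> List.fold (fun acc dot -> Map.remove dot acc) entries
        let deltaCtx = dots |> List.fold (fun acc dot -> DotContext.add dot acc) d.Context
        let kernel = { k with Entries = drop k.Entries }
        let delta = { d with Entries = drop d.Entries; Context = DotContext.compact deltaCtx }
        (kernel, delta)

type AWORSet<'a when 'a: comparison> =
    AWORSet of core: DotKernel<'a> * delta: DotKernel<'a> option

module AWORSet =
    let value (AWORSet(k, _)) = k.Entries |> Map.toSeq |> Seq.map snd |> Set.ofSeq
    let add r v (AWORSet(k, d)) =
        let (k2, d2) = DotKernel.remove v (k, defaultArg d (DotKernel.zero ()))
        let (k3, d3) = DotKernel.add r v (k2, d2)
        AWORSet(k3, Some d3)
    let rem (r: ReplicaId) v (AWORSet(k, d)) =
        let (k2, d2) = DotKernel.remove v (k, defaultArg d (DotKernel.zero ()))
        AWORSet(k2, Some d2)
    let mergeDelta (AWORSet(ka, da)) delta =
        let dc =
            match da with
            | Some d -> Some (DotKernel.merge d delta)
            | None -> Some delta
        AWORSet(DotKernel.merge ka delta, dc)
    let split (AWORSet(k, d)) = AWORSet(k, None), d

// Replica A adds two elements and ships only its delta to replica B.
let empty : AWORSet<string> = AWORSet(DotKernel.zero (), None)
let a1 = empty |> AWORSet.add "A" "a" |> AWORSet.add "A" "b"
let (a2, deltaA) = AWORSet.split a1
let b1 = AWORSet.mergeDelta empty (Option.get deltaA)
// Replica B removes "a" and ships its delta back to A.
let (b2, deltaB) = b1 |> AWORSet.rem "B" "a" |> AWORSet.split
let a3 = AWORSet.mergeDelta a2 (Option.get deltaB)
```

Both replicas converge to a set containing only "b", and at no point was the full state sent over the wire.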

As you can see, this implementation is pretty straightforward - we already did all of the heavy lifting when we defined DotKernel and DotContext. Now we're ready to reuse them in other fields.

Multi Value Registers

Remember when I told you that DotKernel can be used as a generalization over many different kinds of CRDTs? Let's implement one more - a Multi-Value Register.

The idea of registers was mentioned previously - back then we used a Last-Write-Wins register, which allowed us to encapsulate any value into a CRDT. As the name suggests, the LWW register used wall clock time to determine which value should stay in case of a conflict.

A Multi-Value Register (or MVReg) works differently - once a conflicting update has been detected, it doesn't try to guess the right answer. Instead it returns all concurrent values and lets the programmer decide which one should be used.

type MVReg<'a when 'a: equality> =  
    MVReg of core:DotKernel<'a> * delta:DotKernel<'a> option

module MVReg =  
    let zero: MVReg<_> = MVReg(DotKernel.zero, None)
    let value (MVReg(k, _)) = 
        DotKernel.values k |> Seq.distinct |> Seq.toList

    let set r v (MVReg(k, d)) =
        let (k2, d2) = DotKernel.removeAll (k, defaultArg d DotKernel.zero)
        let (k3, d3) = DotKernel.add r v (k2, d2)
        MVReg(k3, Some d3)

    let merge (MVReg(ka, da)) (MVReg(kb, db)) = 
        let dc = Helpers.mergeOption DotKernel.merge da db
        let kc = DotKernel.merge ka kb
        MVReg(kc, dc)

    let mergeDelta (MVReg(ka, da)) (delta) =
        let dc = Helpers.mergeOption DotKernel.merge da (Some delta)
        let kc = DotKernel.merge ka delta
        MVReg(kc, dc)

    let split (MVReg(k, d)) = MVReg(k, None), d

As you can see, the actual implementation is almost identical to AWORSet - that's why we wanted to have the dot kernel in the first place. The major difference here is the DotKernel.removeAll function. As we haven't introduced it before, let's do it now:

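module DotKernel =  
    let removeAll (k, d) =
        let (deltaEntries, deltaCtx) = 
            k.Entries 
            |> Map.fold (fun (de, dc) dot _ -> 
                // forget the dot in the delta's pending entries and remember
                // it in the delta's context, so the overwrite is not masked
                (Map.remove dot de, DotContext.add dot dc)) (d.Entries, d.Context)
        let kernel = { k with Entries = Map.empty }
        let delta = { d with Entries = deltaEntries; Context = deltaCtx |> DotContext.compact }
        (kernel, delta)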

What we do here is simply record all dots from the kernel's entries in the delta's context - so that the removal stays in the tracked history for merging purposes - and clear the entries themselves.
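The register semantics can be illustrated with a deliberately stripped-down sketch. It is not the MVReg defined above: deltas are left out and the dot context is reduced to a plain vector clock, which is sufficient as long as replicas exchange full states. The names Kernel, seen, set, merge and value are hypothetical and exist only for this example, and ReplicaId as a string is an assumption.

```fsharp
// Assumption: replica ids are plain strings.
type ReplicaId = string
type Dot = ReplicaId * int64
type Kernel<'a> = { Clock: Map<ReplicaId, int64>; Entries: Map<Dot, 'a> }

// A dot is observed when the clock has reached its sequence number.
let seen (r, n) k = defaultArg (Map.tryFind r k.Clock) 0L >= n

// Setting a value forgets all current entries and stores the new value
// under a fresh dot; the old dots stay covered by the clock.
let set r v k =
    let n = defaultArg (Map.tryFind r k.Clock) 0L + 1L
    { Clock = Map.add r n k.Clock; Entries = Map.ofList [ ((r, n), v) ] }

let merge a b =
    // keep an entry if the other side still holds it or has never seen it
    let keep mine other =
        mine.Entries
        |> Map.filter (fun dot _ -> Map.containsKey dot other.Entries || not (seen dot other))
    let entries = Map.fold (fun acc dot v -> Map.add dot v acc) (keep a b) (keep b a)
    let clock =
        b.Clock
        |> Map.fold (fun acc r n ->
            Map.add r (max n (defaultArg (Map.tryFind r acc) 0L)) acc) a.Clock
    { Clock = clock; Entries = entries }

let value k = k.Entries |> Map.toList |> List.map snd

let empty : Kernel<string> = { Clock = Map.empty; Entries = Map.empty }

// Two replicas write concurrently: neither has seen the other's dot.
let a1 = set "A" "red" empty
let b1 = set "B" "blue" empty
let conflict = merge a1 b1

// Replica A resolves the conflict by writing on top of the merged state.
let resolved = set "A" "green" conflict
let final = merge resolved b1
```

The merged register exposes both "red" and "blue", while a write made on top of the merged state supersedes both, even when it is merged again with the stale state of replica B.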

Summary

As you can see, the idea of dots is quite powerful, as it allows us to encode different kinds of CRDTs in an efficient manner. We could build on the structures defined here to compose things like maps, graphs etc. However, that's another story.