In this blog post we'll continue exploring the ideas behind optimizing state-based CRDTs. This is the third post in the series. If you haven't read the previous posts and don't feel familiar with CRDTs, I advise you to do so - we'll often refer to them:
- An introduction to state-based CRDTs
- Optimizing state-based CRDTs (part 1)
- Optimizing state-based CRDTs (part 2)
- State-based CRDTs: BoundedCounter
- State-based CRDTs: Map
- Operation-based CRDTs: protocol
- Operation-based CRDTs: registers and sets
- Operation-based CRDTs: arrays (part 1)
- Operation-based CRDTs: arrays (part 2)
- Operation-based CRDTs: JSON document
- Pure operation-based CRDTs
- CRDT optimizations
In case you get lost in the code snippets described here, you can always play with them using this Github repository.
In the previous blog post we discussed problems with the state-based approach to CRDTs. The major issue was that in order to achieve convergence between replicas living on different machines, we had to serialize and push the entire data structure over the network. With a small number of changes and big collections, this approach quickly becomes suboptimal.
Example: we created a Twitter-like service and used a replicated ORSet CRDT to represent the list of a user's followers. In order to keep replicas on different machines in sync, we need to copy the entire set over the network - ideally on every update. Imagine replicating all followers of e.g. Elon Musk (22 million people at the moment) every time a new person follows/unfollows him.
One of the solutions we talked about was the introduction of deltas, which carry only the latest changeset. We then presented how to implement them, using counters as an example. Here we'll take a rougher ride - we'll do the same with sets and registers.
Inefficiency of Observed-Remove Sets
In the first part of the CRDT series we've seen a very naive implementation of Observed-Remove Sets - CRDT collections which are able to track elements added and removed on different replicas without any need for synchronization. We could visualize this data structure using the following image:
However, this design comes with a huge cost:
- We're tracking not only active elements of the set, but all elements that have ever appeared in it - removed elements are stored as tombstones.
- We attached a vector clock to keep the "time" of element addition/removal. Since vector clocks are map-like structures, the size of the metadata could even outgrow the actual payload we're interested in.
Can we do something with this? Of course, yes ;) But to do that, first we need to revisit how we understand the notion of time.
Dots
Previously, we already learned how to represent time (or rather the happened-before relations between events) using vector clocks. But let's think: what could a minimal representation of an operation (addition or removal) look like in eventually consistent systems?
We could use a monotonically incremented sequence number for each operation - just like when we are using traditional SQL databases. However, since we cannot use a single global sequence number - as this would require replicas to coordinate the next number with each other - we'll use a separate sequencer for every replica and use a pair of (replica-id, sequence-nr) (also known as a dot) to represent the timestamp of a particular operation made on that replica. A dot has several properties:
- It allows us to uniquely identify every single operation made on any replica.
- It allows us to track causal relations between events - so while we may not be able to establish a total order of events (fortunately we don't need it), we still know how to order operations made by a single replica.
- Unlike standard time, for every operation we increase the sequence number by one. This way we can keep track of "holes" - discontinuous events we haven't yet observed for some reason.
When using standard wall clock time, we're used to representing observed events as markers on some sort of a timeline, like this:
However, with dots we can represent them using 2-dimensional space instead:
Each filled cell represents a discrete event we observed, described as a dot. As you can see, most of the events form a continuous surface; only a few are detached. This reflects a real-life situation: as we replicate our updates among replicas, each replica will try to keep up with incoming changes, filling the gaps it was not aware of before, thus over time filling the space from left to right. This is the place we can optimize.
We could split the area of our observed dots in two:
- A "flattened" version represented by a vector clock, which contains only the latest dot from the continuous space. I.e. given dots A:1, A:2, A:3, A:5, A:6 it would be A:3, as this is as far as we can go without reaching a gap (A:4 is missing in this case).
- A second set of all dots that could not fit inside the vector clock. So for the example above this set would consist of A:5 and A:6, as those dots are detached from the rest. We'll call it a dot cloud.
Together those two form something we'll call a DotContext - a logical representation of all observed events from the perspective of a given replica (we've seen a visual representation of it on the diagram above):
type Dot = ReplicaId * int64
type VectorClock = Map<ReplicaId, int64>
type DotContext = { Clock: VectorClock; DotCloud: Set<Dot> }
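For instance, the example from the previous paragraph (the continuous dots A:1, A:2, A:3 plus the detached A:5 and A:6) could be captured as the following value - a sketch which assumes, as in the accompanying repository, that ReplicaId boils down to a simple string identifier:
// dots A:1..A:3 are continuous, so the vector clock alone represents them;
// A:5 and A:6 are detached and live in the dot cloud
let exampleCtx =
    { Clock = Map.ofList [ "A", 3L ]
      DotCloud = Set.ofList [ ("A", 5L); ("A", 6L) ] }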
Now, we need to know which events have already been acknowledged by our replica, as we want to know which of the incoming events are important for us and which ones are outdated or duplicated. With a dot context it's easy - a dot has been observed if either its sequence number is less than or equal to the sequence number of the corresponding replica in the vector clock, or if that dot was found in the cloud of detached dots:
let contains dot ctx =
    let (r, n) = dot
    match Map.tryFind r ctx.Clock with
    | Some found when found >= n -> true
    | _ -> Set.contains dot ctx.DotCloud
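A quick sanity check against the illustrative exampleCtx value defined above:
let seen1 = contains ("A", 2L) exampleCtx // true - covered by the clock entry A:3
let seen2 = contains ("A", 4L) exampleCtx // false - this is exactly the gap
let seen3 = contains ("A", 6L) exampleCtx // true - found in the dot cloud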
We can also use the DotContext to assign new dots to the events we want to produce:
let nextDot r (ctx): Dot * DotContext =
    let newCtx = { ctx with Clock = Helpers.upsert r 1L ((+)1L) ctx.Clock }
    ((r, newCtx.Clock.[r]), newCtx)
As you can see here, in order to generate the next dot value, we're using only the vector clock - we can do so simply because the current replica is always up-to-date with its own updates, so it cannot see any detached events produced by itself.
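For illustration, producing two consecutive dots on replica "A", starting from an empty context built inline (again assuming string-based replica ids), might look like this:
let emptyCtx = { Clock = Map.empty; DotCloud = Set.empty }
let (dot1, ctx1) = nextDot "A" emptyCtx // dot1 = ("A", 1L)
let (dot2, ctx2) = nextDot "A" ctx1     // dot2 = ("A", 2L)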
Now, since we're using a dot context to keep the time of the updates, we'll need to pass it as metadata along with the update itself, either in the form of a full CRDT merge or using a slimmer version - a delta containing only the dot context with the latest updates.
If you've read the previous blog posts in this series, you probably know what we'll need now - a merge function. Since we've already talked about how to merge sets and vector clocks/counters, we can compose both approaches to merge dot contexts themselves:
let merge a b =
    let mergeMap x y =
        y |> Map.fold (fun acc k v -> Helpers.upsert k v (max v) acc) x
    let clock = mergeMap a.Clock b.Clock
    let cloud = a.DotCloud + b.DotCloud
    { Clock = clock; DotCloud = cloud } |> compact
But wait: what is this mysterious compact function at the end? Well, this is where the fun begins...
When the cloud grows
As you can imagine, when we merge two different contexts, some of the dots representing detached events may no longer be detached - if we leave them that way, we'll eventually end up with a small vector clock and a big pile of garbage living in the dot cloud.
This is why we need compaction. The compaction process itself can be described by the following steps, applied to each dot in the dot cloud:
1. Check if the dot is no longer detached - if its sequence number is exactly one more than its replica's counterpart in the vector clock, this event is actually continuous, so it can be joined to the vector clock.
2. Check if the dot's sequence number is less than or equal to its counterpart in the vector clock - if so, it is already represented inside the vector clock itself, so we no longer need it.
3. If the dot matches neither case 1 nor case 2, it remains detached, so it should stay in the dot cloud.
let compact ctx =
    let (clock, dotsToRemove) =
        ctx.DotCloud
        |> Set.fold (fun (cc, rem) (r, n) ->
            let n2 = defaultArg (Map.tryFind r cc) 0L
            // check if dot happens right after the latest one
            // observed in vector clock
            if n = n2 + 1L then (Map.add r n cc, Set.add (r, n) rem)
            // check if dot is represented "inside" vector clock
            elif n <= n2 then (cc, Set.add (r, n) rem)
            // go here if dot remains detached
            else (cc, rem)
        ) (ctx.Clock, Set.empty)
    { Clock = clock; DotCloud = ctx.DotCloud - dotsToRemove }
What is important here is that in F#, Set.fold always traverses elements in their lexicographic order. This means that we'll always process A:2 before A:3, which is important for this implementation if we want to merge all continuous dots into the vector clock.
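To see compaction in action, here's a sketch that merges the illustrative exampleCtx from before with a delta context that fills in the missing A:4 (assuming, as the later code does, that these functions live in a DotContext module):
let deltaCtx = { Clock = Map.empty; DotCloud = Set.ofList [ ("A", 4L) ] }
let mergedCtx = DotContext.merge exampleCtx deltaCtx
// compact (called inside merge) folds A:4, A:5 and A:6 into the clock:
// mergedCtx = { Clock = Map.ofList [ "A", 6L ]; DotCloud = Set.empty }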
Kernel
Now, once we are able to manage the time space of our observations via DotContext, we can use it to construct our first instance of a delta-aware AWORSet (Add-Wins Observed-Remove Set). However, as you'll see later, we can implement several different CRDTs using more or less the same mechanics. Therefore we'll introduce here a common abstraction over them, which we'll refer to as DotKernel.
A dot kernel can be imagined as a Map of dots to values with a single dot context used for time keeping.
type DotKernel<'a when 'a: equality> =
    { Context: DotContext
      Entries: Map<Dot, 'a> }
What's important here is that the Entries map contains only information about active (not-removed) elements. You could ask now: but in the original implementation of ORSet we needed the removed elements as well in order to resolve merge conflicts - how are we going to resolve them now?
There's a simple solution here - we're going to use our dot context to track information about remove operations. As you remember, each operation occurrence can be represented by a single dot. Now, when we merge two kernels, all we need to do is to check, for each element identified by its dot, whether that dot can be found inside the entries and the dot context of the other kernel.
If the element's dot was found in the other kernel's dot context, it means that kernel already knew about its addition. In that case we look the element up in that kernel's entries - if the dot is not there, it means that by now it must have been removed!
// get active values from the kernel
let values k = k.Entries |> Map.toSeq |> Seq.map snd

// merge two kernels together
let merge a b =
    let active =
        b.Entries
        |> Map.fold (fun acc dot v ->
            // add unseen elements
            if not <| (Map.containsKey dot a.Entries || DotContext.contains dot a.Context)
            then Map.add dot v acc else acc) a.Entries
    let final =
        a.Entries
        |> Map.fold (fun acc dot _ ->
            // remove elements visible in dot context but not among Entries
            if DotContext.contains dot b.Context && not <| Map.containsKey dot b.Entries
            then Map.remove dot acc else acc) active
    { Entries = final; Context = DotContext.merge a.Context b.Context }
All that remains now is to implement the addition/removal operations.
For element addition, we start by reserving a new dot for the operation, and we store it together with the element inside the entries and in the kernel's context.
let add replica value (k, d) =
    let (dot, ctx) = DotContext.nextDot replica k.Context
    let kernel =
        { k with
            Entries = Map.add dot value k.Entries
            Context = ctx }
    let delta =
        { d with
            Entries = Map.add dot value d.Entries
            Context = DotContext.add dot d.Context |> DotContext.compact }
    (kernel, delta)
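The code above relies on a DotContext.add helper that wasn't shown in this post - it lives in the repository. A minimal sketch of it, under the assumption that it merely records a freshly observed dot in the dot cloud and leaves the rest to compact, could look like this:
// record a single observed dot; compact will later fold it into the
// vector clock if it turns out to be continuous
let add dot ctx = { ctx with DotCloud = Set.add dot ctx.DotCloud }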
The element removal operation is even simpler - we don't even need a new dot! We can simply take the dots which were so far used to mark additions of that element, record them in the delta's context and drop the element from the kernel's entries - after all, we recognize a removal when a dot was acknowledged by the kernel's dot context but not found among its entries, remember?
// the replica id is not needed for the removal itself, but we keep it
// so the signature stays symmetric with add (as used by AWORSet below)
let remove replica value (k, d) =
    let (entries, deltaCtx) =
        k.Entries
        |> Map.fold (fun (e, dc) dot v2 ->
            if v2 = value
            then (Map.remove dot e, DotContext.add dot dc)
            else (e, dc)
        ) (k.Entries, d.Context)
    let kernel = { k with Entries = entries }
    let delta = { d with Context = deltaCtx |> DotContext.compact }
    (kernel, delta)
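A tiny round trip over a kernel and its delta might look as follows (a sketch; DotKernel.zero is assumed to be the empty kernel from the repository, i.e. an empty entries map paired with an empty context):
let (k1, d1) = DotKernel.add "A" "apple" (DotKernel.zero, DotKernel.zero)
let (k2, d2) = DotKernel.remove "A" "apple" (k1, d1)
// k2.Entries is empty again, but k2.Context (and d2.Context) still remember
// the dot ("A", 1L) - which is exactly how other replicas will detect the removal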
We'll return to DotKernel once more in a minute, but right now we have everything we need to implement our ORSet.
Delta-aware Add-Wins Observed Remove Set
Now, finally! We've got everything in place to create our implementation of an efficient AWORSet with delta updates. We can simply represent the entire set, together with its delta, in terms of dot kernels:
type AWORSet<'a when 'a: comparison> =
    AWORSet of core:DotKernel<'a> * delta:DotKernel<'a> option
With the dot kernel, we can simply recreate all the core delta-CRDT operations we've defined for our counters in the previous blog post:
module AWORSet =
    // zero or empty element
    let zero = AWORSet(DotKernel.zero, None)

    // an active set of entries
    let value (AWORSet(k, _)) =
        k
        |> DotKernel.values
        |> Set.ofSeq

    let add r v (AWORSet(k, d)) =
        let (k2, d2) = DotKernel.remove r v (k, defaultArg d DotKernel.zero)
        let (k3, d3) = DotKernel.add r v (k2, d2)
        AWORSet(k3, Some d3)

    let rem r v (AWORSet(k, d)) =
        let (k2, d2) = DotKernel.remove r v (k, defaultArg d DotKernel.zero)
        AWORSet(k2, Some d2)

    let merge (AWORSet(ka, da)) (AWORSet(kb, db)) =
        let dc = Helpers.mergeOption DotKernel.merge da db
        let kc = DotKernel.merge ka kb
        AWORSet(kc, dc)

    let mergeDelta (AWORSet(ka, da)) (delta) =
        let dc = Helpers.mergeOption DotKernel.merge da (Some delta)
        let kc = DotKernel.merge ka delta
        AWORSet(kc, dc)

    let split (AWORSet(k, d)) = AWORSet(k, None), d
As you can see, this implementation is pretty straightforward - we've already done all of the heavy lifting when we defined DotKernel and DotContext. Now we're ready to reuse them in other areas.
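To get a feel for how the delta flow works end to end, here's a sketch of two replicas converging by shipping only a delta (the replica names are purely illustrative):
// replica A adds two elements and splits off the accumulated delta
let setA = AWORSet.zero |> AWORSet.add "A" "cat" |> AWORSet.add "A" "dog"
let (setA', deltaA) = AWORSet.split setA

// ship only the delta to replica B instead of the whole set
let setB =
    match deltaA with
    | Some d -> AWORSet.mergeDelta AWORSet.zero d
    | None -> AWORSet.zero
let setB' = setB |> AWORSet.rem "B" "cat"

// a full merge still converges both replicas to the same value
let result = AWORSet.value (AWORSet.merge setA' setB') // set ["dog"]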
Multi-Value Registers
Remember when I told you that DotKernel can be used as a generalization over many different kinds of CRDTs? Let's implement one more - a Multi-Value Register.
The idea of registers was mentioned previously - back then we used a Last-Write-Wins register, which allowed us to wrap any value into a CRDT. As the name suggests, the LWW register used wall clock time to determine which value should stay in case of a conflict.
A Multi-Value Register (or MVReg) works differently - once a conflicting update has been detected, it doesn't try to guess the right answer. Instead it returns the concurrent values and lets the programmer decide which one should be used.
type MVReg<'a when 'a: equality> =
    MVReg of core:DotKernel<'a> * delta:DotKernel<'a> option

module MVReg =
    let zero: MVReg<_> = MVReg(DotKernel.zero, None)

    let value (MVReg(k, _)) =
        DotKernel.values k |> Seq.distinct |> Seq.toList

    let set r v (MVReg(k, d)) =
        let (k2, d2) = DotKernel.removeAll (k, defaultArg d DotKernel.zero)
        let (k3, d3) = DotKernel.add r v (k2, d2)
        MVReg(k3, Some d3)

    let merge (MVReg(ka, da)) (MVReg(kb, db)) =
        let dc = Helpers.mergeOption DotKernel.merge da db
        let kc = DotKernel.merge ka kb
        MVReg(kc, dc)

    let mergeDelta (MVReg(ka, da)) (delta) =
        let dc = Helpers.mergeOption DotKernel.merge da (Some delta)
        let kc = DotKernel.merge ka delta
        MVReg(kc, dc)

    let split (MVReg(k, d)) = MVReg(k, None), d
As you can see, the actual implementation is almost identical to AWORSet - that's why we wanted to have the dot kernel in the first place. The major difference here is the DotKernel.removeAll function. As we haven't introduced it before, let's do it now:
module DotKernel =
    let removeAll (k, d) =
        let deltaCtx =
            k.Entries
            |> Map.fold (fun acc dot _ -> DotContext.add dot acc) d.Context
        let kernel = { k with Entries = Map.empty }
        let delta = { d with Context = deltaCtx |> DotContext.compact }
        (kernel, delta)
What we do here is simply copy all dots from the kernel's entries into the delta's context - so that we keep them in the tracked history for merging purposes - and clear the entries themselves.
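A short sketch of the multi-value behaviour - two replicas set the register concurrently, and after a merge both values are reported back to the caller (again, the replica names are illustrative):
let regA = MVReg.zero |> MVReg.set "A" "blue"
let regB = MVReg.zero |> MVReg.set "B" "green"

// neither write happened-before the other, so both values survive the merge
let conflicting = MVReg.value (MVReg.merge regA regB) // ["blue"; "green"]

// a later write observes both dots and replaces them
let resolved = MVReg.value (MVReg.merge regA regB |> MVReg.set "A" "red") // ["red"]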
Summary
As you can see, the idea of dots is quite powerful, as it allows us to encode different kinds of CRDTs in an efficient manner. We could use the structures defined here to compose things like maps, graphs etc. However, that's another story.