Bartosz Sypytkowski's Picture

Bartosz Sypytkowski

35 posts

Optimizing state-based CRDTs (part 2)

In this blog post we'll continue exploring the ideas behind optimizing state-based CRDTs. This is a third post from the series. If you haven't read them before this article and don't feel familiar with CRDTs, I advise you to do so - we'll often refer to them:

In case if you're lost with code snippets described here, you can always play with them using this Github repository.

In the previous blog post we discussed problems with state-based approach to CRDTs. Major issue was, that in order to achieve convergence between replicas living on a different machines, we had to serialize and push the entire data structure over the network. With small number of changes and big collections this approach quickly becomes not optimal.

Example: we created a Twitter-like service and used CRDT replicated ORSet to represent a list of user's followers. In order to keep replicas on different machines in sync, we need to copy entire set over the network - ideally on every update. Imagine replicating all followers of i.e. Elon Musk (22mln people at the moment) every time a new person follows/unfollows him.

One of the solutions we talked about, was introduction of deltas, used to contain only the latest changeset. Finally we presented, how to implement them using counters as an example. Here we're take more rough ride - we'll do the same with sets and registers.

Inefficiency of Observed-Remove Sets

In the first part of the CRDT series we've seen a very naive implementation Observed-Remove Sets - a CRDTs collections which are able to track elements added and removed on different replicas without any need of synchronization. We could visualize this data structure using following image:

non-optimized ORSet

However this design comes with huge cost:

  1. We're tracking not only active elements of the set, but all elements that have ever appeared in it - removed elements are stored as tombstones.
  2. We attached a vector clock to keep "time" of element addition/removal. Since vector clocks are map-like structures, the size of metadata could even overgrow the actual payload we're interested in.

Can we do something with this? Of course, yes ;) But to do that, first we need to revisit our approach to how do we understand the notion of time.

Dots

Previously, we already learned how to represent time (or rater happened-before relations between events) using vector clocks. But let's think, what a minimal representation of an operation (addition or removal) could look like in eventually consistent systems?

We can use monotonically auto-incremented sequence number for each operation - just like when we are using traditional SQL databases. However since we cannot use a single global sequence number - as this would require replicas to coordinate a next number with each other - we'll use a separate sequencer for every replica and use pair of (replica-id, sequence-nr) (also known as dot) to represent a timestamp of a particular operation made on that replica. A dot has several properties:

  1. It allows us to uniquely identify every single operation made on any replica.
  2. It allows us to track causal relations between events - so while we may not be able to set a total order of events (fortunately we don't need it), we still know how to order operations made by a single replica.
  3. Unlike standard time, for every operation we increase sequence number by one. This way we can keep track of "holes" - a discontinuous events we haven't yet observed for some reason.

When using standard wall clock time, we're used to represent observed events using markers on some sort of a timeline, like this:

linear-timeline

However, with dots we can represent them using 2-dimensional space instead:

dot-context-timeline

Each filled cell represents a discrete event we observed, described as a dot. As you can see, most of the events form a continuous surface, only few ones are detached. This reflects a real life situation, since as we replicate our updates among replicas, each replica will try to keep up with incoming changes, filling the gaps it was not aware of before, thus over time filling the space from left to right. This is the place, we can optimize.

We could split area of our observed dots in two:

  1. A "flatten" version represented by vector clock, which contains only the oldest dot from continuous space. I.e. given dots A:1, A:2, A:3, A:5, A:6 it would be A:3 as this is as far as we can go without reaching the gap (A:4 is missing in this case).
  2. A second set of all dots that are could not fit inside vector clock. So for the example above this set would consist of A:5 and A:6, as those dots are detached from the rest. We'll call it a dot cloud.

Together those two form something, we'll call a DotContext - a logical representation of all observed events from a perspective of given replica (we've seen a visual representation of it in on a diagram above):

type Dot = ReplicaId * int64  
type VectorClock = Map<ReplicaId, int64>;  
type DotContext = { Clock: VectorClock; DotCloud: Set<Dot> }  

Now, we need to know which events have been already acknowledged by our replica, as we want to know which of the incoming events are important for us and which one are outdated or duplicated. With dot context, it's easy - dot has been observed if either its sequence number is less or equal a sequence number for corresponding replica in a vector clock or if that dot was found in a cloud of detached dots:

let contains dot ctx =  
    let (r, n) = dot
    match Map.tryFind r ctx.Clock with
    | Some found when found >= n -> true
    | _ -> Set.contains dot ctx.DotCloud

We can also use DotContext assigning a new dots for the events, that we want to produce:

let nextDot r (ctx): Dot * DotContext =  
    let newCtx = { ctx with Clock = Helpers.upsert r 1L ((+)1L) ctx.Clock }
    ((r, newCtx.Clock.[r]), newCtx)

As you can see here, in order to generate next dot value, we're using only vector clocks - we can do so simply because current replica is always up-to-date with its own updates, so it cannot see any detached events produced by itself.

Now, since we're using dot context to keep time of the updates, we'll need to pass it as a metadata along with the update itself, either in form of a full CRDT merge or using slimmer version - a delta containing only dot context with the latest updates.

If you've read previous blog posts in this series, you probably know what we'll need now - a merge function. Since we already talked how to merge sets and vector clocks/counters, we can compose both approaches to merge dot contexts themselves:

let merge a b =  
    let mergeMap x y = 
        y |> Map.fold (fun acc k v -> Helpers.upsert k v (max v) acc) x
    let clock = mergeMap a.Clock b.Clock
    let cloud = a.DotCloud + b.DotCloud
    { Clock = clock; DotCloud = cloud } |> compact

But wait: what is this mysterious compact function at the end? Well, this is when the fun begins...

When cloud grows

As you can imagine, when we merge two different contexts, some of the dots representing detached events can potentially no longer be detached - if we leave them that way we'll eventually end up with small vector clock and a big pile of garbage living in a dot cloud.

This is why we need compaction. The compaction process itself can be represented in following steps:

  • Traverse for each dot in a dot cloud:
    1. Check if a dot is no longer detached - if its sequence number is exactly one more than its replica counterpart in vector clock, it means that this event is actually continuous, so it can be joined to vector clock.
    2. Check if a dot's sequence number is less than or equal than its counterpart in vector clock - if it's so, it has been already represented inside vector clock itself, so we no longer need it.
    3. If dot doesn't match cases 1. or 2., it remains detached, so it should stay in a dot cloud.
let compact ctx =  
    let (clock, dotsToRemove) = 
        ctx.DotCloud
        |> Set.fold (fun (cc, rem) (r,n) -> 
            let n2 = defaultArg (Map.tryFind r cc) 0L
            // check if dot happens right after the latest one
            // observed in vector clock
            if n = n2 + 1L then (Map.add r n cc, Set.add (r,n) rem)
            // check if dot is represented "inside" vector clock
            elif n <= n2   then (cc, Set.add (r,n) rem)
            // go here if dot remains detached
            else (cc, rem)
        ) (ctx.Clock, Set.empty)
    { Clock = clock; DotCloud = ctx.DotCloud - dotsToRemove}

What is important here is that in F# Set.fold always traverses elements using lexicographic order. This means that we'll always process A:2 before A:3, which is important for this implementation if we want to merge all continuous dots into a vector clock.

Kernel

Now, once we are able to manage time space of our observations via DotContext, we could use it to construct our first instance of delta-aware AWORSet (Add-Wins Observed Remove Set). However as you'll see later, we can implement several different CRDTs using more or less the same mechanics. Therefore we'll introduce here a common abstraction over them, we'll refer to as DotKernel.

A dot kernel can be imagined as a Map of dots to values with a single dot context used for time keeping.

type DotKernel<'a when 'a: equality> =  
    { Context: DotContext      
      Entries: Map<Dot, 'a> }

What's important here, the Entries map contains only information about active (not-removed) elements. You could ask now: but in original implementation of ORSet<> we needed removed elements as well in order to resolve merge conflicts - how are we going to resolve them now?

There's a simple solution here - we're going to use our dot context to track information about remove operations. As you remember each operation and its occurrence can be represented by a single dot. Now, when we merge two kernels, all we need to do is to check if element identified by its dot can be found inside entries and dot context of the second entries.

If element's dot was found in a dot context, it means that other kernel already knew about its addition. In that case we lookup for that element in kernel's entries - if dot was not there, it means that by now it must have been removed!

// get active values from the kernel
let values k = k.Entries |> Map.toSeq |> Seq.map snd

// merge two kernels together
let merge a b =  
    let active =
        b.Entries
        |> Map.fold (fun acc dot v -> 
            // add unseen elements
            if not <| (Map.containsKey dot a.Entries || DotContext.contains dot a.Context)
            then Map.add dot v acc else acc) a.Entries
    let final = 
        a.Entries
        |> Map.fold (fun acc dot _ ->
            // remove elements visible in dot context but not among Entries
            if DotContext.contains dot b.Context && not <| Map.containsKey dot b.Entries
            then Map.remove dot acc else acc) active
    { Entries = final; Context = DotContext.merge a.Context b.Context }

All that's remaining now, is to implement addition/removal operations.

For element addition, we start from reserving a new dot for our addition operation, and we'll store it with element inside entries and in kernel's context.

let add replica value (k, d) =  
    let (dot, ctx) = DotContext.nextDot replica k.Context
    let kernel = { k with 
        Entries = Map.add dot value k.Entries; 
        Context = ctx }
    let delta = { d with 
        Entries = Map.add dot value d.Entries; 
        Context = DotContext.add dot d.Context 
                  |> DotContext.compact }
    (kernel, delta)

For element removal operation is even simpler - we don't even need a new dot! We can simply pick the dot which so far was used to mark additions and just drop element from kernel's entries - after all we recognize removal when it was acknowledged by kernel's dot context but not found among its entries, remember?

let remove value (k, d) =  
    let (entries, deltaCtx) =
        k.Entries 
        |> Map.fold (fun (e, dc) dot v2 ->
            if v2 = value 
            then (Map.remove dot e, DotContext.add dot dc)
            else (e, dc)
        ) (k.Entries, d.Context)
    let kernel = { k with Entries = entries }
    let delta = { d with Context = deltaCtx |> DotContext.compact }
    (kernel, delta)

We'll return to DotKernel once more in a minute, but right now we have all, that we need to implement our ORSet.

Delta-aware Add-Wins Observed Remove Set

Now, finally! We've got everything in place to create our implementation of efficient AWORSet with delta updates. We'll can simply represent entire set with delta in terms of dot kernels:

type AWORSet<'a when 'a: comparison> =  
    AWORSet of core:DotKernel<'a> * delta:DotKernel<'a> option

With dot kernel, we can simply recreate all core delta-CRDT operations, we've defined for our counters in the previous blog post:

module AWORSet

// zero or empty element
let zero = AWORSet(DotKernel.zero, None)

// An active set of entries
let value (AWORSet(k, _)) =  
    k 
    |> DotKernel.values
    |> Set.ofSeq

let add r v (AWORSet(k, d)) =  
    let (k2, d2) = DotKernel.remove r v (k, defaultArg d DotKernel.zero)
    let (k3, d3) = DotKernel.add r v (k2, d2)
    AWORSet(k3, Some d3)

let rem r v (AWORSet(k, d)) =  
    let (k2, d2) = DotKernel.remove r v (k, defaultArg d DotKernel.zero)
    AWORSet(k2, Some d2)

let merge (AWORSet(ka, da)) (AWORSet(kb, db)) =  
    let dc = Helpers.mergeOption DotKernel.merge da db
    let kc = DotKernel.merge ka kb
    AWORSet(kc, dc)

let mergeDelta (AWORSet(ka, da)) (delta) =  
    let dc = Helpers.mergeOption DotKernel.merge da (Some delta)
    let kc = DotKernel.merge ka delta
    AWORSet(kc, dc)

let split (AWORSet(k, d)) = AWORSet(k, None), d  

As you can see, this implementation is pretty straightforward - we've already done all of the heavy lifting, when we defined DotKernel and DotContext. Now we're ready to reuse them in other fields.

Multi Value Registers

Remember, when I told you that DotKernel can be used as a generalization over many different kinds of CRDTs? Let's implement one more - a Multi-Value Register.

An idea of registers was mentioned previously - back then we used a Last-Write-Wins register, which allowed us to encapsulate any value into CRDT. As name suggests LWW register was using wall clock time to determine which value should stay in case of conflict.

Multi-Value Register (or MVReg) works differently - once a conflicting update has been detected, it doesn't try to guess the right answer. Instead it returns concurrent values and lets the programmer to decide which one should be used.

type MVReg<'a> when 'a: equality =  
    MVReg of core:DotKernel<'a> * delta:DotKernel<'a> option

module MVReg =  
    let zero: MVReg<_> = MVReg(DotKernel.zero, None)
    let value (MVReg(k, _)) = 
        DotKernel.values k |> Seq.distinct |> Seq.toList

    let set r v (MVReg(k, d)) =
        let (k2, d2) = DotKernel.removeAll (k, defaultArg d DotKernel.zero)
        let (k3, d3) = DotKernel.add r v (k2, d2)
        MVReg(k3, Some d3)

    let merge (MVReg(ka, da)) (MVReg(kb, db)) = 
        let dc = Helpers.mergeOption DotKernel.merge da db
        let kc = DotKernel.merge ka kb
        MVReg(kc, dc)

    let mergeDelta (MVReg(ka, da)) (delta) =
        let dc = Helpers.mergeOption DotKernel.merge da (Some delta)
        let kc = DotKernel.merge ka delta
        MVReg(kc, dc)

    let split (MVReg(k, d)) = MVReg(k, None), d

As you can see the actual implementation is almost identical to AWORSet - that's why we wanted to have dot kernel on the first place. The major difference where is DotKernel.removeAll function. As we haven't introduce it before, lets do it now:

module DotKernel =  
    let removeAll (k, d) =
        let deltaCtx = 
            k.Entries 
            |> Map.fold (fun acc dot _ -> DotContext.add dot acc) d.Context
        let kernel = { k with Entries = Map.empty }
        let delta = { d with Context = deltaCtx |> DotContext.compact }
        (kernel, delta)

What we do here is to simply copy all dots from the kernel's entries to its context - so that we can keep them in tracked history for the merging purpose - and clear the entries themselves.

Summary

As you can see idea of dots is quite powerful, as it allows us to encode different kinds of CRDTs in efficient manner. We could elevate structures defined here to compose things like maps, graphs etc. However that's another story.

Optimizing state-based CRDTs (part 1)

Other posts from this series:

Last time we've talked about what are CRDTs and introduced the state-based variant of them. In this blog post we'll talk about the downsides of presented approaches and ways to optimize them. If you're not familiar with the previous post or CRDTs in general, I highly encourage you to read it, as I'll be referring to it a lot.

I've decided to split this topic in two. This is the first part, which aims to give you the better overview of delta-based state CRDT optimization. The second part will target more specific cases - causal contexts and observed-remove sets.

Updating state with deltas

As I mentioned in previous post, a state-based CRDT replication depends on the ability to serialize and disseminate the entire data structure over the network. While this approach has some advantages - i.e. very simple and straightforward replication protocol - it also comes with a cost.

Imagine that you have a set of 1000 elements. Now, when we'll try to add another item to it, the only way to synchronize this replica with others is to send an entire payload over the wire. It means serializing and sending 1001 elements, even though most of them have remain unchanged. This is even more painful given the fact that state-based CRDTs often carry additional metadata with them.

To face this problem, an alternative approach has been proposed: a delta-state CRDT. As the name suggests, instead of sending an entire state, we're going to accumulate the updates since the last synchronization as part of change set (delta). Once sync message was send, we simply reset delta and start from scratch. This way we'll send only the unseen parts.

This design doesn't stop us from performing full-state merge from time to time: delta-aware CRDTs still maintain the semantics of a state-based ones. Keep in mind that in terms of persistence, the delta itself doesn't have to be stored on disk.

If you want to play with the example data structures, I'm presenting here, feel free to take a look at this repository.

Delta-state growing-only counter

Let's start from defining some helper functions, that we'll use later on:

[<AutoOpen>]
module Helpers =

    /// Insert or update the value of the map.
    let upsert k v fn map =
        match Map.tryFind k map with
        | None -> Map.add k v map
        | Some v -> Map.add k (fn v) map

    /// Option-aware merge operation.
    let mergeOption merge a b =
        match a, b with
        | Some x, Some y -> Some (merge x y)
        | Some x, None   -> Some x
        | None, Some y   -> Some y
        | None, None     -> None

Now, let's write the G-Counter implementation and the explanation will follow:

type GCounter = GCounter of values:Map<ReplicaId, int64> * delta:GCounter option

[<RequireQualifiedAccess>]
module GCounter =  
    /// Empty G-counter
    let zero = GCounter(Map.empty, None)
    /// Compute value of the G-counter
    let value (GCounter(v, _)) =
        v |> Map.toSeq |> Seq.map snd |> Seq.fold (+) 0L
    /// Increment G-counter value for a given replica.
    let inc r (GCounter(v, d)) =
        let add1 map = upsert r 1L ((+) 1L) map
        let (GCounter(dmap,None)) = defaultArg d zero
        GCounter(add1 v, Some(GCounter(add1 dmap, None)))
    /// Merge two G-counters.
    let rec merge (GCounter(a, da)) (GCounter(b, db)) = 
        let values = a |> Map.fold (fun acc k va -> upsert k va (max va) acc) b
        let delta = mergeOption merge da db
        GCounter(values, delta)
    /// Merge full-state G-counter with G-counter delta.
    let mergeDelta delta counter = merge counter delta
    /// Split G-counter into full-state G-counter with empty delta, and a delta itself.
    let split (GCounter(v, d)) = GCounter(v, None), d

While this example is more complex than the original GCounter implementation, we again try to keep things fairly easy. Here, we're gonna represent GCounter values as map of partial counters for each replica (just like the last time)... while the detla is optionally a GCounter itself! Why? Because of composition of course :) This way we can merge counters with deltas and deltas themselves using the same merge operation we defined before. In this design the delta of a delta counter will always be None - we don't use it, so there's no risk of creating possibly infinite recursive data structure.

Delta-state increment/decrement counter

Previously, we've build a PNCounter using two GCounters (one for incremented and one for decremented values). While you may ask, how will this work now? This is pretty simple: we'll build a delta of a PNCounter dynamically from deltas of its two components.

type PNCounter = PNCounter of inc:GCounter * dec:GCounter

[<RequireQualifiedAccess>]
module PNCounter =  
    let zero = PNCounter(GCounter.zero, GCounter.zero)
    let value (PNCounter(inc, dec)) = GCounter.value inc - GCounter.value dec
    let inc r (PNCounter(inc, dec)) = PNCounter(GCounter.inc r inc, dec)
    let dec r (PNCounter(inc, dec)) = PNCounter(inc, GCounter.inc r dec)
    let rec merge (PNCounter(inc1, dec1)) (PNCounter(inc2, dec2)) =
        PNCounter(GCounter.merge inc1 inc2, GCounter.merge dec1 dec2)
    let mergeDelta delta counter = merge counter delta
    let split (PNCounter(GCounter(inc, a), GCounter(dec, b))) = 
        let delta =
            match a, b with
            | None, None -> None
            | _, _       -> 
                let inc = defaultArg a GCounter.zero
                let dec = defaultArg b GCounter.zero
                Some <| PNCounter(inc, dec)
        PNCounter(GCounter(inc, None), GCounter(dec, None)), delta

I think, the only tricky part here is delta construction. We want it to be a (optionally) PNCounter itself - because of this way we still can preserve commutativity, associativity and idempotency rules, all of our CRDT conform to, but also to reuse implementations of existing functions. Thankfully we can simply build PNCounter from two GCounter and since GCounter.delta itself is a GCounter instance, we can leverage that fact. If one of the deltas is not provided, we simply use a zero element.

Delta-state growing-only set

For the GSet, its implementation is pretty analogous to GCounter:

type GSet<'a when 'a: comparison> = GSet of values:Set<'a> * delta:GSet<'a> option

module GSet =  
    let zero = GSet(Set.empty, None)
    let value (GSet(s, _)) = s
    let add elem (GSet(v, d)) =
        let (GSet(delta, None)) = defaultArg d zero
        GSet(Set.add elem v, Some(GSet(Set.add elem delta, None)))
    let rec merge (GSet(a, da)) (GSet(b, db)) = 
        let values = a + b
        let delta = Helpers.mergeOption merge da db
        GSet(values, delta)
    let mergeDelta delta gset = merge gset delta
    let split (GSet(v, d)) = GSet(v, None), d

Just like in case of original state-based CRDTs, we can take advantage of composability.

The technical difference here is that for G-Sets we should acknowledge either deltas or full state updates being broadcasted to all replicas. It was not so necessary for the counter implementations, as if we sent two following updates (D1, D2), if a first one (D1) didn't reach the target, but the later one (D2) did, we'll still end up in correct state - simply because D2 already override the partial counter value of D1.

Final notes about replication protocol

Up to this point our replication protocol is still pretty simple - the only requirement here is that we have to eventually be able to reach every single replica. However this doesn't have to occur immediately and doesn't have to occur in order (read: you're not constrained to use TCP for delta replication).

In the code presented above I'm merging deltas as part of a merge operation. This however is not a requirement, i.e. if you disseminate delta right after the operation being performed, you may as well clear deltas on each merge.

Summary

This part presented how to optimize counters and simple growing only set to take advantage of delta-state CRDTs. In the next post we'll take a closer look as specific optimizations targeting OR-Sets.

An introduction to state-based CRDTs

Other posts from this series:

This is one of the topics I've already talked about on several presentations in the past, but never actually written about. Here I want to cover a topic of Conflict-free Replicated Data Types: what problems they aim to solve and provide some basic implementations (in F#) to help you understand how to build them.

A motivation

Conflict-free Replicated Data Types are answer for a common problem of synchronizing data in distributed environments. While issues on that field where well-known and there were numerous attempts to solve it in the past, usually they were a variants of decentralized 2-phase commit transactions. However they all suffer for similar problems:

  • We are assuming, that all participants are available for the time of the transaction, which often (i.e. in case of edge computing and mobile apps) is not true.
  • Multi-phase and quorum-based commits require numerous round trips between communicating parties. That comes with a cost in terms of latency and throughput. Moreover this approach has tendency to degrade at scale, when number of machines or distance between them increases. In many cases it's not feasible to use distributed transaction across boundaries of a single data center.
  • They often determine a single master node used for exclusive write access or as a single source of truth. In some cases - as the ones mentioned above - it's not a feasible solution.

This doesn't mean, that existing approaches are bad. However what we are striving for is to give a wider area of possible solutions, that may better fit the problem under certain conditions. One of the examples, that I like to use to visualize the problem is that:

Imagine that you need to build a planet-scale video streaming service. Whenever a user uploads a video, we are replicating it across different data centers located on different continents to maintain good throughput and latency, and in result a better user experience. Additionally we want to show users a view count for that video.

Video uploading is a good example of master-slave replication. However things may complicate for such a small feature as view count. With many concurrent users over the entire planet and write-heavy characteristics, using the same approach for counter increments is not a great idea, as this may end up with congestion for more popular videos. We don't need transactions, as the nature of the problem allows us to loose constrains of strong consistency in favor of higher availability of our application. However as described earlier most of the existing solutions are based on exclusive write access to a replicated resource. This is where CRDTs and multi-master scenarios come to play.

Use cases and implementations

While this was quite simple example, there are many others that we can look up for in current industry:

  • Amazon uses CRDTs to keep their order cart in sync. They've also published their database known as Dynamo, which allows AWS audience to make use of CRDTs.
  • Riak is one of the most popular solutions in this area. One of their well-known customers are Riot games (company behind League of Legend), which uses Riak to implement their in-game chat.
  • Rovio (company behind Angry Birds game series) uses conflict-free counters for their advertisement platform to make their impression counters work freely even in offline scenarios.
  • SoundClound has their own implementation of Last-Write-Wins Set build in Go on top of Redis, known as Roshi, which they use for their observers management.
  • TomTom makes use of CRDTs to manage their navigation data.
  • CREAustralia uses them for their click stream analytics.

There are also other solutions around there:

  • AntidoteDB is another, pretty innovative approach to eventually consistent databases. One of its unique feature is transaction support in eventually consistent environment.
  • Akka.DistributedData is a plugin for Akka (and Akka.NET) distributed actor programming model, which exposes several CRDT types on top of Akka cluster.
  • Redislabs offers CRDB as part of their enterprise Redis database solution.
  • Cassandra and ScyllaDB allow their users to make use of eventually consistent counters in their databases.

What does it mean to be conflict-free?

Conflict-free is a vague description, but it comes to a simple statement: we are operating on a data structures, that don't require exclusive write access and are able to detect concurrent updates and perform deterministic, automatic conflict resolution. This doesn't mean that conflict doesn't ever occur, but we are able to always determine the output up front, based on a metadata contained within the structure itself. The core structures here are counters, registers and sets, but from them we can compose more advanced ones like maps, graphs or even JSON.

Types of CRDTs

We can discriminate CRDTs using two core categories: state-based (convergent) and operation-based (commutative) data types. No matter which one we talk about, they all consists of two parts: replication protocol and state application algorithms.

In practice both versions differ heavily on implementation and their "center of gravity" is focused in different place. Since we need some additional metadata to provide automatic conflict resolution: state-based CRDTs encapsulate it as part of the data structure, while operation-based tend to put more of it onto replication protocol itself. Here, I'm going to cover one of the simpler state-based approaches.

Convergent replicated data types

The single most important operation of state-based CRDTs is Merge method. What it does is essentially to take a two corresponding replicas of the same logical entity, and produce an updated state as an output. If any conflicts occur, it's up to merge operation to resolve them.

Moreover merge operation must conform to three properties, which give us great perks for using them:

  • Commutativity (x • y = y • x) and Asociativity ((x • y) • z = x • (y • z)) which means that we can perform out of order merge operations and still end up with correct state.
  • Idempotency (x • x = x), so we don't need to care about potential duplicates send from replication layer.

Those properties are not easy to guarantee, but you're going to see how far we can go only by using two basic operations, which meet those criteria:

  • union of two sets
  • maximum of two values

With them in our hands, the only requirement on our replication layer is to eventually dispatch all state changes to all replicas. There is however a problem with this: to perform those merge operations we need to carry whole data structure on every change. Imagine that we need to send the entire collection of 1000 elements over the wire, only because someone added one extra element. This problem can be solved by the approach known as delta-state CRDTs, which I'll discuss another time.

Now, lets cover some basic data structures and see how we could compose them into more advanced ones. Please keep in mind, that those examples are mind to be simple and present you the approach to the problem.

Counters

Counters are the first type of CRDTs, we'll cover here. Essentially they allow to read, increment and (optionally) decrement a counter value concurrently on many different machines, without worrying about locking.

Growing-only Counter

Also known as GCounter. It's a counter which value can only be ever increasing. One of it's use cases could be a page view counter I've referred to in the motivation section. Simply speaking it's a map of replica-id/partial-counter values.

In order to calculate total counter's value, we need to sum the values of all known counters specific to particular replicas.

If we want to increment a counter's value, we simply increment a partial counter in the context of the replica, we're working with at the moment. It's crucial, that a single replica should never increment the value of another replica.

module GCounter =  
    type GCounter = Map<ReplicaId, int64>
    let zero: GCounter = Map.empty
    let value (c: GCounter) = 
        c |> Map.fold (fun acc _ v -> acc + v) 0L
    let inc r (c: GCounter) =
        match Map.tryFind r c with
        | Some x -> Map.add r (x + 1) c
        | None   -> Map.add r 1 c
    let merge (a: GCounter) (b: GCounter): GCounter =
        a |> Map.fold (fun acc ka va ->
            match Map.tryFind ka acc with
            | Some vb -> Map.add ka (max va vb) acc
            | None    -> Map.add ka va acc) b

When it comes to a merge operation, we simply concatenate key/value entries of both counters. When we detect that both counters have different values for the same replica, we simply take a max of both values. This is a correct behavior, since we know that counter values could only be incremented. For the same reason, the decrement operation is not supported by this kind of CRDT.

Increment/decrement Counter

Now, lets talk about so called PNCounter, which is able to provide both increment and decrement operations. I think, it's particularly useful, as it's a simple example to show, how we can compose simple CRDTs to build more advanced ones.

The crucial trick here, is that PNCounter consists of two GCounters - one of them used to count increments and other used for decrements - so decrement operation is simply incrementing GCounter part responsible for counting decrements. Our output value is basically a difference between the two.

module PNCounter =  
    type PNCounter = GCounter.GCounter * GCounter.GCounter
    let zero: PNCounter = (GCounter.zero, GCounter.zero)
    let value (inc, dec) = GCounter.value inc - GCounter.value dec
    let inc replica (inc, dec) = (GCounter.inc replica inc, dec)
    let dec replica (inc, dec) = (inc, GCounter.inc replica dec)
    let merge (inc1, dec1) (inc2, dec2) = 
        (GCounter.merge inc1 inc2, GCounter.merge dec1 dec2)

As you can see merge operation is again pretty trivial: a simple merge of corresponding GCounter parts from both PNCounters.

There are also other types of counters, which I won't cover here. One of particularly interesting cases are Bounded Counters, which allow you to provide an arbitrary upper/lower bound on the counter to determine if a target threshold has been reached.

Note about vector clocks

We already mentioned counters implementation. Before we go forward, I think it's a good point to talk about the vector clocks and notion of time.

In many systems, a standard way of defining causality (a happened-before relationship) is by using time stamps. There is a problem however with using them in terms of high-frequency distributed systems:

  • Operating systems, especially executing on machines in different data centers, can be subject of clock skews, which can kick your butt in write-heavy scenarios. Also another anomalies can occur: leap second bugs or even invalid time values happening between two threads.
  • While timestamps can potentially give us information necessary to determine the most recent update (we'll use that in a minute), they won't tell us anything about the "state of the world" at the moment, when update has happened. This means, we cannot detect if one update knew about another, or if they happened concurrently.

This is where vector clocks come to work. They are form of logical clocks, represented by monotonically incremented values, specific for each replica. Sounds much like GCounter we've seen above ;)

Why do we talk about them here? The internal implementation of vector clock is very close to what GCounter looks like. The major difference here is an ability to partially compare two vector clocks.

Unlike standard comparison, partial comparison allows us to determine fourth possible result - an indecisive one, when we are no longer able to determine if two values have lesser, greater or equal relationship. We can use this to recognize concurrent updates, which happened between two clocks.

Here, we'll define vector clocks as:

type Ord =  
    | Lt = -1  // lower
    | Eq = 0   // equal
    | Gt = 1   // greater
    | Cc = 2   // concurrent

type VTime = GCounter.GCounter  
module VClock =  
    let zero = GCounter.zero
    let inc = GCounter.inc
    let merge = GCounter.merge
    let compare (a: VTime) (b: VTime): Ord = 
        let valOrDefault k map =
            match Map.tryFind k map with
            | Some v -> v
            | None   -> 0L
        let akeys = a |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        let bkeys = b |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        (akeys + bkeys)
        |> Seq.fold (fun prev k ->
            let va = valOrDefault k a
            let vb = valOrDefault k b
            match prev with
            | Ord.Eq when va > vb -> Ord.Gt
            | Ord.Eq when va < vb -> Ord.Lt
            | Ord.Lt when va > vb -> Ord.Cc
            | Ord.Gt when va < vb -> Ord.Cc
            | _ -> prev ) Ord.Eq

The comparison function, even thou long, is pretty simple - we'll compare pairwise entries of both VTime maps (if an entry didn't exist on the opposite side, we count its value as 0):

  • If all values of corresponding replicas are equal, clocks are equal:
  • If all values on the left side are lower than or equal to their counterparts on the right side, left side is lesser than the right one.
  • If all values on the left side are greater than or equal to their counterparts on the right side, left side is greater than the right one.
  • Any mix of lesser/greater entries comparison means, that we detected a concurrent update.

If you're more interested about the topic of time in distributed systems, I can recommend you a great talk about this subject: Keeping Time in Real Systems by Kavya Joshi.

Registers

The next type of CRDTs are registers. You can think of them as value cells, that are able to provide CRDT semantic over any defined type. Remember, that we're still constrained by commutativity/associativity/idempotency rules. For this reason we must apply additional metadata, which will allow us to provide arbitrary conflict resolution in case of conflict detection.

Last Write Wins Register

The most obvious way to solve conflicts, we already have talked about earlier, is to use timestamps. This is exactly what our implementation of LWWReg uses.

module LWWReg =  
    type LWWReg<'a> = 'a * DateTime
    let zero: LWWReg<'a> = (Unchecked.defaultof<'a>, DateTime.MinValue)
    let value (v, _) = v
    let set c2 v2 (v1, c1) = if c1 < c2 then (v2, c2) else (v1, c1)
    let merge (v1, c1) (v2, c2) = if c1 < c2 then (v2, c2) else (v1, c1)

It's quite obvious. Our set and merge operations simply compare two registers and pick register's value with a higher timestamp.

Just like in previous cases, we'll be able to compose LWW registers with other CRDTs to provide more advanced operations.

Sets

Once we've covered counters and registers, it's time to talk about collections. The most natural candidate there are different variations of sets - simply because set union conforms to associativity/commutativity/idempotency properties mentioned before. Later on, we could use them to define structures like maps, graphs or even indexed linear sequences (useful i.e. in collaborative text editing).

Growing-only Set

Just like in case of counters, here the most basic example is a growing-only set, also known as GSet. One of its cases could be i.e. a voting system, where we'd like to tell if a person has participated in voting, while still making his/her vote anonymous (in this case a total voting result could be GCounter itself).

module GSet =  
    type GSet<'a when 'a: comparison> = Set<'a>
    let zero: GSet<'a> = Set.empty
    let value (s: GSet<'a>) = s
    let add v (s: GSet<'a>) = Set.add v s
    let merge (a: GSet<'a>) (b: GSet<'a>) = a + b

No, it's not trolling. It's just a standard set! :) The only difference here is that we constrain ourselves not to perform any removals on the set. The reason for that is merge operator: since our merge is just a standard union, if we'd remove any element from any of replicas, after merging it with another replica (where that removal hasn't happened yet), removed element with auto-magically reappear in the result set.

There's also a lesson here: because we removed an element from the set, we lost some data. This is something, we often cannot afford in case of CRDTs and it's the reason, why we often must attach some additional metadata, even thou it may seem not to be explicitly needed by the result value.

2-Phase Set

The next step is two phase set. Like in case of PNCounter, we could simply combine two GSets - one for added elements, and one for removed ones (often referred to as tombstones). Add/remove element and merge also works pretty much like in case of PNCounter/GCounter.

module PSet =  
    type PSet<'a when 'a: comparison> = GSet.GSet<'a> * GSet.GSet<'a>
    let zero: PSet<'a> = (GSet.zero, GSet.zero)
    // (add, rem) is a single PSet instance
    let value (add, rem) = add - rem  
    let add v (add, rem) = (GSet.add v add, rem)
    let rem v (add, rem) = (add, GSet.add v rem)
    let merge (add1, rem1) (add2, rem2) = 
        (GSet.merge add1 add2, GSet.merge rem1 rem2)

There are several problems with following implementation:

  • Common case of tombstone-based sets is the fact that removed set can grow infinitely, so that final value set will take only fraction of size of actual metadata necessary to keep the sets consistent. There are extra algorithms - known as tombstone pruning - to mitigate that problem.
  • While we are able to remove added element, the problem appears when we'll try to add removed element again. Since we're don't have any semantic to remove elements from any of the underlying GSets, once removed, element will stay in tombstone forever. This will cause removing it from the final value set. So no re-adding the value for you my friend. Again, we need some extra metadata that will allow us to track causality to determine when add/remove have happened.

Observed Remove Set

If you kept up to this point, congratulations! We're actually going to make a first semi-advanced case here: an observed remove set (known as ORSet), which will allow us to freely add/remove elements and still converge when merging replicas from different locations.

How does it work? We'll represent our ORSet as add/remove collections, but this time instead of sets, we'll use maps. The keys in those maps will be our elements, while values will be a (partially) comparable timestamps used to mark, when the latest add/remove has happened.

The actual specialization of ORSet depends on the timestamp and conflict resolution algorithm used:

  • You could use DateTime for timestamps and prefer the latest value on conflict resolution. This will essentially give us Last Write Wins semantics (just like in LWWReg) over particular elements of the set. This would greatly simplify things, but we're going to do better than that :)
  • Other approach is to use vector clocks, we defined earlier in this post. This will allow us to detect, when two replicas have added/removed the same element without knowing about other parties trying to do the same. When such case is detected, we need to arbitrary tell, what the outcome of our conflict resolution algorithm will be. The most common case is usually preferring additions over removals. This is known as Add-Wins Observed Remove Set (shortly AWORSet). This is what we'll implement here.
module ORSet =  
    type ORSet<'a when 'a: comparison> = Map<'a, VTime> * Map<'a, VTime>
    let zero: ORSet<'a> = (Map.empty, Map.empty)

To get the result set, our value function will iterate over add map and remove from it all entries from removals map, where add timestamp is lower than remove timestamp (this means that if both updates were concurrent, we keep the result).

    let value (add, rem) = 
        rem |> Map.fold(fun acc k vr ->
            match Map.tryFind k acc with
            | Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
            | _ -> acc) add

Just like in case of PNCounter our add/remove operations need to work in context of a particular replica r - this is the result of using vector clocks as timestamps. Here, we'll simply add element to corresponding map and increase it's vector clock.

    let add r e (add, rem) =
        match Map.tryFind e add, Map.tryFind e rem with
        | Some v, _ -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
        | _, Some v -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
        | _, _ -> (Map.add e (VClock.inc r VClock.zero) add, rem)

    let remove r e (add, rem) =
        match Map.tryFind e add, Map.tryFind e rem with
        | Some v, _ -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
        | _, Some v -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
        | _, _ -> (add, Map.add e (VClock.inc r VClock.zero) rem)

Additional thing, you may have noticed here is that we use Map.remove. It's safe to do so in this context, as we at the same time add value-timestamp pair to the opposite map, still keeping the information about element presence inside an object.

The most complex part is actual merge function. We start from simply squashing corresponding add/remove maps (in case of conflicting timestamps, we will simply merge them together). Then what we need is to converge merged add/remove maps by removing from add map all values with timestamps lower than corresponding entry timestamps in remove map (this already covers concurrent update case, as we decided to favor additions over removals at the beginning). For the remove set, we'll simply remove all elements with timestamps lower, equal or concurrent to the ones from add map. Just to keep things fairly compact.

    let merge (add1, rem1) (add2, rem2) =
        let mergeKeys a b =
            b |> Map.fold (fun acc k vb ->
                match Map.tryFind k acc with
                | Some va -> Map.add k (VClock.merge va vb) acc
                | None -> Map.add k vb acc ) a
        let addk = mergeKeys add1 add2
        let remk = mergeKeys rem1 rem2
        let add = remk |> Map.fold (fun acc k vr ->
            match Map.tryFind k acc with
            | Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
            | _ -> acc ) addk
        let rem = addk |> Map.fold (fun acc k va ->
            match Map.tryFind k acc with
            | Some vr when VClock.compare va vr <> Ord.Lt -> acc
            | _ -> Map.remove k acc ) remk
        (add, rem)

You can see here, we're using a map of vector clocks (which are also maps), which is a sub-optimal solution for this implementation. There are different ways used to mitigate this problem:

  1. The simplest way is to compress binary ORSet payload using for example L4Z or GZip.
  2. More advanced approach is to modify the existing implementation using Dotted vector versions.

But I hope to write about them another time.

What next?

With this set of data structures in our arsenal, we could build more advanced ones:

  • One example would be a Last-Write-Wins Map, which essentially looks somewhat like type LWWMap<'k,'v> = ORSet<('k * LWWReg<'v>)> (keep in mind, that comparison should depend only on key component).
  • Another one would be a graph structure, which is composed of two sets: one for nodes and one for edges.

As you may see, this is quite big topic and IMHO a pretty interesting one. There are many more things, like delta-state based optimizations nad operation-based CRDTs, which I won't cover here - simply to not turn this post into a book - but I hope to continue the upcoming posts.

Akka.Persistence: most common misconceptions

Akka.Persistence is one of the plugins, that introduce an eventsourced persistence mechanics into Akka.NET. However, I've often seen people having problems, since neither actors nor eventsourcing are part of mainstream .NET development. A lot of these are often a general problems of eventsourced systems, therefore I've decided to write some more about them and present potential solutions from the perspective of Akka.Persistence user.

This post describes current state of things. They may change in the future, as Akka.NET team want to address some of them.

Control your own data

There are two ways, how Akka.NET event journals may store your data:

  1. Some of them may define totally custom serialization mechanics, i.e:
    • MongoDB persistence uses BSON serializer which requires you to define bindings to all serialized message types.
    • PostgreSQL plugin has configuration switch which will use JSON.NET to store your data as JSON/JSONB column (which is well understood by this database). In future, the rest of SQL databases will get that support as well.
  2. At the moment most of them however will use default akka serialization mechanics: the same that are used to pass messages between cluster nodes.

One of the first mistakes, everyone do, is giving up your data in the hands of the default serializer. First of all, you're giving up ability to encode/decode your data into 3rd party dependency (sic!). And what's most important, default serializer is build for remote message exchange, it may NOT be a good way to store immutable, everlasting data structures.

Even Newtonsoft.Json serializer (which was initially choosen as Akka.NET default serializer) is not a good option. If you were thinking "it's just a JSON, what could go wrong", the story is a little more complicated. Default configuration of it encodes a full type name as a part of the payload in order to support polymorphic deserialization. This means that once you'll change your event qualified type description (namespace, type name or assembly) - something you shouldn't do anyway - your old data is lost, as it'll try to deserialize it to type that can no longer be found.

So what to choose?

  1. Some people are using schema-less formats, as they allow for a certain degree of version tolerance without much work. This includes JSON, just not on the settings, that are used by Akka.NET remote serializer.
  2. Others (including me) are more enthusiasts of schema-based serializers i.e. Google Protocol Buffers or Microsoft Bond, as they keep event schema/contract explicit and well defined.

Why I've linked Google.Protobuf and not protobuf-net? I think that keeping schema in .proto files makes things more explicit and prevents accidental schema changes.

Configuring a custom serializer

Writing a custom serializer is rather easy. All you need is to write down a custom class inheriting from Serializer base class. But how to bind it to a specific event type?

Akka.NET serialization bindings are based on types. This means, that you need to specify an event-typeserializer pair. Type provided here doesn't have to be actually a concrete class - Akka will investigate inheritance chain and all implementations in order to resolve a binding. This means that usually we create an empty marker interface and implement it in all events, that we want to serialize with target serializer.

An example HOCON config may look like that:

akka.actor {  
  serializers.my-custom = "MyNamespace.MySerializer, MyAssembly"
  serialization-bindings {
    "MyNamespace.IDomainEvent, MyAssembly" = my-custom
  }
}

Have an event versioning strategy

As events are immutable by definition, we need to prepare some approach, how to behave if we want to be able to change them as we develop our system. This is what we call event versioning.

If you want to have a wider perspective on that problem, you probably should read Versioning in an Eventsourced System by Greg Young.

Most of the people see and understand a rationale behind having a migration strategy for their database. However, a lot less of them have an idea on how to approach a schema evolution in eventsourced systems.

Event adapters

One of the features of Akka.Persistence are event adapters. They are a middleware components that you can plug into a pipeline between event journal and a persistent actor, allowing you to modify deserialized events, before they reach the destination.

To create a custom event adapter, you need a class that implements either a IReadEventAdapter (to interpect events received from the journal), IWriteEventAdapter (to intercept events send to the journal) or just IEventAdapter which combines both.

Event adapters can be used to upgrade old event versions to newer ones or event to deconstruct single event into multiple ones. When is that useful? Example: if you made a mistake in the past, and defined an event that actually had more than a single responsibility (i.e. OrderedAndConfirmed instead of separate Ordered/Confirmed kind of events).

But how to bind an adapter to be used for some event in scope of the journal?

# we will use SqlServer journal for this example
akka.persistence.journal.sql-server {  
  event-adapters {
    custom-adapter = "MyNamespace.MyEventAdapter, MyAssembly"
  }
  event-adapter-bindings {
    "MyNamespace.V1.IDomainEvent, MyAssembly" = custom-adapter
  }
}

Don't use snapshots without events

This is a popular problem for the people starting their journey with Akka.Persistence. Quite often they see persistence snapshots and think: hey, I don't need eventsourcing, let me just use one of these. However a sole role of Akka.Persistence snapshots is to optimize streams of events.

If you're into snapshot-based persistence, Akka.Persistence is definitely not for you. Persistent actors are build to support different needs, just like journals and even snapshots themselves - as they have fields used to correlate their position in a stream of events. They also don't map well into multi-table/multi-document database hierarchies.

Another thing: when you're persisting events with PersistentActor a next command won't be handled until the event has been persisted successfully and its callback has been called. This is not the case in SaveSnapshot, which doesn't block actor from picking a next message before confirming, that snapshot has been saved.

This is not the problem in eventsourced systems - if system will die before saving the snapshot, we replay from the last successfully stored snapshot and an events, that came afterward - but without events you may process tens of messages without any confirmation and lost that data upon actor system shutdown.

[C#] Using gRPC with custom serializers

I've decided to write this post, because I've noticed a little lack of gRPC-related knowledge in .NET community. For those of you who are not familiar with it - gRPC is a standard protocol for defining high-performance RPC services, build on top of HTTP 2 by Google.

Since most of the manuals talk about gRPC services from perspective of building services using code generation tools, I'll show how to tweak the API itself and adapting it for our needs.

Since I'm not a great fan of code gen - my mindset here is more or less avoid doing things that should belong to the compiler - I've decided to drop the dependency on service definitions for gRPC and proto files for Google Protocol Buffers. But how can we define custom services and use any serializer we want back there?

Building a custom gRPC service

Core component of gRPC is a method descriptor which informs both communicating parties about details, such as what type of messages are send/received on both sides, how should they be serialized and which of the message passing patterns do they apply.

First, what descriptor needs is to specify a communication pattern. gRPC allows to use one of 4 different patterns:

  • Unary which is simply an equivalent of standard request/response communication between two participants.
  • ServerStreaming when a client sends a single requests and gets an asynchronous stream of messages back from the server.
  • ClientStreaming when a client is a side, which sends an asynchronous stream of messages, and the server responds with a single reponse, once stream completes.
  • DuplexStreaming when both client an server share two simultaneous async streams of messages working in both directions. Nice thing with read streams is that they implement IAsyncEnumerator<> interface - an IEnumerator<> equivalent that allows for asynchronous element iteration. Who knows, maybe in the future it will be integrated into C# syntax like for loops ¯\_(ツ)_/¯.

While communication styles may be different, protocol always describe single message type responsible for handling requests inside provided method, and a single type of response messages. Method descriptor will require you to describe marshallers for each one of them.

Marshaller is basically a pair of functions used to serialize data back and for to array of bytes. This is my biggest complain around current design of gRPC API for .NET. IMHO there are better binary containers (ArraySegment<byte> as example) for a protocols designed for high throughput / low latency systems.

Given all of these, an example descriptor may look like that:

using Grpc.Core;

private static readonly Method<CustomRequest, CustomResponse> CustomMethod =  
    new Method<CustomRequest, CustomResponse>(
        type: MethodType.DuplexStreaming,
        serviceName: "CustomService",
        name: "CustomMethod",
        requestMarshaller: Marshallers.Create(
            serializer: request => SerializeIntoBytes(request),
            deserializer: bytes => DeserializeFromBytes(bytes)),
        responseMarshaller: Marshallers.Create(
            serializer: response => SerializeIntoBytes(response),
            deserializer: bytes => DeserializeFromBytes(bytes)));

I must admit, I like this approach to serialization. It's way more composable to give a pair of functions than to building a custom interface implementation each time a new library wants an integration point.

Once we got that, let's move to server definition. Server needs an instance of ServerServiceDefinition, a descriptor of the service itself (so far we created only a method descriptor). Thankfully we can use existing builders to quickly construct that for us.

Depending on MethodType we've chosen, we need a different versions of lambda handlers. Here I'm using duplex streaming, but keep in mind that due to API construction compiler won't detect mismatch between method type and actual delegate handler used.

var server = new Server  
{
    Ports = { { "127.0.0.1", 5000, ServerCredentials.Insecure } },
    Services =
    {
        ServerServiceDefinition.CreateBuilder()
        .AddMethod(CustomMethod, async (requestStream, responseStream, context) =>
            {
                await requestStream.ForEachAsync(async request =>
                {
                    // handle incoming request
                    // push response into stream
                    await responseStream.WriteAsync(new CustomResponse {Payload = request.Payload});
                });
            })
        .Build()
    }
};

Here a context describes all data characteristic to a connection itself.

Keep in mind that pulling data from request stream an pushing new data into response stream can happen at any pace. No need for keeping 1-for-1 rule here.

For a simple duplex streaming of ping-pong service, our server is now ready to start. Let's move to a client. For this, first we need to establish a Channel. What's easy to miss here: when creating a channel, we need to provide endpoint of a server we wish to connect to, not the client itself.

Once we got our channel, we need a call invoker. We have a few options here i.e. for a case when you want to inject your own interceptors. However here, we'll go just with a default one.

Given channel and call interceptor, we need to make an actual call. Here we're providing a method descriptor we defined earlier. We can provide some additional parameters (since gRPC uses HTTP 2.0, we can i.e. set some HTTP headers here). Again, just like in a case of server handler, make sure that the call method you're using reflects correct method type.

var channel = new Channel("127.0.0.1", 5000, ChannelCredentials.Insecure);  
var invoker = new DefaultCallInvoker(channel);  
using (var call = invoker.AsyncDuplexStreamingCall(CustomMethod, null, new CallOptions  
{
}))
{
    var responseCompleted = call.ResponseStream
        .ForEachAsync(async response => /* handle response*/));
    for (int i = 0; i < Iterations; i++)
    {
        await call.RequestStream.WriteAsync(new CustomRequest {Payload = i});
    }

    await call.RequestStream.CompleteAsync();
    await responseCompleted;
}

EDIT (2017.03.29): if you want to try it, you may check a simple hello world example, available now in this Github repository.