Bartosz Sypytkowski

An introduction to state-based CRDTs

This is one of the topics I've already talked about in several presentations in the past, but never actually written about. Here I want to cover the topic of Conflict-free Replicated Data Types: explain what problems they aim to solve and provide some basic implementations (in F#) to help you understand how to build them.

A motivation

Conflict-free Replicated Data Types are an answer to a common problem: synchronizing data in distributed environments. While the issues in that field are well-known and there have been numerous attempts to solve them in the past, those were usually variants of decentralized 2-phase commit transactions. They all suffer from similar problems:

  • They assume that all participants are available for the duration of the transaction, which often (e.g. in the case of edge computing and mobile apps) is not true.
  • Multi-phase and quorum-based commits require numerous round trips between the communicating parties, which comes at a cost in terms of latency and throughput. Moreover, this approach tends to degrade at scale, as the number of machines or the distance between them increases. In many cases it's not feasible to use distributed transactions across the boundaries of a single data center.
  • They often designate a single master node with exclusive write access, acting as a single source of truth. In some cases - such as the ones mentioned above - that's not a feasible solution.

This doesn't mean that the existing approaches are bad. What we're striving for is a wider range of possible solutions, which may fit the problem better under certain conditions. Here's an example I like to use to visualize the problem:

Imagine that you need to build a planet-scale video streaming service. Whenever a user uploads a video, we replicate it across different data centers located on different continents to maintain good throughput and latency, and as a result a better user experience. Additionally, we want to show users a view count for each video.

Video uploading is a good example of master-slave replication. However, things get more complicated for such a small feature as the view count. With many concurrent users all over the planet and write-heavy characteristics, using the same approach for counter increments is not a great idea, as it may end up causing congestion for the more popular videos. We don't need transactions, as the nature of the problem allows us to loosen the constraints of strong consistency in favor of higher availability of our application. However, as described earlier, most of the existing solutions are based on exclusive write access to a replicated resource. This is where CRDTs and multi-master scenarios come into play.

Use cases and implementations

While this was a fairly simple example, there are many others to be found in the industry today:

  • Amazon uses CRDTs to keep their shopping cart in sync. They've also published the design of their database, known as Dynamo, which lets the AWS audience make use of CRDTs.
  • Riak is one of the most popular solutions in this area. One of its well-known customers is Riot Games (the company behind League of Legends), which uses Riak to implement its in-game chat.
  • Rovio (the company behind the Angry Birds series) uses conflict-free counters in its advertisement platform, so that impression counters keep working even in offline scenarios.
  • SoundCloud has its own implementation of a Last-Write-Wins Set, built in Go on top of Redis and known as Roshi, which it uses for managing observers.
  • TomTom makes use of CRDTs to manage its navigation data.
  • CREAustralia uses them for its clickstream analytics.

There are also other implementations out there:

  • AntidoteDB is another, pretty innovative approach to eventually consistent databases. One of its unique features is transaction support in an eventually consistent environment.
  • Akka.DistributedData is a plugin for the Akka (and Akka.NET) distributed actor programming model, which exposes several CRDT types on top of an Akka cluster.
  • Redislabs offers CRDB as part of their enterprise Redis database solution.
  • Cassandra and ScyllaDB allow their users to make use of eventually consistent counters in their databases.

What does it mean to be conflict-free?

Conflict-free is a vague description, but it comes down to a simple statement: we operate on data structures that don't require exclusive write access, are able to detect concurrent updates, and perform deterministic, automatic conflict resolution. This doesn't mean that conflicts never occur, but we are always able to determine the outcome up front, based on the metadata contained within the structure itself. The core structures here are counters, registers and sets, but from them we can compose more advanced ones like maps, graphs or even JSON.

Types of CRDTs

We can divide CRDTs into two core categories: state-based (convergent) and operation-based (commutative) data types. No matter which one we talk about, they all consist of two parts: a replication protocol and state application algorithms.

In practice both versions differ heavily in implementation, and their "center of gravity" lies in a different place. We need some additional metadata to provide automatic conflict resolution: state-based CRDTs encapsulate it as part of the data structure, while operation-based ones tend to put more of it into the replication protocol itself. Here, I'm going to cover one of the simpler state-based approaches.

Convergent replicated data types

The single most important operation of a state-based CRDT is its merge method. Essentially, it takes two corresponding replicas of the same logical entity and produces an updated state as output. If any conflicts occur, it's up to the merge operation to resolve them.

Moreover, the merge operation must conform to three properties, which give us great perks:

  • Commutativity (x • y = y • x) and Associativity ((x • y) • z = x • (y • z)), which mean that we can perform merge operations out of order and still end up with the correct state.
  • Idempotency (x • x = x), so we don't need to care about potential duplicates sent by the replication layer.

Those properties are not easy to guarantee, but you're going to see how far we can get using only two basic operations that meet those criteria (a quick sanity check follows the list):

  • union of two sets
  • maximum of two values
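
A quick sanity check in F# (with arbitrary example values) shows that both operations satisfy all three laws:

let a, b, c = 3L, 5L, 7L
printfn "%b" (max a b = max b a)                    // commutativity
printfn "%b" (max (max a b) c = max a (max b c))    // associativity
printfn "%b" (max a a = a)                          // idempotency

let x, y = Set.ofList [1; 2], Set.ofList [2; 3]
printfn "%b" (Set.union x y = Set.union y x)        // commutativity
printfn "%b" (Set.union x x = x)                    // idempotency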

With them in our hands, the only requirement on our replication layer is to eventually dispatch all state changes to all replicas. There is, however, a problem with this: to perform those merge operations we need to carry the whole data structure with every change. Imagine having to send an entire collection of 1000 elements over the wire only because someone added one extra element. This problem can be solved by an approach known as delta-state CRDTs, which I'll discuss another time.

Now, let's cover some basic data structures and see how we can compose them into more advanced ones. Please keep in mind that these examples are meant to be simple and to present the general approach to the problem.

Counters

Counters are the first type of CRDT we'll cover here. Essentially, they allow us to read, increment and (optionally) decrement a counter value concurrently on many different machines, without worrying about locking.

Growing-only Counter

Also known as GCounter. It's a counter whose value can only ever increase. One of its use cases could be the view counter I referred to in the motivation section. Simply speaking, it's a map of replica-id/partial-counter entries.

In order to calculate the counter's total value, we sum the partial values of all known replicas.

If we want to increment the counter's value, we simply increment the partial counter of the replica we're working with at the moment. It's crucial that a replica never increments the value belonging to another replica.

type ReplicaId = string  // unique identifier of a replica, e.g. a node name

module GCounter =  
    type GCounter = Map<ReplicaId, int64>
    let zero: GCounter = Map.empty
    let value (c: GCounter) = 
        c |> Map.fold (fun acc _ v -> acc + v) 0L
    let inc r (c: GCounter) =
        match Map.tryFind r c with
        | Some x -> Map.add r (x + 1L) c
        | None   -> Map.add r 1L c
    let merge (a: GCounter) (b: GCounter): GCounter =
        a |> Map.fold (fun acc ka va ->
            match Map.tryFind ka acc with
            | Some vb -> Map.add ka (max va vb) acc
            | None    -> Map.add ka va acc) b

When it comes to the merge operation, we simply combine the key/value entries of both counters. When both counters have different values for the same replica, we take the maximum of the two. This is correct behavior, since we know that the counter values can only be incremented. For the same reason, the decrement operation is not supported by this kind of CRDT.
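
As a quick usage sketch (using the string-based ReplicaId from the listing above): two replicas increment their own partial counters independently and converge to the same total after merging, regardless of the merge order.

let a = GCounter.zero |> GCounter.inc "A" |> GCounter.inc "A"  // replica A counted 2
let b = GCounter.zero |> GCounter.inc "B"                      // replica B counted 1

let merged = GCounter.merge a b
printfn "%d" (GCounter.value merged)        // 3
printfn "%b" (GCounter.merge b a = merged)  // true - merge order doesn't matter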

Increment/decrement Counter

Now, let's talk about the so-called PNCounter, which provides both increment and decrement operations. I find it particularly useful, as it's a simple example of how we can compose simple CRDTs into more advanced ones.

The crucial trick here is that a PNCounter consists of two GCounters - one used to count increments and the other used for decrements - so the decrement operation simply increments the GCounter responsible for counting decrements. The output value is the difference between the two.

module PNCounter =  
    type PNCounter = GCounter.GCounter * GCounter.GCounter
    let zero: PNCounter = (GCounter.zero, GCounter.zero)
    let value (inc, dec) = GCounter.value inc - GCounter.value dec
    let inc replica (inc, dec) = (GCounter.inc replica inc, dec)
    let dec replica (inc, dec) = (inc, GCounter.inc replica dec)
    let merge (inc1, dec1) (inc2, dec2) = 
        (GCounter.merge inc1 inc2, GCounter.merge dec1 dec2)

As you can see, the merge operation is again pretty trivial: a simple merge of the corresponding GCounter parts of both PNCounters.
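
A short usage sketch, continuing the previous example:

let a = PNCounter.zero |> PNCounter.inc "A" |> PNCounter.inc "A"
let b = PNCounter.zero |> PNCounter.inc "B" |> PNCounter.dec "B"

printfn "%d" (PNCounter.value (PNCounter.merge a b))  // 2 + 1 - 1 = 2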

There are also other types of counters, which I won't cover here. One particularly interesting case is the Bounded Counter, which allows you to put an arbitrary upper/lower bound on the counter to determine whether a target threshold has been reached.

Note about vector clocks

We've already covered counter implementations. Before we go forward, I think this is a good point to talk about vector clocks and the notion of time.

In many systems, a standard way of establishing causality (a happened-before relationship) is by using timestamps. There are problems, however, with using them in high-frequency distributed systems:

  • Operating systems, especially ones running on machines in different data centers, are subject to clock skew, which can kick your butt in write-heavy scenarios. Other anomalies can occur as well: leap second bugs, or even invalid time readings between two threads.
  • While timestamps can potentially give us the information necessary to determine the most recent update (we'll use that in a minute), they won't tell us anything about the "state of the world" at the moment the update happened. This means we cannot detect whether one update knew about another, or whether they happened concurrently.

This is where vector clocks come in. They are a form of logical clock, represented by monotonically incrementing values specific to each replica. Sounds a lot like the GCounter we've seen above ;)

Why talk about them here? The internal representation of a vector clock is very close to what a GCounter looks like. The major difference is the ability to partially compare two vector clocks.

Unlike standard comparison, partial comparison gives us a fourth possible result - an indecisive one, when we can no longer determine whether two values are in a lesser, greater or equal relationship. We can use this to recognize concurrent updates that happened between two clocks.

Here, we'll define vector clocks as:

type Ord =  
    | Lt = -1  // lower
    | Eq = 0   // equal
    | Gt = 1   // greater
    | Cc = 2   // concurrent

type VTime = GCounter.GCounter  
module VClock =  
    let zero = GCounter.zero
    let inc = GCounter.inc
    let merge = GCounter.merge
    let compare (a: VTime) (b: VTime): Ord = 
        let valOrDefault k map =
            match Map.tryFind k map with
            | Some v -> v
            | None   -> 0L
        let akeys = a |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        let bkeys = b |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        (akeys + bkeys)
        |> Seq.fold (fun prev k ->
            let va = valOrDefault k a
            let vb = valOrDefault k b
            match prev with
            | Ord.Eq when va > vb -> Ord.Gt
            | Ord.Eq when va < vb -> Ord.Lt
            | Ord.Lt when va > vb -> Ord.Cc
            | Ord.Gt when va < vb -> Ord.Cc
            | _ -> prev ) Ord.Eq

The comparison function, even though a bit long, is pretty simple - we compare pairwise entries of both VTime maps (if an entry doesn't exist on the opposite side, we count its value as 0):

  • If all values of corresponding replicas are equal, the clocks are equal.
  • If all values on the left side are lower than or equal to their counterparts on the right side, the left side is lesser than the right one.
  • If all values on the left side are greater than or equal to their counterparts on the right side, the left side is greater than the right one.
  • Any mix of lesser/greater entries means that we've detected a concurrent update (as in the example below).
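
For example, using string replica identifiers again:

let initial = VClock.zero |> VClock.inc "A"
let a = initial |> VClock.inc "A"    // {A: 2}
let b = initial |> VClock.inc "B"    // {A: 1, B: 1}

printfn "%A" (VClock.compare initial a)             // Lt - "a" descends from "initial"
printfn "%A" (VClock.compare a b)                   // Cc - concurrent updates
printfn "%A" (VClock.compare b (VClock.merge a b))  // Lt - the merged clock dominates both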

If you're more interested in the topic of time in distributed systems, I can recommend a great talk on the subject: Keeping Time in Real Systems by Kavya Joshi.

Registers

The next type of CRDT is the register. You can think of registers as value cells that provide CRDT semantics over any defined type. Remember that we're still constrained by the commutativity/associativity/idempotency rules. For this reason we must attach additional metadata that allows us to perform deterministic arbitration whenever a conflict is detected.

Last Write Wins Register

The most obvious way to resolve conflicts - one we've already talked about - is to use timestamps. This is exactly what our implementation of LWWReg does.

open System

module LWWReg =  
    type LWWReg<'a> = 'a * DateTime
    let zero: LWWReg<'a> = (Unchecked.defaultof<'a>, DateTime.MinValue)
    let value (v, _) = v
    let set c2 v2 (v1, c1) = if c1 < c2 then (v2, c2) else (v1, c1)
    let merge (v1, c1) (v2, c2) = if c1 < c2 then (v2, c2) else (v1, c1)

It's quite straightforward: our set and merge operations simply compare two registers and pick the value with the higher timestamp.

Just like in previous cases, we'll be able to compose LWW registers with other CRDTs to provide more advanced operations.
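
A short usage sketch (registers are just value/timestamp pairs, so we can construct them directly):

let r1 = ("hello", DateTime(2017, 1, 1))
let r2 = r1 |> LWWReg.set (DateTime(2017, 1, 2)) "world"

printfn "%s" (LWWReg.value (LWWReg.merge r1 r2))  // "world" - the newer timestamp wins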

Sets

Now that we've covered counters and registers, it's time to talk about collections. The most natural candidates here are different variations of sets - simply because set union conforms to the associativity/commutativity/idempotency properties mentioned before. Later on, we could use them to define structures like maps, graphs or even indexed linear sequences (useful e.g. in collaborative text editing).

Growing-only Set

Just like in the case of counters, the most basic example here is a grow-only set, also known as GSet. One of its use cases could be a voting system, where we'd like to tell whether a person has participated in a vote while still keeping their ballot anonymous (in this case the total voting result could be a GCounter itself).

module GSet =  
    type GSet<'a when 'a: comparison> = Set<'a>
    let zero: GSet<'a> = Set.empty
    let value (s: GSet<'a>) = s
    let add v (s: GSet<'a>) = Set.add v s
    let merge (a: GSet<'a>) (b: GSet<'a>) = a + b

No, it's not trolling. It's just a standard set! :) The only difference is that we constrain ourselves not to perform any removals on it. The reason for that is the merge operator: since our merge is just a standard union, if we removed an element from one of the replicas, then after merging it with another replica (where that removal hasn't happened yet) the removed element would auto-magically reappear in the result set.

There's also a lesson here: by removing an element from the set, we lost some data. This is something we often cannot afford with CRDTs, and it's the reason why we frequently must attach some additional metadata, even though it may not seem to be explicitly needed by the resulting value.

2-Phase Set

The next step is the two-phase set. Like in the case of PNCounter, we can simply combine two GSets - one for added elements and one for removed ones (often referred to as tombstones). Adding/removing elements and merging work pretty much like in the PNCounter/GCounter case.

module PSet =  
    type PSet<'a when 'a: comparison> = GSet.GSet<'a> * GSet.GSet<'a>
    let zero: PSet<'a> = (GSet.zero, GSet.zero)
    // (add, rem) is a single PSet instance
    let value (add, rem) = add - rem  
    let add v (add, rem) = (GSet.add v add, rem)
    let rem v (add, rem) = (add, GSet.add v rem)
    let merge (add1, rem1) (add2, rem2) = 
        (GSet.merge add1 add2, GSet.merge rem1 rem2)

There are several problems with this implementation:

  • A common issue with tombstone-based sets is that the removal set can grow indefinitely, so the final value set may end up being only a fraction of the size of the metadata necessary to keep the sets consistent. There are extra algorithms - known as tombstone pruning - to mitigate that problem.
  • While we are able to remove an added element, a problem appears when we try to add a removed element again. Since we don't have any semantics for removing elements from either of the underlying GSets, a removed element stays in the tombstone set forever and is therefore always excluded from the final value set. So no re-adding the value for you, my friend (see the snippet below). Again, we need some extra metadata that lets us track causality and determine when the adds/removes actually happened.
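
To see the re-add problem in action:

let s =
    PSet.zero
    |> PSet.add "cat"
    |> PSet.rem "cat"
    |> PSet.add "cat"   // try to re-add after removal...

printfn "%A" (PSet.value s)  // set [] - the tombstone still hides "cat"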

Observed Remove Set

If you've kept up to this point, congratulations! We're actually going to build our first semi-advanced case here: an observed-remove set (known as ORSet), which allows us to freely add/remove elements and still converge when merging replicas from different locations.

How does it work? We'll again represent our ORSet as add/remove collections, but this time instead of sets we'll use maps. The keys in those maps will be our elements, while the values will be (partially) comparable timestamps used to mark when the latest add/remove happened.

The actual specialization of an ORSet depends on the timestamp type and the conflict resolution algorithm used:

  • You could use DateTime for timestamps and prefer the latest value during conflict resolution. This essentially gives us Last Write Wins semantics (just like in LWWReg) over particular elements of the set. It would greatly simplify things, but we're going to do better than that :)
  • The other approach is to use the vector clocks we defined earlier in this post. This allows us to detect when two replicas have added/removed the same element without knowing about the other party trying to do the same. When such a case is detected, we need to arbitrarily decide what the outcome of our conflict resolution will be. The most common choice is to prefer additions over removals. This is known as an Add-Wins Observed Remove Set (AWORSet for short), and it's what we'll implement here.

module ORSet =  
    type ORSet<'a when 'a: comparison> = Map<'a, VTime> * Map<'a, VTime>
    let zero: ORSet<'a> = (Map.empty, Map.empty)

To get the resulting set, our value function iterates over the add map and removes every entry whose add timestamp is lower than the corresponding timestamp in the removals map (this means that if both updates were concurrent, we keep the element).

    let value (add, rem) = 
        rem |> Map.fold(fun acc k vr ->
            match Map.tryFind k acc with
            | Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
            | _ -> acc) add

Just like in the case of PNCounter, our add/remove operations need to work in the context of a particular replica r - this is a consequence of using vector clocks as timestamps. Here, we simply add the element to the corresponding map and increment its vector clock.

    let add r e (add, rem) =
        match Map.tryFind e add, Map.tryFind e rem with
        | Some v, _ -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
        | _, Some v -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
        | _, _ -> (Map.add e (VClock.inc r VClock.zero) add, rem)

    let remove r e (add, rem) =
        match Map.tryFind e add, Map.tryFind e rem with
        | Some v, _ -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
        | _, Some v -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
        | _, _ -> (add, Map.add e (VClock.inc r VClock.zero) rem)

Another thing you may have noticed is that we use Map.remove. It's safe to do so in this context, because at the same time we add the element/timestamp pair to the opposite map, so the information about the element's presence is still kept inside the object.

The most complex part is the actual merge function. We start by squashing the corresponding add/remove maps (in case of conflicting timestamps, we simply merge them together). Then we converge the merged add/remove maps: from the add map we remove all entries whose timestamps are lower than the corresponding timestamps in the remove map (this already covers the concurrent update case, as we decided at the beginning to favor additions over removals). From the remove map, we drop all elements whose timestamps are lower than, equal to or concurrent with the ones from the add map - just to keep things fairly compact.

    let merge (add1, rem1) (add2, rem2) =
        let mergeKeys a b =
            b |> Map.fold (fun acc k vb ->
                match Map.tryFind k acc with
                | Some va -> Map.add k (VClock.merge va vb) acc
                | None -> Map.add k vb acc ) a
        let addk = mergeKeys add1 add2
        let remk = mergeKeys rem1 rem2
        let add = remk |> Map.fold (fun acc k vr ->
            match Map.tryFind k acc with
            | Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
            | _ -> acc ) addk
        let rem = addk |> Map.fold (fun acc k va ->
            match Map.tryFind k acc with
            // keep a tombstone only as long as the removal still dominates the addition
            | Some vr when VClock.compare va vr = Ord.Lt -> acc
            | _ -> Map.remove k acc ) remk
        (add, rem)
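
Before we talk about the costs, here's a quick demonstration of the add-wins semantics using the functions defined above:

// a common initial state replicated to two nodes
let init = ORSet.zero |> ORSet.add "A" "apple"

// replica A removes "apple", while replica B concurrently adds "banana" and re-adds "apple"
let replicaA = init |> ORSet.remove "A" "apple"
let replicaB = init |> ORSet.add "B" "banana" |> ORSet.add "B" "apple"

let merged = ORSet.merge replicaA replicaB
printfn "%A" (merged |> ORSet.value |> Map.toList |> List.map fst)  // ["apple"; "banana"]
printfn "%b" (ORSet.merge replicaB replicaA = merged)               // true - order doesn't matter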

As you've probably noticed, we're using a map of vector clocks (which are themselves maps) as the set's metadata, which is a sub-optimal representation. There are different ways to mitigate this problem:

  1. The simplest way is to compress the binary ORSet payload using, for example, LZ4 or GZip.
  2. A more advanced approach is to modify the existing implementation to use dotted version vectors.

But I hope to write about them another time.

What next?

With this set of data structures in our arsenal, we could build more advanced ones:

  • One example would be a Last-Write-Wins Map, which essentially looks somewhat like type LWWMap<'k,'v> = ORSet<('k * LWWReg<'v>)> (keep in mind that the comparison should depend only on the key component).
  • Another one would be a graph structure composed of two sets: one for vertices and one for edges (see the sketch below).
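
As a taste of the latter, here's a minimal sketch of a grow-only graph composed of two GSets. It's a hypothetical composition (a full 2P2P-graph would again need tombstones for removals), but it shows how far plain set unions can take us:

module GGraph =
    type GGraph<'v when 'v: comparison> = GSet.GSet<'v> * GSet.GSet<'v * 'v>
    let zero: GGraph<'v> = (GSet.zero, GSet.zero)
    let addVertex v (vs, es) = (GSet.add v vs, es)
    // only allow edges between vertices we've already observed locally
    let addEdge a b ((vs, es): GGraph<_>) =
        if Set.contains a vs && Set.contains b vs
        then (vs, GSet.add (a, b) es)
        else (vs, es)
    let merge (vs1, es1) (vs2, es2) =
        (GSet.merge vs1 vs2, GSet.merge es1 es2)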

As you can see, this is quite a big topic and IMHO a pretty interesting one. There are many more things, like delta-state optimizations and operation-based CRDTs, which I won't cover here - simply not to turn this post into a book - but I hope to continue in upcoming posts.

Akka.Persistence: most common misconceptions

Akka.Persistence is one of the plugins that introduce eventsourced persistence mechanics into Akka.NET. However, I've often seen people having problems with it, since neither actors nor eventsourcing are part of mainstream .NET development. Many of these are general problems of eventsourced systems, so I've decided to write some more about them and present potential solutions from the perspective of an Akka.Persistence user.

This post describes the current state of things. It may change in the future, as the Akka.NET team wants to address some of these issues.

Control your own data

There are two ways in which Akka.NET event journals may store your data:

  1. Some of them may define totally custom serialization mechanics, e.g.:
    • MongoDB persistence uses a BSON serializer, which requires you to define bindings for all serialized message types.
    • The PostgreSQL plugin has a configuration switch to use JSON.NET and store your data in a JSON/JSONB column (which is well understood by this database). In the future, the rest of the SQL plugins will get that support as well.
  2. At the moment, however, most of them use the default Akka serialization mechanics: the same ones used to pass messages between cluster nodes.

One of the first mistakes everyone makes is putting their data in the hands of the default serializer. First of all, you're handing over the ability to encode/decode your data to a 3rd party dependency (sic!). And most importantly, the default serializer is built for remote message exchange - it may NOT be a good way to store immutable, everlasting data structures.

Even the Newtonsoft.Json serializer (which was initially chosen as the Akka.NET default) is not a good option. If you were thinking "it's just JSON, what could go wrong", the story is a little more complicated. Its default configuration encodes the full type name as part of the payload in order to support polymorphic deserialization. This means that once you change your event's qualified type name (namespace, type name or assembly) - something you shouldn't do anyway - your old data is lost, as the serializer will try to deserialize it into a type that can no longer be found.

So what to choose?

  1. Some people use schema-less formats, as they allow for a certain degree of version tolerance without much work. This includes JSON, just not with the settings used by the Akka.NET remote serializer.
  2. Others (including me) are more enthusiastic about schema-based serializers, e.g. Google Protocol Buffers or Microsoft Bond, as they keep the event schema/contract explicit and well defined.

Why did I link Google.Protobuf and not protobuf-net? I think that keeping the schema in .proto files makes things more explicit and prevents accidental schema changes.

Configuring a custom serializer

Writing a custom serializer is rather easy. All you need is a custom class inheriting from the Serializer base class. But how do you bind it to a specific event type?

Akka.NET serialization bindings are based on types. This means that you need to specify an event type/serializer pair. The type provided here doesn't actually have to be a concrete class - Akka will investigate the inheritance chain and all implementations in order to resolve a binding. This means that usually we create an empty marker interface and implement it in all events that we want to serialize with the target serializer.

An example HOCON config may look like this:

akka.actor {  
  serializers.my-custom = "MyNamespace.MySerializer, MyAssembly"
  serialization-bindings {
    "MyNamespace.IDomainEvent, MyAssembly" = my-custom
  }
}

Have an event versioning strategy

As events are immutable by definition, we need to prepare an approach for how to behave when we want to change them as our system evolves. This is what we call event versioning.

If you want to have a wider perspective on that problem, you probably should read Versioning in an Eventsourced System by Greg Young.

Most people see and understand the rationale behind having a migration strategy for their database. However, far fewer of them have an idea of how to approach schema evolution in eventsourced systems.

Event adapters

One of the features of Akka.Persistence is event adapters. They are middleware components that you can plug into the pipeline between the event journal and a persistent actor, allowing you to modify deserialized events before they reach their destination.

To create a custom event adapter, you need a class that implements either IReadEventAdapter (to intercept events received from the journal), IWriteEventAdapter (to intercept events sent to the journal), or just IEventAdapter, which combines both.

Event adapters can be used to upgrade old event versions to newer ones, or even to deconstruct a single event into multiple ones. When is that useful? For example, if you made a mistake in the past and defined an event that actually had more than a single responsibility (e.g. OrderedAndConfirmed instead of separate Ordered/Confirmed events).

But how do we bind an adapter to a given event type in the scope of a journal?

# we will use SqlServer journal for this example
akka.persistence.journal.sql-server {  
  event-adapters {
    custom-adapter = "MyNamespace.MyEventAdapter, MyAssembly"
  }
  event-adapter-bindings {
    "MyNamespace.V1.IDomainEvent, MyAssembly" = custom-adapter
  }
}

Don't use snapshots without events

This is a popular problem for people starting their journey with Akka.Persistence. Quite often they see persistent snapshots and think: hey, I don't need eventsourcing, let me just use one of these. However, the sole role of Akka.Persistence snapshots is to optimize the replay of event streams.

If you're after snapshot-only persistence, Akka.Persistence is definitely not for you. Persistent actors are built to support different needs, just like journals and even the snapshots themselves - which carry fields used to correlate their position in a stream of events. They also don't map well onto multi-table/multi-document database hierarchies.

Another thing: when you're persisting events with a PersistentActor, the next command won't be handled until the current event has been persisted successfully and its callback has been invoked. This is not the case with SaveSnapshot, which doesn't stop the actor from picking up the next message before confirming that the snapshot has been saved.

This is not a problem in eventsourced systems - if the system dies before saving a snapshot, we replay from the last successfully stored snapshot and the events that came afterwards - but without events you may process tens of messages without any confirmation and lose that data upon actor system shutdown.

[C#] Using gRPC with custom serializers

I've decided to write this post because I've noticed a lack of gRPC-related knowledge in the .NET community. For those of you who are not familiar with it - gRPC is a standard for defining high-performance RPC services, built by Google on top of HTTP/2.

Since most of the manuals talk about gRPC services from the perspective of building them with code generation tools, I'll show how to tweak the API itself and adapt it to our needs.

Since I'm not a great fan of code gen - my mindset here is more or less to avoid doing things that belong to the compiler - I've decided to drop the dependency on gRPC service definitions and .proto files for Google Protocol Buffers. But how can we define custom services and use any serializer we want instead?

Building a custom gRPC service

The core component of gRPC is a method descriptor, which informs both communicating parties about details such as what types of messages are sent/received on each side, how they should be serialized, and which message-passing pattern applies.

First, the descriptor needs to specify a communication pattern. gRPC allows one of 4 different patterns:

  • Unary which is simply an equivalent of standard request/response communication between two participants.
  • ServerStreaming when a client sends a single request and gets an asynchronous stream of messages back from the server.
  • ClientStreaming when the client is the side that sends an asynchronous stream of messages, and the server responds with a single response once the stream completes.
  • DuplexStreaming when both client and server share two simultaneous async streams of messages working in both directions. A nice thing about the read streams is that they implement the IAsyncEnumerator<> interface - an IEnumerator<> equivalent that allows for asynchronous element iteration. Who knows, maybe in the future it will be integrated into C# syntax like for loops ¯\_(ツ)_/¯.

While the communication styles may differ, the protocol always describes a single message type for requests handled by a given method, and a single type for response messages. The method descriptor requires you to provide marshallers for each of them.

A marshaller is basically a pair of functions used to serialize data to and from an array of bytes. This is my biggest complaint about the current design of the gRPC API for .NET - IMHO there are better binary containers (ArraySegment<byte>, for example) for a protocol designed for high-throughput / low-latency systems.

Given all of this, an example descriptor may look like this:

using Grpc.Core;

private static readonly Method<CustomRequest, CustomResponse> CustomMethod =  
    new Method<CustomRequest, CustomResponse>(
        type: MethodType.DuplexStreaming,
        serviceName: "CustomService",
        name: "CustomMethod",
        requestMarshaller: Marshallers.Create(
            serializer: request => SerializeIntoBytes(request),
            deserializer: bytes => DeserializeFromBytes(bytes)),
        responseMarshaller: Marshallers.Create(
            serializer: response => SerializeIntoBytes(response),
            deserializer: bytes => DeserializeFromBytes(bytes)));

I must admit, I like this approach to serialization. It's way more composable to pass a pair of functions than to build a custom interface implementation each time a new library wants an integration point.

Once we have that, let's move on to the server definition. The server needs an instance of ServerServiceDefinition, a descriptor of the service itself (so far we've only created a method descriptor). Thankfully, we can use the existing builders to quickly construct one for us.

Depending on the MethodType we've chosen, we need a different version of the lambda handler. Here I'm using duplex streaming, but keep in mind that due to the API's construction the compiler won't detect a mismatch between the method type and the actual delegate handler used.

var server = new Server  
{
    Ports = { { "127.0.0.1", 5000, ServerCredentials.Insecure } },
    Services =
    {
        ServerServiceDefinition.CreateBuilder()
        .AddMethod(CustomMethod, async (requestStream, responseStream, context) =>
            {
                await requestStream.ForEachAsync(async request =>
                {
                    // handle incoming request
                    // push response into stream
                    await responseStream.WriteAsync(new CustomResponse {Payload = request.Payload});
                });
            })
        .Build()
    }
};

Here, the context describes all data characteristic of the connection itself.

Keep in mind that pulling data from the request stream and pushing new data into the response stream can happen at any pace. There's no need to keep a 1-for-1 rule here.

For a simple duplex-streaming ping-pong service, our server is now ready to start. Let's move on to the client. For this, we first need to establish a Channel. What's easy to miss here: when creating a channel, we need to provide the endpoint of the server we wish to connect to, not of the client itself.

Once we have our channel, we need a call invoker. There are a few options here, e.g. for the case when you want to inject your own interceptors. Here, however, we'll go with the default one.

Given the channel and call invoker, we can make an actual call, providing the method descriptor we defined earlier. We can also pass some additional parameters (since gRPC uses HTTP/2, we can e.g. set some HTTP headers here). Again, just like in the case of the server handler, make sure that the call method you're using reflects the correct method type.

var channel = new Channel("127.0.0.1", 5000, ChannelCredentials.Insecure);  
var invoker = new DefaultCallInvoker(channel);  
using (var call = invoker.AsyncDuplexStreamingCall(CustomMethod, null, new CallOptions  
{
}))
{
    var responseCompleted = call.ResponseStream
        .ForEachAsync(async response => { /* handle response */ });
    for (int i = 0; i < Iterations; i++)
    {
        await call.RequestStream.WriteAsync(new CustomRequest {Payload = i});
    }

    await call.RequestStream.CompleteAsync();
    await responseCompleted;
}

EDIT (2017.03.29): if you want to try it, you may check a simple hello world example, available now in this Github repository.

Optimizing event journals

In this post I want to talk a little about some of the optimization techniques we've applied to the journals used in Akka.NET - those working with SQL backends. Before you read further: this post is written with the assumption that you're familiar with the basics of eventsourcing architectures and the shape they take in Akka.NET. If you're not, you can read more about it here.

Old ways

The existing approach was quite simple and naive. Whenever a persistent actor had something to do with the persistent backend, it sent a request to an EventJournal actor. In the case of SQL, the journal created a new DbConnection each time, in order to execute every single request. This wasn't supposed to be as bad as it sounds - ADO.NET handles connection pooling automatically.

However, reality resulted in some problems:

First, the frequent creation and reopening of the connection/command/transaction chain turned out to be not as cheap as we wanted it to be.

But more importantly, once the pressure kicks in, things start to get nasty. Imagine tens of thousands of actors trying to persist their events at once. Eventually we'll saturate all of the connections in the pool - potentially even starving other parts of the application if our actor system is embedded in some bigger application (see: the noisy neighbor problem). If this situation continues, timeouts will eventually start to kick in, causing persist requests to throw and errors to bubble up to the higher layers of the application. The definitions of slow, unresponsive and dead database requests get blurry at this point.

Is there anything we can do about it?

A new approach

Because nowadays, in order to present any new technique, we must give it some cheesy name, I've decided to describe this one as horizontal batching.

Initially we create a buffer for incoming requests and a counter for the free connections remaining to be used.

In the happy case, when we are able to persist incoming events as they come, the new batching journal implementation works very similarly to the old one, decrementing the connection counter whenever a connection is taken. Once a connection gets released, we increment the counter back.

However, when requests start to pile up, we may run out of free connections (the counter goes down to 0). In this case we start buffering them. Now, when a connection is released and can be reused, we take a page of requests from the buffer and execute them within a single connection/command. There are a few things we can configure here:

  • One is the maximum number of operations allowed to execute concurrently (the initial counter value). This way we may decide to put only a subset of our connection pool under the supervision of the event journal.
  • Another is the maximum size of a single batch. As the buffer can potentially grow large, we might decide to execute only a subset of it within a single connection.
  • The last important thing here is the maximum number of stacked messages. If requests come in faster than they can be processed (even in batches), we could eventually run out of memory to even store them. With this setting, once the specified threshold is surpassed, an OnBufferOverflow method will be called. By default it rejects the request, but it can also be overridden to apply a custom backpressure strategy. (A conceptual sketch of this logic follows below.)
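
The actual logic lives inside the batching journal actor, but conceptually it can be sketched more or less like this (a simplified, hypothetical F# model with made-up names, not the real Akka.NET implementation):

type Request = { PersistenceId: string; Payload: obj }

type State =
    { Free: int               // free connections remaining (the initial value is configurable)
      MaxBatch: int           // maximum size of a single batch
      MaxBuffer: int          // maximum number of stacked requests
      Buffer: Request list }  // requests waiting for a free connection

// called for every incoming write request
let enqueue execute onOverflow req state =
    if state.Free > 0 then
        execute [ req ]                                  // happy path: take a free connection
        { state with Free = state.Free - 1 }
    elif List.length state.Buffer < state.MaxBuffer then
        { state with Buffer = state.Buffer @ [ req ] }   // all connections busy: buffer the request
    else
        onOverflow req                                   // overflow: reject / apply backpressure
        state

// called whenever a connection gets released
let release execute state =
    match state.Buffer with
    | [] -> { state with Free = state.Free + 1 }         // nothing pending: just free the slot
    | buffered ->
        // reuse the connection right away for a whole page of buffered requests
        let page, rest = List.splitAt (min state.MaxBatch (List.length buffered)) buffered
        execute page
        { state with Buffer = rest }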

How fast is it? So far we've managed to process over 12 000 command→event roundtrips per second (we're talking about the full eventsourced actor protocol here) with both the application and the SQL Server database running on default settings on a standard dev laptop (8 logical cores, 16GB RAM).

There are more improvements still waiting for their turn. While the new journal implementation is still experimental and needs some more use, it has been designed to work as a drop-in replacement for the existing one. That means it should soon become the standard for the existing SQL implementations.

Quest for optionally asynchronous APIs

In this blog post I want to share some of the improvements we've made while working on the FSharp.Data.GraphQL library. To start with, I'm going to describe our use case, what issues we observed and how we solved them. If you're not familiar with GraphQL, please get acquainted with it first.

Introduction

GraphQL is an application-level query language that can be integrated with server-side application logic through a library of your choice. FSharp.Data.GraphQL is one of those libraries.

What most of the server implementations allow you to do is define custom resolution logic for any of a schema object's fields. We can define them like this:

Define.Object("User", [  
    Define.Field("firstName", String, fun ctx user -> user.FirstName)
])

However, sometimes we would like to have some more advanced logic. Some fields may not refer directly to a property of a particular entity instance; they may be obtained as part of an asynchronous call. Therefore we have introduced another creation method, which allows us to define asynchronous operations:

Define.AsyncField("picture", String, fun ctx user -> async { return! getPic user.AvatarId })  

GraphQL allows us to define queries that fetch only the necessary fields. Queries are highly dynamic constructs - we never know which fields will be requested. This also means that if we have both synchronous and asynchronous fields in our API, any combination of them may be expected in the output. This is where the problem begins.

Problem

It turned out that the case above - handling the resolution of both synchronous and asynchronous values - was quite problematic for the underlying GraphQL execution engine we were working on. Since F# is a statically typed language, we needed a uniform way to work with both synchronously and asynchronously resolved fields.

We started by modeling them all in terms of F# Async computations. However, Async introduces a constant overhead - both in terms of CPU and memory - which now applied to every resolved field. The bad part: as practice shows, ~99% of the time field resolvers are synchronous. This means introducing a heavy overhead by default where it is not needed in most cases.

In case you think you're free of this with the C# Task Parallel Library - you're not. As I said, when the combination of fields requested by a query is dynamic and runtime-dependent, the compiler is not able to determine at compile time whether an async method can be optimized or not.

Solution

We needed another kind of abstraction - something that would allow us to work with Async computations, but would also respect the mostly-synchronous nature of our problem.

If you're familiar with the list of changes planned for future versions of the C# language, you'll notice an idea called ValueTask - in short, it's a lightweight value type that conforms to the async/await API and allows both immediately returned values and TPL Tasks to be used in the same way. Exactly what we needed here.

However, ValueTask still belongs to the future. Besides, we're building an F# library and we needed something that would feel natural to F# devs, since F# has its own Async primitive.

This is why we created our own AsyncVal type - it behaves similarly to Async, but is able to use an optimized path for synchronously resolved values. To make it easier to work with, we've also created an asyncVal { ... } computation expression and interop methods for the async { ... } builder. With it, we are free to express things such as:

let result = asyncVal {  
    let rand = random.Next()
    if rand % 1000 = 1 
    then return! async { return! someAsyncOperation () }
    else return rand
} |> AsyncVal.get

... and get both support for asynchronous execution and optimal performance in the happy case.
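
To give an intuition of the idea, a conceptual sketch could look like the snippet below. Keep in mind this is only an illustration - the real AsyncVal in FSharp.Data.GraphQL is a struct with a richer API and its own computation expression builder:

type AsyncVal<'a> =
    | Synchronous of 'a            // value already known - no Async machinery involved
    | Asynchronous of Async<'a>    // fall back to a regular Async computation

module AsyncVal =
    // only the asynchronous case pays the price of the Async infrastructure
    let get = function
        | Synchronous v  -> v
        | Asynchronous a -> Async.RunSynchronously a
    let toAsync = function
        | Synchronous v  -> async.Return v
        | Asynchronous a -> a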

How fast is it? While the implementation is still in flux, we've made some initial benchmarks (remember: benchmarks lie, and the actual performance gain in our case was not so optimistic), comparing AsyncVal vs. async.Return. It turned out to be almost 3000 times faster, with no heap allocations (it's a struct type, and introduces only 4 bytes of overhead on 32-bit and 8 bytes on 64-bit machines). For truly asynchronous computations, it introduces minimal overhead over the existing Async implementation. You can see the actual benchmarks in the following PR.

This allowed us to optimize the most common cases without losing the ability to work with higher-level abstractions.

Summary

Right now AsyncVal is part of the FSharp.Data.GraphQL library and will probably require some more polishing, as it was created to solve our specific problems and isn't quite ready for general-purpose use - e.g. error management isn't exactly uniform at the moment.

However, this PoC already proves its usefulness and may serve as a foundation for something greater. Maybe in the future it will deserve its own package.