HyParView: cluster membership that scales

In the past I've introduced the different layers of cluster protocols and presented SWIM: a simple and intuitive approach to cluster membership, popularized by the success of tools such as HashiCorp Consul. This time we'll focus on another protocol called HyParView (hybrid partial view).

HyParView is quite unique in terms of its goals: it's expected to work over clusters consisting of huge numbers of nodes (potentially tens of thousands) in a lightweight manner.

Intuition

To build some intuition around this algorithm, let's use a common analogy. We can imagine a traditional cluster of nodes as a human gathering. As long as it consists of only a handful of peers, everyone can easily track and participate in conversations within the group. However, as that number grows, we spend more and more time just trying to keep up with all the participants. Also, as the distance grows, it may turn out that we're not able to directly listen or talk to others sitting far away from us.

Using this analogy, we could explain the HyParView algorithm as each peer maintaining two circles of relationships:

  • The inner circle, called the active view, is the group we actually have an active conversation with. In software terms it corresponds to maintaining an open connection to each of them (and vice versa). In HyParView we call these active peers neighbors.
  • The outer circle, called the passive view, is the list of contacts we know about but don't actually talk with at the moment. We only know (or at least hope) they're out there somewhere and that we might want to get in touch with them at some point, moving them into our active view.

So why do we even need a passive view if we only talk with peers from the active one? Its purpose is to serve as a supplementary pool:

  • If one of our active peers has been removed (e.g. because it was shut down) and we have a free seat at the table, we can invite one of the peers from our passive view to step in.
  • On the other hand, if a peer disconnected gracefully (e.g. because it had to make room for another, higher-priority peer), we can still keep them in our passive view and ask them to join us again in the future.

Unlike in real life, here a peer exists either in the active or the passive set, but not in both at the same time - it's not a hard rule, but it makes the implementation easier. The thing is that neither the active nor the passive view contains a complete list of all cluster members (hence the name hybrid partial view). Both views have a constrained upper-bound capacity, which should be high enough to let all members eventually build a single interconnected graph. The paper proposes to keep the active view capacity at log(N) + C, where N is the total expected cluster size and C is a small constant, e.g. 1, while the passive view should be K (e.g. 6) times bigger than the active one (we usually have more contacts in our phone than people we're actually able to talk with at once, right?).

Since both views are size-constrained, what happens when we want to add another peer to a view that's already full? Well, we can do that, but only if we first remove one of the existing peers from that view.
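
To make that sizing concrete, here's a minimal sketch of how those capacities could be computed - the helper names are mine, and I'm assuming a base-10 logarithm purely for illustration (the exact base is really just another tuning knob):

let activeViewCapacity (n: int) (c: int) : int =
  // active view capacity ~ log(N) + C
  int (ceil (log10 (float n))) + c

let passiveViewCapacity (n: int) (c: int) (k: int) : int =
  // passive view is K times bigger than the active one
  k * activeViewCapacity n c

// e.g. for an expected cluster of 10 000 nodes, C = 1 and K = 6:
// activeViewCapacity 10000 1     returns 5
// passiveViewCapacity 10000 1 6  returns 30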

PS: Communication with a member outside of our active view is resolved by propagating messages through a chain of actively connected peers. This topic, however, is covered by other protocols (like epidemic broadcast trees), which we won't touch in this blog post.

Implementation

With some intuition about the algorithm's inner workings, let's try to write it down. You can use this snippet to get a full grasp of the implementation used here; we'll cover only the key points. Our first component will be Reactor - an interface used by our membership actor to talk with the outside environment. For the purposes of this protocol, we need 3 methods:

  • Send, which is responsible for message exchange between nodes. Here we also assume that if we try to send anything to an endpoint we haven't connected to before, a connection will be established first, before sending the first message. The actor itself doesn't manage the connections, to keep the implementation simple.
  • Disconnect, which removes an existing connection. It's necessary since - unlike many other cluster membership protocols - HyParView assumes that not all nodes will be connected to each other. For this reason we may need to explicitly close a connection under certain conditions.
  • Notify, which we'll use to send notifications about newly established/disposed connections with neighbor nodes.

type PeerEvent =
  | NeighborUp of Endpoint
  | NeighborDown of Endpoint

type Reactor =
  abstract Send: target:Endpoint * envelope:Envelope -> Async<unit>
  abstract Disconnect: Endpoint -> Async<unit>
  abstract Notify: PeerEvent -> unit
  
type PeerState =
  { Self: Endpoint             // this node's own endpoint
    ActiveView: Set<Endpoint>  // neighbors we keep open connections to
    PassiveView: Set<Endpoint> // contacts we know about but don't talk to right now
    Config: Config
    Random: Random
    Output: Reactor }
// NOTE: state.Send(target, message) used below is a shorthand for wrapping the
// message into an Envelope and calling Output.Send (see the full snippet).
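
Beside Reactor and PeerState, the handlers below lean on a few supporting types - Config, Shuffle, the message union and the Envelope used by Reactor.Send - which the full snippet defines up front. Here's a rough sketch of their shapes, reconstructed from how the code uses them (treat it as an assumption: the field names follow that usage, and Endpoint is whatever node-address type the snippet uses):

type Config =
  { ActiveViewCapacity: int       // max active view size (~ log N + C)
    PassiveViewCapacity: int      // max passive view size (~ K * active)
    ActiveRandomWalkLength: int   // initial TTL of a ForwardJoin random walk
    PassiveRandomWalkLength: int  // TTL at which ForwardJoin also lands in the passive view
    ShuffleActiveViewCount: int   // how many active peers go into a Shuffle message
    ShufflePassiveViewCount: int  // how many passive peers go into a Shuffle message
    ShuffleTTL: int }             // how far a Shuffle request travels

type Shuffle =
  { Origin: Endpoint              // the node that started the shuffle
    Nodes: Set<Endpoint>          // contacts offered for exchange
    Ttl: int }

type Message =
  | Join
  | ForwardJoin of peer:Endpoint * ttl:int
  | Neighbor of highPriority:bool
  | Disconnect of alive:bool * respond:bool
  | Shuffle of Shuffle
  | ShuffleReply of Set<Endpoint>

type Envelope =                   // assumed shape: a message tagged with its sender
  { Sender: Endpoint
    Message: Message }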

The first operation we need to cover is joining two nodes. HyParView allows remote nodes to deny incoming connections, e.g. when their own active view limit is reached. This is designed to let the cluster stabilize and to avoid constant churn of opening and closing connections. However, we also need an escape hatch here - if a node could deny just any connection, we might end up in a situation where a newly joining node with no other contact points would never be able to join the cluster.

For this reason we add a highPriority flag that forces the remote node to always accept the incoming connection, even at the cost of having to drop another peer from its active view.

let onJoin (state: PeerState) (peer: Endpoint) = async {
  // explicit join is treated with high priority
  let! state = addActive peer true state 
  let ttl = state.Config.ActiveRandomWalkLength
  let fwd = ForwardJoin(peer, ttl)
  for node in Set.remove peer state.ActiveView do
    do! state.Send(node, fwd)
  return state
}

To finalize the join procedure, the receiver is going to introduce the new peer to its neighbors using a ForwardJoin message. This message will be chained through randomly picked active peers. To avoid messages flowing endlessly through the cluster, we reinforce it with a time-to-live (TTL) counter, decremented on each step, which is a popular and cheap way of solving this kind of problem.

Here are some tricky parts - we don't just propagate ForwardJoin; we potentially also try to add the joining peer to the passive view or even the active view (if the receiving node is the last one in the chain or has no other active peers). This allows us to reinforce cluster connections and avoid building clusters made of tightly interconnected groups with few weak links between them - think about people who never try to meet anyone new and just stick with friends they already know. A cluster like that could easily split into several independent ones if the connecting links were too few and too weak: like a party that disperses because different groups have different ideas about what to do next and no common friends to keep them together.

let onForwardJoin (state: PeerState) peer sender ttl = async {
  if ttl = 0 || Set.isEmpty state.ActiveView 
  then return! addActive peer true state
  else
    let state = 
      if ttl = state.Config.PassiveRandomWalkLength 
      then addPassive peer state 
      else state
    match state.Random.Pick(Set.remove sender state.ActiveView) with
    | None -> return! addActive peer true state
    | Some next ->
      do! state.Send(next, ForwardJoin(peer, ttl-1))
      return state
}

Up until now we've only talked about the management of active and passive views in abstract terms. Time for a more concrete approach. We use a simple principle - given a preconfigured maximum capacity for a given set of endpoints, if adding a new endpoint would exceed that capacity, we drop one of the existing endpoints in that view at random.

Since the passive view serves as a subordinate of the active one, we cannot add a peer to the passive set if it already exists in the active one. Also, an alive (and only an alive) peer removed from the active view is automatically moved to the passive one - in case we need to contact it in the future.

let isPassiveViewFull (state: PeerState) =
  state.PassiveView.Count >= state.Config.PassiveViewCapacity
    
let addPassive (peer: Endpoint) (state: PeerState) =
  if Set.contains peer state.ActiveView || 
     Set.contains peer state.PassiveView || 
     peer = state.Self
  then state
  else
    let passive =
      if isPassiveViewFull state then
        state.Random.Pick state.PassiveView // None => view was empty
        |> Option.map (fun drop -> Set.remove drop state.PassiveView)
        |> Option.defaultValue state.PassiveView
      else state.PassiveView
    { state with PassiveView = Set.add peer passive }

Managing the active view is slightly more complex:

  1. A Disconnect message should be sent prior to removing an active peer, so that we can inform the other side whether the disconnecting node is still alive (for the purpose of future reconnects) and whether it should be responded to (since sending Disconnect requires an open connection, we want to avoid an endless cycle of open/close connections between nodes trying to gracefully disconnect from each other).
  2. Since our active view represents the cluster membership state, any changes there should trigger notifications about neighbor peers being promoted or stepping down.
  3. Additionally, since endpoints in the active view hold corresponding system resources (open TCP connections), these must eventually be released as well.

let isActiveViewFull (state: PeerState) =
  state.ActiveView.Count >= state.Config.ActiveViewCapacity
    
let removeActive peer (state: PeerState) respond = async {
  // Set.remove always allocates a new set instance, so check membership
  // explicitly instead of relying on reference equality
  if not (Set.contains peer state.ActiveView) then return state
  else
    let active = Set.remove peer state.ActiveView
    if respond then 
      do! state.Send(peer, Disconnect(alive=true, respond=false))
    do! state.Output.Disconnect(peer)
    state.Output.Notify(NeighborDown peer)
    return addPassive peer { state with ActiveView = active }
}

let removeActiveIfFull state = async {
  if isActiveViewFull state then
    match state.Random.Pick state.ActiveView with
    | Some drop -> return! removeActive drop state true
    | None -> return state
  else return state
}

let addActive peer (highPriority: bool) (state: PeerState) = async {
  if Set.contains peer state.ActiveView || peer = state.Self 
  then return state
  else
    let! state = removeActiveIfFull state
    let passive = Set.remove peer state.PassiveView
    let active = Set.add peer state.ActiveView
    do! state.Send(peer, Neighbor(highPriority))
    state.Output.Notify(NeighborUp peer)
    return { state with ActiveView = active; PassiveView = passive }
}

As you may have noticed, as part of promoting a peer to the active view a Neighbor message is sent. Neighbor is similar to Join, with the difference that it doesn't result in an introduction (forwarding) of the joining peer, making this operation cheaper:

let onNeighbor (state: PeerState) peer highPriority = async {
  if highPriority || not (isActiveViewFull state) then
    return! addActive peer highPriority state
  else
    // non-active peers are disconnected after handler is called 
    return state
}

Now, once we've handled all connection-related messages, it's time to talk about the Disconnect message. As you might expect, we should remove the disconnecting peer from the active view. If the peer will remain alive (it's not shutting down), we can keep it in the passive view (you may say goodbye to a friend leaving the party, but you still want to keep them in your contacts).

let onDisconnect (old: PeerState) peer alive respond = async {
  let! state = removeActive peer old respond
  if obj.ReferenceEquals(old, state) then return state // peer wasn't in our active view
  else
    // if the peer left gracefully, keep it in our contact book
    let state = if alive then addPassive peer state else state
    // try to refill the freed slot with a random peer from the passive view
    if not (isActiveViewFull state) then
      match state.Random.Pick (Set.remove peer state.PassiveView) with
      | Some node ->
        // if we just lost our last neighbor, force the remote side to accept us
        let highPriority = Set.isEmpty state.ActiveView
        do! state.Send(node, Neighbor(highPriority))
        return state
      | None -> return state
    else return state
}

As you may notice in the code above, during disconnect we also try to refill our active view by picking one random peer from the passive view and introducing ourselves as its Neighbor. That's the reason we had the passive view in the first place.

One last thing is the shuffle process - we do this to periodically exchange our contacts with other connected nodes. While keeping peers in the passive view is cheap (there are no system resources allocated to them), the downside is that we never know if we can connect to them unless we try to promote them to the active view. As a result, it may turn out that at some point all the peers we kept in our contact book but didn't call for a while have changed their addresses or gone offline, and our passive view is outdated. Once the last active peer goes away, we won't know if there's still a cluster out there, because we won't have anyone left to ask.

For this reason, from time to time we ask one of our neighbors to exchange contacts (giving them a portion of our active and passive views). Since we don't want to build local, tightly interconnected groups within the cluster - for the reasons covered above - we add a time-to-live counter and gossip this request just like ForwardJoin.

let doShuffle (state: PeerState) = async {
  match state.Random.Pick state.ActiveView with
  | None -> ()
  | Some node ->
    let active =
      state.Random.Shuffle (Set.remove node state.ActiveView)
      |> Seq.truncate state.Config.ShuffleActiveViewCount // views may hold fewer peers than requested
      |> Set.ofSeq
    let passive =
      state.Random.Shuffle state.PassiveView
      |> Seq.truncate state.Config.ShufflePassiveViewCount
      |> Set.ofSeq
    let msg =
      { Origin = state.Self
        Nodes = active + passive
        Ttl = state.Config.ShuffleTTL }
    do! state.Send(node, Shuffle msg)
}

This message is propagated through random active peers until the final peer is reached (the TTL has expired). That peer then responds with an equally sized portion of its own contacts, while adding the received peers to its own passive view.

let addAllPassive (state: PeerState) nodes =
  nodes |> Set.fold (fun acc node -> addPassive node acc) state

let onShuffle (state: PeerState) (shuffle: Shuffle) sender = async {
  if shuffle.Ttl = 0 then
    let nodes =
      state.Random.Shuffle(state.PassiveView)
      |> Seq.truncate shuffle.Nodes.Count // passive view may hold fewer nodes than requested
      |> Set.ofSeq
    do! state.Send(shuffle.Origin, ShuffleReply(nodes))
    return addAllPassive state shuffle.Nodes
  else 
    let peers = state.ActiveView - Set.ofList [shuffle.Origin; sender]
    match state.Random.Pick peers with
    | Some node ->
      do! state.Send(node, Shuffle { shuffle with Ttl = shuffle.Ttl - 1 })
      return state
    | None -> return state
}

let onShuffleReply (state: PeerState) nodes = async {
  return addAllPassive state nodes
}
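
To see how these handlers hang together, here's a hedged sketch of a dispatch function that an actor loop could use to route incoming messages to them - the full snippet wires this up differently, and I'm assuming here that the sender endpoint comes from the incoming Envelope:

let onReceive (state: PeerState) (sender: Endpoint) (msg: Message) = async {
  match msg with
  | Join                       -> return! onJoin state sender
  | ForwardJoin(peer, ttl)     -> return! onForwardJoin state peer sender ttl
  | Neighbor highPriority      -> return! onNeighbor state sender highPriority
  | Disconnect(alive, respond) -> return! onDisconnect state sender alive respond
  | Shuffle shuffle            -> return! onShuffle state shuffle sender
  | ShuffleReply nodes         -> return! onShuffleReply state nodes
}

Periodic shuffles (doShuffle) would then be triggered separately, e.g. by a timer ticking inside the same actor.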

Final notes

Unlike many membership protocols that keep the cluster in a fully aware, fully connected state, HyParView uses the idea of a partial cluster view to make the cluster establish faster and scale better. How much better? The largest described scenario reached a size of 10,000 nodes, but it could probably go higher. While claims of clusters with thousands of members using other protocols are not unheard of, they usually come with strings attached: people sometimes stretch the definition by counting a cluster of a few dozen coordinated discovery endpoints (the actual cluster) together with the thousands of client services connected to them. Here we're talking about a cluster of homogeneous services (each equally capable of serving its role as a member). IMO this gives it the potential to work not only on big machines inside data centers, but also at the network edge - on machines as small as pocket devices.

HyParView prefers high availability and connectivity in exchange for weak membership (just like SWIM), so at any point in time different nodes may have different, inconsistent views of who belongs to the cluster. This means it's not really designed to work with systems that require strong membership properties, e.g. quorum-based protocols like Raft or Paxos.

The last thing worth noting is that HyParView doesn't prevent (or even detect) situations where the cluster connection graph splits into several subclusters with no connections to each other. All guarantees are probabilistic - we can only reduce the chance of this happening by gossiping joining nodes via ForwardJoin and by periodically shuffling/exchanging passive views.