In this blog post we'll discuss the challenges of, and solutions for, reading concurrently from a massive number of Redis streams without running into the limitations of the Redis protocol or of our machine resources.

First, let's describe our problem. We could phrase it as follows: we have a number of entities in our system that serve their changes in real time to subscribed users. These clients may be connected to different physical server instances, yet they still expect to listen to the same logical entities, including changes that happened while they were offline. We can have tens of thousands of concurrent users listening to tens of thousands of entities. Additionally, some other business processes may also be interested in reading these state changes at a different (non-realtime) pace.

For the use case above we need some way to exchange updates between different server instances. For simplicity, let's assume that we picked Redis streams for message exchange. The memory overhead of an individual stream is just a few hundred bytes, so we don't have issues with scaling the stream count on the Redis side. But what about the servers passing these messages on to our subscribers?

Realities of Redis Serialization Protocol

What makes listening to thousands of Redis streams so hard? To answer that, let's look at how Redis client libraries usually work.

Most of them simply create one (or a small, fixed number of) TCP connections, submit RESP (REdis Serialization Protocol) commands over them and wait for the responses. To multiplex multiple commands, requests are usually pipelined at the client level, which is enough for individual GET, SET etc. requests.

However, live streaming requires continuously listening for incoming messages in real time using the XREAD command. XREAD accepts a BLOCK option that blocks the connection until either the specified timeout elapses or a new message arrives on one of the streams we XREAD from. We should put double emphasis on the "block the connection" part here. What we observed is that the connection will no longer respond to any other commands until XREAD completes!

Technically we could overcome this issue by either:

  1. Creating a dedicated Redis connection (and listener thread) for every stream to serve its XREAD operation. However, the connection-plus-thread approach may not scale well once we reach tens of thousands of concurrent streams.
  2. Not using the BLOCK option, in which case XREAD returns immediately if there are no pending messages waiting to be read. This, however, would quickly turn our listener into something akin to a busy loop, saturating all available machine resources even if there are no messages in our Redis streams.

Fortunately, there's one more XREAD characteristic that we're going to take advantage of: XREAD can be passed multiple streams at the same time, and it returns as soon as any of them has a matching message. This property allows us to batch multiple stream reads into a single Redis request.
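
To make the shape of such a batched request concrete, here is a minimal sketch that only assembles the argument list of a multi-stream XREAD command; it doesn't talk to Redis at all. The function name build_xread_args and its parameters are made up for this illustration, and a real driver would build and encode the command for you.

```rust
/// Builds the argument list of a single `XREAD` call covering many streams.
/// `streams` pairs each stream key with the last message ID already seen.
/// (Hypothetical helper, for illustration only.)
pub fn build_xread_args(
    streams: &[(&str, &str)],
    block_millis: Option<u64>,
    count: Option<u64>,
) -> Vec<String> {
    let mut args = vec!["XREAD".to_string()];
    if let Some(n) = count {
        args.push("COUNT".to_string());
        args.push(n.to_string());
    }
    if let Some(ms) = block_millis {
        args.push("BLOCK".to_string());
        args.push(ms.to_string());
    }
    args.push("STREAMS".to_string());
    // XREAD expects all keys first, followed by all IDs in the same order.
    args.extend(streams.iter().map(|(key, _)| key.to_string()));
    args.extend(streams.iter().map(|(_, id)| id.to_string()));
    args
}
```

Note that keys and IDs travel as two parallel lists, so the n-th ID always belongs to the n-th key.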

Building a recurring I/O thread pool

With the properties mentioned above we can quickly sketch the following design: we'll create a fixed number of N Redis connections, each with its own thread running an event loop. We'll put all incoming subscribers on a shared pending task queue one by one. Then we'll pull them from the queue in batches of up to M streams at a time. Each batch will be sent to the first free worker thread, which issues the XREAD, waits for the response and then routes the reply messages to their corresponding subscribers. Finally, the batch we just processed will be put back onto the pending queue.
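
The rotation described above can be modelled without any I/O. The sketch below is a single-threaded toy using only the standard library: take_batch and run_round are hypothetical names, and the process callback stands in for the blocking XREAD plus message routing that a real worker would perform.

```rust
use std::collections::VecDeque;

/// Pulls up to `max_batch` tasks from the front of the pending queue.
pub fn take_batch<T>(pending: &mut VecDeque<T>, max_batch: usize) -> Vec<T> {
    let n = max_batch.min(pending.len());
    pending.drain(..n).collect()
}

/// One scheduling round as seen by a single worker: take a batch, process it
/// (the real router would issue a blocking XREAD here), then put the batch
/// back at the end of the queue so other tasks get their turn first.
pub fn run_round<T>(
    pending: &mut VecDeque<T>,
    max_batch: usize,
    mut process: impl FnMut(&[T]),
) {
    let batch = take_batch(pending, max_batch);
    process(&batch);
    pending.extend(batch);
}
```

Because processed tasks are appended to the back, every stream is eventually visited even when there are far more streams than fit into the batches currently in flight.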

This design resembles the thread pools used internally by many async runtimes, only simpler. The one we'll use, however, is strictly dedicated to serving Redis stream data:

  • Every thread is bound 1-1 to its Redis connection that blocks when waiting for incoming XREAD to complete.
  • We don't need to pin worker threads to CPU cores: most of the time these I/O threads will be suspended, just waiting for XREAD BLOCK to time out or to return a reply. We can easily have hundreds of them.
  • In the example code that will follow we're just going to use a single multi-producer/multi-consumer task queue. No need for per-thread queues or work stealing - our workload is bound to network latency (milliseconds) and XREAD BLOCK timeout (possibly hundreds of milliseconds up to seconds), so there's no need to make things more complicated than necessary.

For the purposes of this exercise we'll also assume that each Redis stream has at most one active subscriber within the same process.

Give me the code

For our example we're going to use Rust, with the redis crate as the driver and the loole crate for the multi-producer/multi-consumer queue implementation. We're going to name our I/O thread pool StreamRouter, since it's dedicated to one thing: listening to live stream messages coming from Redis and routing them to their subscribers.

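use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Arc;
use std::thread::JoinHandle;

use loole::{Receiver, Sender};
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{Client, Commands, Connection, RedisError, RedisResult, Value};

pub type StreamKey = Arc<str>;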
pub type MessageId = String;
pub type StreamReader = tokio::sync::mpsc::UnboundedReceiver<(MessageId, RedisMap)>;
type RedisMap = HashMap<String, Value>;
type StreamSender = tokio::sync::mpsc::UnboundedSender<(MessageId, RedisMap)>;

pub struct StreamRouter {
  /// Pending task queue that new subscriptions are pushed onto.
  buf: loole::Sender<StreamReadTask>,
  /// Flag used to stop worker event loops when this object is dropped.
  alive: Arc<AtomicBool>,
  /// I/O worker threads.
  #[allow(dead_code)]
  workers: Vec<JoinHandle<()>>,
}

impl Drop for StreamRouter {
  fn drop(&mut self) {
    self.alive.store(false, SeqCst);
  }
}

impl StreamRouter {
  pub fn new(client: &Client, options: StreamRouterOptions) -> Result<Self, RedisError> {
    let alive = Arc::new(AtomicBool::new(true));
    // create a multi-producer/multi-consumer channel
    let (pending_task_sender, pending_task_receiver) = loole::unbounded();
    let mut workers = Vec::with_capacity(options.worker_count);
    for worker_id in 0..options.worker_count {
      // create a worker with a dedicated connection
      let conn = client.get_connection()?;
      let worker = Self::new_worker(
        worker_id,
        conn,
        pending_task_sender.clone(),
        pending_task_receiver.clone(),
        alive.clone(),
        &options,
      );
      workers.push(worker);
    }
    Ok(Self {
      buf: pending_task_sender,
      workers,
      alive,
    })
  }
  
  /// Subscribe to a Redis stream, reading messages that follow `last_id`.
  pub fn subscribe(&self, stream_key: StreamKey, last_id: Option<String>) -> StreamReader {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    // by default `XREAD {stream-key} "0-0"` means reading
    // from the beginning of the stream.
    let last_id = last_id.unwrap_or_else(|| "0-0".to_string());
    let h = StreamReadTask::new(stream_key, last_id, tx);
    self.buf.send(h).unwrap();
    rx
  }
}

pub struct StreamRouterOptions {
  /// Number of worker threads, each having its own Redis connection.
  pub worker_count: usize,
  /// How many Redis streams we pass to a single XREAD command.
  pub xread_streams: usize,
  /// How long a worker will block while waiting for a Redis `XREAD`
  /// request to respond. This blocks only the worker thread issuing
  /// the request and doesn't affect other threads.
  ///
  /// If set to `None` the request won't block and will return immediately,
  /// which gives the best responsiveness but can lead to unnecessary
  /// busy loops causing CPU spikes even when idle.
  pub xread_block_millis: Option<usize>,
  /// How many messages a single `XREAD` request is allowed to return.
  pub xread_count: Option<usize>,
}

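struct StreamReadTask {
  /// Redis stream key
  key: StreamKey,
  /// Last message id read from given Redis stream (default: "0-0")
  last_id: String,
  /// Channel where messages should be forwarded.
  sender: StreamSender,
}

impl StreamReadTask {
  fn new(key: StreamKey, last_id: String, sender: StreamSender) -> Self {
    Self { key, last_id, sender }
  }
}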

It's pretty standard boilerplate: we create a fixed number of threads, each with its own dedicated connection, plus one shared pending task queue. Workers need both the sender and the receiver side of the channel because we want to reschedule tasks once they're processed.

impl StreamRouter {
  fn new_worker(
    worker_id: usize,
    conn: Connection,
    tx: loole::Sender<StreamReadTask>,
    rx: loole::Receiver<StreamReadTask>,
    alive: Arc<AtomicBool>,
    options: &StreamRouterOptions
  ) -> JoinHandle<()> {
    // convert our options object to redis client XREAD options
    let mut xread_options = StreamReadOptions::default();
    if let Some(block_millis) = options.xread_block_millis {
      xread_options = xread_options.block(block_millis);
    }
    if let Some(count) = options.xread_count {
      xread_options = xread_options.count(count);
    }
    let count = options.xread_streams;
    // spawn worker thread
    std::thread::spawn(move || {
      if let Err(err) = Self::event_loop(conn, tx, rx, alive, xread_options, count) {
        tracing::error!("worker {} failed: {}", worker_id, err);
      }
    })
  }
}

The event loop itself is pretty simple:

  1. We keep two fixed, same-sized buffers: one for stream_keys and one for their corresponding start message_ids. This mirrors the shape of XREAD, where the stream keys and their last message IDs are passed as two separate lists.
  2. Call XREAD.
  3. When the response arrives, route the messages of each stream to its task's response channel.
  4. Prune tasks whose channels have already been closed.
  5. Reschedule the remaining active tasks back onto the pending task queue.

impl StreamRouter {
  fn event_loop(
    mut conn: Connection,
    tx: loole::Sender<StreamReadTask>,
    rx: loole::Receiver<StreamReadTask>,
    alive: Arc<AtomicBool>,
    options: StreamReadOptions,
    count: usize,
  ) -> RedisResult<()> {
    let mut stream_keys = Vec::with_capacity(count);
    let mut message_ids = Vec::with_capacity(count);
    let mut senders = HashMap::with_capacity(count);
    while alive.load(SeqCst) {
      // read as many tasks as possible without blocking more than once
      if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) {
        break; // pending task queue has been closed
      }
      // read_buf should always return non-empty buffer
      assert!(!stream_keys.is_empty()); 

      // perform Redis XREAD for streams given their last message ids
      let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?;

      let mut msgs = 0;
      for stream in result.keys {
        let mut remove_sender = false;
        // match received Redis messages with their subscriber's channel
        if let Some((sender, idx)) = senders.get(stream.key.as_str()) {
          for stream_id in stream.ids {
            let message_id = stream_id.id;
            let value = stream_id.map;
            // update latest message id for a given stream
            message_ids[*idx].clone_from(&message_id);
            msgs += 1;
            // forward message to subscriber
            if sender.send((message_id, value)).is_err() {
              // the subscriber dropped its receiver, stop forwarding
              remove_sender = true;
              break;
            }
          }
        }
        if remove_sender {
          senders.remove(stream.key.as_str());
        }
      }
      
      Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
    }
    Ok(())
  }

  /// Schedule streams with their handles for continuous event loop
  /// processing, skipping the ones that lost their subscribers.
  fn schedule_back(
    tx: &Sender<StreamReadTask>,
    keys: &mut Vec<StreamKey>,
    ids: &mut Vec<String>,
    senders: &mut HashMap<StreamKey, (StreamSender, usize)>,
  ) {
    let keys = keys.drain(..);
    let mut ids = ids.drain(..);
    for key in keys {
      if let Some(last_id) = ids.next() {
        if let Some((sender, _)) = senders.remove(&key) {
          if sender.is_closed() {
            continue; // skip rescheduling of closed senders
          }
          let h = StreamReadTask::new(key, last_id, sender);
          if let Err(err) = tx.send(h) {
            tracing::warn!("failed to reschedule: {}", err);
            break;
          }
        }
      }
    }
    senders.clear();
  }
}

Reading the pending task queue isn't tricky either. We wait for the first task, possibly blocking the current thread until it arrives. Once we have received the first one, we try to pull additional tasks, this time in a non-blocking fashion via Receiver::try_recv instead of Receiver::recv, so that we can batch them together within a single XREAD request and handle a surge of incoming tasks that could happen e.g. after a system restart.

impl StreamRouter {
  /// Read as many values as possible from the `rx` receiver to fill up
  /// the buffer, blocking at most once. Once this method returns, the
  /// provided buffer might not be completely filled, but it will never
  /// be empty unless the queue has been closed.
  fn read_buf(
    rx: &Receiver<StreamReadTask>,
    stream_keys: &mut Vec<StreamKey>,
    message_ids: &mut Vec<String>,
    senders: &mut HashMap<StreamKey, (StreamSender, usize)>,
  ) -> bool {
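    // batch size is taken from the buffer's capacity; note that
    // `Vec::with_capacity` may allocate more than was requested
    let mut count = stream_keys.capacity();
    // try to receive first element - block the thread if there's none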
    if let Ok(h) = rx.recv() {
      senders.insert(h.key.clone(), (h.sender, stream_keys.len()));
      stream_keys.push(h.key);
      message_ids.push(h.last_id.to_string());

      count -= 1;
      if count == 0 {
        // we reached the configured capacity of a single batch
        return true;
      }

      // eagerly try to poll more stream keys without blocking to fill
      // the buffer
      while let Ok(h) = rx.try_recv() {
        senders.insert(h.key.clone(), (h.sender, stream_keys.len()));
        stream_keys.push(h.key);
        message_ids.push(h.last_id.to_string());

        count -= 1;
        if count == 0 {
          // we reached the configured capacity of a single batch
          return true;
        }
      }
      true // no more pending tasks
    } else {
      false // router's pending task queue has been closed
    }
  }
}
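
The "block once, then drain without blocking" pattern isn't specific to loole. As a rough, standalone sketch, here is the same idea on top of the standard library's std::sync::mpsc channel, which offers the same recv/try_recv pair. Note the assumptions: recv_batch is a hypothetical name, the batch limit is passed explicitly instead of being derived from a Vec's capacity, and std's receiver is single-consumer, so unlike loole's it cannot be shared between several workers.

```rust
use std::sync::mpsc::Receiver;

/// Appends up to `max_batch` items from `rx` to `batch`: blocks for the first
/// item only, then takes whatever else is immediately available.
/// Returns `false` if the channel is closed and nothing could be read.
pub fn recv_batch<T>(rx: &Receiver<T>, batch: &mut Vec<T>, max_batch: usize) -> bool {
    // Treat a limit of zero as one, so a successful call never adds nothing.
    let limit = max_batch.max(1);
    let start = batch.len();
    match rx.recv() {
        Ok(first) => batch.push(first),
        Err(_) => return false, // all senders are gone and the queue is empty
    }
    while batch.len() - start < limit {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            Err(_) => break, // nothing more is pending right now
        }
    }
    true
}
```

Passing the limit explicitly avoids relying on how much memory the vector happened to allocate.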

With this we can now listen to a potentially unbounded number of Redis streams using a fixed amount of resources, which is arguably a more scalable approach than straightforward per-stream reads. However, there's one more catch that we're going to discuss now.

Tuning latency for upper percentiles

While this approach should be pretty fast when it comes to response times (assuming sufficient machine resources), there's a built-in issue that may cause spikes in top-percentile latencies. These might cause hiccups in system responsiveness every now and then.

As you may have noticed, a tradeoff of having a fixed number of listener threads is that we're not actively listening (with XREADs) on ALL channels that we subscribed to. For example: with worker_count = 80, xread_streams = 100 and xread_block_millis = 5000 we're reading 80 * 100 = 8,000 streams at a time, waiting up to 5s (or until XREAD returns data) before moving on to the next batch from the pending queue. This means that with 40,000 listeners we might occasionally wait up to (40,000 / 8,000) * 5s = 25s while rolling through all of the (possibly idle) streams before we reach the one that has pending messages on it, and even longer with more listeners.
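
The arithmetic above is easy to turn into a small estimator for trying out different settings. This is only a back-of-the-envelope model under the pessimistic assumption that every XREAD runs into its BLOCK timeout; worst_case_rotation_millis is a hypothetical helper, not part of the router.

```rust
/// Rough upper bound (in milliseconds) on how long it takes to rotate through
/// all subscribed streams once, assuming every XREAD hits its BLOCK timeout.
/// Returns `None` when the configuration cannot read any stream at all.
pub fn worst_case_rotation_millis(
    subscribers: u64,
    worker_count: u64,
    xread_streams: u64,
    xread_block_millis: u64,
) -> Option<u64> {
    // Number of streams covered by in-flight XREAD requests at any moment.
    let active = worker_count * xread_streams;
    if active == 0 {
        return None;
    }
    // Ceiling division: a partially filled last round still costs a full timeout.
    let rounds = (subscribers + active - 1) / active;
    Some(rounds * xread_block_millis)
}
```

With the numbers from the example, worst_case_rotation_millis(40_000, 80, 100, 5_000) evaluates to Some(25_000), i.e. 25s.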

This can be tuned either by reducing xread_block_millis, which potentially reduces latency once the number of stream subscribers exceeds the number of actively listened streams (at the cost of higher network traffic), or by increasing worker_count or xread_streams, which increases the number of streams we're actively listening to.

Another possible solution would be to extend our snippet with a dynamic number of reader threads (with some upper cap): as the number of subscribers changes, we spawn or tear down threads to keep the number of actively listened streams in line with demand.
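
One possible sizing rule for such a dynamic pool, sketched under the assumption that we want every subscribed stream to sit in an in-flight XREAD whenever the cap allows it. The function desired_worker_count and its min_workers/max_workers parameters are hypothetical; actually spawning and stopping threads is left out.

```rust
/// Number of workers needed so that every subscribed stream can be part of an
/// in-flight XREAD, kept within `min_workers..=max_workers`.
/// If the bounds conflict, `max_workers` wins.
pub fn desired_worker_count(
    subscribers: usize,
    xread_streams: usize,
    min_workers: usize,
    max_workers: usize,
) -> usize {
    // Guard against a zero batch size to avoid dividing by zero.
    let per_worker = xread_streams.max(1);
    // Ceiling division: a partially filled batch still needs its own worker.
    let needed = (subscribers + per_worker - 1) / per_worker;
    needed.max(min_workers).min(max_workers)
}
```

For 40,000 subscribers and xread_streams = 100 this asks for 400 workers if the cap allows it, and settles on the cap otherwise.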