Welcome! This lesson introduces Redis Streams, a Redis data structure for handling continuous flows of data in high-performance applications.
We'll learn how to create a stream, add events to it, and read events back from it using the Boost.Redis library.
Streams are a powerful data structure that allows you to process real-time data efficiently. Here are a few real-world scenarios where Redis Streams can be useful:
- Chat Applications: Redis Streams can be used to handle messages in real time.
- Monitoring Systems: Streams can be used to process logs and events.
- User Activity Tracking: Streams can be used to track user actions in real time.
Here's a quick preview:
- To add an event to a stream, use the Redis command XADD.
- To read events from a stream, use the Redis command XREAD.
First, let's set up a connection to Redis using Boost.Redis. We'll connect to localhost:6379:
Here we create a Boost.Asio io_context and a Redis connection. The async_run method establishes and maintains the connection to Redis asynchronously.
Now let's add three events to a stream called mystream. Each event contains field-value pairs that describe the event:
The request object allows us to pipeline multiple Redis commands. Each XADD command uses "*" as the entry ID, which tells Redis to auto-generate a unique ID based on the current timestamp.
The response type boost::redis::response<std::string, std::string, std::string> tells Boost.Redis that we expect three string responses (one for each XADD command). Each response will contain the auto-generated entry ID.
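Each auto-generated entry ID has the form `<millisecondsTime>-<sequenceNumber>`, for example `1234567890123-0`. As a minimal sketch of working with those returned strings, the helper below splits an ID into its two numeric parts. The names `EntryId` and `parse_entry_id` are hypothetical and are not part of Boost.Redis.

```cpp
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper type: the two numeric parts of a stream entry ID.
struct EntryId {
    std::uint64_t ms;   // milliseconds timestamp part
    std::uint64_t seq;  // sequence number within that millisecond
};

// Parses "<ms>-<seq>"; returns std::nullopt when the text has another shape.
std::optional<EntryId> parse_entry_id(std::string_view text) {
    const auto dash = text.find('-');
    if (dash == std::string_view::npos) return std::nullopt;

    const char* begin = text.data();
    const char* mid = begin + dash;
    const char* end = begin + text.size();

    EntryId id{};
    // Each part must be consumed completely, otherwise the ID is rejected.
    const auto r1 = std::from_chars(begin, mid, id.ms);
    if (r1.ec != std::errc{} || r1.ptr != mid) return std::nullopt;
    const auto r2 = std::from_chars(mid + 1, end, id.seq);
    if (r2.ec != std::errc{} || r2.ptr != end) return std::nullopt;
    return id;
}
```

Calling `parse_entry_id` on one of the three response strings would give the timestamp and sequence number separately, which can be handy for logging or for ordering entries.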
After adding events, we can read them back using XREAD. We'll read the first 2 messages from the stream:
Here we use boost::redis::generic_response because XREAD returns a complex nested structure that's easier to handle dynamically. The "0-0" ID means we want to read from the beginning of the stream. COUNT 2 limits the response to 2 messages.
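To see why "0-0" reads from the beginning, it helps to know that XREAD returns entries whose ID is greater than the ID you pass. The sketch below models that rule with a plain in-memory vector; it does not talk to Redis. `Id`, `Entry`, and `read_after` are hypothetical stand-ins, and the event names in the usage note are illustrative only.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-ins; these are not Boost.Redis types.
using Id = std::pair<std::uint64_t, std::uint64_t>;  // {milliseconds, sequence}

struct Entry {
    Id id;
    std::string event;  // a single field value, to keep the sketch small
};

// Returns up to `count` entries whose ID is strictly greater than `last_seen`.
// `stream` is assumed to be sorted by ID, as a Redis stream is.
std::vector<Entry> read_after(const std::vector<Entry>& stream,
                              Id last_seen, std::size_t count) {
    std::vector<Entry> out;
    for (const Entry& e : stream) {
        if (out.size() == count) break;           // COUNT limit reached
        if (e.id > last_seen) out.push_back(e);   // pair compares ms, then seq
    }
    return out;
}
```

With three entries in the vector, `read_after(stream, Id{0, 0}, 2)` returns the first two, because every real ID is greater than 0-0. Passing the ID of the last entry you received returns the remaining one.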
Before we parse the response, we need a quick mental model of how generic_response represents nested Redis replies.
generic_response gives you the reply as a flat vector of nodes:
You can think of the original RESP3 reply as a tree (arrays containing arrays, strings, etc.). Boost.Redis turns that tree into a single linear vector, one node per element.
For this lesson, we only need three fields from each node:
- node.value
  - The text of this element: stream name, message ID, field name, field value, etc.
  - For “array header” nodes (like “this is the messages array”), this is usually empty.
- node.aggregate_size
  - For arrays: how many elements the array has (how many direct children it has; they follow it in the flat vector).
  - We use this mainly for:
    - the “messages array” (how many messages), and
    - the “fields array” (how many field, value elements).
  - For simple values (like "mystream" or "login"), you can ignore this.
- node.depth
  - How deep we are in the original tree:
    - 0 = root
    - 1 = children of root
Imagine Redis returns this RESP3 structure:
- Root array (1 element)
  - Array “messages” (2 elements)
    - Bulk string "msg1"
    - Bulk string "msg2"
Tree view:
Flattened as std::vector<node>:
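A minimal sketch of that flattening, using a pre-order walk (parent first, then its children). `Tree` and `FlatNode` are hypothetical stand-ins that carry only the three fields discussed in this lesson; they are not the Boost.Redis node type. Storing 1 as the `aggregate_size` of a leaf is an assumption of this sketch.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a reply tree; not a Boost.Redis type.
struct Tree {
    std::string value;           // text for leaves, empty for arrays
    std::vector<Tree> children;  // empty for leaves
};

// Hypothetical stand-in carrying the three fields described above.
struct FlatNode {
    std::string value;
    std::size_t aggregate_size;
    std::size_t depth;
};

// Pre-order walk: emit the parent first, then each child one level deeper.
void flatten(const Tree& t, std::size_t depth, std::vector<FlatNode>& out) {
    // Assumption of this sketch: leaves record an aggregate_size of 1.
    // Empty arrays are not modelled.
    const std::size_t size = t.children.empty() ? 1 : t.children.size();
    out.push_back({t.value, size, depth});
    for (const Tree& child : t.children) flatten(child, depth + 1, out);
}

// Builds the example from the text: root -> "messages" array -> "msg1", "msg2".
std::vector<FlatNode> flatten_example() {
    Tree messages{"", {Tree{"msg1", {}}, Tree{"msg2", {}}}};
    Tree root{"", {messages}};
    std::vector<FlatNode> out;
    flatten(root, 0, out);
    return out;
}
```

`flatten_example()` yields four nodes in this order: the root header (depth 0, size 1), the messages header (depth 1, size 2), then "msg1" and "msg2" (both depth 2). Children always come right after their parent, which is what makes a simple index-based walk possible.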
Our XREAD reply is the same idea, just a bigger tree: one root array, then the stream name and an array of messages, then arrays for each message, and finally arrays for each message’s fields.
Keep that picture in mind; now we’ll look at the actual XREAD shape.
The XREAD command returns a nested array structure. With generic_response, Boost.Redis flattens that into our nodes vector, using the rules from the primer.
For XREAD COUNT 2 STREAMS mystream 0-0, the tree shape is roughly:
Flattened into nodes, that becomes (conceptually):
Now we’re ready to walk this vector and pull out the stream name, message IDs, and field-value pairs.
We start by getting the vector of nodes, checking its size, and extracting the stream name:
Step-by-step:
- nodes[0] is the root array header. We skip it.
- nodes[1] is the stream name ("mystream"). We read and print it.
- nodes[2] is the “messages array” header. We skip over it with ++i to land at the first message.
From here, i points to the first message array.
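The three steps above can be sketched as a small function over a hand-built node vector. `FlatNode`, `StreamHeader`, and `read_header` are hypothetical names used for illustration; `FlatNode` mirrors only the three node fields from the primer, so the sketch compiles without Boost.Redis.

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in with the three fields used in this lesson.
struct FlatNode {
    std::string value;
    std::size_t aggregate_size;
    std::size_t depth;
};

// Hypothetical result type for the first parsing step.
struct StreamHeader {
    std::string name;           // value of nodes[1]
    std::size_t message_count;  // aggregate_size of the messages array
    std::size_t first_message;  // index of the first message array header
};

// Reads the stream name and positions the index at the first message.
// Returns std::nullopt if the vector is too short to hold the three header nodes.
std::optional<StreamHeader> read_header(const std::vector<FlatNode>& nodes) {
    if (nodes.size() < 3) return std::nullopt;
    std::size_t i = 1;                                  // skip nodes[0], the root header
    std::string name = nodes[i++].value;                // nodes[1]: stream name
    const std::size_t count = nodes[i].aggregate_size;  // nodes[2]: messages header
    ++i;                                                // land on the first message
    return StreamHeader{name, count, i};
}
```

For the reply shape described in this lesson, `read_header` returns the name "mystream" and a `first_message` index of 3. Checking the size first avoids indexing past the end when the reply is empty.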
Next, we iterate through each message in the stream:
For each iteration:
- ++i skips the message array header ([3], [10], …).
- nodes[i++] is the message ID (e.g. "1234567890123-0").
- nodes[i++] is the fields array header.
  - Its aggregate_size tells us how many field and value nodes follow.
  - Each field-value pair uses 2 nodes (field, value), so num_pairs = aggregate_size / 2.
At this point we know the message ID and how many key-value pairs it has.
Because children always follow their parent, we can now read exactly num_pairs field-value pairs in a simple loop.
Finally, we extract and print all field-value pairs for each message:
Inside the loop:
- field_node is the field name (e.g. "event").
- value_node is the field value (e.g. "login").
- We print them as field=value, adding commas between pairs.
After all pairs for a message are printed, we close the brace and move on to the next message (if any).
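Putting the whole walk together, here is a minimal sketch that turns a flattened reply into one formatted string per message. It runs on a hand-built vector rather than a live Redis reply. `FlatNode` and `format_messages` are hypothetical names, and the output format `<id> {field=value, ...}` is an assumption made for this sketch.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in with the three fields used in this lesson.
struct FlatNode {
    std::string value;
    std::size_t aggregate_size;
    std::size_t depth;
};

// Formats each message as "<id> {field=value, field=value}".
std::vector<std::string> format_messages(const std::vector<FlatNode>& nodes) {
    std::vector<std::string> lines;
    if (nodes.size() < 3) return lines;  // root, stream name, messages header

    const std::size_t message_count = nodes[2].aggregate_size;
    std::size_t i = 3;  // first message array header
    for (std::size_t m = 0; m < message_count; ++m) {
        if (i + 3 > nodes.size()) break;  // need header, ID, and fields header
        ++i;                                        // skip message array header
        const std::string& id = nodes[i++].value;   // message ID
        const std::size_t num_pairs = nodes[i++].aggregate_size / 2;
        if (i + 2 * num_pairs > nodes.size()) break;  // truncated input: stop early

        std::string line = id + " {";
        for (std::size_t p = 0; p < num_pairs; ++p) {
            const FlatNode& field_node = nodes[i++];
            const FlatNode& value_node = nodes[i++];
            if (p > 0) line += ", ";  // comma between pairs, not after the last
            line += field_node.value + "=" + value_node.value;
        }
        line += "}";
        lines.push_back(line);
    }
    return lines;
}
```

The bounds checks are not part of the walkthrough above; they are added here so that a shorter-than-expected vector ends the loop instead of reading past the end.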
Example output:
Understanding Redis Streams matters for applications that need to process a large volume of real-time data efficiently, whether you are building a chat application, a monitoring system, or a pipeline for user activity and logs.
Redis Streams are designed to offer reliable data processing with minimal latency. By mastering them, you can build robust systems capable of processing vast amounts of data in real time.
Ready to see how Redis Streams can elevate your application? Move on to the practice section to get some hands-on experience!
