Welcome back! In this lesson, we will dive into Redis Streams — a powerful feature used for processing streams of data. This lesson will guide you through the basics and show you how Redis Streams
can be essential for high-performance applications using C++.
In this lesson, we'll learn about streams in Redis
and how they can be used to handle continuous data flows. We'll learn how to create streams, add events to them, and read events from them.
Streams are a powerful data structure that allows you to process real-time data efficiently. Here are a few real-world scenarios where Redis Streams
can be useful:
- Chat Applications: Streams can be used to handle messages in real-time.
- Monitoring Systems: Streams can be used to process logs and events.
- User Activity Tracking: Streams can be used to track user actions in real-time.
Let's dive into the details!
Let's see how to use the hiredis
C++ library to interact with Redis Streams
.
C++1#include <iostream> 2#include <hiredis/hiredis.h> 3 4int main() { 5 // Connect to the Redis server 6 redisContext* context = redisConnect("127.0.0.1", 6379); 7 if (context == nullptr || context->err) { 8 if (context) { 9 std::cerr << "Connection error: " << context->errstr << std::endl; 10 } else { 11 std::cerr << "Connection error: can't allocate Redis context" << std::endl; 12 } 13 return 1; 14 } 15 16 // Add events to a Redis stream 17 redisReply* reply = (redisReply*)redisCommand(context, "XADD mystream * event login user Alice"); 18 freeReplyObject(reply); 19 reply = (redisReply*)redisCommand(context, "XADD mystream * event purchase user Bob amount 100"); 20 freeReplyObject(reply); 21 reply = (redisReply*)redisCommand(context, "XADD mystream * event add_to_cart user Alice product laptop"); 22 freeReplyObject(reply); 23 24 // Read the first two events from the stream 25 reply = (redisReply*)redisCommand(context, "XREAD COUNT 2 STREAMS mystream 0"); 26 if (reply->type == REDIS_REPLY_ARRAY) { 27 std::cout << "Stream messages:" << std::endl; 28 for (size_t i = 0; i < reply->elements; ++i) { 29 redisReply* stream = reply->element[i]; 30 if (stream->type == REDIS_REPLY_ARRAY && stream->elements == 2) { 31 std::cout << "Stream: " << stream->element[0]->str << std::endl; // Stream name 32 33 redisReply* messages = stream->element[1]; 34 for (size_t j = 0; j < messages->elements; ++j) { 35 redisReply* message = messages->element[j]; 36 if (message->type == REDIS_REPLY_ARRAY && message->elements == 2) { 37 std::cout << " ID: " << message->element[0]->str << std::endl; 38 39 redisReply* fields = message->element[1]; 40 for (size_t k = 0; k < fields->elements; k += 2) { 41 std::cout << " " << fields->element[k]->str << ": " 42 << fields->element[k + 1]->str << std::endl; 43 } 44 } 45 } 46 } 47 } 48 } else { 49 std::cerr << "Failed to read stream." << std::endl; 50 } 51 freeReplyObject(reply); 52 53 // Free the context 54 redisFree(context); 55 56 return 0; 57} 58/* 59Output: 60 Stream messages: 61 Stream: mystream 62 ID: 1737154114178-0 63 event: login 64 user: Alice 65 ID: 1737154114178-1 66 event: purchase 67 user: Bob 68 amount: 100 69*/ 70
The above code snippet demonstrates how to add events to a Redis stream called mystream
using the XADD
command and then read those events using the XREAD
command. Each event consists of key-value pairs representing various actions performed by users.
For adding events, the XADD
command is used. For example, the command XADD mystream * event login user Alice
is broken down as follows:
XADD
is the command to add a new entry to a stream.mystream
is the name of the stream where the entry will be added.*
indicates that Redis should automatically assign an ID to the entry.- The subsequent key-value pairs (
event login user Alice
) represent the data for that entry, detailing an event type (e.g.,login
) and associated user (e.g.,Alice
).
After adding entries, the code proceeds to read the first two messages from the stream using the XREAD
command. XREAD COUNT 2 STREAMS mystream 0
works as follows:
XREAD
is the command to read entries from a stream.COUNT 2
specifies that up to two entries should be returned.STREAMS
indicates that the name of the stream follows.mystream
is the stream's name from which entries are being read.0
signifies starting the read from the beginning of the stream.
The reading process involves:
-
Check Reply Type: The response from the command is checked to ensure it's of type
REDIS_REPLY_ARRAY
, indicating a list of streams and their respective messages. -
Iterate Over Streams: The outer loop iterates over each stream in the response. Each stream consists of:
- The name of the stream (
stream->element[0]
). - The associated messages (
stream->element[1]
).
- The name of the stream (
-
Process Each Message: For each message within a stream:
- Check if it is a valid array with two elements, including the message ID and a set of key-value pairs.
- Print the message ID (
message->element[0]->str
).
-
Iterate Over Fields: For the key-value pairs in each message:
- Iterate using a loop over each field's elements, printing each key-value pair.
This structured iteration allows you to read and print the first two messages within a specified Redis stream.
Understanding Redis Streams
is crucial for applications that need to process a large volume of real-time data efficiently. Whether you are building a chat application, monitoring system, or handling user activities and logs, Redis Streams
can handle it all.
Redis Streams are designed to offer reliable data processing with minimal latency. By mastering them, you can build robust systems capable of processing vast amounts of data in real-time.
Are you excited to see how Redis Streams
can elevate your application? Let's move on to the practice section to get some hands-on experience!