Welcome back! In this lesson, we will explore another powerful feature of Redis: Publish/Subscribe (Pub/Sub) messaging. This lesson builds on our understanding of Redis and introduces a dynamic way to enable real-time communication within your applications using C++ and the hiredis
library.
In this lesson, you will learn how to set up and use Redis Pub/Sub messaging in C++ to send and receive messages between different parts of your application. This is useful for creating real-time features like notifications, chat systems, or live updates.
Here's how you can set up a simple Pub/Sub system in Redis using C++:
C++1#include <iostream> 2#include <hiredis/hiredis.h> 3#include <thread> 4#include <atomic> 5 6// Global flag to stop the listener thread 7std::atomic<bool> keepRunning(true); 8 9// Message handler function 10void messageHandler(const redisReply* reply) { 11 if (reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 3) { 12 std::cout << "Received message: " << reply->element[2]->str << std::endl; 13 } else { 14 std::cerr << "Unexpected message format or error in reply." << std::endl; 15 } 16} 17 18// Pub/Sub listener function 19void runPubSub(redisContext* context) { 20 while (keepRunning) { 21 redisReply* reply = nullptr; 22 if (redisGetReply(context, (void**)&reply) == REDIS_OK) { 23 if (reply) { 24 messageHandler(reply); 25 freeReplyObject(reply); 26 } 27 } else { 28 std::cerr << "Error receiving message: " << context->errstr << std::endl; 29 break; 30 } 31 } 32} 33 34int main() { 35 // Connect to the Redis server for subscribing 36 redisContext* subContext = redisConnect("127.0.0.1", 6379); 37 if (subContext == nullptr || subContext->err) { 38 if (subContext) { 39 std::cerr << "Connection error (sub): " << subContext->errstr << std::endl; 40 } else { 41 std::cerr << "Connection error: can't allocate Redis context (sub)" << std::endl; 42 } 43 return 1; 44 } 45 46 // Connect to the Redis server for publishing 47 redisContext* pubContext = redisConnect("127.0.0.1", 6379); 48 if (pubContext == nullptr || pubContext->err) { 49 if (pubContext) { 50 std::cerr << "Connection error (pub): " << pubContext->errstr << std::endl; 51 } else { 52 std::cerr << "Connection error: can't allocate Redis context (pub)" << std::endl; 53 } 54 redisFree(subContext); 55 return 1; 56 } 57 58 // Subscribe to the "notifications" channel 59 std::cout << "Subscribing to channel 'notifications'..." << std::endl; 60 redisReply* reply = (redisReply*)redisCommand(subContext, "SUBSCRIBE notifications"); 61 if (!reply || reply->type != REDIS_REPLY_ARRAY) { 62 std::cerr << "Failed to subscribe to channel or unexpected reply type." << std::endl; 63 if (reply) freeReplyObject(reply); 64 redisFree(subContext); 65 redisFree(pubContext); 66 return 1; 67 } 68 freeReplyObject(reply); 69 70 // Start the Pub/Sub listener thread 71 std::thread listenerThread(runPubSub, subContext); 72 73 // Sleep to allow listener to set up 74 std::this_thread::sleep_for(std::chrono::seconds(1)); 75 76 // Publish a message to the "notifications" channel 77 std::cout << "Publishing a test message..." << std::endl; 78 redisReply* publishReply = (redisReply*)redisCommand(pubContext, "PUBLISH notifications %s", "Hello, Redis!"); 79 if (publishReply && publishReply->type == REDIS_REPLY_INTEGER) { 80 std::cout << "Message published, number of subscribers that received the message: " << publishReply->integer << std::endl; 81 } else { 82 std::cerr << "Failed to publish message or unexpected reply type." << std::endl; 83 } 84 if (publishReply) freeReplyObject(publishReply); 85 86 // Unsubscribe and stop the listener 87 std::cout << "Unsubscribing and stopping listener..." << std::endl; 88 keepRunning = false; 89 redisCommand(subContext, "UNSUBSCRIBE notifications"); 90 listenerThread.join(); 91 92 // Free the Redis contexts 93 redisFree(subContext); 94 redisFree(pubContext); 95 std::cout << "Program finished." << std::endl; 96 97 return 0; 98} 99
-
We define a
messageHandler
function that processes messages received from the Redis server. This function checks the type and structure of the message, then prints the received message to the standard output. -
In the
runPubSub
function, we handle the subscription to thenotifications
channel. This function loops continuously, usingredisGetReply
to listen for incoming messages, which are then processed by themessageHandler
. This loop continues running until thekeepRunning
flag is set tofalse
. -
We start the Pub/Sub listener in a separate thread using
std::thread
, executing therunPubSub
function. This allows the main function to proceed without waiting for the incoming messages, thus implementing non-blocking behavior. -
After allowing some time for the listener to initialize using
std::this_thread::sleep_for
, we publish a test message to thenotifications
channel with thePUBLISH
command. The command returns the number of subscribers that received the message, which is then displayed. -
Resources are properly managed by explicitly unsubscribing from the
notifications
channel and setting thekeepRunning
flag tofalse
, effectively stopping the listener thread. We ensure that thelistenerThread
is joined, meaning it finishes executing before the program ends. Lastly, we free the Redis contexts usingredisFree
to prevent any memory leaks.
The Pub/Sub messaging model is crucial for enabling real-time communication in modern applications. Whether it's sending notifications to users, making chat applications, or updating dashboards in real-time, Pub/Sub can help you achieve these goals efficiently in C++.
The benefits of mastering Pub/Sub messaging using Redis include:
- Real-Time Communication: Instantly update parts of your application as events occur, providing a seamless user experience.
- Decoupled Architecture: Senders and receivers are independent, promoting modularity and easier maintenance of your application.
- Scalability: Scale your application by adding more subscribers or publishers without altering the core logic.
Learning how to leverage Pub/Sub messaging with Redis using C++ will enable you to build responsive, scalable, and maintainable applications. Ready to get hands-on with the example? Let’s move forward and start implementing!