Welcome! In this unit, we will delve into implementing Pub/Sub for notifications within our Redis-based backend system project using C++. You've already learned how to manage user data, handle transactions, and use streams for event logging. Now, we'll add another powerful feature to our project: real-time notifications using Redis Pub/Sub (publish/subscribe). This will enable our system to send and receive messages instantaneously.
In this unit, we'll focus on creating a simple real-time notification system using Redis Pub/Sub in C++. Specifically, we'll cover:
- Publishing Messages: How to send notifications.
- Subscribing to Channels: How to receive and handle notifications.
Here is a quick refresh of how Pub/Sub works in Redis using C++ and hiredis:
C++1#include <iostream> 2#include <hiredis/hiredis.h> 3#include <thread> 4#include <atomic> 5 6// Global flag to stop the listener thread 7std::atomic<bool> keepRunning(true); 8 9// Message handler function 10void messageHandler(const redisReply* reply) { 11 if (reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 3) { 12 std::cout << "Received message: " << reply->element[2]->str << std::endl; 13 } else { 14 std::cerr << "Unexpected message format or error in reply." << std::endl; 15 } 16} 17 18// Pub/Sub listener function 19void runPubSub(redisContext* context) { 20 while (keepRunning) { 21 redisReply* reply = nullptr; 22 if (redisGetReply(context, (void**)&reply) == REDIS_OK) { 23 if (reply) { 24 messageHandler(reply); 25 freeReplyObject(reply); 26 } 27 } else { 28 std::cerr << "Error receiving message: " << context->errstr << std::endl; 29 break; 30 } 31 } 32} 33 34int main() { 35 // Connect to the Redis server for subscribing 36 redisContext* subContext = redisConnect("127.0.0.1", 6379); 37 if (subContext == nullptr || subContext->err) { 38 if (subContext) { 39 std::cerr << "Connection error (sub): " << subContext->errstr << std::endl; 40 } else { 41 std::cerr << "Connection error: can't allocate Redis context (sub)" << std::endl; 42 } 43 return 1; 44 } 45 46 // Connect to the Redis server for publishing 47 redisContext* pubContext = redisConnect("127.0.0.1", 6379); 48 if (pubContext == nullptr || pubContext->err) { 49 if (pubContext) { 50 std::cerr << "Connection error (pub): " << pubContext->errstr << std::endl; 51 } else { 52 std::cerr << "Connection error: can't allocate Redis context (pub)" << std::endl; 53 } 54 redisFree(subContext); 55 return 1; 56 } 57 58 // Subscribe to the "notifications" channel 59 std::cout << "Subscribing to channel 'notifications'..." << std::endl; 60 redisReply* reply = (redisReply*)redisCommand(subContext, "SUBSCRIBE notifications"); 61 if (!reply || reply->type != REDIS_REPLY_ARRAY) { 62 std::cerr << "Failed to subscribe to channel or unexpected reply type." << std::endl; 63 if (reply) freeReplyObject(reply); 64 redisFree(subContext); 65 redisFree(pubContext); 66 return 1; 67 } 68 freeReplyObject(reply); 69 70 // Start the Pub/Sub listener thread 71 std::thread listenerThread(runPubSub, subContext); 72 73 // Sleep to allow listener to set up 74 std::this_thread::sleep_for(std::chrono::seconds(1)); 75 76 // Publish a message to the "notifications" channel 77 std::cout << "Publishing a test message..." << std::endl; 78 redisReply* publishReply = (redisReply*)redisCommand(pubContext, "PUBLISH notifications %s", "Hello, Redis!"); 79 if (publishReply && publishReply->type == REDIS_REPLY_INTEGER) { 80 std::cout << "Message published, number of subscribers that received the message: " << publishReply->integer << std::endl; 81 } else { 82 std::cerr << "Failed to publish message or unexpected reply type." << std::endl; 83 } 84 if (publishReply) freeReplyObject(publishReply); 85 86 // Unsubscribe and stop the listener 87 std::cout << "Unsubscribing and stopping listener..." << std::endl; 88 keepRunning = false; 89 redisCommand(subContext, "UNSUBSCRIBE notifications"); 90 listenerThread.join(); 91 92 // Free the Redis contexts 93 redisFree(subContext); 94 redisFree(pubContext); 95 std::cout << "Program finished." << std::endl; 96 97 return 0; 98} 99//Note: The exact order of the output messages may still vary due to the multi-threaded nature of the program. 100 101/* Output: 102 Subscribing to channel 'notifications'... 103 Publishing a test message... 104 Message published, number of subscribers that received the message: 1 105 Unsubscribing and stopping listener... 106 Received message: Hello, Redis! 107 Program finished. 108*/ 109
In this C++ snippet, the runPubSub
function continuously listens for messages on the "notifications" channel and uses messageHandler
to process and print incoming messages. The main
function establishes two connections to the Redis server—one for subscribing and one for publishing messages—and utilizes threads to handle these tasks concurrently, ensuring real-time message delivery between publisher and subscriber.
Exciting, isn’t it? Now it's time to put this into practice. Let's implement the complete code to build our real-time notification system.
Happy coding!