Welcome back! In this lesson we will dive into Redis Streams — a powerful feature used for processing streams of data. This lesson will guide you through the basics and show you how Redis Streams can be essential for high-performance applications.
In this lesson, we'll learn about streams in Redis and how they can be used to handle continuous data flows. We'll learn how to create streams, add events to them, and read events from them.
Streams are a powerful data structure that allows you to process real-time data efficiently. Redis Streams differ from Lists as they are designed for time-ordered event storage and retrieval, allowing consumers to read and process messages at their own pace. Unlike Pub/Sub, Redis Streams retain messages for replay and provide more control over message acknowledgment and processing.
Here are a few real-world scenarios where Redis Streams can be useful:
- Chat Applications: Redis Streams can be used to handle messages in real-time.
- Monitoring Systems: Streams can be used to process logs and events.
- User Activity Tracking: Streams can be used to track user actions in real-time.
Let's dive into the details!
Here’s a quick preview:
- To add an event to a stream, you can use the
xadd
command. - To read events from a stream, you can use the
xread
command. Use thexread
command when you need to continuously read new entries from a Redis stream in order to handle real-time data as it arrives.
Let's see how these commands work in practice.
Java1import io.lettuce.core.RedisClient; 2import io.lettuce.core.StreamMessage; 3import io.lettuce.core.XReadArgs.StreamOffset; 4import io.lettuce.core.api.StatefulRedisConnection; 5import io.lettuce.core.api.sync.RedisCommands; 6 7public class Main { 8 9 public static void main(String[] args) { 10 // Connect to Redis server 11 RedisClient redisClient = RedisClient.create("redis://localhost:6379"); 12 StatefulRedisConnection<String, String> connection = redisClient.connect(); 13 RedisCommands<String, String> syncCommands = connection.sync(); 14 15 // Add events to the stream 16 syncCommands.xadd("mystream", "event", "login", "user", "Alice"); 17 syncCommands.xadd("mystream", "event", "purchase", "user", "Bob", "amount", "100"); 18 syncCommands.xadd("mystream", "event", "add_to_cart", "user", "Alice", "product", "laptop"); 19 20 // Read messages from the stream 21 var messages = syncCommands.xread( 22 StreamOffset.from("mystream", "0") // Start from the beginning 23 ); 24 25 // Print the retrieved messages 26 for (StreamMessage<String, String> message : messages) { 27 System.out.println("ID: " + message.getId()); 28 message.getBody().forEach((key, value) -> 29 System.out.println(" " + key + ": " + value)); 30 } 31 32 // Close connection 33 connection.close(); 34 redisClient.shutdown(); 35 } 36}
The above code snippet demonstrates how to add events to a Redis stream called mystream
. Each event contains key-value pairs representing different actions by users.
The code subsequently reads messages from mystream
starting from the beginning, using the xread
command. It then iterates over the retrieved messages, printing each message's ID and the key-value pairs contained in the message body.
If you'd rather work with historic data, xrange
is a good fit. The xrange
command allows you to read messages from a Redis Stream within a specified range. This is particularly useful when you need to analyze past events or replay specific segments of data in the stream. By utilizing the Range
and Limit
parameters, you can fine-tune the set of messages retrieved, providing flexibility in handling historical data.
Java1import io.lettuce.core.RedisClient; 2import io.lettuce.core.StreamMessage; 3import io.lettuce.core.api.StatefulRedisConnection; 4import io.lettuce.core.api.sync.RedisCommands; 5import io.lettuce.core.Range; 6import io.lettuce.core.Limit; 7 8public class Main { 9 10 public static void main(String[] args) { 11 // Connect to Redis server 12 RedisClient redisClient = RedisClient.create("redis://localhost:6379"); 13 StatefulRedisConnection<String, String> connection = redisClient.connect(); 14 RedisCommands<String, String> syncCommands = connection.sync(); 15 16 // Add events to the stream 17 syncCommands.xadd("mystream", "event", "login", "user", "Alice"); 18 syncCommands.xadd("mystream", "event", "purchase", "user", "Bob", "amount", "100"); 19 syncCommands.xadd("mystream", "event", "add_to_cart", "user", "Alice", "product", "laptop"); 20 21 // Read messages from the stream 22 var messages = syncCommands.xrange( 23 "mystream", 24 Range.create("-", "+"), 25 Limit.from(2) 26 ); 27 28 // Print the retrieved messages 29 for (StreamMessage<String, String> message : messages) { 30 System.out.println("ID: " + message.getId()); 31 message.getBody().forEach((key, value) -> 32 System.out.println(" " + key + ": " + value)); 33 } 34 35 // Close connection 36 connection.close(); 37 redisClient.shutdown(); 38 } 39}
The above code snippet once again demonstrates how to add events to a Redis stream called mystream
.
The code subsequently reads messages from the Redis stream mystream
by employing the xrange
command, specifying a range from the beginning to the end of the stream indicated by Range.create("-", "+")
and limiting the returned results to two messages through Limit.from(2)
. It then iterates through the retrieved messages, extracting and printing the unique ID of each message.
Understanding Redis Streams is crucial for applications that need to process a large volume of real-time data efficiently. Whether you are building a chat application, monitoring system or handling user activities and logs, Redis Streams can handle it all.
Redis Streams are designed to offer reliable data processing with minimal latency. By mastering them, you can build robust systems capable of processing vast amounts of data in real-time.
Are you excited to see how Redis Streams can elevate your application? Let's move on to the practice section to get some hands-on experience!