Welcome! In this unit, we will explore how to use Redis streams for event logging. This is an important part of our Redis-based backend system project. By the end of this lesson, you will know how to log events and retrieve them using Redis streams. Remember, you've already learned how to manage user data and handle transactions. Now, we're adding another layer to our system by using streams.
In this unit, we will focus on the following tasks:
- Adding entries to a stream: We will log user activities in a Redis stream.
- Reading entries from a stream: You will see how to read the logged events from the stream.
To log user activities, we use the xadd
command in Redis. In our Java implementation, this is handled by the addToStream
method in the Main
class. Here's how it works:
Java1// Function to add entry to stream with pipeline 2public static void addToStream(Pipeline pipeline, String streamName, User user) { 3 pipeline.xadd( 4 streamName, 5 XAddParams.xAddParams(), // Auto-generate an ID 6 Map.of( 7 "event", "user_added", 8 "username", user.getUsername(), 9 "data", gson.toJson(user.getData()), 10 "score", String.valueOf(user.getScore()) 11 ) 12 ); 13}
This method logs an event to a Redis stream with fields such as event
, username
, data
, and score
. Each event is auto-assigned a unique ID by Redis.
In the Main
class, this method is called for multiple users within a pipeline:
Java1try (Pipeline pipeline = jedis.pipelined()) { 2 for (User user : new User[]{user1, user2, user3}) { 3 addToStream(pipeline, streamName, user); 4 } 5 pipeline.sync(); 6}
This ensures that all events are logged efficiently in one batch operation.
To retrieve logged events, we use the xread
command in Redis. The readFromStream
method in the Main
class demonstrates this:
Java1// Function to read from stream 2public static List<Entry<String, List<StreamEntry>>> readFromStream(Jedis jedis, String streamName) { 3 // We want to read from the beginning (0-0) 4 Map<String, StreamEntryID> streams = Map.of(streamName, new StreamEntryID("0-0")); 5 // Provide XReadParams (e.g., count, block) if needed. Here, we use default. 6 return jedis.xread(XReadParams.xReadParams(), streams); 7}
This method reads entries from the specified stream starting from the beginning (0-0
). The XReadParams
object allows for customization, such as setting a maximum count of entries to read.
In the Main
class, the retrieved entries are printed as follows:
Java1List<Entry<String, List<StreamEntry>>> streamEntries = readFromStream(jedis, streamName); 2System.out.println("Stream entries:"); 3for (Entry<String, List<StreamEntry>> entry : streamEntries) { 4 for (StreamEntry se : entry.getValue()) { 5 System.out.println("Stream: " + entry.getKey() 6 + ", ID: " + se.getID() 7 + ", Fields: " + se.getFields()); 8 } 9}
This code iterates through the retrieved entries and prints each event, including its stream, ID, and fields.
In this code, we read the entries from the user_activity_stream
and print each one. The streamName
and XReadParams
parameters help control the stream reading operation.
Feel ready to give it a try? Let's jump into the practice section and start working on logging events using Redis streams. Happy coding!