DEV Community

Abhishek Gupta for Microsoft Azure

Redis Streams in Action - Part 3 (Java app to process tweets with Redis Streams)

Welcome to this series of blog posts, which covers Redis Streams with the help of a practical example. We will use a sample application to make Twitter data available for search and query in real time. RediSearch and Redis Streams serve as the backbone of this solution, which consists of several co-operating components, each of which will be covered in a dedicated blog post.

The code is available in this GitHub repo - https://github.com/abhirockzz/redis-streams-in-action

This blog post covers a Java-based tweets processor application whose role is to pick up tweets from Redis Streams and store them (as a HASH) so that they can be queried using RediSearch (the accurate term for this is "indexing documents" in RediSearch). You will deploy the application to Azure, validate it, and run a few RediSearch queries to search tweets. Finally, there is a section where we walk through the code to understand "how things work".

Pre-requisites

Please make sure that you read part 2 of this series and have the Tweets consumer application up and running. This application will read tweets from the Twitter Streaming API and push them to Redis Streams. Our tweets processor app (the one described in this blog) will then take over.

You will need an Azure account, which you can get for free, and the Azure CLI. Like the previous application, this one will also be deployed to Azure Container Instances using regular Docker CLI commands. This capability is enabled by the integration between Docker and Azure. Just ensure that you have Docker Desktop version 2.3.0.5 or later for Windows or macOS, or install the Docker ACI Integration CLI for Linux.

Deploy the app to Azure Container Instances

If you've been following along from the previous blog post, you should have set up the Enterprise tier of Azure Cache for Redis, using this quickstart. Once you finish this step, ensure that you save the following information: the Redis host name and the access key.

The application is available as a Docker container - the easiest way is to simply re-use it. If you wish to build your own image, please use the Dockerfile available in the GitHub repo.

If you choose to build your own image, make sure to build the JAR file using Maven (mvn clean install) first.

It's really convenient to deploy it to Azure Container Instances, which allows you to run Docker containers on-demand in a managed, serverless Azure environment.

Make sure you create an Azure context to associate Docker with an Azure subscription and resource group so you can create and manage container instances.

docker login azure
docker context create aci aci-context
docker context use aci-context

Set the environment variables - make sure to update Redis host and credentials as per your account:

export STREAM_NAME=tweets_stream # don't change
export STREAM_CONSUMER_GROUP_NAME=redisearch_app_group # don't change

export REDIS_HOST=<redis host name e.g. my-redis-host>
export REDIS_PORT=<redis port>
export REDIS_PASSWORD=<redis access key (password)>
export SSL=true
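To make the role of these variables concrete, here is a minimal sketch of how a Java app could read and validate them at startup. The ConsumerConfig class and its field names are hypothetical and are not taken from the repo, and treating SSL as optional with a default of false is an assumption.

```java
import java.util.Map;

// Hypothetical helper (not from the repo): holds the settings exported above.
final class ConsumerConfig {
    final String streamName;
    final String groupName;
    final String redisHost;
    final int redisPort;
    final String redisPassword;
    final boolean ssl;

    private ConsumerConfig(Map<String, String> env) {
        this.streamName = require(env, "STREAM_NAME");
        this.groupName = require(env, "STREAM_CONSUMER_GROUP_NAME");
        this.redisHost = require(env, "REDIS_HOST");
        this.redisPort = Integer.parseInt(require(env, "REDIS_PORT"));
        this.redisPassword = require(env, "REDIS_PASSWORD");
        // Assumption: SSL is optional and defaults to false when unset.
        this.ssl = Boolean.parseBoolean(env.getOrDefault("SSL", "false"));
    }

    // Accepts any map so the parsing can be exercised without real environment variables.
    static ConsumerConfig from(Map<String, String> env) {
        return new ConsumerConfig(env);
    }

    // Fails fast with a readable message when a mandatory variable is missing.
    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + key);
        }
        return value;
    }
}
```

In a real app you would call ConsumerConfig.from(System.getenv()) once at startup, so that a missing variable fails immediately instead of surfacing later as a connection error.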

.. and then use docker run to deploy the container to Azure:

docker run -d --name redis-streams-consumer \
-e STREAM_NAME=$STREAM_NAME \
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PORT=$REDIS_PORT \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e SSL=$SSL \
abhirockzz/tweets-redis-streams-consumer-java

As the container is being created, you should see an output similar to this:

[+] Running 2/2
 ⠿ Group redis-streams-consumer  Created                                                                             5.2s
 ⠿ redis-streams-consumer        Created                                                                            10.5s

Validate this using the Azure portal:

To check the container logs, you can use the usual docker logs command:

docker logs redis-streams-consumer

You should see an output similar to this:

Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089239324282880
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089243539517441
Reading from stream tweets_stream with XREADGROUP
not processed - tweet:1393089247721132033
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089256105693184
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089260304179200
....

Notice the "not processed" logs? We will discuss them later in this post.

Once the app is up and running, it will start consuming from the tweets_stream Redis Stream and store each tweet's info in a separate HASH, which in turn will be indexed by RediSearch. Before moving on, log in to the Redis instance using redis-cli:

redis-cli -h <hostname> -p 10000 -a <password> --tls

How are things looking?

If you look at the logs carefully, you should be able to find the name of the HASH (which is based on the tweet ID), e.g. tweet:<tweet id>. Just inspect its contents with HGETALL:

redis-cli> TYPE tweet:1393089163856056320
hash
redis-cli> HGETALL tweet:1393089163856056320

The result will look like any other HASH. For example:

 1) "location"
 2) "Nairobi, Kenya"
 3) "text"
 4) "RT @WanjaNjubi: #EidMubarak \xf0\x9f\x99\x8f\nMay peace be upon you now and always.\n#EidUlFitr https://t.co/MlL0DbM2aS"
 5) "id"
 6) "1393089163856056320"
 7) "user"
 8) "Hot_96Kenya"
 9) "hashtags"
10) "EidMubarak,EidUlFitr"

Alright, it's time to query tweets with RediSearch! Let's use a few commands to search the tweets-index index:

  • FT.SEARCH tweets-index hello - will return tweets which contain the term "hello"
  • FT.SEARCH tweets-index hello|world - it's the same as above, except that it matches "hello" OR "world"
  • Use FT.SEARCH tweets-index "@location:India" if you're interested in tweets from a specific location
  • FT.SEARCH tweets-index "@user:jo* @location:India" - this combines the location with a criterion that the username should start with jo
  • FT.SEARCH tweets-index "@user:jo* | @location:India" - this is a subtle variant of the above. | signifies an OR criterion
  • You can search using hash tags as well - FT.SEARCH tweets-index "@hashtags:{cov*}"
  • Include multiple hash tags as such - FT.SEARCH tweets-index "@hashtags:{cov*|Med*}"
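If you end up composing these query strings from code, a tiny helper keeps the syntax in one place. This is only a sketch: TweetQueries and its methods are hypothetical names (not part of JRediSearch), and it does no escaping of special characters, so it is only suitable for trusted input.

```java
// Hypothetical helper (not part of JRediSearch): builds the query strings shown in the list above.
final class TweetQueries {
    private TweetQueries() {
    }

    // Restricts a term to one TEXT field, e.g. @location:India
    static String field(String name, String term) {
        return "@" + name + ":" + term;
    }

    // Matches any of the given patterns in a TAG field, e.g. @hashtags:{cov*|Med*}
    static String tags(String name, String... patterns) {
        return "@" + name + ":{" + String.join("|", patterns) + "}";
    }

    // Clauses separated by a space must all match (AND).
    static String and(String... clauses) {
        return String.join(" ", clauses);
    }

    // Clauses separated by | match if any of them matches (OR).
    static String or(String... clauses) {
        return String.join(" | ", clauses);
    }
}
```

For example, TweetQueries.and(TweetQueries.field("user", "jo*"), TweetQueries.field("location", "India")) produces the same string as the fourth query in the list, which you can then pass as the query argument of FT.SEARCH.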

These are just a few examples. I highly recommend that you refer to the RediSearch documentation and try other queries as well.

Let's scale out

One of the key benefits of using Redis Streams is its Consumer Groups feature. This means that you can simply add more instances of the application (horizontal scale out) in order to improve the processing - the more instances you run, the faster the tweets get processed. Each instance will consume a different subset of the entries in the same Redis Stream (tweets_stream), so the workload is distributed (almost) evenly amongst all the instances and throughput can grow roughly in line with the number of instances.
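The important property here is that an entry is handed to only one consumer in the group. The sketch below is a plain in-memory model of that idea, not Redis itself: a shared queue stands in for the stream and a few threads stand in for the app instances. ConsumerGroupModel and the consumer-N names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// In-memory model only: a queue plays the stream, threads play the consumer instances.
final class ConsumerGroupModel {
    private ConsumerGroupModel() {
    }

    // Returns, for each consumer, the entries it handled.
    static Map<String, List<Integer>> distribute(int entryCount, int consumers) {
        Queue<Integer> stream = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < entryCount; i++) {
            stream.add(i);
        }

        Map<String, List<Integer>> handled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            List<Integer> mine = new ArrayList<>(); // touched by a single thread only
            handled.put("consumer-" + c, mine);
            pool.execute(() -> {
                Integer entry;
                // poll() removes the entry, so no other consumer can receive it
                while ((entry = stream.poll()) != null) {
                    mine.add(entry);
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }
}
```

How the entries are split between the threads varies from run to run, but the total always adds up to the number of entries and nothing is handled twice. That is the same guarantee the consumer group gives the app instances for new entries.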

Let's try this out. To start another instance, use docker run - make sure to use a different name:

docker run -d --name redis-streams-consumer_2 \
-e STREAM_NAME=$STREAM_NAME \
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PORT=$REDIS_PORT \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e SSL=$SSL \
abhirockzz/tweets-redis-streams-consumer-java

Notice that I used a different name: --name redis-streams-consumer_2.

Things will continue like before - just a little faster since we have another helping hand. You can check the logs of the new instance as well - docker logs redis-streams-consumer_2.

You can continue to experiment further and try scaling out to more instances.

Let's dig a little deeper

We can introspect Redis Streams using the XPENDING command:

XPENDING tweets_stream redisearch_app_group

You will see an output similar to this:

1) (integer) 25
2) "1618572598902-0"
3) "1618573768902-0"
4) 1) 1) "consumer-b6410cf9-8244-41ba-b0a5-d79b66d33d65"
      2) "20"
   2) 1) "consumer-e5a872d4-b488-416e-92ee-55d2902b338f"
      2) "5"

If you're new to Redis Streams, this output might not make a lot of sense. The call to XPENDING returns the number of messages that were received by our processing application but have not been processed (and acknowledged) yet, followed by the smallest and largest pending entry IDs and a per-consumer breakdown. In this case, we have two application instances (their names are randomly generated UUIDs) with 20 and 5 unprocessed messages respectively (of course, the numbers will differ in your case).
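To make the four parts of that reply easier to reason about, here is a small sketch that models them as a plain Java object and runs two sanity checks on the sample output above. PendingSummary is a hypothetical class written for this post, not a Jedis type. The millisecond extraction relies on the fact that auto-generated stream IDs have the form <milliseconds>-<sequence>.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value class (not a Jedis type) mirroring the XPENDING summary reply.
final class PendingSummary {
    final long total;                    // 1) number of pending messages in the group
    final String smallestId;             // 2) smallest pending entry ID
    final String largestId;              // 3) largest pending entry ID
    final Map<String, Long> perConsumer; // 4) pending count per consumer

    PendingSummary(long total, String smallestId, String largestId, Map<String, Long> perConsumer) {
        this.total = total;
        this.smallestId = smallestId;
        this.largestId = largestId;
        this.perConsumer = perConsumer;
    }

    // The per-consumer counts should add up to the group total.
    boolean countsAddUp() {
        long sum = 0;
        for (long count : perConsumer.values()) {
            sum += count;
        }
        return sum == total;
    }

    // Auto-generated stream IDs look like <milliseconds>-<sequence>.
    static long millisOf(String streamId) {
        return Long.parseLong(streamId.substring(0, streamId.indexOf('-')));
    }

    // Time between the oldest and newest pending entries, in milliseconds.
    long spanMillis() {
        return millisOf(largestId) - millisOf(smallestId);
    }

    // The sample output shown above, typed in by hand.
    static PendingSummary sample() {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("consumer-b6410cf9-8244-41ba-b0a5-d79b66d33d65", 20L);
        counts.put("consumer-e5a872d4-b488-416e-92ee-55d2902b338f", 5L);
        return new PendingSummary(25, "1618572598902-0", "1618573768902-0", counts);
    }
}
```

For the sample, the counts add up (20 + 5 = 25) and the pending entries span 1,170,000 ms, i.e. about 19.5 minutes between the oldest and the newest unacknowledged tweet.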

In a production scenario, application failures can happen for multiple reasons. However, in our sample app, the code snippet below was used to simulate this situation - it randomly chooses (with about 20% probability) not to process the tweet received from Redis Streams:

if (!(random.nextInt(5) == 0)) {
    conn.hset(hashName, entry.getFields());
    conn.xack(streamName, consumerGroupName, entry.getID());
}
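If you want to convince yourself about the "about 20%" figure, the check below repeats the same condition many times and measures how often a tweet would be skipped. It is a standalone sketch: SkipRateCheck is a made-up name and the fixed seed is only there to make the run repeatable.

```java
import java.util.Random;

// Standalone check of the condition used in the snippet above.
final class SkipRateCheck {
    private SkipRateCheck() {
    }

    // Fraction of trials in which the tweet would NOT be processed.
    static double skippedFraction(long seed, int trials) {
        Random random = new Random(seed);
        int skipped = 0;
        for (int i = 0; i < trials; i++) {
            // nextInt(5) returns 0..4 with equal probability, so 0 comes up about 1 time in 5
            boolean processed = !(random.nextInt(5) == 0);
            if (!processed) {
                skipped++;
            }
        }
        return (double) skipped / trials;
    }
}
```

With a large number of trials, SkipRateCheck.skippedFraction(seed, trials) settles close to 0.2, which matches the rate at which the pending count grows.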

That's the reason you will see the XPENDING count increasing slowly but surely. In production, if one (or more) instances crash, the XPENDING count for those instance(s) will stop increasing and remain constant. This implies that these messages are now left unprocessed - in this specific example, it means that the tweet information will not be available in RediSearch for you to query.

Redis Streams to the rescue

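Redis Streams provides reliable messaging. It stores the state for each consumer - that's exactly what you see with XPENDING! If you start another consumer instance with the same group and consumer name, it can replay the same messages and re-process them to ensure that tweets are stored in Redis. Keep in mind that a consumer gets its pending messages back only when it calls XREADGROUP with an explicit ID such as 0 instead of >, and that this sample app generates a new random consumer name on every start, so a restarted instance will not pick up its old pending messages on its own.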

Another option is to have a dedicated application that can periodically check the consumer group state (XPENDING), claim messages that have been left abandoned, re-process and (most importantly) acknowledge (XACK) them. In the next (final) part of this series, we will explore how you can build an application to do exactly this!
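To build some intuition for this before the final part, here is a deliberately simplified in-memory model of a pending entries list: delivering an entry makes it pending, acknowledging removes it, and a monitor can claim entries that have been idle for too long. PendingEntriesModel and its method names are hypothetical. Real Redis tracks more state (for example, delivery counts) and exposes this through XPENDING, XCLAIM and XACK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a consumer group's pending entries. Not how Redis implements it.
final class PendingEntriesModel {
    private static final class Pending {
        String consumer;
        long deliveredAtMillis;

        Pending(String consumer, long deliveredAtMillis) {
            this.consumer = consumer;
            this.deliveredAtMillis = deliveredAtMillis;
        }
    }

    private final Map<String, Pending> pending = new LinkedHashMap<>();

    // Reading a new entry through the group makes it pending for that consumer.
    void deliver(String entryId, String consumer, long nowMillis) {
        pending.put(entryId, new Pending(consumer, nowMillis));
    }

    // Acknowledging removes the entry from the pending list (the role of XACK).
    boolean ack(String entryId) {
        return pending.remove(entryId) != null;
    }

    // Transfers entries idle for at least minIdleMillis to newConsumer (the role of XCLAIM).
    List<String> claimIdle(String newConsumer, long minIdleMillis, long nowMillis) {
        List<String> claimed = new ArrayList<>();
        for (Map.Entry<String, Pending> e : pending.entrySet()) {
            Pending p = e.getValue();
            if (nowMillis - p.deliveredAtMillis >= minIdleMillis) {
                p.consumer = newConsumer;
                p.deliveredAtMillis = nowMillis; // claiming resets the idle time
                claimed.add(e.getKey());
            }
        }
        return claimed;
    }

    String ownerOf(String entryId) {
        Pending p = pending.get(entryId);
        return p == null ? null : p.consumer;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The point of the minimum idle time is to avoid stealing messages from a consumer that is merely slow: only entries that nobody has touched for a while are treated as abandoned.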

So, how does it all work?

It's a good time to walk through the code real quick.

You can refer to the code in the GitHub repo

The app uses JRediSearch which abstracts the API of the RediSearch module. The first thing we do is establish a connection to Redis:

GenericObjectPoolConfig<Jedis> jedisPoolConfig = new GenericObjectPoolConfig<>();
JedisPool pool = new JedisPool(jedisPoolConfig, redisHost, Integer.valueOf(redisPort), timeout, redisPassword, isSSL);
Client redisearch = new Client(INDEX_NAME, pool);

Then we create a Schema and the Index definition.

        Schema sc = new Schema().addTextField(SCHEMA_FIELD_ID, 1.0).addTextField(SCHEMA_FIELD_USER, 1.0)
                .addTextField(SCHEMA_FIELD_TWEET, 1.0).addTextField(SCHEMA_FIELD_LOCATION, 1.0)
                .addTagField(SCHEMA_FIELD_HASHTAGS);

        IndexDefinition def = new IndexDefinition().setPrefixes(new String[] { INDEX_PREFIX });

        try {
            boolean indexCreated = redisearch.createIndex(sc, Client.IndexOptions.defaultOptions().setDefinition(def));

            if (indexCreated) {
                System.out.println("Created RediSearch index ");
            }
        } catch (Exception e) {
            System.out.println("Did not create RediSearch index - " + e.getMessage());
        }

To explore the Redis Streams APIs (xgroupCreate, xreadGroup etc.) exposed by the Jedis library, take a look at its javadocs.

Before moving on, we create a Redis Streams Consumer group (using xgroupCreate) - this is mandatory. A consumer group represents a set of applications that work "together" and co-operate with each other to share the processing load:

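try {
    conn = pool.getResource();
    // LAST_ENTRY: the group only sees entries added after it is created; true: create the stream if it is missing
    String res = conn.xgroupCreate(streamName, consumerGroupName, StreamEntryID.LAST_ENTRY, true);
} catch (Exception e) {
    // assumption: log and carry on, e.g. when another instance has already created the group
    System.out.println("Did not create consumer group - " + e.getMessage());
}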

Each app in the consumer group needs to be uniquely identified. While it is possible to assign a name manually, we generate a random consumer name.

String consumerName = "consumer-" + UUID.randomUUID().toString();

The main part of the consumer app is a loop that uses xreadGroup to read from the Redis Stream. Notice the StreamEntryID.UNRECEIVED_ENTRY - this means that we are asking Redis to return stream entries which have not been delivered to any other consumer in the group. Also, our invocation blocks for up to 15 seconds and we opt to get a maximum of 50 messages per call to XREADGROUP (of course, you can change this as per your requirements).

while (true) {

    List<Entry<String, List<StreamEntry>>> results = conn.xreadGroup(consumerGroupName, consumerName, 50,
                        15000, false, Map.entry(streamName, StreamEntryID.UNRECEIVED_ENTRY));

    if (results == null) {
        continue;
    }
    ....
}

Each stream entry needs to be saved to a Redis HASH (using hset). The good thing is that reading a stream entry returns its fields as a Map, and this is exactly what the hset method expects as well. So we are able to re-use the Map!

That's not all though. Notice the xack method - this is the way to call XACK and communicate that we have indeed processed the message successfully:

                for (Entry<String, List<StreamEntry>> result : results) {
                    List<StreamEntry> entries = result.getValue();
                    for (StreamEntry entry : entries) {
                        String tweetid = entry.getFields().get("id");
                        String hashName = INDEX_PREFIX + tweetid;

                        try {
                            // simulate random failure/anomaly. ~ 20% will NOT be ACKed
                            if (!(random.nextInt(5) == 0)) {
                                conn.hset(hashName, entry.getFields());
                                conn.xack(streamName, consumerGroupName, entry.getID());
                            }
                        } catch (Exception e) {
                            continue;
                        }
                    }
                }
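The naming and "re-use the Map" parts of this loop can be tried out without Redis. The sketch below uses an in-memory map as a stand-in for the HASH storage. TweetHashStore is a hypothetical class, and the "tweet:" value for INDEX_PREFIX is an assumption inferred from the hash names in the logs earlier in this post.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the HSET step of the loop above. Not a Redis client.
final class TweetHashStore {
    // Assumption: inferred from log lines such as "saved tweet to hash tweet:1393089239324282880"
    static final String INDEX_PREFIX = "tweet:";

    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    // Stores the stream entry's fields under <prefix><tweet id> and returns the hash name.
    String save(Map<String, String> fields) {
        String tweetId = fields.get("id");
        if (tweetId == null) {
            throw new IllegalArgumentException("stream entry has no id field");
        }
        String hashName = INDEX_PREFIX + tweetId;
        // Like HSET, this adds or overwrites the given fields and leaves other fields untouched
        hashes.computeIfAbsent(hashName, k -> new HashMap<>()).putAll(fields);
        return hashName;
    }

    Map<String, String> get(String hashName) {
        return hashes.get(hashName);
    }
}
```

Because the hash name carries the index prefix, RediSearch picks up the document automatically as soon as the HASH is written; the app does not have to make a separate indexing call.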

There is a lot of scope for optimization here. For example, you can make this process multi-threaded by spawning a thread for each batch (say 50 messages).
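Here is one way this could look, as a rough sketch rather than the app's actual code. Instead of spawning a new thread per batch, it hands batches to a fixed thread pool, which keeps the number of threads bounded. BatchProcessor and the handler parameter are hypothetical names, and the handler stands in for the hset + xack work. Note that a Jedis connection should not be shared between threads, so a real handler would borrow its own connection from the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Sketch: split entries into batches and process the batches on a fixed thread pool.
final class BatchProcessor {
    private BatchProcessor() {
    }

    // Returns the number of entries that were handed to the handler.
    static <T> int processInParallel(List<T> entries, int batchSize, int threads, Consumer<T> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int start = 0; start < entries.size(); start += batchSize) {
                List<T> batch = entries.subList(start, Math.min(start + batchSize, entries.size()));
                futures.add(pool.submit(() -> {
                    for (T entry : batch) {
                        handler.accept(entry); // stand-in for hset + xack
                    }
                    return batch.size();
                }));
            }
            int processed = 0;
            for (Future<Integer> future : futures) {
                processed += future.get(); // waits for the batch and surfaces its failure, if any
            }
            return processed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a batch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With 120 entries and a batch size of 50, this creates three batches (50, 50 and 20) that can run at the same time. Whether this actually helps depends on where the time goes, so it is worth measuring before and after.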

That's all for this blog!

Interested in the final part?

So far, we have covered a high-level overview in part 1, the tweets consumer Rust app in part 2, and a Java app to process those tweets from Redis Streams in this part. As promised, the final part of the series will cover an app to monitor the process and re-process abandoned messages in order to keep our overall system robust - this will be a Serverless Go application deployed to Azure Functions. Stay tuned!