Munawwar

Queues with Redis without duplicate items

I work on an ecommerce store (Shopify) that has webhooks* to auto-fix product issues, like wrong product details or assignment to the wrong inventory location. The way the system worked previously was that any update to a product immediately ran all the checks and computations to try and auto-fix that product's details.

(* A webhook is basically a trigger.. a notification. That is, I can set Shopify to notify a URL on my server about all "product updates", "order creations", etc. The body of the HTTP call is JSON that I can process.)
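
To make that concrete, here is a minimal sketch of such an endpoint. The framework (Express), the route path, and the port are my assumptions for illustration, not details from my actual setup:

const express = require('express');
const app = express();
app.use(express.json());

// Hypothetical route; Shopify POSTs the updated product here as JSON
app.post('/webhooks/product-update', (req, res) => {
  const product = req.body;
  console.log('received update for product', product.id);
  res.sendStatus(200); // respond quickly so Shopify doesn't retry
});

app.listen(3000);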

The Problem

Merchandisers occasionally update products through bulk actions. For example, first they change multiple products' pricing, then the same products' inventory, and so on, all within a couple of minutes. This causes lots of hits to the same webhook. Note that many of the updates are to the same products, so they are "duplicates".

Furthermore, the webhook handler internally uses Shopify APIs that are rate limited to around 2 requests per second. My webhooks were hitting this limit a lot, especially when bulk actions are done, which then requires retries, exponential backoff, etc.

Overall this means my system was doing a lot of duplicated work and was hitting some hard limits.

Solution

The solution I imagined is that the webhook adds these product ids to a queue, and every 30 minutes or so, a worker takes the ids from the queue, finds the unique ids, and processes them in one go.

Implementation 1 - RabbitMQ

One of the most often used queuing systems is RabbitMQ. However, RabbitMQ cannot de-duplicate the messages of a queue "nicely".

What does "nicely" mean? Well.. there is a way, but it involves reading all the messages in the queue and then finding the unique ones through code. That is, deduplication done by the consumer. I would rather not have duplicates in the queue in the first place, if possible.

Besides, I already have Redis running. If Redis can be used, I can avoid adding yet another system to maintain.

Implementation 2 - Redis

Redis has several data structures in its feature list. It has regular lists that can be used as queues, but that is just like RabbitMQ in that messages would have to be deduplicated by my code. Redis has got another interesting data structure called a "sorted set".

A sorted set is a data structure where each item in the set also has a numeric score, so that items can be fetched in sorted order of the scores.
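
For instance (the key and member names here are just for illustration), adding three items with scores and reading them back:

ZADD fruits 3 "cherry" 1 "apple" 2 "banana"
ZRANGE fruits 0 -1

The ZRANGE returns apple, banana, cherry, ordered by score rather than by insertion order.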

This data structure may not look very intuitive at first glance. A queue is just a list.. pretty easy to understand. You can push and pop items to and from it. But how do you use a sorted set as a queue?

To build the intuition, note that even with queues, each item in a queue has got a numeric association. Being told "you are the 10th person in the queue" means there are 9 people ahead of you, and the next person joining the queue will be the 11th person. If you think of position as a "score", then a queue's scoring logic for a new item is always the number of items previously in the queue + 1.

But in a sorted set, the score for a new item doesn't have to be strictly the position of the item. It can be any increasing number.. like time. Time is easy to access as well. Time can have duplicates if two items are added in the same millisecond; however, in our case that is not a problem, as sorted sets allow duplicate scores.. and a few duplicate scores are OK as long as they do not majorly alter the overall order of insertions. We are more concerned about the keys being unique.

With that said, here is the Redis command that will add a key to the sorted set if and only if it doesn't already exist in the set.

ZADD product-updates NX <currentTimeInMilliseconds> product-id-1

The NX in the command prevents duplicate product ids from getting into the "queue". Now, from the consumer side, I can fetch the oldest product ids that were added into the set, limiting the result to 100. Note that I am not removing the items from the set.. just reading them.

ZRANGEBYSCORE product-updates 0 <currentTimeInMilliseconds> LIMIT 0 100

I can process each item (product id) and then remove the items either one by one as I process them, or remove all 100 items in a single command.

ZREM product-updates product-id-1 product-id-2 ...
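
Putting the producer and consumer sides together, here is a sketch using the ioredis Node client (the choice of client and the processProduct helper are my assumptions; any Redis client would do):

const Redis = require('ioredis');
const redis = new Redis();

// Producer: called from the webhook for every product update.
async function enqueue(productId) {
  // NX: add only if the id isn't already queued (keeps the original score)
  await redis.zadd('product-updates', 'NX', Date.now(), productId);
}

// Consumer: runs every ~30 minutes.
async function drainBatch() {
  const ids = await redis.zrangebyscore(
    'product-updates', 0, Date.now(), 'LIMIT', 0, 100);
  for (const id of ids) {
    await processProduct(id); // hypothetical helper: your auto-fix logic
  }
  if (ids.length > 0) {
    await redis.zrem('product-updates', ...ids); // remove the whole batch
  }
}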

Workers and load-balancing

For me, a single process/thread is sufficient to do the job, as I would hit Shopify's rate limits if I tried to parallelize the work. However, for another use case that doesn't have such limits, you would probably like to have multiple workers doing the work. How would we do this?

My solution makes the following assumptions:

  1. we need to know upfront how many workers are going to run
  2. each worker will have a queue dedicated to itself

The main problem now is ensuring that de-duplication still works. How can we de-duplicate when there are multiple queues?

The trick I am going to pull is to use a hash function.

hash("product-id") % numberOfQueues

A hash function takes a string and returns a number; that's how the remainder operator % can work here. It will always return the same number for the same string. The hash function's output must be well distributed for this to balance the queues reliably.

There are several hash functions out there. One of my favorites is fnv1a, since its implementation is tiny (literally 10 lines of code).
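
For reference, here is my sketch of 32-bit FNV-1a in JavaScript. It's a transcription of the well-known algorithm; the npm package linked below does the same thing:

function fnv1a(str) {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i); // fine for ASCII-ish input like product ids
    // multiply by the FNV prime 16777619, using shifts to stay within 32 bits
    hash = (hash + ((hash << 1) + (hash << 4) + (hash << 7) +
                    (hash << 8) + (hash << 24))) >>> 0;
  }
  return hash;
}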

So let's take an example. Let's say we have 4 workers/queues.

fnv1a("product-id-1") % 4 // = 0 always, if one uses https://www.npmjs.com/package/fnv1a

You will always get the same queue number if you pass the same product id. So producers don't need to handle duplicates across queues; rather, they consistently add a given product id to the same queue number, as long as the number of queues is constant. And our ZADD NX command will handle duplicates within a queue.
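
As a sketch, the producer side could pick the queue like this (the key naming scheme is my assumption):

const fnv1a = require('fnv1a'); // https://www.npmjs.com/package/fnv1a
const NUM_QUEUES = 4;

function queueFor(productId) {
  // the same product id always maps to the same queue number
  return `product-updates-${fnv1a(productId) % NUM_QUEUES}`;
}

// The producer then adds to the worker-specific sorted set:
// await redis.zadd(queueFor(productId), 'NX', Date.now(), productId);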

(By the way, this approach is also known as "sticky session load balancing")

That's all for this post. The post was mostly about how to implement a queue with Redis that doesn't contain any duplicates.
