DEV Community

Cover image for How to Create a Job Queue using Bull and Redis in NodeJS
Francisco Mendes
Francisco Mendes

Posted on

How to Create a Job Queue using Bull and Redis in NodeJS

Overview

Today I decided to create an article about one of my favorite libraries, in this case I won't talk about each of the library's specifics but I will give a small overview of what it is for and some examples that I hope will help you understand where you can implement a queue in your projects.

Unfortunately this was not one of the libraries that clicked instantly in my head and I don't understand why, this is because the library is stupidly intuitive and has a set of methods that we can use.

The first thing I recommend to everyone is to take a look at the website, despite not being very complete at least you get the idea of the lifecycle of the queue system and some fundamental knowledge of the library.

Then I recommend going to the API Reference, because this is where you will find each of the methods that can be used as well as information about them, such as what they do and the parameters they need.

After visiting the website and reading it from cover to cover and having had a look at the API Reference, in my opinion you are ready to start playing with the library.

Installation

In this part it is up to you which package manager to use, for now I leave here some commands:

# NPM
npm init -y
npm install bull

# YARN
yarn init -y
yarn add bull

# PNPM
pnpm init -y
pnpm add bull
Enter fullscreen mode Exit fullscreen mode

Now you can import the bull in your project and you can create your queue, when a new one is instantiated what is fundamental is to define the name, however you can pass some settings you want:

import Queue from "bull";

// If you have the default Redis credentials
// (username, password, host, port)
const myFirstQueue = new Queue('my-first-queue');

// If you want to pass some "custom" Redis config
const myFirstQueue = new Queue('my-first-queue', {
  redis: { host: "...", port: 7474 }
});
Enter fullscreen mode Exit fullscreen mode

But anyway let's start moving on to the examples, the first two methods you'll need to learn are the following:

  • .add() - This method is responsible for creating a new job and adding it to the queue;
  • .process() - It is the function responsible for processing the jobs we have in the queue

A basic example would be the following:

import Queue from "bull";

const queue = new Queue("myQueue");

const main = async () => {
  await queue.add({ name: "John", age: 30 });
};

queue.process((job, done) => {
  console.log(job.data);
  done();
});

main().catch(console.error);
Enter fullscreen mode Exit fullscreen mode

As you may have noticed in the example above, in the main() function a new job is inserted in the queue with the payload of { name: "John", age: 30 }. In turn, in the processor we will receive this same job and we will log it.

But there are not only jobs that are immediately inserted into the queue, we have many others and perhaps the second most popular are repeatable jobs. Which would be the following:

import Queue from "bull";
import milliseconds from "milliseconds";

const scheduler = new Queue("schedulerQueue", {
  defaultJobOptions: { repeat: { every: milliseconds.minutes(5) } },
});

const main = async () => {
  await scheduler.add({});
};

scheduler.process((_, done) => {
  console.log("Scheduled job");
  done();
});

main().catch(console.error);
Enter fullscreen mode Exit fullscreen mode

In the example above we created a queue called scheduler to which we passed some settings, which we stipulated that the scheduler will run every 5 minutes. Then you notice that in our main() function we pass an empty object to the queue, because in this case I didn't want to add something to the queue, what I want is for the processor to be executed to apply my logic, which in this case is just a log.

Another amazing thing is that if you are a TypeScript programmer, you can infer the data types very easily in this library, like this:

import Queue from "bull";

interface IJobData {
  name: string;
  age: number;
}

const queue = new Queue<IJobData>("myQueue");

const main = async () => {
  await queue.add({ name: "John", age: 30 });
};

queue.process((job, done) => {
  console.log(job.data.name);
  done();
});

void main();
Enter fullscreen mode Exit fullscreen mode

By inferring the data types from the properties of the IJobData interface, we will have a much more accurate intellisense, in all the library's methods.

Another super interesting point is the ease with which we can fetch the jobs that are in the queue, but first I recommend that you fully understand the library's lifecyle. The library has several methods and several ways to perform operations such as removing a job from the queue.

But in this article I will show my way, let's take into account this example:

import Queue from "bull";

interface IJobData {
  name: string;
  age: number;
}

const queue = new Queue<IJobData>("myQueue");

const controller = async () => {
  const queuedJobs = await queue.getJobs(["waiting", "delayed"]);

  const jobsToRemove = queuedJobs.filter(
    (queuedJob) => queuedJob.data.age >= 31
  );

  await Promise.all(jobsToRemove.map((job) => job.remove()));
};

void controller();
Enter fullscreen mode Exit fullscreen mode

Let's suppose that from a controller/service/handler that you have in your application, you want to remove a job regardless of the reason. As you may have noticed, we first went to the queue to find all the jobs that have the status of waiting and delayed, then we filter the jobs by age (in this case I wanted all jobs whose age property value was greater than or equal to 32). Finally, we map some promises and then invoke them.

The same concept can be applied when inserting jobs into the queue, if you have a list of data that need to be inserted into the queue, you can do it like this:

import Queue from "bull";

interface IJobData {
  name: string;
  age: number;
}

const users = [
  { name: "John", age: 31 },
  { name: "Jane", age: 25 },
  { name: "Jim", age: 19 },
  { name: "Jill", age: 17 },
  { name: "Jack", age: 32 },
];

const queue = new Queue<IJobData>("myQueue");

const controller = async () => {
  const promises = users.map((user) => queue.add(user));

  await Promise.all(promises);
};

void controller();
Enter fullscreen mode Exit fullscreen mode

In the example above, we have an array called users that we are going to use to map the promises that correspond to the addition of each of the jobs in the queue, finally we invoke each of the promises to insert them into the queue.

Last but not least I'll talk about delays and give some examples of when these can be implemented.

Imagine that a user has just registered in your application and you would like to send them an email asking how their experience has been so far. The implementation could look like the following:

import Queue from "bull";
import milliseconds from "milliseconds";

interface IJobData {
  email: string;
  subject: string;
  body: string;
}

const queue = new Queue<IJobData>("myQueue");

const controller = async () => {
  // 7 days delay
  await queue.add(
    {
      email: "7days@dev.to",
      subject: "What's your feedback so far?",
      body: "I hope that your experience with our service has been great.",
    },
    { delay: milliseconds.days(7) }
  );
};

void controller();
Enter fullscreen mode Exit fullscreen mode

Another reason why you can choose to use a delayed job is if you want to add a delay according to the timestamp. Something like this:

import Queue from "bull";
import milliseconds from "milliseconds";

interface IJobData {
  email: string;
  subject: string;
  body: string;
}

const queue = new Queue<IJobData>("myQueue");

const controller = async () => {
  // Process At: 2021-01-22T10:04:00.000Z
  const currentTime = new Date().getTime();
  const processAt = new Date("2021-01-22T10:04:00.000Z").getTime();
  const delay = processAt - currentTime;
  await queue.add(
    {
      email: "processAt@dev.to",
      subject: "Event Reminder",
      body: "You have an event coming up!",
    },
    { delay }
  );
};

void controller();
Enter fullscreen mode Exit fullscreen mode

Still taking into account the previous example, you can play with it even more. Let's say you want to schedule an email at the time of the timestamp, but you also want to send another email as a reminder. Could implement something like this:

import Queue from "bull";
import milliseconds from "milliseconds";

interface IJobData {
  email: string;
  subject: string;
  body: string;
}

const queue = new Queue<IJobData>("myQueue");

const controller = async () => {
  // Process 30 minutes after timestamp
  const timestamp = new Date("2021-01-22T10:04:00.000Z").getTime();
  const currentTimeMs = new Date().getTime();
  const thirtyMinutesDelay = timestamp - currentTimeMs + milliseconds.minutes(30);
  await queue.add(
    {
      email: "process30minLater@dev.to",
      subject: "Event Reminder",
      body: "The event has started!",
    },
    { delay: thirtyMinutesDelay }
  );
};

void controller();
Enter fullscreen mode Exit fullscreen mode

Conclusion

As always, I hope you found it interesting. If you noticed any errors in this article, please mention them in the comments. ๐Ÿง‘๐Ÿปโ€๐Ÿ’ป

Hope you have a great day! ๐Ÿ‘Š

Top comments (3)

Collapse
 
redbeardjunior profile image
RedbeardJunior

Thank you for the good article ! I'm also trying to build somthing like this ! but im getting a constantly error.

Error: No .lua files found!
at loadScripts (webpack-internal:///(rsc)/./node_modules/bull/lib/commands/index.js:29:15)
at async eval (webpack-internal:///(rsc)/./node_modules/bull/lib/commands/index.js:18:19)

trying to do a simpel connection.

Collapse
 
dbjdbj profile image
DBJDBJ

front puts to the input queue, worker thread takes from the input queue, finished and puts the rezult to the output queue.

there are sepearate front methods to subimt and to get the rezult.

Any ideas in the context of bull?

Collapse
 
lakincoder profile image
Lakin Mohapatra

if redis is restarted , data will be lost ?