DEV Community

loading...
Cover image for Introduction to RabbitMQ for Nodejs Developers

Introduction to RabbitMQ for Nodejs Developers

Farzad Aziminia
Software Architect and animal lover. Technology enthusiast. Full-stack and stack agnostic
Updated on ・5 min read

Prerequisites

For this tutorial you need some background of Node.js and have docker installed on your machine

Queue

In computer science there’s the concept of queues. Queue is the set of messages that are meant to be delivered from one sender to one or more receivers. The messages can be delivered in order or out of order by design. The computer program that handles these transactions is called message broker. RabbitMQ is one the most popular message brokers that runs on top of Advanced Message Queuing Protocol (AMQP). There are four main components forming AMQP protocol: Publisher, Exchange, Queue, Consumer.

Publisher

Messages are Published to an exchange by a publisher, Publisher also is responsible for setting the attributes of the message which we will cover later.

Exchanges

Exchanges are responsible for routing the messages to one or more Queues, we will cover Queues Later. There are 4 different types of exchanges in rabbitmq.

1.Direct
2.Fanout
3.Topic
4.Header

For this tutorial we are going to cover only two: Direct, I’m gonna do another tutorial on the Fanout exchange later.

Direct exchanges are responsible for routing messages to a queue based on the Routing key. When you declare a queue you can “Bind” it to an exchange using a Routing key, we will cover this topic later. Direct queues are suitable for distributing tasks among Workers.
RabbitMq Model
A Fanout exchange sends a message to all of the queues that are bound to the exchange by a routing key. When a message comes in, the exchange will send a copy of that message to all Queues. Fanout exchanges are useful to broadcast a message to multiple nodes in a distributed system.

Queues

Queues are responsible for storing the messages and delivering them to consumers. Queues need to get declared before you can start using them. A Queue needs to bind to an exchange so it can start receiving messages. Binding is a set of rules that exchanges use to route messages to one or more queues.

Consumers

Consumers are the last piece of the puzzle, they need to Subscribe to a queue so they can start receiving messages, when a consumer receives and processes a message, it needs to “Acknowledge” the message in order to get another one.

Installing rabbitMQ

We will use docker to install rabbitmq and it’s management UI.

docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Enter fullscreen mode Exit fullscreen mode

The above command will install rabbitmq and bind two ports to your local port: 5672 and 15672.
You can use 15672 to get into rabbitMQ management portal: http://localhost:15672 the default username password is guest/guest.
We need to use the amqplib library on port 5672 to communicate with the rabbitMQ server. Now lets create a direct exchange and a queue

Rabbitmq management Ui

const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
const EXCHANGE_TYPE = 'direct';
const EXCHANGE_NAME = 'main';
const KEY = 'myKey';
const number = '5'
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
   const channel = await conn.createChannel();
   await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE);
   await channel.assertQueue(QUEUE_NAME);
   channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, KEY);
})
Enter fullscreen mode Exit fullscreen mode

There are a lot happening, let’s break it down

At line 1 I imported amqplib library then at line 7 I created a connection to rabbitmq
Line 9 I created a channel inside the connection, you need to create a channel before you can start interacting with rabbitmq. At line 10 I used assertExchage method to create an exchange. This method takes two arguments: name of the exchange and type of the exchange
At line 11 I used the assertQueue method to create a Queue called square. And at line 12 I used bindQueue method to bind main to square with the routing key myKey

After you run the above code you can navigate to the management interface and click on queues tab and you will see square under the list of queues

Let's write an application to calculate the square of a number

const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
const EXCHANGE_TYPE = 'direct';
const EXCHANGE_NAME = 'main';
const KEY = 'myKey';
const number = '5'
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
   const channel = await conn.createChannel();
   await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE);
   await channel.assertQueue(QUEUE_NAME);
   channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, KEY);
   channel.sendToQueue(QUEUE_NAME, Buffer.from(number))
})
Enter fullscreen mode Exit fullscreen mode

We can use the sendToQueue method to send messages to the square queue. sendToQueue method takes two arguments, name of the queue and the content which you want to send. The only caveat is the content should be in the buffer format.

Now after you run the above code, you can navigate to your queue from the management ui and you’ll see that you have a message inside square queue

Let's consume that message and find the square

rabbitmq Queues

const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
   const channel = await conn.createChannel();
   channel.consume(QUEUE_NAME, (m)=>{
       const number = parseInt(m.content.toString())
       const square = number * number
       console.log(square)
       channel.ack(m)
   })
})
Enter fullscreen mode Exit fullscreen mode

It’s pretty much similar to our publisher in many ways, we need to open a connection and create a channel then we can use the consume method that is exposed from our channel object. The consume method accepts two arguments, the queue name and a callback. Whenever a message gets published to the square queue, this callback function will be invoked. The callback function accepts an argument which is the message object. We can find our number under the content property. If you remember, when we published our message, we had to convert our number to a buffer, so when we consume the content, we have to convert it back to the number. First we need to convert the number from buffer to string, then from string to number. When we have our number, we can find the square and console log it to the screen and finally we can use ack method to acknowledge the message and tell rabbitmq to remove that message from the queue and send the next one if any. So first run your publisher and then consumer to see the effect. You can also have the management console open and watch the activity on your queue.

Next lets run two consumer and one modify our publisher a bit

const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
const EXCHANGE_TYPE = 'direct';
const EXCHANGE_NAME = 'main';
const KEY = 'myKey';
const numbers = ['1', '2', '3', '4', '5']
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
   const channel = await conn.createChannel();
   await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE);
   await channel.assertQueue(QUEUE_NAME);
   channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, KEY);
   numbers.forEach((number)=>{
       channel.sendToQueue(QUEUE_NAME, Buffer.from(number))
   })
})
Enter fullscreen mode Exit fullscreen mode

Now we have an array of numbers instead of one, and we are using the forEach method to iterate through the array and send the numbers to the queue one by one. Now run your consumer and you will see the consumer automatically consume all the messages and show the square of the numbers.

Conclusion

RabbitMQ is a popular message broker that runs on top of AMPQ protocol. AMPQ protocol is consist of 4 components: 1-Publisher, 2-Exchange, 3-Queue, 4-Consumer.

In order to comunicate to rabbitmq we need to open a connection and inside the connection create a channel.Then we can both publish a message to a queue or consume messages form a queue or queues.

Discussion (0)