DEV Community

Bervianto Leo Pratama for AWS Community Builders

Simple Event-Driven App using Amazon MQ (RabbitMQ)

Hello everyone! I'm back after a busy stretch at work and want to introduce a simple event-driven app. I've been exploring microservices and want to start with event-driven architecture. If you want to know more about Event-Driven Architecture, please visit this page.

Preparation

I'm using Node.js for the sample app, so you will need Node.js and Yarn installed. I'll share samples in other programming languages, such as C# and Go, in the next post. You may check this GitHub repository to look around.

RabbitMQ Demo

Simple Publish/Subscribe App.

Tools

Preparation

  • (For Linux/macOS users) Copy .env.sh.example to .env.sh.
  • (For Windows users - CMD) Copy .env.cmd.example to .env.cmd.
  • (For PowerShell users) Copy .env.ps1.example to .env.ps1.
  1. Update the copied .env file to point to your RabbitMQ host.
  2. Run yarn --frozen-lockfile to download the dependencies.

Consumer

  • Run node consumer.js.

Publisher/Sender

  • Run node sender.js.

LICENSE

MIT




Prepare RabbitMQ in Amazon MQ

We will provision the RabbitMQ broker manually through the AWS Console.

  1. Type Amazon MQ in the console search box and click Amazon MQ in the results.

  2. Click the Get started button.

  3. Select RabbitMQ and click Next.

  4. Select Single-instance broker, which is enough for testing (consider Cluster deployment for production use), then click Next.

  5. Give the broker a name and select the mq.t3.micro instance type.

  6. Set up the username and password for the RabbitMQ broker.

  7. Open Additional settings. Make sure you select Public access and Use the default VPC and subnet(s). Please avoid Public access for production use; use Private access and set up the VPC yourself instead.

  8. Finally, click Next, review your configuration, and click Create broker.

Time to Code!

  • Initialize your project with yarn init and enter the details of your project.

  • Install the dependencies.

    • yarn add amqplib - to send messages to and consume messages from RabbitMQ.
    • yarn add uuid - to generate message IDs and correlation IDs.

Sender Code

const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');

// Name of the queue shared by the sender and the consumer
const queueName = 'test-queue';

/**
 * Send 10 messages to the queue.
 */
async function send() {
  // Connect to RabbitMQ
  const connection = await amqp.connect(process.env.RABBITMQ_HOST || 'amqp://localhost');
  // Create a channel
  const channel = await connection.createChannel();
  // Assert the queue so that it exists before we publish to it
  await channel.assertQueue(queueName, {
    durable: true,
  });
  // Generate one correlation ID for the whole batch; it marks messages that are related to each other
  const correlationId = uuidv4();
  // Send 10 messages, each with its own message ID
  for (let i = 1; i <= 10; i++) {
    const buff = Buffer.from(JSON.stringify({
      test: `Hello World ${i}!!`
    }), 'utf-8');
    const result = channel.sendToQueue(queueName, buff, {
      persistent: true,
      messageId: uuidv4(),
      correlationId: correlationId,
    });
    // sendToQueue returns false when the channel's write buffer is full
    console.log(result);
  }
  // Close the channel
  await channel.close();
  // Close the connection
  await connection.close();
}

// Report connection or publish failures instead of leaving the rejection unhandled
send().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
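The sender logs the boolean returned by sendToQueue. In amqplib, that value is false when the channel's write buffer is full, and the channel emits a 'drain' event once it is safe to write again. Here is a minimal sketch of honoring that signal. It assumes a hypothetical helper named sendWithBackpressure (not part of the original repository) that receives an already-open channel:

```javascript
const { once } = require('events');

/**
 * Hypothetical helper (not in the original repository): publish every payload
 * to the queue and pause whenever the channel reports a full write buffer.
 * `channel` is assumed to be an amqplib channel, or any event emitter that
 * exposes a compatible sendToQueue method.
 */
async function sendWithBackpressure(channel, queueName, payloads, options = {}) {
  let sent = 0;
  for (const payload of payloads) {
    const buff = Buffer.from(JSON.stringify(payload), 'utf-8');
    const ok = channel.sendToQueue(queueName, buff, { persistent: true, ...options });
    sent += 1;
    if (!ok) {
      // The buffer is full: wait for 'drain' before sending the next message
      await once(channel, 'drain');
    }
  }
  // Number of payloads handed to the channel
  return sent;
}
```

For ten small messages this will probably never trigger, but it may matter once the loop publishes thousands of messages in a row.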

Consumer Code

const amqp = require('amqplib');

// Name of the queue shared by the sender and the consumer
const queueName = 'test-queue';

/**
 * Consume messages from the queue.
 */
async function consume() {
  // Connect to RabbitMQ
  const connection = await amqp.connect(process.env.RABBITMQ_HOST || 'amqp://localhost');
  // Create a channel
  const channel = await connection.createChannel();
  // Assert the queue so that it exists before we consume from it
  await channel.assertQueue(queueName, {
    durable: true,
  });
  console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queueName);
  // Start consuming
  await channel.consume(queueName, function (message) {
    // amqplib passes null when the server cancels the consumer
    if (message === null) {
      return;
    }
    // Just print the message to the console
    console.log("[%s] Received with id (%s) message: %s", message.properties.correlationId, message.properties.messageId, message.content.toString());
    // Acknowledge manually once the message has been handled
    channel.ack(message);
  }, {
    // Manual acknowledgement: RabbitMQ keeps the message until we ack it
    noAck: false,
  });
}

// Report connection or consume failures instead of leaving the rejection unhandled
consume().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
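Manual acknowledgement lets the consumer confirm a message only after processing succeeds. Here is a minimal sketch of that idea. It assumes a hypothetical makeHandler wrapper and a processMessage callback (neither is in the original code), and it assumes that dropping a failed message is acceptable for a demo:

```javascript
/**
 * Hypothetical wrapper (not in the original code): build a consume callback
 * that acks a message only after `processMessage` succeeds.
 */
function makeHandler(channel, processMessage) {
  return async function handle(message) {
    // amqplib passes null when the server cancels the consumer
    if (message === null) {
      return;
    }
    try {
      // The sender publishes JSON, so parse the content before processing
      const body = JSON.parse(message.content.toString());
      await processMessage(body, message.properties);
      channel.ack(message);
    } catch (error) {
      // allUpTo = false, requeue = false: reject only this message and do not
      // put it back on the queue, so a bad message cannot loop forever
      channel.nack(message, false, false);
    }
  };
}
```

It could be passed to the consumer as channel.consume(queueName, makeHandler(channel, async (body) => console.log(body)), { noAck: false }). Whether to requeue on failure depends on the workload; with requeue set to true, a message that always fails would be redelivered indefinitely.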

Test Your Code

  • Set the RABBITMQ_HOST environment variable. You may use the example scripts to do so. For example:

    • For PowerShell
     $env:RABBITMQ_HOST = 'amqps://<username>:<password>@<rabbitmq-endpoint>:<rabbitmqport>'
    
    • For Linux/macOS
     export RABBITMQ_HOST='amqps://<username>:<password>@<rabbitmq-endpoint>:<rabbitmqport>'
    
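  • Run the sender with node sender.js. It logs the return value of sendToQueue for each of the 10 messages, which is normally true.

  • Run the consumer with node consumer.js. It prints one line per received message, showing the correlation ID, the message ID, and the JSON content.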
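A password that contains characters such as @ or / can break the connection URL unless it is percent-encoded. Here is a minimal sketch of assembling the RABBITMQ_HOST value safely. The helper name buildAmqpUrl, the example host, and the default port 5671 (the conventional AMQPS port) are assumptions, so check the endpoint shown for your broker in the Amazon MQ console:

```javascript
/**
 * Hypothetical helper (not in the original repository): assemble the
 * RABBITMQ_HOST value from its parts, percent-encoding the credentials.
 * The default port 5671 is an assumption; use the port of your own endpoint.
 */
function buildAmqpUrl({ username, password, host, port = 5671 }) {
  const user = encodeURIComponent(username);
  const pass = encodeURIComponent(password);
  return `amqps://${user}:${pass}@${host}:${port}`;
}

// Example with made-up values
console.log(buildAmqpUrl({
  username: 'admin',
  password: 'p@ss/word',
  host: 'broker.example.com',
}));
// prints amqps://admin:p%40ss%2Fword@broker.example.com:5671
```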

Congrats

Congrats! You've made a simple app to send and receive messages. Don't forget to clean up the resources, such as the broker, if you won't use them again.


Top comments (3)

Kos-M

Cool post. Can you explain why you chose to ack manually in the consumer function?

Bervianto Leo Pratama • Edited

Because I want to ack (delete) the message only after it has been fully processed. I can also nack it when there is an error. You may check this article to learn which method you need.

Kos-M

Very useful, thank you!