DEV Community

Yoav Danieli for Aspecto

Posted on • Edited on • Originally published at aspecto.io

Distributed Tracing for RabbitMQ with OpenTelemetry

Jazz Jackrabbit OpenTelemetry RabbitMQ

In this guide, you will learn how to use OpenTelemetry to instrument RabbiMQ to create spans for different operations. (e.g., consume and produce). We will then see how to visualize your traces in Jaeger and Aspecto. I will use Node.js for all code examples.

Feel free to skip to the practical section of this guide if you are already familiar with RabbitMq and OpenTelemetry.

What to Expect

What is OpenTelemetry

OpenTelemetry is a CNCF (cloud-native compute foundation) open source project that allows us to collect, export and generate telemetry data – logs metrics, and traces (which together make up the three pillars of observability).

OpenTelemetry provides each programming language with a single API and SDK with which you can instrument your application to generate telemetry. The OpenTelemetry Specification defines the cross-language requirements for the APIs and SDKs.

*But what does it have to do with RabbitMQ?
*

Well, there is a great deal of complexity in modern software applications. Their architecture is made up of many microservices that are independent of one another. For communication and information transfer, these microservices use messaging systems (like RabbitMq).

Distributed architectures are complex, which is why users need an easy way to visualize and troubleshoot them.

We use OpenTelemetry to collect data from different transactions within our services and components. Third-party tools, such as messaging systems like RabbitMq, are also included in those components.

Using the collected telemetry, we can gain a better understanding of how our software performs and behaves.

Check out this short guide for a deeper dive into OpenTelemetry.

As far as this OpenTelemetry js guide is concerned, these terms should be familiar to you:

  • ** Span:** Spans represent actions/operations that have occurred in our system. An HTTP request or a database operation that spans over time (starts at X and has a duration of Y milliseconds). A span would usually be the parent and/or the child of another span.
  • Trace: Traces represent a tree of spans connected in a child/parent relationship. Traces specify the progression of requests across different services and components in our app (DB, data sources, queues, etc.). For example, sending an API call to user-service resulted in a DB query to users-db.
  • Exporter: Once we create a span, we need to send it to a dedicated location (e.g., a collector). This is the component that sends telemetry data to that destination.
  • Instrumentation: The instrumentation libraries enable us to gather data and generate spans based on different libraries used in our applications, such as RabbitMQ, Mongo, Express, etc. Our app can be instrumented manually or automatically.
    • Auto Instrumentation: Automatically create spans from the application libraries we use with ready-to-use OpenTelemetry libraries.
    • Manual instrumentation: Writing specific code manually to define where each span begins and ends.

See the official documentation for more information on OpenTelemetry jargon.

Using RabbitMq in this OpenTelemetry Node guide

RabbitMq is a messaging broker that supports many messaging patterns. One of the most famous messaging patterns is the publisher/subscriber pattern. In this pattern, one service produces and sends the message (publisher), while other services consume the message (subscribers). It is possible for any service to subscribe to receive messages from the publisher.

In our guide, we will use RabbitMq’s publisher/subscriber pattern. For more information on RabbitMQ visit the official site.

The Practical: OpenTelemetry and RabbitMQ

  • Create an application
  • Add RabbitMq messaging code
  • Instrument with OpenTelemetry

Step 1 – Create a Node.js application

First, let’s create a basic node application. It will consist of two services. a publisher and subscribers. The publisher will listen to requests from the user. After receiving such a request it will publish a message to a topic (called exchange in RabbitMq terms). The services that will subscribe to this exchange will receive and print the message.

The publisher code:
/* publisher.js */
const express = require('express')
const app = express()
const port = 3000
app.get('/', (req, res) => {
  // TODO: Implement publish message
  res.send('Hello World!')
})
app.listen(port, () => {
  console.log(`Publisher app listening on port ${port}`)
})
/* consumer.js */
// TODO: implement consume messages
Enter fullscreen mode Exit fullscreen mode

Step 2 – Add RabbitMq messaging code

Now let’s add the code we need to publish and consume messages on RabbitMq. for that we will need to install the amqplib library and run a local instance of RabbitMq

npm i amqplib
docker run -d --name rabbit rabbitmq:3-management
Enter fullscreen mode Exit fullscreen mode

Now let’s add the code to our publisher:

/* publisher.js */
const amqplib = require('amqplib');
const express = require('express')
const app = express()
const port = process.env.PORT || 3000
let rabbitConnection;
/* An exchange is where the rabbitMq computation takes place. 
According to the messaging strategy defined by the exchange type, messages are sent to an exchange that distributes them to consumers. */
const exchange = 'logs'
const sendRabbitMqMessage = async (message) => {
  if (!rabbitConnection) {
    rabbitConnection = await amqplib.connect('amqp://localhost');
  }

  const channel = await rabbitConnection.createChannel();
/* Type "fanout" means sending the message to all consumers that subscribed to that exchange. */
  await channel.assertExchange(exchange , 'fanout')
/* Notice that we pass an empty string as the queue name. This means the queue will be defined per consumer. */
  await channel.publish(exchange, '', Buffer.from(message))
}
app.get('/', async (req, res) => {
  const message = 'Hello World!'
  console.log(`Send message: '${message}'`);
  await sendRabbitMqMessage(message);
  res.send(message)
})
app.listen(port, () => {
console.log(`${process.env.SERVICE} Running`)
})
Enter fullscreen mode Exit fullscreen mode

Now let’s subscribe to messages by adding this code to the consumer:

/* consumer.js */
const amqplib = require('amqplib');
let rabbitConnection;
let exchange = 'logs'
const rabbitMqListenToMessages = async (callback) => {
  if (!rabbitConnection) {
      rabbitConnection = await amqplib.connect('amqp://localhost');
  }
  const channel = await rabbitConnection.createChannel();
  await channel.assertExchange(exchange, 'fanout')
  const q = await channel.assertQueue('');
  await channel.bindQueue(q.queue, exchange, '');
  await channel.consume(q.queue, (message) => callback(message.content.toString()), { noAck: true })
}
rabbitMqListenToMessages((message) => console.log(`Consumer received message: ${message}`))
console.log(`${process.env.SERVICE} Running`)
Enter fullscreen mode Exit fullscreen mode

Let’s check if our messaging works properly. Run the application:

# terminal 1
SERVICE=publisher node ./publisher.js
> Publisher Running
# terminal 2
SERVICE=consumer-1 node ./consumer.js
> consumer-1 running
# We can also add another consumer just for fun
# terminal 2.1
SERVICE=consumer-2 node ./consumer.js
> consumer-2 running
Enter fullscreen mode Exit fullscreen mode

After running the services we can open RabbitMQ management UI at http://localhost:15672/#/exchanges (username and password are both ‘guest’).

There we can see the exchange named ‘logs’ of the type fanout we created:

RabbitMQ setup. Exchanges tab.

By navigating to the queues tab we can see the two queues that were created for the consumer services:

RabbitMQ setup. queues tab.

Now let’s call the get endpoint and trigger the flow of events:

# terminal 3
curl http://localhost:3000
Enter fullscreen mode Exit fullscreen mode

Now we should see the following printed:

# on terminal 1
> Send message: 'Hello World!'
# on terminal 2 and 2.1
> Consumer received message: Hello World!
Enter fullscreen mode Exit fullscreen mode

Congrats! We just wrote a system with services that can communicate with each other. The next step is to understand the workflow of the system by adding OpenTelemetry.

Step 3 – Instrument with Opentelemetry Node

So far, so good. We can now start examining our application behavior. For that, we will generate spans using amqplib instrumentation. Then view them in the console. Because we also use http to call on the publisher endpoint that is implemented using Express.js, Let’s add auto-instrumentations for this span as-well

Install the following packages:

npm install @opentelemetry/sdk-node @opentelemetry/instrumentation-amqplib @opentelemetry/instrumentation-http opentelemetry-instrumentation-express
Enter fullscreen mode Exit fullscreen mode

Create a tracing.js file:

/* tracing.js */
// Require dependencies
const opentelemetry = require("@opentelemetry/sdk-node");
const { AmqplibInstrumentation } = require('@opentelemetry/instrumentation-amqplib');
const { HttpInstrumentation } = require("@opentelemetry/instrumentation-http");
const { ExpressInstrumentation } = require("opentelemetry-instrumentation-express");

const sdk = new opentelemetry.NodeSDK({
  traceExporter: new opentelemetry.tracing.ConsoleSpanExporter(),
instrumentations: [
      new AmqplibInstrumentation(),
      new HttpInstrumentation(),
      new ExpressInstrumentation()
  ],
serviceName: process.env.SERVICE
});
sdk.start()
Enter fullscreen mode Exit fullscreen mode

By running the application again and invoking the endpoint we can see the spans printed in the console:

# terminal 1
SERVICE=publisher node --require './tracing.js' ./publisher.js
# terminal 2
SERVICE=consumer node --require './tracing.js' ./consumer.js
# terminal 3
curl http://localhost:3000
# terminal 1 prints
> {
  traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
  parentId: undefined,
  name: 'HTTP GET',
  id: '72ed82cb81b3ee6d',
  kind: 1,
  timestamp: 1659068638640026,
  duration: 6384,
  attributes: {
    'http.url': 'http://localhost:3000/',
    'http.host': 'localhost:3000',
    'net.host.name': 'localhost',
    'http.method': 'GET',
    'http.target': '/',
    'http.user_agent': 'curl/7.79.1',
    'http.flavor': '1.1',
    'net.transport': 'ip_tcp',
    'net.host.ip': '::ffff:127.0.0.1',
    'net.host.port': 3000,
    'net.peer.ip': '::ffff:127.0.0.1',
    'net.peer.port': 54034,
    'http.status_code': 200,
    'http.status_text': 'OK'
  },
  status: { code: 0 },
  events: [],
  links: []
}
{
  traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
  parentId: '72ed82cb81b3ee6d',
  name: 'GET /',
  id: '802f16d65cc7f7a1',
  kind: 0,
  timestamp: 1659068638642200,
  duration: 4973,
  attributes: {
    'http.route': '',
    'express.route.configured': '',
    'express.route.params': '{}'
  },
  status: { code: 1 },
  events: [],
  links: []
}
{
  traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
  parentId: '72ed82cb81b3ee6d',
  name: 'logs ->  send',
  id: 'c78dc63639307344',
  kind: 3,
  timestamp: 1659068638659382,
  duration: 1086,
  attributes: {
    'messaging.protocol_version': '0.9.1',
    'messaging.url': 'amqp://localhost',
    'messaging.protocol': 'AMQP',
    'net.peer.name': 'localhost',
    'net.peer.port': 5672,
    'messaging.system': 'rabbitmq',
    'messaging.destination': 'logs',
    'messaging.destination_kind': 'topic',
    'messaging.rabbitmq.routing_key': ''
  },
  status: { code: 0 },
  events: [],
  links: []
}
# terminal 2 prints
> {
  traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
  parentId: 'c78dc63639307344',
  name: ' process',
  id: 'd69903f27d4c48b8',
  kind: 4,
  timestamp: 1659068638665048,
  duration: 729,
  attributes: {
    'messaging.protocol_version': '0.9.1',
    'messaging.url': 'amqp://localhost',
    'messaging.protocol': 'AMQP',
    'net.peer.name': 'localhost',
    'net.peer.port': 5672,
    'messaging.system': 'rabbitmq',
    'messaging.destination': 'logs',
    'messaging.destination_kind': 'topic',
    'messaging.rabbitmq.routing_key': '',
    'messaging.operation': 'process'
  },
  status: { code: 0 },
  events: [],
  links: []
}
Enter fullscreen mode Exit fullscreen mode

At this point, we created spans and logged them to our console.

Visualization with OpenTelemety

Even though our spans look stunning in the console, this is not just about logging them but visualizing them. Our ability to visualize traces is where the true troubleshooting power of this technology comes into play.

For visualization, we’ll be using:

  • The open-source Jaeger Tracing
  • Aspecto

OpenTelemetry, RabbitMQ, and Jaeger Tracing

Jaeger Tracing is a suite of open source projects managing the entire distributed tracing “stack”: client, collector, and UI. Jaeger UI is the most commonly used open-source to visualize traces.

This is how it’s done:

Export to Jaeger

  1. Run Jaeger locally with the following docker command
docker run -d --name jaeger \
  -e COLLECTOR_OTLP_ENABLED=true \
  -p 16686:16686 \
  jaegertracing/all-in-one:1.30
Enter fullscreen mode Exit fullscreen mode

We can view Jaeger by visiting http://localhost:16686.

Jaeger tracing platform empty search.

As we can see, there are no traces to view yet. To see traces we need to add an Exporter to export our traces to Jaeger.

  1. Install the following packages:
npm install @opentelemetry/exporter-jaeger @opentelemetry/sdk-trace-base
Enter fullscreen mode Exit fullscreen mode

Edit our tracing.js file and add Jaeger exporter:

/* tracing.js */
const opentelemetry = require("@opentelemetry/sdk-node");
const { AmqplibInstrumentation } = require('@opentelemetry/instrumentation-amqplib');
const { SimpleSpanProcessor } = require("@opentelemetry/sdk-trace-base");
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');
const { HttpInstrumentation } = require("@opentelemetry/instrumentation-http");
const { ExpressInstrumentation } = require("opentelemetry-instrumentation-express");
const exporter = new JaegerExporter();
const sdk = new opentelemetry.NodeSDK({
  spanProcessor: new SimpleSpanProcessor(exporter),
  instrumentations: [
      new AmqplibInstrumentation(),
      new HttpInstrumentation(),
      new ExpressInstrumentation()
  ],
  serviceName: process.env.SERVICE
});
sdk.start()
Enter fullscreen mode Exit fullscreen mode

Now, let’s run the publisher and consumers services:

# terminal 1 - publisher
SERVICE=publisher node -r './tracing.js' publisher.js
> Publisher Running
# terminal 2 - consumer 1
SERVICE=consumer-1 node -r './tracing.js' ./consumer.js
> Consumer-1 Running
# terminal 3 - consumer 2
SERVICE=consumer-2 node -r './tracing.js' consumer.js
> Consumer-2 Running
Enter fullscreen mode Exit fullscreen mode

Invoke the publisher’s endpoint:

# terminal 4
curl http://localhost:3000
Enter fullscreen mode Exit fullscreen mode

Let’s see our traces in Jaeger UI. As you can see we now have 3 more services listed in the search input:

Jaeger Tracing search pane. Showing two consumers and one publisher of RabbitMQ

By selecting the publisher service and clicking ‘Find Traces’ we can see our trace with 3 spans created from the three services:

Jaeger tracing trace visualization for RabbitMQ publisher and two consumers. Showing five spans.

Clicking on the trace once more will show us the details of each span:

Jaeger tracing trace visualization for RabbitMQ publisher and two consumers.

Advanced Visualization for OpenTelemetry Traces and RabbitMQ with Aspecto

Jaeger offers impressive visualization capabilities, so feel free to stop here if you’re satisfied.

However, you can take your tracing visualization to the next level with Aspecto. Try it yourself with the free-forever plan that has no limited features.

Sending traces to Aspecto takes a few minor modifications to the existing code. Give this Live Playground a try to get a better idea of what to expect.

Export to Aspecto

Here’s how it’s done:

  1. Create a free account at www.aspecto.io or log in to your existing account

  2. Install the following packages:

npm install @opentelemetry/sdk-trace-base @opentelemetry/exporter-trace-otlp-proto
Enter fullscreen mode Exit fullscreen mode
  1. Modify the tracing.js file

Make sure to replace the {ASPECTO_AUTH} with your unique Aspecto token ID – https://app.aspecto.io/app/integration/token (Settings > Integrations > Tokens)

/* tracing.js */
const opentelemetry = require("@opentelemetry/sdk-node");
const { AmqplibInstrumentation } = require('@opentelemetry/instrumentation-amqplib');
const { SimpleSpanProcessor } = require("@opentelemetry/sdk-trace-base");
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-proto');
const { HttpInstrumentation } = require("@opentelemetry/instrumentation-http");
const { ExpressInstrumentation } = require("opentelemetry-instrumentation-express");
const exporter = new OTLPTraceExporter({
  url: 'https://otelcol.aspecto.io/v1/traces',
  headers: {
      // Aspecto API-Key is required
      Authorization: process.env.ASPECTO_API_KEY
  }
})
const sdk = new opentelemetry.NodeSDK({
  spanProcessor: new SimpleSpanProcessor(exporter),
  instrumentations: [
      new AmqplibInstrumentation(),
      new HttpInstrumentation(),
      new ExpressInstrumentation()
  ],
  serviceName: process.env.SERVICE
});
sdk.start()
Enter fullscreen mode Exit fullscreen mode

That’s it! Now run the application and invoke the endpoint once again:

# terminal 1 - publisher
SERVICE=publisher node -r './tracing.js' publisher.js
> Publisher Running
# terminal 2 - consumer 1
SERVICE=consumer-1 node -r './tracing.js' ./consumer.js
> Consumer-1 Running
# terminal 3 - consumer 2
SERVICE=consumer-2 node -r './tracing.js' consumer.js
> Consumer-2 Running
Enter fullscreen mode Exit fullscreen mode

Invoke the publisher’s endpoint:

# terminal 4
curl http://localhost:3000
Enter fullscreen mode Exit fullscreen mode

We can view our tracing on the Aspecto platform.

Log in to your account and view the recent traces.

Aspecto OpenTelemetry traces overview for RabbitMQ publisher and two consumers.

Drilling down to a specific trace, we can see a graph of the trace’s flow and a timeline, which makes it super convenient to understand the application’s workflow.

Aspecto OpenTelemetry trace visualization for RabbitMQ publisher and two consumers. Including HTTP client.

Quick note: the “gaps” you see in the timeline are where RabbitMQ was processing the message.

That’s about it for this OpenTelemetry Node with RabbitMQ guide, folks. If you have any questions or issues with any of these steps, feel free to reach out to us via chat or join our OpenTelemetry Slack channel (part of the CNCF Slack).

Final notes

If you’re interested, we also provide a simple way for wrapping all the instrumentations your node application needs with the Aspecto SDK. Simply import and invoke the following package at the beginning of your code (before all other imports).

require('@aspecto/opentelemetry')({
  aspectoAuth: process.env.ASPECTO_API_KEY
});
Enter fullscreen mode Exit fullscreen mode

Top comments (0)