In this guide, you will learn how to use OpenTelemetry to instrument RabbitMQ and create spans for different operations (e.g., consume and produce). We will then see how to visualize your traces in Jaeger and Aspecto. I will use Node.js for all code examples.
Feel free to skip to the practical section of this guide if you are already familiar with RabbitMQ and OpenTelemetry.
What to Expect
- What is OpenTelemetry
- The Practical: OpenTelemetry Node and RabbitMQ
- Visualization with OpenTelemetry
- Final notes
What is OpenTelemetry
OpenTelemetry is a CNCF (Cloud Native Computing Foundation) open source project that allows us to generate, collect, and export telemetry data: logs, metrics, and traces (which together make up the three pillars of observability).
OpenTelemetry provides each programming language with a single API and SDK with which you can instrument your application to generate telemetry. The OpenTelemetry Specification defines the cross-language requirements for the APIs and SDKs.
*But what does it have to do with RabbitMQ?*
Well, there is a great deal of complexity in modern software applications. Their architecture is made up of many microservices that are independent of one another. For communication and information transfer, these microservices use messaging systems (like RabbitMQ).
Distributed architectures are complex, which is why users need an easy way to visualize and troubleshoot them.
We use OpenTelemetry to collect data from different transactions within our services and components. Those components also include third-party tools, such as messaging systems like RabbitMQ.
Using the collected telemetry, we can gain a better understanding of how our software performs and behaves.
Check out this short guide for a deeper dive into OpenTelemetry.
As far as this OpenTelemetry JS guide is concerned, these terms should be familiar to you:
- Span: A span represents an action or operation that occurred in our system, such as an HTTP request or a database operation, and that spans over time (starts at X and has a duration of Y milliseconds). A span is usually the parent and/or the child of another span.
- Trace: Traces represent a tree of spans connected in a child/parent relationship. Traces specify the progression of requests across different services and components in our app (DB, data sources, queues, etc.). For example, sending an API call to user-service resulted in a DB query to users-db.
- Exporter: Once we create a span, we need to send it to a dedicated location (e.g., a collector). This is the component that sends telemetry data to that destination.
- Instrumentation: The instrumentation libraries enable us to gather data and generate spans based on different libraries used in our applications, such as RabbitMQ, Mongo, Express, etc. Our app can be instrumented manually or automatically.
- Auto Instrumentation: Automatically create spans from the application libraries we use with ready-to-use OpenTelemetry libraries.
- Manual instrumentation: Writing specific code manually to define where each span begins and ends.
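To make the span and trace vocabulary concrete, here is a toy model in plain JavaScript. It is not the OpenTelemetry API: `startSpan` and `endSpan` are hypothetical helpers, and the IDs are random hex strings generated only for illustration. It mimics the manual flavor of instrumentation, where we decide where each span begins and ends and link a child span to its parent.

```javascript
const { randomBytes } = require('node:crypto');

// Hypothetical helper: start a span, optionally as the child of another span.
function startSpan(name, parent) {
  return {
    name,
    // A child inherits the trace ID of its parent; a root span starts a new trace.
    traceId: parent ? parent.traceId : randomBytes(16).toString('hex'),
    id: randomBytes(8).toString('hex'),
    parentId: parent ? parent.id : undefined,
    startMs: Date.now(),
    durationMs: undefined,
  };
}

// Hypothetical helper: end a span by recording how long it lasted.
function endSpan(span) {
  span.durationMs = Date.now() - span.startMs;
  return span;
}

// An API call to user-service that results in a query to users-db.
const apiCall = startSpan('GET /users/1');
const dbQuery = startSpan('users-db query', apiCall);
endSpan(dbQuery);
endSpan(apiCall);

// Both spans share one trace ID, and the child points at its parent.
console.log(dbQuery.traceId === apiCall.traceId, dbQuery.parentId === apiCall.id);
```

The real SDK tracks far more per span (attributes, status, events), but the parent/child link through a shared trace ID is the part that turns individual spans into a trace.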
See the official documentation for more information on OpenTelemetry jargon.
Using RabbitMQ in this OpenTelemetry Node guide
RabbitMQ is a message broker that supports many messaging patterns. One of the most common is the publisher/subscriber pattern. In this pattern, one service produces and sends the message (publisher), while other services consume the message (subscribers). Any service can subscribe to receive messages from the publisher.
In our guide, we will use RabbitMQ’s publisher/subscriber pattern. For more information on RabbitMQ visit the official site.
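Before touching a real broker, the fanout flavor of publish/subscribe that this guide uses later can be sketched in memory. This is a toy, not RabbitMQ or amqplib: the `FanoutExchange` class and the queue names are hypothetical and exist only to show that every bound queue receives its own copy of a published message.

```javascript
// Toy in-memory fanout exchange (illustration only, not RabbitMQ or amqplib).
class FanoutExchange {
  constructor(name) {
    this.name = name;
    this.queues = new Map(); // queue name -> array of pending messages
  }

  // Bind a queue to the exchange; each subscriber gets its own queue.
  bindQueue(queueName) {
    if (!this.queues.has(queueName)) this.queues.set(queueName, []);
    return this.queues.get(queueName);
  }

  // Fanout has no routing logic: every bound queue gets a copy of the message.
  publish(message) {
    for (const queue of this.queues.values()) queue.push(message);
    return this.queues.size; // number of queues the message was copied to
  }
}

const logs = new FanoutExchange('logs');
const queueA = logs.bindQueue('consumer-1');
const queueB = logs.bindQueue('consumer-2');
const delivered = logs.publish('Hello World!');

console.log(delivered, queueA, queueB);
```

A real broker adds durability, acknowledgements, and network delivery on top, but the shape is the same: the publisher only knows the exchange, and subscribers only know their own queue.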
The Practical: OpenTelemetry and RabbitMQ
- Create an application
- Add RabbitMQ messaging code
- Instrument with OpenTelemetry
Step 1 – Create a Node.js application
First, let’s create a basic Node application. It will consist of two kinds of services: a publisher and subscribers. The publisher will listen for requests from the user. After receiving a request, it will publish a message to a topic (called an exchange in RabbitMQ terms). The services subscribed to this exchange will receive and print the message.
The publisher code:
/* publisher.js */
const express = require('express')
const app = express()
const port = 3000
app.get('/', (req, res) => {
// TODO: Implement publish message
res.send('Hello World!')
})
app.listen(port, () => {
console.log(`Publisher app listening on port ${port}`)
})
The consumer code:
/* consumer.js */
// TODO: implement consume messages
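Step 2 – Add RabbitMQ messaging code
Now let’s add the code we need to publish and consume messages on RabbitMQ. For that, we need to install the amqplib library and run a local instance of RabbitMQ, publishing the AMQP port (5672) and the management UI port (15672) so that both are reachable on localhost:
npm i amqplib
docker run -d --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management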
Now let’s add the code to our publisher:
/* publisher.js */
const amqplib = require('amqplib');
const express = require('express')
const app = express()
const port = process.env.PORT || 3000
let rabbitConnection;
/* An exchange is where RabbitMQ routes messages. Publishers send messages to an exchange,
which distributes them to the bound queues (and their consumers) according to the exchange type. */
const exchange = 'logs'
const sendRabbitMqMessage = async (message) => {
if (!rabbitConnection) {
rabbitConnection = await amqplib.connect('amqp://localhost');
}
const channel = await rabbitConnection.createChannel();
/* Type "fanout" means sending the message to all consumers that subscribed to that exchange. */
await channel.assertExchange(exchange , 'fanout')
/* Notice that we pass an empty string as the routing key. Fanout exchanges ignore it, and each consumer binds its own queue to the exchange. */
await channel.publish(exchange, '', Buffer.from(message))
}
app.get('/', async (req, res) => {
const message = 'Hello World!'
console.log(`Send message: '${message}'`);
await sendRabbitMqMessage(message);
res.send(message)
})
app.listen(port, () => {
console.log(`${process.env.SERVICE} Running`)
})
Now let’s subscribe to messages by adding this code to the consumer:
/* consumer.js */
const amqplib = require('amqplib');
let rabbitConnection;
let exchange = 'logs'
const rabbitMqListenToMessages = async (callback) => {
if (!rabbitConnection) {
rabbitConnection = await amqplib.connect('amqp://localhost');
}
const channel = await rabbitConnection.createChannel();
await channel.assertExchange(exchange, 'fanout')
const q = await channel.assertQueue('');
await channel.bindQueue(q.queue, exchange, '');
await channel.consume(q.queue, (message) => callback(message.content.toString()), { noAck: true })
}
rabbitMqListenToMessages((message) => console.log(`Consumer received message: ${message}`))
console.log(`${process.env.SERVICE} Running`)
Let’s check if our messaging works properly. Run the application:
# terminal 1
SERVICE=publisher node ./publisher.js
> publisher Running
# terminal 2
SERVICE=consumer-1 node ./consumer.js
> consumer-1 Running
# We can also add another consumer just for fun
# terminal 2.1
SERVICE=consumer-2 node ./consumer.js
> consumer-2 Running
After running the services we can open RabbitMQ management UI at http://localhost:15672/#/exchanges (username and password are both ‘guest’).
There we can see the exchange named ‘logs’ of the type fanout we created:
By navigating to the queues tab we can see the two queues that were created for the consumer services:
Now let’s call the GET endpoint and trigger the flow of events:
# terminal 3
curl http://localhost:3000
Now we should see the following printed:
# on terminal 1
> Send message: 'Hello World!'
# on terminal 2 and 2.1
> Consumer received message: Hello World!
Congrats! We just wrote a system with services that can communicate with each other. The next step is to understand the workflow of the system by adding OpenTelemetry.
Step 3 – Instrument with OpenTelemetry Node
So far, so good. We can now start examining our application’s behavior. For that, we will generate spans using the amqplib instrumentation and view them in the console. Because we also call the publisher endpoint over HTTP, and that endpoint is implemented with Express.js, let’s add auto-instrumentations for those libraries as well.
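A note on how publisher and consumer spans end up in the same trace: the amqplib instrumentation injects the active trace context into the message headers when publishing and extracts it again when consuming. Assuming the SDK's default W3C Trace Context propagator is in use, that context travels as a `traceparent` header. The sketch below only illustrates the header format; `formatTraceparent` and `parseTraceparent` are hypothetical helpers (the instrumentation does this for us), and the IDs are sample values.

```javascript
// Build a W3C Trace Context `traceparent` value: version-traceId-spanId-flags.
function formatTraceparent(traceId, spanId, sampled = true) {
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}

// Parse a `traceparent` value back into its parts; returns null if malformed.
function parseTraceparent(header) {
  const match = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header);
  if (!match) return null;
  const [, version, traceId, spanId, flags] = match;
  // The lowest bit of the flags byte marks the trace as sampled.
  return { version, traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

// Sample IDs, used only for illustration.
const header = formatTraceparent('0af7651916cd43dd8448eb211c80319c', 'b7ad6b7169203331');
const parsed = parseTraceparent(header);

console.log(header);
console.log(parsed);
```

On the consumer side, the extracted span ID becomes the parent of the consumer's span, which is what links the two services into one trace.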
Install the following packages:
npm install @opentelemetry/sdk-node @opentelemetry/instrumentation-amqplib @opentelemetry/instrumentation-http opentelemetry-instrumentation-express
Create a tracing.js file:
/* tracing.js */
// Require dependencies
const opentelemetry = require("@opentelemetry/sdk-node");
const { AmqplibInstrumentation } = require('@opentelemetry/instrumentation-amqplib');
const { HttpInstrumentation } = require("@opentelemetry/instrumentation-http");
const { ExpressInstrumentation } = require("opentelemetry-instrumentation-express");
const sdk = new opentelemetry.NodeSDK({
traceExporter: new opentelemetry.tracing.ConsoleSpanExporter(),
instrumentations: [
new AmqplibInstrumentation(),
new HttpInstrumentation(),
new ExpressInstrumentation()
],
serviceName: process.env.SERVICE
});
sdk.start()
By running the application again and invoking the endpoint we can see the spans printed in the console:
# terminal 1
SERVICE=publisher node --require './tracing.js' ./publisher.js
# terminal 2
SERVICE=consumer node --require './tracing.js' ./consumer.js
# terminal 3
curl http://localhost:3000
# terminal 1 prints
> {
traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
parentId: undefined,
name: 'HTTP GET',
id: '72ed82cb81b3ee6d',
kind: 1,
timestamp: 1659068638640026,
duration: 6384,
attributes: {
'http.url': 'http://localhost:3000/',
'http.host': 'localhost:3000',
'net.host.name': 'localhost',
'http.method': 'GET',
'http.target': '/',
'http.user_agent': 'curl/7.79.1',
'http.flavor': '1.1',
'net.transport': 'ip_tcp',
'net.host.ip': '::ffff:127.0.0.1',
'net.host.port': 3000,
'net.peer.ip': '::ffff:127.0.0.1',
'net.peer.port': 54034,
'http.status_code': 200,
'http.status_text': 'OK'
},
status: { code: 0 },
events: [],
links: []
}
{
traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
parentId: '72ed82cb81b3ee6d',
name: 'GET /',
id: '802f16d65cc7f7a1',
kind: 0,
timestamp: 1659068638642200,
duration: 4973,
attributes: {
'http.route': '',
'express.route.configured': '',
'express.route.params': '{}'
},
status: { code: 1 },
events: [],
links: []
}
{
traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
parentId: '72ed82cb81b3ee6d',
name: 'logs -> send',
id: 'c78dc63639307344',
kind: 3,
timestamp: 1659068638659382,
duration: 1086,
attributes: {
'messaging.protocol_version': '0.9.1',
'messaging.url': 'amqp://localhost',
'messaging.protocol': 'AMQP',
'net.peer.name': 'localhost',
'net.peer.port': 5672,
'messaging.system': 'rabbitmq',
'messaging.destination': 'logs',
'messaging.destination_kind': 'topic',
'messaging.rabbitmq.routing_key': ''
},
status: { code: 0 },
events: [],
links: []
}
# terminal 2 prints
> {
traceId: 'fbbf2a11623d4230e712f6fd0c1d5912',
parentId: 'c78dc63639307344',
name: ' process',
id: 'd69903f27d4c48b8',
kind: 4,
timestamp: 1659068638665048,
duration: 729,
attributes: {
'messaging.protocol_version': '0.9.1',
'messaging.url': 'amqp://localhost',
'messaging.protocol': 'AMQP',
'net.peer.name': 'localhost',
'net.peer.port': 5672,
'messaging.system': 'rabbitmq',
'messaging.destination': 'logs',
'messaging.destination_kind': 'topic',
'messaging.rabbitmq.routing_key': '',
'messaging.operation': 'process'
},
status: { code: 0 },
events: [],
links: []
}
At this point, we created spans and logged them to our console.
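The raw console output is easier to read once the spans are arranged by their parent/child links. The sketch below copies the `name`, `id`, `parentId`, `kind`, and `duration` fields from the four spans printed above and renders them as an indented tree. `renderTree` is a hypothetical helper written for this guide; the kind names follow the numeric order of the OpenTelemetry `SpanKind` enum, and the console exporter reports durations in microseconds.

```javascript
// Numeric span kinds, in the order used by the OpenTelemetry SpanKind enum.
const SPAN_KINDS = ['INTERNAL', 'SERVER', 'CLIENT', 'PRODUCER', 'CONSUMER'];

// Fields copied from the console output above (duration is in microseconds).
const spans = [
  { name: 'HTTP GET', id: '72ed82cb81b3ee6d', parentId: undefined, kind: 1, duration: 6384 },
  { name: 'GET /', id: '802f16d65cc7f7a1', parentId: '72ed82cb81b3ee6d', kind: 0, duration: 4973 },
  { name: 'logs -> send', id: 'c78dc63639307344', parentId: '72ed82cb81b3ee6d', kind: 3, duration: 1086 },
  { name: ' process', id: 'd69903f27d4c48b8', parentId: 'c78dc63639307344', kind: 4, duration: 729 },
];

// Hypothetical helper: list the children of `parentId`, recursing one level deeper each time.
function renderTree(allSpans, parentId = undefined, depth = 0) {
  return allSpans
    .filter((span) => span.parentId === parentId)
    .flatMap((span) => [
      `${'  '.repeat(depth)}${span.name.trim()} [${SPAN_KINDS[span.kind]}] ${(span.duration / 1000).toFixed(3)} ms`,
      ...renderTree(allSpans, span.id, depth + 1),
    ]);
}

const lines = renderTree(spans);
console.log(lines.join('\n'));
```

Read this way, the consumer's process span hangs under the publisher's send span even though the two were printed by different processes in different terminals.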
Visualization with OpenTelemetry
Even though our spans look stunning in the console, the goal is not just to log them but to visualize them. Our ability to visualize traces is where the true troubleshooting power of this technology comes into play.
For visualization, we’ll be using:
- The open-source Jaeger Tracing
- Aspecto
OpenTelemetry, RabbitMQ, and Jaeger Tracing
Jaeger Tracing is a suite of open source projects managing the entire distributed tracing “stack”: client, collector, and UI. Jaeger UI is the most commonly used open-source tool for visualizing traces.
This is how it’s done:
Export to Jaeger
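- Run Jaeger locally with the following docker command (port 6832/udp is published because the Jaeger exporter sends spans to the Jaeger agent over UDP by default)
docker run -d --name jaeger \
-e COLLECTOR_OTLP_ENABLED=true \
-p 6832:6832/udp \
-p 16686:16686 \
jaegertracing/all-in-one:1.30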
We can view Jaeger by visiting http://localhost:16686.
As we can see, there are no traces to view yet. To see traces we need to add an Exporter to export our traces to Jaeger.
- Install the following packages:
npm install @opentelemetry/exporter-jaeger @opentelemetry/sdk-trace-base
- Edit our tracing.js file and add the Jaeger exporter:
/* tracing.js */
const opentelemetry = require("@opentelemetry/sdk-node");
const { AmqplibInstrumentation } = require('@opentelemetry/instrumentation-amqplib');
const { SimpleSpanProcessor } = require("@opentelemetry/sdk-trace-base");
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');
const { HttpInstrumentation } = require("@opentelemetry/instrumentation-http");
const { ExpressInstrumentation } = require("opentelemetry-instrumentation-express");
const exporter = new JaegerExporter();
const sdk = new opentelemetry.NodeSDK({
spanProcessor: new SimpleSpanProcessor(exporter),
instrumentations: [
new AmqplibInstrumentation(),
new HttpInstrumentation(),
new ExpressInstrumentation()
],
serviceName: process.env.SERVICE
});
sdk.start()
Now, let’s run the publisher and consumer services:
# terminal 1 - publisher
SERVICE=publisher node -r './tracing.js' publisher.js
> publisher Running
# terminal 2 - consumer 1
SERVICE=consumer-1 node -r './tracing.js' ./consumer.js
> consumer-1 Running
# terminal 3 - consumer 2
SERVICE=consumer-2 node -r './tracing.js' consumer.js
> consumer-2 Running
Invoke the publisher’s endpoint:
# terminal 4
curl http://localhost:3000
Let’s see our traces in Jaeger UI. As you can see we now have 3 more services listed in the search input:
By selecting the publisher service and clicking ‘Find Traces’ we can see our trace with 3 spans created from the three services:
Clicking on the trace once more will show us the details of each span:
Advanced Visualization for OpenTelemetry Traces and RabbitMQ with Aspecto
Jaeger offers impressive visualization capabilities, so feel free to stop here if you’re satisfied.
However, you can take your tracing visualization to the next level with Aspecto. Try it yourself with the free-forever plan, which has no feature limitations.
Sending traces to Aspecto takes a few minor modifications to the existing code. Give this Live Playground a try to get a better idea of what to expect.
Export to Aspecto
Here’s how it’s done:
- Create a free account at www.aspecto.io or log in to your existing account
- Install the following packages:
npm install @opentelemetry/sdk-trace-base @opentelemetry/exporter-trace-otlp-proto
- Modify the tracing.js file
Set the ASPECTO_API_KEY environment variable to your unique Aspecto token ID, found at https://app.aspecto.io/app/integration/token (Settings > Integrations > Tokens). The exporter reads it from process.env.
/* tracing.js */
const opentelemetry = require("@opentelemetry/sdk-node");
const { AmqplibInstrumentation } = require('@opentelemetry/instrumentation-amqplib');
const { SimpleSpanProcessor } = require("@opentelemetry/sdk-trace-base");
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-proto');
const { HttpInstrumentation } = require("@opentelemetry/instrumentation-http");
const { ExpressInstrumentation } = require("opentelemetry-instrumentation-express");
const exporter = new OTLPTraceExporter({
url: 'https://otelcol.aspecto.io/v1/traces',
headers: {
// Aspecto API-Key is required
Authorization: process.env.ASPECTO_API_KEY
}
})
const sdk = new opentelemetry.NodeSDK({
spanProcessor: new SimpleSpanProcessor(exporter),
instrumentations: [
new AmqplibInstrumentation(),
new HttpInstrumentation(),
new ExpressInstrumentation()
],
serviceName: process.env.SERVICE
});
sdk.start()
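Since tracing.js reads both SERVICE and ASPECTO_API_KEY from the environment, a missing variable only shows up later as traces that never arrive. One way to surface that early is a small guard, sketched below. `readTracingEnv` is a hypothetical helper and not part of any OpenTelemetry or Aspecto package; the example passes a plain object in place of the real process.env so that it runs anywhere.

```javascript
// Hypothetical helper: fail fast when a required environment variable is missing.
function readTracingEnv(env = process.env) {
  const missing = ['SERVICE', 'ASPECTO_API_KEY'].filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return { serviceName: env.SERVICE, aspectoApiKey: env.ASPECTO_API_KEY };
}

// Example with an explicit object instead of the real process.env.
const config = readTracingEnv({ SERVICE: 'publisher', ASPECTO_API_KEY: 'my-token' });
console.log(config.serviceName);

// A missing token is reported by name instead of failing silently later.
let guardError = null;
try {
  readTracingEnv({ SERVICE: 'publisher' });
} catch (err) {
  guardError = err.message;
}
console.log(guardError);
```

Calling such a guard at the top of tracing.js would be one option; whether to throw or merely warn is a design choice that depends on how critical tracing is to the service.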
That’s it! Now run the application, with ASPECTO_API_KEY set in each terminal, and invoke the endpoint once again:
# terminal 1 - publisher
SERVICE=publisher node -r './tracing.js' publisher.js
> publisher Running
# terminal 2 - consumer 1
SERVICE=consumer-1 node -r './tracing.js' ./consumer.js
> consumer-1 Running
# terminal 3 - consumer 2
SERVICE=consumer-2 node -r './tracing.js' consumer.js
> consumer-2 Running
Invoke the publisher’s endpoint:
# terminal 4
curl http://localhost:3000
We can view our tracing on the Aspecto platform.
Log in to your account and view the recent traces.
Drilling down to a specific trace, we can see a graph of the trace’s flow and a timeline, which makes it super convenient to understand the application’s workflow.
Quick note: the “gaps” you see in the timeline are where RabbitMQ was processing the message.
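The size of such a gap can be estimated from span timestamps. As a rough worked example, the sketch below reuses the `timestamp` and `duration` values of the 'logs -> send' and ' process' spans from the console output earlier in this guide (both in microseconds). It assumes the publisher and consumer ran on the same machine, so their clocks are comparable, and the numbers come from that earlier console run rather than from Aspecto.

```javascript
// Values copied from the console output earlier in this guide (microseconds).
const send = { timestamp: 1659068638659382, duration: 1086 };       // 'logs -> send'
const processSpan = { timestamp: 1659068638665048, duration: 729 }; // ' process'

// The gap is the time between the end of the send span and the start of the process span.
const sendEnd = send.timestamp + send.duration;
const gapMicros = processSpan.timestamp - sendEnd;

console.log(`Gap between send and process: ${gapMicros / 1000} ms`);
```

For that run, the gap works out to 4580 microseconds, or about 4.6 ms, between the publisher finishing the send and the consumer starting to process the message.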
That’s about it for this OpenTelemetry Node with RabbitMQ guide, folks. If you have any questions or issues with any of these steps, feel free to reach out to us via chat or join our OpenTelemetry Slack channel (part of the CNCF Slack).
Final notes
If you’re interested, we also provide a simple way to wrap all the instrumentations your Node application needs with the Aspecto SDK. Simply import and invoke the following package at the beginning of your code (before all other imports).
require('@aspecto/opentelemetry')({
aspectoAuth: process.env.ASPECTO_API_KEY
});