DEV Community

Y.Matsuda


[OpenTelemetry] Observability of Async Processes with Custom Propagator

This is the day 23 post of the Makuake Advent Calendar 2022.

As you may know, OpenTelemetry is an observability framework for generating and exporting telemetry data. As more companies adopt microservices and SLIs/SLOs, we need it to answer new, complex questions that we have never had to ask before.

In that context, observability of systems that communicate through asynchronous messaging is just as important as observability of systems that communicate through synchronous messaging such as HTTP or gRPC.

Take a notification system as an example:

  • How long does it take for the end-user to receive the notification after the notification event is fired?
  • What’s the error rate across the notification lifecycle?

Answering those questions would normally require costly, custom-built tooling.

Instead, in this article, I’ll demonstrate an easy way to do that with OpenTelemetry.

Example Case

All source code is here:

https://github.com/ymtdzzz/batch-tracing-sample

example case

Processing flow:

  • A batch process enqueues the notification content as a message into a queue (RabbitMQ)
  • A worker (consumer) asynchronously receives the message and sends a request to the notification server (/email or /push)
  • The notification server responds with 200 or 500

I assume that each component has already been instrumented, and that the HTTP communication is instrumented by the net/http auto-instrumentation library.

Problem

problem

In the current state, the batch process and the subsequent processing (worker) cannot be traced as a single flow: each side ends up in its own trace.

trace a

trace b

Moreover, there doesn’t seem to be an instrumentation library for RabbitMQ in Go.

https://opentelemetry.io/registry/?s=rabbitmq&component=&language=

What Should We Do?

To propagate context, we can use an OpenTelemetry Propagator, which works for both synchronous and asynchronous messaging!

Implement Custom Propagator for RabbitMQ

What’s a Propagator?

https://opentelemetry.io/docs/reference/specification/context/api-propagators/

The Propagator API defines the interfaces for propagating context across process boundaries: how the sender *Inject*s context into a message and how the receiver *Extract*s it from that message. A Propagator works through a Carrier, which is responsible for actually reading and writing values on a particular type of message.

Fortunately, RabbitMQ lets us attach key-value Headers to messages (docs), so we can use a TextMapPropagator.
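To make "inject" and "extract" a bit more concrete: with the W3C Trace Context format, what travels in the message headers is a single `traceparent` value of the form `00-<trace-id>-<span-id>-<flags>`. The following is only a rough sketch using the standard library, not the OpenTelemetry implementation. The names `traceparent`, `formatTraceparent` and `parseTraceparent` are made up for illustration, and a plain map stands in for the message headers.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// traceparent is a simplified view of the W3C Trace Context header value.
type traceparent struct {
	TraceID string // 32 lowercase hex characters
	SpanID  string // 16 lowercase hex characters
	Sampled bool   // lowest bit of the trace-flags byte
}

// isLowerHex reports whether s is exactly n lowercase hex characters.
func isLowerHex(s string, n int) bool {
	if len(s) != n {
		return false
	}
	for _, r := range s {
		if !strings.ContainsRune("0123456789abcdef", r) {
			return false
		}
	}
	return true
}

// formatTraceparent renders "00-<trace-id>-<span-id>-<flags>".
func formatTraceparent(tp traceparent) string {
	flags := "00"
	if tp.Sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", tp.TraceID, tp.SpanID, flags)
}

// parseTraceparent accepts only version 00. Unlike a real propagator,
// this sketch does not reject all-zero IDs.
func parseTraceparent(h string) (traceparent, bool) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" ||
		!isLowerHex(parts[1], 32) || !isLowerHex(parts[2], 16) || !isLowerHex(parts[3], 2) {
		return traceparent{}, false
	}
	flags, err := strconv.ParseUint(parts[3], 16, 8)
	if err != nil {
		return traceparent{}, false
	}
	return traceparent{TraceID: parts[1], SpanID: parts[2], Sampled: flags&1 == 1}, true
}

func main() {
	// The sender "injects": it writes the header into the message headers.
	headers := map[string]interface{}{}
	headers["traceparent"] = formatTraceparent(traceparent{
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
		Sampled: true,
	})

	// The receiver "extracts": it reads the header back and parses it.
	raw, _ := headers["traceparent"].(string)
	tp, ok := parseTraceparent(raw)
	fmt.Println(raw)
	fmt.Println(tp.TraceID, tp.SpanID, tp.Sampled, ok)
}
```

In the real setup none of this parsing is written by hand. The propagator does it, and we only need to tell it where the headers live.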

propagation flow

Propagator Implementation

In fact, since it is the Carrier, not the Propagator, that manipulates the TextMap, all we have to do is implement a struct that satisfies the TextMapCarrier interface!

TextMapCarrier interface (doc):



type TextMapCarrier interface {

    // Get returns the value associated with the passed key.
    Get(key string) string

    // Set stores the key-value pair.
    Set(key string, value string)

    // Keys lists the keys stored in this carrier.
    Keys() []string
}



Carrier implementation for this interface (source code):



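type AMQPCarrier struct {
    headers amqp.Table
}

// Get returns the header value for key, or "" if it is missing or not a string.
func (c *AMQPCarrier) Get(key string) string {
    if v, ok := c.headers[key].(string); ok {
        return v
    }
    return ""
}

// Set stores the key-value pair in the message headers.
func (c *AMQPCarrier) Set(key string, value string) {
    c.headers[key] = value
}

// Keys lists the header keys currently stored in the carrier.
func (c *AMQPCarrier) Keys() []string {
    // Length 0, capacity len(headers), so append does not leave empty leading entries.
    keys := make([]string, 0, len(c.headers))
    for k := range c.headers {
        keys = append(keys, k)
    }
    return keys
}

amqp.Table is just map[string]interface{}. Get() only handles string values, but that is enough for this example ;)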
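A carrier like this can be sanity-checked without a broker by exercising it against a plain map. The sketch below is an assumption-laden stand-in: `Table` is a local type mimicking amqp.Table, and the body of `NewAMQPCarrier` is my guess at a minimal constructor, shown only for illustration. Two details are worth checking. Get should return an empty string for an absent key, and Keys should be allocated with length 0, because `make([]string, n)` followed by `append` would leave n empty strings at the front of the slice.

```go
package main

import (
	"fmt"
	"sort"
)

// Table is a local stand-in for amqp.Table (map[string]interface{}).
type Table map[string]interface{}

// AMQPCarrier adapts message headers to the Get/Set/Keys shape
// of the TextMapCarrier interface.
type AMQPCarrier struct {
	headers Table
}

// NewAMQPCarrier is an assumed constructor: it wraps the given headers
// without copying, so values written with Set are visible to the caller.
func NewAMQPCarrier(headers Table) *AMQPCarrier {
	return &AMQPCarrier{headers: headers}
}

// Get returns "" when the key is missing or the value is not a string.
func (c *AMQPCarrier) Get(key string) string {
	if v, ok := c.headers[key].(string); ok {
		return v
	}
	return ""
}

// Set stores the key-value pair in the headers.
func (c *AMQPCarrier) Set(key, value string) {
	c.headers[key] = value
}

// Keys returns the header keys, sorted here only to make the output stable.
func (c *AMQPCarrier) Keys() []string {
	keys := make([]string, 0, len(c.headers))
	for k := range c.headers {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	headers := Table{}
	c := NewAMQPCarrier(headers)

	fmt.Printf("missing key: %q\n", c.Get("traceparent"))

	c.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Printf("after Set: %q\n", c.Get("traceparent"))
	fmt.Println("keys:", c.Keys(), "headers in caller's map:", len(headers))
}
```

Because the carrier wraps the map rather than copying it, the same headers value can be handed to the publish call after injection.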

Sender Side Implementation

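On the sender side, we inject the context into the headers and then publish the message (source code). Note that otel.GetTextMapPropagator() returns a no-op propagator unless one, such as propagation.TraceContext{}, has been registered with otel.SetTextMapPropagator at startup.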


// Create an amqp.Table to hold the message headers
headers := amqp.NewConnectionProperties()
// Assign it to the custom Carrier
carrier := internal.NewAMQPCarrier(headers)
// Inject the context into the headers through the Carrier
otel.GetTextMapPropagator().Inject(ctx, carrier)
err = ch.PublishWithContext(
    ctx,
    "",     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
        ContentType: "application/octet-stream",
        Body:        msg,
        Headers:     headers, // Assign the context-injected headers
    },
)
if err != nil {
    panic(err)
}
log.Println("Message has been sent")




Receiver Side Implementation

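The receiver side mirrors this: wrap the received headers in the Carrier, extract the context, and start a child span from it.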


// Assign the received headers to the custom Carrier (d is the received delivery)
carrier := internal.NewAMQPCarrier(d.Headers)
// Extract the context
ctx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)
// Generate a child Span with the received context as the parent Span
ctx, span := otel.Tracer("notification").Start(ctx, "consume")

msg, err := internal.DecodeNotificationMessage(d.Body)
if err != nil {
    panic(err)
}
log.Printf("received msg: %v\n", msg)

// Pass ctx along so the HTTP request joins the same trace
internal.CallServer(ctx, &client, msg)

span.End()




We’re All Set 🎉

Now, let’s start the apps and check the Jaeger UI endpoint (http://localhost:16686/).

final result a

By connecting the traces, we can now investigate any errors throughout the notification lifecycle easily.

final result b

Moreover, since the duration of the entire trace can now be measured, we can find performance bottlenecks and spot slow notifications as the end-user would experience them.
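As a rough illustration of what "the duration of the entire trace" means once the batch, worker and server spans share one trace: it is the time from the earliest span start to the latest span end. The sketch below uses a hypothetical `Span` struct and made-up sample timings. It is not tied to Jaeger or to any exporter format.

```go
package main

import (
	"fmt"
	"time"
)

// Span is a hypothetical, minimal record of one finished span.
type Span struct {
	Name  string
	Start time.Time
	End   time.Time
}

// traceDuration returns the time from the earliest start to the latest end
// across all spans of one trace, or 0 for an empty slice.
func traceDuration(spans []Span) time.Duration {
	if len(spans) == 0 {
		return 0
	}
	first, last := spans[0].Start, spans[0].End
	for _, s := range spans[1:] {
		if s.Start.Before(first) {
			first = s.Start
		}
		if s.End.After(last) {
			last = s.End
		}
	}
	return last.Sub(first)
}

// sampleSpans returns made-up timings: the message waits in the queue
// for about 1.5 seconds before the worker picks it up.
func sampleSpans() []Span {
	t0 := time.Date(2022, 12, 23, 0, 0, 0, 0, time.UTC)
	ms := time.Millisecond
	return []Span{
		{Name: "batch: enqueue", Start: t0, End: t0.Add(20 * ms)},
		{Name: "worker: consume", Start: t0.Add(1500 * ms), End: t0.Add(1800 * ms)},
		{Name: "server: POST /email", Start: t0.Add(1550 * ms), End: t0.Add(1750 * ms)},
	}
}

func main() {
	fmt.Println("end-to-end:", traceDuration(sampleSpans()))
}
```

With separate traces, only the 20 ms enqueue and the 300 ms consume would be visible on their own, and the time spent waiting in the queue would be lost.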

I hope you learned something new from this post. Please let me know if you have any feedback, in the comments or on Twitter!
