DEV Community

Cover image for Kafka Tracing with Spring Boot and OpenTelemetry : A Detailed Guide with Examples
JavaFullStackDev.in
JavaFullStackDev.in

Posted on

Kafka Tracing with Spring Boot and OpenTelemetry : A Detailed Guide with Examples

Tracing is crucial for understanding the flow of requests in distributed systems. Kafka, as a distributed messaging system, plays a pivotal role in many architectures. Integrating Kafka with tracing tools like OpenTelemetry helps you monitor and debug your systems effectively. This guide will walk you through setting up Kafka tracing with Spring Boot and OpenTelemetry, providing detailed examples.

Table of Contents

  1. Introduction
  2. Prerequisites
  3. Setting Up a Spring Boot Application
  4. Integrating Kafka with Spring Boot
  5. Setting Up OpenTelemetry
  6. Configuring Kafka Tracing
  7. Deploying and Testing
  8. Visualizing Traces
  9. Conclusion

Introduction

Kafka is widely used for building real-time data pipelines and streaming applications. However, tracing messages through Kafka can be challenging due to its asynchronous nature. OpenTelemetry, an open-source observability framework, provides robust support for distributed tracing. By integrating OpenTelemetry with Spring Boot and Kafka, you can gain insights into your application's behavior and diagnose issues more effectively.

Prerequisites

Before you start, ensure you have the following:

  • Java 11 or later installed
  • Apache Kafka installed and running
  • Docker installed and configured (for running OpenTelemetry Collector and Jaeger)
  • Basic understanding of Spring Boot and Kafka
  • An IDE like IntelliJ IDEA or Eclipse

Setting Up a Spring Boot Application

First, create a simple Spring Boot application. If you already have one, you can skip this section.

1. Initialize a Spring Boot Project

You can use Spring Initializr to generate a new project or set it up manually. For simplicity, we'll use Spring Initializr.

curl https://start.spring.io/starter.zip \
  -d dependencies=web,kafka,actuator \
  -d name=kafka-tracing-demo \
  -d artifactId=kafka-tracing-demo \
  -d packageName=com.example.kafkatracingdemo \
  -d javaVersion=11 \
  -o kafka-tracing-demo.zip

unzip kafka-tracing-demo.zip
cd kafka-tracing-demo
Enter fullscreen mode Exit fullscreen mode

2. Create a Simple Kafka Producer and Consumer

Create a simple Kafka producer and consumer to test tracing.

Producer

Create KafkaProducerConfig.java:

package com.example.kafkatracingdemo;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Enter fullscreen mode Exit fullscreen mode

Create MessageProducer.java:

package com.example.kafkatracingdemo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "test_topic";

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}
Enter fullscreen mode Exit fullscreen mode

Consumer

Create KafkaConsumerConfig.java:

package com.example.kafkatracingdemo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Enter fullscreen mode Exit fullscreen mode

Create MessageListener.java:

package com.example.kafkatracingdemo;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MessageListener {

    @KafkaListener(topics = "test_topic", groupId = "group_id")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
Enter fullscreen mode Exit fullscreen mode

Setting Up OpenTelemetry

OpenTelemetry provides a set of APIs, libraries, agents, and instrumentation to capture distributed traces and metrics. We'll set up OpenTelemetry to capture and export traces.

1. Add Dependencies

Add the following dependencies in pom.xml:

<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-api</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-sdk</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry.instrumentation</groupId>
    <artifactId>opentelemetry-spring-boot-starter</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry.instrumentation</groupId>
    <artifactId>opentelemetry-spring-kafka-3.0</artifactId>
    <version>1.16.0</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

2. Configure OpenTelemetry

Create OpenTelemetryConfig.java:

package com.example.kafkatracingdemo;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OpenTelemetryConfig {

    @Bean
    public Tracer tracer() {
        SpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
                .setEndpoint("http://localhost:4317")
                .build();

        SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
                .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
                .setResource(Resource.getDefault())
                .build();

        OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder()
                .setTracerProvider(sdkTracerProvider)
                .build();

        GlobalOpenTelemetry.set(openTelemetrySdk);

        return openTelemetrySdk.getTracer("kafka-tracing-demo");
    }
}
Enter fullscreen mode Exit fullscreen mode

Configuring Kafka Tracing

Integrate OpenTelemetry with Kafka producer and consumer to trace message flows.

1. Update Kafka Producer

Modify MessageProducer.java:


java
package com.example.kafkatracingdemo;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate
Enter fullscreen mode Exit fullscreen mode

Top comments (1)

Collapse
 
andyif profile image
andyif

Looks like this article not finished