Spring Cloud Sleuth is Spring's solution for distributed tracing, and it comes with a bunch of useful integrations out of the box.
rogervinas / spring-cloud-sleuth-in-action
🍀 Spring Cloud Sleuth in Action
This sample ☝️ uses some of these integrations executing the following flow:
To keep it simple, everything is executed within the same Spring Boot Application, but in the end it works the same as if it were split between different services.
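To picture what "the same trace across services" means, here is a minimal sketch in plain Kotlin. It is not Sleuth code: `DemoTraceContext`, `newSpanId` and `child` are hypothetical names used only for illustration. Every hop keeps the traceId and gets a fresh spanId, which is the pattern visible in the application logs of the demo.

```kotlin
import kotlin.random.Random

// Hypothetical model of a trace context, for illustration only (not a Sleuth class).
data class DemoTraceContext(
    val traceId: String,
    val spanId: String,
    val parentSpanId: String? = null
)

// Generates a random 64-bit span id rendered as 16 lowercase hex characters.
fun newSpanId(random: Random = Random.Default): String =
    java.lang.Long.toHexString(random.nextLong()).padStart(16, '0')

// A child span keeps the trace id, gets a new span id and remembers its parent.
fun DemoTraceContext.child(): DemoTraceContext =
    DemoTraceContext(traceId = traceId, spanId = newSpanId(), parentSpanId = spanId)

fun main() {
    var context = DemoTraceContext(traceId = "aaaaaa1234567890", spanId = "bbbbbb1234567890")
    // Simulate a few hops: each one creates a child span of the previous one.
    for (hop in listOf("RestRequest1", "KafkaConsumer", "RestRequest2")) {
        context = context.child()
        println(">>> $hop - traceId ${context.traceId} spanId ${context.spanId}")
    }
}
```

Whether the hops run in one process or in several, the only thing that has to travel between them is this small context.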
Demo time!
Let's follow these steps to execute the demo:
- Run docker-compose:
docker-compose up -d
- Start the Spring Boot Application:
./gradlew bootRun
- Consume from the Kafka topic my.topic with kcat:
kcat -b localhost:9094 -C -t my.topic -f '%h %s\n'
- Execute a request to the first endpoint with curl or any other tool you like:
curl 'http://localhost:8080/request1?payload=hello' \
-H 'X-B3-TraceId: aaaaaa1234567890' \
-H 'X-B3-SpanId: bbbbbb1234567890'
Note: the default format for context propagation is B3, so we use the headers X-B3-TraceId and X-B3-SpanId.
- Check the application output; all lines should share the same traceId:
Started MyApplicationKt in 44.739 seconds (JVM running for 49.324) - traceId ? spanId ? - main
>>> RestRequest1 hello - traceId aaaaaa1234567890 spanId cf596e6281432fb9 - http-nio-8080-exec-7
>>> KafkaProducer hello - traceId aaaaaa1234567890 spanId cf596e6281432fb9 - http-nio-8080-exec-7
>>> KafkaConsumer hello - traceId aaaaaa1234567890 spanId 91e1b6b37334620c - KafkaConsumerDestination...
>>> RestRequest2 hello - traceId aaaaaa1234567890 spanId a1ac0233664f5249 - http-nio-8080-exec-8
>>> RestRequest3 hello - traceId aaaaaa1234567890 spanId bf384c3b4d97efe9 - http-nio-8080-exec-9
>>> RestRequest4 hello - traceId aaaaaa1234567890 spanId c84470ce03e993f1 - http-nio-8080-exec-1
>>> AsyncService hello - traceId aaaaaa1234567890 spanId acccead477b4e1c8 - task-3
- Check kcat output:
b3=aaaaaa1234567890-331986280d41ccdc-1,
nativeHeaders={"b3":["aaaaaa1234567890-331986280d41ccdc-1"]},
contentType=application/json,
spring_json_header_types={
"b3":"java.lang.String",
"nativeHeaders":"org.springframework.util.LinkedMultiValueMap",
"contentType":"java.lang.String"
}
hello
- Check zipkin at http://localhost:9411/zipkin/
- Stop the Spring Boot Application with CTRL-C
- Stop docker-compose:
docker-compose down
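The curl request uses the B3 multi-header form (X-B3-TraceId, X-B3-SpanId), while the Kafka message in the kcat output carries a single b3 header with the shape traceId-spanId-sampled. The following sketch, in plain Kotlin, shows how the two forms relate. `B3Context`, `parseB3Single` and `toB3MultiHeaders` are hypothetical helpers, not part of Sleuth or Brave, and the parser is simplified: it does not handle a b3 value that contains only a sampling decision.

```kotlin
// Illustrative model of a B3 trace context; not a Sleuth or Brave class.
data class B3Context(
    val traceId: String,
    val spanId: String,
    val sampled: Boolean?,            // null when the header carries no sampling decision
    val parentSpanId: String? = null
)

// Parses the single-header form: {traceId}-{spanId}[-{sampling}[-{parentSpanId}]]
fun parseB3Single(value: String): B3Context {
    val parts = value.split("-")
    require(parts.size in 2..4) { "Unexpected b3 value: $value" }
    val sampled = when (parts.getOrNull(2)) {
        null -> null
        "1", "d" -> true              // simplification: the debug flag "d" is treated as sampled
        "0" -> false
        else -> throw IllegalArgumentException("Unexpected sampling flag in: $value")
    }
    return B3Context(parts[0], parts[1], sampled, parts.getOrNull(3))
}

// Renders the same context as the multi-header form used in the curl request.
fun toB3MultiHeaders(context: B3Context): Map<String, String> {
    val headers = linkedMapOf(
        "X-B3-TraceId" to context.traceId,
        "X-B3-SpanId" to context.spanId
    )
    context.parentSpanId?.let { headers["X-B3-ParentSpanId"] = it }
    context.sampled?.let { headers["X-B3-Sampled"] = if (it) "1" else "0" }
    return headers
}

fun main() {
    // Value taken from the kcat output of the demo
    val context = parseB3Single("aaaaaa1234567890-331986280d41ccdc-1")
    println(context)
    println(toB3MultiHeaders(context))
}
```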
Show me the code!
This demo was created using this Spring Initializr configuration.
Just adding the Sleuth dependency enables tracing by default in all of the supported integrations, so, as you will see, no extra coding is needed (with only a few exceptions).
Logging
We need to add the traceId and spanId values to the application log. In production we would use logstash-logback-encoder to generate logs in JSON format and send them to an ELK stack, but for the demo we use this plain-text Logback layout:
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%msg - traceId %X{traceId:-?} spanId %X{spanId:-?} - %thread%n</pattern>
    </encoder>
  </appender>
  <root level="ERROR">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
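The `%X{traceId:-?}` conversion reads the traceId key from the logging MDC and falls back to `?` when the key is missing, which is why the startup line of the demo shows `traceId ? spanId ?`. A minimal sketch of that behaviour in plain Kotlin follows. `renderLogLine` is a hypothetical helper that only mimics the pattern; it is not how Logback is implemented.

```kotlin
// Hypothetical helper mimicking the demo's Logback pattern:
// %msg - traceId %X{traceId:-?} spanId %X{spanId:-?} - %thread
fun renderLogLine(message: String, mdc: Map<String, String>, thread: String): String {
    val traceId = mdc["traceId"] ?: "?"   // same role as the ":-?" default in %X{traceId:-?}
    val spanId = mdc["spanId"] ?: "?"
    return "$message - traceId $traceId spanId $spanId - $thread"
}

fun main() {
    // No tracing context yet, as during application startup
    println(renderLogLine("Started MyApplicationKt", emptyMap(), "main"))
    // Tracing context present, as inside a traced request
    val mdc = mapOf("traceId" to "aaaaaa1234567890", "spanId" to "cf596e6281432fb9")
    println(renderLogLine(">>> RestRequest1 hello", mdc, "http-nio-8080-exec-7"))
}
```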
RestController
Create your @RestController as usual
@RestController
class MyRestController {
  companion object {
    private val LOGGER = LoggerFactory.getLogger(MyRestController::class.java)
  }

  @GetMapping("/request1")
  fun request1(@RequestParam("payload") payload: String): String {
    LOGGER.info(">>> RestRequest1 $payload")
    // do more stuff
    return "ok"
  }
}
Kafka Producer & Consumer
We have a few alternatives to propagate tracing information when publishing to Kafka.
For example, we can use Spring for Apache Kafka and create a KafkaProducer or KafkaConsumer using the autoconfigured ProducerFactory or ConsumerFactory. We can use the autoconfigured KafkaTemplate too.
In this demo we use Spring Cloud Stream and Reactive Functions Support
- Configure binding and function definitions:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
      bindings:
        consumer-in-0:
          group: ${spring.application.name}
          destination: "my.topic"
        producer-out-0:
          destination: "my.topic"
    function:
      definition: consumer;producer
- The consumer is just a bean implementing a lambda that consumes a Message<PAYLOAD>:
@Component("consumer")
class MyKafkaConsumer : (Message<String>) -> Unit {
  companion object {
    private val LOGGER = LoggerFactory.getLogger(MyKafkaConsumer::class.java)
  }

  override fun invoke(message: Message<String>) {
    LOGGER.info(">>> KafkaConsumer ${message.payload}")
    // do more stuff
  }
}
- The producer is just a bean implementing a lambda that produces a Flux<Message<PAYLOAD>>. In this case we have to use the MessagingSleuthOperators helper methods to preserve the tracing context when using reactive stream functions:
@Component("producer")
class MyKafkaProducer(private val beanFactory: BeanFactory) : () -> Flux<Message<String>> {
  companion object {
    private val LOGGER = LoggerFactory.getLogger(MyKafkaProducer::class.java)
  }

  private val sink = Sinks.many().unicast().onBackpressureBuffer<Message<String>>()

  fun produce(payload: String) {
    LOGGER.info(">>> KafkaProducer $payload")
    sink.emitNext(createMessageWithTracing(payload), FAIL_FAST)
  }

  private fun createMessageWithTracing(payload: String): Message<String> {
    return MessagingSleuthOperators.handleOutputMessage(
      beanFactory,
      MessagingSleuthOperators.forInputMessage(beanFactory, GenericMessage(payload))
    )
  }

  override fun invoke() = sink.asFlux()
}
RestTemplate
Just create a RestTemplate @Bean and inject it wherever it is needed:
@Configuration
class MyConfiguration {
  @Bean
  fun restTemplate() = RestTemplate()
}
FeignClient
Just declare the @FeignClient as usual:
@SpringBootApplication
@EnableFeignClients
class MyApplication

@FeignClient(name = "request3", url = "http://localhost:\${server.port}")
interface MyFeignClient {
  @RequestMapping(method = [RequestMethod.GET], path = ["/request3"])
  fun request3(@RequestParam("payload") payload: String): String
}
WebClient
Just create a WebClient @Bean and inject it wherever it is needed:
@Configuration
class MyConfiguration {
  @Bean
  fun webClient() = WebClient.create()
}
Async
Just annotate the method with @Async as usual. The tracing context will be preserved between threads:
@SpringBootApplication
@EnableAsync
class MyApplication

@Service
class MyAsyncService {
  companion object {
    private val LOGGER = LoggerFactory.getLogger(MyAsyncService::class.java)
  }

  @Async
  fun execute(payload: String): CompletableFuture<String> {
    LOGGER.info(">>> AsyncService $payload")
    return CompletableFuture.completedFuture("ok")
  }
}
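To see why preserving the context between threads needs some help, here is a conceptual sketch in plain Kotlin. It is not Sleuth's implementation: `currentTraceId` and `withTraceContext` are hypothetical names. The idea is that a value kept in a ThreadLocal is invisible to a worker thread unless the task is wrapped so that it captures the caller's value and restores it while running.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical thread-bound holder for the current trace id (illustration only).
val currentTraceId = ThreadLocal<String?>()

// Captures the caller's trace id now and installs it on whichever thread runs the task.
fun <T> withTraceContext(task: () -> T): Callable<T> {
    val captured = currentTraceId.get()
    return Callable {
        val previous = currentTraceId.get()
        currentTraceId.set(captured)
        try {
            task()
        } finally {
            currentTraceId.set(previous)   // leave the worker thread as we found it
        }
    }
}

fun main() {
    val executor = Executors.newSingleThreadExecutor()
    try {
        currentTraceId.set("aaaaaa1234567890")
        // Without wrapping, the worker thread does not see the caller's value
        val plain = executor.submit(Callable { currentTraceId.get() }).get()
        // With wrapping, the captured value travels with the task
        val wrapped = executor.submit(withTraceContext { currentTraceId.get() }).get()
        println("plain=$plain wrapped=$wrapped")
    } finally {
        executor.shutdown()
    }
}
```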
Zipkin
In production we would send only a small percentage of the traces to Zipkin (sampling), but for the demo we send all of them:
spring:
  sleuth:
    sampler:
      probability: 1.0
  zipkin:
    base-url: "http://localhost:9411"
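As a rough illustration of what a sampling probability means, here is a minimal sketch in plain Kotlin. `ProbabilitySampler` is a hypothetical class written for this post, not Sleuth's sampler, and a real tracer may decide differently (for example based on the trace id). With 1.0 every trace is kept, with 0.0 none, and with 0.1 roughly one in ten.

```kotlin
import kotlin.random.Random

// Illustrative sampler, not Sleuth's implementation:
// keeps roughly `probability` of all traces.
class ProbabilitySampler(
    private val probability: Double,
    private val random: Random = Random.Default
) {
    init {
        require(probability in 0.0..1.0) { "probability must be between 0.0 and 1.0" }
    }

    // nextDouble() returns a value in [0.0, 1.0), so 1.0 always samples and 0.0 never does
    fun isSampled(): Boolean = random.nextDouble() < probability
}

fun main() {
    val sampler = ProbabilitySampler(probability = 0.1)
    val sampled = (1..10_000).count { sampler.isSampled() }
    println("Sampled $sampled of 10000 traces")
}
```

The decision is normally taken once, when the trace starts, and then travels with the context. That is the trailing `-1` in the b3 header of the Kafka message in the demo.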
Test
One easy way to test the demo is to run a SpringBootTest with an OutputCaptureExtension and verify that all log lines contain the expected traceId:
@SpringBootTest(webEnvironment = DEFINED_PORT)
@Testcontainers
@ExtendWith(OutputCaptureExtension::class)
class MyApplicationIntegrationTest {
  @Test
  fun `should propagate tracing`(log: CapturedOutput) {
    val traceId = "edb77ece416b3196"
    val spanId = "c58ac2aa66d238b9"

    val response = request1(traceId, spanId)
    assertThat(response.statusCode).isEqualTo(OK)
    assertThat(response.body).isEqualTo("ok")

    val logLines = await()
      .atMost(TEN_SECONDS)
      .pollDelay(ONE_SECOND)
      .until({ parseLogLines(log) }, { it.size >= 7 })

    assertThatLogLineContainsMessageAndTraceId(logLines[0], "RestRequest1 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[1], "KafkaProducer hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[2], "KafkaConsumer hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[3], "RestRequest2 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[4], "RestRequest3 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[5], "RestRequest4 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[6], "AsyncService hello", traceId)
  }
}
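The test relies on helpers that parse the captured log, which are not shown in this post. As a rough idea of what such parsing could look like, here is a sketch in plain Kotlin based on the Logback pattern of the demo. `LogLine`, `parseLogLine` and this String-based `parseLogLines` are hypothetical and may differ from the helpers in the repository.

```kotlin
// Hypothetical parsed form of one demo log line (not the repository's type).
data class LogLine(
    val message: String,
    val traceId: String,
    val spanId: String,
    val thread: String
)

// Matches lines such as:
// >>> RestRequest1 hello - traceId aaaaaa1234567890 spanId cf596e6281432fb9 - http-nio-8080-exec-7
private val LOG_LINE_REGEX = Regex("""^>>> (.+) - traceId (\S+) spanId (\S+) - (.+)$""")

// Returns null for lines that are not demo messages, for example the startup line.
fun parseLogLine(line: String): LogLine? =
    LOG_LINE_REGEX.matchEntire(line)?.destructured?.let { (message, traceId, spanId, thread) ->
        LogLine(message, traceId, spanId, thread)
    }

// Parses a whole captured output, keeping only the demo messages in order.
fun parseLogLines(output: String): List<LogLine> =
    output.lines().mapNotNull { parseLogLine(it.trim()) }

fun main() {
    val output = """
        Started MyApplicationKt in 44.739 seconds (JVM running for 49.324) - traceId ? spanId ? - main
        >>> RestRequest1 hello - traceId aaaaaa1234567890 spanId cf596e6281432fb9 - http-nio-8080-exec-7
        >>> AsyncService hello - traceId aaaaaa1234567890 spanId acccead477b4e1c8 - task-3
    """.trimIndent()
    parseLogLines(output).forEach(::println)
}
```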
Run test with ./gradlew test
That's it! Happy coding! 💙