Roger Viñas Alcon

Spring Cloud Stream Multibinder

Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.

It offers an abstraction (the binding) that works the same way regardless of the underlying implementation (the binder):

  • Apache Kafka
  • RabbitMQ
  • Kafka Streams
  • Amazon Kinesis
  • ...

But what if we need more than one binder in the same application? 🤔

Not a problem! You can specify multiple binder configurations, as documented in Connecting to Multiple Systems.

Let's put the theory into practice 🛠️ ...

rogervinas / spring-cloud-stream-multibinder

🍀 Spring Cloud Stream Multibinder - Kafka & Kafka Streams

Goal

We want to implement this flow:

[Diagram 1: the flow described in the list below]

  • User will POST string payloads to /text endpoint
  • A KafkaProducer will send these payloads to topic pub.texts as { "text" : string }
  • A KafkaStreams transformation will consume from topic pub.texts and produce events to topic pub.lengths as { "length" : number }
  • A KafkaConsumer will consume events from topic pub.lengths and log them to the console
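
As a rough mental model of the flow above, with Kafka taken out of the picture, the step between the two topics amounts to mapping each text event to a length event. The sketch below is plain Kotlin, with in-memory lists standing in for the topics; `toLengthEvent` and `simulateFlow` are hypothetical names used only for illustration and are not part of the project.

```kotlin
// Event payloads as they travel through the two topics.
data class TextEvent(val text: String)
data class LengthEvent(val length: Int)

// The transformation applied between pub.texts and pub.lengths.
fun toLengthEvent(event: TextEvent): LengthEvent = LengthEvent(event.text.length)

// In-memory stand-in for the whole flow: POST payloads -> text events -> length events.
// (Hypothetical helper; the real application moves these events through Kafka.)
fun simulateFlow(payloads: List<String>): List<LengthEvent> =
    payloads.map(::TextEvent).map(::toLengthEvent)
```

Calling `simulateFlow(listOf("Do", "Or do not", "There is no try"))` yields lengths 2, 9 and 15, which are the values the integration test later looks for in the console output.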

So we will use two Spring Cloud Stream binders:

  • Kafka
  • Kafka Streams

Create the project

We use this Spring Initializr configuration and add:

  • Kafka binder lib spring-cloud-stream-binder-kafka
  • Kafka Streams binder lib spring-cloud-stream-binder-kafka-streams
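
For reference, this is roughly how the two binder libraries could be declared. The fragment assumes a Gradle build using the Kotlin DSL and assumes that the Spring Cloud BOM generated by Spring Initializr manages the versions; neither assumption is confirmed by the article, so adapt it to your build tool.

```kotlin
// build.gradle.kts (fragment) - assumes a Gradle Kotlin DSL build in which the
// Spring Cloud BOM manages versions, so none are given here.
dependencies {
    // Kafka binder
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
    // Kafka Streams binder
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
}
```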

Integration Test

We start by writing the following integration test:

@Test
fun `should process text lengths`(capturedOutput: CapturedOutput) {
  postText("Do")
  postText("Or do not")
  postText("There is no try")

  await().atMost(ONE_MINUTE).untilAsserted {
    assertThat(capturedOutput.out).contains("Consumed length [2]")
    assertThat(capturedOutput.out).contains("Consumed length [9]")
    assertThat(capturedOutput.out).contains("Consumed length [15]")
  }
}

This test will obviously fail, but it should work once we have finished our implementation.

Spring Cloud Stream binders configuration

Next we configure the two binders:

spring:
  application:
    name: "spring-cloud-stream-multibinder"
  cloud:
    function:
      definition: textProducer;textLengthProcessor;lengthConsumer
    stream:
      bindings:
        textProducer-out-0:
          destination: "${kafka.topic.texts}"
          binder: kafka1
        textLengthProcessor-in-0:
          destination: "${kafka.topic.texts}"
          binder: kstream1
        textLengthProcessor-out-0:
          destination: "${kafka.topic.lengths}"
          binder: kstream1
        lengthConsumer-in-0:
          destination: "${kafka.topic.lengths}"
          group: "${spring.application.name}"
          binder: kafka1
      binders:
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: "${kafka.brokers}"
        kstream1:
          type: kstream
          environment:
            spring.cloud.stream.kafka.streams.binder:
              applicationId: "${spring.application.name}-KApp"
              brokers: "${kafka.brokers}"

kafka:
  topic:
    texts: "pub.texts"
    lengths: "pub.lengths"
  brokers: "localhost:9094"
  • Spring Cloud Stream will create:
    • A Kafka Streams binder connected to localhost:9094
    • A Kafka binder connected to localhost:9094
  • Following the Spring Cloud Stream functional programming model conventions we should create:
    • A bean named textProducer that should implement:
      • In Java: Supplier<Flux<TextEvent>> interface
      • In Kotlin: () -> Flux<TextEvent> lambda
    • A bean named textLengthProcessor that should implement:
      • In Java: Function<KStream<String, TextEvent>, KStream<String, LengthEvent>> interface
      • In Kotlin: the same, there is no support for lambdas yet 😅
    • A bean named lengthConsumer that should implement:
      • In Java: Consumer<LengthEvent> interface
      • In Kotlin: (LengthEvent) -> Unit lambda
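
The binding names used in the configuration follow from these function names: a supplier gets an output binding, a consumer gets an input binding, and a function gets both, each named `<functionName>-in-<index>` or `<functionName>-out-<index>`. The sketch below only illustrates that naming rule for single-input, single-output functions; `FunctionKind` and `bindingNames` are hypothetical helpers, not part of Spring Cloud Stream.

```kotlin
// The three function shapes used in this article (hypothetical enum, for illustration only).
enum class FunctionKind { SUPPLIER, FUNCTION, CONSUMER }

// Binding names follow the pattern <functionName>-in-<index> / <functionName>-out-<index>.
// Functions with a single input and a single output always use index 0.
fun bindingNames(functionName: String, kind: FunctionKind): List<String> = when (kind) {
    FunctionKind.SUPPLIER -> listOf("$functionName-out-0")
    FunctionKind.FUNCTION -> listOf("$functionName-in-0", "$functionName-out-0")
    FunctionKind.CONSUMER -> listOf("$functionName-in-0")
}
```

For example, `bindingNames("textLengthProcessor", FunctionKind.FUNCTION)` gives `textLengthProcessor-in-0` and `textLengthProcessor-out-0`, the two bindings assigned to the kstream1 binder in the configuration.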

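💡 We use different values for the Kafka Streams applicationId and the Kafka consumer group on purpose: Kafka Streams uses its applicationId as the consumer group id, so sharing one value would put the Kafka Streams application and the plain consumer in the same consumer group.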

💡 We are using Spring Cloud Stream's default serialization/deserialization of Kotlin data classes to JSON. For this to work we need to add the com.fasterxml.jackson.module:jackson-module-kotlin dependency.
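
A minimal sketch of that dependency declaration, again assuming a Gradle Kotlin DSL build where Spring Boot's dependency management supplies the version (an assumption, not something the article states):

```kotlin
// build.gradle.kts (fragment) - assumes Spring Boot dependency management supplies the version.
dependencies {
    // Lets Jackson construct Kotlin data classes such as TextEvent and LengthEvent.
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
}
```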

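💡 All the available configuration properties are documented in the Spring Cloud Stream reference documentation and in the reference documentation of the Kafka and Kafka Streams binders.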

TextProducer

We create the TextProducer interface, to be implemented later:

data class TextEvent(val text: String)

interface TextProducer {
  fun produce(event: TextEvent)
}

TextController

Once we have the test ...

@WebFluxTest(controllers = [TextController::class])
class TextControllerTest {

  @Autowired
  lateinit var webClient: WebTestClient

  @MockBean
  lateinit var textProducer: TextProducer

  @Test
  fun `should produce text events`() {
    val text = "Some awesome text"
    webClient.post()
      .uri("/text")
      .bodyValue(text)
      .exchange()
      .expectStatus().isOk

    verify(textProducer).produce(TextEvent(text))
  }
}

... the implementation is easy:

@RestController
class TextController(private val textProducer: TextProducer) {

  @PostMapping("/text", consumes = [TEXT_PLAIN_VALUE])
  fun text(@RequestBody text: String) {
    textProducer.produce(TextEvent(text))
  }
}

TextProducer implementation

We implement TextProducer as expected by Spring Cloud Stream conventions like this:

class TextFluxProducer : () -> Flux<TextEvent>, TextProducer {

  private val sink = Sinks.many().unicast().onBackpressureBuffer<TextEvent>()

  override fun produce(event: TextEvent) {
    sink.emitNext(event, FAIL_FAST)
  }

  override fun invoke() = sink.asFlux()
}

And we can easily test the implementation as follows:

@Test
fun `should produce text events`() {
  val producer = TextFluxProducer()

  val events = mutableListOf<TextEvent>()
  producer().subscribe(events::add)

  producer.produce(TextEvent("Well"))
  producer.produce(TextEvent("nobody is"))
  producer.produce(TextEvent("perfect!"))

  assertThat(events).containsExactly(
    TextEvent("Well"),
    TextEvent("nobody is"),
    TextEvent("perfect!")
  )
}

TextLengthProcessor

We implement the transformation using the mapValues method of Kafka Streams:

class TextLengthProcessor : Function<KStream<String, TextEvent>, KStream<String, LengthEvent>> {

  override fun apply(input: KStream<String, TextEvent>): KStream<String, LengthEvent> {
    return input
      .mapValues { event -> LengthEvent(event.text.length) }
  }
}

And we can test it using kafka-streams-test-utils:

private const val TOPIC_IN = "topic.in"
private const val TOPIC_OUT = "topic.out"

private const val KEY1 = "key1"
private const val KEY2 = "key2"
private const val KEY3 = "key3"

internal class TextLengthProcessorTest {

  private lateinit var topologyTestDriver: TopologyTestDriver
  private lateinit var topicIn: TestInputTopic<String, TextEvent>
  private lateinit var topicOut: TestOutputTopic<String, LengthEvent>

  @BeforeEach
  fun beforeEach() {
    val stringSerde = Serdes.StringSerde()
    val streamsBuilder = StreamsBuilder()

    TextLengthProcessor()
      .apply(streamsBuilder.stream(TOPIC_IN))
      .to(TOPIC_OUT)

    val config = Properties().apply {
      setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.javaClass.name)
      setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde::class.java.name)
      setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test")
      setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server")
      setProperty(JsonDeserializer.TRUSTED_PACKAGES, "*")
    }
    val topology = streamsBuilder.build()
    topologyTestDriver = TopologyTestDriver(topology, config)
    topicIn = topologyTestDriver.createInputTopic(TOPIC_IN, stringSerde.serializer(), JsonSerde(TextEvent::class.java).serializer())
    topicOut = topologyTestDriver.createOutputTopic(TOPIC_OUT, stringSerde.deserializer(), JsonSerde(LengthEvent::class.java).deserializer())
  }

  @AfterEach
  fun afterEach() {
    topologyTestDriver.close()
  }

  @Test
  fun `should produce length events from text events`() {
    topicIn.pipeInput(KEY1, TextEvent("Hello!"))
    topicIn.pipeInput(KEY2, TextEvent("How are you?"))
    topicIn.pipeInput(KEY3, TextEvent("Bye!"))

    assertThat(topicOut.readKeyValuesToList()).containsExactly(
      KeyValue(KEY1, LengthEvent(6)),
      KeyValue(KEY2, LengthEvent(12)),
      KeyValue(KEY3, LengthEvent(4))
    )
  }
}

LengthConsumer

We implement LengthStreamConsumer as expected by Spring Cloud Stream conventions like this:

data class LengthEvent(val length: Int)

class LengthStreamConsumer(private val processor: LengthProcessor) : (LengthEvent) -> Unit {

  override fun invoke(event: LengthEvent) {
    processor.process(event)
  }
}

We decouple the final implementation using the interface LengthProcessor:

interface LengthProcessor {
  fun process(event: LengthEvent)
}

And we can test everything with this code:

@Test
fun `should consume length events`() {
  val lengthProcessor = mock(LengthProcessor::class.java)
  val lengthStreamConsumer = LengthStreamConsumer(lengthProcessor)

  lengthStreamConsumer(LengthEvent(10))
  lengthStreamConsumer(LengthEvent(20))
  lengthStreamConsumer(LengthEvent(30))

  verify(lengthProcessor).process(LengthEvent(10))
  verify(lengthProcessor).process(LengthEvent(20))
  verify(lengthProcessor).process(LengthEvent(30))
}

LengthConsumer implementation

For this demo the implementation just logs the event:

class LengthConsoleProcessor : LengthProcessor {

  private val logger = LoggerFactory.getLogger(LengthConsoleProcessor::class.java)

  override fun process(event: LengthEvent) {
    logger.info("Consumed length [${event.length}]")
  }
}

And we can also test it using Spring Boot's OutputCaptureExtension for JUnit Jupiter:

@ExtendWith(OutputCaptureExtension::class)
internal class LengthConsoleProcessorTest {

  @Test
  fun `should log consumed length event to console`(capturedOutput: CapturedOutput) {
    val lengthConsoleProcessor = LengthConsoleProcessor()

    lengthConsoleProcessor.process(LengthEvent(53))

    assertThat(capturedOutput.out).contains("Consumed length [53]")
  }
}

Wiring it all together

We only need to create all the required instances, naming them according to the bindings configuration:

@Configuration
class MyApplicationConfiguration {

  @Bean
  fun textFluxProducer() = TextFluxProducer()

  @Bean
  fun textProducer(textProducer: TextFluxProducer): () -> Flux<TextEvent> = textProducer

  @Bean
  fun textLengthProcessor(): Function<KStream<String, TextEvent>, KStream<String, LengthEvent>> =
    TextLengthProcessor()

  @Bean
  fun lengthConsumer(lengthProcessor: LengthProcessor): (LengthEvent) -> Unit =
    LengthStreamConsumer(lengthProcessor)

  @Bean
  fun lengthConsoleProcessor() = LengthConsoleProcessor()
}

Please note that:

  • The three Spring Cloud functions defined in application.yml will be bound by name to the beans textProducer, textLengthProcessor and lengthConsumer.
    • For the Kafka binder ones, textProducer and lengthConsumer, we have to define them explicitly as Kotlin lambdas (required by KotlinLambdaToFunctionAutoConfiguration).
      • If we were using Java we would use the java.util.function types Supplier and Consumer.
    • For the Kafka Streams binder one, textLengthProcessor, we have to define it explicitly as a java.util.function.Function, because there is no support for Kotlin lambdas yet (see KafkaStreamsFunctionBeanPostProcessor).
  • Beans textFluxProducer and textProducer return the same instance ...
    • We need textFluxProducer to inject it whenever a TextProducer interface is needed (the TextController for example).
    • We need textProducer to bind it to the textProducer Spring Cloud function required by the Kafka binder.

And that is it, now MyApplicationIntegrationTest should work! 🤞

Happy coding! 💙
