DEV Community

Roger Viñas Alcon

Spring Cloud Stream Kafka Streams Processor API

Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.

It offers an abstraction (the binding) that works the same way whatever underlying implementation (the binder) we use:

  • Apache Kafka
  • RabbitMQ
  • Kafka Streams
  • Amazon Kinesis
  • ...

In a previous post, Spring Cloud Stream Kafka Streams first steps, I got a simple example working using the Kafka Streams binder.

In this one, the goal is to use the Kafka Streams binder and the Kafka Streams Processor API to implement the following scenario:


  1. We receive messages with key = userId and value = { userId: string, token: number } from topic pub.user.token

  2. For every userId for which we receive tokens 1, 2, 3, 4 and 5 within 1 minute, we send a completed event to topic pub.user.state

  3. For every userId for which we receive at least one token but not the complete 1, 2, 3, 4 and 5 sequence within 1 minute, we send an expired event to topic pub.user.state
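To make the two rules concrete, here is a minimal, Kafka-free sketch that classifies the tokens received for one user. It assumes the 1-minute window is measured from the user's first token, which is only one possible reading of "within 1 minute"; `TimedToken`, `Outcome` and `outcomeFor` are hypothetical names used for illustration, not part of the project.

```kotlin
import java.time.Duration

// Hypothetical types for this sketch only
enum class Outcome { COMPLETED, EXPIRED }

data class TimedToken(val atMillis: Long, val token: Int)

// Assumption: the window starts at the user's first token
fun outcomeFor(tokens: List<TimedToken>, window: Duration): Outcome? {
  if (tokens.isEmpty()) return null // no token at all: nothing is published
  val start = tokens.minOf { it.atMillis }
  val seenInWindow = tokens
    .filter { it.atMillis - start < window.toMillis() }
    .map { it.token }
  return if (seenInWindow.containsAll(listOf(1, 2, 3, 4, 5))) Outcome.COMPLETED else Outcome.EXPIRED
}
```

A user who sends 1 to 5 within a few seconds is COMPLETED, while a user who sends token 5 after 61 seconds falls outside the window and is EXPIRED.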

Ready? Let's code! 🤓

GitHub: rogervinas / spring-cloud-stream-kafka-streams-processor

🍀 Spring Cloud Stream & Kafka Streams Binder + Processor API

If you run this demo, don't forget to read the notes about caching in the state stores

Test-first using kafka-streams-test-utils

Once kafka-streams-test-utils is properly set up in our @BeforeEach, we can implement these tests:

data class UserTokenEvent(val userId: String, val token: Int)

enum class UserStateEventType { COMPLETED, EXPIRED }
data class UserStateEvent(val userId: String, val state: UserStateEventType)

@Test
fun `should publish completed event for one user`() {
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 3))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 4))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 5))

  topologyTestDriver.advanceWallClockTime(EXPIRATION.minusMillis(10))

  assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
    assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
    assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, COMPLETED))
  })
}

@Test
fun `should publish expired event for one user`() {
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))

  topologyTestDriver.advanceWallClockTime(EXPIRATION.plus(SCHEDULE).plus(SCHEDULE))

  assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
    assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
    assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, EXPIRED))
  })
}

UserStateStream implementation

We start with our UserStateStream implemented as a Function:

  • Its input is a KStream<String, UserTokenEvent>, as we want a String as the Kafka message's key and a UserTokenEvent as the Kafka message's value
  • Its output is a KStream<String, UserStateEvent>, same here, with String as the key and UserStateEvent as the value

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {

  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    TODO()
  }
}

Now step by step ...

1. Aggregation by userId

private const val USER_STATE_STORE = "user-state"

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
  operator fun plus(event: UserTokenEvent) = UserState(event.userId, tokens + event.token)
}

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    return input
      .selectKey { _, event -> event.userId } // just in case but the key should be userId already
      .groupByKey()
      .aggregate(
        { UserState() },
        { userId, event, state ->
          logger.info("Aggregate $userId ${state.tokens} + ${event.token}")
          state + event // we use the UserState's plus operator
        },
        Materialized.`as`<String, UserState, KeyValueStore<Bytes, ByteArray>>(USER_STATE_STORE)
          .withKeySerde(Serdes.StringSerde())
          .withValueSerde(JsonSerde(UserState::class.java))
      )
      .toStream()
      // From here down it is just to avoid compilation errors
      .mapValues { userId, _ ->
        UserStateEvent(userId, COMPLETED) 
      }
  }
}
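Conceptually, `groupByKey().aggregate(...)` keeps one running UserState per key in the "user-state" store and, once converted back with `toStream()`, emits the updated state every time a new event arrives. The sketch below imitates that with a plain `MutableMap` standing in for the state store. It ignores partitioning, serdes and record caching (which in the real application can suppress some intermediate updates), and `aggregateByKey` is a hypothetical helper.

```kotlin
// Same shape as the article's event and aggregate types
data class UserTokenEvent(val userId: String, val token: Int)

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
  operator fun plus(event: UserTokenEvent) = UserState(event.userId, tokens + event.token)
}

// Hypothetical stand-in for groupByKey().aggregate(...).toStream():
// a MutableMap plays the role of the "user-state" key-value store
fun aggregateByKey(events: List<Pair<String, UserTokenEvent>>): List<Pair<String, UserState>> {
  val store = mutableMapOf<String, UserState>()
  return events.map { (key, event) ->
    val updated = store.getOrElse(key) { UserState() } + event // initializer, then aggregator
    store[key] = updated
    key to updated // one downstream record per update
  }
}
```

Interleaved events for different users never mix, because each key has its own entry in the store.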

2. Completed UserStateEvents

We can generate completed UserStateEvents straightaway once we receive the last UserTokenEvent:

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
  // ...
  val completed = tokens.containsAll(listOf(1, 2, 3, 4, 5))
}

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    return input
      // ...
      .toStream()
      .mapValues { state ->
        logger.info("State $state")
        when {
          state.completed -> UserStateEvent(state.userId, COMPLETED)
          else -> null
        }
      }
      .filter { _, event -> event != null }
      .mapValues { event ->
        logger.info("Publish $event")
        event!!
      }
  }
}
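Note that `completed` relies on `containsAll`, so token order, duplicates and extra tokens do not matter: only the presence of 1 to 5 counts. The list-based analogue below reproduces the "map to nullable, filter nulls, unwrap" chain with a single `mapNotNull`; `completedEvents` is a hypothetical name used only in this sketch.

```kotlin
enum class UserStateEventType { COMPLETED, EXPIRED }
data class UserStateEvent(val userId: String, val state: UserStateEventType)

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
  // Same expression as in the article
  val completed = tokens.containsAll(listOf(1, 2, 3, 4, 5))
}

// List analogue of mapValues { ... } + filter { event != null } + mapValues { event!! }
fun completedEvents(states: List<UserState>): List<UserStateEvent> =
  states.mapNotNull { state ->
    if (state.completed) UserStateEvent(state.userId, UserStateEventType.COMPLETED) else null
  }
```

In the Kafka Streams DSL, a similar single step could be written with `flatMapValues { state -> listOfNotNull(...) }`, which avoids the `!!`; treat it as an alternative worth evaluating, not as what the project does.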

3. UserStateProcessor implementation

Our UserStateProcessor will periodically scan the "user-state" store and apply our expiration logic to every UserState:

class UserStateProcessor(
  private val schedule: Duration,
  private val expiration: Duration
) : Processor<String, UserState, Void, Void> {

  override fun init(context: ProcessorContext<Void, Void>) {
    context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
      val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
      // store iterators must be closed, so we wrap the iteration in use { }
      stateStore.all().use { iterator ->
        iterator.forEachRemaining { entry: KeyValue<String, ValueAndTimestamp<UserState>> ->
          logger.info("Do something with $entry!!") // TODO
        }
      }
    }
  }

  override fun process(record: Record<String, UserState>?) {
    // we do not need to do anything here
  }
}

Just apply the expiration logic this way:

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), val expired: Boolean = false) {
  // ...
  fun expire() = UserState(userId, tokens, true)
}

class UserStateProcessor(
  private val schedule: Duration,
  private val expiration: Duration
) : Processor<String, UserState, Void, Void> {

  override fun init(context: ProcessorContext<Void, Void>) {
    context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
      val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
      // store iterators must be closed, so we wrap the iteration in use { }
      stateStore.all().use { iterator ->
        iterator.forEachRemaining {
          val age = Duration.ofMillis(time - it.value.timestamp())
          if (age > expiration) {
            if (it.value.value().expired) {
              // if it is already expired from a previous execution, we delete it
              logger.info("Delete ${it.key}")
              stateStore.delete(it.key)
            } else {
              // if it has expired right now, we mark it as expired and we update it
              logger.info("Expire ${it.key}")
              stateStore.put(it.key, ValueAndTimestamp.make(it.value.value().expire(), it.value.timestamp()))
            }
          }
        }
      }
    }
  }

  override fun process(record: Record<String, UserState>?) {
    // we do not need to do anything here
  }
}
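The punctuation therefore works in two phases: the first run after `expiration` marks the state as expired and writes it back, and a later run deletes it. Here is a Kafka-free sketch of that loop over a `MutableMap` of (state, timestamp) pairs; `Stored` and `punctuate` are hypothetical, and the map merely stands in for the `KeyValueStore<String, ValueAndTimestamp<UserState>>`.

```kotlin
import java.time.Duration

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), val expired: Boolean = false) {
  fun expire() = UserState(userId, tokens, true)
}

// Hypothetical stand-in for ValueAndTimestamp<UserState>
data class Stored(val state: UserState, val timestamp: Long)

// Mirrors the punctuator body: expire on the first pass, delete on a later one
fun punctuate(store: MutableMap<String, Stored>, time: Long, expiration: Duration) {
  // iterate over a snapshot so the map can be modified while looping
  for ((key, stored) in store.toMap()) {
    val age = Duration.ofMillis(time - stored.timestamp)
    if (age > expiration) {
      if (stored.state.expired) {
        store.remove(key) // already expired in a previous run: delete it
      } else {
        // keep the original timestamp, as the processor does with ValueAndTimestamp.make
        store[key] = Stored(stored.state.expire(), stored.timestamp)
      }
    }
  }
}
```

Because the timestamp is preserved when marking a state as expired, the very next punctuation already sees it as old enough to delete.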

4. UserStateStream and UserStateProcessor integration

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    return input
      // ...
      .toStream()
      // we add the UserStateProcessor
      .apply { process(ProcessorSupplier { UserStateProcessor(schedule, expiration) }, USER_STATE_STORE) }
      // downstream we receive both the real-time upstream values and the ones "generated" by the UserStateProcessor
      .mapValues { state ->
        logger.info("State $state")
        when {
          // null states are sent downstream by the UserStateProcessor when it deletes entries from the store
          state == null -> null
          // completed states come from upstream, in real time
          state.completed -> UserStateEvent(state.userId, COMPLETED)
          // expired states are sent downstream by the UserStateProcessor when it updates entries in the store
          state.expired -> UserStateEvent(state.userId, EXPIRED)
          else -> null
        }
      }
      .filter { _, event -> event != null }
      .mapValues { event ->
        logger.info("Publish $event")
        event!!
      }
  }
}
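The `when` expression above is order-sensitive: a state that is both completed and expired is mapped to COMPLETED, because that branch is checked first. A pure-function version of the same mapping (hypothetical name `toEvent`, with the article's types redeclared so it runs on its own) makes this easy to check in isolation:

```kotlin
enum class UserStateEventType { COMPLETED, EXPIRED }
data class UserStateEvent(val userId: String, val state: UserStateEventType)

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), val expired: Boolean = false) {
  val completed = tokens.containsAll(listOf(1, 2, 3, 4, 5))
}

// Same branches, in the same order, as the mapValues { ... } above
fun toEvent(state: UserState?): UserStateEvent? = when {
  state == null -> null // deleted entry
  state.completed -> UserStateEvent(state.userId, UserStateEventType.COMPLETED)
  state.expired -> UserStateEvent(state.userId, UserStateEventType.EXPIRED)
  else -> null // still in progress: nothing to publish
}
```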

And just at this point our UserStreamTest should pass 🟩 👌

Kafka Streams binder configuration

Easy!

spring:
  application:
    name: "spring-cloud-stream-kafka-streams-processor"
  cloud:
    stream:
      function:
        definition: userStateStream
        bindings:
          userStateStream-in-0: "pub.user.token"
          userStateStream-out-0: "pub.user.state"
      kafka:
        streams:
          binder:
            applicationId: "${spring.application.name}"
            brokers: "localhost:9094"
            configuration:
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
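The keys under `function.bindings` follow Spring Cloud Stream's functional binding naming convention: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs, with a zero-based index. The tiny helpers below are hypothetical and only spell out that convention for the names used in this configuration:

```kotlin
// Hypothetical helpers reproducing the <functionName>-in-<index> / -out-<index> convention
fun inputBinding(function: String, index: Int = 0) = "$function-in-$index"
fun outputBinding(function: String, index: Int = 0) = "$function-out-$index"

// Binding name -> topic, as mapped in the YAML above
val topicsByBinding = mapOf(
  inputBinding("userStateStream") to "pub.user.token",
  outputBinding("userStateStream") to "pub.user.state"
)
```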

With this configuration:

  • Spring Cloud Stream will create a Kafka Streams binder connected to localhost:9094
  • We need to create a @Bean named userStateStream that implements the Function interface
    • This @Bean will connect a KStream subscribed to the pub.user.token topic to another KStream publishing to the pub.user.state topic

You can find all the available configuration properties documented in Kafka Streams Properties.

UserStateStream bean

As required by our configuration we need to create a @Bean named userStateStream:

@Configuration
class ApplicationConfiguration {

  @Bean
  fun userStateStream(
    @Value("\${user.schedule}") schedule: Duration,
    @Value("\${user.expiration}") expiration: Duration
  ): Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> = UserStateStream(schedule, expiration)
}
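This bean also expects `user.schedule` and `user.expiration` properties, whose values are not shown in the configuration above. As a hedged sketch, if they were written in ISO-8601 form (the values `PT1S` and `PT1M` below are hypothetical examples), `java.time.Duration.parse` would read them as shown; for `@ConfigurationProperties` binding, Spring Boot also documents a simple format such as `10s`. `parseUserDurations` is a hypothetical helper, not part of the project.

```kotlin
import java.time.Duration

// Hypothetical helper: parses ISO-8601 values such as "PT1S" or "PT1M"
fun parseUserDurations(schedule: String, expiration: String): Pair<Duration, Duration> {
  val parsedSchedule = Duration.parse(schedule)
  // Kafka Streams rejects punctuation intervals shorter than 1 millisecond
  require(parsedSchedule.toMillis() >= 1) { "user.schedule must be at least 1ms" }
  return parsedSchedule to Duration.parse(expiration)
}
```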

Integration Test

We have already "unit tested" our UserStateStream with kafka-streams-test-utils, but we also need an integration test using a Kafka container ... Testcontainers to the rescue!

1. Kafka helpers

First we need utility classes to produce to and consume from Kafka using the kafka-clients library:


class KafkaConsumerHelper(bootstrapServers: String, topic: String) {

  fun consumeAll(): List<ConsumerRecord<String, String>> {
    // ...
  }

  fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
    // ...
  }
}

class KafkaProducerHelper(bootstrapServers: String) {

  fun send(topic: String?, key: String, body: String) {
    // ...
  }
}
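The helper bodies are elided above. As a hedged sketch of the idea behind `consumeAtLeast`, the loop below keeps polling until it has collected the requested number of records or the timeout elapses. The Kafka `poll` call is abstracted as a `() -> List<T>` function so the logic runs without a broker; `collectAtLeast` is a hypothetical name, not the project's code.

```kotlin
import java.time.Duration

// Hypothetical, broker-free version of consumeAtLeast:
// `poll` stands in for something like consumer.poll(...).toList()
fun <T> collectAtLeast(numberOfRecords: Int, timeout: Duration, poll: () -> List<T>): List<T> {
  val deadline = System.nanoTime() + timeout.toNanos()
  val collected = mutableListOf<T>()
  // a real KafkaConsumer.poll blocks for a while, so this loop would not spin in practice
  while (collected.size < numberOfRecords && System.nanoTime() < deadline) {
    collected += poll()
  }
  return collected
}
```

Returning whatever was collected when the timeout expires (instead of throwing) leaves the decision to the caller's assertions, which fits well with retrying them inside Awaitility.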

2. DockerCompose Testcontainer

As described in Testcontainers + JUnit 5, we can use the @Testcontainers annotation:

@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
class ApplicationIntegrationTest {

  companion object {

    @Container
    val container = DockerComposeContainerHelper().createContainer()
  }

  // ...
}

3. Tests

And finally the tests, using Awaitility as we are testing asynchronous stuff:

class ApplicationIntegrationTest {

  // ...

  @Test
  fun `should publish completed event`() {
    val username = UUID.randomUUID().toString()

    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 1}""")
    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 2}""")
    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 3}""")
    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 4}""")
    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 5}""")

    await().atMost(ONE_MINUTE).untilAsserted {
      val records = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
      assertThat(records).singleElement().satisfies(Consumer {
        assertThat(it.key()).isEqualTo(username)
        JSONAssert.assertEquals("""{"userId": "$username", "state": "COMPLETED"}""", it.value(), true)
      })
    }
  }

  @Test
  fun `should publish expired event`() {
    val username = UUID.randomUUID().toString()

    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 1}""")
    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 2}""")
    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 3}""")
    kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 4}""")

    await().atMost(ONE_MINUTE).untilAsserted {
      val records = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
      assertThat(records).singleElement().satisfies(Consumer {
        assertThat(it.key()).isEqualTo(username)
        JSONAssert.assertEquals("""{"userId": "$username", "state": "EXPIRED"}""", it.value(), true)
      })
    }
  }
}
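Awaitility's `untilAsserted` re-runs the assertion block until it stops throwing or the `atMost` timeout is reached. A minimal hand-rolled sketch of that behavior (hypothetical `retryUntilAsserted`, fixed poll interval, no Awaitility involved) could look like this:

```kotlin
import java.time.Duration

// Hypothetical, simplified stand-in for await().atMost(timeout).untilAsserted { ... }
fun retryUntilAsserted(timeout: Duration, interval: Duration, assertion: () -> Unit) {
  val deadline = System.nanoTime() + timeout.toNanos()
  while (true) {
    try {
      assertion()
      return // the assertion passed
    } catch (e: AssertionError) {
      if (System.nanoTime() >= deadline) throw e // give up, surfacing the last failure
      Thread.sleep(interval.toMillis())
    }
  }
}
```

AssertJ and JSONAssert both signal failures with `AssertionError`, which is why catching that type is enough for the assertions used in these tests.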

And just at this point all our tests should pass 🟩 👏

That's it, happy coding! 💙
