Roger Viñas Alcon


Spring Cloud Stream Kafka & Confluent Avro Schema Registry

Spring Cloud Stream uses JSON serializers by default, but you may want to consider Avro serializers instead:

  • Serialization and deserialization are typically faster.
  • Messages use a more compact binary format.
  • You can define a schema and compatibility rules between schema versions.

In this demo, based on the spring-cloud-stream-schema-registry-integration sample, we will create three Spring Cloud Stream applications, one consumer and two producers, all of them using the Confluent Schema Registry Server and the Confluent Avro Serializers.

Diagram

As you can see in the diagram, the consumer should be able to consume both Sensor v1 and v2 messages.

Ready? Let's do it! 🚀

Source code on GitHub: rogervinas / spring-cloud-stream-kafka-confluent-avro-schema-registry

Creating the project

To keep it simple, we will set up the consumer and the two producers as modules of a Gradle multi-module project, with a little help from Spring Initializr.

Since we use Gradle rather than Maven, unlike the spring-cloud-stream-schema-registry-integration sample, we cannot use the official avro-maven-plugin. We will use davidmc24/gradle-avro-plugin instead.

We will use a docker-compose.yml based on the one from confluent/cp-all-in-one both to run it locally and to execute the integration tests. From that configuration we will keep only the containers: zookeeper, broker, schema-registry and control-center.

Confluent control-center is not really needed, but it may be interesting to take a look at its admin console at http://localhost:9021 when running the demo locally.

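We set SCHEMA_COMPATIBILITY_LEVEL to none, so the schema registry accepts any new schema version without checking it against previous ones. You can experiment with the other levels documented in Confluent Schema Evolution and Compatibility.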

Implementing the Producer v1

This producer will send Sensor messages in v1 format as specified in producer1/avro/sensor.avsc:

{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"temperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}

The davidmc24/gradle-avro-plugin will generate Java code similar to:

public class Sensor {
    public Sensor(String id, Float temperature, Float acceleration, Float velocity) {
        this.id = id;
        this.temperature = temperature;
        this.acceleration = acceleration;
        this.velocity = velocity;
    }

    // Getters and Setters
}

Using this configuration:

spring:
  cloud:
    function:
      definition: "myProducer"
    stream:
      bindings:
        myProducer-out-0:
          destination: sensor-topic
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          myProducer-out-0:
            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081

Then Spring Cloud Stream:

  • Will expect us to implement a @Bean named myProducer returning a value or a Flux of values:
    • In Kotlin we can use a lambda () -> Value or () -> Flux<Value>.
    • In Java we can use a Supplier<Value> or Supplier<Flux<Value>>.
  • Will call this @Bean one or many times to retrieve the values to be published.
  • Will connect to a Kafka broker on localhost:9092.
  • Will skip its own message conversion (useNativeEncoding: true) and let the Confluent KafkaAvroSerializer serialize the values, using the schema registry server on localhost:8081.

For simplicity, we put everything in the same Application class, including the RestController:

@SpringBootApplication
@RestController
class Application {

  private val version = "v1"
  private val unbounded: BlockingQueue<Sensor> = LinkedBlockingQueue()
  @Autowired private lateinit var random: Random

  @Bean
  fun myProducer(): () -> Sensor? = { unbounded.poll() }

  @RequestMapping(value = ["/messages"], method = [RequestMethod.POST])
  fun sendMessage(): String {
    unbounded.offer(randomSensor())
    return "ok, have fun with $version payload!"
  }

  private fun randomSensor() = Sensor().apply {
    this.id = random.nextInt(1000, 9999).toString() + "-$version"
    this.acceleration = random.nextFloat() * 10
    this.velocity = random.nextFloat() * 100
    this.temperature = random.nextFloat() * 50
  }
}

fun main(args: Array<String>) {
  runApplication<Application>(*args)
}
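
The myProducer bean above returns null whenever the queue is empty. As far as I know, Spring Cloud Stream polls a plain supplier periodically (about once per second by default) and publishes nothing when the result is null. The following is a minimal sketch of that interaction using only the Kotlin standard library; Reading, queueBackedSupplier and pollTimes are hypothetical names, and pollTimes only imitates the framework's polling loop.

```kotlin
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for the generated Avro Sensor class
data class Reading(val id: String, val temperature: Float)

// Same shape as the myProducer bean: next queued value, or null when the queue is empty
fun queueBackedSupplier(queue: BlockingQueue<Reading>): () -> Reading? = { queue.poll() }

// Imitates the framework polling the supplier: null results are skipped, not published
fun pollTimes(supplier: () -> Reading?, times: Int): List<Reading> =
    (1..times).mapNotNull { supplier() }

fun main() {
    val queue = LinkedBlockingQueue<Reading>()
    val supplier = queueBackedSupplier(queue)

    // Nothing queued yet: three polls publish nothing
    println(pollTimes(supplier, 3))

    // One POST /messages would enqueue one value, which the next poll picks up
    queue.offer(Reading("1234-v1", 21.5f))
    println(pollTimes(supplier, 3))
}
```

The queue decouples the HTTP request from the polling, so a message is published on the next poll rather than at the moment the endpoint is called.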

To generate a random Sensor message we will use this Random @Bean, which we can override in the integration test:

@Configuration
class RandomConfiguration { 
  @Bean
  fun random() = Random(System.nanoTime())
}

Testing the Producer v1

To test the producer we will use Testcontainers to start Docker Compose. For simplicity we will use fixed ports, but keep in mind that it may be better to use random ports and set system properties that override the configured Kafka broker URL and schema registry URL.
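
As a rough sketch of the random-ports alternative, the overrides could be expressed as JVM system properties whose names mirror the YAML keys of the producer configuration shown earlier. The helper names and the port numbers below are made up for illustration, and how the ports are obtained from the container runtime is left out.

```kotlin
// Hypothetical helper: builds Spring property overrides for ports chosen at runtime.
// The property names mirror the application.yml keys used by the producer in this article.
fun overridesFor(brokerPort: Int, schemaRegistryPort: Int): Map<String, String> = mapOf(
    "spring.cloud.stream.kafka.binder.brokers" to "localhost:$brokerPort",
    "spring.cloud.stream.kafka.bindings.myProducer-out-0.producer.configuration.schema.registry.url" to
        "http://localhost:$schemaRegistryPort"
)

// Applies the overrides as system properties; this must happen before the Spring context starts
fun applyOverrides(overrides: Map<String, String>) =
    overrides.forEach { (key, value) -> System.setProperty(key, value) }

fun main() {
    // Illustrative ports only; in a real test they would come from the started containers
    applyOverrides(overridesFor(brokerPort = 49153, schemaRegistryPort = 49154))
    println(System.getProperty("spring.cloud.stream.kafka.binder.brokers"))
}
```

Note that with Kafka the broker's advertised listener also has to match the mapped port, which is probably one reason fixed ports are the simpler choice for a demo.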

To verify that the producer is indeed producing messages, we will consume them with a plain KafkaConsumer configured with the Avro deserializer:

@Test
fun `should produce sensor v1 message`() {
  KafkaConsumer<String, GenericRecord>(consumerProperties()).use { consumer ->
    // Subscribe to topic
    consumer.subscribe(listOf(SENSOR_TOPIC))

    // Consume previous messages (just in case)
    consumer.poll(TIMEOUT)

    // Produce one message
    WebTestClient
      .bindToServer()
      .baseUrl("http://localhost:$serverPort")
      .build()
      .post().uri("/messages").exchange()
      .expectStatus().isOk
      .expectBody(String::class.java).isEqualTo("ok, have fun with v1 payload!")

    // Consume message
    assertThat(consumer.poll(TIMEOUT)).singleElement().satisfies(Consumer { record ->
      val value = record.value()
      assertThat(value["id"]).isEqualTo("2376-v1")
      assertThat(value["temperature"]).isEqualTo(33.067642f)
      assertThat(value["acceleration"]).isEqualTo(3.2810485f)
      assertThat(value["velocity"]).isEqualTo(84.885544f)
    })
  }
}
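
The test calls a consumerProperties() helper that is not shown. A minimal sketch of what it might contain follows; the parameter names, the defaults and the random group id are assumptions, and the class names are passed as plain strings so the snippet needs nothing beyond the standard library.

```kotlin
import java.util.Properties
import java.util.UUID

// Hypothetical version of the consumerProperties() helper used by the test.
// Without specific.avro.reader, KafkaAvroDeserializer yields GenericRecord values.
fun consumerProperties(
    bootstrapServers: String = "localhost:9092",
    schemaRegistryUrl: String = "http://localhost:8081"
): Properties = Properties().apply {
    setProperty("bootstrap.servers", bootstrapServers)
    // A fresh group id per run avoids resuming from offsets committed by earlier runs
    setProperty("group.id", "test-" + UUID.randomUUID())
    setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    setProperty("schema.registry.url", schemaRegistryUrl)
}

fun main() {
    consumerProperties().forEach { (key, value) -> println("$key = $value") }
}
```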

To always generate the same result, we will override the Random @Bean with one that uses a fixed seed:

@Configuration
class RandomTestConfiguration {
  @Bean @Primary
  fun randomTest() = Random(0)
}
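
The fixed seed is what makes the hard-coded expectations in the test stable: two generators created with the same seed produce the same sequence. Here is a small sketch of that property, assuming kotlin.random.Random (the same holds for java.util.Random); firstValues is a hypothetical helper.

```kotlin
import kotlin.random.Random

// Returns the first `count` floats produced by a generator created with the given seed
fun firstValues(seed: Long, count: Int): List<Float> {
    val random = Random(seed)
    return List(count) { random.nextFloat() }
}

fun main() {
    // Same seed, same sequence: this is what lets a test assert exact field values
    println(firstValues(seed = 0, count = 3) == firstValues(seed = 0, count = 3))
}
```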

Implementing the Producer v2

This producer will send Sensor messages in v2 format as specified in producer2/avro/sensor.avsc:

{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"internalTemperature", "type":"float", "default":0.0},
    {"name":"externalTemperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0},
    {"name":"accelerometer","type":[
      "null",{
        "type":"array",
        "items":"float"
      }
    ]},
    {"name":"magneticField","type":[
      "null",{
        "type":"array",
        "items":"float"
      }
    ]}
  ]
}

Note that this new version:

  • Splits the temperature field in two: internalTemperature and externalTemperature.
  • Adds two new optional fields: accelerometer and magneticField.

This producer is implemented and tested the same way as the v1 producer.

Implementing the Consumer

For the consumer to be able to consume Sensor v1 and v2 messages we define the schema in consumer/avro/sensor.avsc as:

{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]},
    {"name":"externalTemperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}

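So if it receives a v1 message:

  • The temperature field will be mapped to internalTemperature through the alias.
  • The externalTemperature field, absent in v1, will take its default value 0.

If it receives a v2 message, the accelerometer and magneticField fields will be ignored, as the consumer schema does not declare them.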
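
To make these resolution rules concrete, here is a simplified model of them in plain Kotlin. It is not Avro's implementation: ReaderField and resolve are hypothetical names, records are modeled as maps, and only name matching, aliases and defaults are covered.

```kotlin
// Hypothetical, simplified model of a field in the reader (consumer) schema
data class ReaderField(
    val name: String,
    val default: Any? = null,              // null here means "no default declared"
    val aliases: List<String> = emptyList()
)

// Resolves a written record against the reader fields:
// match by name, then by alias, then fall back to the default; unknown writer fields are dropped
fun resolve(writerRecord: Map<String, Any?>, readerFields: List<ReaderField>): Map<String, Any?> =
    readerFields.associate { field ->
        val writerName = (listOf(field.name) + field.aliases).firstOrNull { it in writerRecord }
        val value = when {
            writerName != null -> writerRecord[writerName]
            field.default != null -> field.default
            else -> error("Field '${field.name}' is missing and has no default")
        }
        field.name to value
    }

// Mirrors consumer/avro/sensor.avsc
val consumerFields = listOf(
    ReaderField("id"),
    ReaderField("internalTemperature", default = 0.0f, aliases = listOf("temperature")),
    ReaderField("externalTemperature", default = 0.0f),
    ReaderField("acceleration", default = 0.0f),
    ReaderField("velocity", default = 0.0f)
)

fun main() {
    val v1Message = mapOf<String, Any?>(
        "id" to "42-v1", "temperature" to 34.98f, "acceleration" to 9.81f, "velocity" to 15.73f
    )
    // temperature ends up in internalTemperature; externalTemperature falls back to its default
    println(resolve(v1Message, consumerFields))
}
```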

Then, using this configuration:

spring:
  cloud:
    function:
      definition: "myConsumer"
    stream:
      bindings:
        myConsumer-in-0:
          destination: sensor-topic
          consumer:
            useNativeDecoding: true
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          myConsumer-in-0:
            consumer:
              configuration:
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                schema.registry.url: http://localhost:8081
                specific.avro.reader: true

Spring Cloud Stream:

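  • Will expect us to implement a @Bean named myConsumer accepting a value:
    • In Kotlin we can use a lambda (Value) -> Unit.
    • In Java we can use a Consumer<Value>.
  • Will call this @Bean every time a value is consumed.
  • Will connect to a Kafka broker on localhost:9092.
  • Will skip its own message conversion (useNativeDecoding: true) and let the Confluent KafkaAvroDeserializer deserialize the values, using the schema registry server on localhost:8081.
  • Will receive instances of the generated Sensor class instead of generic records, because specific.avro.reader is true.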

So for the sake of a demo the implementation can be as simple as:

@SpringBootApplication
class Application {
  @Bean
  fun myConsumer(): (Sensor) -> Unit = { println("Consumed $it") }
}

fun main(args: Array<String>) {
  runApplication<Application>(*args)
}

Testing the Consumer

Same as for the producers, to test the consumer we will use Testcontainers to start docker compose.

To produce test messages we will use a plain KafkaProducer configured with the Avro serializer.

First of all, we will mock the myConsumer @Bean so we can verify that it has been called:

@MockBean(name = "myConsumer")
private lateinit var myConsumer: (Sensor) -> Unit

Then we test that we can consume Sensor v1 messages:

@Test
fun `should consume sensor v1 message`() {
  val id = UUID.randomUUID().toString()
  val temperature = 34.98f
  val acceleration = 9.81f
  val velocity = 15.73f

  val recordV1 = createRecord(
    schema = """
    {
      "namespace" : "com.example",
      "type" : "record",
      "name" : "Sensor",
      "fields" : [
        {"name":"id","type":"string"},
        {"name":"temperature", "type":"float", "default":0.0},
        {"name":"acceleration", "type":"float","default":0.0},
        {"name":"velocity","type":"float","default":0.0}
      ]
    }
  """.trimIndent()
  ).apply {
    put("id", id)
    put("temperature", temperature)
    put("acceleration", acceleration)
    put("velocity", velocity)
  }

  produceRecord(id, recordV1)

  verify(myConsumer, timeout(TIMEOUT.toMillis()))
    .invoke(Sensor(id, temperature, 0f, acceleration, velocity))
}

And we test that we can consume Sensor v2 messages:

@Test
fun `should consume sensor v2 message`() {
  val id = UUID.randomUUID().toString()
  val internalTemperature = 34.98f
  val externalTemperature = 54.16f
  val acceleration = 9.81f
  val velocity = 15.73f

  val recordV2 = createRecord(
    schema = """
    {
      "namespace" : "com.example",
      "type" : "record",
      "name" : "Sensor",
      "fields" : [
        {"name":"id","type":"string"},
        {"name":"internalTemperature", "type":"float", "default":0.0},
        {"name":"externalTemperature", "type":"float", "default":0.0},
        {"name":"acceleration", "type":"float","default":0.0},
        {"name":"velocity","type":"float","default":0.0},
        {"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},
        {"name":"magneticField","type":["null",{"type":"array","items":"float"}]}
      ]
    }
  """.trimIndent()
  ).apply {
    put("id", id)
    put("internalTemperature", internalTemperature)
    put("externalTemperature", externalTemperature)
    put("acceleration", acceleration)
    put("velocity", velocity)
    put("accelerometer", listOf(1.1f, 2.2f, 3.3f))
    put("magneticField", listOf(4.4f, 5.5f, 6.6f))
  }

  produceRecord(id, recordV2)

  verify(myConsumer, timeout(TIMEOUT.toMillis()))
    .invoke(Sensor(id, internalTemperature, externalTemperature, acceleration, velocity))
}
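
Both consumer tests rely on createRecord and produceRecord helpers that are not shown. One possible shape for them is sketched below, assuming the Avro, Kafka client and Confluent serializer libraries are on the test classpath and that the broker and schema registry run on the fixed local ports used throughout; the actual helpers in the repository may differ.

```kotlin
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

// Topic name taken from the binding configuration (destination: sensor-topic)
const val SENSOR_TOPIC = "sensor-topic"

// Parses the schema JSON and returns an empty generic record to be filled with put(...)
fun createRecord(schema: String): GenericRecord =
    GenericData.Record(Schema.Parser().parse(schema))

// Sends one record and blocks until the broker acknowledges it
fun produceRecord(key: String, record: GenericRecord) {
    val properties = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer::class.java.name)
        put("schema.registry.url", "http://localhost:8081")
    }
    KafkaProducer<String, GenericRecord>(properties).use { producer ->
        producer.send(ProducerRecord(SENSOR_TOPIC, key, record)).get()
    }
}
```

Building the records from inline schemas, rather than from generated classes, lets a single test module produce both v1 and v2 messages without two conflicting Sensor classes.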

That's it! Happy coding! 💙
