Spring Cloud Stream uses JSON serializers by default, but you may want to consider using Avro serializers instead:
- It is faster to serialize/deserialize.
- It uses a more compact binary form.
- You can define a schema and compatibility rules between schema versions.
In this demo, based on the spring-cloud-stream-schema-registry-integration sample, we will create three Spring Cloud Stream applications: one consumer and two producers, all of them using the Confluent Schema Registry Server and the Confluent Avro serializers.
As you can see in the diagram, the consumer should be able to consume both Sensor v1 and v2 messages.
Ready? Let's do it! 🚀
Demo repository: rogervinas/spring-cloud-stream-kafka-confluent-avro-schema-registry (🍀 Spring Cloud Stream Kafka & Confluent Avro Schema Registry)
- Creating the project
- Implementing the Producer v1
- Implementing the Producer v2
- Implementing the Consumer
Creating the project
Just to keep it simple, we will put the consumer and the two producers as modules of a Gradle multi-module project, with a little help from Spring Initializr.
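For reference, the module layout could be declared in settings.gradle.kts like this (a minimal sketch based on the module names used in this post):

rootProject.name = "spring-cloud-stream-kafka-confluent-avro-schema-registry"

// one module per application, matching the module paths used in the rest of this post
include("producer1", "producer2", "consumer")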
As we do not use Maven like the spring-cloud-stream-schema-registry-integration sample does, we cannot use the official avro-maven-plugin. We will use davidmc24/gradle-avro-plugin instead.
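In each module's build.gradle.kts that could look roughly like this (a sketch; the plugin and library versions are assumptions, check the repository for the exact ones):

plugins {
  kotlin("jvm")
  // generates Java classes from the *.avsc files (by default the plugin reads them from src/main/avro)
  id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

dependencies {
  // the generated classes need the Avro runtime
  implementation("org.apache.avro:avro:1.11.3")
}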
We will use a docker-compose.yml based on the one from confluent/cp-all-in-one, both to run the demo locally and to execute the integration tests. From that configuration we keep only these containers: zookeeper, broker, schema-registry and control-center.
Confluent control-center is not really needed, but it may be interesting to take a look at its admin console at http://localhost:9021 when running the demo locally.
We use SCHEMA_COMPATIBILITY_LEVEL = none, but you can play with the other values as documented in Confluent Schema Evolution and Compatibility.
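That value is set on the schema-registry container in docker-compose.yml. A minimal sketch, assuming the usual Confluent image convention of prefixing registry properties with SCHEMA_REGISTRY_ (the exact variable name may differ in the repository):

schema-registry:
  environment:
    # assumption: maps to the registry's schema.compatibility.level property
    SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: none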
Implementing the Producer v1
This producer will send Sensor messages in v1 format as specified in producer1/avro/sensor.avsc:
{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"temperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}
The davidmc24/gradle-avro-plugin will generate Java code similar to:
public class Sensor {

  private String id;
  private Float temperature;
  private Float acceleration;
  private Float velocity;

  public Sensor(String id, Float temperature, Float acceleration, Float velocity) {
    this.id = id;
    this.temperature = temperature;
    this.acceleration = acceleration;
    this.velocity = velocity;
  }

  // Getters and Setters
}
Using this configuration:
spring:
  cloud:
    function:
      definition: "myProducer"
    stream:
      bindings:
        myProducer-out-0:
          destination: sensor-topic
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          myProducer-out-0:
            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
Then Spring Cloud Stream:
- Will expect us to implement a @Bean named myProducer returning a value or a Flux of values:
  - In Kotlin we can use a lambda () -> Value or () -> Flux<Value>.
  - In Java we can use a Supplier<Value> or Supplier<Flux<Value>>.
- Will call this @Bean one or many times to retrieve the values to be published.
- Will connect to a Kafka broker on localhost:9092.
- Will use the Confluent KafkaAvroSerializer and connect to the schema registry server on localhost:8081.
For simplicity, we put everything in the same Application class, including the RestController:
@SpringBootApplication
@RestController
class Application {

  private val version = "v1"

  private val unbounded: BlockingQueue<Sensor> = LinkedBlockingQueue()

  @Autowired private lateinit var random: Random

  @Bean
  fun myProducer(): () -> Sensor? = { unbounded.poll() }

  @RequestMapping(value = ["/messages"], method = [RequestMethod.POST])
  fun sendMessage(): String {
    unbounded.offer(randomSensor())
    return "ok, have fun with $version payload!"
  }

  private fun randomSensor() = Sensor().apply {
    this.id = random.nextInt(1000, 9999).toString() + "-$version"
    this.acceleration = random.nextFloat() * 10
    this.velocity = random.nextFloat() * 100
    this.temperature = random.nextFloat() * 50
  }
}

fun main(args: Array<String>) {
  runApplication<Application>(*args)
}
To generate a random Sensor message we will use this Random @Bean, which we can override in the integration test:
@Configuration
class RandomConfiguration {
  @Bean
  fun random() = Random(System.nanoTime())
}
Testing the Producer v1
To test the producer we will use Testcontainers to start Docker Compose. For simplicity we will use fixed ports, but take into account that it may be better to use random ports and set system properties to override the configuration values for the Kafka broker URL and the schema registry URL.
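A minimal sketch of that setup, assuming JUnit 5 and Testcontainers' Docker Compose support; class names, the service name and the wait strategy are assumptions, not the exact code from the sample:

import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.testcontainers.containers.DockerComposeContainer
import org.testcontainers.containers.wait.strategy.Wait
import java.io.File

// Kotlin-friendly subclass to work around DockerComposeContainer's recursive generics
class KDockerComposeContainer(file: File) : DockerComposeContainer<KDockerComposeContainer>(file)

// Hypothetical base class: starts docker compose once before the tests and stops it afterwards.
// The compose file publishes the fixed ports the tests rely on (9092 for the broker,
// 8081 for the schema registry); withExposedService is used here mainly to wait
// until the schema registry is actually listening.
abstract class DockerComposeTestBase {
  companion object {
    private val dockerCompose = KDockerComposeContainer(File("../docker-compose.yml"))
      .withLocalCompose(true)
      .withExposedService("schema-registry", 8081, Wait.forListeningPort())

    @JvmStatic
    @BeforeAll
    fun startDockerCompose() {
      dockerCompose.start()
    }

    @JvmStatic
    @AfterAll
    fun stopDockerCompose() {
      dockerCompose.stop()
    }
  }
}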
To verify that the producer is indeed producing messages, we will consume them with a simple KafkaConsumer configured with the Avro deserializer:
@Test
fun `should produce sensor v1 message`() {
  KafkaConsumer<String, GenericRecord>(consumerProperties()).use { consumer ->
    // Subscribe to topic
    consumer.subscribe(listOf(SENSOR_TOPIC))
    // Consume previous messages (just in case)
    consumer.poll(TIMEOUT)
    // Produce one message
    WebTestClient
      .bindToServer()
      .baseUrl("http://localhost:$serverPort")
      .build()
      .post().uri("/messages").exchange()
      .expectStatus().isOk
      .expectBody(String::class.java).isEqualTo("ok, have fun with v1 payload!")
    // Consume message
    assertThat(consumer.poll(TIMEOUT)).singleElement().satisfies(Consumer { record ->
      val value = record.value()
      assertThat(value["id"]).isEqualTo("2376-v1")
      assertThat(value["temperature"]).isEqualTo(33.067642f)
      assertThat(value["acceleration"]).isEqualTo(3.2810485f)
      assertThat(value["velocity"]).isEqualTo(84.885544f)
    })
  }
}
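The consumerProperties() helper used above is not shown in the post; a minimal sketch of what it could contain (the group id is an assumption, the URLs match the docker-compose.yml):

import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties

// Hypothetical helper: a plain Kafka consumer configured with the Confluent Avro deserializer,
// reading values as GenericRecord (specific.avro.reader is left at its default of false)
private fun consumerProperties() = Properties().apply {
  put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  put(ConsumerConfig.GROUP_ID_CONFIG, "producer-integration-test")
  put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
  put("schema.registry.url", "http://localhost:8081")
}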
To always generate the same results, we will override the Random @Bean with one using a fixed seed:
@Configuration
class RandomTestConfiguration {
  @Bean @Primary
  fun randomTest() = Random(0)
}
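Putting it together, the producer test class could be wired roughly like this (the annotation set, the injected serverPort and the Spring Boot 3 location of LocalServerPort are assumptions):

import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.web.server.LocalServerPort

// Hypothetical wiring: start the application on a random port, register the fixed-seed
// Random @Bean and inherit the docker compose lifecycle from the base class shown earlier
@SpringBootTest(
  webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
  classes = [Application::class, RandomTestConfiguration::class]
)
class ProducerApplicationIntegrationTest : DockerComposeTestBase() {

  @LocalServerPort
  private var serverPort: Int = 0

  // consumerProperties() and the `should produce sensor v1 message` test shown above go here
}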
Implementing the Producer v2
This producer will send Sensor messages in v2 format as specified in producer2/avro/sensor.avsc:
{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"internalTemperature", "type":"float", "default":0.0},
    {"name":"externalTemperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0},
    {"name":"accelerometer","type":[
      "null",{
        "type":"array",
        "items":"float"
      }
    ]},
    {"name":"magneticField","type":[
      "null",{
        "type":"array",
        "items":"float"
      }
    ]}
  ]
}
Note that this new version:
- Splits the temperature field in two: internalTemperature and externalTemperature.
- Adds two new optional fields: accelerometer and magneticField.
Implementing and testing this producer is done the same way as for v1.
Implementing the Consumer
For the consumer to be able to consume both Sensor v1 and v2 messages, we define the schema in consumer/avro/sensor.avsc as:
{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]},
    {"name":"externalTemperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}
So if it receives a v1 message:
- The temperature field would be mapped to internalTemperature.
- The externalTemperature field would be 0.
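Under the hood this relies on standard Avro schema resolution: the Avro deserializer decodes the message with the writer's schema (fetched from the schema registry) and resolves it into the reader's schema. A tiny standalone sketch of that mechanism using plain Avro, not taken from the sample, assuming it runs from the project root so the .avsc paths resolve:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream
import java.io.File

fun main() {
  val writerSchema = Schema.Parser().parse(File("producer1/avro/sensor.avsc")) // Sensor v1
  val readerSchema = Schema.Parser().parse(File("consumer/avro/sensor.avsc"))  // consumer schema

  // serialize a v1 record with the writer (v1) schema
  val v1Record = GenericData.Record(writerSchema).apply {
    put("id", "1234-v1")
    put("temperature", 25.0f)
    put("acceleration", 1.0f)
    put("velocity", 2.0f)
  }
  val bytes = ByteArrayOutputStream().also {
    val encoder = EncoderFactory.get().binaryEncoder(it, null)
    GenericDatumWriter<GenericRecord>(writerSchema).write(v1Record, encoder)
    encoder.flush()
  }.toByteArray()

  // deserialize it with the consumer (reader) schema
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  val resolved = GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder)

  println(resolved["internalTemperature"]) // 25.0, resolved from "temperature" through its alias
  println(resolved["externalTemperature"]) // 0.0, the default declared in the consumer schema
}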
Then, using this configuration:
spring:
  cloud:
    function:
      definition: "myConsumer"
    stream:
      bindings:
        myConsumer-in-0:
          destination: sensor-topic
          consumer:
            useNativeDecoding: true
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          myConsumer-in-0:
            consumer:
              configuration:
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                schema.registry.url: http://localhost:8081
                specific.avro.reader: true
Spring Cloud Stream:
- Will expect us to implement a @Bean named myConsumer accepting a value:
  - In Kotlin we can use a lambda (Value) -> Unit.
  - In Java we can use a Consumer<Value>.
- Will call this @Bean every time a value is consumed.
- Will connect to a Kafka broker on localhost:9092.
- Will use the Confluent KafkaAvroDeserializer and connect to the schema registry server on localhost:8081.
So, for the sake of the demo, the implementation can be as simple as:
@SpringBootApplication
class Application {
  @Bean
  fun myConsumer(): (Sensor) -> Unit = { println("Consumed $it") }
}

fun main(args: Array<String>) {
  runApplication<Application>(*args)
}
Testing the Consumer
Same as for the producers, to test the consumer we will use Testcontainers to start Docker Compose.
To produce test messages we will use a simple KafkaProducer configured with the Avro serializer.
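The createRecord and produceRecord helpers used by the tests below are not shown in the post either; a minimal sketch of what they could look like (topic name and URLs are assumptions matching the configuration above):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

// Hypothetical helper: build a GenericRecord from an Avro schema given as a JSON string
private fun createRecord(schema: String): GenericRecord =
  GenericData.Record(Schema.Parser().parse(schema))

// Hypothetical helper: publish the record with the Confluent Avro serializer,
// letting it register the writer schema in the schema registry
private fun produceRecord(key: String, record: GenericRecord) {
  val properties = Properties().apply {
    put("bootstrap.servers", "localhost:9092")
    put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    put("schema.registry.url", "http://localhost:8081")
  }
  KafkaProducer<String, GenericRecord>(properties).use { producer ->
    producer.send(ProducerRecord("sensor-topic", key, record)).get()
  }
}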
First of all, we will mock the myConsumer @Bean so we can verify it has been called:
@MockBean(name = "myConsumer")
private lateinit var myConsumer: (Sensor) -> Unit
Then we test that we can consume Sensor v1 messages:
@Test
fun `should consume sensor v1 message`() {
  val id = UUID.randomUUID().toString()
  val temperature = 34.98f
  val acceleration = 9.81f
  val velocity = 15.73f

  val recordV1 = createRecord(
    schema = """
      {
        "namespace" : "com.example",
        "type" : "record",
        "name" : "Sensor",
        "fields" : [
          {"name":"id","type":"string"},
          {"name":"temperature", "type":"float", "default":0.0},
          {"name":"acceleration", "type":"float","default":0.0},
          {"name":"velocity","type":"float","default":0.0}
        ]
      }
    """.trimIndent()
  ).apply {
    put("id", id)
    put("temperature", temperature)
    put("acceleration", acceleration)
    put("velocity", velocity)
  }

  produceRecord(id, recordV1)

  verify(myConsumer, timeout(TIMEOUT.toMillis()))
    .invoke(Sensor(id, temperature, 0f, acceleration, velocity))
}
And we test that we can consume Sensor v2 messages:
@Test
fun `should consume sensor v2 message`() {
  val id = UUID.randomUUID().toString()
  val internalTemperature = 34.98f
  val externalTemperature = 54.16f
  val acceleration = 9.81f
  val velocity = 15.73f

  val recordV2 = createRecord(
    schema = """
      {
        "namespace" : "com.example",
        "type" : "record",
        "name" : "Sensor",
        "fields" : [
          {"name":"id","type":"string"},
          {"name":"internalTemperature", "type":"float", "default":0.0},
          {"name":"externalTemperature", "type":"float", "default":0.0},
          {"name":"acceleration", "type":"float","default":0.0},
          {"name":"velocity","type":"float","default":0.0},
          {"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},
          {"name":"magneticField","type":["null",{"type":"array","items":"float"}]}
        ]
      }
    """.trimIndent()
  ).apply {
    put("id", id)
    put("internalTemperature", internalTemperature)
    put("externalTemperature", externalTemperature)
    put("acceleration", acceleration)
    put("velocity", velocity)
    put("accelerometer", listOf(1.1f, 2.2f, 3.3f))
    put("magneticField", listOf(4.4f, 5.5f, 6.6f))
  }

  produceRecord(id, recordV2)

  verify(myConsumer, timeout(TIMEOUT.toMillis()))
    .invoke(Sensor(id, internalTemperature, externalTemperature, acceleration, velocity))
}
That's it! Happy coding! 💙