Apache Kafka solves many problems within a software engineering organization. As adoption grows, however, it also introduces new challenges, such as making sure that all Kafka producers and consumers work with the correct data formats.
In this blog post, we'll explore how to guarantee that all clients work with agreed-upon data formats and stay compliant.
The Problem
Kafka producers and consumers don't know anything about each other, as they do not interact directly. The only interaction point both application types have is the messages they exchange via Kafka topics.
If we only have a handful of producers and consumers managed by a single team, it is straightforward to negotiate message formats and enforce them. As Kafka gets adopted by more teams across the organization, staying compliant becomes much more challenging. We need a mechanism that supports us in writing producer/consumer code that works reliably and does not fail because of malformed message data.
Using Schemas
To solve this problem, we establish schemas for all types of exchanged data. In other scenarios, you might have used XSD (for XML documents) or JSON Schema (for JSON APIs). In the context of Kafka, we will use Apache Avro. Avro lets us describe the data we want to exchange and also provides mechanisms to serialize it. We can additionally generate code from schema files, which is useful for statically typed programming languages.
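To make that concrete, here is a minimal, self-contained sketch (our addition, not part of the post's demo project) that describes a chat message with an Avro schema and serializes one record using the apache_avro crate. The struct, schema string, and field values are illustrative only:

use apache_avro::{Schema, Writer};
use serde::Serialize;

// Illustrative struct matching the schema below.
#[derive(Serialize)]
struct ChatMessage {
    email: String,
    message: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A JSON schema definition, just like the one we register with the registry later.
    let raw_schema = r#"
        {
            "type": "record",
            "name": "ChatMessage",
            "fields": [
                {"name": "email", "type": "string"},
                {"name": "message", "type": "string"}
            ]
        }"#;
    let schema = Schema::parse_str(raw_schema)?;

    // Serialize one record into Avro's binary format.
    let mut writer = Writer::new(&schema, Vec::new());
    writer.append_ser(ChatMessage {
        email: "jane@example.com".into(),
        message: "hello".into(),
    })?;
    let encoded: Vec<u8> = writer.into_inner()?;
    println!("encoded {} bytes", encoded.len());
    Ok(())
}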
What is the Schema Registry?
With the data format settled, we need one additional building block to make schemata available to producers and consumers: the Schema Registry. It's an additional service that runs alongside Kafka and manages the schema definitions for the data types that producers and consumers use.
Kafka itself does not interact with the registry. It does not know what kind of data producers add to a topic. From Kafka's point of view, everything consists of bytes. It has no concept of what these bytes represent. Producers and consumers, on the other hand, need to understand what those bytes mean. Therefore, producers and consumers will work with the schema registry to ensure all payloads conform to a specific schema.
How does Kafka Schema Registry work?
The schema registry stores schema definitions for the different types of data. These definitions are registered via HTTP (example below). When the producer constructs a new message, it queries the registry for the relevant schema and uses it to serialize the data into a byte array, which is then sent to Kafka. When the consumer receives a message, instead of deserializing it into a string right away, it uses the referenced schema (fetched from the registry) to turn the bytes back into a data structure.
How does the consumer know what schema to use?
Each message produced on a topic includes a reference to the related schema. We use a unique ID to identify a specific schema and schema version within the registry. The consumer can use this ID to retrieve the correct schema.
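Concretely, Confluent-compatible clients (including the Rust crate used later in this post) prepend a small header to every payload: a magic byte of 0, followed by the four-byte, big-endian schema ID, followed by the Avro-encoded data. Here is a minimal sketch of splitting that header off a raw payload; the helper name is ours, not part of any library:

/// Splits a schema-registry framed payload into (schema ID, Avro body).
/// Layout: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro data.
fn split_wire_format(payload: &[u8]) -> Option<(u32, &[u8])> {
    if payload.len() < 5 || payload[0] != 0 {
        return None; // not a registry-framed message
    }
    let id = u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]);
    Some((id, &payload[5..]))
}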
How to define a Schema?
A schema gets defined in JSON format. Here's an example of a chat message:
{"namespace": "com.cisco.eti.chatmessage",
"type": "record",
"name": "ChatMessage",
"doc" : "Represents a chat message sent by a user",
"fields": [
{"name": "email", "type": "string", "doc": "The person who sent the message"},
{"name": "message", "type": "string", "doc": "The message"}
]
}
We describe all fields and data types that a record of this type has. Each field has a name, a type, and documentation. We can also define default values, for example {"name": "channel", "type": "string", "default": "general"}.
Once defined, we can register it with the registry like this:
$ jq '. | {schema: tojson}' schema.avsc | curl -XPOST http://localhost:8085/subjects/chat_message/versions -H 'Content-Type: application/json' -d @-
A schema is saved under a so-called subject name. The producer later uses this subject name to retrieve the correct schema.
If we change this schema and execute the same curl request, we create a new version of the schema with its own unique ID.
Using Avro Schema in Rust
Let's see how we can use an Avro schema in a Rust application. For demonstration purposes, let's say we're building a chat application. A producer sends mock chat messages to a dedicated Kafka topic, and a consumer reads them and prints out every received message.
Both applications use Avro schemas to validate events. The producer code is pretty straightforward:
let producer: &FutureProducer = &ClientConfig::new()
    .set("bootstrap.servers", brokers)
    .set("message.timeout.ms", "5000")
    .create()
    .expect("Producer creation error");

let sr_settings = SrSettings::new(format!("http://{}", schema_registry_address())); //(2)
let encoder = AvroEncoder::new(sr_settings); //(1)
let existing_schema_strategy = SubjectNameStrategy::RecordNameStrategy(String::from("chat_message")); //(3)
let bytes = encoder.encode_struct(&payload, &existing_schema_strategy).await?;
Instead of serializing the payload to a byte array ourselves, we use a special encoder, AvroEncoder (1). The encoder knows where to find the registry (2) and fetches the appropriate schema for the payload based on the existing_schema_strategy (3).
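With the encoded bytes in hand, the producer still has to hand them to Kafka. Here is a minimal sketch of that last step using rdkafka; the topic name and key are placeholders for this example, and bytes is the Avro-encoded payload returned by encode_struct above:

use std::time::Duration;
use rdkafka::producer::FutureRecord;

// Build a record carrying the Avro-encoded bytes; topic and key are placeholders.
let record = FutureRecord::to("chat_messages")
    .key("some-key")
    .payload(&bytes);

// Hand the record to Kafka and report delivery failures.
if let Err((e, _message)) = producer.send(record, Duration::from_secs(0)).await {
    eprintln!("failed to deliver message: {}", e);
}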
On the consumer side, we have some more work to do:
consumer
    .subscribe(&topics.to_vec())
    .expect("Can't subscribe to specified topics");

let sr_settings = SrSettings::new(format!("http://{}", "localhost:8085"));
let decoder = AvroDecoder::new(sr_settings);

loop {
    match consumer.recv().await {
        Err(e) => eprintln!("Kafka error: {}", e),
        Ok(m) => {
            match m.key_view::<str>() {
                Some(Ok(key)) => match decoder.decode_with_schema(m.payload()).await { //(1)
                    Ok(tuple) => match get_avro_data(tuple) { //(2)
                        Some(v) => {
                            println!("RECEIVED V: {:?}", v);
                        },
                        None => eprintln!("Could not get avro data"),
                    },
                    Err(e) => eprintln!("Error decoding value of record with error {:?}", e),
                },
                Some(Err(_)) => eprintln!("Message payload is not a string"),
                None => eprintln!("No key"),
            }
            consumer.commit_message(&m, CommitMode::Async).unwrap();
        }
    };
}
We're using AvroDecoder to fetch the appropriate schema and deserialize incoming data. Once the decoder has done its work (1), we can convert the received data into a data structure that's easy to work with (2). While this example code performs the translation between the decoded data and a Rust data structure manually, the schema still guarantees that we only ever receive valid data.
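For completeness, here is one way such a manual translation could look, assuming the decoder ultimately hands us an apache_avro Value. The helper and struct below are illustrative sketches, not part of the decoder crate:

use apache_avro::types::Value;

// Plain Rust struct we want to work with after decoding.
#[derive(Debug)]
struct ChatMessage {
    email: String,
    message: String,
}

// Turn a decoded Avro record into a ChatMessage, if the fields line up.
fn chat_message_from_value(value: Value) -> Option<ChatMessage> {
    if let Value::Record(fields) = value {
        let mut email = None;
        let mut message = None;
        for (name, field) in fields {
            match (name.as_str(), field) {
                ("email", Value::String(s)) => email = Some(s),
                ("message", Value::String(s)) => message = Some(s),
                _ => {}
            }
        }
        return Some(ChatMessage {
            email: email?,
            message: message?,
        });
    }
    None
}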
Summary
Once Kafka adoption within the organization increases, enforcing data formats for all producers and consumers becomes crucial. Kafka developers use the schema registry to manage schemata, which reduces effort and increases compliance. Producers and consumers interface with the schema registry to fetch schemata and validate payloads against them.
Next Steps
Are you thinking about deploying your Kafka application to production, but you're unsure how to deploy Kafka itself? A managed hoster is too expensive, and you feel you lack the expertise to run Kafka in your Kubernetes cluster?
Check out Calisti. Calisti gives you the best of both worlds. You can run Kafka on your premises without the hassle of doing everything yourself.
Calisti supports you during the Kafka installation, providing you with sensible defaults. It also comes with the option to deploy your own Schema Registry and scaling mechanisms when needed. Calisti offers a generous free tier - no credit card required.
Sign up for a free account today.
Comments
Creator/maintainer of the Rust crate for Schema Registry here. Thanks a lot for the example. Another great advantage of schemas is being able to work across different languages: you can have a producer in Rust and a consumer in Java. The payloads are also smaller, which means better performance.
Great addition!