This is a two-part series to help you get started with Rust and Kafka. We will be using the rust-rdkafka crate, which itself is based on librdkafka (a C library).
In this post we will cover the Kafka Producer API.
Initial setup
Make sure you install a Kafka broker - a local setup should suffice. Of course, you will need to have Rust installed as well - version 1.45 or above.
Before you begin, clone the GitHub repo:
git clone https://github.com/abhirockzz/rust-kafka-101
cd part1
Check the Cargo.toml file:
...
[dependencies]
rdkafka = { version = "0.25", features = ["cmake-build","ssl"] }
...
Note on the cmake-build feature

rust-rdkafka provides a couple of ways to resolve the librdkafka dependency. I chose static linking, wherein librdkafka is compiled as part of the build. You could opt for dynamic linking to refer to a locally installed version instead. For more details, please refer to this link.
Ok, let's start off with the basics.
Simple producer
Here is a simple producer based on BaseProducer:
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord};
use std::thread;
use std::time::Duration;

let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("security.protocol", "SASL_SSL")
    .set("sasl.mechanisms", "PLAIN")
    .set("sasl.username", "<update>")
    .set("sasl.password", "<update>")
    .create()
    .expect("invalid producer config");
We then use the send method to start producing messages - it's done in a tight loop with a thread::sleep in between (not something you would do in production) to make it easier to track/follow the results. The key, value (payload), and the destination Kafka topic are represented in the form of a BaseRecord:
for i in 1..100 {
println!("sending message");
producer
.send(
BaseRecord::to("rust")
.key(&format!("key-{}", i))
.payload(&format!("value-{}", i)),
)
.expect("failed to send message");
thread::sleep(Duration::from_secs(3));
}
You can check the entire code in the file src/1_producer_simple.rs
To test if the producer is working ...

Run the program:
- rename the file src/1_producer_simple.rs to main.rs
- execute cargo run
You should see this output:
sending message
sending message
sending message
...
What's going on? To figure it out, connect to your Kafka topic (I have used rust as the name of the Kafka topic in the above example) using the Kafka CLI consumer (or any other consumer client, e.g. kafkacat). You should see the messages flowing in.
For example:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning
Producer callback
We are flying blind right now! Unless we explicitly create a consumer to look at our messages, we have no clue whether they are being sent to Kafka. Let's fix that by implementing a ProducerContext (trait) to hook into the produce event - it's like a callback.
Start by creating a struct and an empty implementation for the ClientContext trait (this is mandatory).
use rdkafka::client::ClientContext;

struct ProduceCallbackLogger;
impl ClientContext for ProduceCallbackLogger {}
Now comes the main part, where we implement the delivery function of the ProducerContext trait:
use rdkafka::message::Message;
use rdkafka::producer::ProducerContext;

impl ProducerContext for ProduceCallbackLogger {
type DeliveryOpaque = ();
fn delivery(
&self,
delivery_result: &rdkafka::producer::DeliveryResult<'_>,
_delivery_opaque: Self::DeliveryOpaque,
) {
let dr = delivery_result.as_ref();
match dr {
Ok(msg) => {
let key: &str = msg.key_view().unwrap().unwrap();
println!(
"produced message with key {} in offset {} of partition {}",
key,
msg.offset(),
msg.partition()
)
}
Err(producer_err) => {
let key: &str = producer_err.1.key_view().unwrap().unwrap();
println!(
"failed to produce message with key {} - {}",
key, producer_err.0,
)
}
}
}
}
We match against the DeliveryResult (which is a Result after all) to account for success (Ok) and failure (Err) scenarios. All we do is simply log the message in both cases, since this is just an example. You could do pretty much anything you want here (don't go crazy though!).
We've ignored DeliveryOpaque, which is an associated type of the ProducerContext trait.
We need to make sure that we plug in our ProducerContext implementation. We do this by using the create_with_context method (instead of create), and by providing the correct type for the BaseProducer as well.
let producer: BaseProducer<ProduceCallbackLogger> = ClientConfig::new().set(....)
...
.create_with_context(ProduceCallbackLogger {})
...
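Put together, producer creation now looks something like this (a minimal sketch, assuming a local broker and leaving out the SASL settings from earlier):

use rdkafka::config::ClientConfig;
use rdkafka::producer::BaseProducer;

let producer: BaseProducer<ProduceCallbackLogger> = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .create_with_context(ProduceCallbackLogger {})
    .expect("invalid producer config");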
How does the "callback get called"?
Ok, we have the implementation, but we need a way to trigger it! One of the ways is to call flush on the producer. So, we could write our producer as such:
- add
producer.flush(Duration::from_secs(3));
, and - comment the
sleep
(just for now)
producer
.send(
BaseRecord::to("rust")
.key(&format!("key-{}", i))
.payload(&format!("value-{}", i)),
)
.expect("failed to send message");
producer.flush(Duration::from_secs(3));
println!("flushed message");
//thread::sleep(Duration::from_secs(3));
Hold on, we can do better!

The send method is non-blocking (by default), but by calling flush after each send, we have now converted this into a synchronous invocation - not recommended from a performance perspective.
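Under the hood, delivery notifications are served when the producer is polled - flush polls internally while it waits for outstanding messages. With a BaseProducer you could also call poll yourself periodically instead of flushing after every send. A sketch, reusing the same loop as above:

for i in 1..100 {
    producer
        .send(
            BaseRecord::to("rust")
                .key(&format!("key-{}", i))
                .payload(&format!("value-{}", i)),
        )
        .expect("failed to send message");
    // serve delivery callbacks without blocking on every send
    producer.poll(Duration::from_millis(100));
}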
We can improve the situation by using a ThreadedProducer. It takes care of invoking the poll method in a background thread to ensure that the delivery callback notifications are delivered. Doing this is very simple - just change the type from BaseProducer to ThreadedProducer!
// before: BaseProducer<ProduceCallbackLogger>
// after: ThreadedProducer<ProduceCallbackLogger>
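Producer creation stays the same otherwise - a sketch, under the same assumptions as before:

use rdkafka::config::ClientConfig;
use rdkafka::producer::ThreadedProducer;

let producer: ThreadedProducer<ProduceCallbackLogger> = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .create_with_context(ProduceCallbackLogger {})
    .expect("invalid producer config");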
Also, we don't need the call to flush anymore.
...
//producer.flush(Duration::from_secs(3));
//println!("flushed message");
thread::sleep(Duration::from_secs(3));
...
The code is available in src/2_threaded_producer.rs

Run the program again:
- rename the file src/2_threaded_producer.rs to main.rs, and
- execute cargo run
Output:
sending message
sending message
produced message with key key-1 in offset 6 of partition 2
produced message with key key-2 in offset 3 of partition 0
sending message
produced message with key key-3 in offset 7 of partition 2
As expected, you should be able to see the producer event callback, denoting that the messages were indeed sent to the Kafka topic. Of course, you can connect to the topic directly and double-check, just like before:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning
To try a failure scenario, try using an incorrect topic name and notice how the Err variant of the delivery implementation gets invoked.
Sending JSON messages
So far, we were just sending Strings as keys and values. JSON is a commonly used message format, so let's see how to use that.
Assume we want to send User info, which will be represented using this struct:
struct User {
id: i32,
email: String,
}
We can then use the serde_json library to serialize this as JSON. All we need is to use the custom derives in serde - Deserialize and Serialize:
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct User {
id: i32,
email: String,
}
Change the producer loop:
- create a User instance
- serialize it to a JSON string using to_string_pretty
- include that in the payload
...
// create a User instance (field values chosen to match the sample output below)
let user = User {
    id: i,
    email: format!("user-{}@foobar.com", i),
};
let user_json = serde_json::to_string_pretty(&user).expect("json serialization failed");
producer
    .send(
        BaseRecord::to("rust")
            .key(&format!("user-{}", i))
            .payload(&user_json),
    )
    .expect("failed to send message");
...
You can also use to_vec (instead of to_string_pretty) to convert it into a Vec of bytes (Vec<u8>).
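For example, a sketch (Vec<u8> implements ToBytes too, so it can be passed to payload as-is):

// serialize straight to bytes instead of a String
let user_bytes: Vec<u8> = serde_json::to_vec(&user).expect("json serialization failed");
producer
    .send(
        BaseRecord::to("rust")
            .key(&format!("user-{}", i))
            .payload(&user_bytes),
    )
    .expect("failed to send message");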
To run the program ...
- rename the file src/3_JSON_payload.rs to main.rs, and
- execute cargo run
Consume from the topic:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning
You should see messages with a String key (e.g. user-34) and a JSON value:
{
"id": 34,
"email": "user-34@foobar.com"
}
Is there a better way?
Yes! If you are used to the declarative serialization/de-serialization approach in the Kafka Java client (and probably others as well), you may not like this "explicit" approach. Just to put things in perspective, this is how you'd do it in Java:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
....
ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, key, user);
producer.send(record);
Notice that you simply configure the Producer to use KafkaJsonSchemaSerializer, and the User class is serialized to JSON.
rust-rdkafka provides something similar with the ToBytes trait. Here is what it looks like:
pub trait ToBytes {
/// Converts the provided data to bytes.
fn to_bytes(&self) -> &[u8];
}
Self-explanatory, right? There are existing implementations for String, Vec<u8>, etc., so you can use these types as key or value without any additional work - this is exactly what we just did. But the problem is that the way we did it was "explicit": we converted the User struct into a JSON String and passed it on.
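For instance, this is why the earlier call to payload(&user_json) just worked - String already implements ToBytes. A trivial sketch of what the trait provides (the variable names are only for illustration):

use rdkafka::message::ToBytes;

let value = String::from("hello");
let bytes: &[u8] = value.to_bytes(); // the ToBytes impl for String returns its UTF-8 bytes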
What if we could implement ToBytes for User?
impl ToBytes for User {
fn to_bytes(&self) -> &[u8] {
let b = serde_json::to_vec_pretty(&self).expect("json serialization failed");
b.as_slice()
}
}
You will see a compiler error:
cannot return value referencing local variable `b`
returns a value referencing data owned by the current function
For additional background, please refer to this GitHub issue. I would be happy to see another example that works with ToBytes - please drop a note if you have inputs on this!
TL;DR is that it's best to stick to the "explicit" way of doing things unless you have a ToBytes implementation that "does not involve an allocation and cannot fail".
Wrap up
That's it for the first part. Part 2 will cover topics around the Kafka consumer.
Top comments (1)
An idea about the impl ToBytes for User problem:

The issue is that in to_bytes we allocate a Vec<u8> on the heap and then want to return a pointer to the underlying byte array. But the Vec goes out of scope when to_bytes ends. This cleans up the Vec, and the pointer would be invalid if we were allowed to use it after the method returns. Safe but sad.
One option would be to build a wrapper around the User that contains an inner: User and an extra field bytes that contains a Vec of u8. Or, which is what I will show here, we could store the Vec directly on the User, while the field is excluded from serialization. Like so:
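Below is a minimal reconstruction of the idea (the comment's own code isn't reproduced here) - the #[serde(skip)] attribute, the bytes field, and the fill_bytes method name are assumptions:

#[derive(Serialize, Deserialize, Debug)]
struct User {
    id: i32,
    email: String,
    // cached JSON representation; excluded from (de)serialization
    #[serde(skip)]
    bytes: Vec<u8>,
}

Then there can be an extra method that initializes the bytes:

impl User {
    // serialize self to JSON once and cache the result on the struct
    fn fill_bytes(&mut self) {
        let b = serde_json::to_vec_pretty(&*self).expect("json serialization failed");
        self.bytes = b;
    }
}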
Which means that, as long as we initialize the bytes field before we call to_bytes, the compiler allows us to return a pointer to the underlying byte array. The compiler knows that this pointer is valid as long as the Vec is valid, which in turn will not be dropped until the User instance goes out of scope. The impl of ToBytes could look like this:
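A sketch, under the same assumptions as above:

impl ToBytes for User {
    fn to_bytes(&self) -> &[u8] {
        // a slice into the cached bytes - valid for as long as self is borrowed
        &self.bytes
    }
}

The full example can be seen at this playground-link: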
play.rust-lang.org/?version=stable...