amaendeepm
Kafka with Rust using rdkafka

The rdkafka crate is a Rust binding for the librdkafka C library. It lets you use Kafka's messaging capabilities while benefiting from Rust's safety and performance.

First, you'll want to add the rdkafka dependency to your Cargo.toml file:

[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "naive-runtime", "tracing", "tokio","zstd"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"

Note that the cmake-build feature above compiles librdkafka from source during the build, so you'll need CMake and a C toolchain available. If you drop that feature and link against a system librdkafka instead, you'll need the library installed. On macOS, you can install it with Homebrew:

brew install librdkafka

For Ubuntu or other Debian-based systems, use:

sudo apt-get install librdkafka-dev
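If you don't already have a broker to test against, a quick way to get one running locally is the official Apache Kafka Docker image (a sketch assuming Docker is installed; the image runs a single-node KRaft broker on port 9092 by default):

```shell
# Start a single-node Kafka broker listening on localhost:9092 (KRaft mode, no ZooKeeper)
docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0
```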

Setting Up the Kafka Client

Here’s a sample implementation:

use rdkafka::{
    message::OwnedHeaders,
    producer::{BaseRecord, Producer, ProducerContext, ThreadedProducer},
    ClientConfig, ClientContext, Message,
};
use std::{env, time::Duration};

/// Kafka Client
pub struct Client {}

impl Client {
    /// Publish on the Kafka client
    pub async fn publish(
        headers: OwnedHeaders,
        message: &str,
        key: &str,
        topic: &str,
    ) -> anyhow::Result<()> {
        tracing::debug!(event="publish-start", key=?key, topic=?topic);

        let brokers = env::var("KAFKA_BROKERS").unwrap_or_else(|_| "localhost:9092".into());

        let producer: ThreadedProducer<ProducerCallback> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("acks", "all")
            .set("enable.idempotence", "true")
            .set("compression.type", "zstd")  // Enable zstd compression
            .set("queue.buffering.max.ms", "0")
            .set("retries", "10")
            .set("request.timeout.ms", "10000")
            .create_with_context(ProducerCallback {})
            .expect("unable to create kafka producer");

        // Send the message
        let _ = producer.send(
            BaseRecord::to(topic)
                .payload(message)
                .key(key)
                .headers(headers),
        );

        // Flush the producer to ensure the message is sent
        let _ = producer.flush(Duration::from_secs(5));

        Ok(())
    }
}

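With the client in place, publishing looks like this (a minimal sketch; the topic name, key, payload, and header values here are made up for illustration):

```rust
use rdkafka::message::{Header, OwnedHeaders};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // insert() consumes and returns the builder, so headers can be chained
    let headers = OwnedHeaders::new().insert(Header {
        key: "trace-id",
        value: Some("abc-123"),
    });

    Client::publish(headers, r#"{"status":"ok"}"#, "order-42", "orders").await
}
```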

Implementing Retry Logic

Network issues or broker unavailability can cause message publishing to fail. To handle this, we can add a function for publishing messages with an exponential backoff strategy:

/// Publish on Kafka with an exponential backoff
pub(crate) async fn publish_with_retry(
    headers: OwnedHeaders,
    message: &str,
    key: &str,
    topic: &str,
    interval: u64,
) -> anyhow::Result<()> {
    const MAX_ATTEMPTS: u64 = 10;
    let mut publish_attempts = 1;

    while let Err(err) = Client::publish(headers.clone(), message, key, topic).await {
        tracing::error!(event="publish", msg="failed", err=?err);

        if publish_attempts > MAX_ATTEMPTS {
            tracing::error!(event="publish", msg="exceeded retries", err=?err);
            return Err(KafkaError::RetryExceeded.into());
        } else {
            publish_attempts += 1;
            tracing::error!(event="publish", msg="retrying", err=?err, retry_count=publish_attempts);

            // Exponential backoff: double the wait after each failed attempt,
            // capped at 64x the base interval
            let backoff = interval.saturating_mul(1u64 << (publish_attempts - 1).min(6));
            tokio::time::sleep(Duration::from_secs(backoff)).await;
        }
    }

    Ok(())
}


This function retries sending the message, waiting longer between attempts, and gives up after a set number of retries.
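The function's name promises exponential backoff, so the delay schedule is worth making precise. A truly exponential schedule can be sketched as a small pure helper (an assumption for illustration: doubling per attempt, with a cap to keep waits bounded):

```rust
/// Seconds to wait before the given retry attempt (1-based),
/// doubling each time and capped at 64x the base interval.
fn backoff_secs(base_interval: u64, attempt: u64) -> u64 {
    let exponent = attempt.saturating_sub(1).min(6); // cap growth at 2^6
    base_interval.saturating_mul(1u64 << exponent)
}

fn main() {
    // With a 1-second base interval the schedule doubles, then plateaus
    let schedule: Vec<u64> = (1..=8).map(|a| backoff_secs(1, a)).collect();
    println!("{:?}", schedule); // [1, 2, 4, 8, 16, 32, 64, 64]
}
```

Keeping the schedule in a pure function like this also makes the timing trivially unit-testable, which a `tokio::time::sleep` call buried in the retry loop is not.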

Error Handling
For better error management during publishing, defining a custom error type can be useful:

/// Kafka Error
#[derive(Debug, thiserror::Error)]
pub enum KafkaError {
    #[error("Retry exceeded on trying to publish message")]
    RetryExceeded,
}

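If you'd rather not pull in thiserror, the derive above expands to roughly the following hand-written impls using only the standard library:

```rust
use std::fmt;

/// Kafka Error, without the thiserror derive
#[derive(Debug)]
pub enum KafkaError {
    RetryExceeded,
}

// What #[error("...")] generates: a Display impl with the message
impl fmt::Display for KafkaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KafkaError::RetryExceeded => {
                write!(f, "Retry exceeded on trying to publish message")
            }
        }
    }
}

// Marker impl so the type works with anyhow and other error handling
impl std::error::Error for KafkaError {}

fn main() {
    println!("{}", KafkaError::RetryExceeded);
}
```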

Producer Callback for Delivery Reports
To handle delivery reports for messages sent, you can implement a ProducerCallback struct. This allows you to manage responses to successful deliveries and failures:

/// Producer Callback for Kafka
struct ProducerCallback {}

impl ClientContext for ProducerCallback {}

impl ProducerContext for ProducerCallback {
    type DeliveryOpaque = ();

    /// Handle delivery report from the producer
    fn delivery(
        &self,
        delivery_result: &rdkafka::producer::DeliveryResult<'_>,
        _delivery_opaque: Self::DeliveryOpaque,
    ) {
        match delivery_result.as_ref() {
            Ok(msg) => {
                // key_view returns Option<Result<_, _>>; avoid panicking on keyless
                // or non-UTF-8 keys
                let key = msg.key_view::<str>().and_then(|k| k.ok()).unwrap_or("<none>");

                tracing::debug!(
                    event = "publish",
                    key = key,
                    offset = msg.offset(),
                    partition = msg.partition()
                );
            }
            Err(err) => {
                tracing::warn!(event="publish", state="fail",
                    content=?err);
            }
        };
    }
}

This callback provides insights into the delivery status of your messages, allowing you to react appropriately to successes and failures.

With this setup, you can effectively publish messages to Kafka using Rust’s rdkafka library while utilizing zstd compression.
