DEV Community

Abhishek Gupta

Posted on • Originally published at abhishek1987.Medium

Getting started with Kafka and Rust: Part 2

This is the second part of a two-part series to help you get started with Rust and Kafka. We will be using the rust-rdkafka crate, which itself is based on librdkafka (a C library).

In this post we will cover the Kafka Consumer API.

Initial setup

Make sure you install a Kafka broker - a local setup should suffice. You will also need to have Rust installed, version 1.45 or above.

Before you begin, clone the GitHub repo:

git clone https://github.com/abhirockzz/rust-kafka-101
cd rust-kafka-101/part2

Simple Consumer

Creating a low-level consumer (BaseConsumer) is strikingly similar to how you'd create its counterpart - BaseProducer. The only difference is that you have to annotate the result with the right type (which in this case is BaseConsumer), so that create knows what to build.

    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my_consumer_group")
        .create()
        .expect("invalid consumer config");

    consumer
        .subscribe(&["rust"])
        .expect("topic subscribe failed");

Notice that the group.id config has also been included, .set("group.id", "my_consumer_group") - it's mandatory.

Once a BaseConsumer is created, you can subscribe to one or more topics (in this case, it's just one topic with the name rust).

To fetch messages from the topic, we start (spawn) a new thread:

    thread::spawn(move || loop {
        for msg_result in consumer.iter() {
            let msg = msg_result.unwrap();
            let key: &str = msg.key_view().unwrap().unwrap();
            let value = msg.payload().unwrap();
            let user: User = serde_json::from_slice(value).expect("failed to deser JSON to User");
            println!(
                "received key {} with value {:?} in offset {:?} from partition {}",
                key,
                user,
                msg.offset(),
                msg.partition()
            )
        }
    });

thread::spawn accepts a closure, which in this case happens to be an infinite loop that:

  • Receives messages, and,
  • Prints out the key, value along with offset and partition info
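The loop above calls unwrap on the key and the payload, so a message without a key (or with a non-UTF-8 key) would panic the consumer thread. Here is a minimal sketch of a more defensive approach, using only the standard library. The decode function is a hypothetical helper and not part of rust-rdkafka; it assumes the key and payload are available as optional byte slices:

```rust
// Hypothetical helper: turns an optional key and payload (raw bytes)
// into owned Strings, returning an error instead of panicking.
fn decode(key: Option<&[u8]>, payload: Option<&[u8]>) -> Result<(String, String), String> {
    let key = key.ok_or("message has no key")?;
    let key = std::str::from_utf8(key).map_err(|e| format!("key is not UTF-8: {}", e))?;
    let payload = payload.ok_or("message has no payload")?;
    let payload =
        std::str::from_utf8(payload).map_err(|e| format!("payload is not UTF-8: {}", e))?;
    Ok((key.to_string(), payload.to_string()))
}

fn main() {
    // A message with a key but no payload is skipped rather than crashing the loop.
    match decode(Some(&b"user-1"[..]), None) {
        Ok((k, v)) => println!("key {} value {}", k, v),
        Err(e) => println!("skipping message: {}", e),
    }
}
```

In the consumer loop, an Err from a helper like this could be logged and the message skipped, rather than taking the whole thread down.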

Calling iter on the consumer is just a shortcut for invoking poll without any timeout.

Other variations are also possible. You can use poll directly:

loop {
    let message = consumer.poll(Duration::from_secs(2));
    ...
}

Or, use this format:

for message in &consumer {
...
}
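When you call poll directly, each call has one of three outcomes: no message arrived within the timeout, an error occurred, or a message was received. In rust-rdkafka this is expressed as an Option wrapping a KafkaResult. The sketch below models that shape with standard-library types only, to show how a loop body might tell the three cases apart. The PollOutcome alias, the describe function and the String stand-ins are hypothetical names, not part of the crate:

```rust
// Stand-in for the shape returned by poll:
// None = timed out, Some(Err) = error, Some(Ok) = a message.
// Plain Strings replace the crate's message and error types here.
type PollOutcome = Option<Result<String, String>>;

fn describe(outcome: PollOutcome) -> String {
    match outcome {
        None => "no message within the timeout".to_string(),
        Some(Err(e)) => format!("poll error: {}", e),
        Some(Ok(msg)) => format!("received: {}", msg),
    }
}

fn main() {
    let outcomes: Vec<PollOutcome> = vec![
        None,
        Some(Err("broker unavailable".to_string())),
        Some(Ok("user-1".to_string())),
    ];
    for outcome in outcomes {
        println!("{}", describe(outcome));
    }
}
```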

Run the program

  • Rename the file src/1_consumer_simple.rs to main.rs, and
  • execute cargo run

Output:

sending message
sending message
produced message with key user-1 in offset 25 of partition 2
produced message with key user-2 in offset 12 of partition 4
sending message
produced message with key user-3 in offset 20 of partition 0
received key user-3 with value User { id: 3, email: "user-3@foobar.com" } in offset 20 from partition 0
sending message
produced message with key user-4 in offset 24 of partition 3
received key user-4 with value User { id: 4, email: "user-4@foobar.com" } in offset 24 from partition 3
sending message
produced message with key user-5 in offset 25 of partition 3
received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 25 from partition 3
sending message
produced message with key user-6 in offset 26 of partition 3
received key user-6 with value User { id: 6, email: "user-6@foobar.com" } in offset 26 from partition 3
sending message
produced message with key user-7 in offset 27 of partition 3
received key user-7 with value User { id: 7, email: "user-7@foobar.com" } in offset 27 from partition 3

As expected:

  • You see the producer callbacks - confirms that the message was sent to Kafka
  • Consumer received the message as well - as confirmed by the log

What about Consumer callbacks?

Yes, just like the producer, the consumer API also has callbacks for:

  • Re-balancing
  • Offset commit

To do this, we need to implement the ConsumerContext trait. We start by defining a struct and implementing both ClientContext and ConsumerContext for it:

struct ConsumerCallbackLogger;
impl ClientContext for ConsumerCallbackLogger {}
impl ConsumerContext for ConsumerCallbackLogger {
...
}

We will skip the pre_rebalance method and focus on post_rebalance in this example:

    fn post_rebalance<'a>(&self, rebalance: &rdkafka::consumer::Rebalance<'a>) {
        println!("post_rebalance callback");

        match rebalance {
            Rebalance::Assign(tpl) => {
                for e in tpl.elements() {
                    println!("rebalanced partition {}", e.partition())
                }
            }
            Rebalance::Revoke => {
                println!("ALL partitions are REVOKED")
            }
            Rebalance::Error(err_info) => {
                println!("Post Rebalance error {}", err_info)
            }
        }
    }

Rebalance is an enum. As a part of the implementation, we match it against all the possible options (partitions assigned, partitions revoked, rebalance error) and simply log it.

    fn commit_callback(
        &self,
        result: rdkafka::error::KafkaResult<()>,
        offsets: &rdkafka::TopicPartitionList,
    ) {
        match result {
            Ok(_) => {
                for e in offsets.elements() {
                    println!(
                        "committed offset {:?} in partition {}",
                        e.offset(),
                        e.partition()
                    )
                }
            }
            Err(err) => {
                println!("error committing offset - {}", err)
            }
        }
    }

For commit callback events, we match on the KafkaResult (available in the commit_callback parameter) to check whether the commit was successful. If it was, we print out the committed offset for each partition; otherwise, we log the error that occurred during the commit process.

Once this is done, we simply need to plug in our new implementation:

    let consumer: BaseConsumer<ConsumerCallbackLogger> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        ....
        .create_with_context(ConsumerCallbackLogger {})
        .expect("invalid consumer config");

To do this, we made a couple of changes:

  • Use create_with_context (instead of create)
  • Use BaseConsumer<ConsumerCallbackLogger> as the consumer type

Run the program

  • Rename the file src/2_consumer_callback.rs to main.rs, and
  • execute cargo run

Output:

sending message
sending message
produced message with key user-1 in offset 0 of partition 2
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
produced message with key user-2 in offset 0 of partition 4
sending message
produced message with key user-3 in offset 0 of partition 0
received key user-3 with value User { id: 3, email: "user-3@foobar.com" } in offset 0 from partition 0
sending message
committed offset Offset(1) in partition 0
committed offset Offset(1) in partition 4
produced message with key user-4 in offset 0 of partition 3
received key user-4 with value User { id: 4, email: "user-4@foobar.com" } in offset 0 from partition 3

As expected, the re-balance events were logged along with the successful commits.

Trigger a Re-balance

Partition assignment happens when you first start the application, and you're able to witness this thanks to our ConsumerContext implementation. You can trigger a rebalance again by starting a second instance of the application. Since there are now two instances in the same consumer group, the topic partitions will be rebalanced. For example, if you had 6 partitions in the topic, they will be split equally between the two instances.

You should see log messages similar to this:

....
# instance 1
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
...

# instance 2
post_rebalance callback
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
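To illustrate how 6 partitions end up as two groups of 3, here is a simplified model of splitting partitions into contiguous ranges across group members. This is only a sketch: split_partitions is a hypothetical function, and it is not how librdkafka's assignor is actually implemented.

```rust
// Simplified model of splitting partitions across the members of a
// consumer group: contiguous ranges, with any remainder going to the
// first members. An illustration only, not librdkafka's assignor.
fn split_partitions(num_partitions: usize, num_members: usize) -> Vec<Vec<usize>> {
    let mut assignment: Vec<Vec<usize>> = Vec::new();
    if num_members == 0 {
        return assignment;
    }
    let base = num_partitions / num_members;
    let extra = num_partitions % num_members;
    let mut next = 0;
    for member in 0..num_members {
        // The first `extra` members take one additional partition.
        let count = base + if member < extra { 1 } else { 0 };
        assignment.push((next..next + count).collect());
        next += count;
    }
    assignment
}

fn main() {
    for (i, parts) in split_partitions(6, 2).iter().enumerate() {
        println!("instance {} -> partitions {:?}", i + 1, parts);
    }
}
```

With 6 partitions and 2 members, this model gives partitions 0 to 2 to the first instance and 3 to 5 to the second, which matches the log output above.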

Switching to Manual commit

By default, the offset commit process is taken care of by the library itself. But we can exercise more control over it by switching to manual mode.

The first step is to set enable.auto.commit to false: .set("enable.auto.commit", "false")

At-least-once delivery

To achieve this, we need to make sure we have processed the message successfully before committing the offset. To simulate this, let's write a function (named process) that can fail randomly. We will then use this in our consumer loop and commit only when this function returns successfully.

fn process(u: User) -> Result<(), ()> {
    let mut rnd = rand::thread_rng();
    let ok = rnd.gen_bool(1.0 / 2.0); //50% probability of returning true
    match ok {
        true => {
            println!("SUCCESSFULLY processed User info {:?}", u);
            Ok(())
        }
        false => {
            println!("FAILED to process User info {:?}", u);
            Err(())
        }
    }
}

We will need to modify our consumer loop:

  • Add manual offset commit based on response from the process function
  • Add a label ('consumer_thread) to our thread loop

    thread::spawn(move || 'consumer_thread: loop {
        for msg_result in consumer.iter() {
            //..... omitted

            println!(
                "received key {} with value {:?} in offset {:?} from partition {}",
                key,
                user,
                msg.offset(),
                msg.partition()
            );

            let processed = process(user);
            match processed {
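                Ok(_) => {
                    // a sync commit returns its own Result - log a failure instead of ignoring it
                    if let Err(e) = consumer.commit_message(&msg, CommitMode::Sync) {
                        println!("error committing offset - {}", e);
                    }
                }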
                Err(_) => {
                    println!("loop encountered processing error");
                    break 'consumer_thread;
                }
            }
        }
    });

We call process - this is to simulate processing of each record received by the consumer. In case the processing succeeds (returns Ok), we commit the record using commit_message.

Note that the commit itself may fail. This should ideally be handled in the commit_callback implementation of ConsumerContext.
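To make the "commit only after successful processing" idea concrete, here is a small, self-contained simulation that does not talk to Kafka at all. Partition and run_consumer are hypothetical names: the struct models a single partition as a list of records plus a committed position, and a "restart" is simply a second call that resumes from that position.

```rust
// Toy model of one partition: `log` holds the records and `committed`
// is the position the consumer resumes from after a restart.
// All names here are hypothetical; no Kafka client is involved.
struct Partition {
    log: Vec<String>,
    committed: usize,
}

// Runs until the log is exhausted or `process` fails. The position is
// committed only after a record has been processed successfully.
// Returns the offsets that were delivered during this run.
fn run_consumer<F>(p: &mut Partition, mut process: F) -> Vec<usize>
where
    F: FnMut(&str) -> Result<(), ()>,
{
    let mut delivered = Vec::new();
    let mut offset = p.committed;
    while offset < p.log.len() {
        delivered.push(offset);
        if process(p.log[offset].as_str()).is_err() {
            break; // stop without committing the failed record
        }
        p.committed = offset + 1; // next offset to read
        offset += 1;
    }
    delivered
}

fn main() {
    let mut p = Partition {
        log: vec!["user-1".to_string(), "user-2".to_string(), "user-3".to_string()],
        committed: 0,
    };
    // First run: processing fails on "user-2".
    let first = run_consumer(&mut p, |rec| if rec == "user-2" { Err(()) } else { Ok(()) });
    // "Restart": processing now succeeds for every record.
    let second = run_consumer(&mut p, |_| Ok(()));
    println!("first run delivered {:?}, second run delivered {:?}", first, second);
}
```

The failed record is delivered in both runs. That repeat delivery is what "at least once" means: nothing is skipped, but processing has to tolerate seeing the same record twice.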

Run the program

  • Rename the file src/3_manual_commit.rs to main.rs, and
  • Execute cargo run

The program output is lengthy, but bear with me.

Output:

produced message with key user-1 in offset 22 of partition 2
produced message with key user-2 in offset 28 of partition 4
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 52 from partition 3
SUCCESSFULLY processed User info User { id: 5, email: "user-5@foobar.com" }
committed offset Offset(53) in partition 3
received key user-2 with value User { id: 2, email: "user-2@foobar.com" } in offset 28 from partition 4
SUCCESSFULLY processed User info User { id: 2, email: "user-2@foobar.com" }
produced message with key user-3 in offset 35 of partition 0
committed offset Offset(29) in partition 4
received key user-1 with value User { id: 1, email: "user-1@foobar.com" } in offset 22 from partition 2
FAILED to process User info User { id: 1, email: "user-1@foobar.com" }
loop encountered processing error. closing consumer...
post_rebalance callback
ALL partitions have been REVOKED

Notice these log messages when process returns successfully:

  1. received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 52 from partition 3
  2. SUCCESSFULLY processed User info User { id: 5, email: "user-5@foobar.com" }
  3. committed offset Offset(53) in partition 3

For a failure scenario:

  1. received key user-1 with value User { id: 1, email: "user-1@foobar.com" } in offset 22 from partition 2
  2. FAILED to process User info User { id: 1, email: "user-1@foobar.com" }
  3. loop encountered processing error. closing consumer...

We ended up stopping the consumer when processing failed. The question here is:

How to handle messages that did not get processed?

Note that failure could happen due to many reasons. A couple of them are:

  • Processing failed (this is what we simulated in this example), or,
  • Processing was successful, but the commit failed

If we continue with our consumer loop after a failed message, we could end up losing messages (data loss). Why? Because the commit_message method also marks smaller offsets (less than the one being handled) as committed. For example, suppose offset 20 from partition 5 failed to get processed (and committed), but you continued processing and offset 21 from partition 5 was processed and committed successfully. You will end up missing the data from offset 20, because committing offset 21 also commits offsets 20 and below. Even after you restart the application, this will not be detected.
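The offset 20/21 scenario can be reproduced with a few lines of plain Rust. This is a toy model with a hypothetical function name (consume_and_continue), not client code: it assumes that committing offset N stores position N + 1, which is consistent with the "committed offset Offset(53)" log line for a message received at offset 52.

```rust
// Toy model: committing offset N stores position N + 1, which marks
// every offset up to and including N as consumed.
// Hypothetical helper; no Kafka client is involved.
fn consume_and_continue(failing_offset: u64, offsets: &[u64]) -> (u64, Vec<u64>) {
    let mut committed_position = 0;
    let mut processed = Vec::new();
    for &offset in offsets {
        if offset == failing_offset {
            continue; // processing failed, but the loop carries on
        }
        processed.push(offset);
        committed_position = offset + 1;
    }
    (committed_position, processed)
}

fn main() {
    // Offset 20 fails, offsets 19 and 21 succeed.
    let (position, processed) = consume_and_continue(20, &[19, 20, 21]);
    println!("processed {:?}, will resume from offset {}", processed, position);
    // Offset 20 was never processed, yet a restart resumes past it.
}
```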

To prevent this...

You can either:

  • Halt the consumer process after detecting the first failure. In this example, we do this by exiting our consumer thread itself (this is not acceptable for real-world applications though). When you restart the application, processing will begin from the last committed offset and the failed message will be picked up and re-processed.
  • Even better - you can handle this in commit_callback by sending this data to another Kafka topic (also known as a "dead letter topic") which can be processed separately.
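The dead letter idea can be sketched without a Kafka client. In the toy model below, a Vec stands in for the dead letter topic, and route is a hypothetical function that sets failed records aside instead of stopping the loop. Where exactly the failure is detected (in the processing loop, as here, or in a callback) is left open:

```rust
// Toy model of dead-letter routing: records that fail processing are
// set aside (here, in a Vec standing in for a second topic) so the
// main loop can keep going without silently dropping them.
// `route` and its parameters are hypothetical names.
fn route<F>(records: &[&str], mut process: F) -> (Vec<String>, Vec<String>)
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut processed = Vec::new();
    let mut dead_letters = Vec::new();
    for &rec in records {
        match process(rec) {
            Ok(()) => processed.push(rec.to_string()),
            // In a real application this would be a produce call to
            // the dead letter topic, followed by the offset commit.
            Err(reason) => dead_letters.push(format!("{} ({})", rec, reason)),
        }
    }
    (processed, dead_letters)
}

fn main() {
    let (ok, dead) = route(&["user-1", "user-2", "user-3"], |rec| {
        if rec == "user-2" {
            Err("simulated failure".to_string())
        } else {
            Ok(())
        }
    });
    println!("processed: {:?}", ok);
    println!("dead letters: {:?}", dead);
}
```

The point of the design is that every record ends up somewhere: it is either processed or parked for later, so committing past it no longer loses data.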

Other considerations

This is by no means an exhaustive list or coverage of all the delivery semantics:

  • We did not cover at-most once and exactly once.
  • You may want to use the Async commit mode - this has its own set of caveats.
  • Committing each and every message (even asynchronously) carries overhead. You may want to commit messages/offsets in batches. As always, you need to take care of a lot of corner cases here as well.
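As a rough idea of what batching could look like, the sketch below tracks the highest processed offset per partition and hands back the positions to commit once a batch is full. OffsetBatcher is a hypothetical helper built on the standard library only, and the actual commit call is left to the caller.

```rust
use std::collections::HashMap;

// Tracks the highest processed offset per partition and reports when a
// batch of `batch_size` records has accumulated. Hypothetical helper;
// the actual commit call is left to the caller.
struct OffsetBatcher {
    batch_size: usize,
    pending: usize,
    next_positions: HashMap<i32, i64>,
}

impl OffsetBatcher {
    fn new(batch_size: usize) -> Self {
        OffsetBatcher { batch_size, pending: 0, next_positions: HashMap::new() }
    }

    // Record a successfully processed message. Returns the positions to
    // commit (last processed offset + 1 per partition) once the batch
    // is full, and None otherwise.
    fn record(&mut self, partition: i32, offset: i64) -> Option<HashMap<i32, i64>> {
        let next = self.next_positions.entry(partition).or_insert(0);
        *next = (*next).max(offset + 1);
        self.pending += 1;
        if self.pending >= self.batch_size {
            self.pending = 0;
            Some(std::mem::take(&mut self.next_positions))
        } else {
            None
        }
    }
}

fn main() {
    let mut batcher = OffsetBatcher::new(3);
    for (partition, offset) in vec![(3, 24), (3, 25), (0, 20), (3, 26)] {
        if let Some(positions) = batcher.record(partition, offset) {
            println!("commit {:?}", positions);
        }
    }
}
```

One corner case to keep in mind: if the application stops before a batch is committed, every record in that uncommitted batch is delivered again after a restart.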

That's all for this two-part series on getting started with Rust and Kafka using the rust-rdkafka library. We covered:

  • A simple producer
  • Producer with delivery callback
  • How to send JSON payloads
  • A basic consumer
  • Handling re-balance and offset commit callbacks
  • Manual commit and at-least-once delivery semantics
