Emre Odabas

Kafka Exception C{r}onsumer 🔥 🚀

As the Trendyol Indexing Team, we have an architecture built heavily on an event-based system that should be eventually consistent. Although the word “eventually” sounds like a relief, it is tough to ensure that every message in the system is processed successfully.

Our eventing system relies on Apache Kafka, which gives us the performance we need for event streaming. Our messages carry invalidations (150M+ daily) that should be applied to Trendyol contents (300M+). An invalidation could be about stock, a promotion, or any of our 30 event types.

In the world of events, we need to orchestrate our messages even in inconsistent states. Retrying and delaying messages are our key strategies in this “eventual” life cycle of events. We have 10+ consumers that need to apply this strategy, so we created an open-source library called Kafka Cronsumer to make it easy to implement.

Figure 1: Show me the code

How Kafka Cronsumer Works 💡

As the library name suggests, it consumes events from the exception topic based on the given cron expression.

For example, we could specify the cron expression */20 * * * *, which means the consumer runs every 20 minutes. We could also set a duration of 15 minutes (15m), which means our exception consumer actively consumes events for that fixed period on each run.
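
To make the timing concrete, here is a minimal Go sketch that lists the active windows produced by a */20 schedule with a 15m duration. It is only an illustration: nextWindows, window, and demoWindows are hypothetical helpers written for this post, not part of Kafka Cronsumer, and the start time is arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// window is one active consuming period, from Start up to End.
type window struct {
	Start, End time.Time
}

// nextWindows returns the next n active periods for a "*/step * * * *"
// schedule: a run starts at every minute of the hour divisible by step
// and lasts dur. This is a hand-rolled illustration, not a cron parser.
func nextWindows(from time.Time, step int, dur time.Duration, n int) []window {
	if step <= 0 || n <= 0 {
		return nil
	}
	// Move to the first whole minute at or after from.
	t := from.Truncate(time.Minute)
	if t.Before(from) {
		t = t.Add(time.Minute)
	}
	var out []window
	for len(out) < n {
		if t.Minute()%step == 0 {
			out = append(out, window{Start: t, End: t.Add(dur)})
		}
		t = t.Add(time.Minute)
	}
	return out
}

// demoWindows uses an arbitrary fixed start time so the result is reproducible.
func demoWindows() []window {
	from := time.Date(2023, 1, 1, 10, 5, 0, 0, time.UTC)
	return nextWindows(from, 20, 15*time.Minute, 3)
}

func main() {
	for _, w := range demoWindows() {
		fmt.Printf("consume %s -> %s\n", w.Start.Format("15:04"), w.End.Format("15:04"))
	}
}
```

Starting from 10:05, this lists the windows 10:20 to 10:35, 10:40 to 10:55, and 11:00 to 11:15, so the consumer is active for 15 of every 20 minutes and idle for the remaining 5.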

  • If the consumer gets an error while consuming a message, it increases the message's retry count and produces the message back to the exception topic so that it is handled again in the next iteration (next work time).
  • A message is discarded or moved to a dead letter topic if it exceeds the maxRetry value.
  • Each message is processed only once per iteration. So if the consumer encounters a message produced after the consumer's start time, it pauses and waits for the next iteration.
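
The three rules above can be sketched as a small in-memory simulation. This is not the library's implementation: msg, result, runIteration, and demo are hypothetical names, a slice stands in for the Kafka topic, and checking the retry limit right after a failed attempt is an assumption about the ordering.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// msg is a simplified stand-in for a record in the exception topic.
type msg struct {
	Value      string
	RetryCount int
	ProducedAt time.Time
}

// result collects what a single iteration did with the messages.
type result struct {
	Processed  []msg // handled successfully
	Requeued   []msg // failed, produced again for the next iteration
	DeadLetter []msg // exceeded maxRetry
	Untouched  []msg // produced after the iteration started
}

// runIteration walks the topic in order and applies the three rules.
func runIteration(topic []msg, startedAt time.Time, maxRetry int, handle func(msg) error) result {
	var r result
	for i, m := range topic {
		// A message produced after our start time belongs to the next
		// iteration, so stop here and leave the rest untouched.
		if m.ProducedAt.After(startedAt) {
			r.Untouched = append(r.Untouched, topic[i:]...)
			break
		}
		if err := handle(m); err == nil {
			r.Processed = append(r.Processed, m)
			continue
		}
		// The attempt failed: count it (m is a copy, topic is unchanged).
		m.RetryCount++
		if m.RetryCount > maxRetry {
			r.DeadLetter = append(r.DeadLetter, m) // or discard
			continue
		}
		r.Requeued = append(r.Requeued, m)
	}
	return r
}

// demo runs one iteration over four sample messages with maxRetry = 3.
func demo() result {
	start := time.Date(2023, 1, 1, 10, 20, 0, 0, time.UTC)
	before, after := start.Add(-time.Minute), start.Add(time.Minute)
	topic := []msg{
		{Value: "ok", ProducedAt: before},
		{Value: "flaky", ProducedAt: before},
		{Value: "bad", RetryCount: 3, ProducedAt: before},
		{Value: "late", ProducedAt: after},
	}
	handle := func(m msg) error {
		if m.Value != "ok" {
			return errors.New("processing failed")
		}
		return nil
	}
	return runIteration(topic, start, 3, handle)
}

func main() {
	r := demo()
	fmt.Println("processed:", len(r.Processed), "requeued:", len(r.Requeued),
		"dead letter:", len(r.DeadLetter), "untouched:", len(r.Untouched))
}
```

In this sample run each of the four outcomes occurs once. With a real topic, a requeued message gets a timestamp later than the iteration's start time, which is why the consumer stops when it reaches it and picks it up in the next iteration.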

As an overview, it works as shown below.

Figure 2: Kafka Exception Cronsumer timeline overview

How to use it? 👈

First of all, we need to set the required config values. We can also specify Kafka consumer and producer configs, but they are not mandatory.

  groupId: "exception-consumer"
  topic: "exception"
  maxRetry: 3
  concurrency: 1
  cron: "*/20 * * * *"
  duration: 15m
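
As a rough illustration of what these values mean, the sketch below mirrors them in a Go struct and runs a few sanity checks. The exceptionConfig type, its validate method, and sampleConfig are hypothetical and are not the library's own config API; the checks assume the five-field cron form used above and a duration string that Go's time.ParseDuration understands.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// exceptionConfig is a hypothetical struct mirroring the keys above.
// It is not the config type that Kafka Cronsumer itself exposes.
type exceptionConfig struct {
	GroupID     string
	Topic       string
	MaxRetry    int
	Concurrency int
	Cron        string
	Duration    string
}

// validate runs basic sanity checks and returns the parsed duration.
func (c exceptionConfig) validate() (time.Duration, error) {
	if c.GroupID == "" || c.Topic == "" {
		return 0, errors.New("groupId and topic are required")
	}
	if c.MaxRetry < 0 || c.Concurrency < 1 {
		return 0, errors.New("maxRetry must be >= 0 and concurrency >= 1")
	}
	// Assumes the standard five-field cron form (minute hour day month weekday).
	if n := len(strings.Fields(c.Cron)); n != 5 {
		return 0, fmt.Errorf("cron needs 5 fields, got %d", n)
	}
	d, err := time.ParseDuration(c.Duration)
	if err != nil {
		return 0, fmt.Errorf("duration: %w", err)
	}
	if d <= 0 {
		return 0, errors.New("duration must be positive")
	}
	return d, nil
}

// sampleConfig holds the same values as the config shown above.
func sampleConfig() exceptionConfig {
	return exceptionConfig{
		GroupID:     "exception-consumer",
		Topic:       "exception",
		MaxRetry:    3,
		Concurrency: 1,
		Cron:        "*/20 * * * *",
		Duration:    "15m",
	}
}

func main() {
	d, err := sampleConfig().validate()
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("active window per run:", d)
}
```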

We define a consumeFn function, which describes how the exception messages are consumed.

After that, we initialize Kafka Cronsumer with these config values and consumeFn, and run it. It starts to work as described above in the How Kafka Cronsumer Works section.

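func main() {
    // ..
    // consumeFn describes how each exception message is handled;
    // returning a non-nil error marks the message for a retry.
    var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
        fmt.Printf("consumer > Message received: %s\n", string(message.Value))
        return nil
    }

    c := cronsumer.New(kafkaConfig, consumeFn)
    c.Run() // start the cron-scheduled consumer
}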

You can find a number of ready-to-run examples in this directory.

When to use it? 🤔

  • Iteration-based back-off strategies are applicable
  • Messages can be processed in an eventually consistent state
  • Messages that exceed the max retry count can be ignored
  • To increase consumer resiliency
  • To increase consumer performance with concurrency

When to avoid? ❗️

  • Messages must be processed in order
  • Messages must be guaranteed to be processed (we discard messages once max retry is exceeded)
  • Messages must be committed explicitly (we use an auto-commit interval to increase performance)
  • Messages have a TTL (Time to Live)

Thanks for reading this far. All feedback is welcome. We love to share our knowledge. If you would like to “share” with us, you can apply for our open positions.

Co-Authored by:
