
Peter Kraft for DBOS, Inc.


How to Process Events Exactly-Once with Kafka and DBOS

Many applications built on event streaming systems like Apache Kafka need exactly-once semantics for data integrity. In other words, they need to guarantee that events are processed exactly one time and are never under- or over-counted. However, exactly-once semantics are hard to obtain in distributed systems, so many event streaming platforms default to weaker guarantees. For example, Kafka natively supports exactly-once semantics only for stream processing, not for general applications. As a result, developers who need exactly-once semantics must invent complicated architectures to obtain it.

In this post, we’ll show you how the open-source TypeScript framework DBOS Transact makes exactly-once event processing easy. We’ll demonstrate how to write, in fewer than 15 lines of code, a Kafka consumer that processes every message sent to a topic exactly one time, regardless of interruptions, crashes, or failures.

How exactly-once Kafka message processing works with DBOS Transact

Implementing exactly-once semantics in a distributed system is challenging because exactly-once delivery is theoretically impossible: the sender of a message cannot always tell whether an unresponsive receiver failed to process the message or is just slow to acknowledge it. A common workaround is to deliver a message multiple times but design the receiver to process each event idempotently, so that duplicate deliveries have no effect. However, developing idempotent applications is also notoriously difficult!
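
To make the retry-plus-idempotency strategy concrete, here is a minimal in-memory sketch. It is not DBOS code: `IdempotentReceiver`, `deliver`, and the message IDs are hypothetical names chosen for illustration, and a real system would keep the set of seen IDs in durable storage rather than in memory.

```typescript
// Minimal sketch of an idempotent receiver (hypothetical names, in-memory only).
class IdempotentReceiver {
  private readonly seen = new Set<string>(); // IDs of messages already processed
  total = 0; // running sum of all processed amounts

  // Returns true if the message was processed, false if it was a duplicate.
  deliver(messageId: string, amount: number): boolean {
    if (this.seen.has(messageId)) {
      return false; // duplicate delivery: no effect
    }
    this.seen.add(messageId);
    this.total += amount;
    return true;
  }
}

// The sender never saw an acknowledgment for "msg-1", so it sends it again.
const receiver = new IdempotentReceiver();
receiver.deliver("msg-1", 10);
receiver.deliver("msg-1", 10); // redelivery is ignored
receiver.deliver("msg-2", 5);
// receiver.total is 15, not 25
```

The hard part in practice is that the "have I seen this ID?" check and the side effect must succeed or fail together, which is the problem the rest of this post addresses.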

DBOS Transact makes exactly-once event processing easy by making idempotency easy. You can process Kafka events exactly-once with two different types of operations: transactions and workflows.

When your consumer receives a message from Kafka, DBOS Transact constructs an idempotency key for that message from the message’s topic, partition, and offset, a combination that is guaranteed to be unique within a Kafka cluster. What happens next depends on whether you’re executing a transaction or a workflow.
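
As a rough illustration of the key construction described above, the sketch below derives a key from those three fields. The function name `kafkaIdempotencyKey`, the `KafkaCoordinates` type, and the JSON encoding are assumptions made for this example; the key format DBOS Transact uses internally may differ.

```typescript
// Hypothetical sketch: derive a deduplication key from a message's coordinates.
interface KafkaCoordinates {
  topic: string;
  partition: number;
  offset: string; // KafkaJS represents offsets as strings
}

function kafkaIdempotencyKey(c: KafkaCoordinates): string {
  // JSON-encoding the tuple keeps the three fields unambiguous in a single string.
  return JSON.stringify([c.topic, c.partition, c.offset]);
}

const first = kafkaIdempotencyKey({ topic: "example-topic", partition: 0, offset: "42" });
const redelivered = kafkaIdempotencyKey({ topic: "example-topic", partition: 0, offset: "42" });
const next = kafkaIdempotencyKey({ topic: "example-topic", partition: 0, offset: "43" });
// first === redelivered (same message); first !== next (different message)
```

Because a redelivered message keeps its original topic, partition, and offset, it always maps to the same key, which is what lets the receiver recognize it as a duplicate.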

Exactly-once transactions

If you’re processing the message with a single transaction, DBOS Transact synchronously executes the transaction, records its idempotency key as part of that transaction, then commits the message’s offset to Kafka. This guarantees exactly-once processing because:

  • If transaction execution fails, the offset isn’t committed, so Kafka retries the message later.
  • If a duplicate message arrives, DBOS Transact checks the idempotency key and doesn’t re-execute the transaction (the transaction logic also handles corner cases where the message arrives twice simultaneously, a subject for a future blog post).
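
The ordering above (run the transaction and record its key together, then commit the offset) can be modeled in a few lines. This is a simplified in-memory simulation under stated assumptions, not DBOS internals: `FakeDatabase`, `FakeConsumer`, `flakyDeposit`, and the key format are all hypothetical, and the "rollback" simply restores a snapshot.

```typescript
// In-memory model of the transaction path (hypothetical names; not DBOS internals).
class FakeDatabase {
  readonly processedKeys = new Set<string>();
  balance = 0;

  // Apply `work` and record `key` atomically: on error, restore the prior state.
  runTransaction(key: string, work: (db: FakeDatabase) => void): void {
    if (this.processedKeys.has(key)) return; // duplicate: skip re-execution
    const snapshot = this.balance;
    try {
      work(this);
      this.processedKeys.add(key); // recorded together with the work itself
    } catch (err) {
      this.balance = snapshot; // roll back partial changes
      throw err;
    }
  }
}

class FakeConsumer {
  committedOffset = -1; // highest offset acknowledged to the broker

  handle(db: FakeDatabase, offset: number, work: (db: FakeDatabase) => void): void {
    db.runTransaction(`example-topic/0/${offset}`, work); // throws on failure
    this.committedOffset = offset; // only reached if the transaction succeeded
  }
}

const db = new FakeDatabase();
const consumer = new FakeConsumer();
let attempts = 0;
const flakyDeposit = (d: FakeDatabase): void => {
  attempts += 1;
  d.balance += 100;
  if (attempts === 1) throw new Error("simulated crash mid-transaction");
};

try {
  consumer.handle(db, 0, flakyDeposit); // first delivery fails; offset stays uncommitted
} catch {
  // the broker would redeliver the message later
}
consumer.handle(db, 0, flakyDeposit); // the retry succeeds
consumer.handle(db, 0, flakyDeposit); // a duplicate delivery is a no-op
// db.balance is 100 and consumer.committedOffset is 0
```

The deposit is applied once even though the message was delivered three times: the failed attempt is rolled back, and the duplicate is filtered by its key.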

Exactly-once workflows

If you’re processing the message with a workflow, DBOS Transact synchronously records the idempotency key, commits the message’s offset to Kafka, then asynchronously processes the workflow. This guarantees exactly-once processing because:

  • If recording the idempotency key fails, the offset isn’t committed, so Kafka will send the message again later.
  • If the workflow fails, it will always recover and resume from where it left off because DBOS workflows are reliable.
  • If a duplicate message arrives, DBOS Transact checks the idempotency key and doesn’t start a new workflow.
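
The workflow path differs in that only the key is recorded synchronously, and the work itself runs later. The sketch below models that split with a checkpointed list of steps. It is a toy simulation, not how DBOS workflows are implemented: `FakeWorkflowEngine`, `onMessage`, `runPending`, and the step names are hypothetical.

```typescript
// In-memory model of the workflow path (hypothetical names; not DBOS internals).
type Step = () => void;

interface WorkflowRecord {
  steps: Step[];
  completedSteps: number; // checkpoint: how many steps have finished
}

class FakeWorkflowEngine {
  readonly workflows = new Map<string, WorkflowRecord>();
  committedOffset = -1;

  // Synchronous part: record the key, then acknowledge the message.
  onMessage(offset: number, steps: Step[]): void {
    const key = `example-topic/0/${offset}`;
    if (!this.workflows.has(key)) {
      this.workflows.set(key, { steps, completedSteps: 0 });
    }
    this.committedOffset = Math.max(this.committedOffset, offset);
  }

  // Deferred part (also used for recovery): resume every unfinished workflow.
  runPending(): void {
    this.workflows.forEach((record) => {
      record.steps.slice(record.completedSteps).forEach((step) => {
        step(); // may throw; the checkpoint below is then left unchanged
        record.completedSteps += 1;
      });
    });
  }
}

const engine = new FakeWorkflowEngine();
const log: string[] = [];
let crashed = false;
const steps: Step[] = [
  () => { log.push("reserve inventory"); },
  () => {
    if (!crashed) { crashed = true; throw new Error("simulated crash"); }
    log.push("charge card");
  },
];

engine.onMessage(0, steps);
engine.onMessage(0, steps); // duplicate delivery: no second workflow is started
try {
  engine.runPending(); // crashes during the second step
} catch {
  // the process restarts
}
engine.runPending(); // recovery resumes at the second step; the first is not repeated
// log is ["reserve inventory", "charge card"]
```

Committing the offset before the workflow finishes is safe in this model only because the recorded workflow survives the crash and is resumed from its checkpoint.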

Getting Started with Kafka and DBOS

Now, let’s see what this looks like in code. The first step to exactly-once semantics is to write your event processing code in TypeScript using DBOS Transact. You can write a single database transaction or a workflow coordinating multiple operations. For now, let’s keep it simple:

import { KafkaMessage } from "kafkajs";
import { Workflow, WorkflowContext } from '@dbos-inc/dbos-sdk';

class KafkaExample {
  // A DBOS workflow that logs the payload of the message it is given.
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

Next, annotate your method with a @KafkaConsume decorator specifying which topic to consume events from. Additionally, annotate your class with an @Kafka decorator defining which brokers to connect to. That’s all there is to it! When you start your application, DBOS Transact will invoke your method exactly-once for each message sent to the topic.

import { KafkaMessage } from "kafkajs";
import { Workflow, WorkflowContext, Kafka, KafkaConsume } from '@dbos-inc/dbos-sdk';

// Class-level decorator: which Kafka brokers to connect to.
@Kafka({ brokers: ['localhost:9092'] })
class KafkaExample {
  // Method-level decorator: which topic this workflow consumes from.
  @KafkaConsume("example-topic")
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

If you need more control, you can pass KafkaJS configuration objects to both the class and method Kafka decorators. For example, you can specify a custom consumer group ID:

// Inside the KafkaExample class: the second argument is a KafkaJS consumer configuration.
@KafkaConsume("example-topic", { groupId: "custom-group-id" })
@Workflow()
static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
  ctxt.logger.info(`Message received: ${message.value?.toString()}`);
}

To get started with DBOS Transact, check out the quickstart and docs. For more information on using Kafka with DBOS Transact, check out this docs page.
