Goutham

Hands-On Guide: Implementing Debezium for PostgreSQL to Kafka Integration

In Part 1, we explored the workings of Debezium and its integration process. This post will guide us through implementing the Debezium engine, enabling it to connect to a PostgreSQL database and export the required records to Kafka.

Let's solve the following use case:

A small online bookstore wants to notify its customers in real time when books they are interested in become available or when stock levels are low, encouraging them to make a purchase decision.

(Image: use case overview)

For this experiment, let's implement a solution that gives us a channel to notify users in real time about any new books inserted into the book table.

We can implement this end-to-end solution in three simple steps:

  1. Install Postgres and Kafka using Docker.
  2. Configure the necessary permissions for the Debezium user.
  3. Develop the embedded Debezium engine (using Spring Boot).

Step 1

The Docker Compose file below spins up the tech stack:

  • Postgres (our active database)
  • Kafka and Zookeeper (used by Debezium to export the DB records)
  • Kafdrop (to see the data in action)
version: '3.8'

services:
  # PostgreSQL service
  postgres:
    image: postgres:latest
    container_name: my-postgres-container
    environment:
      POSTGRES_DB: book_store
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
    ports:
      - "5432:5432"
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
      - ./custom-postgresql.conf:/etc/postgresql/postgresql.conf
    command: ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"]

  # Apache Kafka service using Confluent version
  kafka:
    image: confluentinc/cp-kafka:6.2.1
    container_name: my-kafka-container
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
    ports:
      - "9092:9092"
    volumes:
      - ./kafka-data:/var/lib/kafka/data
    depends_on:
      - zookeeper

  # Zookeeper service required for Kafka
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: my-zookeeper-container
    ports:
      - "2181:2181"

  # kafdrop service required for kafka ui
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: my-kafdrop-container
    environment:
      KAFKA_BROKERCONNECT: kafka:9093
      JVM_OPTS: "-Xms32M -Xmx64M"
    ports:
      - "9000:9000"
    depends_on:
      - kafka


Save the Compose file and bring the stack up with Docker Compose:

docker-compose -f debezium.yaml up

(Image: running the Docker stack)
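As a quick sanity check (my addition, not from the original post), confirm all four containers came up; the container names are the ones defined in the Compose file:

docker ps --format "table {{.Names}}\t{{.Status}}"
# expect: my-postgres-container, my-kafka-container, my-zookeeper-container, my-kafdrop-container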

  • Ensure that PostgreSQL is configured with wal_level=logical to enable logical replication. This tells PostgreSQL to record enough detail in the WAL (write-ahead log) for Debezium to decode the affected rows accurately.

(Image: wal_level configuration)
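The post doesn't show the contents of custom-postgresql.conf, so here is a minimal sketch that enables logical replication; wal_level is the setting Debezium requires, while the other values are common companions and not taken from the original:

# custom-postgresql.conf — minimal settings for logical replication
listen_addresses = '*'
wal_level = logical            # required so Debezium can decode row-level changes
max_wal_senders = 4            # allow replication connections
max_replication_slots = 4      # Debezium will create the bookstore_replication slot

You can verify the setting took effect with psql: docker exec -it my-postgres-container psql -U myuser -d book_store -c "SHOW wal_level;" should print logical.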

Step 2

Create a Postgres publication for Debezium to use (referenced via publication.name in the engine config in Step 3):

create publication bookstore_replication for table book_store.book_inventory;
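Step 2 assumes the schema and table already exist and that the Debezium user is allowed to replicate. If you are starting from scratch, something like the following should work; the DDL is my assumption (the column names simply match what the handler class in Step 3 reads), not from the original post:

-- Hypothetical schema/table matching table.include.list and the handler's fields
CREATE SCHEMA IF NOT EXISTS book_store;
CREATE TABLE IF NOT EXISTS book_store.book_inventory (
    id    SERIAL PRIMARY KEY,
    name  TEXT NOT NULL,
    price DOUBLE PRECISION
);

-- Debezium needs replication rights to create and read its replication slot
ALTER ROLE myuser WITH REPLICATION;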

Step 3

  1. Create a simple Spring Boot application.
  2. Configure it to subscribe to changes on our book_store.book_inventory table.
  3. Read all DML operations from the DB and publish them as events to Kafka.

(Image: embedded Debezium engine overview)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.3</version>
        <relativePath/> <!-- lookup parent from repository -->

    </parent>
    <groupId>com.tech.debezium</groupId>
    <artifactId>embeddedDebeziumEngine</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>embeddedDebeziumEngine</name>
    <description>EmbeddedDebeziumEngine</description>


    <properties>
        <version.debezium>2.5.2.Final</version.debezium>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>6.1.4</version>
        </dependency>
        <!-- Embedded servlet container for the Spring Boot app -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version> <!-- Replace with the latest version -->
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


Spring Boot Application Class

import com.tech.debezium.embeddedDebeziumEngine.handler.PostgresEventHandler;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@SpringBootApplication
public class EmbeddedDebeziumEngineApplication {

    private static DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
    private static ExecutorService executorService;

    public static void main(String[] args) {
        SpringApplication.run(EmbeddedDebeziumEngineApplication.class, args);

        // Kafka producer settings handed to the event handler for publishing events
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Connector configuration: where Postgres lives, which table to watch,
        // and a Kafka topic for storing the engine's offsets (the LSN position)
        io.debezium.config.Configuration postgresDebeziumConfig = io.debezium.config.Configuration.create()
                .with("name", "postgres-inventory-connector")
                .with("bootstrap.servers", "localhost:9092")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore")
                .with("offset.storage.topic", "debezium_bookstore_lsn")
                .with("offset.storage.partitions", "1")
                .with("offset.storage.replication.factor", "1")
                .with("offset.flush.interval.ms", "6000")
                .with("database.hostname", "localhost")
                .with("database.port", "5432")
                .with("database.user", "myuser")
                .with("database.password", "mypassword")
                .with("database.dbname", "book_store")
                .with("topic.prefix", "book_store")
                .with("table.include.list", "book_store.book_inventory")
                .with("slot.name", "bookstore_replication")
                .with("publication.name", "bookstore_replication") // the publication created in Step 2
                .with("plugin.name", "pgoutput")
                .with("snapshot.mode", "initial")
                .build();

        PostgresEventHandler changeEventProcessor = new PostgresEventHandler(properties);
        debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(postgresDebeziumConfig.asProperties())
                .notifying(changeEventProcessor::handleChangeEvent)
                .build();

        // Run the engine on its own thread; it blocks the thread it runs on,
        // so don't also call debeziumEngine.run() here (that would start it twice)
        executorService = Executors.newSingleThreadExecutor();
        executorService.execute(debeziumEngine);
    }
}
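The snippet above never stops the engine. DebeziumEngine implements Closeable, so a shutdown hook along these lines (my addition, not in the original post) closes it cleanly on exit:

        // Hypothetical shutdown hook: stop the engine and its executor on JVM exit
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                if (debeziumEngine != null) {
                    debeziumEngine.close(); // signals the engine to stop polling
                }
            } catch (java.io.IOException e) {
                e.printStackTrace(); // we're shutting down anyway
            }
            if (executorService != null) {
                executorService.shutdown();
            }
        }));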

Here is my handler class

package com.tech.debezium.embeddedDebeziumEngine.handler;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.tech.debezium.embeddedDebeziumEngine.model.InventoryEvent;
import io.debezium.data.Envelope;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.Properties;

import static io.debezium.data.Envelope.FieldName.OPERATION;


public class PostgresEventHandler {

    private final KafkaProducer<String, String> kafkaProducer;
    private static final String TOPIC_NAME = "bookstore_inventory_stream";
    private static final String ERROR_MESSAGE = "Exception occurred during event handling";
    private static final Logger logger = LoggerFactory.getLogger(PostgresEventHandler.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    public PostgresEventHandler(Properties kafkaProperties) {
        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
    }


    public void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();

        if (sourceRecordChangeValue != null) {
            try {
                Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
                Optional<InventoryEvent> event = getProductEvent(sourceRecord, operation);
                if (event.isPresent()) {
                    String jsonEvent = objectMapper.writeValueAsString(event.get());
                    kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, jsonEvent));
                }
            } catch (Exception e) {
                logger.error(ERROR_MESSAGE, e);
            }
        }
    }

    private Optional<InventoryEvent> getProductEvent(SourceRecord event, Envelope.Operation op) {
        final Struct value = (Struct) event.value();
        Struct values = null;

        // CREATE, READ, and UPDATE all carry the new row state in the "after"
        // struct, so they share a single branch.
        switch (op) {
            case CREATE:
            case READ:
            case UPDATE:
                values = value.getStruct("after");
                break;
            case DELETE:
                values = value.getStruct("before");
                if (values != null) {
                    Integer id = values.getInt32("id");
                    return Optional.of(new InventoryEvent(op.toString(), id, null, null));
                } else {
                    return Optional.empty();
                }

            default:
                // Unexpected operation types (e.g. TRUNCATE) are ignored
                return Optional.empty();
        }

        if (values != null) {
            String name = values.getString("name");
            Integer id = values.getInt32("id");
            Double price = (Double) values.get("price");
            return Optional.of(new InventoryEvent(op.toString(), id, name, price));
        } else {
            return Optional.empty();
        }

    }
}
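The InventoryEvent model isn't shown in the post; here is a minimal Lombok-based sketch whose fields are inferred from the constructor calls in the handler above:

package com.tech.debezium.embeddedDebeziumEngine.model;

import lombok.AllArgsConstructor;
import lombok.Data;

// Payload published to Kafka for every inventory change
@Data
@AllArgsConstructor
public class InventoryEvent {
    private String operation; // CREATE, READ, UPDATE, or DELETE
    private Integer id;
    private String name;      // null for DELETE events
    private Double price;     // null for DELETE events
}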

Once everything is set up, let's see Debezium in action

  1. Start our Debezium engine (the Spring Boot app); it should subscribe to our database and push the received messages to Kafka.
    (Image: Spring Boot app startup logs)

  2. Navigate to Postgres and create/update/delete records (sample statements after this list).
    (Image: Postgres operations)

  3. Ensure that all events are captured in Kafka using Kafdrop; you should see two topics:

    • bookstore_inventory_stream (the actual events corresponding to changes in our inventory table)
    • debezium_bookstore_lsn (used by Debezium to store the log sequence number up to which the engine has read, so that after a restart it can resume streaming from exactly where it left off)

(Image: topics in Kafdrop)
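To exercise the pipeline, run a few DML statements against the table; these are illustrative, with columns matching the assumed DDL from Step 2:

INSERT INTO book_store.book_inventory (name, price) VALUES ('Clean Code', 31.50);
UPDATE book_store.book_inventory SET price = 27.99 WHERE id = 1;
DELETE FROM book_store.book_inventory WHERE id = 1;

If you prefer the command line to Kafdrop, the console consumer that ships with the Confluent image can tail the event topic:

docker exec -it my-kafka-container kafka-console-consumer --bootstrap-server localhost:9092 --topic bookstore_inventory_stream --from-beginning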

Once the data is available in Kafka, we can create as many consumers as we need to notify users; that will be covered in a separate post. Here is the overall setup:

(Image: end-to-end flow)

The complete source code is available in the repository.
