
Building Real-Time Data Pipelines with Debezium: A Practical Guide

Introduction

Change Data Capture (CDC) has become essential for modern data architectures, enabling real-time data synchronization and event-driven systems. In this guide, we'll walk through building a practical CDC solution with Debezium: capturing changes from a PostgreSQL database and processing them in real time.

Project Overview
We'll build a system that:

  • Captures changes from a PostgreSQL database using Debezium

  • Streams changes through Apache Kafka

  • Processes events in a Spring Boot application

  • Provides real-time updates to a MongoDB database

Architecture
Our solution uses the following components:

  1. PostgreSQL as the source database
  2. Debezium connector for CDC
  3. Apache Kafka for event streaming
  4. Spring Boot application for processing
  5. MongoDB as the target database

Implementation Guide

1. Setting Up the Development Environment
First, create a docker-compose.yml file to set up our infrastructure:

version: '3.8'
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: inventory
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - postgres-data:/var/lib/postgresql/data

  kafka:
    image: confluentinc/cp-kafka:7.0.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  connect:
    image: debezium/connect:1.9
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      BOOTSTRAP_SERVERS: kafka:29092
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
    depends_on:
      - kafka
      - postgres

  mongodb:
    image: mongo:5.0
    ports:
      - "27017:27017"

volumes:
  postgres-data:

2. Creating the Source Database Schema
Create the following schema in PostgreSQL:

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_products_updated_at
    BEFORE UPDATE ON products
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();
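One Debezium-specific note on this schema: with PostgreSQL's default replica identity, the "before" image of a delete event contains only the primary key, which is all the consumer in step 4 needs. If full before images are ever required (for example, to audit deleted rows), the table can be altered:

ALTER TABLE products REPLICA IDENTITY FULL;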

3. Configuring Debezium
Register the Debezium PostgreSQL connector through Kafka Connect's REST API. One non-obvious setting: decimal.handling.mode is set to string so that DECIMAL columns such as price arrive as plain numeric strings rather than base64-encoded binary, which keeps the JSON mapping in step 4 simple:

@Component
class ConnectorRegistrar {

    // Register the connector against Kafka Connect once the application starts
    @PostConstruct
    void configureConnector() {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString("""
                {
                    "name": "products-connector",
                    "config": {
                        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                        "database.hostname": "postgres",
                        "database.port": "5432",
                        "database.user": "postgres",
                        "database.password": "postgres",
                        "database.dbname": "inventory",
                        "database.server.name": "dbserver1",
                        "table.include.list": "public.products",
                        "plugin.name": "pgoutput",
                        "decimal.handling.mode": "string"
                    }
                }
                """))
            .build();

        client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body)
            .thenAccept(System.out::println)
            .join();
    }
}
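The same registration works from the command line against the Kafka Connect REST API, which is handy outside the JVM. A sketch, assuming the JSON body above has been saved to a file named register-products.json (a name chosen here for illustration):

curl -X POST -H "Content-Type: application/json" \
  --data @register-products.json \
  http://localhost:8083/connectors

# Verify that the connector and its task are RUNNING
curl http://localhost:8083/connectors/products-connector/status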

4. Spring Boot Application
Create a Spring Boot application to process the CDC events:

@SpringBootApplication
public class CdcApplication {
    public static void main(String[] args) {
        SpringApplication.run(CdcApplication.class, args);
    }
}

@Configuration
@EnableKafka
class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

@Service
class ProductEventProcessor {
    private final MongoTemplate mongoTemplate;
    // Debezium emits column names as they appear in PostgreSQL (snake_case),
    // so map them onto the camelCase fields of the Product class
    private final ObjectMapper objectMapper = new ObjectMapper()
        .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);

    public ProductEventProcessor(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @KafkaListener(topics = "dbserver1.public.products")
    public void processProductEvent(@Payload(required = false) String changeEvent)
            throws JsonProcessingException {
        // Debezium follows every delete with a null tombstone record; skip those
        if (changeEvent == null) {
            return;
        }

        JsonNode payload = objectMapper.readTree(changeEvent).get("payload");
        String operation = payload.get("op").asText();

        switch (operation) {
            case "r": // Snapshot read
            case "c": // Create
            case "u": // Update
                Product product = objectMapper.convertValue(payload.get("after"), Product.class);
                mongoTemplate.save(product);
                break;
            case "d": // Delete
                // With the default replica identity, "before" carries only the primary key
                JsonNode before = payload.get("before");
                mongoTemplate.remove(
                    Query.query(Criteria.where("id").is(before.get("id").asText())),
                    Product.class
                );
                break;
        }
    }
}
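For the listener above to reach the right stores, the application needs connection settings along these lines. The database name inventory is an assumption chosen here so the verification step below finds the documents (Spring Data MongoDB otherwise defaults to a database named test):

spring:
  data:
    mongodb:
      host: localhost
      port: 27017
      database: inventory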

5. Data Model
Create the Product model class:

@Document(collection = "products")
public class Product {
    @Id
    private String id;
    private String name;
    private String description;
    private BigDecimal price;
    // Debezium encodes TIMESTAMP columns as epoch microseconds (int64) by default,
    // so these fields are kept numeric rather than mapped to LocalDateTime
    private Long createdAt;
    private Long updatedAt;

    // Getters and setters
}

Testing the Solution

1. Start the infrastructure:

docker-compose up -d

2. Insert test data (for example via docker-compose exec postgres psql -U postgres -d inventory):

INSERT INTO products (name, description, price) 
VALUES ('Test Product', 'Description', 99.99);
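Before checking MongoDB, you can inspect the raw change event directly on the topic with the console consumer bundled in the Kafka image (service and listener names as defined in the docker-compose file above):

docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic dbserver1.public.products \
  --from-beginning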

3. Verify that the data has been synchronized to MongoDB:

docker-compose exec mongodb mongo
> use inventory
> db.products.find()

Conclusion

This implementation demonstrates how to build a robust CDC pipeline using Debezium. The solution provides real-time data synchronization between PostgreSQL and MongoDB, which can be extended to support other databases and use cases.

Key benefits of this approach include:

  • Minimal impact on source database performance

  • Real-time data propagation

  • Reliable event ordering

  • Scalable architecture

For production deployments, consider adding:

  • Error handling and retry mechanisms (a minimal sketch follows this list)

  • Monitoring and alerting

  • Schema evolution support

  • Data validation and transformation
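
As a starting point for the first item, spring-kafka (2.8+) ships a common error handler that can be attached to the listener container factory from step 4. A minimal sketch: each failed record is retried three times, one second apart, before being logged and skipped:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Retry each failed record three times, one second apart, then log and skip it
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 3L)));
    return factory;
}

For production use, a DeadLetterPublishingRecoverer can be passed to the handler so that exhausted records are routed to a dead-letter topic instead of being dropped.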
