Introduction
Change Data Capture (CDC) has become essential for modern data architectures, enabling real-time data synchronization and event-driven systems. In this guide, we'll walk through building a practical CDC solution with Debezium, focusing on capturing changes from a PostgreSQL database and processing them in real time.
Project Overview
We'll build a system that:
- Captures changes from a PostgreSQL database using Debezium
- Streams changes through Apache Kafka
- Processes events in a Spring Boot application
- Provides real-time updates to a MongoDB database
Architecture
Our solution uses the following components:
- PostgreSQL as the source database
- Debezium connector for CDC
- Apache Kafka for event streaming
- Spring Boot application for processing
- MongoDB as the target database
Implementation Guide
1. Setting Up the Development Environment
First, create a docker-compose.yml file to set up our infrastructure:
version: '3.8'
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: inventory
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - postgres-data:/var/lib/postgresql/data
  kafka:
    image: confluentinc/cp-kafka:7.0.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      # Required by the confluentinc/cp-kafka image
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  connect:
    image: debezium/connect:1.9
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      BOOTSTRAP_SERVERS: kafka:29092
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
    depends_on:
      - kafka
      - postgres
  mongodb:
    image: mongo:5.0
    ports:
      - "27017:27017"
volumes:
  postgres-data:
2. Creating the Source Database Schema
Create the following schema in PostgreSQL:
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_products_updated_at
    BEFORE UPDATE ON products
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();
3. Configuring Debezium
Register the Debezium PostgreSQL connector with Kafka Connect's REST API. One option is to do this from a Spring component at application startup, as in the @PostConstruct method below. The configuration also sets decimal.handling.mode to string so that the DECIMAL price column arrives as a plain numeric string rather than Debezium's default binary-encoded decimal, which the Spring Boot consumer can then map to a BigDecimal:
@PostConstruct
void configureConnector() {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString("""
                {
                    "name": "products-connector",
                    "config": {
                        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                        "database.hostname": "postgres",
                        "database.port": "5432",
                        "database.user": "postgres",
                        "database.password": "postgres",
                        "database.dbname": "inventory",
                        "database.server.name": "dbserver1",
                        "table.include.list": "public.products",
                        "plugin.name": "pgoutput",
                        "decimal.handling.mode": "string"
                    }
                }
                """))
            .build();
    client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body)
            .thenAccept(System.out::println)
            .join();
}
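Once the connector is registered, it's worth confirming that it actually started. The snippet below is a minimal sketch using the same HttpClient approach; GET /connectors/products-connector/status is part of the standard Kafka Connect REST API:

void checkConnectorStatus() throws IOException, InterruptedException {
    HttpRequest statusRequest = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors/products-connector/status"))
            .GET()
            .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
            .send(statusRequest, HttpResponse.BodyHandlers.ofString());
    // Expect "state": "RUNNING" for both the connector and its task.
    System.out.println(response.body());
}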
4. Spring Boot Application
Create a Spring Boot application to process the CDC events:
@SpringBootApplication
public class CdcApplication {
    public static void main(String[] args) {
        SpringApplication.run(CdcApplication.class, args);
    }
}

@Configuration
@EnableKafka
class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
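The factory above relies on Spring Kafka's default error handling. If you want explicit control over retries when event processing fails, one option is to attach an error handler with a back-off. The following is a minimal sketch of how the kafkaListenerContainerFactory() bean could be rewritten, assuming Spring Kafka 2.8+ where DefaultErrorHandler and FixedBackOff are available:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Retry a failed record three times, one second apart, then log and skip it
    // so a single bad event does not block the partition.
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            (record, ex) -> System.err.println("Giving up on record after retries: " + record),
            new FixedBackOff(1000L, 3L)));
    return factory;
}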
@Service
class ProductEventProcessor {

    private static final ObjectMapper MAPPER = new ObjectMapper()
            // The Debezium payload contains snake_case columns (created_at, updated_at)
            // that don't match the camelCase Product fields; ignore them instead of failing.
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private final MongoTemplate mongoTemplate;

    public ProductEventProcessor(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @KafkaListener(topics = "dbserver1.public.products")
    public void processProductEvent(@Payload(required = false) String changeEvent) throws JsonProcessingException {
        // Debezium follows each delete with a null-value tombstone record; skip those.
        if (changeEvent == null) {
            return;
        }
        JsonNode payload = MAPPER.readTree(changeEvent).get("payload");
        if (payload == null || payload.isNull()) {
            return;
        }
        String operation = payload.get("op").asText();
        switch (operation) {
            case "r": // Snapshot read
            case "c": // Create
            case "u": // Update
                Product product = MAPPER.convertValue(payload.get("after"), Product.class);
                mongoTemplate.save(product);
                break;
            case "d": // Delete
                JsonNode before = payload.get("before");
                mongoTemplate.remove(
                        Query.query(Criteria.where("id").is(before.get("id").asText())),
                        Product.class);
                break;
        }
    }
}
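The convertValue() call above deliberately leaves createdAt and updatedAt unset, because Debezium emits those columns in snake_case and, with its default time.precision.mode, as microseconds since the epoch rather than as ISO timestamps. If you want them populated, one option is to map the after node by hand. The helper below is a sketch you could add to ProductEventProcessor; it assumes decimal.handling.mode=string (as in the connector configuration above) and the usual getters and setters elided from the Product model:

private Product toProduct(JsonNode after) {
    Product product = new Product();
    product.setId(after.get("id").asText());
    product.setName(after.get("name").asText());
    product.setDescription(after.hasNonNull("description") ? after.get("description").asText() : null);
    // With decimal.handling.mode=string the price arrives as e.g. "99.99".
    product.setPrice(after.hasNonNull("price") ? new BigDecimal(after.get("price").asText()) : null);
    product.setCreatedAt(toLocalDateTime(after.get("created_at")));
    product.setUpdatedAt(toLocalDateTime(after.get("updated_at")));
    return product;
}

private LocalDateTime toLocalDateTime(JsonNode microsSinceEpoch) {
    if (microsSinceEpoch == null || microsSinceEpoch.isNull()) {
        return null;
    }
    // Debezium encodes TIMESTAMP columns as microseconds since the epoch by default.
    long micros = microsSinceEpoch.asLong();
    return LocalDateTime.ofInstant(
            Instant.ofEpochSecond(micros / 1_000_000, (micros % 1_000_000) * 1_000),
            ZoneOffset.UTC);
}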
5. Data Model
Create the Product model class:
@Document(collection = "products")
public class Product {

    @Id
    private String id;
    private String name;
    private String description;
    private BigDecimal price;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;

    // Getters and setters omitted for brevity
}
Testing the Solution
1. Start the infrastructure, then run the Spring Boot application (for example with ./mvnw spring-boot:run if you use the Maven wrapper):
docker-compose up -d
2. Insert test data, for example via psql inside the Postgres container (docker-compose exec postgres psql -U postgres -d inventory):
INSERT INTO products (name, description, price)
VALUES ('Test Product', 'Description', 99.99);
3. Verify that the data has been synchronized to MongoDB from the Mongo shell inside the container. This assumes the Spring Boot application is configured with spring.data.mongodb.database=inventory; Spring Data MongoDB otherwise defaults to the test database:
docker-compose exec mongodb mongo
> use inventory
> db.products.find()
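You can also exercise the event handling in isolation before starting the full stack. The test below is a minimal sketch, assuming JUnit 5 and Mockito are on the test classpath; the JSON is a hand-written event that mirrors the Debezium envelope shape, trimmed to the fields the processor reads:

class ProductEventProcessorTest {

    @Test
    void createEventIsSavedToMongo() throws Exception {
        MongoTemplate mongoTemplate = Mockito.mock(MongoTemplate.class);
        ProductEventProcessor processor = new ProductEventProcessor(mongoTemplate);

        // A create ("op": "c") event containing only the fields the processor uses.
        String createEvent = """
                {
                  "payload": {
                    "op": "c",
                    "after": { "id": 1, "name": "Test Product", "description": "Description" }
                  }
                }
                """;

        processor.processProductEvent(createEvent);

        Mockito.verify(mongoTemplate).save(Mockito.any(Product.class));
    }
}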
Conclusion
This implementation demonstrates how to build a robust CDC pipeline using Debezium. The solution provides real-time data synchronization between PostgreSQL and MongoDB, which can be extended to support other databases and use cases.
Key benefits of this approach include:
- Minimal impact on source database performance
- Real-time data propagation
- Reliable event ordering
- Scalable architecture
For production deployments, consider adding:
- Error handling and retry mechanisms
- Monitoring and alerting
- Schema evolution support
- Data validation and transformation