En este artículo aprenderás a crear una aplicación con capacidades de Change Data Capture (CDC) utilizando Debezium, Apache Kafka y Apache NiFi. Este enfoque te permitirá capturar, procesar y transmitir cambios en tus datos en tiempo real, habilitando flujos de datos modernos y reactivos. Al final del artículo encontrarás un enlace a un repositorio de GitHub con código de ejemplo para que puedas replicar el entorno en tu máquina.
¿Qué es el Change Data Capture (CDC)?
El Change Data Capture (CDC) es un patrón que permite capturar las mutaciones de datos (inserciones, actualizaciones y eliminaciones) ocurridas en un sistema de origen. En lugar de depender de lecturas periódicas (batch) que pueden ser costosas e ineficientes, el CDC detecta y emite eventos tan pronto como se producen, lo que facilita la replicación, sincronización y análisis en tiempo real.
Algunas ventajas del CDC:
- Sincronización de datos en tiempo real: Mantener múltiples sistemas alineados sin importar la frecuencia de cambios.
- Análisis en vivo: Procesar datos en el momento exacto en que ocurren los eventos, ideal para dashboards, alertas o análisis inmediato.
- Menor latencia: Comparado con procesos ETL tradicionales, se evitan los “batch windows”.
- Escalabilidad y resiliencia: Integrado con sistemas de mensajería distribuida como Kafka, se logra un pipeline flexible y escalable.
Herramientas Clave en la Arquitectura
Debezium: Una plataforma de CDC que se integra con distintos motores de base de datos (MySQL, PostgreSQL, Oracle, MongoDB, etc.). Debezium captura los cambios a nivel de binlog (o equivalente) y los convierte en eventos, que luego publica en Kafka.
Apache Kafka: Un sistema de mensajería distribuido, escalable y altamente disponible. Sirve como capa intermedia para almacenar y transmitir los eventos generados por Debezium. Los consumidores pueden leer estos eventos en tiempo real.
Apache NiFi: Una herramienta de integración y automatización de flujos de datos basada en una interfaz gráfica (low-code), que facilita la transformación, enrutamiento y distribución de datos hacia sistemas destino (bases de datos analíticas, data lakes, almacenamiento en la nube, etc.).
Arquitectura General del Pipeline
La arquitectura típica consta de:
- Base de Datos Origen: Donde se ejecutan transacciones. Por ejemplo, una base MySQL.
- Debezium: Detecta cambios y produce eventos.
- Kafka: Recibe eventos desde Debezium y los pone a disposición de cualquier consumidor.
- NiFi u otras Aplicaciones: Consumen, transforman y reenvían estos datos a sistemas de destino (por ejemplo, ElasticSearch, HDFS, otra base de datos).
- Servicios Destino: Donde se utilizan los datos (reportes, dashboards, aplicaciones web, análisis en tiempo real).
Pasos para Implementar el Pipeline
1. Configurar la Base de Datos de Origen
Asume que usas MySQL. Asegúrate de habilitar el binlog y el formato de registro adecuado (ROW):
[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1
Crea un usuario con permisos para que Debezium pueda leer el binlog.
2. Levantar Kafka y Debezium con Docker
Crea un archivo docker-compose.yml similar a:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.0.1
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
debezium:
image: debezium/connect:1.9
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: debezium_config
OFFSET_STORAGE_TOPIC: debezium_offset
STATUS_STORAGE_TOPIC: debezium_status
depends_on:
- kafka
3. Configurar el Conector Debezium
Con Debezium en marcha, registra un conector que apunte a tu base de datos MySQL. Por ejemplo:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "tu_mysql_host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "debeziumpwd",
"database.server.id": "184054",
"database.server.name": "fullfillment_app",
"database.include.list": "mydb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mydb"
}
}'
4. Consumir Eventos con Apache NiFi
En NiFi, utiliza un ConsumeKafkaRecord_2_x para leer eventos de un tópico específico. Luego, con procesadores como JoltTransformJSON puedes transformar el payload. Finalmente, envía los datos con PutDatabaseRecord a otra base de datos, o PutElasticsearchHttpRecord a un índice Elasticsearch.
El flujo podría ser:
[ConsumeKafkaRecord_2_x] -> [JoltTransformJSON] -> [PutDatabaseRecord]
Adapta los procesadores según tus necesidades y ajusta las propiedades (tópico Kafka, host de destino, etc.).
Top comments (0)