DEV Community

Suraj
Suraj

Posted on • Edited on

Moving from Hibernate Envers to Debezium

This article is about logging data change events asynchronously.

What is Hibernate Envers ?

Envers provide an easy auditing/versioning solution for entity classes.

Why to move from envers?

Envers is a synchronous audit logging mechanism. So, there is an additional commit transaction required for the log table created by envers. This takes an additional millisecond for the write operation.

And, if there is an error while writing into the log table, then whole data is rolled back (even the actual changes).

What is Debezium ?

Debezium is an open source distributed platform for change data capture.

It lets your apps react every time your data changes. It continuously monitors your databases and streams every row-level change in the same order they were committed in the database to any streaming platform i.e Kafka

It uses Kafka-Connect to stream captured events to Kafka and then from Kafka to other systems.

How debezium uses Kafka connect to stream data change events.

Why Debezium ?

  • Data change is captured asynchronously
  • Reads from the WAL. All changes, even the ones made from SQL console will be captured.
  • Has a range of connectors to capture from (MySql, Postgres) and sink data to (MySql, Postgres, Elastic, etc)

Let's start on a POC, where we will capture data change events from MySql DB and then store the emitted events in a different MySql DB.

We need to first install/download

Steps

  • Extract the downloaded Debezium Connector to get the jars.
  • Move the extracted jars to any classpath as specified in plugin.path
    variable in /kafka/bin/config/connect-distributed.poperties file.
    Or, move the jars to Kafka libs directory

    mv debezium-connector-mysql/* kafka/libs/

  • Extract the Kafka Jbdc Sink Connector jars
    and place them at the same location as we did for Debezium Connector jars.


  • Start zookeeper. Zookeeper is shipped with Kafka, so we don't need to download separately.

/bin/zookeeper-server-start.sh /config/zookeeper.properties

  • Start Kafka

/bin/kafka-server-start.sh /config/server.properties

/bin/connected-distributed.sh /config/connected-distributed.properties

Now we have all the tools needed installed and running.

Let's create connectors

Let us assume the default initial data in the employee table be as shown in the above fig.

Give following access to debezium user, so that it can scan and create a snapshot of the table and database at the time of creating a connector.

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%'

We will first create a source connector that will capture the data change events of the employee table.

curl -X POST \
http://localhost:8083/connectors/ \
-H 'Content-Type: application/json' \
-H 'Postman-Token: f4367baf-f2b3-4763-84d4-21ae2f68faeb' \
-H 'cache-control: no-cache' \
-d '{
"name": "producer-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "12345",
"database.server.name": "localhost",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory",
"include.schema.changes": "true",
"table.whitelist": "employee",
"transforms": "Reroute", "transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.Reroute.topic.replacement": "$3"
}
}'

Little about not so obvious input data

i) Data change for each table is published in Kafka topic in the format
server_name.database_name.table_name.
ii) So, here we will transform the topic before they are created using transform. The new topics that will be created will be of form table_name

We now will create the sink connector which will consume events from the employee topic and persist in the sink DB

curl -X POST \
http://localhost6:8083/connectors/ \
-H 'Content-Type: application/json' \
-H 'Postman-Token: 453bb3ff-ac42-4934-a45f-4afb51052609' \
-H 'cache-control: no-cache' \
-d '{
"name": "sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/sink_db?user=sink&password=snk",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "insert",
"topics.regex": "employee"
}
}'

Now, let's make a few updates to the employee table

update employee set salary = 10000 where emp_id = 1;
update employee set emp_name = 'CLAIRE' where emp_id = 3;

You can see in below image that the data changes are captured for the above two update queries

Logged data change events

Top comments (0)