Debezium Custom Converters
Creating custom converters using Debezium's new SPI to override value conversions
Introduction
Background about the TimestampConverter
Hey, my name is Oryan Moshe, and I founded in.dev, a community for senior developers in Israel.
I'm also a Lead Architect at Rivery, a fully managed data integration platform. Part of my job here is developing the ability to stream changes straight from our clients' databases to our platform, using change data capture (CDC).
The last time I coded in Java was 7 years ago, so if you have any suggestions to improve the code shown here please feel free to comment below!
You can find the converter right here:
https://github.com/oryanmoshe/debezium-timestamp-converter/
CDC — Change Data Capture
Before we talk about Debezium, we have to talk about CDC.
CDC is a way for us to get the changes happening in the database (as opposed to the actual data).
This means we can get every state that every record has been through in the database.
CDC is useful for a number of cases:
- Compiling a log of record changes
- Undoing (or reverting) a change
- Tracking record deletion (a deleted row can no longer be found with a plain SELECT)
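To make the "every state of every record" idea concrete, here is a small Java sketch that replays a stream of change events into an in-memory table while keeping a log of every change. The ChangeEvent record and the replay method are hypothetical; only the op codes ("c" for create, "u" for update, "d" for delete) and the before/after row images are loosely modeled on Debezium's change-event envelope, which carries more fields than shown here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangeLogDemo {
    // Hypothetical, simplified change event: op is "c" (create), "u" (update) or "d" (delete),
    // with the row image before and after the change (null where it does not apply).
    record ChangeEvent(String op, int id, Map<String, Object> before, Map<String, Object> after) {}

    // Applies events in order, returning the current table state and appending
    // a line per event to the log, so every intermediate state stays visible.
    static Map<Integer, Map<String, Object>> replay(List<ChangeEvent> events, List<String> log) {
        Map<Integer, Map<String, Object>> table = new HashMap<>();
        for (ChangeEvent e : events) {
            switch (e.op()) {
                case "c", "u" -> table.put(e.id(), e.after());
                case "d" -> table.remove(e.id());
                default -> throw new IllegalArgumentException("Unknown op: " + e.op());
            }
            log.add(e.op() + " id=" + e.id() + " before=" + e.before() + " after=" + e.after());
        }
        return table;
    }
}
```

Note how the delete survives in the log even though the row is gone from the table, which is exactly what a plain SELECT cannot tell you.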
What is Debezium Anyway?
Debezium is an open source platform, maintained by Red Hat, that allows developers to implement CDC on top of a Kafka infrastructure.
Debezium implements CDC as a set of Kafka Connect source connectors. Currently there's support for MySQL, PostgreSQL, Microsoft SQL Server, MongoDB, and even some limited support for Oracle.
What are converters, and why would we need a custom one?
All messages produced by Debezium are processed before entering the designated topic.
This ensures that all fields of a given type (defined by the schema) behave the same.
In other words, all DATE fields on all of the databases will be transformed into the same format, which by default is "days since epoch".
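For reference, "days since epoch" is simply an integer count of days since 1970-01-01 (Debezium's io.debezium.time.Date semantic type). A minimal sketch of converting between that number and a calendar date with java.time; the EpochDays class name is our own:

```java
import java.time.LocalDate;

final class EpochDays {
    // Turns Debezium's default DATE encoding (days since 1970-01-01) into a calendar date.
    static LocalDate fromEpochDays(int days) {
        return LocalDate.ofEpochDay(days);
    }

    // The reverse direction: a calendar date back to its day count.
    static int toEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }
}
```

For example, 18262 corresponds to 2020-01-01.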
But this behavior isn't always what we want, especially for temporal types.
For our particular use case, we need all temporal fields to be in the same format, whether the type is DATE, DATETIME, DATETIME2, TIME or TIMESTAMP.
The format we chose was yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
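One subtlety of this format: the quoted 'Z' is a literal character, not a time zone marker, so the output is only truthful if the formatter itself runs in UTC. The sketch below (UtcFormatting is a hypothetical name) formats the same instant in a caller-chosen zone to show what goes wrong otherwise:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class UtcFormatting {
    static final String DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    // Formats epoch millis with the datetime pattern in the given zone.
    // The trailing 'Z' is printed verbatim regardless of the zone, so only "UTC" gives a correct result.
    static String format(long epochMillis, String zoneId) {
        SimpleDateFormat formatter = new SimpleDateFormat(DATETIME_FORMAT, Locale.ROOT);
        formatter.setTimeZone(TimeZone.getTimeZone(zoneId));
        return formatter.format(new Date(epochMillis));
    }
}
```

Formatting 1577836800000 (midnight UTC on 2020-01-01) yields 2020-01-01T00:00:00.000Z in UTC, but 2019-12-31T19:00:00.000Z in America/New_York: a timestamp that claims to be UTC while being five hours off.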
Creating a custom converter
Here's an explanation of each step needed to create our TimestampConverter.
The basics of custom converters
To allow such behavior, an SPI (Service Provider Interface) for custom converters was added in Debezium 1.1.
This allows developers to write their own converters in Java, by creating a class that implements the io.debezium.spi.converter.CustomConverter interface.
The First Gotcha
What we didn't know when we started developing this converter is that once we registered a custom converter for a temporal column, Debezium's behavior became inconsistent. Sometimes it passed a DATE column as "days since epoch", as expected, but sometimes it passed it as a string, matching the date format of the database it came from.
This meant we had to cover all of our bases, both for numeric values (say, "days since epoch") and for every date format databases can produce (yyyy-MM-dd, dd/MM/yyyy, yyyy-MMM-dd, etc.).
Things got a bit complicated on the logic front, but let's not get into that right now.
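One way to cover both shapes is to normalize every input to epoch milliseconds before formatting it. The sketch below is hypothetical: toEpochMillis and its list of layouts are our own illustration, not the repository's getMillis, and it only handles DATE values (numeric input is assumed to be days since epoch).

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;

final class DateInputParser {
    private static final long MILLIS_PER_DAY = 86_400_000L;

    // Hypothetical set of textual layouts to try, mirroring the examples above.
    private static final List<DateTimeFormatter> DATE_LAYOUTS = List.of(
            DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH),
            DateTimeFormatter.ofPattern("dd/MM/yyyy", Locale.ENGLISH),
            DateTimeFormatter.ofPattern("yyyy-MMM-dd", Locale.ENGLISH));

    // Returns UTC midnight of the given date as epoch milliseconds.
    static long toEpochMillis(String raw) {
        String value = raw.trim();
        // Numeric input: assume "days since epoch", Debezium's default DATE encoding.
        if (value.matches("-?\\d+")) {
            return Long.parseLong(value) * MILLIS_PER_DAY;
        }
        // Textual input: try each known layout until one parses.
        for (DateTimeFormatter layout : DATE_LAYOUTS) {
            try {
                return LocalDate.parse(value, layout)
                        .atStartOfDay(ZoneOffset.UTC)
                        .toInstant()
                        .toEpochMilli();
            } catch (DateTimeParseException ignored) {
                // Not this layout; try the next one.
            }
        }
        throw new IllegalArgumentException("Unrecognized date value: " + raw);
    }
}
```

Whatever shape arrives, "18262", "2020-01-01", "01/01/2020" and "2020-Jan-01" all end up as the same instant, so the formatting step only ever deals with one representation.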
What's needed for our custom converter to work
Each converter has to implement two methods to be used by Debezium:
configure
This method runs when the connector is initialised. It accepts one argument:
- props: an object of type java.util.Properties, containing all of the properties we passed to our converter instance.
converterFor
This method runs once for each column defined in our schema, and its job is to define (a.k.a. "register") the converter for each. It accepts two arguments:
- column: an object of type io.debezium.spi.converter.RelationalColumn, containing the definition of the column we're currently handling, including its name, type, size, table, etc.
- registration: an object of type io.debezium.spi.converter.CustomConverter.ConverterRegistration, an internal definition that has one method: register.
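To see how the two methods fit together, here is a self-contained sketch of the call sequence. Because it can't depend on the Debezium jar, it uses hypothetical stand-in interfaces (StandInConverter, StandInRegistration, StandInCustomConverter, StandInColumn) that mirror the shape of CustomConverter, its nested Converter and ConverterRegistration types, and RelationalColumn. UppercaseConverter and ConverterLifecycleDemo are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical stand-ins shaped like Debezium's SPI types (the real ones live in
// io.debezium.spi.converter), so this sketch compiles without the Debezium jar.
interface StandInConverter {
    Object convert(Object input);
}

interface StandInRegistration<S> {
    void register(S fieldSchema, StandInConverter converter);
}

record StandInColumn(String name, String typeName) {}

interface StandInCustomConverter<S> {
    void configure(Properties props);

    void converterFor(StandInColumn column, StandInRegistration<S> registration);
}

// Example converter: upper-cases VARCHAR values and appends a configurable suffix.
final class UppercaseConverter implements StandInCustomConverter<String> {
    private String suffix = "";

    @Override
    public void configure(Properties props) {
        suffix = props.getProperty("suffix", "");
    }

    @Override
    public void converterFor(StandInColumn column, StandInRegistration<String> registration) {
        // Columns we don't register keep their default conversion.
        if ("varchar".equalsIgnoreCase(column.typeName())) {
            registration.register("string",
                    raw -> raw == null ? null : raw.toString().toUpperCase(Locale.ROOT) + suffix);
        }
    }
}

// Simulates the host: configure() once, converterFor() per column, then apply what was registered.
final class ConverterLifecycleDemo {
    static List<Object> convertRow(StandInCustomConverter<String> converter, Properties props,
                                   List<StandInColumn> columns, List<Object> row) {
        converter.configure(props);
        List<Object> out = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            StandInConverter[] registered = new StandInConverter[1];
            converter.converterFor(columns.get(i), (schema, fn) -> registered[0] = fn);
            Object raw = row.get(i);
            out.add(registered[0] == null ? raw : registered[0].convert(raw));
        }
        return out;
    }
}
```

In Debezium itself, as far as we know, converterFor is called when the table schema is built and the registered converter is then reused for every row; the demo squeezes that into a single call for brevity.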
Using the configure method
As stated above, we use the configure method to pass properties into our converter. This is important because we can use the same converter for multiple connectors and change its behavior according to these properties.
For our TimestampConverter we wanted to pass four properties:
- debug: Indicates whether to print debug messages. Defaults to false.
- format.date: The format to convert all columns of type DATE. Defaults to yyyy-MM-dd.
- format.time: The format to convert all columns of type TIME. Defaults to HH:mm:ss.SSS.
- format.datetime: The format to convert all other temporal columns. Defaults to yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
All of these properties are optional and have default values associated with them.
To support them, we defined each one as a class field with its default value. Inside the configure method, we assign each field the passed value, falling back to the default:
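import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.TimeZone;

public class TimestampConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    // Defaults used when a property is not set. Lowercase "yyyy" is the calendar year;
    // uppercase "YYYY" would be the week year and can be off by one around New Year.
    public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
    public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss.SSS";

    public String strDatetimeFormat, strDateFormat, strTimeFormat;
    public boolean debug;
    private SimpleDateFormat simpleDatetimeFormatter, simpleDateFormatter, simpleTimeFormatter;

    @Override
    public void configure(Properties props) {
        // Read each property, falling back to its default.
        this.strDatetimeFormat = props.getProperty("format.datetime", DEFAULT_DATETIME_FORMAT);
        this.simpleDatetimeFormatter = new SimpleDateFormat(this.strDatetimeFormat);
        this.strDateFormat = props.getProperty("format.date", DEFAULT_DATE_FORMAT);
        this.simpleDateFormatter = new SimpleDateFormat(this.strDateFormat);
        this.strTimeFormat = props.getProperty("format.time", DEFAULT_TIME_FORMAT);
        this.simpleTimeFormatter = new SimpleDateFormat(this.strTimeFormat);
        this.debug = Boolean.parseBoolean(props.getProperty("debug", "false"));

        // Format in UTC, assuming getMillis yields UTC epoch millis; the literal 'Z' relies on it.
        TimeZone utc = TimeZone.getTimeZone("UTC");
        this.simpleDatetimeFormatter.setTimeZone(utc);
        this.simpleDateFormatter.setTimeZone(utc);
        this.simpleTimeFormatter.setTimeZone(utc);
    }

    // converterFor(...) is shown in the next section.
}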
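A word of caution about the year letter in these patterns: in SimpleDateFormat, lowercase yyyy is the calendar year, while uppercase YYYY is the week year, which differs from the calendar year for a few days around New Year. The sketch below (WeekYearPitfall is a hypothetical name) lets you format 2019-12-31 with both patterns, using US week rules:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class WeekYearPitfall {
    // Formats an instant in UTC with the given pattern, using US week rules
    // (weeks start on Sunday, and the week containing January 1 is week 1).
    static String formatUtc(String pattern, long epochMillis) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.US);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(epochMillis));
    }
}
```

With Locale.US, the week containing January 1, 2020 starts on Sunday, December 29, 2019, so "YYYY-MM-dd" renders 2019-12-31 (epoch millis 1577750400000) as 2020-12-31, while "yyyy-MM-dd" gives the expected 2019-12-31.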
Using the converterFor method
Now it's time for the big moment: each column must be converted to its respective format.
First of all, we have to know the type of the column we're currently handling. This is determined using column.typeName().
If the type is any of the temporal types (defined as a class constant), we handle it accordingly. If it's not, we register nothing, and Debezium applies its default conversion.
To tell Debezium to convert a specific column to something else, we call register on the registration object passed to us, providing a schema (an optional string schema) and a converter.
The converter is just a function, in our case a lambda, that receives the source value as an Object and returns a value matching the schema we provided. In our case, we needed to return a String (or null, because we made the schema optional).
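// Inside TimestampConverter (additionally needs java.util.Date imported).
// SUPPORTED_DATA_TYPES is a class constant listing the temporal type names, and
// getMillis(String, boolean) is the parsing helper from the repository; neither is shown here.
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
    String typeName = column.typeName().toLowerCase();
    // Only temporal columns are claimed; for any other type we register nothing,
    // and Debezium falls back to its default conversion.
    if (SUPPORTED_DATA_TYPES.stream().anyMatch(typeName::equalsIgnoreCase)) {
        boolean isTime = "time".equals(typeName);
        // Register an optional string schema, so the lambda may return a String or null.
        registration.register(SchemaBuilder.string().optional(), rawValue -> {
            if (rawValue == null) {
                return null;
            }
            // Normalize the raw value (number or string) to epoch millis, then format by type.
            long millis = getMillis(rawValue.toString(), isTime);
            Date dateObject = new Date(millis);
            switch (typeName) {
                case "time":
                    return simpleTimeFormatter.format(dateObject);
                case "date":
                    return simpleDateFormatter.format(dateObject);
                default:
                    return simpleDatetimeFormatter.format(dateObject);
            }
        });
    }
}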
The two crucial parts of this snippet are the ones mentioned before: the call to registration.register, and the return statements inside the lambda.
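One design note: SimpleDateFormat is not thread-safe. That is likely harmless as long as each converter instance is only used from a single task thread, but if you prefer immutable formatters, java.time's DateTimeFormatter is an alternative. Here is a minimal sketch of the same type-based dispatch using it; TemporalFormatter and its hard-coded patterns (matching the default formats) are our own, and it assumes the input has already been turned into epoch milliseconds:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

final class TemporalFormatter {
    // DateTimeFormatter is immutable and thread-safe; withZone(UTC) lets it format an Instant directly.
    private static final DateTimeFormatter TIME =
            DateTimeFormatter.ofPattern("HH:mm:ss.SSS", Locale.ROOT).withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter DATE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT).withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT).withZone(ZoneOffset.UTC);

    // Picks the output layout by column type name, like the switch in converterFor.
    static String format(String typeName, long epochMillis) {
        Instant instant = Instant.ofEpochMilli(epochMillis);
        return switch (typeName.toLowerCase(Locale.ROOT)) {
            case "time" -> TIME.format(instant);
            case "date" -> DATE.format(instant);
            default -> DATETIME.format(instant);
        };
    }
}
```

For example, 45296789 milliseconds formatted as a TIME column gives 12:34:56.789, and 1577836800000 as a TIMESTAMP column gives 2020-01-01T00:00:00.000Z.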
Using a Custom Converter with Debezium
Installation
Installing it in our Debezium cluster is straightforward: we just need to add the converter's .jar file to the connector we want to use it in.
The Second Gotcha
Notice I said "... to the connector we want ...". This is something Debezium's documentation didn't make clear: we need to add the converter to every connector we want to use it with. As far as we can tell, this is because Kafka Connect loads each plugin folder with its own isolated classloader, so the converter class has to live alongside the connector that instantiates it.
Let's say the base folder for connectors is /kafka/connect. Inside it we'll find folders like debezium-connector-mysql or debezium-connector-postgres.
We need to add the converter .jar file to each of those folders if we intend to use it there.
Configuration
After adding the .jar file to our connector, we can configure our connectors to use it!
To do so, all we need to do is add the following keys to our existing configuration:
"converters": "timestampConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter"
If we need to customize the formats of specific data types, we can use these additional configuration keys:
"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "yyyy-MM-dd",
"timestampConverter.format.datetime": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"
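The prefix matters: the connector configuration is shared by the connector and all of its converters, and as far as we can tell Debezium passes each converter only the keys that start with its name and a dot, with that prefix removed. That is why configure reads format.date rather than timestampConverter.format.date. The helper below is a hypothetical sketch of that mapping, not Debezium's actual code:

```java
import java.util.Map;
import java.util.Properties;

final class ConverterProps {
    // Hypothetical helper: keeps only keys starting with "<name>." and strips that prefix,
    // producing the Properties a converter named <name> would see in configure().
    static Properties forConverter(String name, Map<String, String> connectorConfig) {
        String prefix = name + ".";
        Properties props = new Properties();
        connectorConfig.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                props.setProperty(key.substring(prefix.length()), value);
            }
        });
        return props;
    }
}
```

Keys that are not set at all (format.time in the test configuration, for instance) simply fall through to the defaults passed to getProperty.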
Conclusions
The addition of an SPI to Debezium brought a lot to the table in terms of custom converters.
This allows us to get a tailor-made CDC connector, with the data streaming into our Kafka cluster in exactly the format we want.
I didn't include the actual logic that converts the values from their raw format into epoch time (this part is contained in the getMillis method).
But I have published the TimestampConverter as open source, so anyone can read the code there, use the converter in an application (either as a .jar file from the releases section, or as a dependency from the packages section), or contribute to its development!
Feel free to suggest contributions to this converter, and share with me what kind of converters you created using the new Debezium SPI, and which ones you wish were made!
Links
To read more about Debezium custom converters, visit the official documentation:
https://debezium.io/documentation/reference/1.1/development/converters.html
Link to the repository of my TimestampConverter:
https://github.com/oryanmoshe/debezium-timestamp-converter