The Kafka Connect JDBC Sink can be used to stream data from a Kafka topic to a database such as Oracle, Postgres, MySQL, DB2, etc.
It supports many permutations of configuration around how primary keys are handled. The documentation details these options, and this article aims to illustrate and expand on them.
❗ Want to cut to the chase? Check out the TL;DW (too long; didn't watch) summary
Background
What is the Kafka Connect JDBC Sink connector?
The JDBC connector is a plugin for Kafka Connect for streaming data both ways between a database and Apache Kafka.
Learn more about it in the documentation, this 🎥 video, and tutorial.
What is Kafka Connect?
👉 Learn more: From Zero to Hero with Kafka Connect
Why do I care about primary keys?
- If you want to create the target table with the correct primary key column(s)
- If you want to update existing records based on their key (i.e. insert.mode of upsert or update)
- If you want to delete existing records based on their key (i.e. delete.enabled is true and you are sending tombstone records from your source topic)
Keys and Values in Kafka Messages
Messages in Apache Kafka are key/value pairs. The key and value may each hold a complex payload. Equally, they may also be null (although both being null wouldn't make so much sense).
The key in Apache Kafka messages is set by the producing application, whether that's using the Producer API directly, or a Kafka Connect source connector.
Serialization formats
Data in Apache Kafka messages is just bytes, so far as Apache Kafka is concerned. When working with that data we need to use SerDes to serialize and deserialize it - in Kafka Connect these are called Converters.
The Kafka Connect JDBC Sink requires that the value part of the message is serialized using a format that has an explicitly declared schema. This means it must be one of the following:
- Written using one of the serializers provided by the Confluent Schema Registry (or the accompanying converters in Kafka Connect):
  - Avro
  - Protobuf
  - JSON Schema
- Written from Kafka Connect using the JSON Converter with schemas.enable set to true
You cannot use plain JSON, CSV, etc. with the JDBC Sink connector. If you have this kind of data on your source topic you'll need to apply a schema to it first and write it to a new topic, serialized appropriately - for example by applying a schema to JSON data with ksqlDB (there's a full worked example of this later in this article).
The value and key part of your message can be serialized using different formats. Make sure you know how each is serialized as this can have a big impact particularly when it comes to handling keys.
There's also good documentation about serialization formats in ksqlDB, along with general reference material on the importance of schemas in your Kafka messages.
An important note about the environment
You can find the Docker Compose to spin up the environment used in this blog on GitHub.
I'm using ksqlDB as my interface for populating topics and creating connectors. You can do both, either, or neither:
- These are just Kafka topics, to which you can write with the Producer API if you'd like (using the appropriate serializer)
- This is just Kafka Connect, with a REST API that you can use directly if you'd rather (a sketch of that is shown just below)
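For example, here's a sketch of managing a connector directly over the Kafka Connect REST API with curl. The connector name and the minimal config shown here are placeholders, and it assumes port 8083 of the Kafka Connect worker is reachable from wherever you run it:

# List the connectors currently defined on the worker
curl -s http://localhost:8083/connectors

# Create (or update) a connector by PUTting its configuration
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/SINK_EXAMPLE/config \
  -d '{
        "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"     : "jdbc:postgresql://postgres:5432/",
        "connection.user"    : "postgres",
        "connection.password": "postgres",
        "topics"             : "FOO_01",
        "auto.create"        : "true"
      }'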
Let's get started!
No primary key handling at all
We'll start off with the most simple example, and build from there. To begin, we'll have no key at all:
CREATE STREAM FOO_01 (COL1 INT, COL2 INT)
WITH (KAFKA_TOPIC='FOO_01', VALUE_FORMAT='AVRO', PARTITIONS=1);
INSERT INTO FOO_01 (COL1, COL2) VALUES (0,0);
Throughout this I'll use PRINT in ksqlDB to inspect the message structure (pay attention to the Key and Value):
ksql> PRINT FOO_01 FROM BEGINNING LIMIT 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/11 14:13:26.300 Z, key: <null>, value: {"COL1": 0, "COL2": 0}, partition: 0
You can also use kafkacat for this, although it's a tad more fiddly than PRINT alone:
$ docker exec -i kafkacat kafkacat \
-b broker:29092 -C -e -r http://schema-registry:8081 -s value=avro \
-t FOO_01 -f 'Topic+Partition+Offset: %t+%p+%o\tKey: %k\tValue: %s\n'
Topic+Partition+Offset: FOO_01+0+0 Key: Value: {"COL1": {"int": 0}, "COL2": {"int": 0}}
Let's push this topic to Postgres:
CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_01',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true'
);
Check the connector is working ✅
ksql> DESCRIBE CONNECTOR SINK_FOO_01_0;
Name : SINK_FOO_01_0
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : kafka-connect:8083
Task ID | State | Error Trace
---------------------------------
0 | RUNNING |
---------------------------------
ksql>
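If you're not using ksqlDB, the same status information is available straight from the Kafka Connect REST API (again assuming port 8083 is reachable from your host; jq is optional and just pretty-prints the JSON):

curl -s http://localhost:8083/connectors/SINK_FOO_01_0/status | jq '.'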
Check the data in Postgres ✅
docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
postgres=# SELECT * FROM "FOO_01" ;
COL1 | COL2
------+------
0 | 0
(1 row)
Note that in the above connector these (and other) configuration parameters assume their default values:
pk.fields = []
pk.mode = none
insert.mode = insert
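If you want to confirm what those defaults are for the version of the connector that you're running, one way is the Kafka Connect REST API's config validation endpoint, which returns every parameter along with its default value. This is a sketch; the minimal config payload and the jq filter are illustrative only:

curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate \
  -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url" : "jdbc:postgresql://postgres:5432/",
        "topics"         : "FOO_01"
      }' |
  jq -r '.configs[]
         | select(.definition.name == ("pk.mode","pk.fields","insert.mode"))
         | "\(.definition.name): default = \(.definition.default_value)"'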
Using a field in the value as the key
Let's imagine that, of the two fields in the value of our message, we want to set one as the primary key. We'll create a new version of this topic, and add a couple more rows this time too:
CREATE STREAM FOO_02 (COL1 INT, COL2 INT)
WITH (KAFKA_TOPIC='FOO_02', VALUE_FORMAT='AVRO', PARTITIONS=1);
INSERT INTO FOO_02 (COL1, COL2) VALUES (0,0);
INSERT INTO FOO_02 (COL1, COL2) VALUES (0,42);
INSERT INTO FOO_02 (COL1, COL2) VALUES (1,94);
Now our topic looks like this:
ksql> PRINT FOO_02 FROM BEGINNING LIMIT 3;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/03/11 14:44:39.016 Z, key: <null>, value: {"COL1": 0, "COL2": 0}, partition: 0
rowtime: 2021/03/11 14:44:39.067 Z, key: <null>, value: {"COL1": 0, "COL2": 42}, partition: 0
rowtime: 2021/03/11 14:44:39.117 Z, key: <null>, value: {"COL1": 1, "COL2": 94}, partition: 0
Topic printing ceased
ksql>
As always, pay attention to the key vs value part of the message. Here the key is still null.
Since it's a field in the value (we'll use COL1) that we want to use as the primary key on the target database, we use pk.mode=record_value.
We're saying: for the primary key of the target table, use a field (or fields) from the value of the record. We need to identify those fields using pk.fields.
CREATE SINK CONNECTOR SINK_FOO_02_0 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_02',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_value',
'pk.fields' = 'COL1'
);
At first, this seems to work when we check its status 🤔
ksql> DESCRIBE CONNECTOR SINK_FOO_02_0;
Name : SINK_FOO_02_0
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : kafka-connect:8083
Task ID | State | Error Trace
---------------------------------
0 | RUNNING |
---------------------------------
But after a while twiddling our thumbs and wondering why there's no data arriving in Postgres we check the connector again and see 😢
ksql> DESCRIBE CONNECTOR SINK_FOO_02_0;
Name : SINK_FOO_02_0
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : kafka-connect:8083
Task ID | State | Error Trace
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "FOO_02"("COL1","COL2") VALUES(0,42) was aborted: ERROR: duplicate key value violates unique constraint "FOO_02_pkey"
Detail: Key ("COL1")=(0) already exists. Call getNextException to see other errors in the batch.
…
As error messages go it's a pretty good one 👍
duplicate key value violates unique constraint "FOO_02_pkey"
Key ("COL1")=(0) already exists
Using an UPSERT in the Kafka Connect JDBC Sink connector
The problem? We have three records on the source topic:
value: {"COL1": 0, "COL2": 0},
value: {"COL1": 0, "COL2": 42}
value: {"COL1": 1, "COL2": 94}
And the second record has the same value of COL1=0
) as the first, and thus the primary key we are defining would be violated. That's one of the purposes of a primary key!
Let's assume that we do want to ingest the data from this topic to Postgres, and in fact the two records for COL1=0 are not erroneous but are logically valid, and one is intended to replace the other.
This calls for an UPSERT! If a row for the primary key doesn't exist then INSERT it, but if it does then UPDATE it. We can tell the connector to do this with insert.mode=upsert (the default is insert).
CREATE SINK CONNECTOR SINK_FOO_02_1 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_02',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_value',
'pk.fields' = 'COL1',
'insert.mode' = 'upsert'
);
This time everything goes swimmingly, and we get two rows in Postgres (three source messages, one of which was an update to an existing key):
postgres=# SELECT * FROM "FOO_02";
COL1 | COL2
------+------
0 | 42
1 | 94
(2 rows)
Let's prove that the upsert is working by inserting one new row in the Kafka topic (via ksqlDB):
INSERT INTO FOO_02 (COL1, COL2) VALUES (2,10);
In Postgres we see the new row straight away:
postgres=# SELECT * FROM "FOO_02";
COL1 | COL2
------+------
0 | 42
1 | 94
2 | 10
(3 rows)
If we write a new value for the same logical key (COL1) to the Kafka topic it gets pushed to Postgres and updates the row:
ksql> INSERT INTO FOO_02 (COL1, COL2) VALUES (2,20);
postgres=# SELECT * FROM "FOO_02";
COL1 | COL2
------+------
0 | 42
1 | 94
2 | 20
(3 rows)
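For Postgres, the upsert that the connector performs is roughly equivalent to an INSERT ... ON CONFLICT statement. Here's a sketch that you could run yourself against the same table to see the behaviour - it illustrates the idea, and isn't the connector's literal SQL:

docker exec -i postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB' <<'EOF'
-- Insert the row, or update the non-key column if the primary key already exists
INSERT INTO "FOO_02" ("COL1","COL2") VALUES (2,20)
ON CONFLICT ("COL1") DO UPDATE SET "COL2" = EXCLUDED."COL2";
EOF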
Using multiple fields from the message value as the primary key
Above we saw how to take a single field from the value of the message and set it as the primary key for the target table. Now let's do it with multiple fields.
CREATE STREAM FOO_03 (COL1 INT, COL2 INT, COL3 VARCHAR, COL4 VARCHAR)
WITH (KAFKA_TOPIC='FOO_03', VALUE_FORMAT='AVRO', PARTITIONS=1);
INSERT INTO FOO_03 VALUES (1,2,'ABC','XYZ');
INSERT INTO FOO_03 VALUES (2,2,'xxx','qqq');
INSERT INTO FOO_03 VALUES (2,2,'xxx','III');
We'll use the fields COL1, COL2, and COL3 as a composite primary key. Here's the topic contents. As before, note the difference between the Kafka message key and value:
ksql> PRINT FOO_03 FROM BEGINNING LIMIT 3;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/11 16:37:01.955 Z, key: <null>, value: {"COL1": 1, "COL2": 2, "COL3": "ABC", "COL4": "XYZ"}, partition: 0
rowtime: 2021/03/11 16:37:44.009 Z, key: <null>, value: {"COL1": 2, "COL2": 2, "COL3": "xxx", "COL4": "qqq"}, partition: 0
rowtime: 2021/03/11 16:37:44.066 Z, key: <null>, value: {"COL1": 2, "COL2": 2, "COL3": "xxx", "COL4": "III"}, partition: 0
Topic printing ceased
The connector configuration is almost exactly the same as before, except we're specifying more than one field from the record value in pk.fields:
CREATE SINK CONNECTOR SINK_FOO_03_0 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_03',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_value',
'pk.fields' = 'COL1,COL2,COL3',
'insert.mode' = 'upsert'
);
In Postgres:
postgres=# \d "FOO_03"
Table "public.FOO_03"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
COL1 | integer | | not null |
COL2 | integer | | not null |
COL3 | text | | not null |
COL4 | text | | |
Indexes:
"FOO_03_pkey" PRIMARY KEY, btree ("COL1", "COL2", "COL3")
postgres=# SELECT * FROM "FOO_03";
COL1 | COL2 | COL3 | COL4
------+------+------+------
1 | 2 | ABC | XYZ
2 | 2 | xxx | III
(2 rows)
There are two rows as expected (three source Kafka messages, two of which share the same composite key 2/2/xxx).
Keys in Kafka Messages
🎥 A quick explainer about keys in Kafka messages
Using the key of the Kafka message as the primary key, option 1: primitive type (no struct)
When we say that the key of a Kafka message is a primitive type we mean that it is a string, or a type of number, and just a single field. So this is a primitive:
42
Whilst this isn't (unless you want the whole literal as the key value, which is unlikely)
{"id":42}
Let's populate a topic with some test data and see how this works:
CREATE STREAM FOO_04 (COL1 VARCHAR KEY, COL2 INT, COL3 VARCHAR)
WITH (KAFKA_TOPIC='FOO_04', VALUE_FORMAT='AVRO', KEY_FORMAT='KAFKA', PARTITIONS=1);
INSERT INTO FOO_04 VALUES ('mykey_val_A',2,'ABC');
INSERT INTO FOO_04 VALUES ('mykey_val_B',1,'XXX');
INSERT INTO FOO_04 VALUES ('mykey_val_A',5,'ZZZ');
Since we marked COL1 as KEY, its value is written to the key of the Kafka message. We can kind of see this with PRINT (although it's not rendered as a string, for these reasons):
ksql> PRINT 'FOO_04' FROM BEGINNING LIMIT 3;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/11 16:45:33.658 Z, key: [myk@7311980432057982785/-], value: {"COL2": 2, "COL3": "ABC"}, partition: 0
rowtime: 2021/03/11 16:45:33.706 Z, key: [myk@7311980432057982786/-], value: {"COL2": 1, "COL3": "XXX"}, partition: 0
rowtime: 2021/03/11 16:45:33.760 Z, key: [myk@7311980432057982785/-], value: {"COL2": 5, "COL3": "ZZZ"}, partition: 0
Topic printing ceased
It's much clearer (if a tad more complex to invoke) using kafkacat:
$ docker exec -i kafkacat kafkacat \
-b broker:29092 -C -e -q \
-r http://schema-registry:8081 -s value=avro \
-t FOO_04 -f 'Offset: %o\tKey: %k\tValue: %s\n'
Offset: 0 Key: mykey_val_A Value: {"COL2": {"int": 2}, "COL3": {"string": "ABC"}}
Offset: 1 Key: mykey_val_B Value: {"COL2": {"int": 1}, "COL3": {"string": "XXX"}}
Offset: 2 Key: mykey_val_A Value: {"COL2": {"int": 5}, "COL3": {"string": "ZZZ"}}
So now let's use this and create a connector that uses the key of the Kafka message as the primary key for the target table. We do that by setting pk.mode=record_key. Because the key is a primitive, the pk.fields value is simply the name of the column in the database to which we want to map the Kafka message key:
CREATE SINK CONNECTOR SINK_FOO_04_0 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_04',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = 'FOOBAR',
'insert.mode' = 'upsert'
);
The result in Postgres:
postgres=# \d "FOO_04";
Table "public.FOO_04"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
COL2 | integer | | |
COL3 | text | | |
FOOBAR | text | | not null |
Indexes:
"FOO_04_pkey" PRIMARY KEY, btree ("FOOBAR")
postgres=# SELECT * FROM "FOO_04";
COL2 | COL3 | FOOBAR
------+------+-------------
1 | XXX | mykey_val_B
5 | ZZZ | mykey_val_A
(2 rows)
Deleting records in the target database with Kafka Connect JDBC Sink connector
So we've seen INSERT and UPDATE, but what about DELETE?
A logical deletion in Kafka is represented by a tombstone message - a message with a key and a null value. The Kafka Connect JDBC Sink connector can be configured to delete the record in the target table whose key matches that of the tombstone message, by setting delete.enabled=true. However, to do this, the key of the Kafka message must contain the primary key field(s).
We couldn't use the delete option in the examples above in which the primary key value was taken from field(s) in the value. Why not? Because, by definition, the value in a tombstone message is null. The two are mutually exclusive. You can have a value which includes fields to use for the primary key, or you can have a null. If it's null, it's not got a value. If it's got a value, it's not null.
This is why keys in Kafka messages make so much sense. Even if you can cram all your data into the value of the message, and you don't need partition locality for particular instances of an entity (such as all customers on a given partition, which would drive the need to use keys), simply the fact that your data has a logical key means that using the Kafka message key is a good idea. If you're using ksqlDB, it added support for structured keys and the accompanying serialization formats in version 0.15, so there's no excuse not to use them :)
So, we now have the primary key in the key of the Kafka message, as we saw above. Let's add a tombstone message to our topic, here using the -Z option of kafkacat. You can write NULLs using ksqlDB, but this way is quicker for our purposes.
echo "mykey_val_A:" | docker exec -i kafkacat kafkacat -b broker:29092 -t FOO_04 -Z -K: -P
Check the data - observe that the most recent message (offset 3) is a null value, denoted by the -1 length:
docker exec -i kafkacat kafkacat \
-b broker:29092 -C -e -q \
-r http://schema-registry:8081 -s value=avro \
-t FOO_04 -f 'Offset: %o\tKey: %k\tValue: %s \t(length %S)\n'
Offset: 0 Key: mykey_val_A Value: {"COL2": {"int": 2}, "COL3": {"string": "ABC"}} (length 12)
Offset: 1 Key: mykey_val_B Value: {"COL2": {"int": 1}, "COL3": {"string": "XXX"}} (length 12)
Offset: 2 Key: mykey_val_A Value: {"COL2": {"int": 5}, "COL3": {"string": "ZZZ"}} (length 12)
Offset: 3 Key: mykey_val_A Value: (length -1)
Now we create a new connector, replacing the first one. Because it's got a new name it will read all of the messages from the topic again.
DROP CONNECTOR SINK_FOO_04_0;
CREATE SINK CONNECTOR SINK_FOO_04_1 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_04',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = 'FOOBAR',
'insert.mode' = 'upsert',
'delete.enabled' = 'true'
);
In the target table we see that the row for mykey_val_A has been deleted:
postgres=# SELECT * FROM "FOO_04";
COL2 | COL3 | FOOBAR
------+------+-------------
1 | XXX | mykey_val_B
(1 row)
Using the key of the Kafka message as the primary key, option 2: structured keys
👉 Recommended reading if you're using ksqlDB: ✍️ Keys in ksqlDB, Unlocked
We saw above that if you want to use the key of the Kafka message as the primary key in the table, you set pk.mode=record_key and then in pk.fields specify the name of the column in the database in which to store the value. But what about if you have a structured key? That is, one that you've serialized with a schema and that has one (or more) fields that you want to use for the primary key?
Let's populate a new Kafka topic to illustrate this. There's an open issue in ksqlDB 0.15 which means that it can't write complex keys with the Schema Registry so for now I'll just use the kafka-avro-console-producer.
# Get a shell inside the Schema Registry container because
# the kafka-avro-console-producer script is available there
docker exec -it schema-registry bash
# Run this in the above shell, or elsewhere where the
# kafka-avro-console-producer script exists
kafka-avro-console-producer --topic FOO_06 --bootstrap-server broker:29092 \
--property key.schema='{"type":"record","name":"FOO05key","fields":[{"name":"K1","type":"string"},{"name":"K2","type":"int"}]}' \
--property value.schema='{"type":"record","name":"FOO05value","fields":[{"name":"COL3","type":"string"},{"name":"COL4","type":"string"}]}' \
--property parse.key=true \
--property key.separator="+" <<EOF
{"K1": "mykey_val_A", "K2": 1}+{"COL3": "NEVER", "COL4": "GONNA"}
{"K1": "mykey_val_A", "K2": 2}+{"COL3": "GIVE", "COL4": "YOU"}
{"K1": "mykey_val_A", "K2": 3}+{"COL3": "UP", "COL4": "🎙️"}
EOF
Head over to ksqlDB and check the data:
ksql> PRINT FOO_06 FROM BEGINNING LIMIT 3;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/12 09:59:55.337 Z, key: {"K1": "mykey_val_A", "K2": 1}, value: {"COL3": "NEVER", "COL4": "GONNA"}, partition: 0
rowtime: 2021/03/12 09:59:55.362 Z, key: {"K1": "mykey_val_A", "K2": 2}, value: {"COL3": "GIVE", "COL4": "YOU"}, partition: 0
rowtime: 2021/03/12 09:59:55.363 Z, key: {"K1": "mykey_val_A", "K2": 3}, value: {"COL3": "UP", "COL4": "🎙️"}, partition: 0
Topic printing ceased
Now when we push this topic to the database and want to use the key of the Kafka message as the primary key in the target table, we have a decision to make - which column(s) of the key to use? This is where pk.fields takes on a different meaning from above. When we were working with primitive keys, pk.fields was an arbitrary name for the column to write the key value to in the target table.
Now that we have a structured key with field names of its own, pk.fields can either be blank (use all the fields in the key, and create each as a column of the same name in the target database) or it can be a list of selected field(s) from the Kafka message key that we want to use as the primary key.
Here we use all the fields from the Kafka message key as the primary key in the target table:
CREATE SINK CONNECTOR SINK_FOO_06_0 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_06',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = '',
'insert.mode' = 'upsert',
'delete.enabled' = 'true'
);
The key is carried through to Postgres as expected:
postgres=# \d "FOO_06";
Table "public.FOO_06"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
COL3 | text | | not null |
COL4 | text | | not null |
K1 | text | | not null |
K2 | integer | | not null |
Indexes:
"FOO_06_pkey" PRIMARY KEY, btree ("K1", "K2")
postgres=# SELECT * FROM "FOO_06" ;
COL3 | COL4 | K1 | K2
-------+-------+-------------+----
NEVER | GONNA | mykey_val_A | 1
GIVE | YOU | mykey_val_A | 2
UP | 🎙️ | mykey_val_A | 3
(3 rows)
Let's try a variation on this and use just part of the key.
We'll drop the table and connector and then recreate them with new config:
postgres=# DROP TABLE "FOO_06" ;
DROP TABLE
postgres=#
ksql> DROP CONNECTOR SINK_FOO_06_0;
Message
-----------------------------------
Dropped connector "SINK_FOO_06_0"
-----------------------------------
ksql>
Now we use pk.fields to identify one of the fields from the Kafka message key:
CREATE SINK CONNECTOR SINK_FOO_06_1 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_06',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = 'K2',
'insert.mode' = 'upsert',
'delete.enabled' = 'true'
);
This time K1 in the Kafka message key is ignored, and just the specified field K2 is used as the primary key on the table:
postgres=# \d "FOO_06";
Table "public.FOO_06"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
COL3 | text | | not null |
COL4 | text | | not null |
K2 | integer | | not null |
Indexes:
"FOO_06_pkey" PRIMARY KEY, btree ("K2")
postgres=# SELECT * FROM "FOO_06" ;
COL3 | COL4 | K2
-------+-------+----
NEVER | GONNA | 1
GIVE | YOU | 2
UP | 🎙️ | 3
(3 rows)
What if you still want the data from K1 in the target table, but not as part of the primary key? For that you'd use either a custom Single Message Transform or some stream processing, such as this:
-- Register the topic as a ksqlDB stream
CREATE STREAM FOO_06 WITH (KAFKA_TOPIC='FOO_06', FORMAT='AVRO');
-- Verify key/value schema
ksql> DESCRIBE FOO_06;
Name : FOO_06
Field | Type
-------------------------------------------------------
ROWKEY | STRUCT<K1 VARCHAR(STRING), K2 INTEGER> (key)
COL3 | VARCHAR(STRING)
COL4 | VARCHAR(STRING)
-------------------------------------------------------
-- When consuming from Kafka read all existing messages too
SET 'auto.offset.reset' = 'earliest';
-- Populate a new Kafka topic with altered key/value structure
CREATE STREAM FOO_06_RESTRUCTURE_01 AS
SELECT ROWKEY->K2,
AS_VALUE(ROWKEY->K1) AS K1,
COL3,
COL4
FROM FOO_06
PARTITION BY ROWKEY->K2;
-- Examine new key/value schema
ksql> DESCRIBE FOO_06_RESTRUCTURE_01;
Name : FOO_06_RESTRUCTURE_01
Field | Type
--------------------------------
K2 | INTEGER (key)
K1 | VARCHAR(STRING)
COL3 | VARCHAR(STRING)
COL4 | VARCHAR(STRING)
--------------------------------
-- Examine data
ksql> PRINT FOO_06_RESTRUCTURE_01 FROM BEGINNING LIMIT 3;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/12 10:26:05.004 Z, key: 1, value: {"K1": "mykey_val_A", "COL3": "NEVER", "COL4": "GONNA"}, partition: 0
rowtime: 2021/03/12 10:26:05.027 Z, key: 2, value: {"K1": "mykey_val_A", "COL3": "GIVE", "COL4": "YOU"}, partition: 0
rowtime: 2021/03/12 10:26:05.028 Z, key: 3, value: {"K1": "mykey_val_A", "COL3": "UP", "COL4": "🎙️"}, partition: 0
Topic printing ceased
Common errors
Trying to read data that has not been serialized with Schema Registry (e.g. Avro, Protobuf, JSON Schema)
As noted in the introduction, the Kafka Connect JDBC Sink connector requires that you use a serialization format that includes a schema. Let's see what happens if you don't, by creating a Kafka topic with data in plain JSON in both the key and value:
# Key/value are separated by the + character
docker exec -i kafkacat kafkacat -b broker:29092 -t FOO_08 -K+ -P <<EOF
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":1}+{"COL3":"FOO","COL4":"BAR"}
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":2}+{"COL3":"ZXC","COL4":"ASD"}
{"K1_GEO":"APAC","K2_BU":"FGH","K3_ID":9}+{"COL3":"QQQ","COL4":"WWW"}
EOF
If we consume the data with kafkacat we can see it is just straight JSON:
$ docker exec -i kafkacat kafkacat \
-b broker:29092 -C -e -q \
-t FOO_08 -f 'Offset: %o\tKey: %k\tValue: %s \t(length %S)\n'
Offset: 0 Key: {"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":1} Value: {"COL3":"FOO","COL4":"BAR"} (length 27)
Offset: 1 Key: {"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":2} Value: {"COL3":"ZXC","COL4":"ASD"} (length 27)
Offset: 2 Key: {"K1_GEO":"APAC","K2_BU":"FGH","K3_ID":9} Value: {"COL3":"QQQ","COL4":"WWW"} (length 27)
What we want to do is push this data to a database, and set the primary key on the target table as the three fields in the Kafka key.
Let's see what happens if we do this with the data as it stands.
CREATE SINK CONNECTOR SINK_FOO_08_0 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_08',
'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable' = 'false',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = '',
'insert.mode' = 'upsert',
'delete.enabled' = 'true'
);
We get the error Sink connector 'SINK_FOO_08_0' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='FOO_08',partition=0,offset=0,timestamp=1615547451030) with a HashMap key and null key schema:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'SINK_FOO_08_0' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='FOO_08',partition=0,offset=0,timestamp=1615547451030) with a HashMap key and null key schema.
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:113)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Let's try randomly jiggling things to see if they unbreak. Since the error mentions delete.enabled, let's try disabling it:
CREATE SINK CONNECTOR SINK_FOO_08_1 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_08',
'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable' = 'false',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = '',
'insert.mode' = 'upsert',
'delete.enabled' = 'false'
);
We just get variations on a theme: Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'SINK_FOO_08_1' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='FOO_08',partition=0,offset=0,timestamp=1615547451030) with a HashMap key and null key schema.
The nub of the issue is this: requires records with a non-null key and non-null Struct or primitive key schema, and we're supplying a HashMap key and null key schema.
Even if we ditch the idea of using the individual key fields and instead treat the key as a primitive string (by using org.apache.kafka.connect.storage.StringConverter instead of org.apache.kafka.connect.json.JsonConverter), it doesn't get us much further:
CREATE SINK CONNECTOR SINK_FOO_08_2 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_08',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = '',
'insert.mode' = 'upsert',
'delete.enabled' = 'false'
);
That throws org.apache.kafka.connect.errors.ConnectException: Need exactly one PK column defined since the key schema for records is a primitive type, defined columns are: [], which makes sense. So let's specify the name of the target column in the database into which the primitive key value should be stored (using pk.fields):
CREATE SINK CONNECTOR SINK_FOO_08_3 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_08',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = 'MY_KEY',
'insert.mode' = 'upsert',
'delete.enabled' = 'false'
);
That leads us off even further into the weeds with a new error that makes less sense:
ksql> DESCRIBE CONNECTOR SINK_FOO_08_3;
Name : SINK_FOO_08_3
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : kafka-connect:8083
Task ID | State | Error Trace
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:182)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
This somewhat cryptic error (class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')) is basically saying that whilst it's now happy treating the key as a primitive to load into the column that we named, it is expecting a struct in the value part of the message, rather than the HashMap that it got from us using org.apache.kafka.connect.json.JsonConverter. Since we don't have the schema itself embedded in the JSON message (because schemas.enable=false), we need to apply the schema some other way.
The best way to do this is to fix it at source: when the data is written to Kafka, make sure that it's written using a serializer that's going to store the schema and not throw it away. Good options are Avro, Protobuf, and JSON Schema.
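For example, the same kind of records could have been written with a schema from the start using the kafka-avro-console-producer, just as we did for FOO_06 earlier. This is a sketch, run inside the Schema Registry container as before; the topic name FOO_08_AVRO is made up for illustration:

# Produce one record with Avro-serialized key and value, declaring the schemas inline
kafka-avro-console-producer --topic FOO_08_AVRO --bootstrap-server broker:29092 \
  --property key.schema='{"type":"record","name":"FOO08key","fields":[{"name":"K1_GEO","type":"string"},{"name":"K2_BU","type":"string"},{"name":"K3_ID","type":"int"}]}' \
  --property value.schema='{"type":"record","name":"FOO08value","fields":[{"name":"COL3","type":"string"},{"name":"COL4","type":"string"}]}' \
  --property parse.key=true \
  --property key.separator="+" <<EOF
{"K1_GEO": "EMEA", "K2_BU": "XYZ", "K3_ID": 1}+{"COL3": "FOO", "COL4": "BAR"}
EOF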
That's not always possible though, and you're sometimes stuck with plain JSON data that you really want to load into a database. If that's the case you'll need to pre-process the topic using stream processing. Kafka Streams is one option, but ksqlDB is arguably easier and is what I'll show here (there's also a video tutorial).
To start with we create a new stream in ksqlDB and declare the schema of the JSON data in both the key and value:
-- Register the existing topic as a ksqlDB stream
-- and declare the full schema
ksql> CREATE STREAM FOO_08 (K1_GEO VARCHAR KEY,
K2_BU VARCHAR KEY,
K3_ID INT KEY,
COL3 VARCHAR,
COL4 VARCHAR)
WITH (KAFKA_TOPIC='FOO_08', FORMAT='JSON');
Message
----------------
Stream created
----------------
-- Verify the schema looks correct
ksql> DESCRIBE FOO_08;
Name : FOO_08
Field | Type
---------------------------------
K1_GEO | VARCHAR(STRING) (key)
K2_BU | VARCHAR(STRING) (key)
K3_ID | INTEGER (key)
COL3 | VARCHAR(STRING)
COL4 | VARCHAR(STRING)
---------------------------------
-- Verify the data is read correctly
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
ksql> SELECT * FROM FOO_08 EMIT CHANGES LIMIT 3;
+-------+------+------+-----+-----+
|K1_GEO |K2_BU |K3_ID |COL3 |COL4 |
+-------+------+------+-----+-----+
|EMEA |XYZ |1 |FOO |BAR |
|EMEA |XYZ |2 |ZXC |ASD |
|APAC |FGH |9 |QQQ |WWW |
Limit Reached
Query terminated
ksql>
Now we write the existing data, and all new messages that arrive, to a new topic and specify an appropriate serialization format. Avro, Protobuf, and JSON Schema are all good choices here.
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
ksql> CREATE STREAM FOO_08_CONVERTED
WITH (FORMAT='PROTOBUF') AS
SELECT * FROM FOO_08;
Message
------------------------------------------------
Created query with ID CSAS_FOO_08_CONVERTED_19
------------------------------------------------
ksql> DESCRIBE FOO_08_CONVERTED;
Name : FOO_08_CONVERTED
Field | Type
---------------------------------
K1_GEO | VARCHAR(STRING) (key)
K2_BU | VARCHAR(STRING) (key)
K3_ID | INTEGER (key)
COL3 | VARCHAR(STRING)
COL4 | VARCHAR(STRING)
---------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> PRINT FOO_08_CONVERTED FROM BEGINNING LIMIT 3;
Key format: PROTOBUF or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2021/03/12 11:10:51.030 Z, key: K1_GEO: "EMEA" K2_BU: "XYZ" K3_ID: 1, value: COL3: "FOO" COL4: "BAR", partition: 0
rowtime: 2021/03/12 11:10:51.071 Z, key: K1_GEO: "EMEA" K2_BU: "XYZ" K3_ID: 2, value: COL3: "ZXC" COL4: "ASD", partition: 0
rowtime: 2021/03/12 11:10:51.110 Z, key: K1_GEO: "APAC" K2_BU: "FGH" K3_ID: 9, value: COL3: "QQQ" COL4: "WWW", partition: 0
Topic printing ceased
Now we can push this data to the database. Note that the key.converter and value.converter are now set to the Protobuf converter:
CREATE SINK CONNECTOR SINK_FOO_08_4 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_08_CONVERTED',
'key.converter' = 'io.confluent.connect.protobuf.ProtobufConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.protobuf.ProtobufConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = '',
'insert.mode' = 'upsert',
'delete.enabled' = 'true'
);
This works, and we have data in Postgres matching the schema and primary key that we wanted:
postgres=# \d "FOO_08_CONVERTED";
Table "public.FOO_08_CONVERTED"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
COL3 | text | | |
COL4 | text | | |
K1_GEO | text | | not null |
K2_BU | text | | not null |
K3_ID | integer | | not null |
Indexes:
"FOO_08_CONVERTED_pkey" PRIMARY KEY, btree ("K1_GEO", "K2_BU", "K3_ID")
postgres=# SELECT * FROM "FOO_08_CONVERTED" ;
COL3 | COL4 | K1_GEO | K2_BU | K3_ID
------+------+--------+-------+-------
FOO | BAR | EMEA | XYZ | 1
ZXC | ASD | EMEA | XYZ | 2
QQQ | WWW | APAC | FGH | 9
(3 rows)
If we insert new data and an update for an existing key into the original topic (JSON):
# Key/value are separated by the + character
docker exec -i kafkacat kafkacat -b broker:29092 -t FOO_08 -K+ -P <<EOF
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":10}+{"COL3":"FOO","COL4":"BAR"}
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":2}+{"COL3":"THIS","COL4":"CHANGED"}
EOF
this flows through automagically to the database:
postgres=# SELECT * FROM "FOO_08_CONVERTED" ;
COL3 | COL4 | K1_GEO | K2_BU | K3_ID
------+---------+--------+-------+-------
FOO | BAR | EMEA | XYZ | 1
QQQ | WWW | APAC | FGH | 9
FOO | BAR | EMEA | XYZ | 10
THIS | CHANGED | EMEA | XYZ | 2
(4 rows)
Footnote: changing the table name
You can use a Single Message Transform to change the target object in the database to which the data is written. By default it takes the name of the source topic.
Using the RegexRouter we can change FOO_08_CONVERTED to FOO_08 thus:
CREATE SINK CONNECTOR SINK_FOO_08_5 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'FOO_08_CONVERTED',
'key.converter' = 'io.confluent.connect.protobuf.ProtobufConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.protobuf.ProtobufConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = '',
'insert.mode' = 'upsert',
'delete.enabled' = 'true',
'transforms' = 'changeTopicName',
'transforms.changeTopicName.type' = 'org.apache.kafka.connect.transforms.RegexRouter',
'transforms.changeTopicName.regex' = '(.*)_CONVERTED$',
'transforms.changeTopicName.replacement' = '$1'
);
Now there are two tables in the target database - the original one, and the new one minus the _CONVERTED suffix:
postgres=# \d
List of relations
Schema | Name | Type | Owner
--------+------------------+-------+----------
public | FOO_08 | table | postgres
public | FOO_08_CONVERTED | table | postgres
(3 rows)
postgres=# SELECT * FROM "FOO_08";
COL3 | COL4 | K1_GEO | K2_BU | K3_ID
------+---------+--------+-------+-------
FOO | BAR | EMEA | XYZ | 1
QQQ | WWW | APAC | FGH | 9
FOO | BAR | EMEA | XYZ | 10
THIS | CHANGED | EMEA | XYZ | 2
(4 rows)
Using a Kafka message key in which the key schema has null default values
This key schema causes problems because of the "default": null entries:
{
"type": "record",
"name": "FOO05key",
"fields": [
{
"name": "K1",
"type": "string",
"default": null
},
{
"name": "K2",
"type": "int",
"default": null
}
]
}
The error you'll get from the sink connector is this:
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1817)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1538)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1221)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:115)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:535)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:498)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: STRING
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 24 more
The fix is to remove the instances of "default": null from the schema.
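Applied to the schema above, that simply means dropping the two "default": null lines and leaving the fields otherwise unchanged:

{
  "type": "record",
  "name": "FOO05key",
  "fields": [
    {
      "name": "K1",
      "type": "string"
    },
    {
      "name": "K2",
      "type": "int"
    }
  ]
}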
References
👾 Try it yourself (Docker Compose to spin up the environment used in this article)