
Robin Moffatt for Confluent

Originally published at rmoff.net

Kafka Connect JDBC Sink deep-dive: Working with Primary Keys

The Kafka Connect JDBC Sink can be used to stream data from a Kafka topic to a database such as Oracle, Postgres, MySQL, DB2, etc.

It supports many permutations of configuration around how primary keys are handled. The documentation details these. This article aims to illustrate and expand on this.

❗ Want to cut to the chase? Check out the TL;DW (too long; didn't watch) summary

Background

What is the Kafka Connect JDBC Sink connector?

The JDBC connector is a plugin for Kafka Connect for streaming data both ways between a database and Apache Kafka.

Learn more about it in the documentation, this 🎥 video, and tutorial.
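If you want to check that the JDBC connector plugin is actually installed on your Kafka Connect worker before going any further, the Connect REST API will list it for you. A quick sketch, assuming the worker's REST port is reachable on localhost:8083 and that you have jq to hand for filtering:

# List installed connector plugins and pick out the JDBC one
curl -s http://localhost:8083/connector-plugins | \
  jq '.[] | select(.class | contains("Jdbc"))'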

What is Kafka Connect?

👉 Learn more: From Zero to Hero with Kafka Connect

Why do I care about primary keys?

🎥 Watch

You need to give some thought to how the connector handles primary keys in several cases:

  • If you want to create the target table with the correct primary key column(s)

  • If you want to update existing records based on their key (i.e. insert.mode of upsert or update)

  • If you want to delete existing records based on their key (i.e. delete.enabled is true and you are sending tombstone records from your source topic)

Keys and Values in Kafka Messages

Messages in Apache Kafka are key/value pairs. The key and value may each hold a complex payload. Equally, they may also be null (although both being null wouldn't make so much sense).

The key in Apache Kafka messages is set by the producing application, whether that's using the Producer API directly, or a Kafka Connect source connector.

Serialization formats

Data in Apache Kafka messages is just bytes, so far as Apache Kafka is concerned. When working with that data we need to use SerDes to serialize and deserialize it - in Kafka Connect these are called Converters.

The Kafka Connect JDBC Sink requires that the value part of the message is serialized using a format that has an explicitly declared schema. This means it must be one of the following:

  • Avro

  • Protobuf

  • JSON Schema

You cannot use plain JSON, CSV, etc. with the JDBC Sink connector. If you have this kind of data on your source topic you'll need to apply a schema to it first and write it to a new topic serialized appropriately, for example by applying a schema to JSON data with ksqlDB (there's a worked example of this in the Common errors section below).

The key and value parts of your message can be serialized using different formats. Make sure you know how each is serialized, as this can have a big impact, particularly when it comes to handling keys.

There's also good documentation about serialization formats in ksqlDB, as well as plenty of general reference material on why schemas matter in your Kafka messages.

An important note about the environment

You can find the Docker Compose to spin up the environment used in this blog on GitHub.
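Bringing it up is the usual Docker Compose workflow; the container names referenced throughout this article (broker, schema-registry, postgres, kafkacat, and so on) are the ones defined in that file:

# Start the stack in the background and check everything came up
docker-compose up -d
docker-compose ps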

I'm using ksqlDB as my interface for populating topics and creating connectors. You can do both, either, or neither.
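If you'd rather not use ksqlDB for the connector side of things, every CREATE SINK CONNECTOR statement in this article has a direct equivalent against the Kafka Connect REST API. As a rough sketch (assuming the worker's REST port is reachable from your host on localhost:8083), the first connector created below could instead be submitted like this:

# Create (or update) the connector by PUTting its configuration to the Connect REST API
curl -s -X PUT http://localhost:8083/connectors/SINK_FOO_01_0/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class"                     : "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url"                      : "jdbc:postgresql://postgres:5432/",
           "connection.user"                     : "postgres",
           "connection.password"                 : "postgres",
           "topics"                              : "FOO_01",
           "key.converter"                       : "io.confluent.connect.avro.AvroConverter",
           "key.converter.schema.registry.url"   : "http://schema-registry:8081",
           "value.converter"                     : "io.confluent.connect.avro.AvroConverter",
           "value.converter.schema.registry.url" : "http://schema-registry:8081",
           "auto.create"                         : "true"
         }'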

Let's get started!

No primary key handling at all

🎥 Watch

We'll start off with the simplest example and build from there. To begin, we'll have no key at all:

CREATE STREAM FOO_01 (COL1 INT, COL2 INT)
  WITH (KAFKA_TOPIC='FOO_01', VALUE_FORMAT='AVRO', PARTITIONS=1);

INSERT INTO FOO_01 (COL1, COL2) VALUES (0,0);

Throughout this I'll use PRINT in ksqlDB to inspect the message structure (pay attention to the Key and Value):

ksql> PRINT FOO_01 FROM BEGINNING LIMIT 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/11 14:13:26.300 Z, key: <null>, value: {"COL1": 0, "COL2": 0}, partition: 0

You can also use kafkacat for this although it's a tad more fiddly than PRINT alone:

$ docker exec -i kafkacat kafkacat \
        -b broker:29092 -C -e -r http://schema-registry:8081 -s value=avro \
        -t FOO_01 -f 'Topic+Partition+Offset: %t+%p+%o\tKey: %k\tValue: %s\n'

Topic+Partition+Offset: FOO_01+0+0      Key:    Value: {"COL1": {"int": 0}, "COL2": {"int": 0}}

Let's push this topic to Postgres:

CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (
    'connector.class'                         = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                          = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                         = 'postgres',
    'connection.password'                     = 'postgres',
    'topics'                                  = 'FOO_01',
    'key.converter'                           = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'       = 'http://schema-registry:8081',
    'value.converter'                         = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'     = 'http://schema-registry:8081',
    'auto.create'                             = 'true'
);

Check the connector is working ✅

ksql> DESCRIBE CONNECTOR SINK_FOO_01_0;

Name                 : SINK_FOO_01_0
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State   | Error Trace
---------------------------------
 0       | RUNNING |
---------------------------------
ksql>
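
The same check can be made outside ksqlDB with the Connect REST API (again assuming the worker is reachable on localhost:8083; jq is just for readability):

# Show the connector and task state
curl -s http://localhost:8083/connectors/SINK_FOO_01_0/status | \
  jq '.connector.state, .tasks[].state'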

Check the data in Postgres ✅

docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
postgres=# SELECT * FROM "FOO_01" ;
 COL1 | COL2
------+------
    0 |    0
(1 row)

Note that in the above connector these (and other) configuration parameters assume their default values:

pk.fields   = []
pk.mode     = none
insert.mode = insert
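
If you want to double-check what was actually submitted for a connector, the REST API returns its configuration; anything not in the response is falling back to its default. A sketch, with the same localhost:8083 assumption as before:

# Show the explicitly-set configuration for the connector
curl -s http://localhost:8083/connectors/SINK_FOO_01_0/config | jq '.'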

Using a field in the value as the key

🎥 Watch

Let's imagine that of the two fields in the value of our message we want to set one of them as the primary key. We'll create a new version of this topic, and add a couple more rows this time too:

CREATE STREAM FOO_02 (COL1 INT, COL2 INT)
  WITH (KAFKA_TOPIC='FOO_02', VALUE_FORMAT='AVRO', PARTITIONS=1);

INSERT INTO FOO_02 (COL1, COL2) VALUES (0,0);
INSERT INTO FOO_02 (COL1, COL2) VALUES (0,42);
INSERT INTO FOO_02 (COL1, COL2) VALUES (1,94);

Now our topic looks like this:

ksql> PRINT FOO_02 FROM BEGINNING LIMIT 3;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/03/11 14:44:39.016 Z, key: <null>, value: {"COL1": 0, "COL2": 0}, partition: 0
rowtime: 2021/03/11 14:44:39.067 Z, key: <null>, value: {"COL1": 0, "COL2": 42}, partition: 0
rowtime: 2021/03/11 14:44:39.117 Z, key: <null>, value: {"COL1": 1, "COL2": 94}, partition: 0
Topic printing ceased
ksql>

As always, pay attention to the key vs value part of the message. Here the key is still null.

Since it's a field in the value (we'll use COL1) that we want to use as the primary key on the target database, we use pk.mode=record_value.

We're saying: for the primary key of the target table, use one or more fields from the value of the record. We then need to identify those fields using pk.fields.

CREATE SINK CONNECTOR SINK_FOO_02_0 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_02',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_value',
    'pk.fields'                           = 'COL1'
);

At first this seems to work when we check the connector's status 🤔

ksql> DESCRIBE CONNECTOR SINK_FOO_02_0;

Name                 : SINK_FOO_02_0
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State   | Error Trace
---------------------------------
 0       | RUNNING |
---------------------------------

But after a while twiddling our thumbs and wondering why there's no data arriving in Postgres we check the connector again and see 😢

ksql> DESCRIBE CONNECTOR SINK_FOO_02_0;

Name                 : SINK_FOO_02_0
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "FOO_02"("COL1","COL2") VALUES(0,42) was aborted: ERROR: duplicate key value violates unique constraint "FOO_02_pkey"
  Detail: Key ("COL1")=(0) already exists.  Call getNextException to see other errors in the batch.
  

As error messages go it's a pretty good one 👍

duplicate key value violates unique constraint "FOO_02_pkey"
Key ("COL1")=(0) already exists

Using an UPSERT in the Kafka Connect JDBC Sink connector

🎥 Watch

The problem? We have three records on the source topic:

value: {"COL1": 0, "COL2": 0},
value: {"COL1": 0, "COL2": 42}
value: {"COL1": 1, "COL2": 94}

And the second record has the same value of COL1 (0) as the first, and thus the primary key we are defining would be violated. That's one of the purposes of a primary key!

Let's assume that we do want to ingest the data from this topic to Postgres, and in fact the two records for COL1=0 are not erroneous but are logically valid and one is intended to replace the other.

This calls for an UPSERT! If a row for the primary key doesn't exist then INSERT it, but if it does then UPDATE it. We can tell the connector to do this with insert.mode=upsert (the default is insert).

CREATE SINK CONNECTOR SINK_FOO_02_1 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_02',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_value',
    'pk.fields'                           = 'COL1',
    'insert.mode'                         = 'upsert'
);

This time everything goes swimmingly and we get two rows in Postgres (three source messages, one of which is an update of another):

postgres=# SELECT * FROM "FOO_02";
 COL1 | COL2
------+------
    0 |   42
    1 |   94
(2 rows)
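
For Postgres, the connector's dialect implements this upsert with an INSERT … ON CONFLICT statement. If you want to see those semantics in isolation, here's a hand-written illustration you can run against the same table - it's not the literal SQL the connector generates, just the same idea:

# Illustration only: upsert semantics in Postgres, written by hand
docker exec -i postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB' <<'EOF'
INSERT INTO "FOO_02" ("COL1","COL2") VALUES (0,42)
ON CONFLICT ("COL1") DO UPDATE SET "COL2" = EXCLUDED."COL2";
EOF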

Let's prove that the upsert is working by inserting one new row in the Kafka topic (via ksqlDB):

INSERT INTO FOO_02 (COL1, COL2) VALUES (2,10);

In Postgres we see it straight away:

postgres=# SELECT * FROM "FOO_02";
 COL1 | COL2
------+------
    0 |   42
    1 |   94
    2 |   10
(3 rows)

If we write a new value for the same logical key (COL1) to the Kafka topic it gets pushed to Postgres and updates the row:

ksql> INSERT INTO FOO_02 (COL1, COL2) VALUES (2,20);
postgres=# SELECT * FROM "FOO_02";
 COL1 | COL2
------+------
    0 |   42
    1 |   94
    2 |   20
(3 rows)

Using multiple fields from the message value as the primary key

🎥 Watch

Above we saw how to take a single field from the value of the message and set it as the primary key for the target table. Now let's do it with multiple fields.

CREATE STREAM FOO_03 (COL1 INT, COL2 INT, COL3 VARCHAR, COL4 VARCHAR)
  WITH (KAFKA_TOPIC='FOO_03', VALUE_FORMAT='AVRO', PARTITIONS=1);

INSERT INTO FOO_03 VALUES (1,2,'ABC','XYZ');
INSERT INTO FOO_03 VALUES (2,2,'xxx','qqq');
INSERT INTO FOO_03 VALUES (2,2,'xxx','III');

We'll use the fields COL1, COL2, and COL3 as a composite primary key. Here's the topic contents. As before, note the difference between the Kafka message key and value:

ksql> PRINT FOO_03 FROM BEGINNING LIMIT 3;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/11 16:37:01.955 Z, key: <null>, value: {"COL1": 1, "COL2": 2, "COL3": "ABC", "COL4": "XYZ"}, partition: 0
rowtime: 2021/03/11 16:37:44.009 Z, key: <null>, value: {"COL1": 2, "COL2": 2, "COL3": "xxx", "COL4": "qqq"}, partition: 0
rowtime: 2021/03/11 16:37:44.066 Z, key: <null>, value: {"COL1": 2, "COL2": 2, "COL3": "xxx", "COL4": "III"}, partition: 0
Topic printing ceased

The connector configuration is almost exactly the same as before, except we're specifying more than one field from the record value in pk.fields:

CREATE SINK CONNECTOR SINK_FOO_03_0 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_03',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_value',
    'pk.fields'                           = 'COL1,COL2,COL3',
    'insert.mode'                         = 'upsert'
);

In Postgres:

postgres=# \d "FOO_03"
               Table "public.FOO_03"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 COL1   | integer |           | not null |
 COL2   | integer |           | not null |
 COL3   | text    |           | not null |
 COL4   | text    |           |          |
Indexes:
    "FOO_03_pkey" PRIMARY KEY, btree ("COL1", "COL2", "COL3")

postgres=# SELECT * FROM "FOO_03";
 COL1 | COL2 | COL3 | COL4
------+------+------+------
    1 |    2 | ABC  | XYZ
    2 |    2 | xxx  | III
(2 rows)

There are two rows as expected (three source Kafka messages, two of which share the same composite key 2/2/xxx).

Keys in Kafka Messages

🎥 A quick explainer about keys in Kafka messages

Using the key of the Kafka message as the primary key, option 1: primitive type (no struct)

🎥 Watch

When we say that the key of a Kafka message is a primitive type we mean that it is a string or a number: a single field, not a structure. So this is a primitive:

42

Whilst this isn't (unless you want the whole literal as the key value, which is unlikely):

{"id":42}

Let's populate a topic with some test data and see how this works:

CREATE STREAM FOO_04 (COL1 VARCHAR KEY, COL2 INT, COL3 VARCHAR)
  WITH (KAFKA_TOPIC='FOO_04', VALUE_FORMAT='AVRO', KEY_FORMAT='KAFKA', PARTITIONS=1);

INSERT INTO FOO_04 VALUES ('mykey_val_A',2,'ABC');
INSERT INTO FOO_04 VALUES ('mykey_val_B',1,'XXX');
INSERT INTO FOO_04 VALUES ('mykey_val_A',5,'ZZZ');

Since we marked COL1 as KEY its value is written to the key of the Kafka message. We can kind of see this with PRINT (although it's not rendered as a string for these reasons):

ksql> PRINT 'FOO_04' FROM BEGINNING LIMIT 3;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/11 16:45:33.658 Z, key: [myk@7311980432057982785/-], value: {"COL2": 2, "COL3": "ABC"}, partition: 0
rowtime: 2021/03/11 16:45:33.706 Z, key: [myk@7311980432057982786/-], value: {"COL2": 1, "COL3": "XXX"}, partition: 0
rowtime: 2021/03/11 16:45:33.760 Z, key: [myk@7311980432057982785/-], value: {"COL2": 5, "COL3": "ZZZ"}, partition: 0
Topic printing ceased

It's much clearer (if a tad more complex to invoke) using kafkacat:

$ docker exec -i kafkacat kafkacat \
        -b broker:29092 -C -e -q \
        -r http://schema-registry:8081 -s value=avro \
        -t FOO_04 -f 'Offset: %o\tKey: %k\tValue: %s\n'
Offset: 0       Key: mykey_val_A        Value: {"COL2": {"int": 2}, "COL3": {"string": "ABC"}}
Offset: 1       Key: mykey_val_B        Value: {"COL2": {"int": 1}, "COL3": {"string": "XXX"}}
Offset: 2       Key: mykey_val_A        Value: {"COL2": {"int": 5}, "COL3": {"string": "ZZZ"}}

So now let's use this and create a connector that uses the key of the Kafka message as the primary key for the target table. We do that by setting pk.mode=record_key. Because the key is a primitive, the pk.fields value is simply the name of the column in the database to which we want to map the Kafka message key.

CREATE SINK CONNECTOR SINK_FOO_04_0 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_04',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'FOOBAR',
    'insert.mode'                         = 'upsert'
);

The result in Postgres:

postgres=# \d "FOO_04";
               Table "public.FOO_04"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 COL2   | integer |           |          |
 COL3   | text    |           |          |
 FOOBAR | text    |           | not null |
Indexes:
    "FOO_04_pkey" PRIMARY KEY, btree ("FOOBAR")

postgres=# SELECT * FROM "FOO_04";
 COL2 | COL3 |   FOOBAR
------+------+-------------
    1 | XXX  | mykey_val_B
    5 | ZZZ  | mykey_val_A
(2 rows)

Deleting records in the target database with Kafka Connect JDBC Sink connector

🎥 Watch

So we've seen INSERT and UPDATE, but what about DELETE?

A logical deletion in Kafka is represented by a tombstone message - a message with a key and a null value. The Kafka Connect JDBC sink connector can be configured to delete the record in the target table which has a key matching that of the tombstone message by setting delete.enabled=true. However, to do this, the key of the Kafka message must contain the primary key field(s).

We couldn't use the delete option in the examples above in which the primary key value was taken from field(s) in the value. Why not? Because, by definition, the value in a tombstone message is null. The two are mutually exclusive. You can have a value which includes fields to use for the primary key, or you can have a null. If it's null, it's not got a value. If it's got a value, it's not null.

This is why keys in Kafka messages make so much sense. Even if you can cram all your data into the value of the message, and you don't need partition locality for particular instances of an entity (such as all customers on a given partition, which would drive the need to use keys), simply the fact that your data has a logical key means that using the Kafka message key is a good idea. If you're using ksqlDB, version 0.15 added support for structured keys and the serialization formats to go with them, so there's no excuse not to use them :)

So, we now have the primary key in the key of the Kafka message, as we saw above. Let's add a tombstone message to our topic, here using the -Z option of kafkacat. You can write NULLs using ksqlDB but this way is quicker for our purposes.

echo "mykey_val_A:" | docker exec -i kafkacat kafkacat -b broker:29092 -t FOO_04 -Z -K: -P

Check the data - observe that the most recent message (offset 3) has a null value, denoted by the -1 length:

docker exec -i kafkacat kafkacat \
        -b broker:29092 -C -e -q \
        -r http://schema-registry:8081 -s value=avro \
        -t FOO_04 -f 'Offset: %o\tKey: %k\tValue: %s \t(length %S)\n'
Offset: 0       Key: mykey_val_A        Value: {"COL2": {"int": 2}, "COL3": {"string": "ABC"}}  (length 12)
Offset: 1       Key: mykey_val_B        Value: {"COL2": {"int": 1}, "COL3": {"string": "XXX"}}  (length 12)
Offset: 2       Key: mykey_val_A        Value: {"COL2": {"int": 5}, "COL3": {"string": "ZZZ"}}  (length 12)
Offset: 3       Key: mykey_val_A        Value:          (length -1)

Now we create a new connector, replacing the first one. Because it's got a new name it will read all of the messages from the topic again.

DROP CONNECTOR SINK_FOO_04_0;
CREATE SINK CONNECTOR SINK_FOO_04_1 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_04',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'FOOBAR',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'
);

In the target table we see that the row for mykey_val_A has been deleted:

postgres=# SELECT * FROM "FOO_04";
 COL2 | COL3 |   FOOBAR
------+------+-------------
    1 | XXX  | mykey_val_B
(1 row)
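
As an aside, the reason a connector with a new name re-reads the whole topic is that a sink connector's offsets are tracked in a Kafka consumer group named connect-<connector name>, and a brand-new name means a brand-new group with no committed offsets. You can inspect that group with the standard console tool - a sketch, assuming the kafka-consumer-groups script is available inside the broker container:

# Show the offsets committed by the sink connector's consumer group
docker exec -it broker kafka-consumer-groups \
    --bootstrap-server broker:29092 \
    --group connect-SINK_FOO_04_1 \
    --describe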

Using the key of the Kafka message as the primary key, option 2: structured keys

👉 Recommended reading if you're using ksqlDB: ✍️ Keys in ksqlDB, Unlocked

🎥 Watch

We saw above that if you want to use the key of the Kafka message as the primary key in the table, you set pk.mode=record_key and then in pk.fields specify the name of the column in the database in which to store the value. But what if you have a structured key? That is, one that you've serialized with a schema and that has one or more fields that you want to use for the primary key?

Let's populate a new Kafka topic to illustrate this. There's an open issue in ksqlDB 0.15 which means that it can't write complex keys with the Schema Registry so for now I'll just use the kafka-avro-console-producer.

# Get a shell inside the Schema Registry container because
# the kafka-avro-console-producer script is available there
docker exec -it schema-registry bash
# Run this in the above shell, or elsewhere where the
# kafka-avro-console-producer script exists
kafka-avro-console-producer --topic FOO_06 --bootstrap-server broker:29092 \
 --property key.schema='{"type":"record","name":"FOO05key","fields":[{"name":"K1","type":"string"},{"name":"K2","type":"int"}]}' \
 --property value.schema='{"type":"record","name":"FOO05value","fields":[{"name":"COL3","type":"string"},{"name":"COL4","type":"string"}]}' \
 --property parse.key=true \
 --property key.separator="+" <<EOF
{"K1": "mykey_val_A", "K2": 1}+{"COL3": "NEVER", "COL4": "GONNA"}
{"K1": "mykey_val_A", "K2": 2}+{"COL3": "GIVE", "COL4": "YOU"}
{"K1": "mykey_val_A", "K2": 3}+{"COL3": "UP", "COL4": "🎙️"}
EOF

Head over to ksqlDB and check the data:

ksql> PRINT FOO_06 FROM BEGINNING LIMIT 3;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/12 09:59:55.337 Z, key: {"K1": "mykey_val_A", "K2": 1}, value: {"COL3": "NEVER", "COL4": "GONNA"}, partition: 0
rowtime: 2021/03/12 09:59:55.362 Z, key: {"K1": "mykey_val_A", "K2": 2}, value: {"COL3": "GIVE", "COL4": "YOU"}, partition: 0
rowtime: 2021/03/12 09:59:55.363 Z, key: {"K1": "mykey_val_A", "K2": 3}, value: {"COL3": "UP", "COL4": "🎙️"}, partition: 0
Topic printing ceased
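
Because the key is Avro-serialized, its schema is registered in the Schema Registry under the FOO_06-key subject (the default topic-name subject naming strategy). You can inspect it there too - a sketch, assuming the Schema Registry is reachable from your host on localhost:8081:

# Fetch the latest registered schema for the topic's key
curl -s http://localhost:8081/subjects/FOO_06-key/versions/latest | jq '.'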

Now when we push this topic to the database and want to use the key of the Kafka message as the primary key in the target table we have a decision to make - which column(s) of the key to use? This is where pk.fields takes on a different meaning from above. When we were working with primitive keys pk.fields was an arbitrary name of the column to write the key value to in the target table.

Now that we have a structured key with field names of its own, pk.fields can either be blank (use all the fields in the key, and create each as a column of the same name in the target database) or it can be a list of selected field(s) from the Kafka message key that we want to use as the primary key.

Here we use all the fields from the Kafka message key as the primary key in the target table:

CREATE SINK CONNECTOR SINK_FOO_06_0 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_06',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = '',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'
);

The key is carried through to Postgres as expected:

postgres=# \d "FOO_06";
               Table "public.FOO_06"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 COL3   | text    |           | not null |
 COL4   | text    |           | not null |
 K1     | text    |           | not null |
 K2     | integer |           | not null |
Indexes:
    "FOO_06_pkey" PRIMARY KEY, btree ("K1", "K2")

postgres=# SELECT * FROM "FOO_06" ;
 COL3  | COL4  |     K1      | K2
-------+-------+-------------+----
 NEVER | GONNA | mykey_val_A |  1
 GIVE  | YOU   | mykey_val_A |  2
 UP    | 🎙️     | mykey_val_A |  3
(3 rows)

Let's try a variation on this and use just part of the key.

🎥 Watch

We'll drop the table and connector and then recreate them with new config:

postgres=# DROP TABLE "FOO_06" ;
DROP TABLE
postgres=#
ksql> DROP CONNECTOR SINK_FOO_06_0;

 Message
-----------------------------------
 Dropped connector "SINK_FOO_06_0"
-----------------------------------
ksql>

Now we use pk.fields to identify one of the fields from the Kafka message key:

CREATE SINK CONNECTOR SINK_FOO_06_1 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_06',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'K2',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'
);

This time K1 in the Kafka message key is ignored and just the specified field K2 is used as the primary key on the table:

postgres=# \d "FOO_06";
               Table "public.FOO_06"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 COL3   | text    |           | not null |
 COL4   | text    |           | not null |
 K2     | integer |           | not null |
Indexes:
    "FOO_06_pkey" PRIMARY KEY, btree ("K2")

postgres=# SELECT * FROM "FOO_06" ;
 COL3  | COL4  | K2
-------+-------+----
 NEVER | GONNA |  1
 GIVE  | YOU   |  2
 UP    | 🎙️     |  3
(3 rows)

What if you still want the data from K1 in the target table, but not as part of the primary key? For that you'd use either a custom Single Message Transform or some stream processing such as this:

🎥 Watch

-- Register the topic as a ksqlDB stream
CREATE STREAM FOO_06 WITH (KAFKA_TOPIC='FOO_06', FORMAT='AVRO');

-- Verify key/value schema
ksql> DESCRIBE FOO_06;

Name                 : FOO_06
 Field  | Type
-------------------------------------------------------
 ROWKEY | STRUCT<K1 VARCHAR(STRING), K2 INTEGER> (key)
 COL3   | VARCHAR(STRING)
 COL4   | VARCHAR(STRING)
-------------------------------------------------------

-- When consuming from Kafka read all existing messages too
SET 'auto.offset.reset' = 'earliest';

-- Populate a new Kafka topic with altered key/value structure
CREATE STREAM FOO_06_RESTRUCTURE_01 AS
  SELECT ROWKEY->K2,
         AS_VALUE(ROWKEY->K1) AS K1,
         COL3,
         COL4
    FROM FOO_06
    PARTITION BY ROWKEY->K2;

-- Examine new key/value schema
ksql> DESCRIBE FOO_06_RESTRUCTURE_01;

Name                 : FOO_06_RESTRUCTURE_01
 Field | Type
--------------------------------
 K2    | INTEGER          (key)
 K1    | VARCHAR(STRING)
 COL3  | VARCHAR(STRING)
 COL4  | VARCHAR(STRING)
--------------------------------

-- Examine data
ksql> PRINT FOO_06_RESTRUCTURE_01 FROM BEGINNING LIMIT 3;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/03/12 10:26:05.004 Z, key: 1, value: {"K1": "mykey_val_A", "COL3": "NEVER", "COL4": "GONNA"}, partition: 0
rowtime: 2021/03/12 10:26:05.027 Z, key: 2, value: {"K1": "mykey_val_A", "COL3": "GIVE", "COL4": "YOU"}, partition: 0
rowtime: 2021/03/12 10:26:05.028 Z, key: 3, value: {"K1": "mykey_val_A", "COL3": "UP", "COL4": "🎙️"}, partition: 0
Topic printing ceased

Common errors

Trying to read data that has not been serialized with Schema Registry (e.g. Avro, Protobuf, JSON Schema)

As noted in the introduction, the Kafka Connect JDBC Sink connector requires that you use a serialization format that includes a schema. Let's see what happens if you don't, by creating a Kafka topic with data in plain JSON in both the key and value:

# Key/value are separated by the + character
docker exec -i kafkacat kafkacat -b broker:29092 -t FOO_08 -K+ -P <<EOF
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":1}+{"COL3":"FOO","COL4":"BAR"}
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":2}+{"COL3":"ZXC","COL4":"ASD"}
{"K1_GEO":"APAC","K2_BU":"FGH","K3_ID":9}+{"COL3":"QQQ","COL4":"WWW"}
EOF

If we consume the data with kafkacat we can see it is just straight JSON:

$ docker exec -i kafkacat kafkacat \
        -b broker:29092 -C -e -q \
        -t FOO_08 -f 'Offset: %o\tKey: %k\tValue: %s \t(length %S)\n'
Offset: 0       Key: {"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":1}  Value: {"COL3":"FOO","COL4":"BAR"}      (length 27)
Offset: 1       Key: {"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":2}  Value: {"COL3":"ZXC","COL4":"ASD"}      (length 27)
Offset: 2       Key: {"K1_GEO":"APAC","K2_BU":"FGH","K3_ID":9}  Value: {"COL3":"QQQ","COL4":"WWW"}      (length 27)

What we want to do is push this data to a database, and set the primary key on the target table as the three fields in the Kafka key.

Let's see what happens if we do this with the data as it stands.

CREATE SINK CONNECTOR SINK_FOO_08_0 WITH (
    'connector.class'                = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                 = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                = 'postgres',
    'connection.password'            = 'postgres',
    'topics'                         = 'FOO_08',
    'key.converter'                  = 'org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable'   = 'false',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'auto.create'                    = 'true',
    'pk.mode'                        = 'record_key',
    'pk.fields'                      = '',
    'insert.mode'                    = 'upsert',
    'delete.enabled'                 = 'true'
);

We get the error Sink connector 'SINK_FOO_08_0' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='FOO_08',partition=0,offset=0,timestamp=1615547451030) with a HashMap key and null key schema.:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'SINK_FOO_08_0' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='FOO_08',partition=0,offset=0,timestamp=1615547451030) with a HashMap key and null key schema.
        at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:113)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        ... 10 more

Let's try randomly jiggling things to see if they unbreak. Since the error mentions delete.enabled, let's try disabling it:

CREATE SINK CONNECTOR SINK_FOO_08_1 WITH (
    'connector.class'                = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                 = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                = 'postgres',
    'connection.password'            = 'postgres',
    'topics'                         = 'FOO_08',
    'key.converter'                  = 'org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable'   = 'false',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'auto.create'                    = 'true',
    'pk.mode'                        = 'record_key',
    'pk.fields'                      = '',
    'insert.mode'                    = 'upsert',
    'delete.enabled'                 = 'false'
);

We just get variations on a theme: Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'SINK_FOO_08_1' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='FOO_08',partition=0,offset=0,timestamp=1615547451030) with a HashMap key and null key schema.

The nub of the issue is this: the connector requires records with a non-null key and non-null Struct or primitive key schema, whilst we're supplying a HashMap key and null key schema.

Even if we ditch the idea of using the individual key fields and instead treat it as a primitive string (by using org.apache.kafka.connect.storage.StringConverter instead of org.apache.kafka.connect.json.JsonConverter), it doesn't get us much further:

CREATE SINK CONNECTOR SINK_FOO_08_2 WITH (
    'connector.class'                = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                 = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                = 'postgres',
    'connection.password'            = 'postgres',
    'topics'                         = 'FOO_08',
    'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'auto.create'                    = 'true',
    'pk.mode'                        = 'record_key',
    'pk.fields'                      = '',
    'insert.mode'                    = 'upsert',
    'delete.enabled'                 = 'false'
);

That throws org.apache.kafka.connect.errors.ConnectException: Need exactly one PK column defined since the key schema for records is a primitive type, defined columns are: [], which makes sense, so let's specify the name of the target column in the database into which the primitive value should be stored (using pk.fields):

CREATE SINK CONNECTOR SINK_FOO_08_3 WITH (
    'connector.class'                = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                 = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                = 'postgres',
    'connection.password'            = 'postgres',
    'topics'                         = 'FOO_08',
    'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'auto.create'                    = 'true',
    'pk.mode'                        = 'record_key',
    'pk.fields'                      = 'MY_KEY',
    'insert.mode'                    = 'upsert',
    'delete.enabled'                 = 'false'
);

That leads us off even further into the weeds with a new error that makes less sense:

ksql> DESCRIBE CONNECTOR SINK_FOO_08_3;

Name                 : SINK_FOO_08_3
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')
        at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
        at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:182)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        ... 10 more

This somewhat cryptic error (class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')) is basically saying that whilst it's now happy to treat the key as a primitive to load into the column that we named, it is expecting a struct in the value part of the message, rather than the HashMap that it got from us using org.apache.kafka.connect.json.JsonConverter. Since we don't have the schema itself embedded in the JSON message (hence schemas.enable=false), we need to apply the schema some other way.
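
For reference, "the schema itself embedded in the JSON message" means the envelope format that the JsonConverter reads when schemas.enable=true: each message carries a schema object alongside a payload object. Here's a hand-written illustration of one such value, produced to a hypothetical topic purely for demonstration:

# Illustration of the JsonConverter schema-embedded envelope (hypothetical topic name)
docker exec -i kafkacat kafkacat -b broker:29092 -t FOO_08_SCHEMA_DEMO -P <<'EOF'
{"schema":{"type":"struct","name":"foo08value","optional":false,"fields":[{"field":"COL3","type":"string","optional":false},{"field":"COL4","type":"string","optional":false}]},"payload":{"COL3":"FOO","COL4":"BAR"}}
EOF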

The best way to do this is to fix it at source: when the data is written to Kafka, make sure that it's written using a serializer that's going to store the schema and not throw it away. Good options are Avro, Protobuf, and JSON Schema.

That's not always possible though, and you're sometimes stuck with plain JSON data that you really want to load into a database. If that's the case you'll need to pre-process the topic using stream processing. Kafka Streams is one option, but ksqlDB is arguably easier and is what I'll show here (there's also a video tutorial).

To start with we create a new stream in ksqlDB and declare the schema of the JSON data in both the key and value:

-- Register the existing topic as a ksqlDB stream
-- and declare the full schema
ksql> CREATE STREAM FOO_08 (K1_GEO VARCHAR KEY,
                      K2_BU  VARCHAR KEY,
                      K3_ID  INT     KEY,
                      COL3   VARCHAR,
                      COL4   VARCHAR)
  WITH (KAFKA_TOPIC='FOO_08', FORMAT='JSON');

 Message
----------------
 Stream created
----------------

-- Verify the schema looks correct
ksql> DESCRIBE FOO_08;

Name                 : FOO_08
 Field  | Type
---------------------------------
 K1_GEO | VARCHAR(STRING)  (key)
 K2_BU  | VARCHAR(STRING)  (key)
 K3_ID  | INTEGER          (key)
 COL3   | VARCHAR(STRING)
 COL4   | VARCHAR(STRING)
---------------------------------

-- Verify the data is read correctly
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.

ksql> SELECT * FROM FOO_08 EMIT CHANGES LIMIT 3;
+-------+------+------+-----+-----+
|K1_GEO |K2_BU |K3_ID |COL3 |COL4 |
+-------+------+------+-----+-----+
|EMEA   |XYZ   |1     |FOO  |BAR  |
|EMEA   |XYZ   |2     |ZXC  |ASD  |
|APAC   |FGH   |9     |QQQ  |WWW  |
Limit Reached
Query terminated
ksql>

Now we write the existing data, and all new messages that arrive, to a new topic and specify an appropriate serialization format. Avro, Protobuf, and JSON Schema are all good choices here.

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.

ksql> CREATE STREAM FOO_08_CONVERTED
        WITH (FORMAT='PROTOBUF') AS
        SELECT * FROM FOO_08;

 Message
------------------------------------------------
 Created query with ID CSAS_FOO_08_CONVERTED_19
------------------------------------------------

ksql> DESCRIBE FOO_08_CONVERTED;

Name                 : FOO_08_CONVERTED
 Field  | Type
---------------------------------
 K1_GEO | VARCHAR(STRING)  (key)
 K2_BU  | VARCHAR(STRING)  (key)
 K3_ID  | INTEGER          (key)
 COL3   | VARCHAR(STRING)
 COL4   | VARCHAR(STRING)
---------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

ksql> PRINT FOO_08_CONVERTED FROM BEGINNING LIMIT 3;
Key format: PROTOBUF or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2021/03/12 11:10:51.030 Z, key: K1_GEO: "EMEA" K2_BU: "XYZ" K3_ID: 1, value: COL3: "FOO" COL4: "BAR", partition: 0
rowtime: 2021/03/12 11:10:51.071 Z, key: K1_GEO: "EMEA" K2_BU: "XYZ" K3_ID: 2, value: COL3: "ZXC" COL4: "ASD", partition: 0
rowtime: 2021/03/12 11:10:51.110 Z, key: K1_GEO: "APAC" K2_BU: "FGH" K3_ID: 9, value: COL3: "QQQ" COL4: "WWW", partition: 0
Topic printing ceased

Now we can push this data to the database. Note that the value.converter and key.converter are now set for Protobuf:

CREATE SINK CONNECTOR SINK_FOO_08_4 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'FOO_08_CONVERTED',
    'key.converter'                       = 'io.confluent.connect.protobuf.ProtobufConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.protobuf.ProtobufConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = '',
    'insert.mode'                         = 'upsert',
    'delete.enabled'                      = 'true'
);

This works, and we have data in Postgres matching the schema and primary key as we wanted:

postgres=# \d "FOO_08_CONVERTED";
          Table "public.FOO_08_CONVERTED"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 COL3   | text    |           |          |
 COL4   | text    |           |          |
 K1_GEO | text    |           | not null |
 K2_BU  | text    |           | not null |
 K3_ID  | integer |           | not null |
Indexes:
    "FOO_08_CONVERTED_pkey" PRIMARY KEY, btree ("K1_GEO", "K2_BU", "K3_ID")

postgres=# SELECT * FROM "FOO_08_CONVERTED" ;
 COL3 | COL4 | K1_GEO | K2_BU | K3_ID
------+------+--------+-------+-------
 FOO  | BAR  | EMEA   | XYZ   |     1
 ZXC  | ASD  | EMEA   | XYZ   |     2
 QQQ  | WWW  | APAC   | FGH   |     9
(3 rows)

If we insert new data and an update for an existing key into the original topic (JSON):

# Key/value are separated by the + character
docker exec -i kafkacat kafkacat -b broker:29092 -t FOO_08 -K+ -P <<EOF
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":10}+{"COL3":"FOO","COL4":"BAR"}
{"K1_GEO":"EMEA","K2_BU":"XYZ","K3_ID":2}+{"COL3":"THIS","COL4":"CHANGED"}
EOF

this flows through automagically to the database:

postgres=# SELECT * FROM "FOO_08_CONVERTED" ;
 COL3 |  COL4   | K1_GEO | K2_BU | K3_ID
------+---------+--------+-------+-------
 FOO  | BAR     | EMEA   | XYZ   |     1
 QQQ  | WWW     | APAC   | FGH   |     9
 FOO  | BAR     | EMEA   | XYZ   |    10
 THIS | CHANGED | EMEA   | XYZ   |     2
(4 rows)

Footnote: changing the table name

You can use a Single Message Transform (SMT) to change the target object in the database to which the data is written. By default it takes the name of the source topic.

Using the RegexRouter we can change FOO_08_CONVERTED to FOO_08 thus:

CREATE SINK CONNECTOR SINK_FOO_08_5 WITH (
    'connector.class'                        = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                         = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                        = 'postgres',
    'connection.password'                    = 'postgres',
    'topics'                                 = 'FOO_08_CONVERTED',
    'key.converter'                          = 'io.confluent.connect.protobuf.ProtobufConverter',
    'key.converter.schema.registry.url'      = 'http://schema-registry:8081',
    'value.converter'                        = 'io.confluent.connect.protobuf.ProtobufConverter',
    'value.converter.schema.registry.url'    = 'http://schema-registry:8081',
    'auto.create'                            = 'true',
    'pk.mode'                                = 'record_key',
    'pk.fields'                              = '',
    'insert.mode'                            = 'upsert',
    'delete.enabled'                         = 'true',
    'transforms'                             = 'changeTopicName',
    'transforms.changeTopicName.type'        = 'org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.changeTopicName.regex'       = '(.*)_CONVERTED$',
    'transforms.changeTopicName.replacement' = '$1'
);

Now there are two tables in the target database - the original one, and the new one minus the _CONVERTED suffix:

postgres=# \d
              List of relations
 Schema |       Name       | Type  |  Owner
--------+------------------+-------+----------
 public | FOO_08           | table | postgres
 public | FOO_08_CONVERTED | table | postgres
(3 rows)

postgres=# SELECT * FROM "FOO_08";
 COL3 |  COL4   | K1_GEO | K2_BU | K3_ID
------+---------+--------+-------+-------
 FOO  | BAR     | EMEA   | XYZ   |     1
 QQQ  | WWW     | APAC   | FGH   |     9
 FOO  | BAR     | EMEA   | XYZ   |    10
 THIS | CHANGED | EMEA   | XYZ   |     2
(4 rows)

Using a Kafka message key whose schema has default null values

This key schema causes problems because of "default": null:

{
  "type": "record",
  "name": "FOO05key",
  "fields": [
    {
      "name": "K1",
      "type": "string",
      "default": null
    },
    {
      "name": "K2",
      "type": "int",
      "default": null
    }
  ]
}

The error you'll get from the sink connector is this:

Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
        at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1817)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1538)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1221)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:115)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:535)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:498)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 13 more
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: STRING
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
        at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
        ... 24 more

The fix is to remove the instances of "default": null from the schema (in Avro a null default is only valid when the field's type is a union that includes null).
