DEV Community

Robin Moffatt
Robin Moffatt

Posted on • Originally published at rmoff.net on

πŸŽ„ Twelve Days of SMT πŸŽ„ - Day 1: InsertField (timestamp)

You can use the InsertField Single Message Transform (SMT) to add the message timestamp into each message that Kafka Connect sends to a sink.

To use the Single Message Transform specify the name of the field (timestamp.field) that you want to add to hold the message timestamp:

"transforms" : "insertTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
Enter fullscreen mode Exit fullscreen mode

The message timestamp can be set by the producer API explicitly, or allowed to default to the setting on the broker (log.message.timestamp.type) or topic (message.timestamp.type) which by default is the time on the broker at which the message is created (CreateTime). Message timestamps were added in Apache Kafka 0.10 in KIP-32.

πŸ‘Ύ Demo code

Example - JDBC Sink connector

Given a JDBC sink connector that looks like this:

{
  "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url" : "jdbc:mysql://mysql:3306/demo",
  "connection.user" : "mysqluser",
  "connection.password" : "mysqlpw",
  "topics" : "transactions",
  "tasks.max" : "4",
  "auto.create" : "true"
}
Enter fullscreen mode Exit fullscreen mode

you can add the Single Message Transform as shown here:

{
  "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url" : "jdbc:mysql://mysql:3306/demo",
  "connection.user" : "mysqluser",
  "connection.password" : "mysqlpw",
  "topics" : "transactions",
  "tasks.max" : "4",
  "auto.create" : "true",
  "auto.evolve" : "true",
  "transforms" : "insertTS",
  "transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTS.timestamp.field": "messageTS"
}
Enter fullscreen mode Exit fullscreen mode

Note auto.evolve=true otherwise the target table won’t hold the new field unless it happens to exist already.

The JDBC connector correctly populates the new field as a timestamp type:

mysql> describe transactions;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| customer_id | text        | YES  |     | NULL    |       |
| cost        | text        | YES  |     | NULL    |       |
| item        | text        | YES  |     | NULL    |       |
| card_type   | text        | YES  |     | NULL    |       |
| messageTS   | datetime(3) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
5 rows in set (0.01 sec)

mysql> select * from transactions limit 5;
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
| customer_id                          | cost  | item                  | card_type | messageTS               |
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
| b7187b3a-8ef4-469f-a99d-29dbc0cc3608 | 40.26 | Ten FIDY              | discover  | 2020-12-08 22:42:26.503 |
| c44a2596-ad4a-4c51-ba81-0a86cc48a2d3 | 96.60 | Trois Pistoles        | maestro   | 2020-12-08 22:42:26.842 |
| b7187b3a-8ef4-469f-a99d-29dbc0cc3608 | 53.44 | Chimay Grande Rserve | visa      | 2020-12-08 22:42:27.341 |
| c44a2596-ad4a-4c51-ba81-0a86cc48a2d3 | 25.33 | Chimay Grande Rserve | dankort   | 2020-12-08 22:42:27.841 |
| 573a732e-1d42-4749-a99d-a89391cd2858 | 43.95 | Arrogant Bastard Ale  | switch    | 2020-12-08 22:42:28.342 |
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
5 rows in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

See also πŸŽ₯ Kafka Connect in Action : JDBC Sink (πŸ‘Ύ demo code) and πŸŽ₯ ksqlDB & Kafka Connect JDBC Sink in action (πŸ‘Ύ demo code)

Example - S3 Sink connector

InsertField writes the timestamp as a unix epoch value - if you’d rather it in a string then you can use an additional Single Message Transform, TimestampConverter as shown here in an example with the AWS S3 sink connector:

{
  "connector.class" : "io.confluent.connect.s3.S3SinkConnector",
  "storage.class" : "io.confluent.connect.s3.storage.S3Storage",
  "s3.region" : "us-west-2",
  "s3.bucket.name" : "rmoff-smt-demo-01",
  "topics" : "customers,transactions",
  "tasks.max" : "4",
  "flush.size" : "16",
  "format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
  "schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "schema.compatibility" : "NONE",
  "partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "transforms" : "insertTS,formatTS",
  "transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTS.timestamp.field" : "messageTS",
  "transforms.formatTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.formatTS.format" : "yyyy-MM-dd HH:mm:ss:SSS",
  "transforms.formatTS.field" : "messageTS",
  "transforms.formatTS.target.type" : "string"
}
Enter fullscreen mode Exit fullscreen mode

This writes messages as JSON to S3 that look like this:

{
  "customer_id": "d0394941-2d2a-4d34-a374-23e5f5364ea9",
  "cost": "25.21",
  "item": "Founders Breakfast Stout",
  "card_type": "jcb",
  "messageTS": "2020-12-08 16:07:39:904"
}
Enter fullscreen mode Exit fullscreen mode

See also πŸŽ₯ Kafka Connect in Action : S3 Sink (πŸ‘Ύ kafka-to-s3 demo code)

Top comments (0)