Robin Moffatt

Originally published at rmoff.net

🎄 Twelve Days of SMT 🎄 - Day 8: TimestampConverter

The TimestampConverter Single Message Transform lets you work with timestamp fields in Kafka messages. You can convert a string into a native Timestamp type (or Date or Time), or into a Unix epoch, and you can do the same in reverse too.

This is really useful to make sure that data ingested into Kafka is correctly stored as a Timestamp (if it is one), and also enables you to write a Timestamp out to a sink connector in a string format that you choose.

The TimestampConverter takes three arguments: the name of the field holding the timestamp (field), the data type to which you want to convert it (target.type), and the format of the timestamp (format). The format is used to parse the field if you’re reading a string and target.type is unix/Timestamp/Date/Time, or to write it if target.type is string. The timestamp format is based on SimpleDateFormat.

"transforms" : "convertTS",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp"
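To get a feel for what the format pattern matches, here is a rough Python analogue. Kafka Connect itself parses with Java's SimpleDateFormat; the strptime directives below are a hand translation of the pattern (an assumption for illustration, not something Connect runs), the helper names are made up, and the sample value has the same shape as the demo data used in this post.

```python
from datetime import datetime, timedelta, timezone

# Hand-translated equivalent of the SimpleDateFormat pattern
# "EEE MMM dd HH:mm:ss zzz yyyy" (assumed mapping, not used by Connect):
#   EEE -> %a, MMM -> %b, dd -> %d, HH:mm:ss -> %H:%M:%S, zzz -> %Z, yyyy -> %Y
PY_FORMAT = "%a %b %d %H:%M:%S %Z %Y"

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_txn_date(value: str) -> datetime:
    """Parse a string such as 'Thu Dec 10 17:06:59 GMT 2020' into a UTC datetime."""
    # strptime accepts the zone name for %Z but returns a naive datetime,
    # so UTC is attached explicitly. That is only valid because the sample
    # values are all in GMT.
    naive = datetime.strptime(value, PY_FORMAT)
    return naive.replace(tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Whole milliseconds between the Unix epoch and dt."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    ts = parse_txn_date("Thu Dec 10 17:06:59 GMT 2020")
    print(ts.isoformat())       # 2020-12-10T17:06:59+00:00
    print(to_epoch_millis(ts))  # 1607620019000
```

If a value does not match the pattern, strptime raises ValueError, which is a useful way to test a pattern against sample data before putting it in a connector config.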

👾 Demo code

Change string-based timestamp into timestamp

In the payload the txn_date field looks like a timestamp:

$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day8-transactions -C -c5 -o-5 -u -q -J | \
  jq '.payload.Gen0.txn_date.string'

"Thu Dec 10 17:06:59 GMT 2020"
"Sat Dec 05 16:39:40 GMT 2020"
"Sat Dec 05 21:43:46 GMT 2020"
"Sun Dec 13 20:30:21 GMT 2020"
"Wed Dec 09 06:18:31 GMT 2020"

But if we have a look at the actual schema for the value (since it’s serialised as Avro; the same would apply for Protobuf or JSON Schema) we can see that it might look like a timestamp and quack like a timestamp, but it is in fact not a timestamp per se, but a string:

$ curl -s "http://localhost:8081/subjects/day8-transactions-value/versions/latest" | jq '.schema|fromjson[]'

"null"
{
  "type": "record",
  "name": "Gen0",
  "namespace": "io.mdrogalis",
  "fields": [
[…]
    {
      "name": "txn_date",
      "type": [
        "null",
        "string"
      ]
[…]

This means that when the data is used by a consumer, such as a Kafka Connect sink, it’s still handled as a string, with the result that the target object type will usually inherit the same. Here’s an example of the JDBC sink connector (see also 🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)):

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password": "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true"
        }'

mysql> describe `day8-transactions`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| cost             | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)

Note that txn_date is a text field, which is no use for anyone who wants to use it as the timestamp that it is.

This is where the TimestampConverter comes in. In our example it’s going to cast the string to a timestamp as it passes through Kafka Connect, using the supplied date and time format string. You can also use it to convert from an epoch value, and to target a string, epoch, date, or time (as well as an actual timestamp).

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-01/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "Timestamp",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withTS"
        }'

Here’s the resulting table in MySQL:

mysql> describe `day8-transactions_withTS`;
+------------------+-------------+------+-----+---------+-------+
| Field            | Type        | Null | Key | Default | Extra |
+------------------+-------------+------+-----+---------+-------+
| customer_remarks | text        | YES  |     | NULL    |       |
| item             | text        | YES  |     | NULL    |       |
| cost             | text        | YES  |     | NULL    |       |
| card_type        | text        | YES  |     | NULL    |       |
| txn_date         | datetime(3) | YES  |     | NULL    |       |
+------------------+-------------+------+-----+---------+-------+
5 rows in set (0.00 sec)

As mentioned above, you can also extract just the date or time components of the timestamp by changing the target.type.
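As a rough illustration of what each target.type keeps, the sketch below splits one instant into the three representations. It assumes Kafka Connect's logical types as I understand them (Timestamp as milliseconds since the epoch, Date as whole days since the epoch, Time as milliseconds since midnight); the function name and dictionary keys are made up for the example.

```python
from datetime import datetime, timezone

MILLIS_PER_DAY = 24 * 60 * 60 * 1000


def split_instant(epoch_millis: int) -> dict:
    """Split a UTC instant into date-only and time-only parts (hypothetical helper)."""
    # Whole days since the epoch, and what is left over within that day.
    days_since_epoch, millis_since_midnight = divmod(epoch_millis, MILLIS_PER_DAY)
    as_utc = datetime.fromtimestamp(epoch_millis // 1000, tz=timezone.utc)
    return {
        "timestamp": epoch_millis,      # assumed Timestamp representation
        "date": days_since_epoch,       # assumed Date representation
        "time": millis_since_midnight,  # assumed Time representation
        "date_text": as_utc.date().isoformat(),
        "time_text": as_utc.time().isoformat(),
    }


if __name__ == "__main__":
    # 1607620019000 is 2020-12-10 17:06:59 UTC
    parts = split_instant(1607620019000)
    print(parts["date_text"], parts["date"])  # 2020-12-10 18606
    print(parts["time_text"], parts["time"])  # 17:06:59 61619000
```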

Date only

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-02/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "Date",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withDate"
        }'

Resulting table in MySQL:

mysql> describe `day8-transactions_withDate`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| cost             | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| txn_date         | date | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.01 sec)

mysql> select txn_date from `day8-transactions_withDate` LIMIT 5;
+------------+
| txn_date   |
+------------+
| 2020-01-04 |
| 2020-01-04 |
| 2019-12-29 |
| 2020-01-01 |
| 2019-12-29 |
+------------+
5 rows in set (0.00 sec)

Time only

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-03/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "Time",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withTime"
        }'

Resulting table in MySQL:

mysql> describe `day8-transactions_withTime`;
+------------------+---------+------+-----+---------+-------+
| Field            | Type    | Null | Key | Default | Extra |
+------------------+---------+------+-----+---------+-------+
| customer_remarks | text    | YES  |     | NULL    |       |
| item             | text    | YES  |     | NULL    |       |
| cost             | text    | YES  |     | NULL    |       |
| card_type        | text    | YES  |     | NULL    |       |
| txn_date         | time(3) | YES  |     | NULL    |       |
+------------------+---------+------+-----+---------+-------+
5 rows in set (0.00 sec)

mysql> select txn_date from `day8-transactions_withTime` LIMIT 5;
+--------------+
| txn_date     |
+--------------+
| 14:05:19.000 |
| 14:09:11.000 |
| 19:18:25.000 |
| 03:22:06.000 |
| 09:57:44.000 |
+--------------+
5 rows in set (0.00 sec)

Unix Epoch

You can also write a Unix epoch:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-04/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "unix",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withUnixEpoch"
        }'

Resulting table in MySQL:

mysql> describe `day8-transactions_withUnixEpoch`;
+------------------+--------+------+-----+---------+-------+
| Field            | Type   | Null | Key | Default | Extra |
+------------------+--------+------+-----+---------+-------+
| customer_remarks | text   | YES  |     | NULL    |       |
| item             | text   | YES  |     | NULL    |       |
| cost             | text   | YES  |     | NULL    |       |
| card_type        | text   | YES  |     | NULL    |       |
| txn_date         | bigint | YES  |     | NULL    |       |
+------------------+--------+------+-----+---------+-------+
5 rows in set (0.00 sec)

mysql> select txn_date from `day8-transactions_withUnixEpoch` LIMIT 5;
+---------------+
| txn_date      |
+---------------+
| 1577973919000 |
| 1577714951000 |
| 1577819905000 |
| 1577762526000 |
| 1577786264000 |
+---------------+
5 rows in set (0.00 sec)

If you have a timestamp in Unix epoch (bigint) as the source, you can use TimestampConverter to write it as a timestamp/date/time, and also as a string. If you do the latter then the format configuration applies to the format in which the string will be written.
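To sanity-check what those bigint values mean, here is a small Python sketch that treats one as milliseconds since the Unix epoch and renders it as a string. The function names are made up, and the output pattern is an arbitrary choice for the example, written with Python's strftime directives rather than the SimpleDateFormat syntax that the SMT's format setting expects.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_utc(epoch_millis: int) -> datetime:
    """Interpret a bigint as milliseconds since the Unix epoch, in UTC."""
    return EPOCH + timedelta(milliseconds=epoch_millis)


def epoch_millis_to_string(epoch_millis: int, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Render the instant as text using a strftime pattern (example pattern only)."""
    return epoch_millis_to_utc(epoch_millis).strftime(fmt)


if __name__ == "__main__":
    # First value from the day8-transactions_withUnixEpoch table above.
    print(epoch_millis_to_string(1577973919000))  # 2020-01-02 14:05:19
```

The 13-digit values in the table are therefore milliseconds, not seconds; reading them as seconds would put the dates tens of thousands of years in the future.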

Accessing timestamps in nested fields

Unfortunately the TimestampConverter only works on root-level elements; it can’t be used on timestamp fields that are nested in other fields. You’d need to either use Flatten first, or write your own transformation.
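To see why flattening helps, here is a minimal Python sketch of the idea: nested fields are pulled up to the root and their names joined with a delimiter, after which a root-level transform can address them. This illustrates the concept only and is not the Flatten implementation; the record and its details field are invented for the example, and the "." delimiter is what I understand Flatten's default to be.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], delimiter: str = ".", prefix: str = "") -> Dict[str, Any]:
    """Return a copy of record with nested dicts lifted to root-level keys."""
    flat: Dict[str, Any] = {}
    for name, value in record.items():
        # Root-level names stay as they are; nested names get the parent path.
        key = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            flat.update(flatten(value, delimiter, key))
        else:
            flat[key] = value
    return flat


if __name__ == "__main__":
    # Hypothetical record with the timestamp nested one level down.
    nested = {"item": "widget", "details": {"txn_date": "Thu Dec 10 17:06:59 GMT 2020"}}
    print(flatten(nested))
    # {'item': 'widget', 'details.txn_date': 'Thu Dec 10 17:06:59 GMT 2020'}
```

Once the field sits at the root (here under the name details.txn_date), that flattened name is what a subsequent TimestampConverter would need to be pointed at.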

Try it out!

You can find the full code for trying this out, including a Docker Compose file so you can spin it up on your local machine, 👾 here.