
Robin Moffatt

Originally published at rmoff.net

🎄 Twelve Days of SMT 🎄 - Day 2: ValueToKey and ExtractField

Setting the key of a Kafka message is important: the key determines which partition a message is written to, so messages with the same key are processed in order even when a topic is consumed across multiple partitions, and a key is also a requirement when joining to messages in other topics. When using Kafka Connect the connector may already set the key, which is great. If not, you can use these two Single Message Transforms (SMTs) to set it as part of the pipeline, based on a field in the value part of the message.

To use the ValueToKey Single Message Transform, specify the name of the field (id in this example) that you want to copy from the value part of the message to the key:

"transforms" : "copyIdToKey",
"transforms.copyIdToKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id",

ValueToKey writes the field to the key as a Struct, so you will often want to combine it with the ExtractField Single Message Transform to get just the bare field value:

"transforms" : "copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id",
"transforms.extractKeyFromStruct.type" :"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field" :"id"
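To illustrate the difference, here's roughly what the key would look like for a message whose value includes an id field of 1 (a sketch, not actual connector output):

# With ValueToKey alone the key is a Struct holding the copied field
key: Struct{id=1}

# Adding ExtractField$Key leaves just the bare field value as the key
key: 1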

👾 Demo code

Example - JDBC Source connector

Let's start with a basic JDBC source connector:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://mysql:3306/demo",
  "connection.user": "mysqluser",
  "connection.password": "mysqlpw",
  "topic.prefix": "mysql-00-",
  "poll.interval.ms": 1000,
  "tasks.max":1,
  "table.whitelist" : "customers",
  "mode":"incrementing",
  "incrementing.column.name": "id",
  "validate.non.null": false
}
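To run it, you can submit this configuration to the Kafka Connect REST API. A minimal sketch, assuming Kafka Connect is listening on localhost:8083 and using a connector name chosen here purely for illustration:

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-jdbc-mysql-00/config \
    -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:mysql://mysql:3306/demo",
          "connection.user": "mysqluser",
          "connection.password": "mysqlpw",
          "topic.prefix": "mysql-00-",
          "poll.interval.ms": 1000,
          "tasks.max": 1,
          "table.whitelist": "customers",
          "mode": "incrementing",
          "incrementing.column.name": "id",
          "validate.non.null": false
        }'

Because this uses PUT against the connector's config endpoint, the same call creates the connector the first time and updates it on subsequent runs.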

An ingested Kafka message written by this connector looks like this - note the null key:

{
  "topic": "mysql-00-customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607512308962,
  "broker": 1,
  "key": null,
  "payload": {
    "id": {
      "int": 1
    },
    "full_name": {
      "string": "Leone Puxley"
    },
    "birthdate": {
      "int": 9167
    },
    "fav_animal": {
      "string": "Violet-eared waxbill"
    },
    "fav_colour": {
      "string": "Puce"
    },
    "fav_movie": {
      "string": "Oh! What a Lovely War"
    }
  }
}
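The dump above is in the JSON envelope format that kafkacat (kcat) emits with its -J flag; a command along these lines will show the key alongside the payload (the broker address here is an assumption):

kafkacat -b localhost:9092 -t mysql-00-customers -C -J -u -q | jq '.'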

Assuming you want to use the id field from the source as the message key, you can add the Single Message Transforms as shown here:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://mysql:3306/demo",
  "connection.user": "mysqluser",
  "connection.password": "mysqlpw",
  "topic.prefix": "mysql-02-",
  "poll.interval.ms": 1000,
  "tasks.max":1,
  "table.whitelist" : "customers",
  "mode":"incrementing",
  "incrementing.column.name": "id",
  "validate.non.null": false,
  "transforms": "copyIdToKey,extractKeyFromStruct",
  "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.copyIdToKey.fields": "id",
  "transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKeyFromStruct.field":"id"
}

The resulting Kafka message looks like this:

{
  "topic": "mysql-02-customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607512714619,
  "broker": 1,
  "key": "1",
  "payload": {
    "id": {
      "int": 1
    },
    "full_name": {
      "string": "Leone Puxley"
    },
    "birthdate": {
      "int": 9167
    },
    "fav_animal": {
      "string": "Violet-eared waxbill"
    },
    "fav_colour": {
      "string": "Puce"
    },
    "fav_movie": {
      "string": "Oh! What a Lovely War"
    }
  }
}
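To spot-check just the keys on the new topic you can use kafkacat's format string support, for example (again assuming a broker at localhost:9092):

kafkacat -b localhost:9092 -t mysql-02-customers -C -q -f 'Key: %k, payload: %s\n'

Each line should now show the id of the source row as the message key rather than null.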

See also Kafka Tutorials
