The Cast Single Message Transform lets you change the data type of fields in a Kafka message, supporting numeric, string, and boolean types.
Cast takes one argument listing the field(s) that you would like to transform and the target data type. Multiple fields can be specified by separating each specification with a comma.
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
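To make the spec format concrete, here is a rough Python sketch of how a spec string like the one above breaks down into field/type pairs, and what the cast does to a single record. It is only an illustration of the effect: the real transform is Java code inside Kafka Connect, and parse_spec, apply_cast, and to_bool are names made up for this example. The type names are the ones Cast accepts (int8, int16, int32, int64, float32, float64, boolean, string).

```python
# Illustration only: mimics the *effect* of a Cast spec on a plain dict.
# parse_spec, apply_cast and to_bool are hypothetical helpers, not Kafka Connect APIs.

def to_bool(value):
    # Simplified assumption: strings are true only if they spell "true".
    if isinstance(value, str):
        return value.lower() == "true"
    return bool(value)

# Python has no sized numeric types, so all int*/float* targets collapse to int/float.
CASTERS = {
    "int8": int, "int16": int, "int32": int, "int64": int,
    "float32": float, "float64": float,
    "boolean": to_bool,
    "string": str,
}

def parse_spec(spec):
    """Turn 'cost:float32,units:int16' into {'cost': 'float32', 'units': 'int16'}."""
    mapping = {}
    for entry in spec.split(","):
        field, _, target = entry.strip().partition(":")
        if target not in CASTERS:
            # Also rejects entries without a 'field:' prefix, which this sketch does not handle.
            raise ValueError(f"unsupported target type in {entry!r}")
        mapping[field] = target
    return mapping

def apply_cast(record, spec):
    """Return a copy of record with the fields named in spec cast; nulls pass through."""
    mapping = parse_spec(spec)
    result = {}
    for name, value in record.items():
        if name in mapping and value is not None:
            result[name] = CASTERS[mapping[name]](value)
        else:
            result[name] = value
    return result

record = {"cost": "17.49", "units": "29", "item": "Alpha King Pale Ale"}
print(apply_cast(record, "cost:float32,units:int16"))
# {'cost': 17.49, 'units': 29, 'item': 'Alpha King Pale Ale'}
```

Fields not named in the spec are left untouched, which is why item stays a string.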
Changing field data types as they pass through Kafka Connect
Because we're developers who appreciate the finer things in life, including the importance of schemas, we're using a suitable serialisation method for our data - Avro, Protobuf, or JSON Schema. The beauty of this is that the full schema is persisted at ingest, and available for any consumer. It also means that we want to make sure that the schema is correct for the data.
Let's use the REST API of the Schema Registry to take a look at the schema of the data that our source connector is streaming into Kafka:
$ curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | jq '.schema|fromjson[]'
"fields": [
{
"name": "cost",
"type": ["null", "string"],
},
{
"name": "customer_remarks",
"type": ["null", "string"],
},
{
"name": "units",
"type": ["null", "string"],
},
{
"name": "card_type",
"type": ["null", "string"],
},
{
"name": "txn_date",
"type": ["null", "string"],
},
{
"name": "item",
"type": ["null", "string"],
}
]
(I've cut bits of the schema shown above for the sake of space)
This data streamed as-is to a target system will (unless the target system has something like Elasticsearch's dynamic field mapping) remain in its string/text types:
mysql> describe `day9-transactions`;
+------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost | text | YES | | NULL | |
| customer_remarks | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | text | YES | | NULL | |
| item | text | YES | | NULL | |
| units | text | YES | | NULL | |
+------------------+------+------+-----+---------+-------+
6 rows in set (0.01 sec)
We saw previously how we handled the txn_date being a string when it should be a timestamp. Now we will use the Cast Single Message Transform to cast the two fields currently held as string to their correct numeric types.
We can do this at ingest or egress; if the conversion is to rectify an incorrect type then logically this should be done at ingest in the source connector. If it's to fix a specific requirement in a technology being written to in a sink connector then it should be done there instead.
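Since the transform is configured the same way on either side, the choice between ingest and egress comes down to which connector's config carries the three transforms properties. As a minimal sketch, assuming a hypothetical helper called with_cast and a trimmed-down sink config that reads the original string-typed topic, this is what adding the cast at egress could look like:

```python
import json

def with_cast(config, spec, alias="castTypes"):
    """Return a copy of a connector config with a Cast$Value transform appended.

    Hypothetical helper for illustration; 'transforms' is a comma-separated
    list of aliases, so any existing transforms are kept ahead of the new one.
    """
    updated = dict(config)
    existing = [a.strip() for a in updated.get("transforms", "").split(",") if a.strip()]
    updated["transforms"] = ",".join(existing + [alias])
    updated[f"transforms.{alias}.type"] = "org.apache.kafka.connect.transforms.Cast$Value"
    updated[f"transforms.{alias}.spec"] = spec
    return updated

# Trimmed-down sink config (assumption: the sink reads the uncast topic).
sink = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "day9-transactions",
}

print(json.dumps(with_cast(sink, "cost:float32,units:int16"), indent=2))
```

The printed JSON can be used as the body of a PUT to the connector's /config endpoint. Applied this way, only this one sink sees numeric types; the data in the topic itself stays as strings for every other consumer.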
Here we'll apply the fix to the source connector. Note that we're using a new target topic (day9-01-transactions) because otherwise you'll quite rightly get an error (io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "day9-transactions-value"; error code: 409).
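If you would rather find out about an incompatibility before the connector fails, the Schema Registry REST API has a compatibility endpoint that tests a candidate schema against an existing subject. The sketch below only builds that request; the helper names and the trimmed-down Avro schema are assumptions for illustration, and actually sending it needs a running Schema Registry.

```python
import json
import urllib.parse
import urllib.request

def compatibility_request(registry_url, subject, schema):
    """Build (but do not send) a Schema Registry compatibility-check request."""
    url = "{}/compatibility/subjects/{}/versions/latest".format(
        registry_url.rstrip("/"), urllib.parse.quote(subject, safe=""))
    # The registry expects the schema itself as a JSON-encoded string.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"})

def is_compatible(request):
    """Send the request and read the is_compatible flag (needs a live registry)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)["is_compatible"]

# Hypothetical, trimmed-down schema with the two fields already cast to numerics.
candidate = {
    "type": "record",
    "name": "transactions",
    "fields": [
        {"name": "cost", "type": ["null", "float"], "default": None},
        {"name": "units", "type": ["null", "int"], "default": None},
    ],
}

req = compatibility_request("http://localhost:8081", "day9-transactions-value", candidate)
print(req.get_method(), req.full_url)
```

Calling is_compatible(req) against the original subject would be expected to return false here, which is the same condition that surfaces as the 409 above.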
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day9-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day9-01-transactions.with" : "#{Internet.uuid}",
"genv.day9-01-transactions.cost.with" : "#{Commerce.price}",
"genv.day9-01-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day9-01-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day9-01-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day9-01-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day9-01-transactions.item.with" : "#{Beer.name}",
"topic.day9-01-transactions.throttle.ms" : 1000,
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
}'
Now the schema of the data in Kafka is correct for the data:
"fields": [
{
"name": "txn_date",
"type": ["null", "string"],
},
{
"name": "units",
"type": ["null", { "type": "int", "connect.type": "int16" }],
},
{
"name": "customer_remarks",
"type": ["null", "string"],
},
{
"name": "cost",
"type": ["null", "float"],
},
{
"name": "item",
"type": ["null", "string"],
},
{
"name": "card_type",
"type": ["null", "string"],
}
]
and when it's used in a sink connector the data is correctly stored in the target system:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password": "mysqlpw",
"topics" : "day9-01-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"}'
mysql> describe `day9-01-transactions`;
+------------------+----------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+----------+------+-----+---------+-------+
| txn_date | text | YES | | NULL | |
| units | smallint | YES | | NULL | |
| customer_remarks | text | YES | | NULL | |
| cost | float | YES | | NULL | |
| item | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
+------------------+----------+------+-----+---------+-------+
6 rows in set (0.00 sec)
mysql> select item, units, cost from `day9-01-transactions` LIMIT 5;
+-----------------------+-------+-------+
| item | units | cost |
+-----------------------+-------+-------+
| Alpha King Pale Ale | 29 | 17.49 |
| Brooklyn Black | 36 | 92.88 |
| St. Bernardus Abt 12 | 17 | 94.04 |
| Celebrator Doppelbock | 63 | 58.64 |
| Ten FIDY | 85 | 60.53 |
+-----------------------+-------+-------+
5 rows in set (0.00 sec)
Try it out!
You can find the full code for trying this out, including a Docker Compose file so you can spin it up on your local machine, 👾 here