Robin Moffatt

Originally published at rmoff.net

🎄 Twelve Days of SMT 🎄 - Day 11: Predicate and Filter

Apache Kafka 2.6 included KIP-585, which added support for defining predicates against which transforms are conditionally executed, as well as a Filter Single Message Transform to drop messages. In combination, these mean that you can conditionally drop messages.

As part of Apache Kafka, Kafka Connect ships with pre-built Single Message Transforms and Predicates, but you can also write your own. The API for each is documented: Transformation / Predicate. The predicates that ship with Apache Kafka are:

  • RecordIsTombstone - The value part of the message is null (denoting a tombstone message)

  • HasHeaderKey - Matches if a header exists with the name given

  • TopicNameMatches - Matches based on topic

Predicates can be used in both source and sink connectors, depending on requirements.
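The general wiring is the same whichever predicate you use: a transform gains a predicate property that points at an entry in the predicates list, plus an optional negate to invert the match. As a purely illustrative sketch (the myFilter and hasMyHeader aliases and the my-header header name are made up), the configuration fragment looks like this:

"transforms"                    : "myFilter",
"transforms.myFilter.type"      : "org.apache.kafka.connect.transforms.Filter",
"transforms.myFilter.predicate" : "hasMyHeader",
"transforms.myFilter.negate"    : "false",

"predicates"                    : "hasMyHeader",
"predicates.hasMyHeader.type"   : "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"predicates.hasMyHeader.name"   : "my-header"

With this in place the Filter transform would drop only those records carrying a my-header header; setting negate to true would flip it, dropping the records without it.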

Conditionally renaming fields based on the topic name

Consider two topics holding logically identical entities, but with varying field names. Awful, isn't it 😱

  • Topic day11-sys01

    docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
      -t day11-sys01  | \
      jq  '.payload.Gen0'
    
    {
        "txn_date": { "string": "Sun Dec 13 02:18:44 GMT 2020" },
        "amount"  : { "string": "68.32" },
        "product" : { "string": "Yeti Imperial Stout" },
        "units"   : { "string": "56" },
        "source"  : { "string": "SYS01" }
    }
    
  • Topic day11-systemB

    docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
      -t day11-systemB  | \
      jq  '.payload.Gen0'
    
    {
      "txn_date": { "string": "Mon Dec 07 16:22:03 GMT 2020" },
      "cost"    : { "string": "63.39" },
      "units"   : { "string": "27" },
      "item"    : { "string": "St. Bernardus Abt 12" },
      "source"  : { "string": "SYSTEM_B" }
    }
    

Our esteemed data engineers have determined that in these two systems cost is the same as amount and item the same as product.

With Kafka Connect and Single Message Transforms we can stream this data to a single harmonious home downstream, using just one connector. This is enabled through two Single Message Transforms.

  • Conditionally modifying the field names

    • The transform includes the predicate property ("transforms.renameSystemBFields.predicate": "isSystemBTopic") which resolves to the predicates.isSystemBTopic configuration.
    • Predicate configuration follows a similar pattern to transforms; predicate prefix, followed by a label (isSystemBTopic), a type (org.apache.kafka.connect.transforms.predicates.TopicNameMatches) and then additional configuration properties as required by the particular predicate being used.
  • Renaming the topic (and thus target table name) for all messages to transactions.

    • Note that this happens in the transforms list after the field renaming; otherwise the predicate will not match .*-systemB, since by then the topic will already have been renamed to transactions.

The sink connector here picks up both topics ("topics.regex" : "day11-.*") but only applies the ReplaceField rename operation to messages from the day11-systemB topic.

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/config \
  -d '{
      "connector.class"                         : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                          : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                         : "mysqluser",
      "connection.password"                     : "mysqlpw",
      "topics.regex"                            : "day11-.*",
      "tasks.max"                               : "4",
      "auto.create"                             : "true",
      "auto.evolve"                             : "true",

      "transforms"                              : "renameSystemBFields,renameTargetTopic",
      "transforms.renameSystemBFields.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameSystemBFields.renames"  : "item:product,cost:amount",
      "transforms.renameSystemBFields.predicate": "isSystemBTopic",

      "transforms.renameTargetTopic.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.renameTargetTopic.regex"      : "day11-.*",
      "transforms.renameTargetTopic.replacement": "transactions",

      "predicates"                              : "isSystemBTopic",
      "predicates.isSystemBTopic.type"          : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
      "predicates.isSystemBTopic.pattern"       : ".*-systemB"
      }'
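Before querying the target database it's worth checking that the connector is running. One way, using the standard Kafka Connect REST API (with jq just to pretty-print the response), is:

curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/status | jq '.'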

The resulting table in the target database looks like this:

mysql> describe transactions;
+----------+------+------+-----+---------+-------+
| Field    | Type | Null | Key | Default | Extra |
+----------+------+------+-----+---------+-------+
| txn_date | text | YES  |     | NULL    |       |
| amount   | text | YES  |     | NULL    |       |
| units    | text | YES  |     | NULL    |       |
| product  | text | YES  |     | NULL    |       |
| source   | text | YES  |     | NULL    |       |
+----------+------+------+-----+---------+-------+
5 rows in set (0.01 sec)

with data from both topics present (identifiable by the different source values):

mysql> SELECT * FROM transactions LIMIT 5;
+------------------------------+----------+--------+-------------------------------+-------+
| txn_date                     | source   | amount | product                       | units |
+------------------------------+----------+--------+-------------------------------+-------+
| Tue Dec 08 14:27:13 GMT 2020 | SYS01    | 10.03  | Stone IPA                     | 39    |
| Tue Dec 15 23:09:20 GMT 2020 | SYSTEM_B | 7.24   | Ruination IPA                 | 72    |
| Wed Dec 09 06:26:34 GMT 2020 | SYS01    | 92.66  | Bells Expedition              | 55    |
| Thu Dec 10 19:38:26 GMT 2020 | SYSTEM_B | 65.11  | Sierra Nevada Celebration Ale | 5     |
| Fri Dec 11 01:38:48 GMT 2020 | SYS01    | 55.52  | Sierra Nevada Celebration Ale | 31    |
+------------------------------+----------+--------+-------------------------------+-------+
5 rows in set (0.00 sec)

Filtering out null records

Consider a source topic to which tombstone (null) records are being produced. These may be there by design or by error; either way, we want to exclude them from the sink connector pipeline.

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C  -o-10 -u -q -J \
  -t sys02  | \
  jq -c '[.offset,.key,.payload]'

[88,"0d011ee6-4424-4cb6-8665-61b46918b3d9",null]
[89,"b859f443-e92e-4599-a426-91c4bc6b1d28",null]
[90,"5633d30f-5b08-4a94-8690-b576e3e3d978",null]
[91,"aa0efeae-9dac-43a9-854b-1da3b589dee7",{"Gen0":{"amount":{"string":"73.66"},"txn_date":{"string":"Sun Dec 13 01:21:10 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Kirin Inchiban"},"units":{"string":"67"}}}]
[92,"4de86341-8165-42ca-bbea-276875cc9585",{"Gen0":{"amount":{"string":"6.86"},"txn_date":{"string":"Tue Dec 08 16:42:27 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Trappistes Rochefort 8"},"units":{"string":"61"}}}]
[93,"478dd272-a0cb-4f36-9dcb-73dd5bba245a",{"Gen0":{"amount":{"string":"30.50"},"txn_date":{"string":"Sun Dec 13 11:03:59 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Edmund Fitzgerald Porter"},"units":{"string":"11"}}}]
[94,"50a2e247-1a2b-4321-bc3e-a3980df83c23",{"Gen0":{"amount":{"string":"19.18"},"txn_date":{"string":"Fri Dec 11 03:48:47 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Samuel Smith’s Imperial IPA"},"units":{"string":"4"}}}]
[95,"6f2172b7-d3b2-4890-a295-82a889e9a5b7",null]
[96,"fdfc9d85-fe02-4846-86a7-e31d1acdf26c",{"Gen0":{"amount":{"string":"7.27"},"txn_date":{"string":"Thu Dec 10 09:53:55 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Stone IPA"},"units":{"string":"87"}}}]
[97,"2b307e28-ff01-4f01-9a7e-529c60afb8ce",{"Gen0":{"amount":{"string":"53.49"},"txn_date":{"string":"Wed Dec 16 15:05:38 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Samuel Smith’s Imperial IPA"},"units":{"string":"3"}}}]
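If you want to produce a test tombstone like these yourself, one way is kafkacat's -Z flag, which sends an empty value as NULL rather than as an empty string (the some-key key here is made up):

echo 'some-key:' | docker exec -i kafkacat kafkacat -b broker:29092 -t sys02 -P -K: -Z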

Here's a sink connector similar to the one above, again using a predicate to apply a transform selectively. In this instance it's the Filter transform (which drops records), applied only when the isNullRecord predicate is true.

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-01/config \
  -d '{
      "connector.class"                     : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                      : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                     : "mysqluser",
      "connection.password"                 : "mysqlpw",
      "topics"                              : "sys02",
      "tasks.max"                           : "4",
      "auto.create"                         : "true",
      "auto.evolve"                         : "true",

      "transforms"                          : "dropNullRecords",
      "transforms.dropNullRecords.type"     : "org.apache.kafka.connect.transforms.Filter",
      "transforms.dropNullRecords.predicate": "isNullRecord",

      "predicates"                          : "isNullRecord",
      "predicates.isNullRecord.type"        : "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
      }'
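As an aside, KIP-585 also provides a negate option on the transform, which inverts the predicate. Purely as an illustration (you almost certainly wouldn't want this in a real JDBC sink), adding this to the configuration above would flip the logic and drop everything except the tombstones:

"transforms.dropNullRecords.negate"   : "true"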

Filtering based on the contents of a message

Confluent Platform includes its own Filter Single Message Transform. Instead of being intended for use in combination with a predicate (as the org.apache.kafka.connect.transforms.Filter transform is), the one in Confluent Platform uses JSONPath to specify a filter condition based on the data in the message itself.

Here's an example that filters out all messages except those that include Stout in the product field:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
  -d '{
      "connector.class"                        : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                         : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                        : "mysqluser",
      "connection.password"                    : "mysqlpw",
      "topics"                                 : "day11-sys01",
      "tasks.max"                              : "4",
      "auto.create"                            : "true",
      "auto.evolve"                            : "true",

      "transforms"                             : "filterStout",
      "transforms.filterStout.type"            : "io.confluent.connect.transforms.Filter$Value",
      "transforms.filterStout.filter.condition": "$[?(@.product =~ /.*Stout/)]",
      "transforms.filterStout.filter.type"     : "include"
      }'

The resulting data in MySQL contains just the expected messages:

mysql> select * from `day11-sys01`;
+------------------------------+--------+--------+------------------------------+-------+
| txn_date                     | source | amount | product                      | units |
+------------------------------+--------+--------+------------------------------+-------+
| Fri Dec 11 07:27:51 GMT 2020 | SYS01  | 58.75  | Stone Imperial Russian Stout | 67    |
| Sat Dec 12 05:15:18 GMT 2020 | SYS01  | 28.66  | Oak Aged Yeti Imperial Stout | 43    |
| Tue Dec 15 10:56:00 GMT 2020 | SYS01  | 73.17  | Storm King Stout             | 28    |
| Tue Dec 15 12:46:52 GMT 2020 | SYS01  | 55.06  | Stone Imperial Russian Stout | 68    |
| Tue Dec 15 09:04:27 GMT 2020 | SYS01  | 0.34   | Bourbon County Stout         | 33    |
| Wed Dec 09 02:12:24 GMT 2020 | SYS01  | 88.97  | Bourbon County Stout         | 28    |
| Sun Dec 13 04:18:51 GMT 2020 | SYS01  | 6.29   | Samuel Smiths Oatmeal Stout  | 7     |
| Thu Dec 10 10:51:51 GMT 2020 | SYS01  | 6.95   | Samuel Smiths Oatmeal Stout  | 1     |
+------------------------------+--------+--------+------------------------------+-------+
8 rows in set (0.00 sec)
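The filter.type property also accepts exclude, which inverts the behaviour. With the same condition, this fragment of the configuration above would drop the Stout messages and keep everything else:

"transforms.filterStout.filter.condition": "$[?(@.product =~ /.*Stout/)]",
"transforms.filterStout.filter.type"     : "exclude"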

If you want to filter on numerics then make sure the data type is correct; use Cast if necessary, as shown here. In this case the order of the "transforms" is important: the Cast must run before the Filter so that the comparison is done against a number rather than a string:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
  -d '{
      "connector.class"                        : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"                         : "jdbc:mysql://mysql:3306/demo",
      "connection.user"                        : "mysqluser",
      "connection.password"                    : "mysqlpw",
      "topics"                                 : "day11-sys01",
      "tasks.max"                              : "4",
      "auto.create"                            : "true",
      "auto.evolve"                            : "true",

      "transforms"                                  : "castTypes,filterSmallOrder",
      "transforms.filterSmallOrder.type"            : "io.confluent.connect.transforms.Filter$Value",
      "transforms.filterSmallOrder.filter.condition": "$[?(@.amount < 42)]",
      "transforms.filterSmallOrder.filter.type"     : "include",
      "transforms.castTypes.type"                   : "org.apache.kafka.connect.transforms.Cast$Value",
      "transforms.castTypes.spec"                   : "amount:float32"
      }'

In the resulting data you can see that all the values in amount are less than 42, per the specified filter:

mysql> select * from `day11-sys01` LIMIT 10;
+------------------------------+--------+--------+-------------------------------------------+-------+
| txn_date                     | source | amount | product                                   | units |
+------------------------------+--------+--------+-------------------------------------------+-------+
| Thu Dec 10 00:57:55 GMT 2020 | SYS01  |   3.53 | Sierra Nevada Celebration Ale             | 26    |
| Mon Dec 14 01:01:00 GMT 2020 | SYS01  |  10.19 | Racer 5 India Pale Ale, Bear Republic Bre | 26    |
| Wed Dec 09 13:57:03 GMT 2020 | SYS01  |  20.29 | Hennepin                                  | 32    |
| Wed Dec 09 19:58:35 GMT 2020 | SYS01  |  33.27 | 90 Minute IPA                             | 44    |
| Fri Dec 11 14:21:57 GMT 2020 | SYS01  |  14.87 | Yeti Imperial Stout                       | 52    |
| Wed Dec 09 17:19:18 GMT 2020 | SYS01  |  28.58 | Yeti Imperial Stout                       | 60    |
| Wed Dec 09 18:59:01 GMT 2020 | SYS01  |  34.28 | Two Hearted Ale                           | 67    |
| Mon Dec 07 18:47:19 GMT 2020 | SYS01  |  14.62 | Shakespeare Oatmeal                       | 47    |
| Sat Dec 12 23:07:38 GMT 2020 | SYS01  |  35.98 | Samuel Smiths Oatmeal Stout               | 31    |
| Fri Dec 11 19:14:25 GMT 2020 | SYS01  |  32.12 | Founders Breakfast Stout                  | 73    |
+------------------------------+--------+--------+-------------------------------------------+-------+
10 rows in set (0.00 sec)
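As a quick sanity check, a hypothetical query counting any rows that should have been filtered out ought to return zero:

mysql> SELECT COUNT(*) FROM `day11-sys01` WHERE amount >= 42;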

Try it out!

You can find the full code for trying this out, including a Docker Compose so you can spin it up on your local machine, 👾 here.
