DEV Community

Robin Moffatt
Robin Moffatt

Posted on • Originally published at rmoff.net on

πŸŽ„ Twelve Days of SMT πŸŽ„ - Day 5: MaskField

If you want to mask fields of data as you ingest from a source into Kafka, or write to a sink from Kafka with Kafka Connect, the MaskField Single Message Transform is perfect for you. It retains the fields whilst replacing its value.

To use the Single Message Transform you specify the field to mask, and its replacement value. To mask the contents of a field called cc_num you would use:

"transforms" : "maskCC",
"transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields" : "cc_num",
"transforms.maskCC.replacement" : " ****-**** - ****-****"
Enter fullscreen mode Exit fullscreen mode

πŸ‘Ύ Demo code

Source data

Here’s the data being written by our source connector:

$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-00-person -C -c1 -o beginning -u -q -J | jq '.'

{
  "topic": "day5-00-person",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607595830038,
  "broker": 1,
  "key": "648e4c63-ba83-4f3b-befe-505c07d9694f",
  "payload": {
    "Gen0": {
      "phone_num": {
        "string": "702.792.9316 x5223"
      },
      "lastName": {
        "string": "Reinger"
      },
      "cc_exp": {
        "string": "2015-11-11"
      },
      "firstName": {
        "string": "Wayne"
      },
      "cc_num": {
        "string": "1234-2121-1221-1211"
      },
      "fullAddress": {
        "string": "2338 Andres Ville, North Tonymouth, OH 18398-3319"
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Check out all that sensitive information! Plenty of times we don’t want to have that either stored in the Kafka topic in the first place, or we OK with that but we don’t want to stream it verbatim to a target sink.

Masking data at ingest with Kafka Connect source connector

We can mask the data from an source connector in Kafka Connect with the MaskField Single Message Transform. Whilst the above Kafka Connect source connector is just generating dummy data, is still a Kafka Connect source connector nonetheless, so what I’m showing here is equally applicable to other source connectorsβ€”JDBC, Debezium, MQ, you name itβ€”that’s the beauty of the Kafka Connect architecture, that you can mix & match connectors with transforms (with converters too, but that’s another subject for another day).

Let’s create a second version of the above Kafka Connect source connector, and this time mask out the credit card number at ingest:

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day5-01/config \
    -d '{
        "connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day5-01-person.with" : "#{Internet.uuid}",
        "genv.day5-01-person.firstName.with" : "#{Address.firstName}",
        "genv.day5-01-person.lastName.with" : "#{Address.lastName}",
        "genv.day5-01-person.fullAddress.with" : "#{Address.fullAddress}",
        "genv.day5-01-person.phone_num.with" : "#{PhoneNumber.phoneNumber}",
        "genv.day5-01-person.cc_num.with" : "#{Business.creditCardNumber}",
        "genv.day5-01-person.cc_exp.with" : "#{Business.creditCardExpiry}",
        "topic.day5-01-person.throttle.ms" : 500,
        "transforms" : "maskCC",
        "transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
        "transforms.maskCC.fields" : "cc_num,cc_exp",
        "transforms.maskCC.replacement" : "<masked>"
    }'
Enter fullscreen mode Exit fullscreen mode

Here’s what it does to the data written to Kafka:

$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-01-person -C -c1 -o end -u -q -J | jq '.'

{
  "topic": "day5-01-person",
  "partition": 0,
  "offset": 201,
  "tstype": "create",
  "ts": 1607596416414,
  "broker": 1,
  "key": "f4d038da-bcc1-40d1-8b29-08d52305e796",
  "payload": {
    "Gen0": {
      "fullAddress": {
        "string": "Apt. 801 356 Frami Canyon, East Irene, CA 15876-9608"
      },
      "phone_num": {
        "string": "(910) 077-1824 x74606"
      },
      "cc_exp": {
        "string": "<masked>"
      },
      "lastName": {
        "string": "Thompson"
      },
      "cc_num": {
        "string": "<masked>"
      },
      "firstName": {
        "string": "Wade"
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Masking data at egress in a Kafka Connect sink connector

πŸ‘‰ https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

See also πŸŽ₯ Kafka Connect in Action : JDBC Sink (πŸ‘Ύ demo code) and πŸŽ₯ ksqlDB & Kafka Connect JDBC Sink in action (πŸ‘Ύ demo code)

Here we’re going to stream the data from that topic above (day5-01-person) to MySQL, with the assumption that whilst the owner of the Kafka topic is permitted to hold a user’s full address, the owner of the database to which we’re streaming the data is not, and thus will mask it out at egress:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day5-01-person",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "maskAddress",
          "transforms.maskAddress.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
          "transforms.maskAddress.fields" : "fullAddress",
          "transforms.maskAddress.replacement" : "[❌redacted❌]"
        }'
Enter fullscreen mode Exit fullscreen mode

Now if we launch MySQL

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
Enter fullscreen mode Exit fullscreen mode

and check out the table we can see we have the full schema:

mysql> describe `day5-01-person`;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| fullAddress | text | YES  |     | NULL    |       |
| phone_num   | text | YES  |     | NULL    |       |
| cc_exp      | text | YES  |     | NULL    |       |
| lastName    | text | YES  |     | NULL    |       |
| cc_num      | text | YES  |     | NULL    |       |
| firstName   | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
6 rows in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

and the data is as it should be, with the fullAddress masked:

mysql> select * from `day5-01-person` LIMIT 5;
+--------------+-----------------------+----------+---------------+----------+-------------+
| fullAddress  | phone_num             | cc_exp   | lastName      | cc_num   | firstName   |
+--------------+-----------------------+----------+---------------+----------+-------------+
| [?redacted?] | 383-349-1787 x792     | <masked> | Koss          | <masked> | Courtney    |
| [?redacted?] | 989-731-8207          | <masked> | Hand          | <masked> | Trudy       |
| [?redacted?] | 809.775.2196 x66858   | <masked> | Pollich       | <masked> | Jame        |
| [?redacted?] | 1-788-930-4978        | <masked> | Boyle         | <masked> | Claud       |
| [?redacted?] | (599) 706-0041 x0995  | <masked> | Stark         | <masked> | Gertie      |
+--------------+-----------------------+----------+---------------+----------+-------------+
5 rows in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

(maybe the ❌ emojis were just a step too far ;) )

Top comments (0)