If you want to mask fields of data as you ingest from a source into Kafka, or as you write from Kafka to a sink with Kafka Connect, the MaskField Single Message Transform is perfect for you. It retains the field whilst replacing its value.
To use the Single Message Transform you specify the field to mask, and its replacement value. To mask the contents of a field called cc_num
you would use:
"transforms" : "maskCC",
"transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields" : "cc_num",
"transforms.maskCC.replacement" : "****-****-****-****"
Source data
Here’s the data being written by our source connector:
$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-00-person -C -c1 -o beginning -u -q -J | jq '.'
{
"topic": "day5-00-person",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1607595830038,
"broker": 1,
"key": "648e4c63-ba83-4f3b-befe-505c07d9694f",
"payload": {
"Gen0": {
"phone_num": {
"string": "702.792.9316 x5223"
},
"lastName": {
"string": "Reinger"
},
"cc_exp": {
"string": "2015-11-11"
},
"firstName": {
"string": "Wayne"
},
"cc_num": {
"string": "1234-2121-1221-1211"
},
"fullAddress": {
"string": "2338 Andres Ville, North Tonymouth, OH 18398-3319"
}
}
}
}
Check out all that sensitive information! Plenty of times we either don’t want that stored in the Kafka topic in the first place, or we’re OK with that but don’t want to stream it verbatim to a target sink.
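To make the effect of the transform concrete, here is a minimal Python sketch of what masking does to the value of a record like the one above. It is an illustration only, not the Kafka Connect implementation: `mask_fields` is a hypothetical helper, it works on a flat dict rather than a Connect Struct, and it assumes that a replacement is only applied to string fields, with other fields falling back to a type-appropriate empty value.

```python
from typing import Any, Dict, Iterable, Optional

# Type-appropriate "empty" values used when no replacement applies.
# Only a handful of types are covered here (a simplification).
_DEFAULT_MASKS = {bool: False, int: 0, float: 0.0, str: ""}


def mask_fields(
    record: Dict[str, Any],
    fields: Iterable[str],
    replacement: Optional[str] = None,
) -> Dict[str, Any]:
    """Return a copy of `record` with the named fields masked."""
    masked = dict(record)  # leave the caller's record untouched
    for name in fields:
        if name not in masked:
            continue  # assumption: fields missing from the record are skipped
        value = masked[name]
        if replacement is not None and isinstance(value, str):
            masked[name] = replacement
        else:
            # Yields None for types not listed above (a simplification)
            masked[name] = _DEFAULT_MASKS.get(type(value))
    return masked


person = {
    "firstName": "Wayne",
    "lastName": "Reinger",
    "cc_num": "1234-2121-1221-1211",
    "cc_exp": "2015-11-11",
}
print(mask_fields(person, ["cc_num"], "****-****-****-****"))
```

The field stays in the record, so the schema seen by downstream consumers is unchanged; only the value is swapped out.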
Masking data at ingest with Kafka Connect source connector
We can mask the data from a source connector in Kafka Connect with the MaskField Single Message Transform. Whilst the above Kafka Connect source connector is just generating dummy data, it is still a Kafka Connect source connector nonetheless, so what I’m showing here is equally applicable to other source connectors (JDBC, Debezium, MQ, you name it). That’s the beauty of the Kafka Connect architecture: you can mix & match connectors with transforms (with converters too, but that’s another subject for another day).
Let’s create a second version of the above Kafka Connect source connector, and this time mask out the credit card number and expiry date at ingest:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day5-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day5-01-person.with" : "#{Internet.uuid}",
"genv.day5-01-person.firstName.with" : "#{Address.firstName}",
"genv.day5-01-person.lastName.with" : "#{Address.lastName}",
"genv.day5-01-person.fullAddress.with" : "#{Address.fullAddress}",
"genv.day5-01-person.phone_num.with" : "#{PhoneNumber.phoneNumber}",
"genv.day5-01-person.cc_num.with" : "#{Business.creditCardNumber}",
"genv.day5-01-person.cc_exp.with" : "#{Business.creditCardExpiry}",
"topic.day5-01-person.throttle.ms" : 500,
"transforms" : "maskCC",
"transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields" : "cc_num,cc_exp",
"transforms.maskCC.replacement" : "<masked>"
}'
Here’s what it does to the data written to Kafka:
$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-01-person -C -c1 -o end -u -q -J | jq '.'
{
"topic": "day5-01-person",
"partition": 0,
"offset": 201,
"tstype": "create",
"ts": 1607596416414,
"broker": 1,
"key": "f4d038da-bcc1-40d1-8b29-08d52305e796",
"payload": {
"Gen0": {
"fullAddress": {
"string": "Apt. 801 356 Frami Canyon, East Irene, CA 15876-9608"
},
"phone_num": {
"string": "(910) 077-1824 x74606"
},
"cc_exp": {
"string": "<masked>"
},
"lastName": {
"string": "Thompson"
},
"cc_num": {
"string": "<masked>"
},
"firstName": {
"string": "Wade"
}
}
}
}
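If you want to check this automatically rather than by eye, a small script can inspect the JSON that kafkacat prints. The following is a sketch under a few assumptions: `unmasked_fields` is a hypothetical helper, the message has the envelope shape shown above (the record nested under a single wrapper key inside `payload`, with optional strings rendered as `{"string": ...}`), and the sample message in the usage lines is abbreviated from that output.

```python
import json
from typing import List


def unmasked_fields(message: str, fields: List[str], replacement: str) -> List[str]:
    """Return the names of fields whose value is not the expected mask."""
    payload = json.loads(message)["payload"]
    # The sample output nests the record under one wrapper key ("Gen0")
    record = next(iter(payload.values()))
    leaked = []
    for name in fields:
        value = record.get(name)
        # Optional Avro strings appear as {"string": "..."} in the JSON output
        if isinstance(value, dict):
            value = value.get("string")
        if value != replacement:
            leaked.append(name)
    return leaked


sample = json.dumps({
    "topic": "day5-01-person",
    "payload": {"Gen0": {
        "cc_exp": {"string": "<masked>"},
        "cc_num": {"string": "<masked>"},
        "firstName": {"string": "Wade"},
    }},
})
# An empty list means every checked field carries the mask
print(unmasked_fields(sample, ["cc_num", "cc_exp"], "<masked>"))
```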
Masking data at egress in a Kafka Connect sink connector
See also 🎥 Kafka Connect in Action : JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)
Here we’re going to stream the data from that topic above (day5-01-person) to MySQL, with the assumption that whilst the owner of the Kafka topic is permitted to hold a user’s full address, the owner of the database to which we’re streaming the data is not, and thus we will mask it out at egress:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day5-01-person",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "maskAddress",
"transforms.maskAddress.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskAddress.fields" : "fullAddress",
"transforms.maskAddress.replacement" : "[❌redacted❌]"
}'
Now if we launch MySQL
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
and check out the table we can see we have the full schema:
mysql> describe `day5-01-person`;
+-------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| fullAddress | text | YES | | NULL | |
| phone_num | text | YES | | NULL | |
| cc_exp | text | YES | | NULL | |
| lastName | text | YES | | NULL | |
| cc_num | text | YES | | NULL | |
| firstName | text | YES | | NULL | |
+-------------+------+------+-----+---------+-------+
6 rows in set (0.00 sec)
and the data is as it should be, with the fullAddress masked:
mysql> select * from `day5-01-person` LIMIT 5;
+--------------+-----------------------+----------+---------------+----------+-------------+
| fullAddress | phone_num | cc_exp | lastName | cc_num | firstName |
+--------------+-----------------------+----------+---------------+----------+-------------+
| [?redacted?] | 383-349-1787 x792 | <masked> | Koss | <masked> | Courtney |
| [?redacted?] | 989-731-8207 | <masked> | Hand | <masked> | Trudy |
| [?redacted?] | 809.775.2196 x66858 | <masked> | Pollich | <masked> | Jame |
| [?redacted?] | 1-788-930-4978 | <masked> | Boyle | <masked> | Claud |
| [?redacted?] | (599) 706-0041 x0995 | <masked> | Stark | <masked> | Gertie |
+--------------+-----------------------+----------+---------------+----------+-------------+
5 rows in set (0.00 sec)
(maybe the ❌ emojis were just a step too far ;) )
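Both the source and the sink connector above use the same pattern of configuration keys, namespaced by the alias given to the transform. As a rough sketch of that pattern, the hypothetical helper below (`mask_field_config` is not part of Kafka Connect) builds those keys for a connector that has a single transform; if the connector has other transforms, the `transforms` value would need to be a comma-separated list of all the aliases.

```python
from typing import Dict, Iterable, Optional


def mask_field_config(
    alias: str,
    fields: Iterable[str],
    replacement: Optional[str] = None,
    target: str = "Value",
) -> Dict[str, str]:
    """Build the MaskField keys for one transform, keyed by its alias."""
    prefix = f"transforms.{alias}"
    config = {
        "transforms": alias,  # assumes this is the only transform on the connector
        f"{prefix}.type": f"org.apache.kafka.connect.transforms.MaskField${target}",
        f"{prefix}.fields": ",".join(fields),
    }
    if replacement is not None:
        config[f"{prefix}.replacement"] = replacement
    return config


# Reproduces the transform section of the source connector config
for key, value in mask_field_config("maskCC", ["cc_num", "cc_exp"], "<masked>").items():
    print(f'"{key}" : "{value}"')
```

The resulting dict can be merged into the rest of the connector configuration before it is sent to the Kafka Connect REST API.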