Apache Kafka 2.6 included KIP-585, which adds support for defining predicates against which transforms are conditionally executed, as well as a Filter Single Message Transform to drop messages - which in combination means that you can conditionally drop messages.
As part of Apache Kafka, Kafka Connect ships with pre-built Single Message Transforms and Predicates, but you can also write your own. The API for each is documented: Transformation / Predicate. The predicates that ship with Apache Kafka are:
- RecordIsTombstone - Matches if the value part of the message is null (denoting a tombstone message)
- HasHeaderKey - Matches if a header exists with the name given
- TopicNameMatches - Matches based on topic name
Predicates can be used in both source and sink connectors, depending on your requirements.
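Before the worked examples below, here's a minimal sketch of how the pieces wire together: a transform declares a predicate property that points at a named entry in the predicates list. The labels (dropFlagged, hasDeleteHeader) and the header name (delete) are made up for illustration; this hypothetical fragment would drop any message carrying a header called delete:

"transforms"                       : "dropFlagged",
"transforms.dropFlagged.type"      : "org.apache.kafka.connect.transforms.Filter",
"transforms.dropFlagged.predicate" : "hasDeleteHeader",
"predicates"                       : "hasDeleteHeader",
"predicates.hasDeleteHeader.type"  : "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"predicates.hasDeleteHeader.name"  : "delete"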
Conditionally renaming fields based on the topic name 🔗
Consider two topics holding logically identical entities, but with varying field names. Awful, isn’t it 😱
- Topic day11-sys01

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-sys01 | \
  jq '.payload.Gen0'

{
  "txn_date": { "string": "Sun Dec 13 02:18:44 GMT 2020" },
  "amount":   { "string": "68.32" },
  "product":  { "string": "Yeti Imperial Stout" },
  "units":    { "string": "56" },
  "source":   { "string": "SYS01" }
}

- Topic day11-systemB

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-systemB | \
  jq '.payload.Gen0'

{
  "txn_date": { "string": "Mon Dec 07 16:22:03 GMT 2020" },
  "cost":     { "string": "63.39" },
  "units":    { "string": "27" },
  "item":     { "string": "St. Bernardus Abt 12" },
  "source":   { "string": "SYSTEM_B" }
}
Our esteemed data engineers have determined that in these two systems cost is the same as amount, and item the same as product.
With Kafka Connect and Single Message Transforms we can stream this data to a single harmonious home downstream from one connector. This is enabled through two Single Message Transforms:
- Conditionally modifying the field names
  - The transform includes the predicate property ("transforms.renameSystemBFields.predicate": "isSystemBTopic"), which resolves to the predicates.isSystemBTopic configuration.
  - Predicate configuration follows a similar pattern to transforms: the predicates prefix, followed by a label (isSystemBTopic), a type (org.apache.kafka.connect.transforms.predicates.TopicNameMatches), and then additional configuration properties as required by the particular predicate being used.
- Renaming the topic (and thus the target table name) for all messages to transactions
  - Note that this happens in the transforms list after the field renaming; otherwise the predicate will not match the topic for .*-systemB, since the topic will already be called transactions.

The sink connector here picks up both topics ("topics.regex" : "day11-.*") but only applies the ReplaceField renames operation to messages from the day11-systemB topic.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics.regex" : "day11-.*",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "renameSystemBFields,renameTargetTopic",
"transforms.renameSystemBFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameSystemBFields.renames" : "item:product,cost:amount",
"transforms.renameSystemBFields.predicate": "isSystemBTopic",
"transforms.renameTargetTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTargetTopic.regex" : "day11-.*",
"transforms.renameTargetTopic.replacement": "transactions",
"predicates" : "isSystemBTopic",
"predicates.isSystemBTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isSystemBTopic.pattern" : ".*-systemB"
}'
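Once you've created the connector it's worth checking that it's actually running; the Kafka Connect REST API includes a status endpoint for this (a quick sanity check - the output will vary with your environment):

curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/status | \
  jq '.connector.state, .tasks[].state'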
The resulting table in the target database looks like this:
mysql> describe transactions;
+----------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+------+------+-----+---------+-------+
| txn_date | text | YES | | NULL | |
| amount | text | YES | | NULL | |
| units | text | YES | | NULL | |
| product | text | YES | | NULL | |
| source | text | YES | | NULL | |
+----------+------+------+-----+---------+-------+
5 rows in set (0.01 sec)
with data from both topics present (identifiable by the different source values):
mysql> SELECT * FROM transactions LIMIT 5;
+------------------------------+----------+--------+-------------------------------+-------+
| txn_date | source | amount | product | units |
+------------------------------+----------+--------+-------------------------------+-------+
| Tue Dec 08 14:27:13 GMT 2020 | SYS01 | 10.03 | Stone IPA | 39 |
| Tue Dec 15 23:09:20 GMT 2020 | SYSTEM_B | 7.24 | Ruination IPA | 72 |
| Wed Dec 09 06:26:34 GMT 2020 | SYS01 | 92.66 | Bells Expedition | 55 |
| Thu Dec 10 19:38:26 GMT 2020 | SYSTEM_B | 65.11 | Sierra Nevada Celebration Ale | 5 |
| Fri Dec 11 01:38:48 GMT 2020 | SYS01 | 55.52 | Sierra Nevada Celebration Ale | 31 |
+------------------------------+----------+--------+-------------------------------+-------+
5 rows in set (0.00 sec)
Inverting a Predicate 🔗
You can use the negate option to invert a predicate. Consider this predicate:
"predicates" : "isSystemBTopic",
"predicates.isSystemBTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isSystemBTopic.pattern" : ".*-systemB"
If you wanted to apply a Single Message Transform to any topic except those that matched, you would use "…negate": "true". The negate option is set on the transform (alongside its predicate), not on the predicate itself:
"transforms.renameNonSystemBFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameNonSystemBFields.renames" : "product:item,amount:cost",
"transforms.renameNonSystemBFields.predicate": "isSystemBTopic",
"transforms.renameNonSystemBFields.negate" : "true",
Filtering out null records 🔗
Consider a source topic in which tombstone (null) records are being produced. These may be by design or in error - but either way, we want to exclude them from the sink connector pipeline.
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -o-10 -u -q -J \
-t sys02 | \
jq -c '[.offset,.key,.payload]'
[88,"0d011ee6-4424-4cb6-8665-61b46918b3d9",null]
[89,"b859f443-e92e-4599-a426-91c4bc6b1d28",null]
[90,"5633d30f-5b08-4a94-8690-b576e3e3d978",null]
[91,"aa0efeae-9dac-43a9-854b-1da3b589dee7",{"Gen0":{"amount":{"string":"73.66"},"txn_date":{"string":"Sun Dec 13 01:21:10 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Kirin Inchiban"},"units":{"string":"67"}}}]
[92,"4de86341-8165-42ca-bbea-276875cc9585",{"Gen0":{"amount":{"string":"6.86"},"txn_date":{"string":"Tue Dec 08 16:42:27 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Trappistes Rochefort 8"},"units":{"string":"61"}}}]
[93,"478dd272-a0cb-4f36-9dcb-73dd5bba245a",{"Gen0":{"amount":{"string":"30.50"},"txn_date":{"string":"Sun Dec 13 11:03:59 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Edmund Fitzgerald Porter"},"units":{"string":"11"}}}]
[94,"50a2e247-1a2b-4321-bc3e-a3980df83c23",{"Gen0":{"amount":{"string":"19.18"},"txn_date":{"string":"Fri Dec 11 03:48:47 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Samuel Smith’s Imperial IPA"},"units":{"string":"4"}}}]
[95,"6f2172b7-d3b2-4890-a295-82a889e9a5b7",null]
[96,"fdfc9d85-fe02-4846-86a7-e31d1acdf26c",{"Gen0":{"amount":{"string":"7.27"},"txn_date":{"string":"Thu Dec 10 09:53:55 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Stone IPA"},"units":{"string":"87"}}}]
[97,"2b307e28-ff01-4f01-9a7e-529c60afb8ce",{"Gen0":{"amount":{"string":"53.49"},"txn_date":{"string":"Wed Dec 16 15:05:38 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Samuel Smith’s Imperial IPA"},"units":{"string":"3"}}}]
Here's a sink connector similar to the one above, again using predicate to apply a transform selectively. In this instance it's the Filter transform (which drops a record), applied only when the isNullRecord predicate is true.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "sys02",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "dropNullRecords",
"transforms.dropNullRecords.type" : "org.apache.kafka.connect.transforms.Filter",
"transforms.dropNullRecords.predicate": "isNullRecord",
"predicates" : "isNullRecord",
"predicates.isNullRecord.type" : "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}'
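Dropping the tombstones here matters for more than tidiness: unless delete.enabled is set, the JDBC sink expects every record to have a non-null value, so without the Filter transform the connector would be liable to fail on those records. As a spot-check that only the non-null records reached MySQL, you can count the rows in the target table (a sketch - the actual count depends on what your data generator has produced):

mysql> SELECT COUNT(*) FROM sys02;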
Filtering records based on the contents of their key or value with the JMES Predicate plugin 🔗
There's a really useful predicate plugin based on JMES that you can use. You can read more about it in this posting on the Confluent Community Forum from the author of the plugin.
Filtering based on the contents of a message 🔗
Confluent Platform includes its own Filter Single Message Transform. Instead of being intended for use in combination with a predicate (as the org.apache.kafka.connect.transforms.Filter transform is), the one in Confluent Platform uses JSON path to specify a predicate based on the data in the message itself.
Here's an example that filters out all messages except those that include Stout in the product field:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day11-sys01",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "filterStout",
"transforms.filterStout.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterStout.filter.condition": "$[?(@.product =~ /.*Stout/)]",
"transforms.filterStout.filter.type" : "include"
}'
The resulting data in MySQL contains just the expected messages:
mysql> select * from `day11-sys01`;
+------------------------------+--------+--------+------------------------------+-------+
| txn_date | source | amount | product | units |
+------------------------------+--------+--------+------------------------------+-------+
| Fri Dec 11 07:27:51 GMT 2020 | SYS01 | 58.75 | Stone Imperial Russian Stout | 67 |
| Sat Dec 12 05:15:18 GMT 2020 | SYS01 | 28.66 | Oak Aged Yeti Imperial Stout | 43 |
| Tue Dec 15 10:56:00 GMT 2020 | SYS01 | 73.17 | Storm King Stout | 28 |
| Tue Dec 15 12:46:52 GMT 2020 | SYS01 | 55.06 | Stone Imperial Russian Stout | 68 |
| Tue Dec 15 09:04:27 GMT 2020 | SYS01 | 0.34 | Bourbon County Stout | 33 |
| Wed Dec 09 02:12:24 GMT 2020 | SYS01 | 88.97 | Bourbon County Stout | 28 |
| Sun Dec 13 04:18:51 GMT 2020 | SYS01 | 6.29 | Samuel Smiths Oatmeal Stout | 7 |
| Thu Dec 10 10:51:51 GMT 2020 | SYS01 | 6.95 | Samuel Smiths Oatmeal Stout | 1 |
+------------------------------+--------+--------+------------------------------+-------+
8 rows in set (0.00 sec)
If you want to filter on numeric values then make sure the data type is correct; use the Cast Single Message Transform if necessary, as shown here. In this case, the order of the "transforms" is important:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day11-sys01",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "castTypes,filterSmallOrder",
"transforms.filterSmallOrder.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterSmallOrder.filter.condition": "$[?(@.amount < 42)]",
"transforms.filterSmallOrder.filter.type" : "include",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "amount:float32"
}'
In the resulting data you can see that all the values in amount are less than 42, per the specified filter:
mysql> select * from `day11-sys01` LIMIT 10;
+------------------------------+--------+--------+-------------------------------------------+-------+
| txn_date | source | amount | product | units |
+------------------------------+--------+--------+-------------------------------------------+-------+
| Thu Dec 10 00:57:55 GMT 2020 | SYS01 | 3.53 | Sierra Nevada Celebration Ale | 26 |
| Mon Dec 14 01:01:00 GMT 2020 | SYS01 | 10.19 | Racer 5 India Pale Ale, Bear Republic Bre | 26 |
| Wed Dec 09 13:57:03 GMT 2020 | SYS01 | 20.29 | Hennepin | 32 |
| Wed Dec 09 19:58:35 GMT 2020 | SYS01 | 33.27 | 90 Minute IPA | 44 |
| Fri Dec 11 14:21:57 GMT 2020 | SYS01 | 14.87 | Yeti Imperial Stout | 52 |
| Wed Dec 09 17:19:18 GMT 2020 | SYS01 | 28.58 | Yeti Imperial Stout | 60 |
| Wed Dec 09 18:59:01 GMT 2020 | SYS01 | 34.28 | Two Hearted Ale | 67 |
| Mon Dec 07 18:47:19 GMT 2020 | SYS01 | 14.62 | Shakespeare Oatmeal | 47 |
| Sat Dec 12 23:07:38 GMT 2020 | SYS01 | 35.98 | Samuel Smiths Oatmeal Stout | 31 |
| Fri Dec 11 19:14:25 GMT 2020 | SYS01 | 32.12 | Founders Breakfast Stout | 73 |
+------------------------------+--------+--------+-------------------------------------------+-------+
10 rows in set (0.00 sec)
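The filter.type of include used above keeps only the records matching the condition; per the Confluent documentation the transform also supports exclude, which inverts the behaviour. The same condition could therefore be used to discard the small orders instead of keeping them:

"transforms.filterSmallOrder.filter.condition": "$[?(@.amount < 42)]",
"transforms.filterSmallOrder.filter.type"     : "exclude"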