"transforms"                                     : "addTimestampToTopic",
"transforms.addTimestampToTopic.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimestampToTopic.topic.format"    : "${topic}_${timestamp}",
"transforms.addTimestampToTopic.timestamp.format": "YYYY-MM-dd"
🎄 Twelve Days of SMT 🎄 - Day 11: Predicate and Filter
Apache Kafka 2.6 included KIP-585 which adds support for defining predicates against which transforms are conditionally executed, as well as a Filter Single Message Transform to drop messages - which in combination means that you can conditionally drop messages.
As part of Apache Kafka, Kafka Connect ships with pre-built Single Message Transforms and Predicates, but you can also write your own. The API for each is documented: Transformation / Predicate. The predicates that ship with Apache Kafka are listed below, followed by an example of pairing one with Filter:
- RecordIsTombstone - The value part of the message is null (denoting a tombstone message)
- HasHeaderKey - Matches if a header exists with the name given
- TopicNameMatches - Matches based on the topic name
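For example, pairing the Filter Single Message Transform with the TopicNameMatches predicate drops every message read from a topic whose name matches a regular expression. This is a minimal sketch: the aliases dropTempTopics and isTempTopic and the pattern temp-.* are placeholders to adapt to your own topics.
"transforms"                          : "dropTempTopics",
"transforms.dropTempTopics.type"      : "org.apache.kafka.connect.transforms.Filter",
"transforms.dropTempTopics.predicate" : "isTempTopic",
"predicates"                          : "isTempTopic",
"predicates.isTempTopic.type"         : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isTempTopic.pattern"      : "temp-.*"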
🎄 Twelve Days of SMT 🎄 - Day 10: ReplaceField
The ReplaceField Single Message Transform has three modes of operation on fields of data passing through Kafka Connect (see the example after this list):
- Include only the fields specified in the list (include)
- Include all fields except the ones specified (exclude)
- Rename field(s) (renames)
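As a sketch of combining two of these modes - the alias and field names here (internal_id, cust_name) are placeholders, and on older Kafka versions these properties are named whitelist and blacklist instead of include and exclude:
"transforms"                    : "tidyFields",
"transforms.tidyFields.type"    : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.tidyFields.exclude" : "internal_id",
"transforms.tidyFields.renames" : "cust_name:customer_name"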
🎄 Twelve Days of SMT 🎄 - Day 9: Cast
The Cast Single Message Transform lets you change the data type of fields in a Kafka message, supporting numerics, string, and boolean.
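A minimal sketch, assuming a message whose amount field arrives as a string and whose processed flag arrives as an integer - the alias and field names are placeholders:
"transforms"                  : "castTypes",
"transforms.castTypes.type"   : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec"   : "amount:float64,processed:boolean"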
🎄 Twelve Days of SMT 🎄 - Day 8: TimestampConverter
The TimestampConverter Single Message Transform lets you work with timestamp fields in Kafka messages. You can convert a string into a native Timestamp type (or Date or Time), as well as Unix epoch - and the same in reverse too.
This is really useful to make sure that data ingested into Kafka is correctly stored as a Timestamp (if it is one), and also enables you to write a Timestamp out to a sink connector in a string format that you choose.
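For example, to parse a string field into a native Timestamp - a sketch that assumes a field called order_date holding values like 2020-12-08 14:30:00 (the field name and format are placeholders):
"transforms"                              : "convertOrderDate",
"transforms.convertOrderDate.type"        : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertOrderDate.field"       : "order_date",
"transforms.convertOrderDate.format"      : "yyyy-MM-dd HH:mm:ss",
"transforms.convertOrderDate.target.type" : "Timestamp"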
🎄 Twelve Days of SMT 🎄 - Day 7: TimestampRouter
Just like the RegExRouter, the TimestampRouter can be used to modify the topic name of messages as they pass through Kafka Connect. Since the topic name is usually the basis for the naming of the object to which messages are written in a sink connector, this is a great way to achieve time-based partitioning of those objects if required. For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.
The TimestampRouter takes two arguments: the format of the final topic name to generate, and the format of the timestamp to put in the topic name (based on SimpleDateFormat).
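A sketch of the monthly routing described above (the alias routeByMonth is a placeholder):
"transforms"                               : "routeByMonth",
"transforms.routeByMonth.type"             : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeByMonth.topic.format"     : "${topic}_${timestamp}",
"transforms.routeByMonth.timestamp.format" : "yyyy-MM"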
🎄 Twelve Days of SMT 🎄 - Day 6: InsertField II
We kicked off this series by seeing on day 1 how to use InsertField to add in the timestamp to a message passing through the Kafka Connect sink connector. Today we'll see how to use the same Single Message Transform to add in a static field value, as well as the name of the Kafka topic, partition, and offset from which the message has been read.
"transforms"                                : "insertStaticField1",
"transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "sourceSystem",
"transforms.insertStaticField1.static.value": "NeverGonna"
🎄 Twelve Days of SMT 🎄 - Day 5: MaskField
If you want to mask fields of data as you ingest from a source into Kafka, or write to a sink from Kafka with Kafka Connect, the MaskField Single Message Transform is perfect for you. It retains the field whilst replacing its value.
To use the Single Message Transform you specify the field to mask, and its replacement value. To mask the contents of a field called cc_num you would use:
"transforms"                               : "maskCC",
"transforms.maskCC.type"                   : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields"                 : "cc_num",
"transforms.maskCC.replacement"            : "****-****-****-****"
🎄 Twelve Days of SMT 🎄 - Day 4: RegExRouter
If you want to change the topic name to which a source connector writes, or object name that's created on a target by a sink connector, the RegExRouter is exactly what you need.
To use the Single Message Transform you specify the pattern in the topic name to match, and its replacement. To drop a prefix of test- from a topic you would use:
"transforms"                             : "dropTopicPrefix",
"transforms.dropTopicPrefix.type"        : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex"       : "test-(.*)",
"transforms.dropTopicPrefix.replacement" : "$1"
🎄 Twelve Days of SMT 🎄 - Day 3: Flatten
The Flatten Single Message Transform (SMT) is useful when you need to collapse a nested message down to a flat structure.
To use the Single Message Transform you only need to reference it; there's no additional configuration required:
"transforms"                    : "flatten",
"transforms.flatten.type"       : "org.apache.kafka.connect.transforms.Flatten$Value"