Setting the key of a Kafka message is important: it ensures correct ordering and logical processing when messages are consumed across multiple partitions, and it is a requirement when joining to messages in other topics. When using Kafka Connect the connector may already set the key, which is great. If not, you can use these two Single Message Transforms (SMTs) to set it as part of the pipeline, based on a field in the value part of the message.
To use the ValueToKey Single Message Transform, specify the name of the field (id) that you want to copy from the value to the key:
"transforms" : "copyIdToKey",
"transforms.copyIdToKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id",
This writes it as a Struct to the key, so you will often want to combine it with the ExtractField Single Message Transform:
"transforms" : "copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id",
"transforms.extractKeyFromStruct.type" :"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field" :"id"
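Note that the order in which the transforms are listed matters: copyIdToKey must run first so that extractKeyFromStruct has a Struct key to operate on. The fields configuration of ValueToKey also accepts a comma-separated list, so you can build a composite key from more than one value field — a sketch, assuming you also want full_name in the key (in which case you would keep the Struct and drop the ExtractField step):

```json
"transforms" : "copyIdToKey",
"transforms.copyIdToKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id,full_name"
```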
Example - JDBC Source connector
Let's start with a basic JDBC source connector:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topic.prefix": "mysql-00-",
"poll.interval.ms": 1000,
"tasks.max":1,
"table.whitelist" : "customers",
"mode":"incrementing",
"incrementing.column.name": "id",
"validate.non.null": false
}
An ingested Kafka message written by this connector looks like this - note the null key:
{
"topic": "mysql-00-customers",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1607512308962,
"broker": 1,
"key": null,
"payload": {
"id": {
"int": 1
},
"full_name": {
"string": "Leone Puxley"
},
"birthdate": {
"int": 9167
},
"fav_animal": {
"string": "Violet-eared waxbill"
},
"fav_colour": {
"string": "Puce"
},
"fav_movie": {
"string": "Oh! What a Lovely War"
}
}
}
Assuming you want to use the id field from the source as the message key, you can add the Single Message Transforms as shown here:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topic.prefix": "mysql-02-",
"poll.interval.ms": 1000,
"tasks.max":1,
"table.whitelist" : "customers",
"mode":"incrementing",
"incrementing.column.name": "id",
"validate.non.null": false,
"transforms": "copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "id",
"transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field":"id"
}
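Since the extracted key is now a single primitive (an integer id) rather than a structured record, you may not want the overhead of a full schema-based converter for it. A sketch of one option, assuming you are happy serialising the key as a plain integer with the IntegerConverter that ships with Kafka Connect — add this to the connector configuration above:

```json
"key.converter": "org.apache.kafka.connect.converters.IntegerConverter"
```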
The resulting Kafka message looks like this:
{
"topic": "mysql-02-customers",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1607512714619,
"broker": 1,
"key": "1",
"payload": {
"id": {
"int": 1
},
"full_name": {
"string": "Leone Puxley"
},
"birthdate": {
"int": 9167
},
"fav_animal": {
"string": "Violet-eared waxbill"
},
"fav_colour": {
"string": "Puce"
},
"fav_movie": {
"string": "Oh! What a Lovely War"
}
}
}
See also: Kafka Tutorials.