rmoff's random ramblings
about talks

๐ŸŽ„ Twelve Days of SMT ๐ŸŽ„ - Day 2: ValueToKey and ExtractField

Published Dec 9, 2020 by in Kafka Connect, Single Message Transform, TwelveDaysOfSMT at https://preview.rmoff.net/2020/12/09/twelve-days-of-smt-day-2-valuetokey-and-extractfield/

Setting the key of a Kafka message is important as it ensures correct logical processing when consumed across multiple partitions, as well as being a requirement when joining to messages in other topics. When using Kafka Connect the connector may already set the key, which is great. If not, you can use these two Single Message Transforms (SMT) to set it as part of the pipeline based on a field in the value part of the message.

To use the ValueToKey Single Message Transform specify the name of the field (id) that you want to copy from the value to the key:

"transforms"                    : "copyIdToKey",
"transforms.copyIdToKey.type"   : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id",

This writes it as a Struct to the Key, so you will often want to combine it with the ExtractField Single Message Transform:

"transforms"                            : "copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type"           : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields"         : "id",
"transforms.extractKeyFromStruct.type"  :"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field" :"id"

๐Ÿ‘พ Demo code

Example - JDBC Source connector ๐Ÿ”—

Letโ€™s start with a basic JDBC source connector:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://mysql:3306/demo",
  "connection.user": "mysqluser",
  "connection.password": "mysqlpw",
  "topic.prefix": "mysql-00-",
  "poll.interval.ms": 1000,
  "tasks.max":1,
  "table.whitelist" : "customers",
  "mode":"incrementing",
  "incrementing.column.name": "id",
  "validate.non.null": false
}

An ingested Kafka message written by this connector looks like this - note the null key:

{
  "topic": "mysql-00-customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607512308962,
  "broker": 1,
  "key": null,
  "payload": {
    "id": {
      "int": 1
    },
    "full_name": {
      "string": "Leone Puxley"
    },
    "birthdate": {
      "int": 9167
    },
    "fav_animal": {
      "string": "Violet-eared waxbill"
    },
    "fav_colour": {
      "string": "Puce"
    },
    "fav_movie": {
      "string": "Oh! What a Lovely War"
    }
  }
}

Assuming you want to use the id field from the source as the message key you can add the Single Message Transforms as shown here:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://mysql:3306/demo",
  "connection.user": "mysqluser",
  "connection.password": "mysqlpw",
  "topic.prefix": "mysql-02-",
  "poll.interval.ms": 1000,
  "tasks.max":1,
  "table.whitelist" : "customers",
  "mode":"incrementing",
  "incrementing.column.name": "id",
  "validate.non.null": false,
  "transforms": "copyIdToKey,extractKeyFromStruct",
  "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.copyIdToKey.fields": "id",
  "transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKeyFromStruct.field":"id"
}

The resulting Kafka message looks like this:

{
  "topic": "mysql-02-customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607512714619,
  "broker": 1,
  "key": "1",
  "payload": {
    "id": {
      "int": 1
    },
    "full_name": {
      "string": "Leone Puxley"
    },
    "birthdate": {
      "int": 9167
    },
    "fav_animal": {
      "string": "Violet-eared waxbill"
    },
    "fav_colour": {
      "string": "Puce"
    },
    "fav_movie": {
      "string": "Oh! What a Lovely War"
    }
  }
}

See also Kafka Tutorials

Try it out! ๐Ÿ”—

You can find the full code for trying this outโ€”including a Docker Compose so you can spin it up on your local machineโ€” ๐Ÿ‘พ here


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025