
🎄 Twelve Days of SMT 🎄 - Day 6: InsertField II

Published Dec 15, 2020 by Robin Moffatt in Kafka Connect, Single Message Transform, TwelveDaysOfSMT at https://preview.rmoff.net/2020/12/15/twelve-days-of-smt-day-6-insertfield-ii/

We kicked off this series on day 1 by seeing how to use InsertField to add the timestamp to a message as it passes through a Kafka Connect sink connector. Today we'll see how to use the same Single Message Transform to add a static field value, as well as the Kafka topic name, partition, and offset from which the message was read.

"transforms"                                : "insertStaticField1",
"transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "sourceSystem",
"transforms.insertStaticField1.static.value": "NeverGonna"

👾 Demo code

Adding fields to messages at ingest with a Kafka Connect source connector 🔗

When ingesting data from a source it can be useful to add fields to store information such as the server from which it was read.

Here's an example source connector, which adds two static fields (sourceSystem and ingestAgent) to every message it produces:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day6-00/config \
    -d '{
        "connector.class"                           : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day6-transactions.with"              : "#{Internet.uuid}",
        "genv.day6-transactions.cost.with"          : "#{Commerce.price}",
        "genv.day6-transactions.card_type.with"     : "#{Business.creditCardType}",
        "genv.day6-transactions.item.with"          : "#{Beer.name}",
        "topic.day6-transactions.throttle.ms"       : 5000,
        "transforms"                                : "insertStaticField1,insertStaticField2",
        "transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField1.static.field": "sourceSystem",
        "transforms.insertStaticField1.static.value": "NeverGonna",
        "transforms.insertStaticField2.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField2.static.field": "ingestAgent",
        "transforms.insertStaticField2.static.value": "GiveYouUp"
    }'
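
Once you've created the connector you can check that it's running using the Kafka Connect REST API (assuming, as above, that the Connect worker is listening on localhost:8083):

curl -s http://localhost:8083/connectors/source-voluble-datagen-day6-00/status | jq '.'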

The resulting message that's written to Kafka includes the data from the source system (a data generator, in this case, writing fields cost, card_type, and item), plus the static fields configured (sourceSystem, ingestAgent):

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day6-transactions -C -c1 -o end -u -q -J | jq  '.payload'
{
  "cost": {
    "string": "25.79"
  },
  "card_type": {
    "string": "visa"
  },
  "item": {
    "string": "Westmalle Trappist Tripel"
  },
  "sourceSystem": {
    "string": "NeverGonna"
  },
  "ingestAgent": {
    "string": "GiveYouUp"
  }
}
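
Because the fields are optional in the value schema, each one is wrapped in an Avro union type (hence the "string" keys above). If you just want the bare values for eyeballing the data you could flatten them with jq; this is purely a convenience for inspection and assumes every field is an optional string, as here:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day6-transactions -C -c1 -o end -u -q -J | jq '.payload | map_values(.string)'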

Adding details about the Kafka message at egress with a Kafka Connect sink connector 🔗

This example uses the Kafka Connect JDBC Sink connector: 👉 https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

See also 🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)

It can often be useful to add information about the Kafka message (topic, partition, offset) when the data is sent to a target system. Here's an example using the topic populated above. You can also add the timestamp of the Kafka message, as shown previously; there's a sketch of that after the connector config below.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day6-00/config \
    -d '{
          "connector.class"                           : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                            : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                           : "mysqluser",
          "connection.password"                       : "mysqlpw",
          "topics"                                    : "day6-transactions",
          "tasks.max"                                 : "4",
          "auto.create"                               : "true",
          "auto.evolve"                               : "true",
          "transforms"                                : "insertPartition,insertOffset,insertTopic",
          "transforms.insertPartition.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertPartition.partition.field": "kafkaPartition",
          "transforms.insertOffset.type"              : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertOffset.offset.field"      : "kafkaOffset",
          "transforms.insertTopic.type"               : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTopic.topic.field"        : "kafkaTopic"
        }'
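
If you also want the timestamp of the Kafka message (as covered on day 1 of this series), it's one more entry in the same transform chain; something along these lines, where the field name messageTS is only an example:

"transforms"                          : "insertPartition,insertOffset,insertTopic,insertTS",
"transforms.insertTS.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field" : "messageTS"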

Here's the data as it appears in the target system:

mysql> SELECT * FROM `day6-transactions` LIMIT 5;
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
| cost  | card_type | item                | sourceSystem | ingestAgent | kafkaPartition | kafkaOffset | kafkaTopic        |
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
| 11.78 | visa      | Schneider Aventinus | NeverGonna   | GiveYouUp   |              0 |           0 | day6-transactions |
| 17.65 | discover  | Two Hearted Ale     | NeverGonna   | GiveYouUp   |              0 |           1 | day6-transactions |
| 11.63 | jcb       | Stone IPA           | NeverGonna   | GiveYouUp   |              0 |           2 | day6-transactions |
| 67.51 | switch    | Shakespeare Oatmeal | NeverGonna   | GiveYouUp   |              0 |           3 | day6-transactions |
| 22.25 | discover  | Hop Rod Rye         | NeverGonna   | GiveYouUp   |              0 |           4 | day6-transactions |
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
5 rows in set (0.00 sec)
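
Having the topic, partition, and offset in the target table makes it easy to see where each row came from. As a purely illustrative example, this query shows the highest offset written so far for each topic and partition:

mysql> SELECT kafkaTopic, kafkaPartition, MAX(kafkaOffset) AS max_offset FROM `day6-transactions` GROUP BY kafkaTopic, kafkaPartition;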

Try it out! 🔗

You can find the full code for trying this out, including a Docker Compose so you can spin it up on your local machine, 👾 here


