rmoff's random ramblings
about talks

πŸŽ„ Twelve Days of SMT πŸŽ„ - Day 1: InsertField (timestamp)

Published Dec 8, 2020 by in Kafka Connect, Single Message Transform, TwelveDaysOfSMT at https://preview.rmoff.net/2020/12/08/twelve-days-of-smt-day-1-insertfield-timestamp/

You can use the InsertField Single Message Transform (SMT) to add the message timestamp into each message that Kafka Connect sends to a sink.

To use the Single Message Transform specify the name of the field (timestamp.field) that you want to add to hold the message timestamp:

"transforms"                         : "insertTS",
"transforms.insertTS.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"

The message timestamp can be set by the producer API explicitly, or allowed to default to the setting on the broker (log.message.timestamp.type) or topic (message.timestamp.type) which by default is the time on the broker at which the message is created (CreateTime). Message timestamps were added in Apache Kafka 0.10 in KIP-32.

πŸ‘Ύ Demo code

Example - JDBC Sink connector πŸ”—

Given a JDBC sink connector that looks like this:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
    -d '{
          "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"      : "jdbc:mysql://mysql:3306/demo",
          "connection.user"     : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics"              : "transactions",
          "tasks.max"           : "4",
          "auto.create"         : "true"
        }'

you can add the Single Message Transform as shown here:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
    -d '{
          "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"      : "jdbc:mysql://mysql:3306/demo",
          "connection.user"     : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics"              : "transactions",
          "tasks.max"           : "4",
          "auto.create"         : "true",
          "auto.evolve"         : "true",
          "transforms"                         : "insertTS",
          "transforms.insertTS.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field": "messageTS"
        }'

Note auto.evolve=true otherwise the target table won’t hold the new field unless it happens to exist already.

The JDBC connector correctly populates the new field as a timestamp type:

mysql> describe transactions;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| customer_id | text        | YES  |     | NULL    |       |
| cost        | text        | YES  |     | NULL    |       |
| item        | text        | YES  |     | NULL    |       |
| card_type   | text        | YES  |     | NULL    |       |
| messageTS   | datetime(3) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
5 rows in set (0.01 sec)
mysql> select * from transactions limit 5;
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
| customer_id                          | cost  | item                  | card_type | messageTS               |
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
| b7187b3a-8ef4-469f-a99d-29dbc0cc3608 | 40.26 | Ten FIDY              | discover  | 2020-12-08 22:42:26.503 |
| c44a2596-ad4a-4c51-ba81-0a86cc48a2d3 | 96.60 | Trois Pistoles        | maestro   | 2020-12-08 22:42:26.842 |
| b7187b3a-8ef4-469f-a99d-29dbc0cc3608 | 53.44 | Chimay Grande Rserve  | visa      | 2020-12-08 22:42:27.341 |
| c44a2596-ad4a-4c51-ba81-0a86cc48a2d3 | 25.33 | Chimay Grande Rserve  | dankort   | 2020-12-08 22:42:27.841 |
| 573a732e-1d42-4749-a99d-a89391cd2858 | 43.95 | Arrogant Bastard Ale  | switch    | 2020-12-08 22:42:28.342 |
+--------------------------------------+-------+-----------------------+-----------+-------------------------+
5 rows in set (0.00 sec)

See also πŸŽ₯ Kafka Connect in Action : JDBC Sink (πŸ‘Ύ demo code) and πŸŽ₯ ksqlDB & Kafka Connect JDBC Sink in action (πŸ‘Ύ demo code)

Example - S3 Sink connector πŸ”—

InsertField writes the timestamp as a unix epoch value - if you’d rather it in a string then you can use an additional Single Message Transform, TimestampConverter as shown here in an example with the AWS S3 sink connector:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-00/config \
    -d '{
          "connector.class"        : "io.confluent.connect.s3.S3SinkConnector",
          "storage.class"          : "io.confluent.connect.s3.storage.S3Storage",
          "s3.region"              : "us-west-2",
          "s3.bucket.name"         : "rmoff-smt-demo-01",
          "topics"                 : "customers,transactions",
          "tasks.max"              : "4",
          "flush.size"             : "16",
          "format.class"           : "io.confluent.connect.s3.format.json.JsonFormat",
          "schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
          "schema.compatibility"   : "NONE",
          "partitioner.class"      : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
          "transforms"                          : "insertTS,formatTS",
          "transforms.insertTS.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field" : "messageTS",
          "transforms.formatTS.type"            : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.formatTS.format"          : "yyyy-MM-dd HH:mm:ss:SSS",
          "transforms.formatTS.field"           : "messageTS",
          "transforms.formatTS.target.type"     : "string"
        }'

This writes messages as JSON to S3 that look like this:

{
  "customer_id": "d0394941-2d2a-4d34-a374-23e5f5364ea9",
  "cost": "25.21",
  "item": "Founders Breakfast Stout",
  "card_type": "jcb",
  "messageTS": "2020-12-08 16:07:39:904"
}

See also πŸŽ₯ Kafka Connect in Action : S3 Sink (πŸ‘Ύ kafka-to-s3 demo code)

Try it out! πŸ”—

You can find the full code for trying this outβ€”including a Docker Compose so you can spin it up on your local machineβ€” πŸ‘Ύ here


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025