rmoff's random ramblings
about talks

Notes on getting data into InfluxDB from Kafka with Kafka Connect

Published Jan 23, 2020 by in Kafka Connect, InfluxDB, Kcat (Kafkacat), Serialisation at https://preview.rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect/

You can download the InfluxDB connector for Kafka Connect here. Documentation for it is here.

When a message from your source Kafka topic is written to InfluxDB the InfluxDB values are set thus:

  • Timestamp is taken from the Kafka message timestamp (which is either set by your producer, or the time at which it was received by the broker)

  • Tag(s) are taken from the tags field in the message. This field must be a map type - see below

  • Value fields are taken from the rest of the message, and must be numeric or boolean

  • Measurement name can be specified as a field of the message, or hardcoded in the connector config.

Try it out ðŸ”—

You can find a Docker Compose that will spin up this whole stack for you here.

JSON example ðŸ”—

Warning

A sharp edge to watch out for is that the InfluxDB connector relies on there being a schema in your data. You cannot just throw some JSON at it and hope that it will just work.

Learn more about schemas in Kafka Connect here.

If this is your JSON payload

{
    "tags": {
        "host": "FOO",
        "product": "wibble"
    },
    "stock": 500.0
}

Then you’ll need to wrap it in an envelope with a schema thus:

{
    "schema": {
        "type": "struct",
        "fields": [
            { "field": "tags" , "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false},
            { "field": "stock", "type": "double", "optional": true }
        ],
        "optional": false, "version": 1
    },
    "payload": {
        "tags": {
            "host": "FOO",
            "product": "wibble"
        },
        "stock": 500.0
    }
}
Tip
If you don’t have the option of adding this schema to the producer writing the data, there is a hacky way that you can embed your payload in a schema using kafkacat.

Now create the connector.

curl -i -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/connectors/sink_influx_json_01/config \
        -d '{
            "connector.class"               : "io.confluent.influxdb.InfluxDBSinkConnector",
            "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
            "topics"                        : "json_01",
            "influxdb.url"                  : "http://influxdb:8086",
            "influxdb.db"                   : "my_db",
            "measurement.name.format"       : "${topic}"
        }'

A most important thing is that the value.converter is set correctly. This tells Kafka Connect understands how to deserialise the value component of the message (as opposed to the key, which is specified with key.converter).

For a JSON message such as that shown above with the schema make sure you have set both fields:

  • "value.converter" : "org.apache.kafka.connect.json.JsonConverter"

  • "value.converter.schemas.enable": "true"

Now let’s check that the data’s made it to InfluxDB:

$ influx -execute 'show measurements on "my_db"'
name: measurements
name
----
json_01

$ influx -execute 'show tag keys on "my_db"'
name: json_01
tagKey
------
host
product

$ influx -execute 'SELECT * FROM json_01 GROUP BY host, product;' -database "my_db"
name: json_01
tags: host=FOO, product=wibble
time                stock
----                -----
1579779810974000000 500

Avro example ðŸ”—

Warning
ksqlDB/KSQL cannot yet write data in an Avro format that is compatible with this connector.

Assuming you’re writing Avro data from your application with full control over the schema, you should follow the same pattern as above, with the tags field being a map. Here’s an example schema:

{
    "type": "record",
    "name": "myrecord",
    "fields": [
        {
            "name": "tags",
            "type": {
                "type": "map",
                "values": "string"
            }
        },
        {
            "name": "stock",
            "type": "double"
        }
    ]
}

With your data written in Avro following the above schema pattern using the serialiser provided with Schema Registry, you can now stream it into InfluxDB:

curl -i -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/connectors/sink_influx_avro_01/config \
        -d '{
            "connector.class"                    : "io.confluent.influxdb.InfluxDBSinkConnector",
            "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
            "topics"                             : "avro_01",
            "influxdb.url"                       : "http://influxdb:8086",
            "influxdb.db"                        : "my_db",
            "measurement.name.format"            : "${topic}"
        }'

As above, we can check that the data made it into InfluxDB:

$ influx -execute 'SELECT * FROM avro_01 GROUP BY host, product;' -database "my_db"
name: avro_01
tags: host=FOO, product=wibble
time                stock
----                -----
1579782223737000000 500

Troubleshooting ðŸ”—

Assuming your connector gets created successfully, you can check its status:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort

sink  |  sink_influx_json_01  |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector

If the task is FAILED you’ll need to dive into the Kafka Connect worker log to find the reason. Even if it’s RUNNING you’ll need the Kafka Connect worker log for troubleshooting more subtle problems.

If you’re using Confluent Platform 5.4 (or Apache Kafka 2.4) you can use the new dynamic logging capabilities in Kafka Connect to bump up the logging just for the InfluxDB connector without being overwhelmed by tons of other TRACE data:

curl -s -X PUT http://localhost:8083/admin/loggers/io.confluent.influxdb \
    -H "Content-Type:application/json" \
    -d '{"level": "TRACE"}'

With this set, you then get this kind of useful information:

…
put() - Processing records for 'INFLUX_TEST' database (io.confluent.influxdb.sink.InfluxDBSinkTask:224)
put() - Looking for tags (io.confluent.influxdb.sink.InfluxDBSinkTask:240)
put() - tags field found but doesn't match Schema{MAP} or Schema{MAP}. (io.confluent.influxdb.sink.InfluxDBSinkTask:253)
put() - tags =  (io.confluent.influxdb.sink.InfluxDBSinkTask:258)
put() - Processing field 'measurement':STRING:'null' (io.confluent.influxdb.sink.InfluxDBSinkTask:272)
put() - Skipping field 'tags' (io.confluent.influxdb.sink.InfluxDBSinkTask:269)
put() - Processing field 'cpu1':INT32:'null' (io.confluent.influxdb.sink.InfluxDBSinkTask:272)
…

Resources ðŸ”—

  • Read more about Kafka and serialisation/converters

  • An example of the InfluxDB connector in action

  • Learn more about Kafka Connect


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025