When a message from your source Kafka topic is written to InfluxDB, the InfluxDB values are set thus:

- Timestamp is taken from the Kafka message timestamp (which is either set by your producer, or the time at which it was received by the broker)
- Tag(s) are taken from the tags field in the message. This field must be a map type - see below
- Value fields are taken from the rest of the message, and must be numeric or boolean
- Measurement name can be specified as a field of the message, or hardcoded in the connector config.
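To make the mapping concrete, here’s a hypothetical illustration (this is not output from the connector itself): given the sample payload used below, written to a topic called json_01 whose name is used as the measurement, the resulting InfluxDB point in line protocol would look something like this:

json_01,host=FOO,product=wibble stock=500 1579779810974000000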
JSON example 🔗
Warning: A sharp edge to watch out for is that the InfluxDB connector relies on there being a schema in your data. You cannot just throw some JSON at it and hope that it will just work. Learn more about schemas in Kafka Connect here.
If this is your JSON payload
{
"tags": {
"host": "FOO",
"product": "wibble"
},
"stock": 500.0
}
Then you’ll need to wrap it in an envelope with a schema thus:
{
"schema": {
"type": "struct",
"fields": [
{ "field": "tags" , "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false},
{ "field": "stock", "type": "double", "optional": true }
],
"optional": false, "version": 1
},
"payload": {
"tags": {
"host": "FOO",
"product": "wibble"
},
"stock": 500.0
}
}
Tip: If you don’t have the option of adding this schema to the producer writing the data, there is a hacky way that you can embed your payload in a schema using kafkacat.
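As an illustration only (not necessarily the exact method from the linked post), one way to retrofit the envelope is to consume the raw JSON with kafkacat, wrap each message using jq, and produce it to the topic the connector reads from. The source topic json_raw and the broker address localhost:9092 are assumptions here:

kafkacat -b localhost:9092 -t json_raw -C -u -e | \
  jq -c '{"schema": {"type": "struct",
                     "fields": [
                       {"field": "tags",  "type": "map", "keys": {"type": "string", "optional": false}, "values": {"type": "string", "optional": false}, "optional": false},
                       {"field": "stock", "type": "double", "optional": true}
                     ],
                     "optional": false, "version": 1},
          "payload": .}' | \
  kafkacat -b localhost:9092 -t json_01 -P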
Now create the connector.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink_influx_json_01/config \
-d '{
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topics" : "json_01",
"influxdb.url" : "http://influxdb:8086",
"influxdb.db" : "my_db",
"measurement.name.format" : "${topic}"
}'
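If there’s no data on the json_01 topic yet, one quick way to send the schema-wrapped message from above as a single test record is to pipe it into kafkacat (one message per line). kafkacat itself and the broker address localhost:9092 are assumptions about your environment:

echo '{"schema": {"type": "struct", "fields": [{"field": "tags", "type": "map", "keys": {"type": "string", "optional": false}, "values": {"type": "string", "optional": false}, "optional": false}, {"field": "stock", "type": "double", "optional": true}], "optional": false, "version": 1}, "payload": {"tags": {"host": "FOO", "product": "wibble"}, "stock": 500.0}}' | \
  kafkacat -b localhost:9092 -t json_01 -P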
The most important thing is that the value.converter is set correctly. This tells Kafka Connect how to deserialise the value component of the message (as opposed to the key, which is specified with key.converter).
For a JSON message with an embedded schema, such as the one shown above, make sure you have set both fields:

- "value.converter" : "org.apache.kafka.connect.json.JsonConverter"
- "value.converter.schemas.enable": "true"
Now let’s check that the data’s made it to InfluxDB:
$ influx -execute 'show measurements on "my_db"'
name: measurements
name
----
json_01
$ influx -execute 'show tag keys on "my_db"'
name: json_01
tagKey
------
host
product
$ influx -execute 'SELECT * FROM json_01 GROUP BY host, product;' -database "my_db"
name: json_01
tags: host=FOO, product=wibble
time stock
---- -----
1579779810974000000 500
Avro example 🔗
Warning: ksqlDB/KSQL cannot yet write data in an Avro format that is compatible with this connector.
Assuming you’re writing Avro data from your application with full control over the schema, you should follow the same pattern as above, with the tags field being a map. Here’s an example schema:
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "tags",
"type": {
"type": "map",
"values": "string"
}
},
{
"name": "stock",
"type": "double"
}
]
}
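If you just want a test record in this shape, one option is the kafka-avro-console-producer that ships with Schema Registry. The broker and Schema Registry addresses below are assumptions, as is the sample record:

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic avro_01 \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"tags","type":{"type":"map","values":"string"}},{"name":"stock","type":"double"}]}'

Then enter a record on stdin, for example:

{"tags": {"host": "FOO", "product": "wibble"}, "stock": 500.0}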
With your data written in Avro following the above schema pattern using the serialiser provided with Schema Registry, you can now stream it into InfluxDB:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink_influx_avro_01/config \
-d '{
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topics" : "avro_01",
"influxdb.url" : "http://influxdb:8086",
"influxdb.db" : "my_db",
"measurement.name.format" : "${topic}"
}'
As above, we can check that the data made it into InfluxDB:
$ influx -execute 'SELECT * FROM avro_01 GROUP BY host, product;' -database "my_db"
name: avro_01
tags: host=FOO, product=wibble
time stock
---- -----
1579782223737000000 500
Troubleshooting 🔗
Assuming your connector gets created successfully, you can check its status:
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink_influx_json_01 | RUNNING | RUNNING | io.confluent.influxdb.InfluxDBSinkConnector
If the task is FAILED, you’ll need to dive into the Kafka Connect worker log to find the reason. Even if it’s RUNNING, you’ll need the Kafka Connect worker log for troubleshooting more subtle problems.
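For a FAILED task, a quick first check before trawling the worker log is to pull the stack trace from the Kafka Connect REST API status endpoint. The connector name here is the JSON sink created above, and jq is assumed to be installed:

curl -s "http://localhost:8083/connectors/sink_influx_json_01/status" | \
  jq '.tasks[].trace'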
If you’re using Confluent Platform 5.4 (or Apache Kafka 2.4) you can use the new dynamic logging capabilities in Kafka Connect to bump up the logging just for the InfluxDB connector, without being overwhelmed by tons of other TRACE data:
curl -s -X PUT http://localhost:8083/admin/loggers/io.confluent.influxdb \
-H "Content-Type:application/json" \
-d '{"level": "TRACE"}'
With this set, you then get this kind of useful information:
…
put() - Processing records for 'INFLUX_TEST' database (io.confluent.influxdb.sink.InfluxDBSinkTask:224)
put() - Looking for tags (io.confluent.influxdb.sink.InfluxDBSinkTask:240)
put() - tags field found but doesn't match Schema{MAP} or Schema{MAP}. (io.confluent.influxdb.sink.InfluxDBSinkTask:253)
put() - tags = (io.confluent.influxdb.sink.InfluxDBSinkTask:258)
put() - Processing field 'measurement':STRING:'null' (io.confluent.influxdb.sink.InfluxDBSinkTask:272)
put() - Skipping field 'tags' (io.confluent.influxdb.sink.InfluxDBSinkTask:269)
put() - Processing field 'cpu1':INT32:'null' (io.confluent.influxdb.sink.InfluxDBSinkTask:272)
…
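Once you’ve finished troubleshooting, it’s worth dropping the logger back down, since TRACE output gets noisy quickly. This simply reverses the call made above:

curl -s -X PUT http://localhost:8083/admin/loggers/io.confluent.influxdb \
     -H "Content-Type:application/json" \
     -d '{"level": "INFO"}'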
Resources 🔗
- Read more about Kafka and serialisation/converters
- An example of the InfluxDB connector in action
- Learn more about Kafka Connect