Kafka Connect is an API within Apache Kafka, and its modular nature makes it powerful and flexible. Converters are part of the API but not always fully understood. I’ve written previously about Kafka Connect converters, and this post is a hands-on example to show further what they are - and are not - about.
Note: To understand more about Kafka Connect in general, check out my talk from Kafka Summit London, From Zero to Hero with Kafka Connect.
Here’s the scenario: you’re using a Kafka Connect source connector to read some JSON data from somewhere. Maybe it’s a column in a database, a row in a flat file, a message on an MQ. Regardless, you figure that since it’s JSON you must use the JsonConverter, right? Or perhaps you want to take that JSON and write it to Kafka as Avro? Not so fast, sunshine!
Reading JSON from file into Kafka 🔗
- Check which connector plugins are available:

http localhost:8083/connector-plugins|jq '.[].class'
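Depending on what you’ve got installed the exact list will vary, but you should see (amongst others) the two connector classes used in this article:

"org.apache.kafka.connect.file.FileStreamSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector"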
- Create a connector reading from the flat file:

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-file-00",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/tmp/test.json",
"topic": "source-file-00"
}
}'
N.B. FileStream connectors are suitable for demo purposes only; they are not for production.
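You can check that the connector is running by querying the Connect REST API’s status endpoint, which for the connector created above should report the task as RUNNING:

$ http localhost:8083/connectors/source-file-00/status|jq '.tasks[0].state'
"RUNNING"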
- Put some data in the file. The point here is that it is just strings:

docker-compose exec kafka-connect bash -c 'cat >> /tmp/test.json <<EOF
Foo bar
{"colour":"Aquamarine","animal":"Rhea, gray"}
Well tally ho! With a bing and a bong and a buzz buzz buzz!
{"colour":"Orange","animal":"Brazilian tapir"}
EOF'
- Kafka Connect ingests it and stores the data as Avro, because that’s our default Converter set at the Worker level. You can view the data using the Avro console consumer:

docker-compose exec -T kafka-connect \
kafka-avro-console-consumer \
--bootstrap-server kafka:29092 \
--property schema.registry.url=http://schema-registry:8081 \
--topic source-file-00 --from-beginning
"Foo bar"
"{\"colour\":\"Aquamarine\",\"animal\":\"Rhea, gray\"}"
"Well tally ho! With a bing and a bong and a buzz buzz buzz!"
"{\"colour\":\"Orange\",\"animal\":\"Brazilian tapir\"}"
We can look at the Avro schema that’s been created:

$ http localhost:8081/subjects/source-file-00-value/versions/1|jq '.'
{
  "subject": "source-file-00-value",
  "version": 1,
  "id": 2,
  "schema": "\"string\""
}
So it’s just one flat string. Just because some of those strings happen to be JSON in the source, it doesn’t mean that they’ll get automagically converted to a schema’d message.
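As an aside, that worker-level default lives in the Kafka Connect worker’s properties (or the equivalent environment variables if you’re running the Confluent Docker images). A minimal sketch of the relevant settings, assuming the Avro converter and the Schema Registry used throughout this article:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081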
- Let’s do the "obvious" thing: since we have JSON data in the source, use the JsonConverter. Obvious, right?

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-file-01",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/tmp/test.json",
"topic": "source-file-01",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}'
N.B. FileStream connectors are suitable for demo purposes only; they are not for production.
- By default the JsonConverter will embed schemas, so we see exactly the same as before - the payload read from the file is embedded in a single-field schema:

$ kafkacat -b localhost:9092 -t source-file-01 -C
{"schema":{"type":"string","optional":false},"payload":"{\"colour\":\"Aquamarine\",\"animal\":\"European spoonbill\"}"}
{"schema":{"type":"string","optional":false},"payload":"{\"colour\":\"Aquamarine\",\"animal\":\"Rhea, gray\"}"}
{"schema":{"type":"string","optional":false},"payload":"{\"colour\":\"Orange\",\"animal\":\"Brazilian tapir\"}"}
{"schema":{"type":"string","optional":false},"payload":"Well tally ho! With a bing and a bong and a buzz buzz buzz!"}
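Because each message is now a schema/payload envelope, you can pull the original string back out of the payload field with jq - and where that payload happens to be valid JSON, parse it too. A sketch, picking the second message (which per the listing above has a JSON payload):

$ kafkacat -b localhost:9092 -t source-file-01 -C -o1 -c1 | jq '.payload|fromjson'
{
  "colour": "Aquamarine",
  "animal": "Rhea, gray"
}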
- What if we disable the JSON schemas?

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-file-02",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/tmp/test.json",
"topic": "source-file-02",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
The string is read from the source, and the converter writes it to Kafka - and since it needs to write valid JSON, it escapes the characters as required:

$ kafkacat -b localhost:9092 -t source-file-02 -C
"Foo bar"
"{\"colour\":\"Aquamarine\",\"animal\":\"Rhea, gray\"}"
"Well tally ho! With a bing and a bong and a buzz buzz buzz!"
"{\"colour\":\"Orange\",\"animal\":\"Brazilian tapir\"}"
So how do we get the data in?? 🔗
If you don’t care about schemas :'-( then you could just use the StringConverter, which reads the string from the source and writes a string to the topic.
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-file-03",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/tmp/test.json",
"topic": "source-file-03",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
N.B. FileStream connectors are suitable for demo purposes only; they are not for production.
The data in the topic is then the string read from the source, including the JSON strings, and you can work with it as you want to:
$ kafkacat -b localhost:9092 -t source-file-03 -C
Foo bar
{"colour":"Aquamarine","animal":"Rhea, gray"}
Well tally ho! With a bing and a bong and a buzz buzz buzz!
{"colour":"Orange","animal":"Brazilian tapir"}
The valid JSON can be read by a JSON parser, e.g. the second message in the topic (offset 1, hence -o1):
$ kafkacat -b localhost:9092 -t source-file-03 -C -o1 -c1 | jq '.'
{
"colour": "Aquamarine",
"animal": "Rhea, gray"
}
but the topic also has the raw strings that aren’t JSON, which will trip up a JSON parser that is expecting valid JSON:
$ kafkacat -b localhost:9092 -t source-file-03 -C -o0 -c1 | jq '.'
parse error: Invalid numeric literal at line 1, column 4
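If you’d rather just skip the records that aren’t JSON, you can have jq read each message as a raw string (the StringConverter wrote them unquoted, so -R is needed here) and keep only those that parse. A sketch:

$ kafkacat -b localhost:9092 -t source-file-03 -C -e | jq -R 'fromjson? // empty'
{
  "colour": "Aquamarine",
  "animal": "Rhea, gray"
}
{
  "colour": "Orange",
  "animal": "Brazilian tapir"
}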
kafka-connect-spooldir 🔗
The best option: use kafka-connect-spooldir. You can either have it infer the schema, or you can declare it yourself.
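If the connector isn’t already installed on your worker, it’s available from Confluent Hub; assuming you’re using the confluent-hub CLI, something like this should do it (pin the version as you see fit, and restart the worker afterwards):

confluent-hub install jcustenborder/kafka-connect-spooldir:latest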
Inferred schema 🔗
Put some data in the source file:
docker-compose exec kafka-connect bash -c 'cat >> /tmp/test-spooldir-00.json <<EOF
{"colour":"Aquamarine","animal":"European spoonbill"}
{"colour":"Aquamarine","animal":"Rhea, gray"}
{"colour":"Orange","animal":"Brazilian tapir"}
EOF'
Run the connector:
docker-compose exec kafka-connect bash -c 'mkdir -p /tmp/error && mkdir -p /tmp/finished'
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-spooldir-00",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
"tasks.max": "1",
"input.path": "/tmp",
"input.file.pattern": "test-spooldir-00.json",
"finished.path": "/tmp/finished",
"error.path": "/tmp/error",
"topic": "source-spooldir-00",
"cleanup.policy":"NONE",
"empty.poll.wait.ms":1000,
"schema.generation.enabled":"true",
"schema.generation.key.name":"schema_key",
"schema.generation.value.name":"payload"
}
}'
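Note the cleanup.policy of NONE - as I understand the connector’s options, this leaves the source file in place after processing (MOVE would shift it to finished.path instead). You can confirm the file is still there:

docker-compose exec kafka-connect ls -l /tmp/test-spooldir-00.json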
Check the data - it’s in Avro, and it’s got a schema!
$ kafkacat -b localhost:9092 -t source-spooldir-00 -C
Aquamarine$European spoonbill
AquamarineRhea, gray
OrangeBrazilian tapir
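That mangled-looking output is kafkacat showing the raw Avro bytes. To see the records properly deserialised, use the Avro console consumer as before - you should see something like this (each value wrapped per field, because the inferred schema makes them nullable unions):

docker-compose exec -T kafka-connect \
kafka-avro-console-consumer \
--bootstrap-server kafka:29092 \
--property schema.registry.url=http://schema-registry:8081 \
--topic source-spooldir-00 --from-beginning
{"colour":{"string":"Aquamarine"},"animal":{"string":"European spoonbill"}}
{"colour":{"string":"Aquamarine"},"animal":{"string":"Rhea, gray"}}
{"colour":{"string":"Orange"},"animal":{"string":"Brazilian tapir"}}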
Check the schema that’s been registered:
$ http localhost:8081/subjects/source-spooldir-00-value/versions/1|jq '.schema|fromjson'
{
"type": "record",
"name": "Value",
"namespace": "com.github.jcustenborder.kafka.connect.model",
"fields": [
{
"name": "colour",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "animal",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "com.github.jcustenborder.kafka.connect.model.Value"
}
Declared schema 🔗
Put some data in the source file:
docker-compose exec kafka-connect bash -c 'cat >> /tmp/test-spooldir-01.json <<EOF
{"colour":"Aquamarine","animal":"European spoonbill"}
{"colour":"Aquamarine","animal":"Rhea, gray"}
{"colour":"Orange","animal":"Brazilian tapir"}
EOF'
Run the connector:
docker-compose exec kafka-connect bash -c 'mkdir -p /tmp/error && mkdir -p /tmp/finished'
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-spooldir-01",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
"tasks.max": "1",
"input.path": "/tmp",
"input.file.pattern": "test-spooldir-01.json",
"finished.path": "/tmp/finished",
"error.path": "/tmp/error",
"topic": "source-spooldir-01",
"cleanup.policy":"NONE",
"value.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"colour\":{\"type\":\"STRING\",\"isOptional\":true},\"animal\":{\"type\":\"STRING\",\"isOptional\":true}}}",
"key.schema":"{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{}}"
}
}'
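For readability, here’s that escaped value.schema string pretty-printed - it’s the Connect schema expressed as JSON, with both fields marked optional:

{
  "name": "com.github.jcustenborder.kafka.connect.model.Value",
  "type": "STRUCT",
  "isOptional": false,
  "fieldSchemas": {
    "colour": { "type": "STRING", "isOptional": true },
    "animal": { "type": "STRING", "isOptional": true }
  }
}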
Check the data - it’s in Avro, and it’s got a schema!
$ kafkacat -b localhost:9092 -t source-spooldir-01 -C
Aquamarine$European spoonbill
AquamarineRhea, gray
OrangeBrazilian tapir
Check the schema that’s been registered:
$ http localhost:8081/subjects/source-spooldir-01-value/versions/1|jq '.schema|fromjson'
{
"type": "record",
"name": "Value",
"namespace": "com.github.jcustenborder.kafka.connect.model",
"fields": [
{
"name": "colour",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "animal",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "com.github.jcustenborder.kafka.connect.model.Value"
}