Here’s a fun one that Kafka Connect can sometimes throw out:
java.lang.ClassCastException:
java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
HashMap? Struct? HUH?
What went wrong? 🔗
The connector you’re using relies on the data having a declared schema, and you didn’t pass it one.
Schemas? In my data? 🔗
There are two ways that this can happen:
- Confluent Schema Registry (the best option IMO) with Avro, Protobuf, or JSON Schema
- JSON with an embedded schema per message
This is not just JSON that you glance at and can say "yeah I reckon I know the schema", or JSON with your own funky schema definition embedded in your own funky bespoke way. This is JSON that looks like this, with schema and payload root elements:
{
  "schema": {
    "type": "struct", "optional": false, "version": 1, "fields": [
      { "field": "ID", "type": "string", "optional": true },
      { "field": "Artist", "type": "string", "optional": true },
      { "field": "Song", "type": "string", "optional": true }
    ] },
  "payload": {
    "ID": 1,
    "Artist": "Rick Astley",
    "Song": "Never Gonna Give You Up"
  }
}
Read more about it here.
So what went wrong? 🔗
You’ve got JSON data in your Kafka topic, so you told Kafka Connect to use the JSON converter. Makes sense:
"value.converter" : "org.apache.kafka.connect.json.JsonConverter"
BUT then you set this:
"value.converter.schemas.enable":"false",
Now the JSON converter will read the data, but the connector (e.g. the Influx DB Sink) relies on there being a declared schema - which there isn’t (and we told the JSON converter not to parse for one, by setting "value.converter.schemas.enable":"false").
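To make that concrete, here’s a sketch of what the offending sink connector configuration might look like - the connector class is a placeholder, the topic is the example testdata-json topic used later in this post, and it’s the two converter settings that matter:
{
  "connector.class": "<your sink connector class>",
  "topics": "testdata-json",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}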
How do I fix it? 🔗
If you’ve got JSON data with a schema embedded like this:
{
"schema": {
"type": "struct", "optional": false, "version": 1, "fields": [
{ "field": "ID", "type": "string", "optional": true },
{ "field": "Artist", "type": "string", "optional": true },
{ "field": "Song", "type": "string", "optional": true }
] },
"payload": {
"ID": 1,
"Artist": "Rick Astley",
"Song": "Never Gonna Give You Up"
}
}
then you can just fix the configuration of your converter, by setting:
"value.converter.schemas.enable":"true"
If you get the error JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields, then check this link.
But if you don’t actually have a schema present, then you’re going to need to declare one and make it available as part of your Kafka message - either embedded in the JSON, or using Avro (a better solution).
Sounds like a good idea! How do I add a schema to my Kafka message? 🔗
Let’s assume that you can’t just fix this at source and have your producer write data with the schema already declared (even though that would be the best overall solution). Your source data looks like this:
{
"ID": "1",
"Artist": "Rick Astley",
"Song": "Never Gonna Give You Up"
}
You’ve got a couple of approaches to fixing this.
Option 1: Stream processing - the right way 🔗
Use ksqlDB, Kafka Streams, or another stream processing tool to read your source messages from a topic, apply the schema, and write the message to a new topic. That new topic is then the one that you consume from with Kafka Connect (and anywhere else that will benefit from a declared schema).
Here’s an example of doing it using ksqlDB:
- Declare the schema on the existing topic
  CREATE STREAM TESTDATA_JSON (ID VARCHAR, ARTIST VARCHAR, SONG VARCHAR)
    WITH (KAFKA_TOPIC='testdata-json', VALUE_FORMAT='JSON');
- Reserialise the data to Avro
  CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='testdata-avro') AS
    SELECT * FROM TESTDATA_JSON;
If you’re going to use Avro, make sure you change your converter config in the connector:
"value.converter": "io.confluent.connect.avro.AvroConverter"
"value.converter.schema.registry.url": "http://schema-registry:8081"
Read more about this technique here
Option 2: Stream processing - the hacky way 🔗
Sometimes needs must. We know we should do it the right way, but we need a dirty little fix. Here’s that.
Let’s assume that we don’t care about:
- Message timestamps
- Message headers
- Message keys
- Message partitions
not to mention
- Maintainability
- Supportability
The hack here is to interpolate the JSON-with-schema template with the payload value from the source, using kafkacat. kafkacat reads from the topic, pipes it into jq (which adds the schema definition), and then pipes it to another instance of kafkacat, which writes it to a new topic.
kafkacat -b localhost:9092 -q -u -X auto.offset.reset=earliest -G rmoff_cg_01 testdata-json | \
jq --compact-output --unbuffered \
'. |
{ schema: { type: "struct", optional: false, version: 1, fields: [
{ field: "ID", type: "string", optional: true},
{ field: "Artist", type: "string", optional: true},
{ field: "Song", type: "string", optional: true}]},
payload: {
ID: .ID,
Artist: .Artist,
Song: .Song
}
}' | \
kafkacat -b localhost:9092 -t testdata-json-with-schema -P -T -u | jq --unbuffered '.'
It runs using a consumer group so it can be stopped and started, and even scaled out if you have more than one partition. It also dumps the transformed message to screen - remove the final jq if you don’t want that.
The transformed message looks like this:
{
"schema": {
"type": "struct",
"optional": false,
"version": 1,
"fields": [
{
"field": "ID",
"type": "string",
"optional": true
},
{
"field": "Artist",
"type": "string",
"optional": true
},
{
"field": "Song",
"type": "string",
"optional": true
}
]
},
"payload": {
"ID": 1,
"Artist": "Rick Astley",
"Song": "Never Gonna Give You Up"
}
}
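From here you’d point your sink connector at the new topic and switch schemas back on in the JSON converter - roughly like this (connector class again a placeholder):
{
  "connector.class": "<your sink connector class>",
  "topics": "testdata-json-with-schema",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true"
}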