I’ve been playing around with the new SerDes (serialisers/deserialisers) that shipped with Confluent Platform 5.5 - Protobuf and JSON Schema, added alongside the existing support for Avro. The serialisers (and associated Kafka Connect converters) take a payload and serialise it into bytes for sending to Kafka, and I was interested in what those bytes actually look like. For that I used my favourite Kafka Swiss Army knife: kafkacat.
Here’s a message serialised to JSON Schema:
$ kafkacat -b kafka:29092 -t pageviews-jsonschema -C -c1
{"viewtime":1,"userid":"User_9","pageid":"Page_57"}
Looks just like a message from another topic serialised as regular JSON, right?
$ kafkacat -b kafka:29092 -t pageviews-json -C -c1
{"viewtime":1,"userid":"User_3","pageid":"Page_77"}
Except it’s not! We can confirm this by looking at the raw bytes of the message itself, piping the output from kafkacat into hexdump.
Check out these magical, pesky bytes on the front of the JSON Schema-encoded message, and note that they’re not there on the plain JSON message:
$ kafkacat -b kafka:29092 -t pageviews-jsonschema -C -c1 | hexdump -C
00000000 00 00 00 00 02 7b 22 76 69 65 77 74 69 6d 65 22 |.....{"viewtime"|
00000010 3a 31 2c 22 75 73 65 72 69 64 22 3a 22 55 73 65 |:1,"userid":"Use|
00000020 72 5f 39 22 2c 22 70 61 67 65 69 64 22 3a 22 50 |r_9","pageid":"P|
00000030 61 67 65 5f 35 37 22 7d 0a |age_57"}.|
00000039
$ kafkacat -b kafka:29092 -t pageviews-json -C -c1 | hexdump -C
00000000 7b 22 76 69 65 77 74 69 6d 65 22 3a 31 2c 22 75 |{"viewtime":1,"u|
00000010 73 65 72 69 64 22 3a 22 55 73 65 72 5f 33 22 2c |serid":"User_3",|
00000020 22 70 61 67 65 69 64 22 3a 22 50 61 67 65 5f 37 |"pageid":"Page_7|
00000030 37 22 7d 0a |7"}.|
00000034
The five extra bytes (00 00 00 00 02) are defined in the wire format used by the Schema Registry serdes:
- Byte 0: Magic Byte - Confluent serialization format version number; currently always 0.
- Bytes 1-4: 4-byte schema ID as returned by Schema Registry.
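As a quick sanity check you can pull those bytes apart yourself and resolve the schema ID against the Schema Registry. This is just a sketch - it assumes xxd is available and that the Schema Registry is reachable at http://localhost:8081, so adjust the broker and registry addresses for your own environment:

# Grab the first message, take bytes 1-4 (the schema ID), and look the schema up
SCHEMA_ID_HEX=$(kafkacat -b kafka:29092 -t pageviews-jsonschema -C -c1 | head -c 5 | tail -c 4 | xxd -p)
echo $((16#$SCHEMA_ID_HEX))      # 2, matching the 00 00 00 02 in the hexdump above
curl -s "http://localhost:8081/schemas/ids/$((16#$SCHEMA_ID_HEX))"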
JSON != JSON Schema
JSON vs JSON Schema in Kafka Connect
They may sound similar, but the above analysis shows that you can’t just interchange org.apache.kafka.connect.json.JsonConverter and io.confluent.connect.json.JsonSchemaConverter - they write, and expect to read, data with different wire formats. If you try to read data that’s been serialised with one using the other, it’s gonna break.
Here’s an example of writing data in the two formats in Kafka Connect:
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-jsonschema-01/config \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"quickstart": "ratings",
"iterations":1,
"kafka.topic": "test-jsonschema",
"tasks.max": 1
}'
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-json-01/config \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"quickstart": "ratings",
"iterations":1,
"kafka.topic": "test-json",
"tasks.max": 1
}'
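Before carrying on, it’s worth checking that both connectors are actually running. A quick way to do that (jq assumed) is the Connect REST API’s status endpoint:

curl -s http://localhost:8083/connectors/source-datagen-jsonschema-01/status | jq '.connector.state, .tasks[].state'
curl -s http://localhost:8083/connectors/source-datagen-json-01/status | jq '.connector.state, .tasks[].state'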
From this we have two topics: test-json and test-jsonschema. Let’s read the contents of these using a Kafka Connect sink with the correct converters:
curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-file-jsonschema-as-jsonschema/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1,
"file": "/jsonschema-as-jsonschema.txt",
"topics": "test-jsonschema"
}'
curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-file-json-as-json/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"tasks.max": 1,
"file": "/json-as-json.txt",
"topics": "test-json"
}'
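If you want to eyeball what the sinks wrote, the files live on the Connect worker’s filesystem. Assuming the worker runs in a Docker container called connect (adjust for your own environment), something like this will show the first record in each file:

docker exec connect head -n 1 /jsonschema-as-jsonschema.txt
docker exec connect head -n 1 /json-as-json.txt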
As expected, this works. But what if we mix it up and try to read the plain JSON data using the JSON Schema deserialiser (via the io.confluent.connect.json.JsonSchemaConverter converter)?
curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-file-json-as-jsonschema/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1,
"file": "/json-as-jsonschema.txt",
"topics": "test-json"
}'
⚠️ It fails!
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:111)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
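I pulled that stack trace from the Kafka Connect worker log, but you can also get it straight from the connector’s status endpoint - a quick sketch, jq assumed:

curl -s http://localhost:8083/connectors/sink-file-json-as-jsonschema/status | jq -r '.tasks[].trace'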
What’s this mean? Well, Unknown magic byte! is the deserialiser’s quirky way of saying that the bytes that the JSON Schema serialiser puts on the front of each message (which we saw above) aren’t there. Why aren’t they there? Because it’s just straight-up JSON that we’re trying to read - and so we should be using the JSON deserialiser (provided for Kafka Connect by the org.apache.kafka.connect.json.JsonConverter converter).
- Actual (JSON)
  00000000 7b 22 76 69 65 77 74 69 6d 65 22 3a 31 2c 22 75 |{"viewtime":1,"u|
- Expected (JSON Schema)
  00000000 00 00 00 00 02 7b 22 76 69 65 77 74 69 6d 65 22 |.....{"viewtime"|
The final permutation here is trying to read JSON Schema messages using the JSON deserialiser:
curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-file-jsonschema-as-json/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"tasks.max": 1,
"file": "/jsonschema-as-json.txt",
"topics": "test-jsonschema"
}'
As we might expect, this also fails:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:355)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x27a2272 (above 0x0010ffff) at char #1, byte #7)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x27a2272 (above 0x0010ffff) at char #1, byte #7)
Here the JSON deserialiser is expecting plain JSON, but instead it hits the bytes that the JSON Schema serialiser writes to the front of each message - which aren’t valid JSON, hence Invalid UTF-32 character 0x27a2272 (above 0x0010ffff) at char #1, byte #7). If you’ve serialised your data using the Confluent Schema Registry JSON Schema serialiser, you’ve gotta deserialise it with that too.
- Actual (JSON Schema)
  00000000 00 00 00 00 02 7b 22 76 69 65 77 74 69 6d 65 22 |.....{"viewtime"|
- Expected (JSON)
  00000000 7b 22 76 69 65 77 74 69 6d 65 22 3a 31 2c 22 75 |{"viewtime":1,"u|
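The fix in both failing cases is the same: make the converter match how the data on the topic was actually serialised. As a sketch, here’s the last sink amended in place to use the JSON Schema converter, with an explicit restart of the task if it’s still marked FAILED after the config change:

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-file-jsonschema-as-json/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1,
"file": "/jsonschema-as-json.txt",
"topics": "test-jsonschema"
}'
# if the task still shows as FAILED after the config change, kick it explicitly
curl -s -X POST http://localhost:8083/connectors/sink-file-jsonschema-as-json/tasks/0/restart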
JSON vs JSON Schema in ksqlDB
JSON and JSON Schema can cause similar confusion in ksqlDB. Let’s see why, starting off with writing a message to a new topic using the JSON Schema serialiser:
$ echo '{"id": "2", "host": "test-machine", "body": "hello this is a test"}' | \
kafka-json-schema-console-producer --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --topic my_topic_jsonsr \
--property value.schema='{ "type": "object", "properties": { "id": { "type": "string" }, "host": { "type": "string" }, "body": { "type": "string" } } }'
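Confluent Platform also ships a matching kafka-json-schema-console-consumer, which strips the wire-format bytes and hands you back the JSON - handy as a quick check that the message really is readable with the Schema Registry-aware deserialiser (the flags here are the standard console-consumer ones):

kafka-json-schema-console-consumer --bootstrap-server localhost:9092 \
  --topic my_topic_jsonsr --from-beginning --max-messages 1 \
  --property schema.registry.url=http://localhost:8081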
If we try to use this topic in ksqlDB we need to specify the JSON_SR serde:
ksql> CREATE STREAM MY_STREAM WITH (KAFKA_TOPIC='my_topic_jsonsr', VALUE_FORMAT='JSON_SR');
Message
----------------
Stream created
----------------
ksql> DESCRIBE MY_STREAM;
Name : MY_STREAM
Field | Type
-------------------------
HOST | VARCHAR(STRING)
ID | VARCHAR(STRING)
BODY | VARCHAR(STRING)
-------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM MY_STREAM EMIT CHANGES LIMIT 1;
+-------------+----+----------------------+
|HOST |ID |BODY |
+-------------+----+----------------------+
|test-machine |2 |hello this is a test |
Limit Reached
Query terminated
If I try to just use the plain JSON format (VALUE_FORMAT='JSON') then this happens:
ksql> CREATE STREAM MY_STREAM_02 WITH (KAFKA_TOPIC='my_topic_jsonsr', VALUE_FORMAT='JSON');
No columns supplied.
Oh. Of course - JSON doesn’t have an explicit schema, so I need to declare it. I’m already wishing I was using JSON Schema (or Avro, or Protobuf):
ksql> CREATE STREAM MY_STREAM_02 (HOST VARCHAR, ID VARCHAR, BODY VARCHAR)
WITH (KAFKA_TOPIC='my_topic_jsonsr', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> DESCRIBE MY_STREAM_02;
Name : MY_STREAM_02
Field | Type
-------------------------
HOST | VARCHAR(STRING)
ID | VARCHAR(STRING)
BODY | VARCHAR(STRING)
-------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Now when I try to query it, I get…
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
ksql> SELECT * FROM MY_STREAM_02 EMIT CHANGES LIMIT 1;
+--------+--------+---------+
|HOST |ID |BODY |
+--------+--------+---------+
Press CTRL-C to interrupt
…I get nothing. But we know that there’s data in it - any consumer can show that, including PRINT:
ksql> PRINT my_topic_jsonsr FROM BEGINNING LIMIT 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON_SR or KAFKA_STRING
rowtime: 2021/03/09 14:08:14.436 Z, key: <null>, value: {"id":"2","host":"test-machine","body":"hello this is a test"}, partition: 0
Topic printing ceased
ksql>
Now, if you’re eagle-eyed you’ll notice this:
Value format: JSON_SR or KAFKA_STRING
which tells us that ksqlDB reckons the data could well be JSON Schema (JSON_SR). But let’s pretend we missed that detail (as I did when I came up against this issue today), and take the next logical troubleshooting step, which is to consult the ksqlDB server log (you can also get this from the ksqlDB Processing log if it’s enabled). When you run the SELECT above, you’ll see a corresponding error in the ksqlDB server log:
WARN stream-thread [_confluent-ksql-confluent_rmoff_01transient_6449533791924466400_1615299701177-972676ef-317f-4d3b-a30b-66d8ff86f577-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[my_topic_jsonsr] partition=[0] offset=[0] (org.apache.kafka.streams.processor.internals.RecordDeserializer:88)
org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: my_topic_jsonsr. Invalid UTF-32 character 0x567a2269 (above 0x0010ffff) at char #1, byte #7)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x567a2269 (above 0x0010ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:248)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2359)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:671)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:115)
at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:224)
at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:203)
at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:60)
at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:47)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:891)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1038)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:842)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:657)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:559)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:539)
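If you run ksqlDB in Docker, one way to fish that error out of the server log is with docker logs - the container name ksqldb-server here is an assumption, so substitute whatever yours is called:

docker logs ksqldb-server 2>&1 | grep -A 2 'Skipping record due to deserialization error'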
The error is a really good one:
- What happened? Skipping record due to deserialization error
- Which record? topic=[my_topic_jsonsr] partition=[0] offset=[0]
- What was the problem? Invalid UTF-32 character 0x567a2269 (above 0x0010ffff) at char #1, byte #7)
Using this we can validate the issue by taking the exact details of the record and pulling it out with kafkacat, mapping each detail onto the corresponding argument:
- topic=[my_topic_jsonsr] : -t
- partition=[0] : -p
- offset=[0] : -o
$ kafkacat -b localhost:9092 -C -t my_topic_jsonsr -p 0 -o 0
V{"id":"2","host":"test-machine","body":"hello this is a test"}
That V looks a bit out of place there. Let’s check the bytes of the payload (the -u flag makes kafkacat’s output unbuffered, so it pipes cleanly into hexdump):
$ kafkacat -b localhost:9092 -C -t my_topic_jsonsr -p 0 -o 0 -u | hexdump -C
00000000 00 00 00 00 56 7b 22 69 64 22 3a 22 32 22 2c 22 |....V{"id":"2","|
00000010 68 6f 73 74 22 3a 22 74 65 73 74 2d 6d 61 63 68 |host":"test-mach|
00000020 69 6e 65 22 2c 22 62 6f 64 79 22 3a 22 68 65 6c |ine","body":"hel|
00000030 6c 6f 20 74 68 69 73 20 69 73 20 61 20 74 65 73 |lo this is a tes|
% Reached end of topic my_topic_jsonsr [0] at offset 1
Notice the leading bytes (00 00 00 00 56): the magic byte followed by the schema ID, just as we saw above - and 0x56 is ASCII V, which is why that stray character showed up when we printed the payload as text.
The solution? Redefine the stream in ksqlDB using the serde that matches the serialisation: VALUE_FORMAT='JSON_SR'.
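As a sketch of that fix, here it is done through ksqlDB’s REST API (assumed here to be listening on localhost:8088 - you could just as easily run the same two statements in the ksql CLI):

curl -s -X POST http://localhost:8088/ksql \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d @- <<'EOF'
{
  "ksql": "DROP STREAM MY_STREAM_02; CREATE STREAM MY_STREAM_02 WITH (KAFKA_TOPIC='my_topic_jsonsr', VALUE_FORMAT='JSON_SR');",
  "streamsProperties": {}
}
EOF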