There was a good question on StackOverflow recently in which someone was struggling to find the appropriate ksqlDB DDL to model a source topic in which there was a variable number of fields in a STRUCT
.
This was their example:
{
"device" : "REDACTED",
"context" : {
"2" : "25",
"3" : "0",
"4" : "60",
"1" : "REDACTED"
}
}
{
"device" : "REDACTED",
"context" : {
"2" : "2020-10-07T07:02:48.0000000Z",
"1" : "REDACTED"
}
}
So context
looks like a STRUCT - but when they tried that they found that ksqlDB wouldn’t return the data unless all four fields were included in the payload, and since it was variable, this wouldn’t work.
The Answer - MAP
🔗
You need to use a MAP
rather than a STRUCT
.
Here’s a working example using ksqlDB 0.12.
-
Load the sample data into a topic
kafkacat -b localhost:9092 -P -t events <<EOF { "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } } { "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } } EOF
-
In ksqlDB, declare the stream:
CREATE STREAM "events" ( "device" VARCHAR, "event" VARCHAR, "firstOccurenceTime" VARCHAR, "lastOccurenceTime" VARCHAR, "occurenceCount" INTEGER, "receiveTime" VARCHAR, "persistTime" VARCHAR, "state" VARCHAR, "context" MAP < VARCHAR, VARCHAR > ) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
-
Query the stream to check things work:
ksql> SET 'auto.offset.reset' = 'earliest'; Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change. ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES; +----------+--------+--------------------------+--------+------------------------------------+ |device |event |receiveTime |state |context | +----------+--------+--------------------------+--------+------------------------------------+ |REDACTED |403151 |2020-09-30T11:03:50.000Z |open |{1=REDACTED, 2=25, 3=0, 4=60} | |REDACTED |402004 |2020-10-07T07:02:48Z |open |{1=REDACTED, 2=2020-10-07T07:02:48.0| | | | | |000000Z} |
-
Use the
['']
syntax to access specific keys within the map:ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES; +-----------+--------+------------------------------------+-----------+-----------+ |device |event |context |CONTEXT_1 |CONTEXT_3 | +-----------+--------+------------------------------------+-----------+-----------+ |REDACTED |403151 |{1=REDACTED, 2=25, 3=0, 4=60} |REDACTED |0 | |REDACTED |402004 |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED |null | | | |000000Z} | | |