There is an open issue for support of EXPLODE
/UNNEST
functionality in KSQL, and if you need it then do up-vote the issue. Here I detail a hacky, but effective, workaround for exploding arrays into multiple messages—so long as you know the upper-bound on your array.
Populate some test data into a Kafka topic
$ curl "https://api.mockaroo.com/api/440970e0?count=5&key=ff7856d0" | \
kafkacat -P -b localhost -t car_data_01
Examine the data in KSQL:
ksql> PRINT 'car_data_01' FROM BEGINNING;
Format:JSON
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1533200557","car":"Oldsmobile","value":[68.93,53.58]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1548442477","car":"Mercury","value":[60.09,69.07,63.77,63.13]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1544928225","car":"Volkswagen","value":[59.77,6.94,97.7,30.86,16.9]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1545383393","car":"Nissan","value":[13.32]}
{"ROWTIME":1557392065412,"ROWKEY":"null","timestamp":"1552825010","car":"Hyundai","value":[12.92]}
Note the value
element is an array:
"value":[68.93,53.58]
Create a stream over the data:
CREATE STREAM CAR_DATA (timestamp BIGINT, CAR VARCHAR, VALUE ARRAY<DOUBLE>) WITH (KAFKA_TOPIC='car_data_01', VALUE_FORMAT='JSON');
ksql> SELECT TIMESTAMP, CAR, VALUE[0], VALUE[1], VALUE[2] FROM CAR_DATA;
1533200557 | Oldsmobile | 68.93 | 53.58 | null
1548442477 | Mercury | 60.09 | 69.07 | 63.77
1544928225 | Volkswagen | 59.77 | 6.94 | 97.7
1545383393 | Nissan | 13.32 | null | null
1552825010 | Hyundai | 12.92 | null | null
Create the output stream, to start with just containing the zero-index elements of the array:
CREATE STREAM CAR_DATA_EXPLODED AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA WHERE VALUE[0] IS NOT NULL;
ksql> SELECT * FROM CAR_DATA_EXPLODED;
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09
Insert the remaining array indices to the new stream:
CREATE STREAM CAR_DATA_EXPLODED_00 AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 01' AS SOURCE, VALUE[1] AS VALUE FROM CAR_DATA WHERE VALUE[1] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 02' AS SOURCE, VALUE[2] AS VALUE FROM CAR_DATA WHERE VALUE[2] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 03' AS SOURCE, VALUE[3] AS VALUE FROM CAR_DATA WHERE VALUE[3] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 04' AS SOURCE, VALUE[4] AS VALUE FROM CAR_DATA WHERE VALUE[4] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 05' AS SOURCE, VALUE[5] AS VALUE FROM CAR_DATA WHERE VALUE[5] IS NOT NULL;
Examine the exploded data:
ksql> SELECT * FROM CAR_DATA_EXPLODED_00;
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1548442477 | Mercury | Sensor 02 | 63.77
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1544928225 | Volkswagen | Sensor 04 | 16.9
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 01 | 53.58
1557392065409 | null | 1544928225 | Volkswagen | Sensor 02 | 97.7
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 01 | 69.07
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09
1557392065409 | null | 1544928225 | Volkswagen | Sensor 01 | 6.94
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13
ksql> SELECT * FROM CAR_DATA_EXPLODED_00 WHERE SOURCE='Sensor 03';
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13
Remember that this is a workaround - if you actually need this functionality then do be sure to upvote the open issue for support of EXPLODE
/UNNEST
functionality in KSQL.