Question from the Confluent Community Slack group:
How can I access the data in object in an array like below using ksqlDB stream
"Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, { "TotalType": "Old Standard", "TotalAmount": 16, " STID":56 } ]
Let’s take a look at this using ksqlDB 0.9 (the latest version as of May 2020). First, spin up a ksqlDB environment using this Docker Compose file.
Send the sample message to a Kafka topic, first wrapping it in curly braces to make it valid JSON:
docker exec -i kafkacat kafkacat \
-b kafka:29092 -P \
-t my_topic <<EOF
{ "Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, { "TotalType": "Old Standard", "TotalAmount": 16, "STID": 56 } ] }
EOF
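The following small Python sketch (standard library only, not part of the original walkthrough) shows why the wrapping matters. It parses the fragment from the question as-is, then parses it again wrapped in braces. `is_valid_json` is a hypothetical helper written only for this illustration.

```python
import json

# The fragment as posted in the question: a bare key/value pair, not a JSON document.
# (The stray space in the question's " STID" key is dropped, as in the message above.)
FRAGMENT = (
    '"Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, '
    '{ "TotalType": "Old Standard", "TotalAmount": 16, "STID": 56 } ]'
)


def is_valid_json(text: str) -> bool:
    """Hypothetical helper: True if text parses as one complete JSON document."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# Wrapping the fragment in curly braces turns it into a JSON object.
WRAPPED = "{ " + FRAGMENT + " }"

print(is_valid_json(FRAGMENT))  # False: everything after "Total" is rejected as extra data
print(is_valid_json(WRAPPED))   # True
```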
Fire up the ksqlDB CLI:
$ ksql http://ksqldb:8088
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=  Event Streaming Database purpose-built =
=        for stream processing apps       =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.9.0, Server v0.9.0 located at http://ksqldb:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Set the offset to earliest so that we’re querying all the data in the topic:
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
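Here is a rough mental model of this setting, not a description of how Kafka or ksqlDB are implemented. `auto.offset.reset` only decides where a consumer with no committed offset starts reading. With `earliest` it starts at the first retained offset. With `latest` it starts at the end of the partition, so a message produced before the query started (like ours) is never seen. The functions `start_offset` and `read_from` are hypothetical names. The sketch models a single partition and only the `earliest` and `latest` policies.

```python
from typing import List


def start_offset(reset_policy: str, log_start: int, log_end: int) -> int:
    """Offset where a consumer with no committed offset begins reading one partition."""
    if reset_policy == "earliest":
        return log_start  # first offset still retained in the partition
    if reset_policy == "latest":
        return log_end    # next offset to be written, so existing messages are skipped
    raise ValueError(f"policy not modelled in this sketch: {reset_policy}")


def read_from(partition: List[str], reset_policy: str) -> List[str]:
    """Return the already-written messages that such a consumer would see."""
    return partition[start_offset(reset_policy, 0, len(partition)):]


# One message was produced with kafkacat before any query was started.
topic = ["sample message"]
print(read_from(topic, "earliest"))  # ['sample message']
print(read_from(topic, "latest"))    # []
```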
Model the input data as a ksqlDB stream:
ksql> CREATE STREAM my_stream (TOTAL ARRAY<STRUCT<TotalType VARCHAR,
TotalAmount VARCHAR,
STID VARCHAR>>)
WITH (KAFKA_TOPIC='my_topic',
VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
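The stream definition tells ksqlDB how to read each JSON value. The Python sketch below approximates the result for the sample message. It is not ksqlDB's deserializer. It is based only on what the queries further down return:

- the declared struct field names are upper-cased and matched to the JSON keys regardless of case;
- a missing field comes back as null (`None` here);
- because the fields are declared `VARCHAR`, the numbers end up as strings.

`STRUCT_FIELDS` and `to_struct` are hypothetical names.

```python
import json

MESSAGE = (
    '{ "Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, '
    '{ "TotalType": "Old Standard", "TotalAmount": 16, "STID": 56 } ] }'
)

# Struct fields as declared in CREATE STREAM; unquoted identifiers are upper-cased.
STRUCT_FIELDS = ["TOTALTYPE", "TOTALAMOUNT", "STID"]


def to_struct(obj: dict) -> dict:
    """Approximate one array element as a STRUCT of VARCHAR fields."""
    by_name = {key.upper(): value for key, value in obj.items()}  # case-insensitive match
    return {
        # Missing (or JSON null) fields become None; anything else becomes a string.
        field: None if by_name.get(field) is None else str(by_name[field])
        for field in STRUCT_FIELDS
    }


total = [to_struct(element) for element in json.loads(MESSAGE)["Total"]]
print(total)
```

For the sample message, `total` holds `{'TOTALTYPE': 'Standard', 'TOTALAMOUNT': '15.99', 'STID': None}` and `{'TOTALTYPE': 'Old Standard', 'TOTALAMOUNT': '16', 'STID': '56'}`.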
Play with the data:
Select a specific array entry. Note that ksqlDB arrays are 1-based:
ksql> SELECT TOTAL[1] FROM my_stream EMIT CHANGES LIMIT 1;
+-------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0 |
+-------------------------------------------------------------------------------------------------------------------------------------------+
|{TOTALTYPE=Standard, TOTALAMOUNT=15.99, STID=null} |
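Coming from 0-based languages, `TOTAL[1]` is easy to misread. The Python sketch below translates a ksqlDB-style 1-based index into Python's 0-based one. `element_at` is a hypothetical helper. Returning `None` for an out-of-range index is an assumption made for this sketch, and only positive indexes are modelled.

```python
from typing import Any, List, Optional

# TOTAL for the sample message, written out by hand as Python dicts
# (upper-cased field names, values as strings, missing STID as None).
TOTAL = [
    {"TOTALTYPE": "Standard", "TOTALAMOUNT": "15.99", "STID": None},
    {"TOTALTYPE": "Old Standard", "TOTALAMOUNT": "16", "STID": "56"},
]


def element_at(array: List[Any], index: int) -> Optional[Any]:
    """Return the element at a 1-based index, as TOTAL[1] does in ksqlDB."""
    # Sketch assumption: out-of-range (or non-positive) indexes give None.
    if index < 1 or index > len(array):
        return None
    return array[index - 1]  # shift to Python's 0-based indexing


print(element_at(TOTAL, 1))  # the first element, TOTAL[0] in Python terms
print(element_at(TOTAL, 2))
```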
Select fields from the struct inside an array element:
ksql> SELECT TOTAL[1]->TotalType, TOTAL[1]->totalamount FROM my_stream EMIT CHANGES;
+--------------------------------------------------------------------+--------------------------------------------------------------------+
|TOTALTYPE |TOTALAMOUNT |
+--------------------------------------------------------------------+--------------------------------------------------------------------+
|Standard |15.99 |
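The query spells the fields `TotalType` and `totalamount`, and both resolve. That is because unquoted identifiers are upper-cased, which is why the column headers come back as `TOTALTYPE` and `TOTALAMOUNT`. Below is a plain-Python sketch of that lookup. `deref` is a hypothetical helper that only models unquoted names. An unknown field raises `KeyError` in this sketch.

```python
from typing import Any, Dict

# First element of TOTAL for the sample message, with upper-cased field names.
FIRST = {"TOTALTYPE": "Standard", "TOTALAMOUNT": "15.99", "STID": None}


def deref(struct: Dict[str, Any], field: str) -> Any:
    """Mimic struct->field for an unquoted field name."""
    return struct[field.upper()]  # unquoted identifiers are upper-cased before lookup


print(deref(FIRST, "TotalType"), deref(FIRST, "totalamount"))  # Standard 15.99
```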
Explode the array into one row per element:
ksql> SELECT EXPLODE(TOTAL) FROM my_stream EMIT CHANGES;
+-------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0 |
+-------------------------------------------------------------------------------------------------------------------------------------------+
|{TOTALTYPE=Standard, TOTALAMOUNT=15.99, STID=null} |
|{TOTALTYPE=Old Standard, TOTALAMOUNT=16, STID=56} |
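`EXPLODE` is a table function. For each input row it emits one output row per array element, which is why one Kafka message produced two rows above. Here is a minimal Python generator sketch of that behaviour. The `explode` function and the row layout are illustrative only. Treating a null array as producing no rows is an assumption of this sketch.

```python
from typing import Any, Dict, Iterable, Iterator, List

Row = Dict[str, Any]

# The single input row of my_stream for the sample message.
ROWS: List[Row] = [
    {"TOTAL": [
        {"TOTALTYPE": "Standard", "TOTALAMOUNT": "15.99", "STID": None},
        {"TOTALTYPE": "Old Standard", "TOTALAMOUNT": "16", "STID": "56"},
    ]},
]


def explode(rows: Iterable[Row], column: str) -> Iterator[Any]:
    """Yield one output value per element of the array in `column`, for every input row."""
    for row in rows:
        for element in row[column] or []:  # sketch assumption: a null array yields nothing
            yield element


exploded = list(explode(ROWS, "TOTAL"))
print(len(exploded))  # 2 output rows from 1 input row
```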
Explode the array, un-nest the struct, and give the resulting fields explicit names:
ksql> SELECT EXPLODE(TOTAL)->TOTALTYPE AS TOTAL_TYPE,
EXPLODE(TOTAL)->TOTALAMOUNT AS TOTALAMOUNT,
EXPLODE(TOTAL)->STID AS STID
FROM my_stream EMIT CHANGES;
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|TOTAL_TYPE |TOTALAMOUNT |STID |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|Standard |15.99 |null |
|Old Standard |16 |56 |
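The three `EXPLODE(TOTAL)` calls in that query walk the same array in step, so the Nth output row takes all its fields from the Nth element. The sketch below reproduces that pairing in Python. It uses `zip` over one column of values per `EXPLODE` call as a stand-in for ksqlDB's handling of multiple table functions. `explode_fields` is a hypothetical helper. Because every call explodes the same array, the columns always have equal length here.

```python
from typing import Any, Dict, List

TOTAL = [
    {"TOTALTYPE": "Standard", "TOTALAMOUNT": "15.99", "STID": None},
    {"TOTALTYPE": "Old Standard", "TOTALAMOUNT": "16", "STID": "56"},
]


def explode_fields(array: List[Dict[str, Any]], aliases: Dict[str, str]) -> List[Dict[str, Any]]:
    """One output row per element; `aliases` maps output column name -> struct field."""
    # Model each EXPLODE(TOTAL)->FIELD call as its own column of values...
    columns = [[element[field] for element in array] for field in aliases.values()]
    # ...then zip the columns back together row by row under their AS names.
    return [dict(zip(aliases.keys(), values)) for values in zip(*columns)]


rows = explode_fields(TOTAL, {"TOTAL_TYPE": "TOTALTYPE", "TOTALAMOUNT": "TOTALAMOUNT", "STID": "STID"})
for row in rows:
    print(row)
```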
Persist this to a new stream (backed by a Kafka topic):
ksql> CREATE STREAM new_stream AS
SELECT EXPLODE(TOTAL)->TOTALTYPE AS TOTAL_TYPE,
EXPLODE(TOTAL)->TOTALAMOUNT AS TOTALAMOUNT,
EXPLODE(TOTAL)->STID AS STID
FROM my_stream EMIT CHANGES;
Message
-----------------------------------------
Created query with ID CSAS_NEW_STREAM_0
-----------------------------------------
ksql>
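The persistent query writes each exploded row to the new topic as its own message. The following Python sketch covers only the value side. It shows the two rows serialized as compact JSON with the standard `json` module. It does not model the ksqlDB serializer itself or the (null) message keys.

```python
import json

ROWS = [
    {"TOTAL_TYPE": "Standard", "TOTALAMOUNT": "15.99", "STID": None},
    {"TOTAL_TYPE": "Old Standard", "TOTALAMOUNT": "16", "STID": "56"},
]

# One JSON value per output row; separators=(",", ":") drops the spaces after
# commas and colons to give compact output. None is written as JSON null.
values = [json.dumps(row, separators=(",", ":")) for row in ROWS]
for value in values:
    print(value)
```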
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
-----------------------------------------------
NEW_STREAM | 1 | 1
my_topic | 1 | 1
-----------------------------------------------
ksql> PRINT NEW_STREAM FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2020/05/26 08:58:35.691 Z, key: <null>, value: {"TOTAL_TYPE":"Standard","TOTALAMOUNT":"15.99","STID":null}
rowtime: 2020/05/26 08:58:35.691 Z, key: <null>, value: {"TOTAL_TYPE":"Old Standard","TOTALAMOUNT":"16","STID":"56"}
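`TOTALAMOUNT` and `STID` were declared as `VARCHAR`, so they land in the new topic as JSON strings. A downstream consumer that needs numbers has to convert them itself. Here is a minimal Python sketch that does this for the two values printed above. The `parse_total` helper, its output key names, and the choice of `float` and `int` are assumptions for illustration only.

```python
import json
from typing import Any, Dict, Optional

# Message values exactly as shown by PRINT NEW_STREAM above.
VALUES = [
    '{"TOTAL_TYPE":"Standard","TOTALAMOUNT":"15.99","STID":null}',
    '{"TOTAL_TYPE":"Old Standard","TOTALAMOUNT":"16","STID":"56"}',
]


def parse_total(value: str) -> Dict[str, Any]:
    """Decode one message value and convert the VARCHAR fields back to numbers."""
    row = json.loads(value)
    stid: Optional[str] = row["STID"]
    return {
        "total_type": row["TOTAL_TYPE"],
        "total_amount": float(row["TOTALAMOUNT"]),
        "stid": None if stid is None else int(stid),  # STID is null for the first element
    }


for parsed in map(parse_total, VALUES):
    print(parsed)
```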