rmoff's random ramblings

Working with JSON nested arrays in ksqlDB - example

Published May 26, 2020 by Robin Moffatt in ksqlDB at https://preview.rmoff.net/2020/05/26/working-with-json-nested-arrays-in-ksqldb-example/

Question from the Confluent Community Slack group:

How can I access the data in an object in an array like the one below using a ksqlDB stream?

"Total": [
        {
          "TotalType": "Standard",
          "TotalAmount": 15.99
        },
        {
          "TotalType": "Old Standard",
          "TotalAmount": 16,
          "STID": 56
        }
]

Let’s take a look at this using ksqlDB 0.9 (the latest version as of May 2020). First, spin up a ksqlDB environment using this Docker Compose file.

Send the sample message to a Kafka topic, first wrapping it in curly braces to make it valid JSON:

docker exec -i kafkacat kafkacat \
        -b kafka:29092 -P \
        -t my_topic <<EOF
{ "Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, { "TotalType": "Old Standard", "TotalAmount": 16, "STID": 56 } ] }
EOF
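
The wrapping step can be checked before producing anything to Kafka. The following is a minimal Python sketch, not part of the original walkthrough: `wrap_fragment` and `is_valid_json_object` are hypothetical helper names. It shows that the fragment from the question does not parse on its own, and that the wrapped version is a valid JSON object on a single line, which matters because kafkacat in producer mode treats each line of input as a separate message by default.

```python
import json

# The fragment as quoted in the question: a bare "key": value pair,
# which is not a complete JSON document on its own.
fragment = '''"Total": [
  {"TotalType": "Standard", "TotalAmount": 15.99},
  {"TotalType": "Old Standard", "TotalAmount": 16, "STID": 56}
]'''


def is_valid_json_object(text: str) -> bool:
    """Return True if text parses as a JSON object (hypothetical helper)."""
    try:
        return isinstance(json.loads(text), dict)
    except json.JSONDecodeError:
        return False


def wrap_fragment(text: str) -> str:
    """Wrap the fragment in curly braces and re-serialise it on one line."""
    document = json.loads("{" + text + "}")  # raises if still invalid
    return json.dumps(document)  # json.dumps emits no newlines by default


message = wrap_fragment(fragment)
print(message)
```

The printed line is equivalent to the message sent with kafkacat above.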

Fire up the ksqlDB CLI:

$ ksql http://ksqldb:8088

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.9.0, Server v0.9.0 located at http://ksqldb:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Set the offset to earliest so that we’re querying all the data in the topic:

ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

Model the input data as a ksqlDB stream:

ksql> CREATE STREAM my_stream (TOTAL ARRAY<STRUCT<TotalType   VARCHAR, 
                                                  TotalAmount VARCHAR, 
                                                  STID        VARCHAR>>) 
                         WITH (KAFKA_TOPIC='my_topic', 
                               VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Play with the data:

  • Select a specific array entry

    Note: ksqlDB arrays are 1-based, so TOTAL[1] is the first entry.
    ksql> SELECT TOTAL[1] FROM my_stream EMIT CHANGES LIMIT 1;
    +-------------------------------------------------------------------------------------------------------------------------------------------+
    |KSQL_COL_0                                                                                                                                 |
    +-------------------------------------------------------------------------------------------------------------------------------------------+
    |{TOTALTYPE=Standard, TOTALAMOUNT=15.99, STID=null}                                                                                         |
  • Select fields from a specific array element

    ksql> SELECT TOTAL[1]->TotalType, TOTAL[1]->totalamount FROM my_stream EMIT CHANGES;
    +--------------------------------------------------------------------+--------------------------------------------------------------------+
    |TOTALTYPE                                                           |TOTALAMOUNT                                                         |
    +--------------------------------------------------------------------+--------------------------------------------------------------------+
    |Standard                                                            |15.99                                                               |
  • Explode the array

    ksql> SELECT EXPLODE(TOTAL) FROM my_stream EMIT CHANGES;
    +-------------------------------------------------------------------------------------------------------------------------------------------+
    |KSQL_COL_0                                                                                                                                 |
    +-------------------------------------------------------------------------------------------------------------------------------------------+
    |{TOTALTYPE=Standard, TOTALAMOUNT=15.99, STID=null}                                                                                         |
    |{TOTALTYPE=Old Standard, TOTALAMOUNT=16, STID=56}                                                                                          |
  • Explode, un-nest, and rename the resulting fields

    ksql> SELECT EXPLODE(TOTAL)->TOTALTYPE AS TOTAL_TYPE, 
                 EXPLODE(TOTAL)->TOTALAMOUNT AS TOTALAMOUNT, 
                 EXPLODE(TOTAL)->STID AS STID 
            FROM my_stream EMIT CHANGES;
    +---------------------------------------------+---------------------------------------------+---------------------------------------------+
    |TOTAL_TYPE                                   |TOTALAMOUNT                                  |STID                                         |
    +---------------------------------------------+---------------------------------------------+---------------------------------------------+
    |Standard                                     |15.99                                        |null                                         |
    |Old Standard                                 |16                                           |56                                           |
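
For readers who find it easier to reason about these queries procedurally, here is a rough Python model of what the results above show. It is a sketch of the observed behaviour, not ksqlDB's implementation: `to_struct`, `element_at`, and `explode` are hypothetical helper names, and returning `None` for an out-of-range index is this sketch's choice rather than a statement about ksqlDB.

```python
from typing import Any, Dict, Iterator, List, Optional

# Field names as ksqlDB reports them in the output above (upper-cased).
FIELDS = ("TOTALTYPE", "TOTALAMOUNT", "STID")

Struct = Dict[str, Optional[str]]


def to_struct(obj: Dict[str, Any]) -> Struct:
    """Model the declared STRUCT: every field is VARCHAR, absent fields are None."""
    upper = {key.upper(): value for key, value in obj.items()}
    return {
        name: None if upper.get(name) is None else str(upper[name])
        for name in FIELDS
    }


def element_at(array: List[Struct], index: int) -> Optional[Struct]:
    """Model TOTAL[index] with 1-based indexing."""
    if 1 <= index <= len(array):
        return array[index - 1]
    return None  # sketch's choice for out-of-range indexes


def explode(rows: List[Dict[str, List[Struct]]]) -> Iterator[Struct]:
    """Model EXPLODE(TOTAL): one output row per array element."""
    for row in rows:
        yield from row["TOTAL"]


message = {"Total": [
    {"TotalType": "Standard", "TotalAmount": 15.99},
    {"TotalType": "Old Standard", "TotalAmount": 16, "STID": 56},
]}
row = {"TOTAL": [to_struct(obj) for obj in message["Total"]]}

# SELECT TOTAL[1] ...
first = element_at(row["TOTAL"], 1)

# SELECT EXPLODE(TOTAL)->TOTALTYPE AS TOTAL_TYPE, ... (un-nest and rename)
flattened = [
    {"TOTAL_TYPE": s["TOTALTYPE"], "TOTALAMOUNT": s["TOTALAMOUNT"], "STID": s["STID"]}
    for s in explode([row])
]

print(first)
for out in flattened:
    print(out)
```

One input message with a two-element array becomes two flat output rows, and the missing STID in the first element comes through as a null.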

Persist this to a new stream (backed by a Kafka topic):

ksql> CREATE STREAM new_stream AS
         SELECT EXPLODE(TOTAL)->TOTALTYPE AS TOTAL_TYPE, 
                EXPLODE(TOTAL)->TOTALAMOUNT AS TOTALAMOUNT, 
                EXPLODE(TOTAL)->STID AS STID 
            FROM my_stream EMIT CHANGES;
 Message
-----------------------------------------
 Created query with ID CSAS_NEW_STREAM_0
-----------------------------------------
ksql>
ksql> SHOW TOPICS;

 Kafka Topic | Partitions | Partition Replicas
-----------------------------------------------
 NEW_STREAM  | 1          | 1
 my_topic    | 1          | 1
-----------------------------------------------
ksql> PRINT NEW_STREAM FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2020/05/26 08:58:35.691 Z, key: <null>, value: {"TOTAL_TYPE":"Standard","TOTALAMOUNT":"15.99","STID":null}
rowtime: 2020/05/26 08:58:35.691 Z, key: <null>, value: {"TOTAL_TYPE":"Old Standard","TOTALAMOUNT":"16","STID":"56"}
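
Because the stream declares TotalAmount and STID as VARCHAR, the values in the new topic are JSON strings, as the PRINT output shows. A downstream consumer that needs numbers has to convert them. The following Python sketch assumes the two values printed above have already been read from the topic (the consuming code itself is left out), and `parse_total` is a hypothetical helper name.

```python
import json
from decimal import Decimal
from typing import Any, Dict

# The two message values exactly as shown in the PRINT output above.
values = [
    '{"TOTAL_TYPE":"Standard","TOTALAMOUNT":"15.99","STID":null}',
    '{"TOTAL_TYPE":"Old Standard","TOTALAMOUNT":"16","STID":"56"}',
]


def parse_total(raw: str) -> Dict[str, Any]:
    """Parse one message value and convert the string fields to numeric types."""
    record = json.loads(raw)
    return {
        "total_type": record["TOTAL_TYPE"],
        # Decimal avoids binary floating-point rounding on amounts.
        "total_amount": Decimal(record["TOTALAMOUNT"]),
        # STID is absent (null) for some entries, so keep None as-is.
        "stid": None if record["STID"] is None else int(record["STID"]),
    }


totals = [parse_total(value) for value in values]
print(sum(t["total_amount"] for t in totals))
```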

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
