rmoff's random ramblings

Using ksqlDB to process data ingested from ActiveMQ with Kafka Connect

Published Mar 19, 2021 in ActiveMQ, Kafka Connect, KsqlDB at https://preview.rmoff.net/2021/03/19/using-ksqldb-to-process-data-ingested-from-activemq-with-kafka-connect/

The ActiveMQ source connector creates a Struct holding the value of the message from ActiveMQ (as well as its key). This is as would be expected. However, you can encounter challenges in working with the data if the ActiveMQ data of interest within the payload is complex. Things like converters and schemas can get really funky, really quick.

The Problem

In my example the messages from the source system that I wanted to process in Kafka were sent to ActiveMQ as batches of messages. They were batched up in the text field of the ActiveMQ payload as a JSON array. What I actually wanted to handle as individual messages like this:

{ "id": 1, "name": "Foo", "colour": "yellow" }
{ "id": 2, "name": "Bar", "colour": "blue" }

Came through as a single array object like this:

[
  { "id": 1, "name": "Foo", "colour": "yellow" },
  { "id": 2, "name": "Bar", "colour": "blue" }
]

The additional challenge I soon encountered was that the array was a top-level element, rather than a named element like this:

{
  "messages": [
    { "id": 1, "name": "Foo", "colour": "yellow" },
    { "id": 2, "name": "Bar", "colour": "blue" }
  ]
}

And all of this was then wrapped in the envelope of the ActiveMQ message:

{
  "messageID": "42",
  "messageType": "text",
  "timestamp": 1616146381271,
  "text": "[{\"id\":1,\"name\":\"Foo\",\"colour\":\"yellow\"},{\"id\":2,\"name\":\"Bar\",\"colour\":\"blue\"}]"
}

So what to do? Well, courtesy of lots of head scratching and a genius insight from my colleague Mike Bingham I came up with a solution of sorts.

In short, use ksqlDB to manipulate the field of interest out into its own topic as a named element, and then use ksqlDB’s EXPLODE function to split the array into individual messages. Read on to see the details…
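To make the idea concrete outside of ksqlDB, here is a minimal Python sketch of the same two steps: give the top-level array a named root element by string concatenation, then split it into one record per entry. The envelope is the toy example from above, and the helper names (`wrap_array`, `explode`) are made up for illustration; this is not what ksqlDB runs internally.

```python
import json

# Toy ActiveMQ envelope, modelled on the example earlier in this post.
envelope = {
    "messageID": "42",
    "messageType": "text",
    "timestamp": 1616146381271,
    "text": '[{"id":1,"name":"Foo","colour":"yellow"},'
            '{"id":2,"name":"Bar","colour":"blue"}]',
}


def wrap_array(text, element_name="MSG_ARRAY"):
    """Give the top-level JSON array a named root element (plain string concatenation)."""
    return '{"' + element_name + '":' + text + '}'


def explode(wrapped, element_name="MSG_ARRAY"):
    """Return one record per entry of the named array."""
    return list(json.loads(wrapped)[element_name])


wrapped = wrap_array(envelope["text"])
messages = explode(wrapped)
for message in messages:
    # One line of JSON per original batched message
    print(json.dumps(message))
```

The wrapping step never parses the payload; it only relies on the text already being valid JSON, which is the same assumption the ksqlDB approach makes.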

The Answer (…well, an answer)

Serialise your data from the ActiveMQ connector using a converter that will retain the schema, because it has one (the 'envelope', if you want to think of it like that). I’m using Avro:

curl -i -X PUT -H "Accept:application/json" \
-H  "Content-Type:application/json" http://localhost:8083/connectors/source-activemq-networkrail-test_avro/config \
-d '{
    "connector.class"                    : "io.confluent.connect.activemq.ActiveMQSourceConnector",
[…]
    "kafka.topic"                        : "test_avro",
    "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter"                      : "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url"  : "http://schema-registry:8081"
}'

The key, value, and metadata of a message in the topic look like this:

kafkacat -b broker:29092 -t test_avro -C -c1 -r http://schema-registry:8081 -s avro -J | jq '.'
{
  "topic": "test_avro",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1616146381271,
  "broker": 1,
  "key": {
    "messageID": "ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:3:46:7053"
  },
  "payload": {
    "messageID": "ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:3:46:7053",
    "messageType": "text",
    "timestamp": 1616146381271,
    "deliveryMode": 2,
    "correlationID": null,
    "replyTo": null,
    "destination": {
      "Destination": {
        "destinationType": "topic",
        "name": "TRAIN_MVT_EA_TOC"
      }
    },
    "redelivered": false,
    "type": null,
    "expiration": 1616146681271,
    "priority": 4,
    "properties": {},
    "bytes": null,
    "map": null,
    "text": {
      "string": "[{\"header\":{\"msg_type\":\"0003\",\"source_dev_id\":\"\",\"user_id\":\"\",\"original_data_source\":\"SMART\",\"msg_queue_timestamp\":\"1616146379000\",\"source_system_id\":\"TRUST\"},\"body\":{\"event_type\":\"DEPARTURE\",\"gbtt_timestamp\":\"1616146440000\",\"original_loc_stanox\":\"\",\"planned_timestamp\":\"1616146440000\",\"timetable_variation\":\"2\",\"original_loc_timestamp\":\"\",\"current_train_id\":\"\",\"delay_monitoring_point\":\"true\",\"next_report_run_time\":\"2\",\"reporting_stanox\":\"04235\",\"actual_timestamp\":\"1616146320000\",\"correction_ind\":\"false\",\"event_source\":\"AUTOMATIC\",\"train_file_address\":null,\"platform\":\" 3\",\"division_code\":\"20\",\"train_terminated\":\"false\",\"train_id\":\"321S321619\",\"offroute_ind\":\"false\",\"variation_status\":\"EARLY\",\"train_service_code\":\"21730001\",\"toc_id\":\"20\",\"loc_stanox\":\"04235\",\"auto_expected\":\"true\",\"direction_ind\":\"UP\",\"route\":\"1\",\"planned_event_type\":\"DEPARTURE\",\"next_report_stanox\":\"04291\",\"line_ind\":\"\"}}]"
    }
  }
}
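Note that in this output the text field shows up as {"string": "…"} rather than as a bare string. As far as I can tell that is the Avro JSON encoding of a nullable field (a non-null union value is tagged with its type), not something ActiveMQ added. If you want to poke at the kafkacat output in a script, a small sketch like the following unwraps it; the `unwrap_union` helper and the trimmed-down record are made up for illustration.

```python
import json


def unwrap_union(value, branch="string"):
    """Unwrap a value shaped like {"string": ...}; pass None and anything else through."""
    if isinstance(value, dict) and set(value) == {branch}:
        return value[branch]
    return value


# Trimmed, hypothetical record in the shape that kafkacat -J prints
record = {
    "payload": {
        "messageType": "text",
        "bytes": None,
        "text": {
            "string": '[{"header":{"msg_type":"0003"},'
                      '"body":{"event_type":"DEPARTURE"}}]'
        },
    }
}

text = unwrap_union(record["payload"]["text"])
batch = json.loads(text)  # the payload itself is still a top-level JSON array
print(len(batch), batch[0]["body"]["event_type"])
```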

Declare a ksqlDB stream on the source data:

CREATE STREAM MQ_SOURCE WITH (KAFKA_TOPIC='test_avro', FORMAT='AVRO');

The schema is that declared by the connector:

ksql> DESCRIBE MQ_SOURCE;

Name                 : MQ_SOURCE
 Field         | Type
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 ROWKEY        | STRUCT<MESSAGEID VARCHAR(STRING)> (key)
 MESSAGEID     | VARCHAR(STRING)
 MESSAGETYPE   | VARCHAR(STRING)
 TIMESTAMP     | BIGINT
 DELIVERYMODE  | INTEGER
 CORRELATIONID | VARCHAR(STRING)
 REPLYTO       | STRUCT<DESTINATIONTYPE VARCHAR(STRING), NAME VARCHAR(STRING)>
 DESTINATION   | STRUCT<DESTINATIONTYPE VARCHAR(STRING), NAME VARCHAR(STRING)>
 REDELIVERED   | BOOLEAN
 TYPE          | VARCHAR(STRING)
 EXPIRATION    | BIGINT
 PRIORITY      | INTEGER
 PROPERTIES    | MAP<STRING, STRUCT<PROPERTYTYPE VARCHAR(STRING), BOOLEAN BOOLEAN, BYTE INTEGER, SHORT INTEGER, INTEGER INTEGER, LONG BIGINT, FLOAT DOUBLE, DOUBLE DOUBLE, STRING VARCHAR(STRING)>>
 MAP           | MAP<STRING, STRUCT<PROPERTYTYPE VARCHAR(STRING), BOOLEAN BOOLEAN, BYTE INTEGER, SHORT INTEGER, INTEGER INTEGER, LONG BIGINT, FLOAT DOUBLE, DOUBLE DOUBLE, STRING VARCHAR(STRING)>>
 TEXT          | VARCHAR(STRING)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

The field of interest is TEXT, which is a string holding the JSON payload. Specifically, the payload is a JSON array that is itself the top-level element, rather than the value of a named field.

ksql> SELECT TEXT FROM MQ_SOURCE EMIT CHANGES LIMIT 1;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|TEXT                                                                                                                                                                                                                                                                                                                                                                                       |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"header":{"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616146379000","source_system_id":"TRUST"},"body":{"event_type":"DEPARTURE","gbtt_timestamp":"1616146440000","original_loc_stanox":"","planned_timestamp":"1616146440000","timetable_variation":"2","original_loc_timestamp":"","current_train_id":"","delay_monitoring|
|_point":"true","next_report_run_time":"2","reporting_stanox":"04235","actual_timestamp":"1616146320000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":" 3","division_code":"20","train_terminated":"false","train_id":"321S321619","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21730001","toc_id":"20","loc_stanox":|
|"04235","auto_expected":"true","direction_ind":"UP","route":"1","planned_event_type":"DEPARTURE","next_report_stanox":"04291","line_ind":""}}]                                                                                                                                                                                                                                             |
Limit Reached
Query terminated
ksql>

To be able to manipulate the array we need to pull it out into a named element. We do this—and discard the rest of the ActiveMQ data—writing just the JSON array with a constructed root level element ('{"MSG_ARRAY":' + text + '}') as a primitive (i.e. no schema) back to a new Kafka topic.

Note that we use VALUE_FORMAT='KAFKA' to make sure that the raw JSON is written to the topic and not with a schema wrapped around it. The KEY_FORMAT remains as Avro because it doesn’t need to change.

CREATE STREAM CONSTRUCTED_JSON
  WITH (KEY_FORMAT  ='AVRO',
        VALUE_FORMAT='KAFKA') AS
  SELECT ROWKEY,
         '{"MSG_ARRAY":' + TEXT + '}'
    FROM MQ_SOURCE;

Now we have a topic with a value that looks like this:

kafkacat -b broker:29092 -t CONSTRUCTED_JSON -C -c1 \
         -r http://schema-registry:8081 -s value=s -s key=avro -f '%s'
{
  "MSG_ARRAY": [
    {
      "header": {
        "msg_type": "0003",
        "source_dev_id": "",
        "user_id": "",
        "original_data_source": "SMART",
        "msg_queue_timestamp": "1616146379000",
        "source_system_id": "TRUST"
      },
      "body": {
        "event_type": "DEPARTURE",
        "gbtt_timestamp": "1616146440000",
        "original_loc_stanox": "",
        "planned_timestamp": "1616146440000",
        "timetable_variation": "2",
        "original_loc_timestamp": "",
        "current_train_id": "",
        "delay_monitoring_point": "true",
        "next_report_run_time": "2",
        "reporting_stanox": "04235",
        "actual_timestamp": "1616146320000",
        "correction_ind": "false",
        "event_source": "AUTOMATIC",
        "train_file_address": null,
        "platform": " 3",
        "division_code": "20",
        "train_terminated": "false",
        "train_id": "321S321619",
        "offroute_ind": "false",
        "variation_status": "EARLY",
        "train_service_code": "21730001",
        "toc_id": "20",
        "loc_stanox": "04235",
        "auto_expected": "true",
        "direction_ind": "UP",
        "route": "1",
        "planned_event_type": "DEPARTURE",
        "next_report_stanox": "04291",
        "line_ind": ""
      }
    }
  ]
}
NOTE: The actual data is a single line of JSON, but I’ve expanded it above for readability.

Now we can create a new ksqlDB stream over this data and declare a schema for the actual JSON object. For now we just declare the array and its first-level elements (we can access the rest of it later by parsing the varchar with EXTRACTJSONFIELD):

CREATE STREAM PAYLOAD (MSG_ARRAY array<struct<header varchar,body varchar>>)
  WITH (KAFKA_TOPIC='CONSTRUCTED_JSON',
        KEY_FORMAT='AVRO',
        VALUE_FORMAT='JSON');
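The effect of declaring header and body as varchar is that each nested object is kept as a string of JSON, to be picked apart later. Here is a rough Python analogy of that idea. Both helpers are made up for illustration, and `extract_json_field` is only a tiny stand-in that handles dotted object keys, not a reimplementation of EXTRACTJSONFIELD.

```python
import json


def as_varchar_struct(element):
    """Keep each first-level value as JSON text, in the spirit of declaring it VARCHAR."""
    return {
        key.upper(): json.dumps(value, separators=(",", ":"))
        for key, value in element.items()
    }


def extract_json_field(json_text, path):
    """Look up a '$.a.b' style path in a JSON string (object keys only)."""
    keys = path[2:] if path.startswith("$.") else path
    node = json.loads(json_text)
    for key in keys.split("."):
        node = node[key]
    return node


# Cut-down array element based on the sample message above
element = {
    "header": {"msg_type": "0003", "source_system_id": "TRUST"},
    "body": {"event_type": "DEPARTURE", "train_id": "321S321619"},
}

row = as_varchar_struct(element)
print(row["HEADER"])
print(extract_json_field(row["BODY"], "$.train_id"))
```

Deferring the parsing like this is handy here because the body differs by message type, so there is no single struct definition that fits every element.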

Using the ARRAY_LENGTH function we can check that the JSON is being handled as an array as expected:

ksql> SELECT ARRAY_LENGTH(MSG_ARRAY) AS MSG_ARRAY_LENGTH, ROWKEY FROM PAYLOAD EMIT CHANGES LIMIT 5;
+-----------------+---------------------------------------------------------------------------------+
|MSG_ARRAY_LENGTH |ROWKEY                                                                           |
+-----------------+---------------------------------------------------------------------------------+
|1                |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089} |
|2                |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016} |
|1                |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987} |
|4                |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988} |
|2                |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961} |
Limit Reached
Query terminated

From here we can test exploding each array, and based on the array lengths in the sample above verify that the correct number of total nested messages is returned:

SELECT EXPLODE(MSG_ARRAY)
  FROM PAYLOAD
 WHERE ROWKEY->MESSAGEID IN ('ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961')
EMIT CHANGES;
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147239000","source_system_id":"TRUST"}, BODY={"event_type":"ARRIVAL","gbtt_timestamp":"1616147280000","original_loc_stanox":"","planned_timestamp":"1616147250000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"1","reporting_stanox":"09312","actual_timestamp":"1616147280000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"041M931C19","offroute_ind":"false","variation_status":"LATE","train_service_code":"21730001","toc_id":"20","loc_stanox":"09312","auto_expected":"true","direction_ind":"UP","route":"0","planned_event_type":"ARRIVAL","next_report_stanox":"09330","line_ind":""}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147242000","source_system_id":"TRUST"}, BODY={"event_type":"ARRIVAL","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147310000","timetable_variation":"0","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"1","reporting_stanox":"00000","actual_timestamp":"1616147280000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"129M141619","offroute_ind":"false","variation_status":"ON TIME","train_service_code":"21731000","toc_id":"20","loc_stanox":"35439","auto_expected":"true","direction_ind":"","route":"0","planned_event_type":"ARRIVAL","next_report_stanox":"36605","line_ind":""}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147246000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"1616147220000","original_loc_stanox":"","planned_timestamp":"1616147220000","timetable_variation":"0","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"1","reporting_stanox":"12931","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":" 6","division_code":"20","train_terminated":"false","train_id":"129M201F19","offroute_ind":"false","variation_status":"ON TIME","train_service_code":"21731000","toc_id":"20","loc_stanox":"12931","auto_expected":"true","direction_ind":"UP","route":"2","planned_event_type":"DEPARTURE","next_report_stanox":"12932","line_ind":"M"}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147260000","source_system_id":"TRUST"}, BODY={"event_type":"ARRIVAL","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147250000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"1","reporting_stanox":"00000","actual_timestamp":"1616147280000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"361P171B19","offroute_ind":"false","variation_status":"LATE","train_service_code":"21734000","toc_id":"20","loc_stanox":"17112","auto_expected":"true","direction_ind":"","route":"0","planned_event_type":"ARRIVAL","next_report_stanox":"16602","line_ind":""}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|{HEADER={"msg_type":"0003","source_dev_id":"VDFB","user_id":"#QGE7066","original_data_source":"SDR","msg_queue_timestamp":"1616147261000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147310000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"4","reporting_stanox":"16591","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"MANUAL","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"191K121E19","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21733000","toc_id":"20","loc_stanox":"16591","auto_expected":"true","direction_ind":"","route":"","planned_event_type":"DEPARTURE","next_report_stanox":"16602","line_ind":""}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147268000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147340000","timetable_variation":"2","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"1","reporting_stanox":"00000","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"361P191D19","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21734000","toc_id":"20","loc_stanox":"32121","auto_expected":"true","direction_ind":"DOWN","route":"3","planned_event_type":"DEPARTURE","next_report_stanox":"32123","line_ind":""}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|{HEADER={"msg_type":"0001","source_dev_id":"","user_id":"","original_data_source":"TSIA","msg_queue_timestamp":"1616147268000","source_system_id":"TRUST"}, BODY={"schedule_source":"C","train_file_address":null,"schedule_end_date":"2021-03-27","train_id":"172J62MJ19","tp_origin_timestamp":"2021-03-19","creation_timestamp":"1616147268000","tp_origin_stanox":"","origin_dep_timestamp":"1616154420000","train_service_code":"21733000","toc_id":"20","d1266_record_number":"00000","train_call_type":"AUTOMATIC","train_uid":"C53833","train_call_mode":"NORMAL","schedule_type":"C","sched_origin_stanox":"17132","schedule_wtt_id":"2J62M","schedule_start_date":"2021-02-01"}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|{HEADER={"msg_type":"0002","source_dev_id":"","user_id":"","original_data_source":"","msg_queue_timestamp":"1616147268000","source_system_id":"TRUST"}, BODY={"train_file_address":null,"train_service_code":"21733000","orig_loc_stanox":"","toc_id":"20","dep_timestamp":"1616154420000","division_code":"20","loc_stanox":"17132","canx_timestamp":"1616147220000","canx_reason_code":"PD","train_id":"172J62MJ19","orig_loc_timestamp":"","canx_type":"ON CALL"}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147276000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147310000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"2","reporting_stanox":"00000","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"129M141619","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21731000","toc_id":"20","loc_stanox":"35439","auto_expected":"true","direction_ind":"DOWN","route":"2","planned_event_type":"DEPARTURE","next_report_stanox":"36605","line_ind":""}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|{HEADER={"msg_type":"0001","source_dev_id":"","user_id":"","original_data_source":"TSIA","msg_queue_timestamp":"1616147279000","source_system_id":"TRUST"}, BODY={"schedule_source":"C","train_file_address":null,"schedule_end_date":"2021-03-19","train_id":"325P981L19","tp_origin_timestamp":"2021-03-19","creation_timestamp":"1616147279000","tp_origin_stanox":"","origin_dep_timestamp":"1616158200000","train_service_code":"21730005","toc_id":"20","d1266_record_number":"00000","train_call_type":"MANUAL","train_uid":"N13562","train_call_mode":"NORMAL","schedule_type":"N","sched_origin_stanox":"32043","schedule_wtt_id":"5P981","schedule_start_date":"2021-03-08"}}                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |

Ten messages, as expected based on the array lengths above (1 + 2 + 1 + 4 + 2).

Finally, we can actually explode the arrays of messages into individual messages on a new Kafka topic:

CREATE STREAM INDIVIDUAL_MESSAGES WITH (FORMAT='AVRO') AS
  SELECT ROWKEY, EXPLODE(MSG_ARRAY) AS MESSAGE
    FROM PAYLOAD;

Using the same test as above, we can check the message counts grouped by key:

SELECT ROWKEY->MESSAGEID,
       COUNT(*) AS MESSAGE_CT
  FROM INDIVIDUAL_MESSAGES
 WHERE ROWKEY->MESSAGEID IN ('ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988',
                             'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961')
GROUP BY ROWKEY->MESSAGEID
EMIT CHANGES;
+----------------------------------------------------------------------+-----------+
|MESSAGEID                                                             |MESSAGE_CT |
+----------------------------------------------------------------------+-----------+
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089  |1          |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016  |2          |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987  |1          |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988  |4          |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961  |2          |

And there you have it - a JSON array nested within a source ActiveMQ message, exploded out into separate Kafka messages, ready for your consumption and delectation:

ksql> SELECT EXTRACTJSONFIELD(MESSAGE->HEADER,'$.msg_type')           AS MSG_TYPE,
             TIMESTAMPTOSTRING(CAST(
               EXTRACTJSONFIELD(MESSAGE->BODY,'$.actual_timestamp')
               AS BIGINT),'yyyy-MM-dd HH:mm:ss','Europe/London')      AS ACTUAL_TIMESTAMP ,
             EXTRACTJSONFIELD(MESSAGE->BODY,  '$.train_service_code') AS TRAIN_SERVICE_CODE,
             EXTRACTJSONFIELD(MESSAGE->BODY,  '$.event_type')         AS EVENT_TYPE,
             EXTRACTJSONFIELD(MESSAGE->BODY,  '$.variation_status')   AS VARIATION_STATUS
        FROM INDIVIDUAL_MESSAGES EMIT CHANGES;

+--------------------+--------------------+--------------------+--------------------+--------------------+
|MSG_TYPE            |ACTUAL_TIMESTAMP    |TRAIN_SERVICE_CODE  |EVENT_TYPE          |VARIATION_STATUS    |
+--------------------+--------------------+--------------------+--------------------+--------------------+
|0003                |2021-03-19 10:29:00 |11817020            |DEPARTURE           |ON TIME             |
|0003                |2021-03-19 10:29:00 |21734000            |DEPARTURE           |LATE                |
|0003                |2021-03-19 10:29:00 |21732000            |DEPARTURE           |EARLY               |
|0003                |2021-03-19 10:32:00 |21734000            |ARRIVAL             |LATE                |
|0003                |2021-03-19 10:29:00 |21730001            |DEPARTURE           |EARLY               |
|0003                |2021-03-19 10:30:00 |21732000            |DEPARTURE           |LATE                |
[…]
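
If you want those parsed fields as proper columns on their own topic, rather than extracting them at query time, one option is to wrap the same expressions in a persistent query. The following is only a sketch: the stream name TRAIN_MOVEMENTS and the column selection are my own illustrative choices, and it assumes that filtering on a msg_type of 0003 is what you want (in the sample above, those are the messages carrying event_type and variation_status).

```sql
-- Sketch only: TRAIN_MOVEMENTS and the chosen columns are illustrative,
-- not part of the pipeline built above.
-- ROWKEY is carried through so that the new stream keeps the original key.
CREATE STREAM TRAIN_MOVEMENTS WITH (FORMAT='AVRO') AS
  SELECT ROWKEY,
         EXTRACTJSONFIELD(MESSAGE->BODY, '$.train_id')           AS TRAIN_ID,
         EXTRACTJSONFIELD(MESSAGE->BODY, '$.train_service_code') AS TRAIN_SERVICE_CODE,
         EXTRACTJSONFIELD(MESSAGE->BODY, '$.event_type')         AS EVENT_TYPE,
         -- Epoch milliseconds arrive as a string, so cast to BIGINT
         CAST(EXTRACTJSONFIELD(MESSAGE->BODY, '$.actual_timestamp')
              AS BIGINT)                                         AS ACTUAL_TIMESTAMP,
         EXTRACTJSONFIELD(MESSAGE->BODY, '$.variation_status')   AS VARIATION_STATUS
    FROM INDIVIDUAL_MESSAGES
   -- Assumption: only msg_type 0003 messages are of interest here
   WHERE EXTRACTJSONFIELD(MESSAGE->HEADER, '$.msg_type') = '0003';
```

Downstream consumers then get typed columns with a schema, and the JSON parsing happens once in ksqlDB instead of in every query.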

Data used in this article is provided by Network Rail under OGL v3


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
