The ActiveMQ source connector creates a Struct holding the value of the message from ActiveMQ (as well as its key), which is just what you’d expect. However, you can run into challenges working with the data if the part of the ActiveMQ payload that you’re interested in is complex. Things like converters and schemas can get really funky, really quickly.
The Problem
In my example, the messages from the source system that I wanted to process in Kafka were sent to ActiveMQ in batches, serialised as a JSON array in the text field of the ActiveMQ payload. What I actually wanted to handle as individual messages like this:
{ "id": 1, "name": "Foo", "colour": "yellow" }
{ "id": 2, "name": "Bar", "colour": "blue" }
came through as a single array like this:
[
{ "id": 1, "name": "Foo", "colour": "yellow" },
{ "id": 2, "name": "Bar", "colour": "blue" }
]
The additional challenge I soon encountered was that the array was a top-level element, rather than nested under a named element like this:
{
"messages": [
{ "id": 1, "name": "Foo", "colour": "yellow" },
{ "id": 2, "name": "Bar", "colour": "blue" }
]
}
And all of this was then wrapped in the envelope of the ActiveMQ message:
{
"messageID": "42",
"messageType": "text",
"timestamp": 1616146381271,
"text": "[{\"id\":1,\"name\":\"Foo\",\"colour\":\"yellow\"},{\"id\":2,\"name\":\"Bar\",\"colour\":\"blue\"}]"
}
So what to do? Well, courtesy of lots of head scratching and a genius insight from my colleague Mike Bingham I came up with a solution of sorts.
In short: use ksqlDB to extract the field of interest into its own topic as a named element, and then use ksqlDB’s EXPLODE function to split the array into individual messages. Read on to see the details…
The Answer (…well, an answer)
Serialise your data from the ActiveMQ connector using a converter that retains the schema, because the data does have one (the 'envelope', if you want to think of it like that). I’m using Avro:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-activemq-networkrail-test_avro/config \
-d '{
"connector.class" : "io.confluent.connect.activemq.ActiveMQSourceConnector",
[…]
"kafka.topic" : "test_avro",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://schema-registry:8081"
}'
The key, value, and metadata of a message in the topic look like this:
kafkacat -b broker:29092 -t test_avro -C -c1 -r http://schema-registry:8081 -s avro -J | jq '.'
{
"topic": "test_avro",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1616146381271,
"broker": 1,
"key": {
"messageID": "ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:3:46:7053"
},
"payload": {
"messageID": "ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:3:46:7053",
"messageType": "text",
"timestamp": 1616146381271,
"deliveryMode": 2,
"correlationID": null,
"replyTo": null,
"destination": {
"Destination": {
"destinationType": "topic",
"name": "TRAIN_MVT_EA_TOC"
}
},
"redelivered": false,
"type": null,
"expiration": 1616146681271,
"priority": 4,
"properties": {},
"bytes": null,
"map": null,
"text": {
"string": "[{\"header\":{\"msg_type\":\"0003\",\"source_dev_id\":\"\",\"user_id\":\"\",\"original_data_source\":\"SMART\",\"msg_queue_timestamp\":\"1616146379000\",\"source_system_id\":\"TRUST\"},\"body\":{\"event_type\":\"DEPARTURE\",\"gbtt_timestamp\":\"1616146440000\",\"original_loc_stanox\":\"\",\"planned_timestamp\":\"1616146440000\",\"timetable_variation\":\"2\",\"original_loc_timestamp\":\"\",\"current_train_id\":\"\",\"delay_monitoring_point\":\"true\",\"next_report_run_time\":\"2\",\"reporting_stanox\":\"04235\",\"actual_timestamp\":\"1616146320000\",\"correction_ind\":\"false\",\"event_source\":\"AUTOMATIC\",\"train_file_address\":null,\"platform\":\" 3\",\"division_code\":\"20\",\"train_terminated\":\"false\",\"train_id\":\"321S321619\",\"offroute_ind\":\"false\",\"variation_status\":\"EARLY\",\"train_service_code\":\"21730001\",\"toc_id\":\"20\",\"loc_stanox\":\"04235\",\"auto_expected\":\"true\",\"direction_ind\":\"UP\",\"route\":\"1\",\"planned_event_type\":\"DEPARTURE\",\"next_report_stanox\":\"04291\",\"line_ind\":\"\"}}]"
}
}
}
Declare a ksqlDB stream on the source data:
CREATE STREAM MQ_SOURCE WITH (KAFKA_TOPIC='test_avro', FORMAT='AVRO');
The schema is that declared by the connector:
ksql> DESCRIBE MQ_SOURCE;
Name : MQ_SOURCE
Field | Type
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ROWKEY | STRUCT<MESSAGEID VARCHAR(STRING)> (key)
MESSAGEID | VARCHAR(STRING)
MESSAGETYPE | VARCHAR(STRING)
TIMESTAMP | BIGINT
DELIVERYMODE | INTEGER
CORRELATIONID | VARCHAR(STRING)
REPLYTO | STRUCT<DESTINATIONTYPE VARCHAR(STRING), NAME VARCHAR(STRING)>
DESTINATION | STRUCT<DESTINATIONTYPE VARCHAR(STRING), NAME VARCHAR(STRING)>
REDELIVERED | BOOLEAN
TYPE | VARCHAR(STRING)
EXPIRATION | BIGINT
PRIORITY | INTEGER
PROPERTIES | MAP<STRING, STRUCT<PROPERTYTYPE VARCHAR(STRING), BOOLEAN BOOLEAN, BYTE INTEGER, SHORT INTEGER, INTEGER INTEGER, LONG BIGINT, FLOAT DOUBLE, DOUBLE DOUBLE, STRING VARCHAR(STRING)>>
MAP | MAP<STRING, STRUCT<PROPERTYTYPE VARCHAR(STRING), BOOLEAN BOOLEAN, BYTE INTEGER, SHORT INTEGER, INTEGER INTEGER, LONG BIGINT, FLOAT DOUBLE, DOUBLE DOUBLE, STRING VARCHAR(STRING)>>
TEXT | VARCHAR(STRING)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
The field of interest is TEXT, which is a string holding the JSON payload; specifically, a JSON array at the top level with no enclosing named element.
ksql> SELECT TEXT FROM MQ_SOURCE EMIT CHANGES LIMIT 1;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|TEXT |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"header":{"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616146379000","source_system_id":"TRUST"},"body":{"event_type":"DEPARTURE","gbtt_timestamp":"1616146440000","original_loc_stanox":"","planned_timestamp":"1616146440000","timetable_variation":"2","original_loc_timestamp":"","current_train_id":"","delay_monitoring|
|_point":"true","next_report_run_time":"2","reporting_stanox":"04235","actual_timestamp":"1616146320000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":" 3","division_code":"20","train_terminated":"false","train_id":"321S321619","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21730001","toc_id":"20","loc_stanox":|
|"04235","auto_expected":"true","direction_ind":"UP","route":"1","planned_event_type":"DEPARTURE","next_report_stanox":"04291","line_ind":""}}] |
Limit Reached
Query terminated
ksql>
To be able to manipulate the array we need to pull it out into a named element. We do this, discarding the rest of the ActiveMQ data, by writing just the JSON array wrapped in a constructed root-level element ('{"MSG_ARRAY":' + text + '}') as a primitive (i.e. with no schema) back to a new Kafka topic.
Note that we use VALUE_FORMAT='KAFKA' to make sure that the raw JSON is written to the topic without a schema wrapped around it. The KEY_FORMAT remains Avro because it doesn’t need to change.
CREATE STREAM CONSTRUCTED_JSON
WITH (KEY_FORMAT ='AVRO',
VALUE_FORMAT='KAFKA') AS
SELECT ROWKEY,
'{"MSG_ARRAY":' + TEXT + '}'
FROM MQ_SOURCE;
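The concatenation only yields valid JSON because TEXT is itself already valid JSON. A quick sketch in Python (again with the simplified example data, not the real rail feed) shows why the constructed string parses cleanly:

```python
import json

# The TEXT field as it arrives from ActiveMQ: a JSON array serialised
# as a string (simplified example data).
text = '[{"id":1,"name":"Foo","colour":"yellow"},{"id":2,"name":"Bar","colour":"blue"}]'

# The same string surgery as the ksqlDB expression '{"MSG_ARRAY":' + TEXT + '}':
constructed = '{"MSG_ARRAY":' + text + '}'

# Because TEXT was already valid JSON, the result parses as an object
# with a single named array element.
parsed = json.loads(constructed)

print(list(parsed.keys()))       # ['MSG_ARRAY']
print(len(parsed["MSG_ARRAY"]))  # 2
```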
Now we have a topic with a value that looks like this:
kafkacat -b broker:29092 -t CONSTRUCTED_JSON -C -c1 \
-r http://schema-registry:8081 -s value=s -s key=avro -f '%s'
{
"MSG_ARRAY": [
{
"header": {
"msg_type": "0003",
"source_dev_id": "",
"user_id": "",
"original_data_source": "SMART",
"msg_queue_timestamp": "1616146379000",
"source_system_id": "TRUST"
},
"body": {
"event_type": "DEPARTURE",
"gbtt_timestamp": "1616146440000",
"original_loc_stanox": "",
"planned_timestamp": "1616146440000",
"timetable_variation": "2",
"original_loc_timestamp": "",
"current_train_id": "",
"delay_monitoring_point": "true",
"next_report_run_time": "2",
"reporting_stanox": "04235",
"actual_timestamp": "1616146320000",
"correction_ind": "false",
"event_source": "AUTOMATIC",
"train_file_address": null,
"platform": " 3",
"division_code": "20",
"train_terminated": "false",
"train_id": "321S321619",
"offroute_ind": "false",
"variation_status": "EARLY",
"train_service_code": "21730001",
"toc_id": "20",
"loc_stanox": "04235",
"auto_expected": "true",
"direction_ind": "UP",
"route": "1",
"planned_event_type": "DEPARTURE",
"next_report_stanox": "04291",
"line_ind": ""
}
}
]
}
NOTE: The actual data is a single line of JSON, but I’ve expanded it above for readability.
Now we can create a new ksqlDB stream over this data and declare a schema for the actual JSON object. For now we just declare the array and its first-level elements (we can access the rest of it later by parsing the VARCHAR with EXTRACTJSONFIELD):
CREATE STREAM PAYLOAD (MSG_ARRAY array<struct<header varchar,body varchar>>)
WITH (KAFKA_TOPIC='CONSTRUCTED_JSON',
KEY_FORMAT='AVRO',
VALUE_FORMAT='JSON');
Using the ARRAY_LENGTH function we can check that the JSON is being handled as an array as expected:
ksql> SELECT ARRAY_LENGTH(MSG_ARRAY) AS MSG_ARRAY_LENGTH, ROWKEY FROM PAYLOAD EMIT CHANGES LIMIT 5;
+-----------------+---------------------------------------------------------------------------------+
|MSG_ARRAY_LENGTH |ROWKEY |
+-----------------+---------------------------------------------------------------------------------+
|1 |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089} |
|2 |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016} |
|1 |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987} |
|4 |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988} |
|2 |{MESSAGEID=ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961} |
Limit Reached
Query terminated
From here we can test exploding each array and, based on the array lengths in the sample above, verify that the correct total number of nested messages is returned:
SELECT EXPLODE(MSG_ARRAY)
FROM PAYLOAD
WHERE ROWKEY->MESSAGEID IN ('ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961')
EMIT CHANGES;
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147239000","source_system_id":"TRUST"}, BODY={"event_type":"ARRIVAL","gbtt_timestamp":"1616147280000","original_loc_stanox":"","planned_timestamp":"1616147250000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"1","reporting_stanox":"09312","actual_timestamp":"1616147280000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"041M931C19","offroute_ind":"false","variation_status":"LATE","train_service_code":"21730001","toc_id":"20","loc_stanox":"09312","auto_expected":"true","direction_ind":"UP","route":"0","planned_event_type":"ARRIVAL","next_report_stanox":"09330","line_ind":""}} |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147242000","source_system_id":"TRUST"}, BODY={"event_type":"ARRIVAL","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147310000","timetable_variation":"0","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"1","reporting_stanox":"00000","actual_timestamp":"1616147280000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"129M141619","offroute_ind":"false","variation_status":"ON TIME","train_service_code":"21731000","toc_id":"20","loc_stanox":"35439","auto_expected":"true","direction_ind":"","route":"0","planned_event_type":"ARRIVAL","next_report_stanox":"36605","line_ind":""}} |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147246000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"1616147220000","original_loc_stanox":"","planned_timestamp":"1616147220000","timetable_variation":"0","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"1","reporting_stanox":"12931","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":" 6","division_code":"20","train_terminated":"false","train_id":"129M201F19","offroute_ind":"false","variation_status":"ON TIME","train_service_code":"21731000","toc_id":"20","loc_stanox":"12931","auto_expected":"true","direction_ind":"UP","route":"2","planned_event_type":"DEPARTURE","next_report_stanox":"12932","line_ind":"M"}} |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147260000","source_system_id":"TRUST"}, BODY={"event_type":"ARRIVAL","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147250000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"1","reporting_stanox":"00000","actual_timestamp":"1616147280000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"361P171B19","offroute_ind":"false","variation_status":"LATE","train_service_code":"21734000","toc_id":"20","loc_stanox":"17112","auto_expected":"true","direction_ind":"","route":"0","planned_event_type":"ARRIVAL","next_report_stanox":"16602","line_ind":""}} |
|{HEADER={"msg_type":"0003","source_dev_id":"VDFB","user_id":"#QGE7066","original_data_source":"SDR","msg_queue_timestamp":"1616147261000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147310000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"4","reporting_stanox":"16591","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"MANUAL","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"191K121E19","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21733000","toc_id":"20","loc_stanox":"16591","auto_expected":"true","direction_ind":"","route":"","planned_event_type":"DEPARTURE","next_report_stanox":"16602","line_ind":""}} |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147268000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147340000","timetable_variation":"2","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"1","reporting_stanox":"00000","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"361P191D19","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21734000","toc_id":"20","loc_stanox":"32121","auto_expected":"true","direction_ind":"DOWN","route":"3","planned_event_type":"DEPARTURE","next_report_stanox":"32123","line_ind":""}} |
|{HEADER={"msg_type":"0001","source_dev_id":"","user_id":"","original_data_source":"TSIA","msg_queue_timestamp":"1616147268000","source_system_id":"TRUST"}, BODY={"schedule_source":"C","train_file_address":null,"schedule_end_date":"2021-03-27","train_id":"172J62MJ19","tp_origin_timestamp":"2021-03-19","creation_timestamp":"1616147268000","tp_origin_stanox":"","origin_dep_timestamp":"1616154420000","train_service_code":"21733000","toc_id":"20","d1266_record_number":"00000","train_call_type":"AUTOMATIC","train_uid":"C53833","train_call_mode":"NORMAL","schedule_type":"C","sched_origin_stanox":"17132","schedule_wtt_id":"2J62M","schedule_start_date":"2021-02-01"}} |
|{HEADER={"msg_type":"0002","source_dev_id":"","user_id":"","original_data_source":"","msg_queue_timestamp":"1616147268000","source_system_id":"TRUST"}, BODY={"train_file_address":null,"train_service_code":"21733000","orig_loc_stanox":"","toc_id":"20","dep_timestamp":"1616154420000","division_code":"20","loc_stanox":"17132","canx_timestamp":"1616147220000","canx_reason_code":"PD","train_id":"172J62MJ19","orig_loc_timestamp":"","canx_type":"ON CALL"}} |
|{HEADER={"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616147276000","source_system_id":"TRUST"}, BODY={"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616147310000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"2","reporting_stanox":"00000","actual_timestamp":"1616147220000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"129M141619","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21731000","toc_id":"20","loc_stanox":"35439","auto_expected":"true","direction_ind":"DOWN","route":"2","planned_event_type":"DEPARTURE","next_report_stanox":"36605","line_ind":""}} |
|{HEADER={"msg_type":"0001","source_dev_id":"","user_id":"","original_data_source":"TSIA","msg_queue_timestamp":"1616147279000","source_system_id":"TRUST"}, BODY={"schedule_source":"C","train_file_address":null,"schedule_end_date":"2021-03-19","train_id":"325P981L19","tp_origin_timestamp":"2021-03-19","creation_timestamp":"1616147279000","tp_origin_stanox":"","origin_dep_timestamp":"1616158200000","train_service_code":"21730005","toc_id":"20","d1266_record_number":"00000","train_call_type":"MANUAL","train_uid":"N13562","train_call_mode":"NORMAL","schedule_type":"N","sched_origin_stanox":"32043","schedule_wtt_id":"5P981","schedule_start_date":"2021-03-08"}} |
Ten messages, as expected based on the array lengths above (1 + 2 + 1 + 4 + 2).
Finally, we can actually explode the arrays of messages into individual messages on a new Kafka topic:
CREATE STREAM INDIVIDUAL_MESSAGES WITH (FORMAT='AVRO') AS
SELECT ROWKEY, EXPLODE(MSG_ARRAY) AS MESSAGE
FROM PAYLOAD;
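Conceptually, what EXPLODE does here can be sketched in a few lines of Python (hypothetical simplified records standing in for ROWKEY and MSG_ARRAY):

```python
import json

# One source record: the message key plus its array of batched messages
# (hypothetical simplified data).
rowkey = {"MESSAGEID": "ID:example-source-1"}
msg_array = [
    {"id": 1, "name": "Foo", "colour": "yellow"},
    {"id": 2, "name": "Bar", "colour": "blue"},
]

# EXPLODE emits one output record per array element; the key of the
# source record is repeated on each one.
exploded = [{"ROWKEY": rowkey, "MESSAGE": element} for element in msg_array]

for record in exploded:
    print(json.dumps(record))

print(len(exploded))  # 2: one Kafka message per array element
```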
Using the same test as above, we can check the message counts grouped by key:
SELECT ROWKEY->MESSAGEID,
COUNT(*) AS MESSAGE_CT
FROM INDIVIDUAL_MESSAGES
WHERE ROWKEY->MESSAGEID IN ('ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988',
'ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961')
GROUP BY ROWKEY->MESSAGEID
EMIT CHANGES;
+----------------------------------------------------------------------+-----------+
|MESSAGEID |MESSAGE_CT |
+----------------------------------------------------------------------+-----------+
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:9:15:1089 |1 |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:1:20:7016 |2 |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6987 |1 |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:7:44:6988 |4 |
|ID:opendata-backend.rockshore.net-37346-1615271453054-11:1:4:10:6961 |2 |
And there you have it - a JSON array nested within a source ActiveMQ message, exploded out into separate Kafka messages, ready for your consumption and delectation:
ksql> SELECT EXTRACTJSONFIELD(MESSAGE->HEADER,'$.msg_type') AS MSG_TYPE,
TIMESTAMPTOSTRING(CAST(
EXTRACTJSONFIELD(MESSAGE->BODY,'$.actual_timestamp')
AS BIGINT),'yyyy-MM-dd HH:mm:ss','Europe/London') AS ACTUAL_TIMESTAMP ,
EXTRACTJSONFIELD(MESSAGE->BODY, '$.train_service_code') AS TRAIN_SERVICE_CODE,
EXTRACTJSONFIELD(MESSAGE->BODY, '$.event_type') AS EVENT_TYPE,
EXTRACTJSONFIELD(MESSAGE->BODY, '$.variation_status') AS VARIATION_STATUS
FROM INDIVIDUAL_MESSAGES EMIT CHANGES;
+--------------------+--------------------+--------------------+--------------------+--------------------+
|MSG_TYPE |ACTUAL_TIMESTAMP |TRAIN_SERVICE_CODE |EVENT_TYPE |VARIATION_STATUS |
+--------------------+--------------------+--------------------+--------------------+--------------------+
|0003 |2021-03-19 10:29:00 |11817020 |DEPARTURE |ON TIME |
|0003 |2021-03-19 10:29:00 |21734000 |DEPARTURE |LATE |
|0003 |2021-03-19 10:29:00 |21732000 |DEPARTURE |EARLY |
|0003 |2021-03-19 10:32:00 |21734000 |ARRIVAL |LATE |
|0003 |2021-03-19 10:29:00 |21730001 |DEPARTURE |EARLY |
|0003 |2021-03-19 10:30:00 |21732000 |DEPARTURE |LATE |
[…]
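For a sense of what the EXTRACTJSONFIELD, CAST, and TIMESTAMPTOSTRING combination is doing, here is a rough Python equivalent, using a hand-built, cut-down BODY string rather than the real feed (Europe/London is the same as UTC on this date, since BST had not yet started):

```python
import json
from datetime import datetime, timezone

# A cut-down BODY field from one exploded message: still a JSON string.
body = ('{"event_type":"DEPARTURE","actual_timestamp":"1616146320000",'
        '"train_service_code":"21730001","variation_status":"EARLY"}')

# EXTRACTJSONFIELD ~ parse the string and pick a field;
# CAST(... AS BIGINT) ~ int(); TIMESTAMPTOSTRING ~ fromtimestamp + strftime.
fields = json.loads(body)
epoch_ms = int(fields["actual_timestamp"])
actual = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)

print(fields["event_type"])                  # DEPARTURE
print(actual.strftime("%Y-%m-%d %H:%M:%S"))  # 2021-03-19 09:32:00
```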
Data used in this article is provided by Network Rail under OGL v3