Introduction
What can you use stream-stream joins for? Can you use them to join a stream of orders to a stream of related shipments to do useful things? What’s not supported in KSQL, and where are the cracks?
Test data
Generated via Mockaroo and written to Kafka with kafkacat:
curl -s "https://api.mockaroo.com/api/86f89de0?count=500&key=ff7856d0" | \
kafkacat -P -b localhost -t orders
curl -s "https://api.mockaroo.com/api/b410d0b0?count=500&key=ff7856d0" | \
kafkacat -P -b localhost -t shipments
Model the streams in KSQL
Note that the timestamp of each stream is set to reflect the time of the event (ORDER_TS and SHIPMENT_TS respectively).
CREATE STREAM ORDERS (ORDER_ID INT,
CUSTOMER_ID INT,
ORDER_TS VARCHAR,
ORDER_TOTAL_USD DOUBLE,
MAKE VARCHAR)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON',
TIMESTAMP='ORDER_TS', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');
CREATE STREAM SHIPMENTS (SHIPMENT_ID VARCHAR,
ORDER_ID INT,
SHIPMENT_PROVIDER VARCHAR,
SHIPMENT_TS VARCHAR)
WITH (KAFKA_TOPIC='shipments', VALUE_FORMAT='JSON',
TIMESTAMP='SHIPMENT_TS', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');
SET 'auto.offset.reset' = 'earliest';
Simple aggregations (no joins yet)
- How many orders have been placed per hour? What was the maximum and average order value?
CREATE TABLE ORDERS_AGG AS
SELECT WINDOWSTART() AS WINDOW_START_TS,
MAKE,
COUNT(*) AS ORDER_COUNT,
MAX(ORDER_TOTAL_USD) AS MAX_ORDER_VALUE_USD,
SUM(ORDER_TOTAL_USD) AS TOTAL_ORDER_VALUE_USD,
SUM(ORDER_TOTAL_USD)/COUNT(*) AS AVG_ORDER_VALUE_USD
FROM ORDERS
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE;
SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss'),
MAKE,
ORDER_COUNT,
MAX_ORDER_VALUE_USD
FROM ORDERS_AGG;
2019-03-24 16:00:00 | Ford | 8 | 193828.28
2019-03-24 12:00:00 | Ford | 7 | 171296.8
2019-03-24 09:00:00 | Ford | 4 | 181811.96
2019-03-24 04:00:00 | Audi | 9 | 184161.1
2019-03-24 05:00:00 | BMW | 1 | 130305.35
2019-03-24 19:00:00 | BMW | 1 | 200194.86
- How many shipments per hour?
CREATE TABLE SHIPMENTS_AGG AS
SELECT WINDOWSTART() AS WINDOW_START_TS,
SHIPMENT_PROVIDER,
COUNT(*) AS SHIPMENT_COUNT
FROM SHIPMENTS
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY SHIPMENT_PROVIDER;
SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss'),
SHIPMENT_PROVIDER,
SHIPMENT_COUNT
FROM SHIPMENTS_AGG;
2019-03-26 01:00:00 | ups | 2
2019-03-26 04:00:00 | dhl | 1
2019-03-26 13:00:00 | dhl | 6
2019-03-26 11:00:00 | ups | 3
2019-03-26 16:00:00 | dhl | 1
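To make the hourly bucketing concrete, here is a minimal Python sketch (not KSQL) of how a one-hour tumbling window could assign an event to its bucket. It assumes windows are aligned to the epoch, as Kafka Streams tumbling windows are; the helper names window_start and fmt are illustrative only, and the sample timestamp is treated as UTC for the sake of the example.

```python
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000  # size of the tumbling window in milliseconds

def window_start(event_ts_ms, size_ms=HOUR_MS):
    """Return the start (epoch ms) of the tumbling window containing the event."""
    return event_ts_ms - (event_ts_ms % size_ms)

def fmt(ts_ms):
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss' in UTC."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# Hypothetical event timestamp, taken as UTC for this sketch
order_ts = int(datetime(2019, 3, 24, 9, 41, 13, tzinfo=timezone.utc).timestamp() * 1000)
print(fmt(window_start(order_ts)))  # 2019-03-24 09:00:00
```

Every event whose timestamp falls in the same hour lands in the same bucket, which is what the GROUP BY then aggregates over.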
Match orders to shipments
Notes about the SQL:
- The timestamp of the generated message is set to that of the order (ORDER_TS). By default KSQL will set the timestamp to that of the message that triggers the join, which here could be an order or a shipment.
- The data is written as Avro, rather than the source JSON.
- A column (SHIPPED_IND) is created using CASE that denotes whether the order has shipped, which can be useful for subsequent filtering or aggregations.
- The time difference between order and shipment is calculated and stored as a minutes value (LEADTIME_MINUTES).
- The join is a LEFT OUTER, meaning that every record on the left of the join (ORDERS) is returned, and if no matching record on the right (SHIPMENTS) is found, the right-hand columns are returned as NULL.
- Because this is a stream-stream join, it needs to specify a time window across which to evaluate the condition. By setting it to (0 SECONDS, 7 DAYS), any shipment up to seven days after the order will be matched. No shipments occurring before the order will be matched.
CREATE STREAM ORDER_SHIPMENTS WITH (TIMESTAMP='ORDER_TS', VALUE_FORMAT='AVRO') AS
SELECT O.ROWTIME AS ORDER_TS,
S.ROWTIME AS SHIPMENT_TS,
O.ORDER_ID AS ORDER_ID,
O.MAKE AS MAKE,
O.ORDER_TOTAL_USD AS ORDER_TOTAL_USD,
S.SHIPMENT_ID AS SHIPMENT_ID,
S.SHIPMENT_PROVIDER AS SHIPMENT_PROVIDER,
(S.ROWTIME - O.ROWTIME)/1000/60 AS LEADTIME_MINUTES,
CASE WHEN S.SHIPMENT_ID IS NULL THEN 0 ELSE 1 END AS SHIPPED_IND
FROM ORDERS O
LEFT OUTER JOIN SHIPMENTS S
WITHIN (0 SECONDS, 7 DAYS)
ON O.ORDER_ID=S.ORDER_ID;
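The effect of the WITHIN (0 SECONDS, 7 DAYS) clause can be sketched in plain Python. This is an illustration of the matching rule only, not how KSQL implements it; the function name within_join_window is hypothetical, and treating both bounds as inclusive is an assumption of the sketch.

```python
from datetime import datetime, timedelta, timezone

def within_join_window(order_ts, shipment_ts,
                       before=timedelta(seconds=0), after=timedelta(days=7)):
    """True if the shipment falls inside the order's join window.

    Mirrors WITHIN (0 SECONDS, 7 DAYS): the shipment may be at most `before`
    earlier and at most `after` later than the order. Both bounds are treated
    as inclusive here, which is an assumption of this sketch.
    """
    return order_ts - before <= shipment_ts <= order_ts + after

order = datetime(2019, 3, 24, 9, 41, 13, tzinfo=timezone.utc)
shipment = datetime(2019, 3, 26, 5, 42, 37, tzinfo=timezone.utc)

print(within_join_window(order, shipment))                    # True: shipped about 44 hours later
print(within_join_window(order, order - timedelta(hours=1)))  # False: shipped before the order
print(within_join_window(order, order + timedelta(days=8)))   # False: shipped too late
```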
From this new stream we can answer questions such as what was the lead time? That is, the time difference between the order being placed and it shipping.
SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
TIMESTAMPTOSTRING(SHIPMENT_TS,'yyyy-MM-dd HH:mm:ss'),
ORDER_ID,
MAKE,
SHIPMENT_PROVIDER,
LEADTIME_MINUTES
FROM ORDER_SHIPMENTS
WHERE ORDER_ID=329
AND SHIPPED_IND=1;
2019-03-24 09:41:13 | 2019-03-26 05:42:37 | 329 | BMW | ups | 2641
(2641 minutes = 44 hours, 1 minute)
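As a sanity check on that figure, the same arithmetic as (S.ROWTIME - O.ROWTIME)/1000/60 can be reproduced in Python. This sketch assumes the division truncates to a whole number, as the result above suggests, and treats both timestamps as being in the same timezone (UTC here) so that only their difference matters.

```python
from datetime import datetime, timezone

def to_epoch_ms(dt):
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(dt.timestamp() * 1000)

order_ms = to_epoch_ms(datetime(2019, 3, 24, 9, 41, 13, tzinfo=timezone.utc))
shipment_ms = to_epoch_ms(datetime(2019, 3, 26, 5, 42, 37, tzinfo=timezone.utc))

# Milliseconds -> seconds -> minutes, using integer division at each step
leadtime_minutes = (shipment_ms - order_ms) // 1000 // 60
hours, minutes = divmod(leadtime_minutes, 60)

print(leadtime_minutes, hours, minutes)  # 2641 44 1
```

The remaining 24 seconds are dropped by the integer division.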
What we can’t answer yet from this stream is "Has an order shipped?". It may seem obvious to simply query it for rows where no match to a shipment was made (e.g. WHERE SHIPMENT_ID IS NULL) or to use the derived SHIPPED_IND column (which uses the same logic), but this won’t work. Why? Because we are working with a stream of events. Consider the order above, ID 329:
SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
ORDER_ID,
MAKE,
SHIPPED_IND,
SHIPMENT_ID
FROM ORDER_SHIPMENTS
WHERE ORDER_ID=329;
2019-03-24 09:41:13 | 329 | BMW | 0 | null
2019-03-24 09:41:13 | 329 | BMW | 1 | ship-wv85258
There are two events:
- The order is placed. No shipment matches.
- The shipment is made. It matches the order and a new event is written.
So if we simply query the stream for a "not shipped" condition we’ll just pull back the initial Order record:
SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
ORDER_ID,
MAKE,
SHIPPED_IND,
SHIPMENT_ID
FROM ORDER_SHIPMENTS
WHERE ORDER_ID=329
AND SHIPPED_IND=0;
2019-03-24 09:41:13 | 329 | BMW | 0 | null
Stream/Table duality in action
Let’s take the stream of events relating to a given key (ORDER_ID) and treat it as a table: what’s the current state for the given key? Here we register a KSQL table on top of the Kafka topic to which the previous join query is writing.
CREATE TABLE ORDER_SHIPMENTS_T
WITH (KAFKA_TOPIC='ORDER_SHIPMENTS',
VALUE_FORMAT='AVRO',
KEY='ORDER_ID');
Now we can run the same query as above, but this time we get the current state for any order:
SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
ORDER_ID,
MAKE,
SHIPPED_IND,
SHIPMENT_ID
FROM ORDER_SHIPMENTS_T
WHERE ORDER_ID=329;
2019-03-24 09:41:13 | 329 | BMW | 1 | ship-wv85258
Just one row returned; the current state of the record. Since the table maintains the current state it means that we can accurately query it for orders that have not shipped:
SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
ORDER_ID,
MAKE,
SHIPPED_IND,
SHIPMENT_ID
FROM ORDER_SHIPMENTS_T
WHERE SHIPPED_IND=0;
2019-03-24 16:27:56 | 7 | Audi | 0 | null
2019-03-24 05:54:43 | 39 | Audi | 0 | null
2019-03-24 23:20:37 | 53 | Audi | 0 | null
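The difference between querying the stream and querying the table can be illustrated with a small Python sketch. It is a simulation of the idea only: the event list is a hand-picked subset of the rows shown above, and the function name materialise is hypothetical. A table is modelled as a dictionary in which each new event for a key overwrites the previous one.

```python
# Each tuple is one event from the joined stream:
# (order_id, make, shipped_ind, shipment_id), in arrival order
events = [
    (329, "BMW", 0, None),            # order 329 placed, no shipment yet
    (7, "Audi", 0, None),             # order 7 placed, no shipment yet
    (329, "BMW", 1, "ship-wv85258"),  # order 329 ships
]

def materialise(events):
    """Collapse a stream of events into the latest state per key (order_id)."""
    state = {}
    for order_id, make, shipped_ind, shipment_id in events:
        state[order_id] = (make, shipped_ind, shipment_id)  # later events overwrite earlier ones
    return state

table = materialise(events)

# Filtering the stream returns every "not shipped" event ever written
stream_unshipped = [order_id for order_id, _, shipped_ind, _ in events if shipped_ind == 0]
# Filtering the table returns only orders whose current state is "not shipped"
table_unshipped = [order_id for order_id, (_, shipped_ind, _) in table.items() if shipped_ind == 0]

print(stream_unshipped)  # [329, 7]
print(table_unshipped)   # [7]
```

Order 329 appears as unshipped in the stream because its first event is still there, but not in the table, where only its latest state survives.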
Build a windowed aggregate on a table? Not yet.
Note: Non-windowed aggregates are possible; just not windowed aggregates.
Taking the above state provided by the KSQL table, we’d like to know things like:
- By hour, how many orders are outstanding (that have not shipped)?
However, doing a windowed aggregation on a table is not currently possible in KSQL:
ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss'),
> MAKE,
> COUNT(*) AS ORDERS_PLACED,
> SUM(SHIPPED_IND) AS SHIPPED_ORDERS,
> COUNT(*) - SUM(SHIPPED_IND) AS OUTSTANDING_ORDERS
> FROM ORDER_SHIPMENTS_T
> WINDOW TUMBLING (SIZE 1 HOUR)
> GROUP BY MAKE;
Windowing not supported for table aggregations.
But what about the order count? It would also be useful to have it in the same query as the count of shipments, rather than in individual queries against the ORDERS_AGG and SHIPMENTS_AGG tables we saw earlier. Can’t we just get that from the ORDER_SHIPMENTS stream?
Let’s try the aggregate just for ORDER_ID=329, so that we know there is only a single order:
SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss'),
MAKE,
COUNT(*) AS ORDERS_PLACED,
SUM(SHIPPED_IND) AS SHIPPED_ORDERS
FROM ORDER_SHIPMENTS
WINDOW TUMBLING (SIZE 1 HOUR)
WHERE ORDER_ID=329
GROUP BY MAKE;
2019-03-24 09:00:00 | BMW | 2 | 1
Why’s ORDERS_PLACED==2? Because it’s a COUNT(*) of all matching rows, which is indeed 2:
SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
ORDER_ID,
MAKE,
SHIPPED_IND,
SHIPMENT_ID
FROM ORDER_SHIPMENTS
WHERE ORDER_ID=329;
2019-03-24 09:41:13 | 329 | BMW | 0 | null
2019-03-24 09:41:13 | 329 | BMW | 1 | ship-wv85258
Useful aggregations that we can do
Even though we can’t do an aggregate against the state, we can still do some very useful stream processing against the stream of events themselves.
CREATE TABLE SHIPPED_ORDERS_AGG AS
SELECT WINDOWSTART() AS WINDOW_START_TS,
MAKE,
COUNT(*) AS SHIPPED_ORDERS,
MAX(LEADTIME_MINUTES) AS MAX_LEADTIME_MINUTES,
SUM(LEADTIME_MINUTES) / COUNT(*) AS AVG_LEADTIME_MINUTES
FROM ORDER_SHIPMENTS
WINDOW TUMBLING (SIZE 1 HOUR)
WHERE SHIPPED_IND=1
GROUP BY MAKE;
- By hour, how many orders have shipped?
- By hour, what was the max leadtime?
- By hour, what was the average leadtime?
SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss'),
MAKE,
SHIPPED_ORDERS,
MAX_LEADTIME_MINUTES,
AVG_LEADTIME_MINUTES
FROM SHIPPED_ORDERS_AGG;
2019-03-24 09:00:00 | Audi | 8 | 3397 | 2939
2019-03-24 20:00:00 | Audi | 9 | 3022 | 2266
2019-03-24 22:00:00 | BMW | 1 | 2317 | 2317
2019-03-24 21:00:00 | BMW | 3 | 2534 | 2121
ALERT! We breached our SLA 😫
As well as creating aggregates, we can set thresholds and monitor for any orders that breach an SLA in terms of the leadtime.
CREATE STREAM ORDERS_BREACHED_LEADTIME_SLA AS
SELECT ORDER_ID,
SHIPMENT_ID,
MAKE,
SHIPMENT_PROVIDER,
ORDER_TS,
SHIPMENT_TS,
LEADTIME_MINUTES
FROM ORDER_SHIPMENTS
WHERE LEADTIME_MINUTES > 4100;
Now we have a KSQL stream (and thus Kafka topic) which lists any orders that took longer than the defined threshold to ship. This topic can be used to drive dashboards, and can be consumed directly by applications that need to respond to the breach.
SELECT ORDER_ID,
SHIPMENT_ID,
MAKE,
SHIPMENT_PROVIDER,
TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
TIMESTAMPTOSTRING(SHIPMENT_TS,'yyyy-MM-dd HH:mm:ss'),
LEADTIME_MINUTES
FROM ORDERS_BREACHED_LEADTIME_SLA;
14 | ship-og22112 | Audi | hermes | 2019-03-24 03:15:52 | 2019-03-26 23:46:19 | 4110
315 | ship-yp90671 | Ford | dhl | 2019-03-24 00:11:42 | 2019-03-26 21:03:43 | 4132
Try it out?
Use this Docker Compose.