Prompted by a question on StackOverflow I had a bit of a dig into how windows behave in ksqlDB, specifically with regard to their start time. This article also shows how to create test data in ksqlDB, including records with timestamps in the past.
For a general background to windowing in ksqlDB see the excellent docs.
The nice thing about recent releases of ksqlDB/KSQL is that you can create and populate streams directly with CREATE STREAM and INSERT INTO respectively. Much as I love kafkacat, being able to build a whole example within the ksqlDB CLI is very useful.
Create the stream
This creates the stream as well as the underlying topic. Since the topic doesn’t exist already I’ve specified its name and also PARTITIONS; without these ksqlDB won’t create it for me.
CREATE STREAM SOURCE_DATA (OP_TS BIGINT, CUSTOMER VARCHAR, COST INT)
WITH (KAFKA_TOPIC='MY_DATA',
VALUE_FORMAT='AVRO',
PARTITIONS=1,
TIMESTAMP='OP_TS');
Note that I’ve created OP_TS to hold the timestamp as an epoch (hence BIGINT) and indicated to ksqlDB that this column is to be used as the timestamp for the records when doing any time-based processing. By default ksqlDB will use the timestamp of the Kafka message.
Populate the stream
Using https://www.epochconverter.com/ for ease I came up with a handful of times within the past year, and inserted messages into the stream for these:
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1549715863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1560083863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1574339863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1575981463000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1576931863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1578573463000, 'A',1);
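If you’d rather not use a website for the conversion, the same epoch values can be produced with a few lines of Python. This is only a sketch: the helper name `to_epoch_ms` is my own, and it assumes the wall-clock times are given in UTC (all six records above sit at 12:37:43 UTC).

```python
from datetime import datetime, timezone

def to_epoch_ms(year, month, day, hour=0, minute=0, second=0):
    """Return milliseconds since the Unix epoch for a UTC wall-clock time.

    Hypothetical helper for illustration; not part of ksqlDB.
    """
    dt = datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

# First and last timestamps used in the INSERT statements above
print(to_epoch_ms(2019, 2, 9, 12, 37, 43))   # 1549715863000
print(to_epoch_ms(2020, 1, 9, 12, 37, 43))   # 1578573463000
```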
Query the stream
Here’s how the data looks. Note a few things:
- The use of TIMESTAMPTOSTRING to make the milliseconds-since-epoch more readable.
- The UNIX_TIMESTAMP function is used to do some date maths to show how long ago from now the timestamp is.
- ROWTIME and OP_TS match, because that’s what we told ksqlDB with the WITH TIMESTAMP clause in the CREATE STREAM. If we hadn’t, then the ROWTIME would just be the time at which the rows were `INSERT`ed above.
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS ROWTIME_STR,
TIMESTAMPTOSTRING(OP_TS,'yyyy-MM-dd HH:mm:ss','Europe/London') AS OP_TS,
(UNIX_TIMESTAMP()-OP_TS) / 1000 / 60 / 60 / 24 AS DAYS_DIFF ,
CUSTOMER,
COST
FROM SOURCE_DATA
EMIT CHANGES;
+--------------------+--------------------+----------+-----------+-----+
|ROWTIME_STR |OP_TS |DAYS_DIFF |CUSTOMER |COST |
+--------------------+--------------------+----------+-----------+-----+
|2019-02-09 12:37:43 |2019-02-09 12:37:43 |334 |A |1 |
|2019-06-09 13:37:43 |2019-06-09 13:37:43 |214 |A |1 |
|2019-11-21 12:37:43 |2019-11-21 12:37:43 |49 |A |1 |
|2019-12-10 12:37:43 |2019-12-10 12:37:43 |30 |A |1 |
|2019-12-21 12:37:43 |2019-12-21 12:37:43 |19 |A |1 |
|2020-01-09 12:37:43 |2020-01-09 12:37:43 |0 |A |1 |
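The DAYS_DIFF arithmetic can be reproduced outside ksqlDB as a sanity check. The sketch below assumes that the division in the query is integer division (so partial days are dropped), and it uses a made-up value for "now", one hour after the last record, since the actual time the query was run isn’t recorded here.

```python
# Epoch-millisecond timestamps from the INSERT statements
OP_TS = [
    1549715863000,
    1560083863000,
    1574339863000,
    1575981463000,
    1576931863000,
    1578573463000,
]

# Hypothetical "now": one hour after the last record (an assumption for illustration)
NOW_MS = 1578573463000 + 60 * 60 * 1000

def days_diff(now_ms, op_ts_ms):
    """Mirror (UNIX_TIMESTAMP()-OP_TS) / 1000 / 60 / 60 / 24 with integer division."""
    return (now_ms - op_ts_ms) // 1000 // 60 // 60 // 24

for ts in OP_TS:
    print(ts, days_diff(NOW_MS, ts))
```

With that assumed value for "now" the results are 334, 214, 49, 30, 19 and 0, matching the DAYS_DIFF column above.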
Tumbling window examples
Here’s the output of tumbling windows of various sizes.
-
7 Days
ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START_TS,
CUSTOMER,
SUM(COST)
FROM SOURCE_DATA
WINDOW TUMBLING (SIZE 7 DAYS)
GROUP BY CUSTOMER
EMIT CHANGES ;
+----------------------+----------+------------+
|WINDOW_START_TS |CUSTOMER |KSQL_COL_2 |
+----------------------+----------+------------+
|2019-02-07 00:00:00 |A |1 |
|2019-06-06 01:00:00 |A |1 |
|2019-11-21 00:00:00 |A |1 |
|2019-12-05 00:00:00 |A |1 |
|2019-12-19 00:00:00 |A |1 |
|2020-01-09 00:00:00 |A |1 |
This is pretty much what we’d expect to see.
-
31 days
I was hoping for a month, but ksqlDB doesn’t accept MONTH as a window unit. Trying it fails with:
Caused by: line 1:160: mismatched input 'MONTH' expecting {'DAY', 'HOUR', 'MINUTE', 'SECOND', 'MILLISECOND', 'DAYS', 'HOURS', 'MINUTES', 'SECONDS', 'MILLISECONDS'}
Hence here’s 31 days instead:
ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START_TS,
CUSTOMER,
SUM(COST)
FROM SOURCE_DATA
WINDOW TUMBLING (SIZE 31 DAYS)
GROUP BY CUSTOMER
EMIT CHANGES ;
+-----------------------+----------+------------+
|WINDOW_START_TS |CUSTOMER |KSQL_COL_2 |
+-----------------------+----------+------------+
|2019-01-22 00:00:00 |A |1 |
|2019-05-26 01:00:00 |A |1 |
|2019-10-28 00:00:00 |A |1 |
|2019-11-28 00:00:00 |A |2 |
|2019-12-29 00:00:00 |A |1 |
Note that two of the values (for 2019-12-10 and 2019-12-21) fall within the same window (starting 2019-11-28).
365 days
As noted above, ksqlDB supports DAYS as the largest unit of time, so I’ll have to approximate 1 year as 365 days:
ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START_TS,
CUSTOMER,
SUM(COST)
FROM SOURCE_DATA
WINDOW TUMBLING (SIZE 365 DAYS)
GROUP BY CUSTOMER
EMIT CHANGES ;
+-----------------------+----------+------------+
|WINDOW_START_TS |CUSTOMER |KSQL_COL_2 |
+-----------------------+----------+------------+
|2018-12-20 00:00:00 |A |4 |
|2019-12-20 00:00:00 |A |2 |
So this is where it gets interesting - looking back on the query output above you can see we only have data within the last year, but for a tumbling window of 365 days we’re getting two values, starting on December 20th of two consecutive years.
Window start date
When calculating a time window within a day, the window starts at midnight. For a window greater than a day, it seems that the window boundaries are counted in whole window-size steps from the start of Unix time (which also ties in with window sizes of less than a day starting at midnight).
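To make that hypothesis concrete, here is a minimal sketch of epoch-aligned tumbling windows. It is my own illustration of the behaviour inferred from the output above, not code taken from ksqlDB; the names `window_start` and `fmt` are hypothetical.

```python
from datetime import datetime, timezone

MS_PER_DAY = 24 * 60 * 60 * 1000

# Epoch-millisecond timestamps from the INSERT statements
OP_TS = [
    1549715863000,
    1560083863000,
    1574339863000,
    1575981463000,
    1576931863000,
    1578573463000,
]

def window_start(ts_ms, size_ms):
    """Start of the epoch-aligned tumbling window that contains ts_ms (assumed behaviour)."""
    return ts_ms - (ts_ms % size_ms)

def fmt(ts_ms):
    """Render epoch milliseconds as a UTC string."""
    return datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

for size_days in (7, 31):
    # A set collapses records that land in the same window
    starts = sorted({window_start(ts, size_days * MS_PER_DAY) for ts in OP_TS})
    print(size_days, "days:", [fmt(s) for s in starts])
```

The dates this produces line up with the 7-day and 31-day results earlier in the article. The sketch prints UTC, so the June windows show 00:00:00 rather than the 01:00:00 seen above, where the output was rendered in Europe/London during summer time.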
Let’s double-check this assumption. Unix time starts at 1st January 1970 00:00:00. Taking our window size of 365 days, this is:
| days | 365 |
| hours | 8760 |
| minutes | 525600 |
| seconds | 31536000 |
| milliseconds | 31536000000 |
Now let’s look at the epoch returned by WINDOWSTART():
ksql> SELECT WINDOWSTART() AS WINDOW_START_EPOCH,
CUSTOMER,
SUM(COST)
FROM SOURCE_DATA
WINDOW TUMBLING (SIZE 365 DAYS)
GROUP BY CUSTOMER
EMIT CHANGES ;
+--------------------+---------+-----------+
|WINDOW_START_EPOCH |CUSTOMER |KSQL_COL_2 |
+--------------------+---------+-----------+
|1545264000000 |A |4 |
|1576800000000 |A |2 |
The first of these is 1545264000000. What do we get if we divide this by the number of milliseconds in a 365-day window (31536000000)? We find that it fits exactly: 1545264000000 ÷ 31536000000 = 49.
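The same check can be run for both window starts in a few lines of Python. This is just arithmetic on the values shown above; the per-window count at the end assumes each record belongs to the window whose index is its timestamp divided (integer division) by the window size.

```python
from collections import Counter

SIZE_MS = 365 * 24 * 60 * 60 * 1000  # 31536000000 ms in a 365-day window

# Both values returned by WINDOWSTART() in the query above
for start in (1545264000000, 1576800000000):
    quotient, remainder = divmod(start, SIZE_MS)
    print(start, quotient, remainder)

# Epoch-millisecond timestamps from the INSERT statements
OP_TS = [
    1549715863000,
    1560083863000,
    1574339863000,
    1575981463000,
    1576931863000,
    1578573463000,
]

# Number of records per window index (assumed epoch-aligned windows)
print(Counter(ts // SIZE_MS for ts in OP_TS))
```

Both window starts divide with no remainder (49 and 50 windows since the epoch), and the records split 4 and 2 between them, which agrees with the SUM(COST) values in the query output.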
This suggests that if you want to build monthly or yearly aggregates in ksqlDB that start on Gregorian calendar boundaries, ksqlDB will need to add support for MONTH and YEAR as window sizes (tracked in issue #1968).