KSQL can create windowed aggregations. For example, the following statement counts the number of messages in each 1-minute window, grouped by a particular column (here, CLUB_STATUS):
CREATE TABLE RATINGS_BY_CLUB_STATUS AS \
  SELECT CLUB_STATUS, COUNT(*) AS RATING_COUNT \
  FROM RATINGS_WITH_CUSTOMER_DATA \
  WINDOW TUMBLING (SIZE 1 MINUTES) \
  GROUP BY CLUB_STATUS;

How KSQL and Kafka Streams store the window timestamp associated with an aggregate has recently changed. See #1497 for details.
Previously, the Kafka message timestamp (accessible through the KSQL ROWTIME system column) stored the start of the window for which the aggregate had been calculated. Since July 2018, it instead holds the timestamp of the latest message to update that aggregate value.
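To make the difference concrete, here is a minimal Python model of the tumbling-window count above. It is not KSQL's or Kafka Streams' actual implementation. It assumes epoch-aligned 1-minute windows, messages processed in arrival order, and that every update to an aggregate emits a new row. The names `windowed_count_updates`, `Update`, `rowtime_before` and `rowtime_after` are hypothetical and exist only for illustration.

```python
from collections import defaultdict
from typing import Iterable, List, NamedTuple, Tuple

WINDOW_SIZE_MS = 60_000  # assumed: SIZE 1 MINUTES expressed in milliseconds


class Update(NamedTuple):
    """One emitted change to a windowed COUNT(*) (hypothetical model)."""
    club_status: str
    window_start: int
    rating_count: int
    rowtime_before: int  # pre-July-2018 behaviour: ROWTIME = start of the window
    rowtime_after: int   # post-July-2018 behaviour: ROWTIME = timestamp of the updating message


def windowed_count_updates(messages: Iterable[Tuple[int, str]],
                           size_ms: int = WINDOW_SIZE_MS) -> List[Update]:
    """Count messages per (CLUB_STATUS, tumbling window), emitting a row per update."""
    counts = defaultdict(int)
    updates = []
    for ts_ms, club_status in messages:  # processed in arrival order
        # Tumbling windows aligned to the epoch: floor the timestamp to the window size.
        start = ts_ms - (ts_ms % size_ms)
        counts[(club_status, start)] += 1
        updates.append(Update(club_status, start, counts[(club_status, start)],
                              rowtime_before=start, rowtime_after=ts_ms))
    return updates


if __name__ == "__main__":
    msgs = [(5_000, "gold"), (30_000, "gold"), (45_000, "gold")]
    for u in windowed_count_updates(msgs):
        print(u)
```

For three gold ratings arriving at 5 s, 30 s and 45 s, the count goes 1, 2, 3 and every emitted row has `rowtime_before` equal to 0 (the window start), whereas `rowtime_after` is 5000, 30000 and 45000 respectively. Under the older behaviour every update to a window carries the same ROWTIME; under the newer one it advances with each message that updates the aggregate.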