Whether you’re processing data in batch or as a stream, the concept of time is an important part of accurate processing logic.
Because we process data after it happens, there are a minimum of two different types of time to consider:

- When it happened, known as Event Time
- When we process it, known as Processing Time (or system time, or wall clock time)
In the days of yore, event time could be different from processing time by many hours. At 9am a supermarket opens and I buy a tin of baked beans. At 6pm the store closes, the end-of-day runs on the in-store PoS, and sends the data back to head office. Perhaps at 10pm all the data from all the stores has been received and processing starts.
In the modern age many platforms will be much lower latency. PoS systems would be sending information about my tin of baked beans purchase back to a central system (probably using Apache Kafka…) as soon as it happened. But even then, times can differ. There’s the natural latency that the speed of light has a strong opinion on, on top of which is any other processing or batching that might be introduced along the way. Then there are power outages and network blips and gremlins that can mean records can be significantly delayed.
There’s a whole category of data that wasn’t a significant thing in the past—data that’s generated from mobile devices. Even with improving cellular coverage and in-flight wifi, data can still be delayed by minutes or hours.
So that’s event time and processing time. But there are other types of time too:

- The time the record was written in the system
- The time a record was first created
- The time a record was last updated
- Arbitrary time fields on a record, such as an order record with times that the order was placed, fulfilled, and shipped
I’ll come back to these in detail later. For now, just keep in mind that there is no single "correct" time to use. It depends on what your processing is supposed to be doing. If you are trying to calculate how many orders were shipped in the last hour, then using the time that the order was created or that it landed in the processing system would give you an answer—it would just be the wrong one.
This seminal article from Tyler Akidau is nearly ten years old, but it's still a highly relevant and excellent read, and covers a lot of what you need to know about time in stream processing.
Time in Kafka 🔗
Each Kafka message is made up of several parts:

- Key
- Value (the payload)
- Header
- Timestamp
Here’s an example record. The value is in JSON, which I’ve pretty-printed to make it easier to follow:
Key (14 bytes): {"order_id":1}
Value (205 bytes):
{
"order_id": 1,
"customer_id": 1001,
"total_amount": 149.99,
"status": "pending",
"created_at": "2025-04-25 09:44:25",
"shipped_at": null,
"shipping_address": "221B Baker Street, London",
"payment_method": "Credit Card"
}
Timestamp: 1745587589625
There are two times here:

- The event time, which we'll take as `created_at` from the value: `2025-04-25 09:44:25`.
- The processing time, which is stored in the Kafka message timestamp: `1745587589625`. This is milliseconds since the epoch, and converts to `2025-04-25 13:26:29.625`.
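As an aside, if you want to sanity-check an epoch-milliseconds value like this, Flink SQL (which we'll get to shortly) can do the conversion for you. A quick sketch; the displayed value follows your session timezone, so your output may differ:

```sql
-- Convert epoch milliseconds to a timestamp; precision 3 = milliseconds
SELECT TO_TIMESTAMP_LTZ(1745587589625, 3) AS record_time;
```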
The timestamp of a Kafka message can be set explicitly by the producer. If it's not set explicitly, then the producer sets the timestamp to the current time when it sends the message to the broker.

The timestamp that's included in the message written by the broker depends on the `message.timestamp.type` topic configuration. If this is set to `CreateTime` then the broker uses the time from the producer (whether explicitly or implicitly set); if it's set to `LogAppendTime`, the broker uses its own time at the point the record is written.
In theory you could use the Timestamp field of the Kafka message to hold the event time; the data is being stored anyway, so why not optimise by not holding it twice? The disadvantage of this comes if you’re using a system which doesn’t expose the timestamp metadata of a message—so if in doubt, keep it simple :)
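If you did want to go that route with Flink SQL (which we're just about to get to), here's a sketch of what it might look like. The table name is hypothetical, and both metadata columns and watermarks are explained properly below:

```sql
-- A sketch: treat the Kafka message timestamp as the event time
CREATE TABLE orders_by_msg_ts (
  order_id INT,
  -- expose the Kafka message timestamp as a column...
  record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- ...and make it the event time attribute by defining a watermark on it
  WATERMARK FOR record_time AS record_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_cdc',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);
```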
Time in Flink 🔗
The longer you’ve been working with Flink, the higher the chances are that you’ve heard the word "Watermark". I must confess to having spent the last 18 months—since I started working with Flink—with my head in the sand, somewhat Ostrich-like, when it comes to Watermarks. They’re spoken of in hushed tones and with great reverence. They seem to cause great wailing and gnashing of teeth. Conference talks are written about them.
I am going to write about Watermarks in this article…just not quite yet.
Before we get to Watermarks, we need to understand that Flink has the concept of a Time Attribute. There are two types of time attribute:

- Event Time
- Processing Time (a.k.a. Wall Clock)
Both of these map to the explanations above; when something happened (event time) and when it was processed (…erm, processing time!).
If you don’t care about event time, you can also forget about watermarks. Watermarks are an event-time-only thing.
In a very rough nutshell, watermarks define where on the sliding scale between data completeness and data freshness you want your Flink processing to be. But like I said, we’ll get to watermarks later.
Time attributes are used when doing certain processing with Flink that has a time element to it. If you don’t have one you’ll get errors like:
The window function requires the timecol is a time attribute type
Just because a column is a timestamp, it doesn’t mean that it’s a time attribute. A time attribute is a specific characteristic in a Flink table, and you need to explicitly declare it.
To add a time attribute for processing time to a table, use a computed column with the `PROCTIME()` function. An event time column is denoted implicitly as a time attribute by defining a `WATERMARK FOR` statement on it in the table DDL.
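To make that concrete before we dive in, here's a minimal sketch of both declarations side by side (a hypothetical table, using the datagen connector just so it's self-contained):

```sql
CREATE TABLE events (
  id INT,
  event_time TIMESTAMP(3),
  -- processing time attribute: a computed column using PROCTIME()
  proc_time AS PROCTIME(),
  -- event time attribute: implied by defining a watermark on the column
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);
```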
Some good Flink reference pages:
Let’s look at this in practice, using a table defined over an existing Kafka topic.
Time in Kafka in Flink 🔗
Here’s our Kafka message from above:
Key (14 bytes): {"order_id":1}
Value (205 bytes):
{
"order_id": 1,
"customer_id": 1001,
"total_amount": 149.99,
"status": "pending",
"created_at": "2025-04-25 09:44:25",
"shipped_at": null,
"shipping_address": "221B Baker Street, London",
"payment_method": "Credit Card"
}
Timestamp: 1745587589625
Let’s now create a Flink table for this Kafka topic and explore time attributes. We’ll start off with no declared time attributes:
CREATE TABLE orders_kafka (
order_id INT,
customer_id INT,
total_amount DECIMAL(10, 2),
status STRING,
created_at TIMESTAMP(3),
shipped_at TIMESTAMP(3),
shipping_address STRING,
payment_method STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders_cdc',
'properties.bootstrap.servers' = 'broker:9092',
'key.format' = 'json',
'value.format' = 'json'
);
Here we only see the event time column that we defined in the schema (`created_at`):
Flink SQL> SELECT * FROM orders_kafka;
+----+-------------+-------------+--------------+------------+-------------------------+[…]
| op | order_id | customer_id | total_amount | status | created_at |[…]
+----+-------------+-------------+--------------+------------+-------------------------+[…]
| +I | 1 | 1001 | 149.99 | pending | 2025-04-25 09:44:25.000 |[…]
We can access the timestamp of the Kafka message if we add a metadata column:
ALTER TABLE orders_kafka
ADD `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp';
This metadata column looks like this in the schema:
Flink SQL> DESCRIBE orders_kafka;
+------------------+------------------+-------+---------------+---------------------------+-----------+
| name | type | null | key | extras | watermark |
+------------------+------------------+-------+---------------+---------------------------+-----------+
| order_id | INT | FALSE | PRI(order_id) | | |
| customer_id | INT | TRUE | | | |
| total_amount | DECIMAL(10, 2) | TRUE | | | |
| status | STRING | TRUE | | | |
| created_at | TIMESTAMP(3) | TRUE | | | |
| shipped_at | TIMESTAMP(3) | TRUE | | | |
| shipping_address | STRING | TRUE | | | |
| payment_method | STRING | TRUE | | | |
| record_time | TIMESTAMP_LTZ(3) | TRUE | | METADATA FROM 'timestamp' | |
+------------------+------------------+-------+---------------+---------------------------+-----------+
9 rows in set
Now we can query it:
SELECT order_id, created_at, record_time FROM orders_kafka;
+----+-------------+-------------------------+-------------------------+
| op | order_id | created_at | record_time |
+----+-------------+-------------------------+-------------------------+
| +I | 1 | 2025-04-25 09:44:25.000 | 2025-04-25 13:26:29.625 |
This matches the timestamps above that we observed in the raw Kafka message—except that `record_time` is displayed here in UTC, whereas the conversion that I did above gave it in BST (UTC+1).
Aren’t timestamps fun!? ;)
Whilst we’ve got the Kafka message’s timestamp metadata, which is akin to the processing time of the message (unless overridden by the producer), this is not the same as the processing time attribute in Flink. For this, we need another special column:
ALTER TABLE orders_kafka
ADD `flink_proc_time` AS PROCTIME();
Now we have three timestamps :)
SELECT order_id, created_at, record_time, flink_proc_time FROM orders_kafka;
+----+-------------+-------------------------+-------------------------+-------------------------+
| op | order_id | created_at | record_time | flink_proc_time |
+----+-------------+-------------------------+-------------------------+-------------------------+
| +I | 1 | 2025-04-25 09:44:25.000 | 2025-04-25 13:26:29.625 | 2025-04-25 15:09:57.349 |
If I re-run the query I get this (note that `flink_proc_time` changes whilst the others don’t):
+----+-------------+-------------------------+-------------------------+-------------------------+
| op | order_id | created_at | record_time | flink_proc_time |
+----+-------------+-------------------------+-------------------------+-------------------------+
| +I | 1 | 2025-04-25 09:44:25.000 | 2025-04-25 13:26:29.625 | 2025-04-25 15:10:09.743 |
The processing time attribute is literally just the time at which the data is passing through Flink. You may have figured this out already, but since processing time is just the wall clock, queries using processing time are going to be non-deterministic. Contrast that with an event time attribute, which is part of the actual data, making the queries deterministic: when you re-run the query, you’ll get the same results.
It’s time… 🔗
Let’s now actually run a query in Flink that relies on time.
I’ve added another row of data to the Kafka topic, meaning that the data in Flink now looks like this:
Flink SQL> SELECT order_id, created_at, record_time, flink_proc_time FROM orders_kafka;
+----+-------------+-------------------------+-------------------------+-------------------------+
| op | order_id | created_at | record_time | flink_proc_time |
+----+-------------+-------------------------+-------------------------+-------------------------+
| +I | 1 | 2025-04-25 09:44:25.000 | 2025-04-25 13:26:29.625 | 2025-04-25 15:10:09.743 |
| +I | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 13:26:35.928 | 2025-04-25 15:10:09.743 |
We’ll count how many orders were placed every minute. For this we can use a tumbling window:
SELECT window_start,
window_end,
COUNT(*) as event_count
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
Now we hit our first problem:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException:
The window function requires the timecol is a time attribute type, but is TIMESTAMP(3).
The `timecol` in this message means the time column that we specified in the query as the one to use in the time-based aggregation—`created_at`. But even though `created_at` is a timestamp, it’s not a time attribute.
Recall that above we detailed the two types of time attribute in Flink:

- Event Time
- Processing Time (a.k.a. Wall Clock)
We do have a time attribute on the table—`flink_proc_time`:
Flink SQL> DESCRIBE orders_kafka;
+------------------+-----------------------------+-------+---------------+---------------------------+-----------+
| name | type | null | key | extras | watermark |
+------------------+-----------------------------+-------+---------------+---------------------------+-----------+
| order_id | INT | FALSE | PRI(order_id) | | |
| customer_id | INT | TRUE | | | |
| total_amount | DECIMAL(10, 2) | TRUE | | | |
| status | STRING | TRUE | | | |
| created_at | TIMESTAMP(3) | TRUE | | | |
| shipped_at | TIMESTAMP(3) | TRUE | | | |
| shipping_address | STRING | TRUE | | | |
| payment_method | STRING | TRUE | | | |
| record_time | TIMESTAMP_LTZ(3) | TRUE | | METADATA FROM 'timestamp' | |
| flink_proc_time | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS `PROCTIME`() | |
+------------------+-----------------------------+-------+---------------+---------------------------+-----------+
10 rows in set
So let’s use that in the query and see what happens:
SELECT window_start,
window_end,
COUNT(*) as event_count
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(flink_proc_time),
INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
At first, we get nothing:
+----+-------------------------+-------------------------+----------------------+
| op | window_start | window_end | event_count |
+----+-------------------------+-------------------------+----------------------+
That’s because Flink waits for the window to close before issuing the result:
+----+-------------------------+-------------------------+----------------------+
| op | window_start | window_end | event_count |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-04-25 15:11:00.000 | 2025-04-25 15:12:00.000 | 2 |
Let’s look closely at the window timestamp though. The question we’ve answered here is how many records arrived for processing per minute. What it definitely doesn’t tell us is how many orders were placed each minute (which is what we were trying to answer originally).
For that we need to build a window using a different time field: `created_at`. (If we wanted to know how many orders were written to Kafka we’d use the record timestamp; if we wanted to know how many orders shipped each minute we’d use `shipped_at`; and so on.)
We saw above already that we can’t just pass a timestamp column to the window aggregation; it has to be a column that has been marked as a time attribute. We don’t want to use a processing time attribute because that doesn’t answer our question; we need to use an event time attribute.
To mark a column as an event time attribute we need to use the `WATERMARK` statement.
This is where I think things get a bit confusing until you understand it, and then it’s just… *shrugs* …how Flink is.
Let me explain…
Watermarks 🔗
When you run a batch query the engine doing the processing knows when it’s read all of the data. Life is simple. Contrast that to a streaming query in which, by definition, the source of the data is unbounded—so there’s no such thing as having "read all the data". Not only is the source unbounded, but the data may arrive out of order.
TODO: PUT A DIAGRAM HERE
Watermarks are a clever idea that tell the processing engine when it’s OK to consider a passage of time as complete. Where we set the watermark is up to us. Set a watermark too short and, whilst you’ll get your final result quicker, you’re much more likely to have incomplete data because anything arriving late will be ignored. Then again, set the watermark too long and you’ll increase the chances of getting a complete set of data, but at the expense of the result taking longer to finalise.
Which is right? That depends on you and your business process :)
To learn more about watermarks in detail, check out these excellent resources:
So watermarks are a thing, and we need to configure them. We can’t just define a column as an event time attribute on its own; we need to define the watermark that goes with it, so that Flink knows when it can consider all the data for a given time to have been read.
In my mind, here is some pseudo-SQL that I’d like to run when working with an event time attribute—but it is not correct Flink SQL:
ALTER TABLE orders_kafka
ALTER COLUMN `created_at` TIMESTAMP_LTZ(3) AS EVENT_TIME;
or something like that.
The point being, we never explicitly say "this field is the event time attribute". What we actually do is implicitly mark it as the event time attribute by defining the watermark on it. Since there’s a watermark, the column on which the watermark is defined must be the event time. Obvious, right?!
ALTER TABLE orders_kafka
ADD WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND;
This basically tells Flink that it needs to give a five-second leeway when processing `created_at`, for any late records to arrive on the stream.
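That interval is the dial between freshness and completeness mentioned earlier. A sketch of the two extremes, assuming a Flink version that supports `ALTER TABLE … MODIFY WATERMARK`:

```sql
-- Freshness: no allowance for late data; windows close as soon as possible
ALTER TABLE orders_kafka
  MODIFY WATERMARK FOR `created_at` AS `created_at`;

-- Completeness: wait up to an hour for stragglers before closing windows
ALTER TABLE orders_kafka
  MODIFY WATERMARK FOR `created_at` AS `created_at` - INTERVAL '1' HOUR;
```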
There’s actually a lot more nuance to how this works, and complexities if you have partitioned input too—[the Flink docs](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#advanced-watermark-features) cover these well, as does this video.
With the event time attribute defined on the table (by virtue of us having set the `WATERMARK`), let’s try our windowed aggregation again, reverting to `created_at` as the column by which the aggregate is generated:
SELECT window_start,
window_end,
COUNT(*) as event_count
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
But this happens…
+----+-------------------------+-------------------------+----------------------+
| op | window_start | window_end | event_count |
+----+-------------------------+-------------------------+----------------------+
No rows get emitted.

We can start to debug this by removing the aggregation and looking at the columns that the table-valued function (TVF) returns about the window, and also adding the `CURRENT_WATERMARK` detail:
SELECT order_id,
created_at,
window_start,
window_end,
CURRENT_WATERMARK(created_at) AS CURRENT_WATERMARK
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
);
+----+-------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 1 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
| +I | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
So we can see that the orders are being bucketed into the correct time window based on `created_at`; but `CURRENT_WATERMARK` is null, which I’m guessing is why I don’t get any rows emitted for my aggregate.

Why is there no watermark (i.e. why is `CURRENT_WATERMARK` NULL)?
Well, the devil is in the detail, and there are two factors at play here.
Idle partitions 🔗
If you’re working with Kafka, it’s vital to be aware of what’s known as the "idle stream problem". This is expertly described here. In short, it occurs when the Kafka source hasn’t sent a watermark from each and every partition yet.

The overall watermark of the job is the minimum of the watermarks across the source partitions. The crucial point here is that if there is no data flowing through one (or more) of the partitions, no watermark is generated by them either, which means that the overall watermark is not updated. For example, if two partitions have watermarks of 09:44:20 and 09:44:23 but a third has never produced a record, the job’s watermark cannot advance at all.
To see how this impacts our situation let’s first check the number of partitions on the source topic:
$ docker compose exec -it kcat kcat -b broker:9092 -L
Metadata for all topics (from broker 1: broker:9092/1):
1 brokers:
broker 1 at broker:9092 (controller)
1 topics:
topic "orders_cdc" with 3 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
partition 2, leader 1, replicas: 1, isrs: 1
This shows that there are three partitions. To check if we are getting data from each of them we can bring the partition in as a metadata column (like we did for the message timestamp above):
ALTER TABLE orders_kafka
ADD topic_partition INT METADATA FROM 'partition';
And now run the same query, but showing the partitions for each row to check the message partition assignments:
SELECT order_id,
topic_partition,
created_at,
window_start,
window_end,
CURRENT_WATERMARK(created_at) AS CURRENT_WATERMARK
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
);
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
This shows that there are no messages on partition 1, and thus no watermark is getting generated overall.
One option here is just to add data to the partition and thus cause a watermark to be generated. The partition is set based on the key of the Kafka message, which is `order_id`. If we add more orders, then we should end up with an order on partition 1.
What I see after adding a row to the partition is this—even though it’s in partition 1, there’s still no watermark (based on `CURRENT_WATERMARK` being NULL):
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
[…]
| +I | 5 | 1 | 2025-04-25 09:46:01.000 | 2025-04-25 09:46:00.000 | 2025-04-25 09:47:00.000 | <NULL> |
When I add another row, I then get a watermark:
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
[…]
| +I | 6 | 1 | 2025-04-25 09:46:06.000 | 2025-04-25 09:46:00.000 | 2025-04-25 09:47:00.000 | <NULL> |
We’ll come back to this point (that is, why we only see `CURRENT_WATERMARK` after a second insert) shortly.
First though, we’ve seen that the reason we weren’t getting a watermark generated was an idle partition; there was no record in partition 1, and so no watermark was passed downstream to contribute to the watermark for the job.

To deal with this we can configure an idle timeout, which tells Flink to mark a partition as idle—and exclude it from the watermark calculation—if it hasn’t produced data within the amount of time specified. The configuration property is `scan.watermark.idle-timeout` and can be set as a query hint, or a table property:
ALTER TABLE orders_kafka
SET ('scan.watermark.idle-timeout'='5sec');
To test this out I reset the source topic, and added rows afresh, one by one. First, no watermark:
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
but then, a watermark (note that there’s only data on two of the three partitions; this is the `scan.watermark.idle-timeout` taking effect):
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | 2025-04-25 09:44:20.000 |
Let’s now look at why `CURRENT_WATERMARK` isn’t being set on the first row—and in the example above, why it took a second row being added to partition 1 for `CURRENT_WATERMARK` to be set.
When does a watermark get generated in Flink? 🔗
As described here, the watermark is generated by the source (the Kafka connector, in this case). It’s generated based on the watermark generation strategy specified in the DDL.

We’ve specified our watermark generation strategy as a bounded out-of-orderness one. That is, events might be out of order, but we’re specifying a bound on how long we will wait for late events:

`created_at` - INTERVAL '5' SECOND

This means that the watermark is generated based on the value of `created_at` that’s read by the source, minus five seconds. For example, if the latest `created_at` read on a partition is 2025-04-25 09:44:25, that partition’s watermark is 2025-04-25 09:44:20.
The wrinkle here is that by default the watermark is not created immediately when the first row of data is read. Per the docs:
For sql tasks, watermark is emitted periodically by default, with a default period of 200ms, which can be changed by the parameter pipeline.auto-watermark-interval
Since the `CURRENT_WATERMARK` function returns the watermark at the time that the row is emitted, at the very beginning of the execution this can mean that a watermark hasn’t been generated yet.
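If you want to experiment with this, the interval can be changed from the SQL client. A sketch (the exact value format here is an assumption):

```sql
-- Emit watermarks every 100ms instead of the default 200ms
SET 'pipeline.auto-watermark-interval' = '100ms';
```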
A cleaner way to look at the current watermark is through the Flink UI:

If there is no watermark then it looks like this:

Putting it into practice 🔗
These two 'nuances' of Flink watermarking (idle partitions, and observing the current watermark / `pipeline.auto-watermark-interval`) are somewhat circularly interlinked.
Now that we’ve considered each on its own, let’s apply them to the problems we saw above.
Here’s the same query as above, with no idle timeout set; as we saw before, `CURRENT_WATERMARK` is NULL, which is what we’d expect.
SELECT order_id,
topic_partition,
created_at,
window_start,
window_end,
CURRENT_WATERMARK(created_at) AS CURRENT_WATERMARK
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
);
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
The idle timeout can be set as a table property, but also through a query hint. This has the benefit of proving out the change without needing to modify the table definition. In theory you might also want to use a different watermark configuration for different uses of the table.
Here’s the same query, with a hint:
SELECT /*+ OPTIONS('scan.watermark.idle-timeout'='5sec') */
order_id,
topic_partition,
created_at,
window_start,
window_end,
CURRENT_WATERMARK(created_at) AS CURRENT_WATERMARK
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
);
The results in the SQL client look the same:
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end | CURRENT_WATERMARK |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | <NULL> |
But crucially, over in the Flink UI we can inspect the actual watermark for the job:

The watermark rendered locally in my browser is `25/04/2025, 10:44:20`, which is in BST (UTC+1). This comes from the lowest of the upstream watermarks, of which there are two. These watermarks are the highest value of `created_at` for each partition, with the watermark generation strategy applied, which was:

`created_at` - INTERVAL '5' SECOND

Thus partition 0’s watermark (`09:44:25` minus 5 seconds) is used: `2025-04-25 09:44:20.000` UTC.
So back to where we were: a tumbling time window 🔗
From the above we’ve learnt two things:

- We need to understand the impact of an idle partition on the overall watermark that’s generated. By setting `scan.watermark.idle-timeout` as a query hint we can see if it resolves the problem, and if it does, modify the table’s properties: `ALTER TABLE orders_kafka SET ('scan.watermark.idle-timeout'='30 sec');`
- `CURRENT_WATERMARK` is useful, but only once a query is 'warmed up'. At the beginning of execution, or with a very sparse set of records, the value emitted alongside a row may not yet reflect the watermark that the row itself will give rise to. For example, even if the row emitted is for a previously-idle partition (and thus a watermark would be expected), it may not be reflected in `CURRENT_WATERMARK` on that row. In this situation a more reliable way to examine the watermark is through the Flink UI, as this is updated continually and does not rely on a row being emitted from the query itself.
Here’s the current state of the table’s definition; we’ve marked the `created_at` column as an event time attribute by virtue of having defined a watermark generation strategy on it (`created_at` AS `created_at` - INTERVAL '5' SECOND), and we’ve configured a timeout to avoid an idle partition blocking a watermark from being generated.
CREATE TABLE `orders_kafka` (
`order_id` INT NOT NULL,
`customer_id` INT,
`total_amount` DECIMAL(10, 2),
`status` VARCHAR(2147483647),
`created_at` TIMESTAMP(3),
`shipped_at` TIMESTAMP(3),
`shipping_address` VARCHAR(2147483647),
`payment_method` VARCHAR(2147483647),
`topic_partition` INT METADATA FROM 'partition',
WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND,
CONSTRAINT `PK_order_id` PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'properties.bootstrap.servers' = 'broker:9092',
'connector' = 'upsert-kafka',
'value.format' = 'json',
'key.format' = 'json',
'topic' = 'orders_cdc',
'scan.watermark.idle-timeout' = '30 sec'
);
Now for our original tumbling window query, to answer the question: how many orders have been created each minute?
SELECT window_start,
window_end,
COUNT(*) as event_count
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
But…still nothing

This time (sorry…) though, I know why! Or at least, I think I do.
Here are the two rows of data currently in the source topic:
SELECT order_id,
topic_partition,
created_at,
window_start,
window_end
FROM TABLE(
TUMBLE(TABLE orders_kafka,
DESCRIPTOR(created_at),
INTERVAL '1' MINUTE)
);
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
So there is a window that we’re expecting to get emitted in our query. It starts at 09:44 and ends a minute later (defined by `INTERVAL '1' MINUTE` in the `TUMBLE` part of the query) at 09:45. The window will get emitted once it’s considered 'closed'; that is, once the watermark has passed the `window_end` time.
It’s worth reiterating here because it’s so crucial to understanding what’s going on: the query emits results based on the watermark. The watermark is driven by event time and not wall clock.
So whilst I’ve just inserted these two rows of data, I can wait until kingdom come: just because a minute has passed on the wall clock doesn’t mean anything will be emitted. Nothing is emitted until the watermark moves on past the end of the window.
What’s the current watermark? It should be the lower of the watermarks across the partitions, which, as we can see from the table of data here, is going to be `2025-04-25 09:44:25.000` minus five seconds (our declared watermark generation strategy), thus `2025-04-25 09:44:20.000`. If that is the case, then the watermark of the job (`09:44:20`) will not be later than the window end time (`09:45:00`), and thus nothing can be emitted yet.
Let’s check what the current watermark is, to determine if my wild guess (ahem, educated reasoning) is correct:

Oh no! I was wrong…or was I?

Taking into account the timezone offset (UTC+1), I was right! The current watermark is `25/04/2025, 09:44:20`.
Why the delay? Because the watermark is only generated after the idle timeout period (30 seconds) has passed.
Monitoring the watermark 🔗
Here’s a trick for monitoring the watermark—use the REST API. This is what the Flink UI is built on, and is also documented.
You can get the REST call from the Flink UI (use DevTools to copy the /watermarks call made when you click on the subtask). You can also construct it by figuring out the job and subtask ("vertex") ID from the /jobs API endpoint.
The REST call using httpie will look like this:
$ http http://localhost:8081/jobs/e79bb1ffe31e359a8152278c43ce81c7/vertices/19843528532cdce10b652a1bfda378b5/watermarks
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8
[
{
"id": "0.currentInputWatermark",
"value": "1745574260000"
}
]
With some `jq` magic we can wrap it in a `watch` statement to update automagically:
$ watch "http http://localhost:8081/jobs/e79bb1ffe31e359a8152278c43ce81c7/vertices/19843528532cdce10b652a1bfda378b5/watermarks | jq '.[].value |= (tonumber / 1000 | todate)'"
[
{
"id": "0.currentInputWatermark",
"value": "2025-04-25T09:44:20Z"
}
]
Back to the tumbling window 🔗
So how do we move the watermark on and get some data emitted from the tumbling window? First off, we need a new watermark to be generated. When Flink SQL is reading from Kafka, a watermark is only generated when the Kafka consumer reads a message. No new messages, no updated watermark.

The generated watermark is the lowest (earliest) of the upstream watermarks (i.e. per partition), which are in turn the latest value seen of `created_at` minus five seconds. Note that this excludes idle partitions. An idle partition could be one in which there’s no data, but it could also be a partition with data but for which no new data has been received within the configured `scan.watermark.idle-timeout` time.
This makes sense if you step back and think about what the whole point of watermarks is: to provide a mechanism for handling late and out-of-order data. What Flink is doing is saying "I cannot close this window yet, because one or more of the partitions have not told me that they’ve got all the data [because the watermark for that partition has not passed the window close time]". It’s also saying "Regardless of the watermark generation policy (5 seconds in our case), I’m going to class any partition that has not produced any data for a given period of time (30 seconds in our case) as idle, and so ignore its watermark when generating the downstream watermark".
So if I add one more row of data with a more recent `created_at` outside of the window, it’s not necessarily going to cause the window to close. Why not? Because in the other partitions the watermark is still going to be earlier. But if it’s been more than the idle timeout (`scan.watermark.idle-timeout`) since those partitions produced data, their watermarks get disregarded, and so the new row will cause the window to close.
Let’s add the row of data. It’s several minutes since I created the previous ones. Remember, `created_at` is an event time, not wall clock time. That said, the idle timeout is based on wall clock time. Fun, huh!
Here’s the data now:
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
| +I | 3 | 2 | 2025-04-25 09:45:33.000 | 2025-04-25 09:45:00.000 | 2025-04-25 09:46:00.000 |
So in partition 2 the watermark is `2025-04-25 09:45:28` (`2025-04-25 09:45:33` minus five seconds), and in partition 0 the watermark would be derived from `2025-04-25 09:44:25.000`, except that the partition has idled out (`scan.watermark.idle-timeout`) and so is in effect the same as partition 1—idle, and so not included in the calculation of the generated watermark:
http "http://localhost:8081/jobs/e79bb1ffe31e359a8152278c43ce81c7/vertices/19843528532cdce10b652a1bfda378b5/watermarks \
| jq '.[].value |= (tonumber / 1000 | todate)'"
[
{
"id": "0.currentInputWatermark",
"value": "2025-04-25T09:45:28Z"
}
]
Since `09:45:28` is past the window end, we get our windowed aggregate emitted!
+----+-------------------------+-------------------------+----------------------+
| op | window_start | window_end | event_count |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | 2 |
Now let’s add a record within the next window (`09:45`-`09:46`):
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
| +I | 3 | 2 | 2025-04-25 09:45:33.000 | 2025-04-25 09:45:00.000 | 2025-04-25 09:46:00.000 |
| +I | 4 | 2 | 2025-04-25 09:45:38.000 | 2025-04-25 09:45:00.000 | 2025-04-25 09:46:00.000 |
The watermark is now `2025-04-25 09:45:33` (`2025-04-25 09:45:38` minus 5 seconds). If we want to make this window (`09:45`-`09:46`) emit a row, we need to cause the watermark to be greater than `09:46:00`, so we’ll add a record with a `created_at` of `09:46:06`:
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| op | order_id | topic_partition | created_at | window_start | window_end |
+----+-------------+-----------------+-------------------------+-------------------------+-------------------------+
| +I | 2 | 2 | 2025-04-25 09:44:28.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
| +I | 1 | 0 | 2025-04-25 09:44:25.000 | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 |
| +I | 3 | 2 | 2025-04-25 09:45:33.000 | 2025-04-25 09:45:00.000 | 2025-04-25 09:46:00.000 |
| +I | 4 | 2 | 2025-04-25 09:45:38.000 | 2025-04-25 09:45:00.000 | 2025-04-25 09:46:00.000 |
| +I | 5 | 1 | 2025-04-25 09:46:06.000 | 2025-04-25 09:46:00.000 | 2025-04-25 09:47:00.000 |
The watermark moves on to `2025-04-25 09:46:01` and the aggregate window gets emitted:
+----+-------------------------+-------------------------+----------------------+
| op | window_start | window_end | event_count |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-04-25 09:44:00.000 | 2025-04-25 09:45:00.000 | 2 |
| +I | 2025-04-25 09:45:00.000 | 2025-04-25 09:46:00.000 | 2 |
<< A summary goes here >> 🔗
Reference 🔗
To understand watermarking I needed to digest content from across four of the Flink docs: