rmoff's random ramblings
about talks

Watermarks in Confluent Cloud for Apache Flink

Published Apr 30, 2025 by in Apache Flink, Confluent Cloud, Watermarks at https://preview.rmoff.net/2025/04/30/watermarks-in-confluent-cloud-for-apache-flink/

In my last post I wrote all about watermarks in Apache Flink. As you’ll have realised—or been aware of already—it can get fairly hairy. That’s why I’m keen to see how Confluent Cloud for Apache Flink deals with watermarks, to see if it makes life any easier for the developer.

As a quick recap, watermarks in Flink are used to mark the latest point in time for which data can be considered to be complete. Without a watermark, Flink won’t issue the results of temporal queries, which includes windowed aggregations and joins.

If you’re sat starting at a screen that looks like this yet have data flowing into the source table, watermarks are often your problem:

CleanShot 2025 04 25 at 16.20.12

On Confluent Cloud for Apache Flink watermarks are handled differently. Let’s see how!

If you’re not familiar with how watermarks work in Apache Flink, please do go back over my previous article. You’ll also find a lot of good content on Confluent Developer, including these videos from David Anderson and Wade Waldron. In this post I’m going to go straight into the detail, talking about things like event time and idle partitions and watermark strategy.

Quick recap: What’s a watermark strategy? 🔗

A Flink watermark strategy defines when a watermark is generated, and the strategy of what value for the watermark is generated.

  • What value is defined by the WATERMARK DDL on a table. For example:

    WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND

    This is what is often referred to as the watermark generation strategy. Here the value of the watermark will be whatever created_at is, minus five seconds.

    It is important to put in place because without it Flink won’t treat the column against which its defined as an event time attribute.

  • When a watermark is generated (or "emitted") based on the watermark emit strategy configuration, which can be set as a property for the table or as a query hint.

The Data 🔗

I’ve populated a topic using the Postgres CDC connector on Confluent Cloud, streaming records into a topic called pg.public.orders. Here’s the first record in the topic:

kcat -q -b $CNFL_KAFKA_BROKER \
    -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN \
    -X sasl.username=$CNFL_KC_API_KEY -X sasl.password=$CNFL_KC_API_SECRET \
    -s avro -r https://$CNFL_SR_API_KEY:$CNFL_SR_API_SECRET@$CNFL_SR_HOST \
    -C -t pg.public.orders -c1 -f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
Key (6 bytes): {"order_id": 1}
Value (76 bytes): {
    "order_id": 1,
    "customer_id": 1001,
    "total_amount": ":\u0097",
    "status": "pending",
    "created_at": {
        "long": 1745574265000000
    },
    "shipped_at": null,
    "shipping_address": {
        "string": "221B Baker Street, London"
    },
    "payment_method": {
        "string": "Credit Card"
    },
    "__deleted": {
        "string": "false"
    }
}
Timestamp: 1746012695220
Partition: 0
Offset: 0
Headers:

From this we can see we’ve got three possible timestamps to work with:

created_at

1745574265000000

Apr 25 2025 10:44:25 GMT+0100

Represents the time at which an event (placing an order) happened.

shipped_at

null

Represents the time at which an event (shipping an order) happened.

Timestamp

1746012695220

Apr 30 2025 12:31:35 GMT+0100

Ingest time of the record into the Kafka topic.

The query 🔗

We’re going to create a windowed aggregation to calculate how many orders were created per minute. Let’s fire up Confluent Cloud for Apache Flink and see what happens.

The aggregation is a straightforward one—it’s a COUNT over a tumbling window, which we implement using a Table-Valued Function (TVF). Before we run the query we need to figure out watermarks. Confluent Cloud for Apache Flink implements a default watermarking strategy based on the $rowtime column (mapped to the Kafka message timestamp). However, in our query we want to aggregate based on created_at (when the Order record was created, set by the source application)—not the Kafka message timestamp (which could be very different from when the order was placed, depending on how we’re getting the data into Kafka and various latencies along the way).

Since there is a default, changing the watermark strategy in Confluent Cloud for Apache Flink is known as creating a custom watermark strategy. If we don’t do this then the aggregation based on created_at will fail with the error The window function requires the timecol is a time attribute type. Setting a custom watermark strategy is done by using running some ALTER TABLE…MODIFY WATERMARK DDL:

ALTER TABLE `rmoff`.`cluster00`.`pg0.public.orders`
    MODIFY WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND;

Now we can run the query:

SELECT  window_start,
        window_end,
        COUNT(*) as event_count
FROM TABLE(
        TUMBLE(TABLE `rmoff`.`cluster00`.`pg0.public.orders`,
                DESCRIPTOR(created_at),
                INTERVAL '1' MINUTE)
        )
GROUP BY    window_start,
            window_end;

and get a windowed aggregation result :)

╔═══════════════════════════════════════════════════════════╗
║window_start            window_end              event_count║
║2025-04-25 10:44:00.000 2025-04-25 10:45:00.000 2          ║

The rest of the watermark behaviour is the same as when I dug into it using Apache Flink. The results above show two events in the window 10:44-10:45—but what about the rest of my data? Let’s look at the table data:

╔════════════════════════════════════════════════════════════════════╗
║order_id customer_id total_amount status     created_at             ║
║1        1001        149.99       pending    2025-04-25 10:44:25.000║
║2        1003        199.50       pending    2025-04-25 10:44:28.000║
║3        1005        42.00        delivered  2025-04-25 10:45:33.000║
║4        1002        89.95        processing 2025-04-25 10:45:38.000║
║5        1004        125.50       delivered  2025-04-25 10:46:03.000║

Eyeballing this we can see three windows:

  • 10:44-10:45 (2 events)

  • 10:45-10:46 (2 events)

  • 10:46-10:47 (1 events)

So why is the query only emitting one of these windows?

Because the watermark strategy says to generate a watermark five seconds behind the value of created_at:

WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND;

Let’s do that calculation looking at the table data, and we’ll see the problem:

SELECT order_id, created_at, created_at - INTERVAL '5' SECOND AS expected_watermark
    FROM `pg0.public.orders`;
╔═══════════════════════════════════════════════════════════╗
║order_id created_at              expected_watermark        ║
║1        2025-04-25 10:44:25.000 2025-04-25 10:44:20.000   ║
║2        2025-04-25 10:44:28.000 2025-04-25 10:44:23.000   ║
║3        2025-04-25 10:45:33.000 2025-04-25 10:45:28.000   ║
║4        2025-04-25 10:45:38.000 2025-04-25 10:45:33.000   ║
║5        2025-04-25 10:46:03.000 2025-04-25 10:45:58.000   ║

Note that expected_watermark only goes up to 10:45:58, meaning that Flink does not yet consider the window ending at 10:46 has closed yet.

If we add another row of data to the table:

╔═══════════════════════════════════════════════════════════╗
║order_id created_at              expected_watermark        ║
║1        2025-04-25 10:44:25.000 2025-04-25 10:44:20.000   ║
║2        2025-04-25 10:44:28.000 2025-04-25 10:44:23.000   ║
║3        2025-04-25 10:45:33.000 2025-04-25 10:45:28.000   ║
║4        2025-04-25 10:45:38.000 2025-04-25 10:45:33.000   ║
║5        2025-04-25 10:46:03.000 2025-04-25 10:45:58.000   ║
║6        2025-04-25 10:46:51.000 2025-04-25 10:46:46.000   ║

The created_at of 10:46:51 pushes the watermark forward to 10:46:46, thus meaning that Flink can close the previous window, and we get our result:

╔═══════════════════════════════════════════════════════════╗
║window_start            window_end              event_count║
║2025-04-25 10:44:00.000 2025-04-25 10:45:00.000 2          ║
║2025-04-25 10:45:00.000 2025-04-25 10:46:00.000 2          ║

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025