There’s ways, and then there’s ways, to count the number of records/events/messages in a Kafka topic. Most of them are potentially inaccurate, or inefficient, or both. Here’s one that falls into the potentially inefficient category, using kafkacat
to read all the messages and pipe to wc
which with the -l
will tell you how many lines there are, and since each message is a line, how many messages you have in the Kafka topic:
$ kafkacat -b broker:29092 -t mytestopic -C -e -q| wc -l
3
You can verify what’s happening by removing the pipe to just see the messages:
$ kafkacat -b broker:29092 -t mytestopic -C -e -q
I'm message 1
I'm message 2
I'm message 3
What do the flags for kafkacat
mean?
-
-C
: act as a Consumer -
-q
: quiet, no informational messages from kafkacat -
-e
: exit once last offset read
SELECT COUNT(*)…GROUP BY…
🔗
Consuming all the messages from the topic is fine, but what about if you want to break it down further? Say, by key, or other field in the data? This is where being able to express yourself in SQL with ksqlDB comes in handy.
First up we need a schema for the data in the topic (since we’re working with fields now, not just entire records). If we’re using Avro or Protobuf then the schema is already available, but for JSON/CSV we can specify it as part of the statement in which we tell ksqlDB about the Kafka topic:
CREATE STREAM CARPARK_SRC (date VARCHAR ,
time VARCHAR ,
name VARCHAR ,
capacity INT ,
empty_places INT )
WITH (KAFKA_TOPIC='carparks',
VALUE_FORMAT='DELIMITED');
GROUP BY <field>
🔗
Now that we have a stream, we can query it and check the number of messages. We need to tell ksqlDB that we want it to read from the beginning of the topic:
SET 'auto.offset.reset' = 'earliest';
and then we run our COUNT
:
SELECT NAME, COUNT(*) AS RECORD_CT
FROM CARPARK_SRC
GROUP BY NAME
EMIT CHANGES;
+--------------------+----------+
|NAME |RECORD_CT |
+--------------------+----------+
|Westgate |60 |
|Burnett St |60 |
|Crown Court |60 |
|Leisure Exchange |60 |
|NCP Hall Ings |60 |
|Broadway |60 |
|Kirkgate Centre |60 |
|Sharpe Street |60 |
Each of the NAME
values above has 60 records associated with it, and thus if we run a GROUP BY
across all messages (using a dummy GROUP BY
to force the aggregation) we’ll see that there’s a total of 480 messages:
SELECT COUNT(*) AS RECORD_CT
FROM CARPARK_SRC
GROUP BY 1
EMIT CHANGES;
+-------------+
|RECORD_CT |
+-------------+
|480 |
When running this, you may notice that the query doesn’t exit, but instead the CLI says
Press CTRL-C to interrupt
That’s because we’ve run a push query, we’ve subscribed to the stream of results from the Kafka topic, and since Kafka topics are unbounded so are the results of a query against it. As new data arrives, the aggregate values may changes, and will be returned to the client as they do:
+-------------+
|RECORD_CT |
+-------------+
|480 |
|488 |
|496 |
Press CTRL-C to interrupt
GROUP BY <time window>
🔗
In the example above we see how ksqlDB can count all of the messages in a topic, counting them up either in entirety or broken down by a field of your choice. What about if we’d like to count the number of messages by slices of time? For example, how many messages in the topic per four hours? For this we use time windowing.
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London')
AS FOUR_HOUR_WINDOW_START_TS,
COUNT(*) AS RECORD_CT
FROM CARPARK_SRC
WINDOW TUMBLING (SIZE 4 HOURS)
GROUP BY 1
EMIT CHANGES;
+--------------------------+-----------+
|FOUR_HOUR_WINDOW_START_TS |RECORD_CT |
+--------------------------+-----------+
|2020-09-07 13:00:00 |368 |
|2020-09-07 17:00:00 |464 |
|2020-09-07 21:00:00 |128 |
|2020-09-08 01:00:00 |8 |
|2020-09-08 05:00:00 |8 |
|2020-09-08 09:00:00 |120 |
Not everyone loves streams? 🔗
The above streaming queries are pretty cool, but you don’t always want to run a continuous query just to check on the number of records in a topic. Quite likely, you just want to do a quick lookup. Kinda like against a database table, with a good ole' regular SELECT
that just gives you an answer and exits. Fear not!
In this brave new world of streaming SQL we can still do this, and it’s because ksqlDB can actually build and maintain materialised views. Instead of scanning through the data in a topic each time you want to know how many messages there are, it will instead materialise that information internally and then make it available for you to query on demand. Check this out:
CREATE TABLE MESSAGE_COUNT_BY_4HR AS
SELECT 1 AS DUMMY_FIELD,
TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London')
AS WINDOW_START_TS,
COUNT(*) AS RECORD_CT
FROM CARPARK_SRC
WINDOW TUMBLING (SIZE 4 HOURS)
GROUP BY 1
EMIT CHANGES;
We’ve now built a table that ksqlDB will keep up to date as any new messages arrive. Whenever we want to know the message count, we can run a query (known as a pull query here, contrast to push query above):
SELECT WINDOW_START_TS, RECORD_CT
FROM MESSAGE_COUNT_BY_4HR
WHERE WINDOWSTART > '2020-09-08T08:00:00+0100'
AND DUMMY_FIELD=1 ;
+---------------------+-----------+
|WINDOW_START_TS |RECORD_CT |
+---------------------+-----------+
|2020-09-08 09:00:00 |184 |
Query terminated
ksql>
Did you see that? That right there ☝️! This:
Query terminated
ksql>
The query ran, looked up the value, and then returned it to the user. You can do this from the commandline, but you can also do it from your application, using the Java client, REST API, or even the nascent Go client being developed. Here’s an example with the REST API:
$ curl --http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT WINDOW_START_TS, RECORD_CT FROM MESSAGE_COUNT_BY_4HR WHERE WINDOWSTART > '\''2020-09-08T08:00:00+0100'\'' AND DUMMY_FIELD=1 ;"}'
{"queryId":null,"columnNames":["WINDOW_START_TS","RECORD_CT"],"columnTypes":["STRING","BIGINT"]}
["2020-09-08 09:00:00",184]
Other ways to count the messages 🔗
On the Confluent Community Slack forum there was an interesting thread about this, and which prompted me to blog it here.
Srinivas Devaki: You can use GetOffsetShell to get the earliest and latest offsets and compute the number of messages by subtracting with each other
# Get Latest Offset /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list localhost:9092 \ --topic my_topic \ --time -1 # Get Earliest Offset /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list localhost:9092 \ --topic my_topic \ --time -2
Mitch Henderson: Small note, offsets are very much not guaranteed to be sequential. Not every offset will be a record the client will receive. The above will give you a round about estimate of the number of messages, not it will not be exact. The only way to get an exact number is to dump the topic and pipe it to wc
Srinivas: awesome detail, never knew that offsets are not guaranteed to be sequential. But why is that so? I’ve tried searching about this but couldn’t find any references, any link where I can read more on this?
Mitch: Idempotent and transactional production are the most common reasons. But there are others.
Weeco: Also because of gaps in compacted topics this won’t work If you don’t want to consume all messages to count the number of records I have just one idea how to get a rough estimate. I described that here: https://github.com/cloudhut/kowl/issues/83