I’ve been poking around recently with capturing Wi-Fi packet data and streaming it into Apache Kafka, from where I’m processing and analysing it. Kafka itself is rock-solid - because I’m using ☁️Confluent Cloud and someone else worries about provisioning it, scaling it, and keeping it running for me. But whilst Kafka works just great, my side of the setup—tshark
running on a Raspberry Pi—is less than stable. For whatever reason it sometimes stalls and I have to restart the Raspberry Pi and restart the capture process.
I’m sure there’s a reason and I’m sure if I spent long enough looking and debugging I’d find it - but for now the time invested in this is better spent just knowing when I need to go and bounce it. I don’t mind losing a few minutes of data, or even a few hours, but this kind of gap of a week makes me sad:
So what I want is a very simple way to look at the most recent timestamp on a Kafka message, and send me an alert if it’s outside a threshold.
Herewith a very dirty hacky way to do just this…
Parts List
- kafkacat polls the most recent message on the topic
- jq extracts the timestamp
- bash compares the timestamp to our given threshold and also handles errors in calling kafkacat
- curl makes a REST call to Telegram to send the status message
Setting up Telegram
Create a new Telegram bot per the instructions.
Make a note of the access token because you’ll need this later.
What do all hack projects need? A geeky avatar of course. It may be a dirty hack, but it’s going to be a well-presented one ;-)
To get our bot to work we need to start it first, so click on its link from Botfather (or just start a Telegram chat with it directly). This will send it the /start
command:
Getting the recipient’s Chat ID
We need to get the ID of the recipient of the messages that our bot is going to send. This can either be a direct message to you, or you can set up a group (which other real people can be members of, and see the same messages from the bot). If you want to use a group then make sure you start the bot (/start per above) and then invite it to the group. Using a group is also more convenient because you can create multiple alert routes with a single bot, instead of having to create a new bot for each purpose.
Having started the bot, and optionally added it to a group and sent a message to the group, now invoke the getUpdates
API:
curl -s https://api.telegram.org/bot<bot access token>/getUpdates
Replace <bot access token>
with (you guessed it!) the bot access token that the Botfather gave you above. The API is a bit funky here - note that the bot
prefix is a hardcoded part of the URL and should not be changed - you append your bot access token directly to it. So if Botfather gave you an access token of 99999:XXXXX
you would invoke:
curl -s https://api.telegram.org/bot99999:XXXXX/getUpdates
From this you’ll get one, or more, messages that the bot has received. This might just be the single /start
that you invoked, or it could also be group messages if you’ve added it to one. Regardless, identify the message instance corresponding to the recipient that you want for the bot and make a note of the chat.id
value. Here it’s -468250841
:
{
  "message_id": 3,
  "from": {
    "id": 218419044,
    "is_bot": false,
    "first_name": "Robin",
    "last_name": "Moffatt",
    "username": "rmoff",
    "language_code": "en"
  },
  "chat": {
    "id": -468250841,
    "title": "pcap ingest monitoring",
    "type": "group",
    "all_members_are_administrators": true
  },
  "date": 1586894082,
  "group_chat_created": true
}
You can use jq
to return just the chat ID and associated recipient information too. Here it shows the group chat message quoted above, plus the DM that I sent the bot previously (/start
).
$ curl -s https://api.telegram.org/bot99999:XXXXX/getUpdates | jq -c '.result[].message.chat | [.id , .title, .username]'
[218419044,null,"rmoff"]
[-468250841,"pcap ingest monitoring",null]
However you do it, you should now have:
- Bot access token (e.g. 99999:XXXXX)
- Chat ID (e.g. -468250841)
Sending a test message
Let’s send a test message! We’ll use the sendMessage
API to do this:
curl -s -X POST https://api.telegram.org/bot<BOT ACCESS TOKEN>/sendMessage \
-d chat_id=<CHAT ID> \
-d text="Did you ever play tic-tac-toe?"
and as if by magic…
Getting the latest message from Kafka
As long-time readers of my blog will know, one of my favourite tools in my Kafka toolbox is kafkacat
. Here we’ll not assume that it’s installed, and instead run it using Docker. We’re also going to connect to Confluent Cloud.
Note: You can of course use this same technique against a self-managed Kafka cluster (and indeed, with kafkacat running locally, not Docker).
docker run --rm edenhill/kafkacat:1.5.0 kafkacat \
-b $CCLOUD_BROKER_HOST \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
-X api.version.request=true \
-C -c1 -o -1 -u -f %T -t pcap
Aside from the broker details (-b
) and various authentication and security settings (all the -X
parameters), what we’re doing here is:
- -C run as a consumer, and consume one message (-c1)
- -o -1 read from one message before the end of the topic (the penultimate offset), which is the most recent message
- -u don’t buffer output
- -t read from the pcap topic
- -f %T tells kafkacat just to return the timestamp from the Kafka message’s metadata
We’re going to compare this timestamp to our threshold, which is ten minutes ago, from date
:
docker run --rm ubuntu date --date '-10 min' "+%s"
1586992072
WHY would you invoke date
using docker? Because date
is one of those delightful *nix commands which has a different implementation across Linux, macOS, etc. and whose options are completely incompatible between them - so this way at least it works. I told you this was a dirty hack…
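If spinning up a container just to do date arithmetic feels like a hack too far, one alternative is to let the shell do the subtraction itself. This is only a sketch: it assumes your local date accepts +%s, which is not part of POSIX but which, as far as I know, both GNU date and the BSD date on macOS support. The function name seconds_ago is made up for illustration.

```shell
#!/bin/bash
# Hypothetical helper: print the epoch time (in seconds) N seconds before now.
# Assumption: the local `date` supports +%s (GNU and BSD date both do).
seconds_ago() {
    local offset="$1"
    echo $(( $(date +%s) - offset ))
}

# Threshold of ten minutes (600 seconds) ago
ten_minutes_ago=$(seconds_ago 600)
echo "$ten_minutes_ago"
```

The trade-off is that this uses the clock of whatever machine runs the script rather than the container's, which for a ten-minute threshold should not matter much.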
Note that the timestamp that’s returned from kafkacat is the unix epoch in milliseconds, whilst date
is in seconds. No problem. Let’s continue this dirty hack by just truncating the last three digits!
➜ echo 1586993170473
1586993170473
➜ echo 1586993170473|sed 's/[0-9][0-9][0-9]$//g'
1586993170
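The same truncation can be done with shell arithmetic instead of sed, which makes the intent (milliseconds to seconds) a little more obvious. A minimal sketch, assuming bash; ms_to_s is a hypothetical name, not something kafkacat provides.

```shell
#!/bin/bash
# Hypothetical helper: convert epoch milliseconds to epoch seconds.
# Integer division by 1000 drops the last three digits, same as the sed hack.
ms_to_s() {
    echo $(( $1 / 1000 ))
}

ms_to_s 1586993170473   # prints 1586993170
```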
So we can get the timestamp of the latest Kafka message, and the local timestamp (minus a threshold) - now to compare them. That’s easy enough with a bit of shell scripting. First we store the Kafka timestamp in a variable:
latest_ts=$(docker run --rm edenhill/kafkacat:1.5.0 kafkacat -b $CCLOUD_BROKER_HOST -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" -X api.version.request=true -C -c1 -o -1 -t pcap -u -f %T| sed 's/[0-9][0-9][0-9]$//g' )
Then we store the timestamp against which we want to compare it:
ten_minutes_ago=$(docker run --rm ubuntu date --date '-10 min' "+%s")
Finally we compare the two:
if [ "$latest_ts" -lt "$ten_minutes_ago" ]; then
    echo "Last Kafka message was received over ten minutes ago"
fi
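One case the comparison on its own doesn't cover is kafkacat returning nothing at all, in which case the -lt test errors out rather than telling you anything useful. Here is a hedged sketch of a check that treats an empty timestamp as its own state. The function name ingest_status and the three status strings are invented here for illustration.

```shell
#!/bin/bash
# Hypothetical helper: classify the latest timestamp against a threshold.
# Both arguments are epoch seconds. Prints one of: empty, stalled, ok.
ingest_status() {
    local latest_ts="$1" threshold="$2"
    if [ -z "$latest_ts" ]; then
        echo "empty"      # kafkacat failed or returned no timestamp
    elif [ "$latest_ts" -lt "$threshold" ]; then
        echo "stalled"    # newest message is older than the threshold
    else
        echo "ok"
    fi
}

ingest_status 1586993170 1586992072   # prints ok
ingest_status 1586990000 1586992072   # prints stalled
ingest_status ""         1586992072   # prints empty
```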
Putting it all together
Now we take the logic from above to determine if Kafka ingest has stalled, and combine it with the Telegram REST API that we explored above.
#!/bin/bash
#
# @rmoff 16 April 2020
#
# -------------
# .env should look like:
# CCLOUD_BROKER_HOST=xxxxxx
# CCLOUD_API_KEY=xxxxxx
# CCLOUD_API_SECRET=xxxxxx
# TELEGRAM_BOT_TOKEN=xxxx
source .env
CHAT_ID=-468250841
#---------
echo 'Now : ' $(docker run --rm ubuntu date)
ten_minutes_ago=$(docker run --rm ubuntu date --date '-10 min' "+%s")
echo 'Ten minutes ago : ' $(docker run --rm ubuntu date -d @$ten_minutes_ago)
# Timestamp of the latest message, truncated from milliseconds to seconds
latest_ts=$(docker run --rm edenhill/kafkacat:1.5.0 kafkacat -b $CCLOUD_BROKER_HOST \
    -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
    -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
    -X api.version.request=true \
    -C -c1 -o -1 -t my_kafka_topic -u -f %T | sed 's/[0-9][0-9][0-9]$//g')
if [ -z "$latest_ts" ]; then
    # kafkacat returned nothing, so the check itself failed
    echo "TS is empty"
    echo '{"chat_id": "'$CHAT_ID'", "text": "❌my_kafka_topic ingest check failed. Latest ingest time is empty", "disable_notification": false}' |\
        curl -s -X POST \
            -H 'Content-Type: application/json' \
            -d @- \
            https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendMessage | jq '.'
else
    # Human-readable form of the latest timestamp, reused in the messages below
    latest_human=$(docker run --rm ubuntu date -d @$latest_ts)
    echo 'Latest timestamp : ' $latest_human
    if [ "$latest_ts" -lt "$ten_minutes_ago" ]; then
        echo "Ingest has stalled"
        echo '{"chat_id": "'$CHAT_ID'", "text": "❌my_kafka_topic ingest has stalled. Latest ingest time is '"$latest_human"'", "disable_notification": false}' |\
            curl -s -X POST \
                -H 'Content-Type: application/json' \
                -d @- \
                https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendMessage | jq '.'
    else
        # All good: send the status without a notification sound
        echo '{"chat_id": "'$CHAT_ID'", "text": "✅my_kafka_topic ingest looks good. Latest ingest time is '"$latest_human"'", "disable_notification": true}' |\
            curl -s -X POST \
                -H 'Content-Type: application/json' \
                -d @- \
                https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendMessage | jq '.'
    fi
fi
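The three echo-into-curl blocks in the script differ only in the message text and the disable_notification flag, so if the quoting gymnastics bother you, the JSON body could be built in one place. This is just a sketch: telegram_payload is a hypothetical helper, and it assumes the message text contains no double quotes or backslashes because nothing is escaped.

```shell
#!/bin/bash
# Hypothetical helper: build the JSON body for Telegram's sendMessage call.
# Args: chat id, message text, disable_notification (true|false).
# Assumption: the text has no double quotes or backslashes (no escaping is done).
telegram_payload() {
    printf '{"chat_id": "%s", "text": "%s", "disable_notification": %s}\n' "$1" "$2" "$3"
}

# Example: the payload for a "stalled" alert to the group chat used above
telegram_payload -468250841 "my_kafka_topic ingest has stalled" false
```

The output can then be piped into the same curl -d @- call that the script already uses.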
Complaints?
Please send all complaints to /dev/null
;-)