I talk and write about Kafka and Confluent Platform a lot, and more and more of the demos that I’m building are around Confluent Cloud. This means that I don’t have to run or manage my own Kafka brokers, Zookeeper, Schema Registry, KSQL servers, etc., which makes things a ton easier. Whilst there are managed connectors on Confluent Cloud (S3 etc.), I need to run my own Kafka Connect worker for those connectors not yet provided. An example is the MQTT source connector that I use in this demo. Up until now I’d either run this worker locally, or manually build a cloud VM. Locally is fine, as it’s all Docker, easily spun up with a single docker-compose up -d command. I wanted something that would keep running whilst my laptop was off, but that was as close to my local build as possible. Enter GCP and its functionality to run a container on a VM automagically.
You can see the full script here. The rest of this article just walks through the how and why.
The script
Here’s a walkthrough of what the script’s doing and how to use it. First up, you need to create a .env file with your secrets and config in:
CONFLUENTPLATFORM_VERSION=5.4.0-beta1
#
CCLOUD_BROKER_HOST=
CCLOUD_API_KEY=
CCLOUD_API_SECRET=
#
CCLOUD_SCHEMA_REGISTRY_URL=
CCLOUD_SCHEMA_REGISTRY_API_KEY=
CCLOUD_SCHEMA_REGISTRY_API_SECRET=
This is then passed to source to make the values available to the shell:
source .env
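An empty value here tends to surface much later as a confusing connection or authentication error, so it can help to check the settings straight after sourcing them. A minimal sketch, assuming a hypothetical helper called require_vars (it is not part of the original script):

```shell
# Hypothetical helper (not in the original script): returns non-zero if any
# of the named variables is unset or empty.
require_vars() {
  missing=0
  for name in "$@"; do
    # Look up the value of the variable whose name is stored in $name
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "Missing required setting: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

It would be called right after source .env, for example: require_vars CCLOUD_BROKER_HOST CCLOUD_API_KEY CCLOUD_API_SECRET || exit 1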
I’m running Kafka Connect in distributed mode, which I generally recommend in all instances, even on a single node. There’s no reason not to, and IMO it’s easier to learn and work with a single deployment method instead of two. Since I’m using distributed mode, all of the state for the worker is stored in Kafka itself. This is pretty cool, but it does mean that if you run multiple workers with the same persistence topics configured things will get funky. For that reason, I add a suffix to the worker and topic names, which is based on the current timestamp. If you’re using this script yourself you might want to vary this (or not; YMMV):
epoch=$(date +%s)
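If you would rather reuse the same worker group and internal topics across runs, one way to vary this is to let an environment variable override the timestamp. This is only a sketch; RUN_ID and make_run_id are made-up names for illustration and are not in the original script:

```shell
# RUN_ID is a hypothetical override, not part of the original script.
# If it is unset or empty, fall back to the current Unix timestamp.
make_run_id() {
  echo "${RUN_ID:-$(date +%s)}"
}

epoch=$(make_run_id)
echo "Connect group id: kafka-connect-group-${epoch}"
```

Running with RUN_ID=demo set would then give the same group id and topic names every time, whereas leaving it unset keeps the original timestamp behaviour.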
Whilst Kafka Connect uses the Admin API to create its own internal topics (for state persistence), the topic(s) that the connector itself writes to need to be created manually. Here I use kafka-topics to do that, through Docker running locally. I use Docker just for isolation and ease of portability; if you want to use your own local install then you can. To use Confluent Cloud with kafka-topics you need to have a local file with the necessary authentication details in, which is then passed with --command-config in kafka-topics:
cat > /tmp/config-${epoch}.properties <<EOF
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
security.protocol=SASL_SSL
request.timeout.ms=20000
retry.backoff.ms=500
EOF
docker run --rm --volume /tmp/config-${epoch}.properties:/tmp/config.properties \
confluentinc/cp-kafka:${CONFLUENTPLATFORM_VERSION} /usr/bin/kafka-topics \
--command-config /tmp/config.properties \
--bootstrap-server ${CCLOUD_BROKER_HOST}:9092 \
--create \
--topic data_mqtt \
--partitions 6 \
--replication-factor 3
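The properties file above holds the API secret in a predictable path under /tmp. As a hedged alternative, the same file could be written with mktemp, which creates a file that only the current user can read. The helper name write_client_config is an assumption for illustration, and only the authentication-related lines are reproduced:

```shell
# Hypothetical helper: write the client properties to a fresh temp file
# and print its path. $1 is the API key, $2 the API secret.
write_client_config() {
  config_file=$(mktemp /tmp/ccloud-config.XXXXXX) || return 1
  cat > "$config_file" <<EOF
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$1" password="$2";
security.protocol=SASL_SSL
EOF
  echo "$config_file"
}
```

You would capture the path with something like CONFIG_FILE=$(write_client_config "$CCLOUD_API_KEY" "$CCLOUD_API_SECRET"), mount that file into the container in place of /tmp/config-${epoch}.properties, and remove it with rm -f once kafka-topics has finished. If the container runs as a different non-root user, the restrictive permissions may need relaxing.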
Now that we’ve got the topic created, we can spin up the worker itself. Or, almost. Because first we need to build a file with the necessary environment variables in for the worker. You can pass environment variables directly in the gcloud invocation but it’s not a pretty sight. Even if it works, it’s not particularly maintainable. Whilst you can externalise the environment variables into a file that you pass in with container-env-file, you can’t interpolate secure values in that file, which means that you end up with a file which is both config and authentication credentials, which is not ideal. Hence, the config is inline in this script and interpolated with credentials held in environment variables (via .env) at runtime into a temporary file:
PROPERTIES_FILE=/tmp/connect-worker-${epoch}_gcloud_env.properties
cat > ${PROPERTIES_FILE}<<EOF
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n
CONNECT_CUB_KAFKA_TIMEOUT=300
CONNECT_BOOTSTRAP_SERVERS=${CCLOUD_BROKER_HOST}:9092
CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect-01
CONNECT_REST_PORT=8083
CONNECT_GROUP_ID=kafka-connect-group-${epoch}
CONNECT_CONFIG_STORAGE_TOPIC=_kafka-connect-group-${epoch}-configs
CONNECT_OFFSET_STORAGE_TOPIC=_kafka-connect-group-${epoch}-offsets
CONNECT_STATUS_STORAGE_TOPIC=_kafka-connect-group-${epoch}-status
CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=${CCLOUD_SCHEMA_REGISTRY_URL}
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=${CCLOUD_SCHEMA_REGISTRY_URL}
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
EOF
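Because the generated file mixes config and credentials, it is worth being careful when printing it for debugging. A small sketch of a masking helper follows; redact_env_file is a hypothetical name, and the choice of which keys count as secret is an assumption based on the file above:

```shell
# Hypothetical helper: print an env file with credential values masked.
# Only keys ending in SASL_JAAS_CONFIG or BASIC_AUTH_USER_INFO are treated
# as secret here; that list is an assumption based on the file above.
redact_env_file() {
  sed -E 's/^([A-Z0-9_]*(SASL_JAAS_CONFIG|BASIC_AUTH_USER_INFO))=.*/\1=<redacted>/' "$1"
}
```

Calling redact_env_file "${PROPERTIES_FILE}" prints every line of the file, with the JAAS config and Schema Registry user info values replaced by a placeholder.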
Now, finally, we can launch the container, using the gcloud beta compute instances create-with-container command. I built out the options using the GCP web UI and then, before launching, clicked the magic button to get the equivalent command line. It’s a fairly standard list of parameters, including container-image to refer to the Docker image, container-env-file to point to the environment file that we built above, and then container-command and container-arg to run custom commands, which:
- Install the connector plugin
- Launch the Kafka Connect worker process
- Wait for the worker to be ready
- Submit a connector configuration
gcloud beta compute \
--project=devx-testing instances create-with-container rmoff-connect-mqtt-${epoch} \
--machine-type=n1-standard-1 \
--subnet=default \
--metadata=google-logging-enabled=true \
--maintenance-policy=MIGRATE \
--image=cos-stable-77-12371-114-0 \
--image-project=cos-cloud \
--no-scopes \
--no-service-account \
--boot-disk-size=10GB \
--boot-disk-type=pd-standard \
--boot-disk-device-name=rmoff-connect-mqtt-${epoch} \
--container-restart-policy=always \
--labels=container-vm=cos-stable-77-12371-114-0 \
--container-image=confluentinc/cp-kafka-connect:${CONFLUENTPLATFORM_VERSION} \
--container-env-file=${PROPERTIES_FILE} \
--container-command=bash \
--container-arg=-c \
--container-arg='set -x
echo "Installing connector plugins"
confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:1.2.3
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
while : ; do
curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
if [ $curl_status -eq 200 ] ; then
break
fi
sleep 5
done
#
echo -e "\n--\n+> Creating Kafka Connect MQTT source"
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-mqtt/config \
-d '"'"'{
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"mqtt.server.uri" : "'${MQTT_URL}'",
"mqtt.password" : "'${MQTT_PASSWORD}'",
"mqtt.username" : "'${MQTT_USERNAME}'",
"mqtt.topics" : "owntracks/#",
"kafka.topic" : "data_mqtt",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers" : "'${CCLOUD_BROKER_HOST}':9092",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'${CCLOUD_API_KEY}'\" password=\"'${CCLOUD_API_SECRET}'\";",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism": "PLAIN"
}'"'"'
#
sleep infinity'
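The wait loop in the script above keeps polling for as long as the worker does not return a 200. A bounded variant is sketched here as a standalone function; the name wait_for_http_200 and its defaults (60 attempts, 5 seconds apart) are assumptions, not something taken from the original script:

```shell
# Hypothetical helper: poll a URL until it returns HTTP 200 or we give up.
# $1 = URL, $2 = maximum attempts (default 60), $3 = seconds between attempts (default 5)
wait_for_http_200() {
  url=$1
  max_attempts=${2:-60}
  delay=${3:-5}
  attempt=1
  while [ "$attempt" -le "$max_attempts" ]; do
    # curl reports 000 when it cannot connect at all
    status=$(curl -s -o /dev/null -w "%{http_code}" "$url" || true)
    echo "$(date) HTTP state: ${status} (waiting for 200, attempt ${attempt}/${max_attempts})"
    if [ "$status" = "200" ]; then
      return 0
    fi
    attempt=$((attempt + 1))
    sleep "$delay"
  done
  return 1
}
```

Inside the container this would be called as wait_for_http_200 http://localhost:8083/connectors || exit 1 before submitting the connector config. The function deliberately contains no single quotes, so it could be pasted into the single-quoted container-arg string without extra escaping.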
The bash script that’s embedded as an argument to bash -c is mostly as you’d run it natively, except for some funky quoting to deal with single quotes within the command (that enclose the -d value of curl). These are done with '"'"' which breaks down to:
- ' closes the string
- "'" quotes a single quote
- ' opens the string again
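To see the quoting in isolation, here is a tiny sketch that you can run locally. It also shows the related trick used in the connector config, where the single-quoted string is closed so that the local shell expands a variable before the string is sent to the VM. DEMO_USERNAME is a made-up value for illustration:

```shell
# Step 1: embed a literal single quote inside a single-quoted string.
inner='echo '"'"'it works'"'"
printf '%s\n' "$inner"

# Step 2: close the string to let the local shell expand a variable.
# DEMO_USERNAME is a made-up value for illustration.
DEMO_USERNAME="alice"
payload='{"mqtt.username" : "'${DEMO_USERNAME}'"}'
printf '%s\n' "$payload"
```

The first printf shows echo 'it works', and the second shows {"mqtt.username" : "alice"}. One caveat: when the string is a command argument rather than an assignment, an unquoted ${VAR} is subject to word splitting, so a value containing spaces would break the argument apart. Writing '"${VAR}"' instead avoids that.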
Gotchas
One problem that I hit was where the VM was created but my container within was not. By looking at the serial port output from bootup using:
gcloud compute instances get-serial-port-output rmoff-connect-mqtt-1573561087
I could see the last entry was:
[ 12.759163] IPv6: ADDRCONF(NETDEV_UP): docker0: link is not ready
Turns out I’d set --no-address when creating the VM and this caused the problem.
To fix it, I just omitted this configuration which meant that the default allocation of an ephemeral IP address happened, and Docker started up nicely.
My question to you
Is this an abomination? Am I struggling to do it in an elegant way because I’m just using the wrong technology? All I want to do is spin up a Connect worker using config and settings that I’ve built locally, following the philosophy of cattle-not-pets. Yes I can build a cloud VM and config Connect manually, but with all the context switching that I do I want something I can get working, check in to git, and come back to a month later and run without having to think about any of it.
Should I be learning k8s, or is that over-engineering it? My gut feel is that it would be, because I don’t need the orchestration and management bells and whistles of k8s, but perhaps they’re just an added benefit and I should take the leap? What about other options? I gave Terraform a very quick look but I’d prefer something closer to my local Docker builds, and I’m tied to Docker because it’s the standard platform that a lot of developers are happy to use for trying demos and new technology. The more non-standard pieces, the higher the friction. We’ve all seen those demos that have a laundry list of pre-reqs to use, and we’ve all thought…sod it ;)
So, tell me if I’m wrong: do @ me! I’m @rmoff on Twitter.