Confluent Cloud is not only a fully-managed Apache Kafka service; it also provides important additional pieces for building applications and pipelines, including managed connectors, Schema Registry, and ksqlDB. Managed connectors are run for you (hence, managed!) within Confluent Cloud - you just specify the technology with which you want to integrate in or out of Kafka, and Confluent Cloud does the rest.
In my case I have a Confluent Cloud cluster running on GCP, so it makes sense to run my worker there too (although I could run it anywhere, keeping it close to the cluster seems sensible). I have an ActiveMQ connector that’s pulling data from a third-party service that’s also cloud-based (hence wanting to get all my processing into the cloud too).
Let’s take a look at what’s involved in running a self-managed Kafka Connect worker alongside Confluent Cloud.
What are my options for running a Kafka Connect worker?
Ultimately, Kafka Connect workers are just JVM processes, so you can deploy them on bare metal or in containers; a few options present themselves. I’m glossing over some of the finer points, but it’s the ability to simply declare the config and runtime and then instantiate it that makes Docker such a joy to use.
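As a quick illustration of that declare-then-instantiate workflow: once the Compose file below is written, standing the whole worker up (and tearing it down again) is a one-liner:

```bash
# Bring up everything declared in docker-compose.yml, detached
docker-compose up -d

# Follow the worker's log output
docker-compose logs -f kafka-connect-ccloud

# Tear the whole thing down again
docker-compose down
```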
To run Kafka Connect using Docker you start with the base image. From there you need to do a few things before the container launches the worker:

* Install the necessary connector plugins (as well as any Single Message Transforms and Converters, if using ones other than those that ship with the image)
* Install any other required files, such as JDBC drivers.
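Both of these boil down to a couple of shell commands. A sketch (the JDBC plugin and driver path here are illustrative, not part of the setup in this post):

```bash
# Install a connector plugin from Confluent Hub
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.0.1

# Copy a JDBC driver into the plugin's lib folder so the connector can load it
cp /tmp/mysql-connector-java-8.0.22.jar \
   /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
```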
Here’s a Docker Compose service definition that does all of this, configured to connect to Confluent Cloud:

```yaml
kafka-connect-ccloud:
  image: confluentinc/cp-kafka-connect-base:6.0.1
  container_name: kafka-connect-ccloud
  ports:
    - 8083:8083
  environment:
    CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
    CONNECT_CUB_KAFKA_TIMEOUT: 300
    CONNECT_BOOTSTRAP_SERVERS: "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092"
    CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud'
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: kafka-connect-group-01-v04
    CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
    CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
    CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
    CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
    CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
    CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
    CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
    # Confluent Cloud config
    CONNECT_REQUEST_TIMEOUT_MS: "20000"
    CONNECT_RETRY_BACKOFF_MS: "500"
    CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
    CONNECT_SASL_MECHANISM: "PLAIN"
    CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
    CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
    CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
    CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
    CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
    CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
    CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
    CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
    CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
    CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
    CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
    CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
    CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
    CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
  command:
    - bash
    - -c
    - |
      echo "Installing connector plugins"
      confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
      #
      echo "Launching Kafka Connect worker"
      /etc/confluent/docker/run &
      #
      # Block until the worker's REST API is available before creating the connector
      echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
      while : ; do
        curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
        echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
        if [ $$curl_status -eq 200 ] ; then
          break
        fi
        sleep 5
      done
      #
      echo -e "\n--\n+> Creating Kafka Connect source connectors"
      curl -i -X PUT -H "Accept:application/json" \
          -H "Content-Type:application/json" \
          http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
          -d '{
            "connector.class"                                      : "io.confluent.connect.activemq.ActiveMQSourceConnector",
            "activemq.url"                                         : "tcp://my-activemq-endpoint:61619",
            "activemq.username"                                    : "ACTIVEMQ_USER",
            "activemq.password"                                    : "ACTIVEMQ_PASSWORD",
            "jms.destination.type"                                 : "topic",
            "jms.destination.name"                                 : "TRAIN_MVT_EA_TOC",
            "kafka.topic"                                          : "networkrail_TRAIN_MVT",
            "value.converter"                                      : "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable"                       : "false",
            "key.converter"                                        : "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable"                         : "false",
            "topic.creation.default.partitions"                    : 1,
            "topic.creation.default.replication.factor"            : 3,
            "confluent.license"                                    : "",
            "confluent.topic.bootstrap.servers"                    : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
            "confluent.topic.sasl.jaas.config"                     : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";",
            "confluent.topic.security.protocol"                    : "SASL_SSL",
            "confluent.topic.ssl.endpoint.identification.algorithm": "https",
            "confluent.topic.sasl.mechanism"                       : "PLAIN",
            "confluent.topic.request.timeout.ms"                   : "20000",
            "confluent.topic.retry.backoff.ms"                     : "500"
          }'
      #
      # Keep the container alive (the worker is running in the background)
      sleep infinity
```
Note that this does everything needed: it installs the connector plugin, launches the worker, waits for the worker’s REST interface to become available, and then creates the connector.
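Once it’s up, you can confirm the connector is running via the worker’s REST API (assuming `jq` is available for pretty-printing):

```bash
curl -s http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/status | jq '.'
```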
This is 💯 a Proof-of-Concept (i.e. not blessed by Confluent in any way as "The Right Way"), and builds on my previous experimentation. If you are doing this in anger then for sure you should figure out how to do it properly, but for my purposes of a quick & dirty solution it worked well.
It Works On My Machine [well, Google’s]™.
So taking the above Docker Compose definition, we can then use GCE’s feature to run Containers on Compute Engine to provision this directly on GCE. For AWS, see the approach that I wrote about here.
To launch a container on GCE, either use the Web UI or the `gcloud` command line. The first part of it is simple enough - we name the VM holding the container, we specify the image to use, and so on:
```bash
gcloud compute instances create-with-container \
  rmoff-connect-source-v01 \
  --zone=us-east1-b \
  --tags kafka-connect \
  --metadata=google-logging-enabled=true \
  --container-image confluentinc/cp-kafka-connect-base:6.0.1 \
  --container-restart-policy=never \
```
When the image starts up, by default it runs the Kafka Connect worker. However, we can override this by specifying a custom command. We run `/bin/bash` as the command, and then pass in `-c` as the argument, followed by an argument that holds the actual shell script we want to execute:
```bash
  --container-command=/bin/bash \
  --container-arg=-c \
  --container-arg='set -x
# Run this stuff when the container launches
sleep infinity'
```
Within that command block we use the `command` seen in the Docker Compose YAML above. So far, so good.
But (you knew there was a but coming, didn’t you), we also need to specify environment variables - and not just a few, and not just with straightforward values. We’ve got dozens of values, and because we’re specifying SASL config there are quote marks in there, escape characters, and more. The `gcloud` CLI has the `--container-env` argument, in which we can pass the environment variables as a comma-separated list of key=value pairs, and the delimiter can be overridden to a custom character - but you still end up with an awful mess like this:
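(What follows is my reconstruction for illustration only - a sketch using gcloud’s custom-delimiter escaping, here `^--^` as described in `gcloud topic escaping`, with just a few of the dozens of variables shown.)

```bash
# Illustrative sketch: all the worker config crammed into one flag,
# with "--" as the custom list delimiter
--container-env=^--^CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092--CONNECT_GROUP_ID=kafka-connect-group-gcp-v01--CONNECT_SECURITY_PROTOCOL=SASL_SSL--CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
```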
It’s not pretty, and it’s a bit of a bugger to debug. You can pass in a separate file holding environment values, but I’m always keen on keeping things self-contained if possible. So instead, since I was overriding the command run at container launch anyway, I overrode the environment variables at that point instead:
```bash
  --container-command=/bin/bash \
  --container-arg=-c \
  --container-arg='set -x
# Set the environment variables
export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
sleep infinity'
```
Most important is to finish with `sleep infinity` so that the container does not exit (since the Kafka Connect worker process is forked to the background).
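In miniature, the shape of the script is (`<plugin>` here is a placeholder):

```bash
confluent-hub install --no-prompt <plugin>   # setup steps run before the worker starts
/etc/confluent/docker/run &                  # fork the worker to the background
# …wait for the REST API, create connectors…
sleep infinity                               # block forever so the container's main process never exits
```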
It needs some tricky escaping, both of the `curl` data (`-d`) block, as well as the quoted passages within it. Here is the final shell invocation:
```bash
gcloud compute instances create-with-container rmoff-connect-source-v01 \
  --zone=us-east1-b \
  --tags kafka-connect \
  --metadata=google-logging-enabled=true \
  --container-image confluentinc/cp-kafka-connect-base:6.0.1 \
  --container-restart-policy=never \
  --container-command=/bin/bash \
  --container-arg=-c \
  --container-arg='set -x
# Set the environment variables
export CONNECT_CUB_KAFKA_TIMEOUT=300
export CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092
export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
export CONNECT_REST_PORT=8083
export CONNECT_GROUP_ID=kafka-connect-group-gcp-v01
export CONNECT_CONFIG_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-configs
export CONNECT_OFFSET_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-offsets
export CONNECT_STATUS_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-status
export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
export CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
export CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
export CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
export CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
export CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
export CONNECT_RETRY_BACKOFF_MS=500
export CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
export CONNECT_SASL_MECHANISM=PLAIN
export CONNECT_SECURITY_PROTOCOL=SASL_SSL
export CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
export CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
export CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
export CONNECT_CONSUMER_RETRY_BACKOFF_MS=500
export CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
export CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
export CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
export CONNECT_PRODUCER_RETRY_BACKOFF_MS=500
export CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
export CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
export CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
echo "Installing connector plugins"
confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
while : ; do
  curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
  echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
  if [ $curl_status -eq 200 ] ; then
    break
  fi
  sleep 5
done
echo -e "\n--\n+> Creating Kafka Connect source connectors"
curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
    -d '"'"'{
      "connector.class"                                      : "io.confluent.connect.activemq.ActiveMQSourceConnector",
      "activemq.url"                                         : "tcp://my-activemq-endpoint:61619",
      "activemq.username"                                    : "ACTIVEMQ_USER",
      "activemq.password"                                    : "ACTIVEMQ_PASSWORD",
      "jms.destination.type"                                 : "topic",
      "jms.destination.name"                                 : "TRAIN_MVT_EA_TOC",
      "kafka.topic"                                          : "networkrail_TRAIN_MVT_v01",
      "value.converter"                                      : "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable"                       : "false",
      "key.converter"                                        : "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable"                         : "false",
      "topic.creation.default.partitions"                    : 1,
      "topic.creation.default.replication.factor"            : 3,
      "confluent.license"                                    : "",
      "confluent.topic.bootstrap.servers"                    : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
      "confluent.topic.sasl.jaas.config"                     : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'CCLOUD_USER'\" password=\"'CCLOUD_PASSWORD'\";",
      "confluent.topic.security.protocol"                    : "SASL_SSL",
      "confluent.topic.ssl.endpoint.identification.algorithm": "https",
      "confluent.topic.sasl.mechanism"                       : "PLAIN",
      "confluent.topic.request.timeout.ms"                   : "20000",
      "confluent.topic.retry.backoff.ms"                     : "500"
}'"'"'
sleep infinity'
```
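To see what’s going on inside the VM you can SSH to it. This invocation is my assumption, matching the instance name and zone used above:

```bash
gcloud compute ssh --zone=us-east1-b rmoff-connect-source-v01
```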
If you try this too soon after provisioning, the VM may still be booting (and your SSH key still propagating to it), in which case you’ll see an error like this:

```
Warning: Permanently added 'compute.8428359303178581516' (ED25519) to the list of known hosts.
[email protected]: Permission denied (publickey).
ERROR: (gcloud.compute.ssh) [/usr/bin/ssh] exited with return code [255].
```
Once the VM is running properly you’ll get a shell prompt:

```
########################[ Welcome ]########################
#  You have logged in to the guest OS.                    #
#  To access your containers use 'docker attach' command  #
###########################################################
rmoff@rmoff-connect-source-v01 ~ $
```
From here, you can see the containers running on the VM. To start with you’ll see a couple of internal ones (stackdriver-logging-agent, konlet):

```
rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID   IMAGE                                                                COMMAND                  CREATED          STATUS          PORTS   NAMES
4a04df77a0be   gcr.io/gce-containers/konlet:v.0.11-latest                           "/bin/gce-containers…"   35 seconds ago   Up 32 seconds           pedantic_tu
0d008a624e56   gcr.io/stackdriver-agents/stackdriver-logging-agent:0.2-1.5.33-1-1   "/entrypoint.sh /usr…"   2 days ago       Up 2 days               stackdriver-logging-agent
```
and soon after, the actual container that you’ve configured to run:

```
rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID   IMAGE                                      COMMAND                  CREATED          STATUS                             PORTS   NAMES
1e349180aa20   confluentinc/cp-kafka-connect-base:6.0.1   "/bin/bash -c 'set -…"   33 seconds ago   Up 30 seconds (health: starting)           klt-rmoff-connect-source-v01-qjez
```
At this point you’re just in normal Docker world, and can look at the logs as you would locally:

```
rmoff@rmoff-connect-source-v01 ~ $ docker logs -f klt-rmoff-connect-source-v01-qjez|more
+ export CONNECT_CUB_KAFKA_TIMEOUT=300
+ CONNECT_CUB_KAFKA_TIMEOUT=300
Installing connector plugins
+ echo 'Installing connector plugins'
+ confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
Running in a "--no-prompt" mode
[…]
[2021-01-11 21:56:38,614] INFO [Worker clientId=connect-1, groupId=kafka-connect-group-gcp-v01] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
```
With all of this done, you should now see topics on your Confluent Cloud cluster - both the internal Kafka Connect worker topics, and those populated by the connector.
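If you want to check from the command line rather than the Confluent Cloud UI, something like this works (a sketch: `ccloud.properties` here is an assumed client config file holding the same SASL_SSL settings the worker uses):

```bash
# List topics on the Confluent Cloud cluster.
# ccloud.properties (assumed) contains e.g.:
#   security.protocol=SASL_SSL
#   sasl.mechanism=PLAIN
#   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="CCLOUD_USER" password="CCLOUD_PASSWORD";
kafka-topics --bootstrap-server MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092 \
             --command-config ccloud.properties \
             --list
```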