
Confluent Cloud is not only a fully-managed Apache Kafka service, but also provides important additional pieces for building applications and pipelines, including managed connectors, Schema Registry, and ksqlDB. Managed connectors are run for you (hence, managed!) within Confluent Cloud - you just specify the technology that you want to integrate with, in or out of Kafka, and Confluent Cloud does the rest.

In my case I have a Confluent Cloud cluster running on GCP, so it makes sense to run my worker there too (although I could run it anywhere, closer to the cluster seems sensible). I have an ActiveMQ connector that’s pulling data from a 3rd party service that’s also Cloud-based (hence wanting to get all my processing into the Cloud too).

Let’s take a look at what’s involved in running a self-managed Kafka Connect worker alongside Confluent Cloud.

What are my options for running a Kafka Connect worker?

Ultimately, Kafka Connect workers are just JVM processes, so you can deploy them on bare metal or in containers. A few options present themselves:

Install Confluent Platform directly onto a VM or bare-metal machine and run the worker from there

Run the worker in Docker

Run the worker in containers orchestrated with Kubernetes

I'm glossing over some of the finer points here, but it's the ability to simply declare the config and runtime and then instantiate it that makes Docker such a joy to use.

To run Kafka Connect using Docker you start with the base image. From there you need to do a few things before the container launches the worker:

Install the necessary connector plugins (as well as any Single Message Transforms and Converters, if using ones other than those that ship with the image)

Install any other required files, such as JDBC drivers (one way to bake these in at image build time is sketched below).
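
As a sketch of the build-time route, a custom image could look something like this. This is a minimal example assuming the ActiveMQ connector used later in this post; the COPY line is purely illustrative (the filename is made up - adjust it and the target path for whatever driver you actually need):

FROM confluentinc/cp-kafka-connect-base:6.0.1
# Install connector plugins from Confluent Hub into the image
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
# Add any other required files, e.g. a JDBC driver (hypothetical filename and path)
COPY ./my-jdbc-driver.jar /usr/share/confluent-hub-components/

The alternative, which I use below since it keeps everything in one file, is to install the plugins as part of the container's launch command.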

kafka-connect-ccloud:
    image: confluentinc/cp-kafka-connect-base:6.0.1
    container_name: kafka-connect-ccloud
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://MY-SR-CCLOUD-ENDPOINT.gcp.confluent.cloud"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SR_USER:SR_PASSWORD"
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        #
        echo -e "\n--\n+> Creating Kafka Connect source connectors"
        curl -i -X PUT -H "Accept:application/json" \
             -H "Content-Type:application/json" \
             http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
             -d '{
                  "connector.class"                                      : "io.confluent.connect.activemq.ActiveMQSourceConnector",
                  "activemq.url"                                         : "tcp://my-activemq-endpoint:61619",
                  "activemq.username"                                    : "ACTIVEMQ_USER",
                  "activemq.password"                                    : "ACTIVEMQ_PASSWORD",
                  "jms.destination.type"                                 : "topic",
                  "jms.destination.name"                                 : "TRAIN_MVT_EA_TOC",
                  "kafka.topic"                                          : "networkrail_TRAIN_MVT",
                  "value.converter"                                      : "org.apache.kafka.connect.json.JsonConverter",
                  "value.converter.schemas.enable"                       : "false",
                  "key.converter"                                        : "org.apache.kafka.connect.json.JsonConverter",
                  "key.converter.schemas.enable"                         : "false",
                  "topic.creation.default.partitions"                    : 1,
                  "topic.creation.default.replication.factor"            : 3,
                  "confluent.license"                                    : "",
                  "confluent.topic.bootstrap.servers"                    : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
                  "confluent.topic.sasl.jaas.config"                     : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";",
                  "confluent.topic.security.protocol"                    : "SASL_SSL",
                  "confluent.topic.ssl.endpoint.identification.algorithm": "https",
                  "confluent.topic.sasl.mechanism"                       : "PLAIN",
                  "confluent.topic.request.timeout.ms"                   : "20000",
                  "confluent.topic.retry.backoff.ms"                     : "500"
              }'
        #
        sleep infinity

Note that this does everything needed: it installs the connector plugin, launches the Kafka Connect worker in the background, waits for the worker's REST interface to become available, and then creates the connector.
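
Bringing it up is then just standard Docker Compose usage. A quick sketch, using the service and connector names from above:

docker-compose up -d kafka-connect-ccloud
docker-compose logs -f kafka-connect-ccloud
# Once the worker is up, check the connector's state via the REST API
curl -s http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/status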

This is 💯 a Proof-of-Concept (i.e. not blessed by Confluent in any way as "The Right Way"), and builds on my previous experimentation. If you are doing this in anger then for sure you should figure out how to do it properly, but for my purposes as a quick & dirty solution it worked well.

It Works On My Machine [well, Google's]™.

So taking the above Docker Compose definition, we can use GCE's support for running containers on Compute Engine to provision this directly. For AWS, see the approach that I wrote about here.

To launch a container on GCE either use the Web UI, or the gcloud command line. The first part of it is simple enough - we name the VM that will hold the container, we specify the image to use, and so on:

gcloud compute instances create-with-container \
        rmoff-connect-source-v01 \
        --zone=us-east1-b \
        --tags kafka-connect \
        --metadata=google-logging-enabled=true \
        --container-image confluentinc/cp-kafka-connect-base:6.0.1 \
        --container-restart-policy=never \

When the image starts up, by default it runs the Kafka Connect worker. However, we can override this by specifying a custom command. We run /bin/bash as the command, and then pass in -c as the argument followed by an argument that holds the actual shell script we want to execute:

--container-command=/bin/bash \
--container-arg=-c \
--container-arg='set -x
# Run this stuff when the container launches
sleep infinity'

Within that command block we use the command seen in the Docker Compose YAML above. So far, so good.

But (you knew there was a but coming, didn't you), we also need to specify environment variables - and not just a few, and not just with straightforward values. We've got dozens of values, and because we're specifying SASL config there are quote marks in there, escape characters, and more. The gcloud CLI has the --container-env argument in which we can pass the environment variables as a comma-separated list of key=value pairs, and the delimiter can be overridden to a custom character - but you still end up with an awful mess like this:
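
(An illustrative fragment only, not my actual command - the ^##^ prefix is gcloud's alternate-delimiter syntax, described under gcloud topic escaping, needed here because the values themselves contain commas:)

--container-env=^##^CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092##CONNECT_SECURITY_PROTOCOL=SASL_SSL##CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";##[…and dozens more]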

It’s not pretty, and it’s a bit of a bugger to debug. You can pass in a separate file holding environment values, but I’m always keen on keeping things self-contained if possible. So instead, since I was overriding the command to run at container launch anyway, I set the environment variables at that point instead:

--container-command=/bin/bash \
--container-arg=-c \
--container-arg='set -x
# Set the environment variables
export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
sleep infinity'

Most important is to finish with sleep infinity so that the container does not exit (since the Kafka Connect worker process is forked to the background).
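
As an aside, the separate-file route mentioned above would look something like this - a sketch, in which env.txt is a name I've made up, passed via the --container-env-file flag:

# env.txt: one KEY=VALUE per line
CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092
CONNECT_GROUP_ID=kafka-connect-group-gcp-v01

gcloud compute instances create-with-container rmoff-connect-source-v01 \
        --container-image confluentinc/cp-kafka-connect-base:6.0.1 \
        --container-env-file=env.txt \
        ...

It works, but it means shipping two artefacts around instead of one.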

The connector creation step needs some tricky escaping, both of the curl data (-d) block and of the quoted passages within it. Here is the final shell invocation:

gcloud compute instances create-with-container rmoff-connect-source-v01 \
        --zone=us-east1-b \
        --tags kafka-connect \
        --metadata=google-logging-enabled=true \
        --container-image confluentinc/cp-kafka-connect-base:6.0.1 \
        --container-restart-policy=never \
        --container-command=/bin/bash \
        --container-arg=-c \
        --container-arg='set -x
        # Set the environment variables
        export CONNECT_CUB_KAFKA_TIMEOUT=300
        export CONNECT_BOOTSTRAP_SERVERS=MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092
        export CONNECT_REST_ADVERTISED_HOST_NAME=rmoff-connect-source-v01
        export CONNECT_REST_PORT=8083
        export CONNECT_GROUP_ID=kafka-connect-group-gcp-v01
        export CONNECT_CONFIG_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-configs
        export CONNECT_OFFSET_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-offsets
        export CONNECT_STATUS_STORAGE_TOPIC=_kafka-connect-group-gcp-v01-status
        export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
        export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
        export CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
        export CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
        export CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
        export CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
        export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
        export CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
        export CONNECT_RETRY_BACKOFF_MS=500
        export CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
        export CONNECT_SASL_MECHANISM=PLAIN
        export CONNECT_SECURITY_PROTOCOL=SASL_SSL
        export CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
        export CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
        export CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
        export CONNECT_CONSUMER_RETRY_BACKOFF_MS=500
        export CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
        export CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
        export CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
        export CONNECT_PRODUCER_RETRY_BACKOFF_MS=500
        export CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
        export CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
        export CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_USER\" password=\"CCLOUD_PASSWORD\";"
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
        while : ; do
            curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
            echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
            if [ $curl_status -eq 200 ] ; then
                break
            fi
            sleep 5
        done
        echo -e "\n--\n+> Creating Kafka Connect source connectors"
        curl -s -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/source-activemq-networkrail-TRAIN_MVT_EA_TOC-01/config \
            -d '"'"'{
                "connector.class"                                      : "io.confluent.connect.activemq.ActiveMQSourceConnector",
                "activemq.url"                                         : "tcp://my-activemq-endpoint:61619",
                "activemq.username"                                    : "ACTIVEMQ_USER",
                "activemq.password"                                    : "ACTIVEMQ_PASSWORD",
                "jms.destination.type"                                 : "topic",
                "jms.destination.name"                                 : "TRAIN_MVT_EA_TOC",
                "kafka.topic"                                          : "networkrail_TRAIN_MVT_v01",
                "value.converter"                                      : "org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable"                       : "false",
                "key.converter"                                        : "org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable"                         : "false",
                "topic.creation.default.partitions"                    : 1,
                "topic.creation.default.replication.factor"            : 3,
                "confluent.license"                                    : "",
                "confluent.topic.bootstrap.servers"                    : "MY-CCLOUD-BROKER-ENDPOINT.gcp.confluent.cloud:9092",
                "confluent.topic.sasl.jaas.config"                     : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'CCLOUD_USER'\" password=\"'CCLOUD_PASSWORD'\";",
                "confluent.topic.security.protocol"                    : "SASL_SSL",
                "confluent.topic.ssl.endpoint.identification.algorithm": "https",
                "confluent.topic.sasl.mechanism"                       : "PLAIN",
                "confluent.topic.request.timeout.ms"                   : "20000",
                "confluent.topic.retry.backoff.ms"                     : "500"
            }'"'"'
        sleep infinity'
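
To check on the worker once the instance is up, SSH into the VM. Using the zone and instance name from above, that's:

gcloud compute ssh --zone=us-east1-b rmoff-connect-source-v01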

If you try this too soon, whilst the VM is still booting and your SSH key is still being propagated, you'll get an error:

Warning: Permanently added 'compute.8428359303178581516' (ED25519) to the list of known hosts.
[email protected]: Permission denied (publickey).
ERROR: (gcloud.compute.ssh) [/usr/bin/ssh] exited with return code [255].

Once the VM is running properly, you'll get a shell prompt:

  ########################[ Welcome ]########################
  #  You have logged in to the guest OS.                    #
  #  To access your containers use 'docker attach' command  #
  ###########################################################
rmoff@rmoff-connect-source-v01 ~ $

From here, you can see the containers running on the VM. To start with you’ll see a couple of internal ones ( stackdriver-logging-agent , konlet ):

rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID  IMAGE                                                                COMMAND                  CREATED         STATUS                  PORTS   NAMES
4a04df77a0be  gcr.io/gce-containers/konlet:v.0.11-latest                           "/bin/gce-containers…"   35 seconds ago  Up 32 seconds                   pedantic_tu
0d008a624e56  gcr.io/stackdriver-agents/stackdriver-logging-agent:0.2-1.5.33-1-1   "/entrypoint.sh /usr…"   2 days ago      Up 2 days                       stackdriver-logging-agent

and soon after, the actual container that you’ve configured to run:

rmoff@rmoff-connect-source-v01 ~ $ docker ps
CONTAINER ID        IMAGE                                                                COMMAND                  CREATED             STATUS                             PORTS               NAMES
1e349180aa20        confluentinc/cp-kafka-connect-base:6.0.1                             "/bin/bash -c 'set -…"   33 seconds ago      Up 30 seconds (health: starting)                       klt-rmoff-connect-source-v01-qjez

At this point you’re just in normal Docker world, and can look at the logs as you would locally:

rmoff@rmoff-connect-source-v01 ~ $ docker logs -f klt-rmoff-connect-source-v01-qjez|more
+ export CONNECT_CUB_KAFKA_TIMEOUT=300
+ CONNECT_CUB_KAFKA_TIMEOUT=300
Installing connector plugins
+ echo 'Installing connector plugins'
+ confluent-hub install --no-prompt confluentinc/kafka-connect-activemq:10.1.0
Running in a "--no-prompt" mode
[2021-01-11 21:56:38,614] INFO [Worker clientId=connect-1, groupId=kafka-connect-group-gcp-v01] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

With all of this done, you should now see topics on your Confluent Cloud cluster for both the internal Kafka Connect worker topics and any topics populated by the connector.
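
One quick way to eyeball this is from the ccloud CLI (the v1 CLI that was current alongside Confluent Platform 6.0.x; this assumes you're logged in with the environment and cluster already selected):

ccloud kafka topic list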