Hi Team,

I am using Strimzi Kafka inside my K8s cluster and I want to use KafkaConnect to archive my topics' data to an S3 bucket, so I created a Docker image using the following Dockerfile.
Dockerfile:

FROM quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

I downloaded and extracted the Confluent AWS S3 sink plugin from the following link and built the image using the above Dockerfile.

https://www.confluent.io/hub/confluentinc/kafka-connect-s3?_ga=2.34258849.1091420604.1648005277-226780291.1648005277
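
For reference, the plugin archive only needs to be unpacked into ./my-plugins/ before the image is built and pushed somewhere the cluster can pull it from. A rough sketch, where the zip file name, registry, and tag are assumptions rather than taken from this setup:

# Sketch: unpack the connector zip into the directory the Dockerfile copies,
# then build and push the image. The zip name, registry, and tag are placeholders.
REGISTRY=registry.example.com/myteam        # placeholder registry
mkdir -p my-plugins
unzip confluentinc-kafka-connect-s3-10.0.6.zip -d my-plugins/
docker build -t "$REGISTRY/strimzi-connect-s3:0.28.0-kafka-3.1.0" .
docker push "$REGISTRY/strimzi-connect-s3:0.28.0-kafka-3.1.0"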

I can see that the KafkaConnect pod is running, but the topics are not being archived to the S3 bucket, and I can see the following errors in the KafkaConnect pod:

2022-03-25 05:22:04,727 ERROR [s3-sink-connector|task-0] WorkerSinkTask{id=s3-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-s3-sink-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'exit': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"exit"; line: 1, column: 5]
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
	... 17 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'exit': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"exit"; line: 1, column: 5]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
	... 18 more
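
The "Unrecognized token 'exit'" part of the trace says the failing record's value is the literal text exit, not JSON, so the JsonConverter cannot parse it. A minimal sketch for checking what is actually sitting on the topic (the bootstrap address and security settings are placeholders, not taken from this setup):

# Sketch: dump a few raw records from the topic to see the payload the
# JsonConverter is rejecting. Bootstrap address and security settings are
# placeholders for whatever this cluster uses.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic kafka-topic \
  --from-beginning --max-messages 10
# A record whose value is just the word "exit" is what the stack trace above
# is complaining about.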

KafkaConnector YAML:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    s3.bucket.name: test-kafka-connect
    topics: kafka-topic
    topics.dir: topics
    path.format: "YYYY/MM-dd/HH"
    flush.size: 99999999
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    s3.compression.type: gzip
    timezone: UTC
    aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    s3.region: ca-central-1
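
For the ${file:...} placeholders above to resolve, the Strimzi KafkaConnect resource (not shown in this post) normally needs a file config provider configured and the credentials Secret mounted via externalConfiguration. A minimal sketch, assuming a Secret named aws-credentials that contains aws-credentials.properties; the image and bootstrap address are placeholders:

# Sketch only: values marked as placeholders are assumptions, not taken from
# this setup. The volume name "aws-credentials" matches the mount path
# /opt/kafka/external-configuration/aws-credentials/ used in the connector config.
kubectl apply -n kafka -f - <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: strimzi-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  image: registry.example.com/myteam/strimzi-connect-s3:0.28.0-kafka-3.1.0  # placeholder
  bootstrapServers: my-cluster-kafka-bootstrap:9092                         # placeholder
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials   # assumed Secret holding aws-credentials.properties
EOF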

Can you help me to resolve this error?

Dear @rmoff, thanks for taking up my question. I got the reported error when I created a new KafkaConnector, but that error is not appearing in the logs now. If I send some JSON data to my test Kafka topic (kafka-topic), parsing and the consumers work fine, but the data is not archived in the S3 bucket. The following lines are my test JSON:

'{"name":"John", "age":20, "car":kia}'
'{"name":"mike", "age":30, "car":toyota}'
'{"name":"jack", "age":40, "car":benz}'
{"number":0}

I don’t know what I did wrong here!

I’m not clear from your post if you’ve now overcome the error or not.

What’s the status of the connector? Ref: Monitoring Kafka Connect and Connectors | Confluent Documentation

If there’s no data in S3 then the likely causes are one or more of:

  • The connector has failed, and isn’t reading any data
  • The connector is running but it can’t handle the messages it consumes from the topic, so it’s dropping them
  • The connector is running but has no data to read from the topic, so there’s no data written to S3 (a quick check for the last two cases is sketched below)
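
A minimal way to tell the last two cases apart is to look at the sink task's consumer group: Kafka Connect sink tasks consume with a group named connect-<connector name>, so here that would be connect-s3-sink-connector. A sketch, where the bootstrap address and client properties file are placeholders for whatever this cluster needs:

# Sketch: inspect the sink connector's consumer group.
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --command-config client.properties \
  --describe --group connect-s3-sink-connector
# CURRENT-OFFSET advancing with LAG 0 means the task is reading everything and
# is just waiting for flush.size / rotate.interval.ms before writing to S3.
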
    Hi @rmoff, thanks for your response. The previously reported error is not showing now. I have tested the commands you shared; please see the details below. I strongly believe that the connector is running fine in the cluster, but the data is still not being archived to the S3 bucket.

    I ran the following commands from inside the KafkaConnect pod:

    $ curl -s "http://localhost:8083/connectors/s3-sink-connector/status"
    {
      "name": "s3-sink-connector",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.244.2.152:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.244.2.152:8083"
        }
      ],
      "type": "sink"
    }
    
    $ curl localhost:8083/ | jq
    {
      "version": "3.1.0",
      "commit": "37edeed0777bacb3",
      "kafka_cluster_id": "3lgIEbX1QEi8YlxKRnpkIw"
    }
    
    $ curl localhost:8083/connector-plugins | jq
    [
      {
        "class": "io.confluent.connect.s3.S3SinkConnector",
        "type": "sink",
        "version": "10.0.6"
      },
      {
        "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
        "type": "source",
        "version": "3.1.0"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "3.1.0"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "3.1.0"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "1"
      }
    ]
    
    $ curl localhost:8083/connectors
    ["s3-sink-connector"]
    
    $ curl localhost:8083/connectors/s3-sink-connector/tasks | jq
    [
      {
        "id": {
          "connector": "s3-sink-connector",
          "task": 0
        },
        "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "s3.region": "ca-central-1",
          "topics.dir": "topics",
          "flush.size": "99999999",
          "s3.part.size": "10485760",
          "timezone": "UTC",
          "tasks.max": "1",
          "rotate.interval.ms": "3600000",
          "locale": "en",
          "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
          "aws.access.key.id": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "s3.bucket.name": "test-kafka-connect",
          "partition.duration.ms": "3600000",
          "schema.compatibility": "NONE",
          "topics": "kafka-topic",
          "aws.secret.access.key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
          "key.converter.schemas.enable": "false",
          "s3.compression.type": "gzip",
          "task.class": "io.confluent.connect.s3.S3SinkTask",
          "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
          "value.converter.schemas.enable": "false",
          "name": "s3-sink-connector",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "path.format": "YYYY/MM-dd/HH",
          "timestamp.extractor": "Record"
        }
      }
    ]
    
  • The connector is running but it can’t handle the messages it consumes from the topic, so it’s dropping them
  • The connector is running but has no data to read from the topic, so there’s no data written to S3
    For the first one, check the details of Kafka Connect error handling.

    For the second one, make sure you’re producing data to the topic.

    Also bear in mind that data has to flush to S3, so you won’t see it instantaneously. In fact, I think there’s a third option to add to the two above:

    subinactive:
          "flush.size": "99999999",
          "s3.part.size": "10485760",
          "rotate.interval.ms": "3600000",
    

    Check out the documentation:

  • flush.size specifies the number of records per partition the connector needs to write before completing a multipart upload to S3.
  • rotate.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records.

    I think that with these values set very high, everything may be working exactly as you’ve told it to. Try setting the values lower for now to confirm that the pipeline is working, and then increase them to suitable values afterwards.
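
    A minimal sketch of doing that through the Strimzi-managed resource from earlier in the thread (resource name and namespace are taken from the YAML above; the test values themselves are just examples):

    # Sketch: temporarily lower the commit thresholds on the KafkaConnector CR
    # so files are flushed quickly, purely to confirm the pipeline end to end.
    kubectl -n kafka patch kafkaconnector s3-sink-connector --type merge \
      -p '{"spec":{"config":{"flush.size":10,"rotate.interval.ms":60000}}}'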

    Hi @rmoff,
    I tried the following values inside the Kafka connector, but it is still not working. There are no errors, but the data is not being archived to S3.

    s3.part.size: 5242880
    flush.size: 999
    rotate.interval.ms: 360
    

    The JSON data I sent:

    {"registertime":1493456789434,"userid":"User_3","regionid":"Region_3","gender":"FEMALE"}
    

    Test command for pushing data to Kafka:

    bin/kafka-console-producer.sh --broker-list prod-kafka-bootstrap.myhost.com:443 --producer-property security.protocol=SSL --producer-property ssl.truststore.password=xcMztndACxK7 --producer-property ssl.truststore.location=/tmp/user-truststore.jks --producer-property ssl.keystore.password=xcMztndACxK7 --producer-property ssl.keystore.location=/tmp/user-keystore.jks --topic kafka-topic
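
    (A quick way to confirm the record really reached the topic is to read it back with a matching consumer, reusing the same SSL properties as the producer command above; a sketch:)

    # Sketch: read the topic back with the same SSL settings as the producer.
    bin/kafka-console-consumer.sh \
      --bootstrap-server prod-kafka-bootstrap.myhost.com:443 \
      --consumer-property security.protocol=SSL \
      --consumer-property ssl.truststore.password=xcMztndACxK7 \
      --consumer-property ssl.truststore.location=/tmp/user-truststore.jks \
      --consumer-property ssl.keystore.password=xcMztndACxK7 \
      --consumer-property ssl.keystore.location=/tmp/user-keystore.jks \
      --topic kafka-topic --from-beginning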

    I’m new to Kafka Connect myself, but one thing that helped me get it running (inside Docker, before I tried it with Strimzi) was to disable the schema lookup.

    Do you intend the connector to interpret the keys and values in the JSON messages? Or do you want it to just dump the messages “as they are”, treating them as simple byte strings?

    I use Avro as the storage format, to dump the messages verbatim for backup, but I think the same (or part of it) could apply here:

        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        schemas.enable: false
        schema.compatibility: NONE
    

    I think you are missing the schemas.enabled: false

    I think you are missing the schemas.enabled: false

    On its own, this property does nothing. Bytes never have a schema. Avro always has a schema.

    value.converter.schemas.enable (or the key.converter equivalent), on the other hand, is only applicable to the JsonConverter.
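
    For illustration, this is roughly the difference on the wire: with value.converter.schemas.enable=true the JsonConverter expects a schema-plus-payload envelope in each message, while with false it accepts the bare JSON object (both examples below are made up):

    # Envelope expected when schemas.enable=true (made-up example):
    echo '{"schema":{"type":"struct","fields":[{"field":"name","type":"string"}]},"payload":{"name":"John"}}' | jq .
    # Bare object accepted when schemas.enable=false:
    echo '{"name":"John"}' | jq .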

    I have the same issue: the Amazon S3 sink connector is running but not writing to S3. Here is my config for the sink connector:

    connector.class=io.confluent.connect.s3.S3SinkConnector
    s3.region=eu-west-2
    flush.size=200
    schema.compatibility=NONE
    tasks.max=1
    topics=prod-arbor-test
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    value.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    storage.class=io.confluent.connect.s3.storage.S3Storage
    key.converter=org.apache.kafka.connect.storage.StringConverter
    s3.bucket.name=debezium-test-data

    Thanks for the correction.

    Without disabling the schema lookup (we don’t use schemas in our topics; some use JSON message format and some use Protobuf), the connector refused to write and complained about something like “can’t find schema” or “failed to parse the JSON” (I don’t have the exact message right now).

    Only when I set those fields, telling it not to look for schemas in the keys and values, did it start generating Avro files in the S3 bucket. When I read the Avro objects back, they seemed to contain the messages correctly.

    Are keys always supposed to exist?

    @motahir it might be more useful to create your own post, including the full stack trace and the version you’re using, and to describe what data types your producer is sending. From the original post, a plain string “exit” is being consumed and simply cannot be parsed by the JSON converters being used. Hopefully that helps debug your own error.

    Regarding the rest of this thread, I personally wouldn’t set rotation intervals to less than 5 minutes. Otherwise, you’ll get lots of tiny files in S3, and you have to pay for each of those bucket scans! For the default partitioner, you’ll need to wait for the full flush size…

    If there are no errors, you’ll have to look further into the JMX metrics to see whether the consumers are actually reading topic data.

    @gliderflyer Keys can be null, based on the producer’s needs. I was mostly referring to the original post about “archival”, which ideally means the full record is going to be persisted in its most raw format. For example, Avro files would contain full schemas, not schema ids from the registry.

    Also, there is a setting like “include.keys” that didn’t exist until around the 6.x release chain, so before that record keys could never be written without a transform.

    @OneCricketeer In my case it was the flush size: when I used a flush size of 2, it started to work.
    There were only two messages in my test topic, so the sink was waiting for the count to reach 200.

    connector.class=io.confluent.connect.s3.S3SinkConnector
    s3.region=eu-west-2
    flush.size=2
    schema.compatibility=NONE
    tasks.max=1
    topics=prod-arbor-test
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    value.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    storage.class=io.confluent.connect.s3.storage.S3Storage
    key.converter=org.apache.kafka.connect.storage.StringConverter
    s3.bucket.name=debezium-test-data
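
    (For anyone verifying the result: with the DefaultPartitioner and topics.dir left at its default of topics, the committed objects land under a prefix like the one below. A sketch assuming the AWS CLI has read access to the bucket:)

    # Sketch: list committed objects; the DefaultPartitioner writes under
    # <topics.dir>/<topic>/partition=<N>/
    aws s3 ls --recursive s3://debezium-test-data/topics/prod-arbor-test/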