Hi Team,
I am using Strimzi Kafka inside my K8s cluster, and I want to use KafkaConnect to archive my topic data to an S3 bucket. So I created a Docker image using the following Dockerfile.
Dockerfile:
FROM quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
I downloaded and extracted the Confluent AWS S3 sink plugin from the following link and added it to the image with the Dockerfile above.
https://www.confluent.io/hub/confluentinc/kafka-connect-s3?_ga=2.34258849.1091420604.1648005277-226780291.1648005277
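For reference, this is roughly how the plugin directory is prepared before building the image (a sketch only; the zip file name depends on the connector version downloaded from Confluent Hub, and the image tag is just an example):

# Unpack the Confluent Hub zip into ./my-plugins/ so the connector jars
# end up under /opt/kafka/plugins/ inside the image, then build it.
mkdir -p ./my-plugins
unzip confluentinc-kafka-connect-s3-10.0.6.zip -d ./my-plugins/
docker build -t my-registry/strimzi-connect-s3:latest .   # image name is just an example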
I can see that the KafkaConnect pod is running, but the topic data is not being archived to the S3 bucket, and I can see the following errors in the KafkaConnect pod:
2022-03-25 05:22:04,727 ERROR [s3-sink-connector|task-0] WorkerSinkTask{id=s3-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-s3-sink-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'exit': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"exit"; line: 1, column: 5]
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
... 17 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'exit': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"exit"; line: 1, column: 5]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
... 18 more
KafkaConnector YAML:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    s3.bucket.name: test-kafka-connect
    topics: kafka-topic
    topics.dir: topics
    path.format: "YYYY/MM-dd/HH"
    flush.size: 99999999
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    s3.compression.type: gzip
    timezone: UTC
    aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    s3.region: ca-central-1
Can you help me to resolve this error?
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
Dear @rmoff, thanks for looking into my question. I was getting the error above when I first created a new KafkaConnector, but it no longer appears in the logs. If I send some JSON data to my test Kafka topic (kafka-topic), it is parsed fine and the consumers work fine. But the data is not archived to the S3 bucket. The following lines are my test JSON:
'{"name":"John", "age":20, "car":kia}'
'{"name":"mike", "age":30, "car":toyota}'
'{"name":"jack", "age":40, "car":benz}'
{"number":0}
I don’t know what I did wrong here!
I’m not clear from your post if you’ve now overcome the error or not.
What’s the status of the connector? Ref: Monitoring Kafka Connect and Connectors | Confluent Documentation
If there’s no data in S3 then the likely causes are one or more of:
The connector has failed, and isn’t reading any data
The connector is running but the messages it consumes from the topic it can’t handle so it’s dropping them
The connector is running, but has no data to read from the topic and so there’s no data written to S3
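For the first of these, a quick check (assuming the Connect REST API is reachable on port 8083 from inside the pod and the connector is named s3-sink-connector, as in your YAML) is to query the status endpoint:

# a FAILED task will include a "trace" field with the full stack trace
curl -s http://localhost:8083/connectors/s3-sink-connector/status | jq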
Hi @rmoff, thanks for your response. The previously reported error is no longer showing. I have tested the commands you shared; please see the details below. I strongly believe the connector is running fine in the cluster, but the data is still not being archived to the S3 bucket.
I ran the following commands from inside the KafkaConnect pod:
$curl -s "http://localhost:8083/connectors/s3-sink-connector/status"
"name": "s3-sink-connector",
"connector": {
"state": "RUNNING",
"worker_id": "10.244.2.152:8083"
"tasks": [
"id": 0,
"state": "RUNNING",
"worker_id": "10.244.2.152:8083"
"type": "sink"
$ curl localhost:8083/ | jq
{
  "version": "3.1.0",
  "commit": "37edeed0777bacb3",
  "kafka_cluster_id": "3lgIEbX1QEi8YlxKRnpkIw"
}
$ curl localhost:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.s3.S3SinkConnector",
    "type": "sink",
    "version": "10.0.6"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
$ curl localhost:8083/connectors
["s3-sink-connector"]
$ curl localhost:8083/connectors/s3-sink-connector/tasks | jq
[
  {
    "id": {
      "connector": "s3-sink-connector",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "s3.region": "ca-central-1",
      "topics.dir": "topics",
      "flush.size": "99999999",
      "s3.part.size": "10485760",
      "timezone": "UTC",
      "tasks.max": "1",
      "rotate.interval.ms": "3600000",
      "locale": "en",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "aws.access.key.id": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "s3.bucket.name": "test-kafka-connect",
      "partition.duration.ms": "3600000",
      "schema.compatibility": "NONE",
      "topics": "kafka-topic",
      "aws.secret.access.key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
      "key.converter.schemas.enable": "false",
      "s3.compression.type": "gzip",
      "task.class": "io.confluent.connect.s3.S3SinkTask",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "value.converter.schemas.enable": "false",
      "name": "s3-sink-connector",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "path.format": "YYYY/MM-dd/HH",
      "timestamp.extractor": "Record"
    }
  }
]
The connector is running but the messages it consumes from the topic it can’t handle so it’s dropping them
The connector is running, but has no data to read from the topic and so there’s no data written to S3
For the first one, check the details of Kafka Connect error handling.
For the second one, make sure you’re producing data to the topic.
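One way to confirm the second point is to consume a few records from the topic directly (a sketch only; the bootstrap address is a placeholder and client-ssl.properties is a hypothetical file holding the same SSL settings you use for producing):

# Read a handful of records to confirm data actually exists on the topic
bin/kafka-console-consumer.sh \
  --bootstrap-server <your-bootstrap-host>:443 \
  --consumer.config /tmp/client-ssl.properties \
  --topic kafka-topic \
  --from-beginning \
  --max-messages 5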
Also bear in mind that data has to flush to S3 so you won’t see it instantaneously. In fact, I think we have a third option to add to the two above to consider:
subinactive:
"flush.size": "99999999",
"s3.part.size": "10485760",
"rotate.interval.ms": "3600000",
Check out the documentation:
flush.size specifies the number of records per partition the connector needs to write before completing a multipart upload to S3.
rotate.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records.
I think with these values set very high it may be that everything is working exactly as you’ve told it to. Try setting the values lower for now to confirm that the pipeline is working, and then increase them to suitable values afterwards.
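For example (illustrative values only, not a production recommendation), in the KafkaConnector spec.config:

flush.size: 10               # close files after only a few records while testing
rotate.interval.ms: 60000    # rotate roughly every minute (a new record must arrive to trigger the rotation)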
Hi @rmoff
I tried the following values in the KafkaConnector, but it is still not working. There are no errors, but the data is not being archived to S3.
s3.part.size: 5242880
flush.size: 999
rotate.interval.ms: 360
The JSON data I input:
{"registertime":1493456789434,"userid":"User_3","regionid":"Region_3","gender":"FEMALE"}
Test command for pushing data to kafka:
bin/kafka-console-producer.sh \
  --broker-list prod-kafka-bootstrap.myhost.com:443 \
  --producer-property security.protocol=SSL \
  --producer-property ssl.truststore.password=xcMztndACxK7 \
  --producer-property ssl.truststore.location=/tmp/user-truststore.jks \
  --producer-property ssl.keystore.password=xcMztndACxK7 \
  --producer-property ssl.keystore.location=/tmp/user-keystore.jks \
  --topic kafka-topic
I’m new to Kafka Connect myself, but one thing that helped me run it (before I tried it with Strimzi inside Docker) was to disable the schema lookup.
Do you intend the connector to interpret the keys and values in the JSON messages? Or do you want it to just dump the messages “as they are”, treating them as simple byte strings?
I use Avro as the storage format to dump the messages verbatim for backup, but I think the same (or part of it) could apply here:
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
schemas.enable: false
schema.compatibility: NONE
I think you are missing the schemas.enabled: false
I think you are missing the schemas.enabled: false
On its own, this property does nothing. Bytes never have a schema. Avro always has a schema.
value.converter.schemas.enable (or the key.converter equivalent), on the other hand, is only applicable to the JsonConverter.
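To illustrate (the records here are invented for the example), with value.converter.schemas.enable=true the JsonConverter expects each value to be wrapped in a schema/payload envelope, roughly:

{"schema":{"type":"struct","fields":[{"field":"name","type":"string"}]},"payload":{"name":"John"}}

With value.converter.schemas.enable=false it accepts the plain JSON:

{"name":"John"}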
I have the same issue: the Amazon S3 sink connector is running but not writing to S3. Here is my config for the sink connector:
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-2
flush.size=200
schema.compatibility=NONE
tasks.max=1
topics=prod-arbor-test
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=debezium-test-data
Thanks for the correction.
Without disabling the schema lookup (we don’t use schemas in our topics; some use the JSON message format and some use Protobuf), the connector refused to write and complained about something like “can’t find schema” or “failed to parse the JSON” (I don’t have the exact message right now).
Only when I set the fields telling it not to look up schemas for the keys and values did it start generating Avro files in the S3 bucket. Reading the Avro objects back, they seemed to contain the messages correctly.
Are keys always supposed to exist?
@motahir it might be more useful to create your own post, including the full stacktrace and version you’re using, and describe what data types your producer is sending. From the original post, a plain string “exit” is being consumed and simply cannot be parsed by the JSON converters being used. Hopefully that helps debug your own error.
Regarding the rest of this thread, I personally wouldn’t set rotation intervals to less than 5 minutes. Otherwise, you’ll get lots of tiny files in S3, and you pay for each of those bucket scans! For the default partitioner, you’ll need to wait for the full flush size.
If there are no errors, you’ll have to look further into the JMX metrics to see whether the consumers are actually reading topic data.
@gliderflyer Keys can be null, based on the producer’s needs. I was mostly referring to the original post about “archival”, which ideally means the full record is going to be persisted in its most raw format. For example, Avro files would contain full schemas, not schema ids from the registry.
Also, there is a setting like “include.keys” that didn’t exist until around the 6.x releases, so record keys could never be written without a transform before that.
@OneCricketeer In my case it was the flush size; when I used a flush size of 2, it started to work.
There were only two messages in my test topic, so the sink was waiting for it to reach 200.
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-2
flush.size=2
schema.compatibility=NONE
tasks.max=1
topics=prod-arbor-test
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=debezium-test-data