添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Red Hat AMQ streams is a massively scalable, distributed, and high-performance data streaming platform based on the Apache Kafka project. It offers a distributed backbone that allows microservices and other applications to share data with high throughput and low latency.

Red Hat AMQ streams deployed on open shift with persistence storage. Spring Kafka clients deployed as producers and consumers.

The Kafka consumer part of the consumer group suddenly stopped consuming messages.

Kafka Broker Logs

Kafka broker-0
ERROR [ReplicaManager broker=0] Error processing append operation on partition __consumer_offsets-13 (kafka.server.ReplicaManager) [data-plane-kafka-request-handler-2]
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-13
INFO [GroupCoordinator 0]: Preparing to rebalance group group-id in state PreparingRebalance with old generation 118964 (__consumer_offsets-13) (reason: Error COORDINATOR_NOT_AVAILABLE when storing group assignment during SyncGroup (member: consumer-group-id-x-aaa-bbb-ccc)) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-2]
Kafka broker-2
ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-13 at offset xxxxx (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-13 at offset xxxxx (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __consumer_offsets-13 Topic: __consumer_offsets Partition: 13 Leader: 0 Replicas: 0,1,2 Isr: 0 <--------- problematic one Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0 Topic: __consumer_offsets Partition: 15 Leader: 1 Replicas: 1,0,2 Isr: 1,2,0
  • The logs from broker-2 logs indicate why broker-2 is not able to get in sync for that partition and is not able to fetch offset xxxxx from __consumer_offset_13 on broker-0.
  • One should inspect/check what happened to that record using kafka-dump-log.sh tool to print that record. For example:
  • bin/kafka-dump-log.sh --files data/kafka/__consumer_offsets-13/000000000000000xxx.log --value-decoder-class "kafka.serializer.StringDecoder" --offsets-decoder
    ----------------------------------------------------------------------------
    ---------------------------------------------------------------------------
    -------------------------------------------------------------------------------
    baseOffset: xxxxx lastOffset: xxxxx count: 0 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 24 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: yyyyyyy CreateTime: 111111111111 size: 137 magic: 2 compresscodec: none crc: 2xxxxxxx isvalid: false
    org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /home/kkakarla/Downloads/__consumer_offsets-13/0000000000000xxx.log.  
  • Looking at the above, it is clearly evident that the record is corrupted. Upon checking the entire __consumer_offset_13 from broker-0. we found some .log files missing.
  • One can copy the __consumer_offset_13 from Kafka broker-0 pod to local using the below command.
  • From the logs, we observed the below error messages due to missing .log files from __consumer_offsets_13
  • org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on 
    the disk.
    ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition SQL_xxx-10 at offset 12345 (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
    org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on 
    the disk.
  • We observed the below read-only error on Kafka storage mount volumes.
  • message: 'relabel failed /var/lib/kubelet/pods/xxxxxyyyyyzzzz/volumes/kubernetes.io~csi/pvc-abcdef/mount:
              lsetxattr /var/lib/kubelet/pods/xxxxxyyyyyzzzz/volumes/kubernetes.io~csi/pvc-abcdef/mount/kafka-log0:
              read-only file system'
  • It is possible that the read-only issue prevented the creation of files that should have been created. As a result, the file may have been seemingly lost and corrupted.
  • Now the question is how one can fix the corrupted __consumer_offset_13 from Kafka broker-0.

    If other Kafka brokers Kafka-broker-1 and Kafka-broker-2 have log files in good condition. There is a way to restore __consumer-offset-13 using logs of Kafka-broker-1 or Kafka-broker-2

  • Stopping all the spring consumer applications
  • Configuring unclean.leader.election.enable=true in the Kafka CR yaml (oc edit Kafka -o yaml)
  • unclean.leader.election.enable=true is the configuration to fix __consumer-offset-13 in Kafka-0 using the Kafka-1 and Kafka-2 log files.
  • unclean.leader.election.enable (default: false)
  • Indicates whether to enable replicas not in the ISR set to be elected as a leader as a last resort, even though doing so may result in data loss
  • Wait for about three minutes, checking whether unclean.leader.election.enable is set to true for each of the Kafka brokers, using the following command
  • Please adjust the {BROKER_ID} for each broker id(0, 1, 2) and execute repeatedly.
  • It is possible that the Strimzi cluster operator does not set unclean.leader.election.enable=true due to a lack of ISRs
  • If unclean.leader.election.enable=tru e could not be set by the strimzi cluster operator], we should set it manually. Check again after executing the following commands. please adjust the {BROKER_ID} for each broker id(0, 1, 2) and execute repeatedly.
  • Just to be sure, checking that the kafka-0 pod can be restarted using " oc delete pod <kafka-0 pod name> "
  • If the broker does not start here related to other than __consumer-offset-13 , there may be other problems.
  • Logging in the kafka-0 pod, using " oc debug <kafka-0 pod name> "
  • After logging in the kafka-0 pod, deleting the __consumer-offset-13(/var/lib/kafka/data-<JBOD_ID>/kafka-log<BROKER_ID>/__consumer_offsets-13) directory in the kafka-0 pod.
  • Logging off from the kafka-0 pod
  • Restarting kafka-0 pod, using " oc delete pod <kafka-0 pod name> "
  • Wait for about 3 minutes
  • If everything goes smooth then __consumer-offset-13 of Kafka-0 may recover from kafka-1 and kafka-2 logs automatically at this point.
  • If the above solution fails for some reason, then one can try the below workaround:

  • The safe workaround to avoid the use of __consumer-offsets-13 by changing the consumer group name(group.xxx) of the client spring application.
  • The partition number of __consumer_offsets is determined from the hash value of the consumer group name(group.xxx) and the max number of __consumer_offsets partition.
  • For your information, the following is a small Java program to identify the partition number of the __consumer_offsets . If you use group-xxx it will be 13.
  • The partition number of your new consumer group name should not be 13. For example, if you use group-zzz as the new consumer group name, the partition number should be some other integer no.
  • One can extract a list of offsets of the consumer group(group.id=group-xxx) from the log files of Kafka-broker 1 and Kafka-broker 2
  • This is the Java program. to find the partition number of the __consumer_offsets

        public static void main(String[] args) {
          int maxConsumerOffsetsTopicPartition = 50; // __consumer_offsets topic partition num: default 50
          String groupId = "group-xxx"; // corrupted consumer group
          int consumerOffsetsPartitionNum = Math.abs(groupId.hashCode()) % maxConsumerOffsetsTopicPartition;
          System.out.println(consumerOffsetsPartitionNum);
    

    After changing the consumer group name in the above workaround, one should delete the __consumer-offsets-13 log files manually for cleanup.

    Follow the steps mentioned in the section "How one can fix the corrupted __consumer_offset_13 from Kafka broker-0."