CommitFailedException Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I have set max.poll.interval.ms to Integer.MAX_VALUE, so can anyone tell me why this still happens even though I have set that value?

Another question: as the message suggests, I set session.timeout.ms to 60000 and it still happens. I tried to reproduce the problem with some simple code:

    public static void main(String[] args) throws InterruptedException {
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            // Sleep longer than session.timeout.ms between polls on purpose.
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }

When I set session.timeout.ms to 10000 and sleep for more than 10000 ms in my poll loop, it still seems to work and no exception is thrown, so I'm confused. If the heartbeat is triggered by consumer.poll() and consumer.commit(), then the heartbeat in my code should have fallen outside the session timeout. Why is no CommitFailedException thrown?

Although the user thread hangs for more than 10 seconds, the heartbeat thread can still send out heartbeats normally; that's why no exceptions were thrown, and that's also the reason why max.poll.interval.ms was introduced. What I am interested in is why you still got CommitFailedException when max.poll.interval.ms is set to Integer.MAX_VALUE. – amethystic Aug 8, 2017 at 10:01

session.timeout.ms set on the consumer should be less than the group.max.session.timeout.ms set on the Kafka broker.

This resolved the issue for me.

Credit to the GitHub issue Commit Failures
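As a sketch, the relationship described above can be expressed in configuration like this (the 25000/30000 values are illustrative assumptions, not prescribed defaults):

```properties
# Consumer configuration (client side)
session.timeout.ms=25000

# Broker configuration (server.properties) -- must be >= the consumer's value
group.max.session.timeout.ms=30000
```

If the consumer asks for a session timeout above the broker's group.max.session.timeout.ms, the broker rejects the consumer when it tries to join the group.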

Worked for me as well! Default value for group.max.session.timeout.ms is 30000 and I set my session.timeout.ms to 25000. – F. Santiago Dec 12, 2018 at 8:49

Actually it's in fact 30000, see here: kafka.apache.org/090/documentation.html#brokerconfigs – Zap Dec 28, 2020 at 12:10

For this, you need to handle the rebalance in your code: process the in-flight messages and commit them before the rebalance completes, for example:

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Implement what you want to do once rebalancing is done.
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit the offsets of the messages processed so far.
    }
}

and subscribe to the topic like this:

kafkaConsumer.subscribe(topicNameList, new HandleRebalance());

The advantages of doing this:

  • Messages are not reprocessed when a rebalance takes place.

  • No CommitFailedException.
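A minimal sketch of how the pieces above could fit together. This is an illustration, not the answer's exact code: the topic name, bootstrap address, and the currentOffsets bookkeeping map are assumptions, manual commits replace auto-commit, and it needs the kafka-clients library on the classpath plus a running broker.

```java
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // commit manually instead
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Offsets of messages processed so far; committed when partitions are revoked.
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

        consumer.subscribe(Arrays.asList("t1"), new ConsumerRebalanceListener() {
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing to do in this sketch.
            }
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit processed offsets before the partitions move away,
                // so the new owner does not reprocess them.
                consumer.commitSync(currentOffsets);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n",
                        record.offset(), record.value());
                // Remember the next offset to consume for this partition.
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            consumer.commitSync(currentOffsets);
        }
    }
}
```

Disabling auto-commit and committing explicitly is what makes the onPartitionsRevoked callback meaningful; with enable.auto.commit=true the consumer decides when to commit.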

    If you are also writing tests like me, you can use that group id to consume one message with a shell command; this can eliminate the error temporarily. For a long-term solution, refer to the answers above, e.g.:

    ./kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic yyy --group zzz --max-messages 1 