Already client is connected and during polling event, SSL handshake failure happened. it led to leaving the co-ordinator. Even on SSL handshake failure which was actually intermittent issue, polling should have some resilient and retry the polling. Leaving group caused all instances of clients to drop and left the messages in Kafka for long time until re-subscribe the kafka topic manually.
2019-09-06 04:03:09,016 ERROR [reactive-kafka-xxxx] org.apache.kafka.clients.NetworkClient [Consumer clientId=aaa, groupId=bbb] Connection to node 150 (host:port) failed authentication due to: SSL handshake failed
2019-09-06 04:03:09,021 ERROR [reactive-kafka-xxxx] reactor.kafka.receiver.internals.DefaultKafkaReceiver Unexpected exception
java.lang.NullPointerException: null
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1012) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:822) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar!/:?]
at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:470) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:401) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$14(DefaultKafkaReceiver.java:335) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
at reactor.kafka.receiver.internals.KafkaSchedulers$EventScheduler.lambda$decorate$1(KafkaSchedulers.java:100) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
at org.springframework.cloud.sleuth.instrument.async.TraceCallable.call(TraceCallable.java:70) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
2019-09-06 04:03:09,023 INFO [reactive-kafka-xxxx] org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=aaa, groupId=bbb] Member x_13-081e61ec-1509-4e0e-819e-58063d1ce8f6 sending LeaveGroup request to coordinator