添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

使用MRS安装集群,主要安装ZooKeeper、Flume、Kafka、Storm、Spark。

使用Storm、Spark、Flume或者自己编写consumer代码来消费Kafka中指定Topic的消息时,发现消费不到任何数据。

  • Kafka服务异常。
  • Consumer中ZooKeeper相关连接地址配置错误,导致无法消费。
  • Consumer发生ConsumerRebalanceFailedException异常,导致无法消费。
  • Consumer发生网络导致的ClosedChannelException异常,导致无法消费。
  • Storm、Spark、Flume或者自定义Consumer代码可以都称为Consumer。

  • 查看kafka服务状态:
  • MRS Manager界面操作:登录MRS Manager,依次选择 "服务管理 > Kafka ,查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。
  • FusionInsight Manager界面操作:登录FusionInsight Manager,选择“集群 > 待操作集群的名称 > 服务 > Kafka,

    查看当前Kafka状态,发现状态为良好,且监控指标内容显示正确。

  • 通过Kafka Client,判断是否可以正常消费数据。

    假设客户端已经安装在/opt/client目录,test为需要消费的Topic名称, 192.168.234.231为ZooKeeper的IP地址。

    cd /opt/client
    source bigdata_env
    kinit admin
    kafka-topics.sh --zookeeper 192.168.234.231:2181/kafka --describe --topic testkafka-console-consumer.sh --topic test --zookeeper 192.168.234.231:2181/kafka --from-beginning

    当可以消费到数据时,表示集群服务正常。

  • 查看Consumer相关配置,发现ZooKeeper连接地址错误。
  • Flume
    server.sources.Source02.type=org.apache.flume.source.kafka.KafkaSource                                            
    server.sources.Source02.zookeeperConnect=192.168.234.231:2181
    server.sources.Source02.topic = test
    server.sources.Source02.groupId = test_01
  • Spark
    val zkQuorum = "192.168.234.231:2181"
  • Storm
    BrokerHosts brokerHosts = new ZKHosts("192.168.234.231:2181");
  • Consumer API
    zookeeper.connect="192.168.234.231:2181"
  • MRS中Kafka在ZooKeeper存储的ZNode是以/kafka为根路径,有别于开源。Kafka对应的ZooKeeper连接配置为192.168.234.231:2181/kafka。

    Consumer中配置为ZooKeeper连接配置为192.168.234.231:2181,导致无法正确获取Kafka中Topic相关信息。

    解决方法参考 1

  • 查看Consumer相关日志,发现打印ConsumerRebalanceFailedException异常。
    2016-02-03 15:55:32,557 | ERROR | [ZkClient-EventThread-75- 192.168.234.231:2181/kafka] |  Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@34b41dfe]  | org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:77)
    kafka.common.ConsumerRebalanceFailedException: pc-zjqbetl86-1454482884879-2ec95ed3 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633)
    at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:487)
    at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

    通过异常信息,发现当前Consumer没有在指定的重试次数内完成Rebalance,使得Consumer没有被分配Kafka Topic-Partition,则无法消费消息。

    解决方法参考 3

  • 查看Consumer相关日志,发现打印Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:26,host:192-168-234-231,port:9092] failed错误信息和ClosedChannelException异常。
    [2016-03-04 03:33:53,047] INFO Fetching metadata from broker id:26,host: 192-168-234-231,port:9092 with correlation id 0 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
    [2016-03-04 03:33:55,614] INFO Connected to 192-168-234-231:21005 for producing (kafka.producer.SyncProducer)
    [2016-03-04 03:33:55,614] INFO Disconnecting from 192-168-234-231:21005 (kafka.producer.SyncProducer)
    [2016-03-04 03:33:55,615] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:26,host: 192-168-234-231,port:21005] failed (kafka.client.ClientUtils$)
    java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    [2016-03-04 03:33:55,615] INFO Disconnecting from 192-168-234-231:21005 (kafka.producer.SyncProducer)

    通过异常信息,发现当前Consumer无法从Kafka Broker 192-168-234-231节点获取元数据,导致无法连接正确的Broker获取消息。

  • 检查网络是否存在问题,如果网络没有问题,检查是否配置主机和IP的对应关系
  • Linux

    执行 cat /etc/hosts 命令。

    图1 示例1
  • Windows

    打开“C:\Windows\System32\drivers\etc\hosts”。

    图2 示例2

    解决方法参考 4

  • ZooKeeper连接地址配置错误。
  • 修改Consumer配置中的ZooKeeper连接地址信息,保证和MRS相一致。

  • Flume
    server.sources.Source02.type=org.apache.flume.source.kafka.KafkaSource
    server.sources.Source02.zookeeperConnect=192.168.234.231:2181/kafka
    server.sources.Source02.topic = test
    server.sources.Source02.groupId = test_01
  • Spark
    val zkQuorum = "192.168.234.231:2181/kafka"
  • Storm
    BrokerHosts brokerHosts = new ZKHosts("192.168.234.231:2181/kafka");
  • Consumer API
    zookeeper.connect="192.168.234.231:2181/kafka"
  • Rebalance异常。

    同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时消费多个partition数据,consumer端也会有负载均衡(consumer个数小于partitions数量时)。

    consumer实际上是靠存储在zk中的临时节点来表明针对哪个topic的那个partition拥有读权限的。所在路径为:/consumers/consumer-group-xxx/owners/topic-xxx/x。

    当触发负载均衡后,原来的consumer会重新计算并释放已占用的partitions,此过程需要一定的处理时间,新来的consumer抢占该partitions时很有可能会失败。 表1 参数说明

    可以适当调大上述三个参数,可以参考如下数值:

    zookeeper.session.timeout.ms = 45000
    rebalance.max.retries = 10
    rebalance.backoff.ms = 5000

    参数设置应遵循:

    rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms

  • 网络异常。

    在hosts文件中没有配置主机名和IP的对应关系,导致使用主机名进行访问时,无法获取信息。

  • 在hosts文件中添加对应的主机名和IP的对应关系。

  • Linux 图3 示例3
  • Windows 图4 示例4
  •