添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

childe,你好:
首先非常感谢使用了hangout对我的帮助,我这里遇到一个问题,想请教你。
**问题:**我的hangout在消费kafka的并写入到Elasticsearch的时候,发现Elasticsearch上日志数据量比kafka的LOG-END-OFFSET数据要多,而kafka的LOG-END-OFFSET数据与我的日志文件数据量是一样的。
比如我的日志条数是48078条,我的kafka的LOG-END-OFFSE总和也是48078,但是我的Elasticsearch上查看到的日志是51334条,很好奇多余的几千条数据是怎么来的。
**我的理解:**我感觉可能是hangout消费kafka后,kafka并没有认为hangout消费了,所以数据显示没有被消费,则又被hangout消费了一遍,那么请问hangout消费一条数据之后,会做什么处理?
**注:**由于某些原因,我的ES版本不能升级。
下面是我的应用环境:
ES版本:Elasticsearch 2.3.5
hangout版本:hangout-0.1.8.2-ES2.3.5
kafka版本:kafka_2.11-1.1.0
zookeeper版本:zookeeper-3.4.12
下面是我的hangout配置

inputs:
    - NewKafka:
        codec: json
        topic:
          topic-test: 6
        consumer_settings:
          bootstrap.servers:  192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group.id: topic-test
outputs:
    - Elasticsearch:
        cluster: es-cluster # cluster name, required
        hosts: # required
                - 192.168.10.10:9301
                - 192.168.10.11:9301
                - 192.168.10.12:9301
                - 192.168.10.13:9301
                - 192.168.10.14:9301
        index: 'hangout-test'
        index_type: logs # default logs
        bulk_actions: 20000 # default 20000
        bulk_size: 15   #default 15
        flush_interval: 10      #default 10
        concurrent_requests: 0  #default 0
        timezone: "Asia/Shanghai" # defaut UTC 时区. 只用于生成索引名字的字符串格式化
        sniff: false
          

很有可能是kafka group consumer 做了rebalance.
默认kafka是XX秒commit一次当前的offset. 如果新起一个消费者, 他会Join Group, 导致Group里面的成员之前的memberID失效. 当commit的时候, 新的memberID还没有拿到. 所以这段时间的数据可能会重复消费.
上面只是我的猜测 :) 你可以看一下client/server端日志, 是不是有rebalance.

childe,你好:
我说明以下三点:
1、如你所说,我做了个测试,重启hangout的时候,确实kafka会有rebalance group topic-test的日志信息出现,但是我昨天一直到今天并没有停止hangout,并且kafka日志也没有rebalance group topic-test的信息,最后查Elasticsearch日志数据量还是比file文件中日志数据量多(查询的是一小时的数据)。
2、为了多方面测试,我昨天用logstash2.3.4版本也消费kafka数据写入Elasticsearch,用的是同一组kafka,不同的topic,发现最后Elasticsearch和file文件中一小时的数据是一样的,并没有出现Elasticsearch多数据的情况。
3、我想到了一个问题:有没有可能是hangout消费完kafka数据并写入Elasticsearch之后成功之后返回给kafka的时候出现错误,然后并没有返回给kafka这条日志已经消费,所以再次消费了同一条数据?请问如果hangout写入Elasticsearch之后成功之后,返回给kafka的那段代码是怎么写的?有没有可能写入Elasticsearch成功因为某方面原因返回给kafka的结果是消费失败?