inputs:
- NewKafka:
codec: json
topic:
topic-test: 6
consumer_settings:
bootstrap.servers: 192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
group.id: topic-test
outputs:
- Elasticsearch:
cluster: es-cluster # cluster name, required
hosts: # required
- 192.168.10.10:9301
- 192.168.10.11:9301
- 192.168.10.12:9301
- 192.168.10.13:9301
- 192.168.10.14:9301
index: 'hangout-test'
index_type: logs # default logs
bulk_actions: 20000 # default 20000
bulk_size: 15 #default 15
flush_interval: 10 #default 10
concurrent_requests: 0 #default 0
timezone: "Asia/Shanghai" # defaut UTC 时区. 只用于生成索引名字的字符串格式化
sniff: false
很有可能是kafka group consumer 做了rebalance.
默认kafka是XX秒commit一次当前的offset. 如果新起一个消费者, 他会Join Group, 导致Group里面的成员之前的memberID失效. 当commit的时候, 新的memberID还没有拿到. 所以这段时间的数据可能会重复消费.
上面只是我的猜测 :) 你可以看一下client/server端日志, 是不是有rebalance.
childe,你好:
我说明以下三点:
1、如你所说,我做了个测试,重启hangout的时候,确实kafka会有rebalance group topic-test的日志信息出现,但是我昨天一直到今天并没有停止hangout,并且kafka日志也没有rebalance group topic-test的信息,最后查Elasticsearch日志数据量还是比file文件中日志数据量多(查询的是一小时的数据)。
2、为了多方面测试,我昨天用logstash2.3.4版本也消费kafka数据写入Elasticsearch,用的是同一组kafka,不同的topic,发现最后Elasticsearch和file文件中一小时的数据是一样的,并没有出现Elasticsearch多数据的情况。
3、我想到了一个问题:有没有可能是hangout消费完kafka数据并写入Elasticsearch之后成功之后返回给kafka的时候出现错误,然后并没有返回给kafka这条日志已经消费,所以再次消费了同一条数据?请问如果hangout写入Elasticsearch之后成功之后,返回给kafka的那段代码是怎么写的?有没有可能写入Elasticsearch成功因为某方面原因返回给kafka的结果是消费失败?