# 日志采集到kafka
filebeat -e -c filebeat.yml
-c:配置文件位置
-path.logs:日志位置
-path.data:数据位置
-path.home:家位置
-e:关闭日志输出
-d 选择器:启用对指定选择器的调试。 对于选择器,可以指定逗号分隔的组件列表,也可以使用-d“*”为所有组件启用调试.例如,-d“publish”显示所有“publish”相关的消息。
# consumer 获取日志
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic --from-beginning
如果读到 EOF,则 filebeat 将会等待一段时间再去读文件, 结合 close_inactive 选项,如果等待时间超过了默认值 5 分钟,则 Harvester 结束
backoff
Filebeat检测到某个文件到了EOF之后,每次等待多久再去检测文件是否有更新,默认为1s。
max_backoff
Filebeat检测到某个文件到了EOF之后,等待检测文件更新的最大时间,默认是10秒。
backoff_factor
定义到达max_backoff的速度,默认因子是2,到达max_backoff后,
变成每次等待max_backoff那么长的时间才backoff一次,直到文件有更新才会重置为backoff
func (f *Log) wait() {
// Wait before trying to read file again. File reached EOF.
select {
case <-f.done:
return
case <-time.After(f.backoff):
// Increment backoff up to maxBackoff
if f.backoff < f.config.MaxBackoff {
f.backoff = f.backoff * time.Duration(f.config.BackoffFactor)
if f.backoff > f.config.MaxBackoff {
f.backoff = f.config.MaxBackoff
打开了文件需要 close
,不能占用文件不释放,不然即使 rm
了文件,磁盘空间也不会释放(早期 Filebeat 有这个bug)。
这个配置就是说明多久关闭文件
, 比如一个日志文件,10 分钟都没有读到新的内容就把文件句柄
关闭。
这里的时间不是取决于文件的最后更新时间,而是 Filebeat 内部记录的时间
,上次读到文件和这次尝试读文件的时间差。
官方建议设置的时间是比文件产生数据频率高一个数量级
(默认5m),比如每秒都有日志产生,这个值就可以设置为 1m。
ignore_older
多久前的旧文件就不管
路径下的历史文件可能很多,比如配置了按天分割,显然旧文件我们一般是不需要的
比如设置为 1h,表示文件时间在 1h 之前的日志都不会被 input 模块搜集,直到有新日志产生。
scan_frequency 10s(默认)
如果设置为0s,则Filebeat会尽可能快地感知更新(占用的CPU会变高)。默认是10s
扫描频率如何控制, 多久扫描一次是否有新文件产生
通配设置复杂的话频繁扫文件也是很大的开销。
input 模块只是负责发现新文件,新文件是相对已经被 harvester 获取的文件,第一次发现之后就已经在被 harvester 一行行实时读取了,
所以这里基本上只影响日志切分时的实时性(这种场景下的短暂延迟是可以接受的)
clean_inactive
多久清理一次注册信息
需要保证这个文件已经不活跃
所以这个值需要大于 ignore_older + scan_frequency
不然的话清理后这个文件又被发现,则会重头开始读取,这样就重了。默认值是0(不开启clean_*相关功能)
clean_inactive > ignore_older + scan_frequency > close_inactive
按小时分割的日志配置
scan_frequency: 10s
ignore_older: 60m
close_inactive: 10m
close_renamed: true
close_removed: true
clean_inactive: 70m
clean_removed: true
filebeat.inputs:
- type: log
# Change to true to enable this input configuration.
enabled: true
fields:
kafka_topic: tp1
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /tmp/a/*.log
multiline:
pattern: '^{'
negate: true
match: after
scan_frequency: 100ms # scan 文件状态速度
backoff: 50ms # 监测文件变化速度
max_backoff: 50ms
backoff_factor: 1
flush.timeout: 1s
ignore_older: 24h
close_inactive: 200ms # 重置文件句柄速度
clean_inactive: 25h
- type: log
# Change to true to enable this input configuration.
enabled: true
fields:
kafka_topic: tp2
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /tmp/asd*.log
multiline:
pattern: '^{'
negate: true
match: after
backoff: 50ms
scan_frequency: 100ms
max_backoff: 50ms
backoff_factor: 1
flush.timeout: 1s
close_inactive: 200ms
ignore_older: 24h
clean_inactive: 25h
fields:
kafka_topic: ber_click
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /home/to/logs/click.log*
- type: log
# Change to true to enable this input configuration.
enabled: true
fields:
kafka_topic: ber_show
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /home/to/logs/show.log*
output.kafka:
# Array of hosts to connect to.
hosts: ["xxx:9093", "xxx:9093", "xxx:9093"]
enabled: true
topic: '%{[fields.kafka_topic]}'
使用 filestream input
替代 log
with ` Elastic 7.14+`
kafka
Kafka
是最初由Linkedin
公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper
协调的分布式日志系统
(也可以当做MQ系统)
常见可以用于web/nginx日志、访问日志,消息服务等等
Linkedin
于2010年贡献给了Apache基金会并成为顶级开源项目
kafka下载
windows 下
# 启动 zookeeper
zookeeper-server-start.bat ..\..\config\zookeeper.properties
# 启动 Kafka
kafka-server-start.bat ..\..\config\server.properties
# 创建一个主题
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test-topic
# 查看创建的主题列表
kafka-topics.bat --list --zookeeper localhost:2181
heartbeat_interval_ms(3s)
表示多长时间向broker报告一次,这个默认值3000ms
这个值官方推荐不要高于session.timeout.ms 的1/3(这个值默认没问题)
session.timeout.ms(10s)
用于心跳线程(heartbeat thread)
如果发送心跳时间超过这个时间,broker就会认为消费者死亡
假设您设置 session.timeout.ms=30000,因此,消费者心跳线程必须在此时间到期之前向代理发送心跳
如果整个消费者死亡(并且死亡的处理线程很可能使包括心跳线程在内的整个消费者崩溃),则只需要 session.timeout.ms 来检测它
max.poll.interval.ms(5m)
用于处理线程(processing thread)
如果处理单个消息需要 1 分钟,则可以将 max.poll.interval.ms 设置为大于一分钟,以便处理线程有更多时间处理消息
如果处理线程死亡,则需要 max.poll.interval.ms 才能检测到这一点
auto.commit.interval.ms(5s)
自动提交间隔
max_poll_interval_ms(5m)
参数设置大一点可以增加两次 poll 之间处理消息的时间。
当 consumer 一切正常(也就是保持着 heartbeat ),且参数的值小于消息处理的时长,
会导致 consumer leave group 然后又 rejoin group,触发无谓的 group balance,出现 consumer livelock 现象。
多个进程重复消费数据
因此假设进程A正在消费分区1的信息,并提交了偏移量,
之后又消费了10条数据,还没来得及提交偏移量的时候,
reblance机制
让进程B来继续消费分区1的信息,
此时进程B会从上次进程A提交偏移量的地方开始消费,
因此这10条数据就是重复消费的
解决方式:
将消费者进程与分区静态绑定
self.tp = TopicPartition(topic=self.topic, partition=settings.KAFKA_CONSUMER_PARTITION)
self.consumer.assign([self.tp])
消费组实现容错性机制
一个有2个partition的topic,和2个consumer,这2个consumer共同消费同一个topic中的数据
两个consumer同时运行的情况下,它们分别消费不同partition中的数据。
consumer1消费partition 0中的数据,consumer2消费parition 1中的数据。
刚开始consumer2只消费partition1中的数据,当consumer1退出后, consumer2中也开始消费partition 0中的数据了
同一group_id
的consumer
消费者将继续为特定group自动使用最后一个偏移量的数据