添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
  • Flume官网地址: http://flume.apache.org/
  • 文档查看地址: http://flume.apache.org/FlumeUserGuide.html
  • 下载地址: http://archive.apache.org/dist/flume/
  • 将下载的软件包上传解压

     # tar -zxvf /opt/soft/apache-flume-1.9.0-bin.tar.gz -C /opt/
    # ln -s /opt/apache-flume-1.9.0-bin /opt/flume

    查看安装的版本

    # cd /opt/flume/bin/
    # flume-ng version
    Flume 1.9.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: d4fcab4f501d41597bc616921329a4339f73585e
    Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
    From source with checksum 35db629a3bda49d23e9b3690c80737f9

    修改 conf 目录下的 log4j.properties 配置文件,配置日志文件路径

    # vim conf/log4j.properties 
    flume.log.dir=/data/flume/log

    修改conf目录下的flume-env.sh,配置JAVA_HOME

    # vim flume-env.sh
    export JAVA_HOME=/opt/jdk

    创建配置文件flume-conf-kafka.properties

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.kafka.bootstrap.servers = 源端KAFKA:9092
    a1.sources.r1.kafka.topics=kitchen
    a1.sources.r1.kafka.groupId = flume
    a1.sources.r1.kafka.consumer.timeout.ms = 100
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers=目端KAFKA:9092
    a1.sinks.k1.kafka.topic=kitchen
    #a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    a1.sinks.k1.kafka.producer.acks=1
    a1.sinks.k1.custom.encoding=UTF-8
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 1000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    启动agent

    /opt/flume/bin/flume-ng agent -n a1 -c /opt/flume/conf/ -f /opt/flume/conf/flume-conf-kafka.properties -Dflume.root.logger=INFO,console

    看到Component type: SINK, name: k1 started 即为启动成功

    2022-05-26 19:33:56,247 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109)] Kafka version : 2.0.1
    2022-05-26 19:33:56,247 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110)] Kafka commitId : fa14705e51bd2ce5
    2022-05-26 19:33:56,248 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
    2022-05-26 19:33:56,248 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started

    #消费消息

    # /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic kitchen --from-beginning

    #生产消息

    /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic init-test

    查看flume日志,有新的Update信息

    2022-05-26 19:35:09,295 (kafka-producer-network-thread | producer-1) [INFO - org.apache.kafka.clients.Metadata.update(Metadata.java:285)] Cluster ID: 5zfnXAaMT4iGG48TrmsiLQ

    https://blog.csdn.net/huxili2020/article/details/120391578?spm=1001.2101.3001.6661.1&utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-120391578-blog-115274144.pc_relevant_antiscanv2&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-120391578-blog-115274144.pc_relevant_antiscanv2&utm_relevant_index=1

    https://blog.csdn.net/qq_47183158/article/details/112179052

    2022-06-01 17:15:51,083 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: Failed to publish events
            at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:268)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kitchen-2: 30001 ms has passed since batch creation plus linger time
            at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
            at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
            at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
            at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
            ... 3 more
    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kitchen-2: 30001 ms has passed since batch creation plus linger time
    2022-06-01 17:15:52,039 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node 3 could not be established. Broker may not be availabl