设为首页 收藏本站
查看: 2138|回复: 0

[经验分享] Flume的介绍和简单操作

[复制链接]

尚未签到

发表于 2019-1-30 09:27:40 | 显示全部楼层 |阅读模式
Flume是什么

  Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。


Flume的功能


  • 支持在日志系统中定制各类数据发送方,用于收集数据
  • 提供对数据简单处理,并写到各类数据接收方(可定制)的能力
    Flume的组成
  • Agent:核心组件

    • source 负责数据的产生或搜集
    • channel  是一种短暂的存储容器,负责数据的存储持久化
    • sink  负责数据的转发
      Flume的工作流示意图

  • 数据流模型
    DSC0000.jpg
  • 多Agent模型
    DSC0001.jpg
  • 合并模型
    DSC0002.jpg
  • 混合模型
    DSC0003.jpg
    Flume的安装
      下载安装包并解压

    wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
    tar -zxvf apache-flume-1.8.0-bin.tar.gz
      配置环境变量


vim ~/.bashrc
export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc
Flume简单操作


  • netcat模式
    进入conf目录下编写netcat.conf文件,内容如下:

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink
agent.sources.netcatSource.type = netcat
agent.sources.netcatSource.bind = localhost
agent.sources.netcatSource.port = 11111
agent.sources.netcatSource.channels = memoryChannel
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
  启动一个实例

(py27) [root@master conf]# pwd
/usr/local/src/apache-flume-1.8.0-bin/conf
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./netcat.conf --name agent -Dflume.root.logger=INFO,console
  启动成功

18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume_netcat.conf
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
18/10/24 11:26:36 INFO source.NetcatSource: Source starting
18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]

  然后新开一个终端,发送数据

(py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK
  查看接收数据

18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D                                           1. }
  注:如果没有telnet工具,请先安装:yum install telnet



  • Exec模式
    编写配置文件exec.conf

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink
agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
  启动实例

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec.conf --name agent -Dflume.root.logger=INFO,console
  启动成功后,创建配置文件中的exec.log文件

(py27) [root@master test_data]# ls
exec.log
(py27) [root@master test_data]# pwd
/home/master/FlumeTest/test_data
(py27) [root@master test_data]#
  然后通过echo命令模拟日志的产生

(py27) [root@master test_data]# echo 'Hello World!!!' >> exec.log
  查看接收的日志

18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21       Hello World!!! }
  如何将日志保存到HDFS上
修改配置文件

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink
agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel
agent.sinks.loggerSink.type = hdfs
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
agent.sinks.loggerSink.hdfs.fileType = DataStream
agent.sinks.loggerSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
  然后启动实例

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console
  然后可以看到它把exec.log文件里的日志给写到了HDFS上

18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.
  我们进入HDFS查看,可以看到log里的内容

(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
Found 1 items
-rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
Hello World!!!
  然后我们再次写入写的log,然后再查看

//写入新的log
(py27) [root@master test_data]# echo 'test001' >> exec.log               
(py27) [root@master test_data]# echo 'test002' >> exec.log
//进入HDFS目录查看
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
Found 1 items
-rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
test001
test002

  • 故障转移实例
    首先需要三台机器,master、slave1、slave2,然后分别配置实例并启动,master上的agent实例发送日志,slave1和slave2接收日志
    master配置

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink1 loggerSink2
agent.sinkgroups = group
agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel
agent.sinks.loggerSink1.type = avro
agent.sinks.loggerSink1.hostname = slave1
agent.sinks.loggerSink1.port = 52020
agent.sinks.loggerSink1.channel = memoryChannel
agent.sinks.loggerSink2.type = avro
agent.sinks.loggerSink2.hostname = slave2
agent.sinks.loggerSink2.port = 52020
agent.sinks.loggerSink2.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
agent.sinkgroups.group.sinks = loggerSink1 loggerSink2
agent.sinkgroups.group.processor.type = failover
agent.sinkgroups.group.processor.loggerSink1 = 10
agent.sinkgroups.group.processor.loggerSink2 = 1
agent.sinkgroups.group.processor.maxpenalty = 10000
  slave1配置

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink
agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave1
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
  slave2配置

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink
agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave2
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
  分别启动master、slave1、slave2的agent,然后在mater上写入日志,然后观察谁收到了

//master
(py27) [root@master test_data]# echo 'hello' >> exec.log  
//slave1
18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726                             
  发现是slave1收到数据,然后我们把slave1的agent关掉,再次发送日志

//master
(py27) [root@master test_data]# echo '11111' >> exec.log      
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }

  然后再次启动slave1的agent

//master
(py27) [root@master test_data]# echo '22222' >> exec.log      
//slave1
18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 32                                  22222 }
//slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669441-1-1.html 上篇帖子: Flume中央日志系统 下篇帖子: 海量日志收集利器 —— Flume
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表