设为首页 收藏本站
查看: 1631|回复: 0

[经验分享] Flume NG 学习笔记(二)单机与集群Flume 配置

[复制链接]

尚未签到

发表于 2019-1-30 09:38:03 | 显示全部楼层 |阅读模式
下面的内容基本来自官网:http://flume.apache.org/FlumeUserGuide.html
本文使用的是最新版本的apache flume 1.5,安装完Flume然后测试下Flume是否可以用,在Flume目录下用以下语句测试:
bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.template
结果如图显示:




Ok,我们接下去看下面常用架构、功能配置示例
一、最简单的单一代理Flume 配置

下面是配置文件:


  [html] view plain copy

  •   #文件名:single_case1.conf.conf  
  •   #配置内容:  
  •   #single_case1.conf.conf: A single-node Flume configuration  
  •   #Name the components on this agent  
  •   a1.sources= r1  
  •   a1.sinks= k1  
  •   a1.channels= c1  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type= netcat  
  •   a1.sources.r1.bind= localhost  
  •   a1.sources.r1.port= 44444  
  •      
  •   #Describe the sink  
  •   a1.sinks.k1.type= logger  
  •      
  •   #Use a channel which buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.capacity= 1000  
  •   a1.channels.c1.transactionCapacity= 100  
  •      
  •   #Bind the source and sink to the channel  
  •   a1.sources.r1.channels= c1  
  •   a1.sinks.k1.channel= c1  
  




说明下,这里所有的例子都是将配置文件放到 $FLUME_HOME/conf 目录下,后面就不赘述了。

#敲命令
flume-ng agent -cconf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console

#参数命令
-c conf 指定配置目录为conf
-f conf/single_case1.conf指定配置文件为conf/single_case1.conf
-n a1 指定agent名字为a1,需要与case1_example.conf中的一致
-Dflume.root.logger=INFO,console指定DEBUF模式在console输出INFO信息
具体参数命令请通过flume-nghelp查看

#然后在另一个终端进行测试
telnet 127.0.0.1 44444




然后会看在之前启动的终端查看console输出到如下:




这里会发现消息hello world! 输出了,而hello world! hello world!hello world!则被拦截了。因为在配置文件中,我们选择的输出方式为:a1.sinks.k1.type= logger
,即console输出,flume-ng针对logger是只显示16个字节的,剩下的都被sink截了。下面是源码
在LoggerSink.Java中:



  [java] view plain copy

  •   if(event != null) {  
  •          if (logger.isInfoEnabled()) {  
  •            logger.info("Event: " + EventHelper.dumpEvent(event));  
  •          }  
  •   }  
  



我们去看EventHelper.java的dumpEvent方法:


  [java] view plain copy

  •   privatestatic final int DEFAULT_MAX_BYTES = 16;  
  •   publicstatic String dumpEvent(Event event) {  
  •      return dumpEvent(event, DEFAULT_MAX_BYTES);  
  •   }  
  •      
  •   publicstatic String dumpEvent(Event event, int maxBytes) {  
  •      StringBuilder buffer = new StringBuilder();  
  •      if (event == null || event.getBody() == null) {  
  •        buffer.append("null");  
  •      } else if (event.getBody().length == 0) {  
  •        // do nothing... in this case, HexDump.dump() will throw anexception  
  •      } else {  
  •        byte[] body = event.getBody();  
  •        byte[] data = Arrays.copyOf(body, Math.min(body.length,maxBytes));  
  •        ByteArrayOutputStream out = new ByteArrayOutputStream();  
  •        try {  
  •          HexDump.dump(data, 0, out, 0);  
  •          String hexDump = new String(out.toByteArray());  
  •          // remove offset since it's not relevant for such a smalldataset  
  •          if(hexDump.startsWith(HEXDUMP_OFFSET)) {  
  •            hexDump =hexDump.substring(HEXDUMP_OFFSET.length());  
  •          }  
  •          buffer.append(hexDump);  
  •        } catch (Exception e) {  
  •         if(LOGGER.isInfoEnabled()) {  
  •           LOGGER.info("Exception while dumpingevent", e);  
  •         }  
  •          buffer.append("...Exception while dumping:").append(e.getMessage());  
  •        }  
  •        String result = buffer.toString();  
  •        if(result.endsWith(EOL) && buffer.length() >EOL.length()) {  
  •          buffer.delete(buffer.length() - EOL.length(),buffer.length()).toString();  
  •        }  
  •      }  
  •      return "{ headers:" + event.getHeaders() + " body:"+ buffer + " }";  
  •    }  
  



不难看出,在event处理过程中,发生了数据截取操作。
Ok,进入下一个环节。


二、“集群”代理Flume 配置




这里集群的概念是多台机器的管理,最简单的就是两台机器一台代理主机从数据源获取数据,然后将数据在传送到另一台主机上,进行输出。这样做的意义是,一个业务多数据源的时候,我们可以对每个数据源设置代理,然后将它们汇总到一台代理主机上进行输出。
下面实现最简单的集群配置,即两个代理,一台接受数据源数据的代理将数据推送到汇总的代理,而汇总的代理再将数据输出。因此这两台主机分别是push,pull
根据上图需要用AVRO RPC通信,因此推数据sinks类型与拉数据的sources的类型都是avro 。而拉数据代理的数据源,我们用前文讲的Spool Source 形式来处理,这里我们预先建好目录与文件,test.log




下面设置推代理主机的flume配置文件:


  [html] view plain copy

  •   #推数据代理的配置文件push.conf  
  •   #Name the components on this agent  
  •   a2.sources= r1  
  •   a2.sinks= k1  
  •   a2.channels= c1  
  •      
  •   #Describe/configure the source  
  •   a2.sources.r1.type= spooldir  
  •   a2.sources.r1.spoolDir= /tmp/logs  
  •   a2.sources.r1.channels= c1  
  •      
  •   #Use a channel which buffers events in memory  
  •   a2.channels.c1.type= memory  
  •   a2.channels.c1.keep-alive= 10  
  •   a2.channels.c1.capacity= 100000  
  •   a2.channels.c1.transactionCapacity= 100000  
  •      
  •   #Describe/configure the source  
  •   a2.sinks.k1.type= avro  
  •   a2.sinks.k1.channel= c1  
  •   a2.sinks.k1.hostname= pull  
  •   a2.sinks.k1.port= 4444  
  




下面设置汇总代理主机的flume配置文件:


  [html] view plain copy

  •   #汇总数据代理的配置文件pull.conf  
  •   #Name the components on this agent  
  •   a1.sources= r1  
  •   a1.sinks= k1  
  •   a1.channels= c1  
  •      
  •   #Describe/configure the source  
  •   a1.sources.r1.type= avro  
  •   a1.sources.r1.channels= c1  
  •   a1.sources.r1.bind= pull  
  •   a1.sources.r1.port= 44444  
  •      
  •   #Describe the sink  
  •   a1.sinks.k1.type= logger  
  •    a1.sinks.k1.channel = c1  
  •      
  •   #Use a channel which buffers events in memory  
  •   a1.channels.c1.type= memory  
  •   a1.channels.c1.keep-alive= 10  
  •   a1.channels.c1.capacity= 100000  
  •   a1.channels.c1.transactionCapacity= 100000  
  




虽然Spool Source是非实时的,但由于数据量少,处理还是很快的,因此我们只能先启动pull代理。
#敲命令
flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console



上图显示成功。
先后去启动push主机的flume
#敲命令
flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console






查看pull主机的状态,发现数据已经传过来了。
然后会过去看push主机的文件


已经加上后缀名.COMPLETED。这与前文说的是一致的。

下面只要将新数据存入到目录/tmp/logs,push主机就会将数据发送到pull主机输出,并修改新数据文件的文件名。
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669450-1-1.html 上篇帖子: 日志收集框架 Flume 组件之Source使用 下篇帖子: flume按照日志时间写hdfs实现
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表