设为首页 收藏本站
查看: 1445|回复: 0

[经验分享] Flume NG 学习笔记(十) Transaction、Sink、Source和Channel开发

[复制链接]

尚未签到

发表于 2019-1-30 09:41:45 | 显示全部楼层 |阅读模式
版权声明:本文为博主原创文章,未经博主允许不得转载。
目录(?)[+]

一、Transaction interface
  Transaction接口是基于flume的稳定性考虑的。所有主要的组件(sources、sinks、channels)都必须使用Flume Transaction。我们也可以理解Transaction接口就是flume的事务,sources和sinks的发送数据与接受数据都是在一个Transaction里完成的。


  从上图中可以看出,一个Transaction在Channel实现内实现。每一个连接到channel的source和sink都要获取一个Transaction对象。这Sources实际上使用了一个ChannelSelector接口来封装Transaction。存放事件到channel和从channel中提取事件的操作是在一个活跃的Transaction内执行的。
  下面是官网例子
  

  [java] view plain copy

  •   Channel ch = new MemoryChannel();  
  •   Transaction txn = ch.getTransaction();  
  •   txn.begin();  
  •   try {  
  •     // This try clause includes whatever Channel operations you want to do  
  •     
  •     Event eventToStage = EventBuilder.withBody("Hello Flume!",  
  •                          Charset.forName("UTF-8"));  
  •     ch.put(eventToStage);  
  •     // Event takenEvent = ch.take();  
  •     // ...  
  •     txn.commit();  
  •   } catch (Throwable t) {  
  •     txn.rollback();  
  •     
  •     // Log exception, handle individual exceptions as needed  
  •     
  •     // re-throw all Errors  
  •     if (t instanceof Error) {  
  •       throw (Error)t;  
  •     }  
  •   } finally {  
  •     txn.close();  
  •   }  
  

  上面的代码是一个很简单的Transaction示例,在自定义Source与自定义Sink中都要使用。
二、自定义Sink开发
  Sink提取event数据从channel中,然后直接将数据发送到下一个flume agent中或者存储到外部库中。
  Sink和channel的关联关系可以在配置文件中配置。有一个SinkRunner实例与每一个已配置的Sink关联,当Flume框架调用SinkRunner.start()方法时候,将创建一个新的线程来驱动这Sink。
  这个线程将管理这个Sink的生命周期。Sink需要实现LifecycleAware接口的start()和stop()方法。start()方法用于初始化数据;stop()用于释放资源;process()是从channel中提取event数据和转发数据的核心方法。
  这Sink需要实现Configurable接口以便操作配置文件。
  下面是官网例子:
  [java] view plain copy

  •   public class MySink extends AbstractSink implements Configurable {  
  •     private String myProp;  
  •     
  •     @Override  
  •     public void configure(Context context) {  
  •       String myProp = context.getString("myProp", "defaultValue");  
  •     
  •       // Process the myProp value (e.g. validation)  
  •     
  •       // Store myProp for later retrieval by process() method  
  •       this.myProp = myProp;  
  •     }  
  •     
  •     @Override  
  •     public void start() {  
  •       // Initialize the connection to the external repository (e.g. HDFS) that  
  •       // this Sink will forward Events to ..  
  •     }  
  •     
  •     @Override  
  •     public void stop () {  
  •       // Disconnect from the external respository and do any  
  •       // additional cleanup (e.g. releasing resources or nulling-out  
  •       // field values) ..  
  •     }  
  •     
  •     @Override  
  •     public Status process() throws EventDeliveryException {  
  •       Status status = null;  
  •     
  •       // Start transaction  
  •       Channel ch = getChannel();  
  •       Transaction txn = ch.getTransaction();  
  •       txn.begin();  
  •       try {  
  •         // This try clause includes whatever Channel operations you want to do  
  •     
  •         Event event = ch.take();  
  •     
  •         // Send the Event to the external repository.  
  •         // storeSomeData(e);  
  •     
  •         txn.commit();  
  •         status = Status.READY;  
  •       } catch (Throwable t) {  
  •         txn.rollback();  
  •     
  •         // Log exception, handle individual exceptions as needed  
  •     
  •         status = Status.BACKOFF;  
  •     
  •         // re-throw all Errors  
  •         if (t instanceof Error) {  
  •           throw (Error)t;  
  •         }  
  •       } finally {  
  •         txn.close();  
  •       }  
  •       return status;  
  •     }  
  •   }  
  下面是测试例子:

  [java] view plain copy

  •   import org.apache.flume.Channel;  
  •   import org.apache.flume.Context;  
  •   import org.apache.flume.Event;  
  •   import org.apache.flume.EventDeliveryException;  
  •   import org.apache.flume.Transaction;  
  •   import org.apache.flume.conf.Configurable;  
  •     
  •   import org.apache.flume.sink.AbstractSink;  
  •     
  •     
  •   public class Custom_Sink extends AbstractSink implements Configurable {  
  •         private String myProp;  
  •        @Override  
  •         public void configure(Context context) {  
  •           String myProp = context.getString("myProp", "defaultValue");  
  •     
  •           // Process the myProp value (e.g. validation)  
  •     
  •           // Store myProp for later retrieval by process() method  
  •           this.myProp = myProp;  
  •         }  
  •     
  •         @Override  
  •         public void start() {  
  •           // Initialize the connection to the external repository (e.g. HDFS) that  
  •           // this Sink will forward Events to ..  
  •         }  
  •     
  •         @Override  
  •         public void stop () {  
  •           // Disconnect from the external respository and do any  
  •           // additional cleanup (e.g. releasing resources or nulling-out  
  •           // field values) ..  
  •         }  
  •     
  •         @Override  
  •         public Status process() throws EventDeliveryException {  
  •           Status status = null;  
  •     
  •           // Start transaction  
  •           Channel ch = getChannel();  
  •           Transaction txn = ch.getTransaction();  
  •           txn.begin();  
  •           try {  
  •             // This try clause includes whatever Channel operations you want to do  
  •               
  •             Event event = ch.take();  
  •             String out = new String(event.getBody());   
  •             // Send the Event to the external repository.  
  •             // storeSomeData(e);  
  •             System.out.println(out);  
  •               
  •             txn.commit();  
  •             status = Status.READY;  
  •           } catch (Throwable t) {  
  •             txn.rollback();  
  •     
  •             // Log exception, handle individual exceptions as needed  
  •     
  •             status = Status.BACKOFF;  
  •     
  •             // re-throw all Errors  
  •             if (t instanceof Error) {  
  •               throw (Error)t;  
  •             }  
  •           } finally {  
  •             txn.close();  
  •           }  
  •           return status;  
  •         }  
  •     
  •   }  
  上面的测试例子只输出事件的BODY信息,这里说明下直接用代码event.getBody().tostring() 输出是乱码。因为所有sink都是在Transaction里完成的,因此自定义开发sink是需要加上Transaction相关设置。
  然后是测试配置,这里是自定义的jar 包是flumedev.Custom_Sink。注意,打包之后请放在目录$FLUME_HOME/lib下
  [html] view plain copy

  •   #配置文件:custom_sink_case23.conf  
  •   # Name the components on this agent  
  •   a1.sources = r1  
  •   a1.sinks = k1  
  •   a1.channels = c1  
  •     
  •   # Describe/configure the source  
  •   a1.sources.r1.type = syslogtcp  
  •   a1.sources.r1.port = 50000  
  •   a1.sources.r1.bind = 192.168.233.128  
  •   a1.sources.r1.channels = c1  
  •     
  •   # Describe the sink  
  •   a1.sinks.k1.channel = c1  
  •   a1.sinks.k1.type = flumedev.Custom_Sink  
  •   #a1.sinks.k1.type =logger  
  •     
  •   # Use a channel which buffers events in memory  
  •   a1.channels.c1.type = memory  
  •   a1.channels.c1.capacity = 1000  
  •   a1.channels.c1.transactionCapacity = 100  
  #敲命令
  flume-ng agent -cconf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console
  启动成功后
  打开另一个终端输入,往侦听端口送数据
  echo "testcustom_sink" | nc 192.168.233.128 50000
  #在启动的终端查看console输出


  可以看到数据正常输出。
  

三、自定义Source开发
  Source从外面接收数据并把数据存入Channel中。很少有人用。
  下面是官网的例子
  [java] view plain copy

  •   public class MySource extends AbstractSource implements Configurable, PollableSource {  
  •     private String myProp;  
  •     
  •     @Override  
  •     public void configure(Context context) {  
  •       String myProp = context.getString("myProp", "defaultValue");  
  •     
  •       // Process the myProp value (e.g. validation, convert to another type, ...)  
  •     
  •       // Store myProp for later retrieval by process() method  
  •       this.myProp = myProp;  
  •     }  
  •     
  •     @Override  
  •     public void start() {  
  •       // Initialize the connection to the external client  
  •     }  
  •     
  •     @Override  
  •     public void stop () {  
  •       // Disconnect from external client and do any additional cleanup  
  •       // (e.g. releasing resources or nulling-out field values) ..  
  •     }  
  •     
  •     @Override  
  •     public Status process() throws EventDeliveryException {  
  •       Status status = null;  
  •     
  •       // Start transaction  
  •       Channel ch = getChannel();  
  •       Transaction txn = ch.getTransaction();  
  •       txn.begin();  
  •       try {  
  •         // This try clause includes whatever Channel operations you want to do  
  •     
  •         // Receive new data  
  •         Event e = getSomeData();  
  •     
  •         // Store the Event into this Source's associated Channel(s)  
  •         getChannelProcessor().processEvent(e)  
  •     
  •         txn.commit();  
  •         status = Status.READY;  
  •       } catch (Throwable t) {  
  •         txn.rollback();  
  •     
  •         // Log exception, handle individual exceptions as needed  
  •     
  •         status = Status.BACKOFF;  
  •     
  •         // re-throw all Errors  
  •         if (t instanceof Error) {  
  •           throw (Error)t;  
  •         }  
  •       } finally {  
  •         txn.close();  
  •       }  
  •       return status;  
  •     }  
  •   }  
  

  测试的话,主要针对Event e 这里进行传输数据,这里就不测试了。
四、自定义Channel开发
  官网说待定。
  下面是美团网的自定义Channel 开发,下面是链接
  http://tech.meituan.com/mt-log-system-optimization.html
……

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久化;FileChannel则刚好相反。我们希望利用两者的优势,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用MemoryChannel,当Sink处理速度跟不上,又需要Channel能够缓存下应用端发送过来的日志时,就使用FileChannel,由此我们开发了DualChannel,能够智能的在两个Channel之间切换。

其具体的逻辑如下:

  [java] view plain copy

  •   /***
  •    * putToMemChannel indicate put event to memChannel or fileChannel
  •    * takeFromMemChannel indicate take event from memChannel or fileChannel
  •    * */  
  •   private AtomicBoolean putToMemChannel = new AtomicBoolean(true);  
  •   private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);  
  •     
  •   void doPut(Event event) {  
  •           if (switchon && putToMemChannel.get()) {  
  •                 //往memChannel中写数据  
  •                 memTransaction.put(event);  
  •     
  •                 if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {  
  •                   putToMemChannel.set(false);  
  •                 }  
  •           } else {  
  •                 //往fileChannel中写数据  
  •                 fileTransaction.put(event);  
  •           }  
  •     }  
  •     
  •   Event doTake() {  
  •       Event event = null;  
  •       if ( takeFromMemChannel.get() ) {  
  •           //从memChannel中取数据  
  •           event = memTransaction.take();  
  •           if (event == null) {  
  •               takeFromMemChannel.set(false);  
  •           }   
  •       } else {  
  •           //从fileChannel中取数据  
  •           event = fileTransaction.take();  
  •           if (event == null) {  
  •               takeFromMemChannel.set(true);  
  •     
  •               putToMemChannel.set(true);  
  •           }   
  •       }  
  •       return event;  
  •   }  
  这里要说明下,官网是建议使用file channel,虽然它的效率比较低,但是它能保证数据完整性,而memory channel效率高,但是只能对数据丢失和重复不太敏感的业务使用
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669453-1-1.html 上篇帖子: 利用Flume将MySQL表数据准实时抽取到HDFS 下篇帖子: 第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表