设为首页 收藏本站
查看: 1416|回复: 0

[经验分享] flume源码学习9

[复制链接]

尚未签到

发表于 2019-1-30 09:31:10 | 显示全部楼层 |阅读模式
  HDFSEventSink用于把数据从channel中拿出来(主动pull的形式)然后放到hdfs中,HDFSEventSink在启动时会启动两个线程池callTimeoutPool 和timedRollerPool ,callTimeoutPool 用于运行append/flush等操作hdfs的task(通过callWithTimeout方法调用,并实现timeout功能),用于运行翻转文件的计划任务timedRollerPool:

    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
            new ThreadFactoryBuilder().setNameFormat(rollerName).build());  channel到sink的操作最终调用了sink的process方法(由SinkProcessor实现类调用),比如HDFSEventSink的process方法,每个process方法中都是一个事务,用来提供原子性操作,process方法调用Channel的take方法从Channel中取出Event,每个transaction中最多的Event数量由hdfs.batchSize设定,默认是100,对每一个Event有如下操作:
1.获取文件的完整路径和名称lookupPath
2.声明一个BucketWriter对象和HDFSWriter 对象,HDFSWriter由hdfs.fileType设定,负责实际数据的写入,BucketWriter可以理解成对hdfs文件和写入方法的封装,每个lookupPath对应一个BucketWriter对象,对应关系写入到sfWriters中(这里sfWriters是一个WriterLinkedHashMap对象,WriterLinkedHashMap是LinkedHashMap的子类(private static class WriterLinkedHashMap  extends LinkedHashMap),用来存放文件到BucketWriter的对应关系,在start方法中初始化:
this.sfWriters = new WriterLinkedHashMap( maxOpenFiles);
长度为hdfs.maxOpenFiles的设置,默认为5000,这个代表最多能打开的文件数量)
3.调用BucketWriter的append方法写入数据
4.当操作的Event数量达到hdfs.batchSize设定后,循环调用每个BucketWriter对象的flush方法,并提交transaction
5.如果出现异常则回滚事务
6.最后关闭transaction
process方法最后返回的是代表Sink状态的Status对象(BACKOFF或者READY),这个可以用于判断Sink的健康状态,比如failover的SinkProcessor就根据这个来判断Sink是否可以提供服务
主要方法分析:
1.构造函数声明一个HDFSWriterFactory对象
在后面会使用HDFSWriterFactory的getWriter方法会根据file类型返回对应的HDFSWriter实现类
2.configure
1)通过configure方法会根据Context设置各种参数项
比如:

inUseSuffix = context.getString( "hdfs.inUseSuffix", defaultInUseSuffix ); //正在写入的文件的后缀名,默认为".tmp"
rollInterval = context.getLong( "hdfs.rollInterval", defaultRollInterval ); //文件翻转时间,默认30
rollSize = context.getLong( "hdfs.rollSize", defaultRollSize ); //文件翻转大小,默认1024
rollCount = context.getLong( "hdfs.rollCount", defaultRollCount ); //默认为10
batchSize = context.getLong( "hdfs.batchSize", defaultBatchSize ); //默认为100
idleTimeout = context.getInteger( "hdfs.idleTimeout", 0); //默认为
String codecName = context.getString( "hdfs.codeC"); //压缩格式
fileType = context.getString( "hdfs.fileType", defaultFileType ); //默认为HDFSWriterFactory.SequenceFileType,即sequencefile
maxOpenFiles = context.getInteger( "hdfs.maxOpenFiles", defaultMaxOpenFiles ); //默认为5000
callTimeout = context.getLong( "hdfs.callTimeout", defaultCallTimeout ); //BucketWriter超时时间,默认为10000
threadsPoolSize = context.getInteger( "hdfs.threadsPoolSize",
        defaultThreadPoolSize); //操作append/open/close/flush任务的线程池大小,默认为10
rollTimerPoolSize = context.getInteger( "hdfs.rollTimerPoolSize",
        defaultRollTimerPoolSize); //文件翻转计时器线程池大小,默认为1
tryCount = context.getInteger( "hdfs.closeTries", defaultTryCount ); //尝试close文件的此数(大于0)
retryInterval = context.getLong( "hdfs.retryInterval", defaultRetryInterval); //间隔时间(大于0)  2)获取压缩格式

    if (codecName == null) { //如果hdfs.codeC没有设置
      codeC = null; //则没有压缩功能
      compType = CompressionType. NONE;
    } else {
      codeC = getCodec(codecName);  //调用getCodec方法获取压缩格式
      // TODO : set proper compression type
      compType = CompressionType. BLOCK; //压缩类型为BLOCK类型
    }  3)hdfs文件翻转相关设置,在实例化BucketWriter对象时会用到
   needRounding = context.getBoolean( "hdfs.round", false );
    if(needRounding) {
      String unit = context.getString( "hdfs.roundUnit", "second" );
      if (unit.equalsIgnoreCase( "hour")) {
        this.roundUnit = Calendar.HOUR_OF_DAY;
      } else if (unit.equalsIgnoreCase("minute" )) {
        this.roundUnit = Calendar.MINUTE;
      } else if (unit.equalsIgnoreCase("second" )){
        this.roundUnit = Calendar.SECOND;
      } else {
        LOG.warn("Rounding unit is not valid, please set one of" +
            "minute, hour, or second. Rounding will be disabled" );
        needRounding = false ;
      }
      this. roundValue = context.getInteger("hdfs.roundValue" , 1);
      if(roundUnit == Calendar. SECOND || roundUnit == Calendar.MINUTE){
        Preconditions. checkArgument(roundValue > 0 && roundValue  0 && roundValue

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669445-1-1.html 上篇帖子: 日志收集框架 Flume 简介 下篇帖子: 日志抽取框架 flume 简介与安装配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表