设为首页 收藏本站
查看: 1553|回复: 0

[经验分享] flume按照日志时间写hdfs实现

[复制链接]

尚未签到

发表于 2019-1-30 09:38:53 | 显示全部楼层 |阅读模式
  flume写hdfs的操作在HDFSEventSink.process方法中,路径创建由BucketPath完成
  分析其源码(参考:http://caiguangguang.blog.运维网.com/1652935/1619539)
  可以使用%{}变量替换的形式实现,只需要获取event中时间字段(nginx日志的local time)传入hdfs.path即可
  具体实现如下:
  1.在KafkaSource的process方法中增加:
          dt = KafkaSourceUtil.getDateMessage(new String(kafkaMessage));
          hour = KafkaSourceUtil.getHourMessage(new String(kafkaMessage));
          headers.put("eventdate",dt);
          headers.put("eventhour",hour);
          log.debug("source get one event header info");  增加两个头部,分别用来记录日志的day和hour
  2.KafkaSourceUtil中的方法
  因为我们的消息body是json的,因此用得了java的json-lib包,比如取消息的day:
  public static String getDateMessage(String message) {
    String dt = null;
    JSONObject json = JSONObject.fromObject(message);
    String[] splitMessage = json.getString("message").split("\t");
    String logTime = splitMessage[3].trim();
    log.debug("in getDateMessage logTime is: " + logTime);
    String format = "[dd/MMM/yyyy:HH:mm:ss Z]";
    SimpleDateFormat rawDateFormat = null;
    Date date = null;
    SimpleDateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyyMMdd");
    rawDateFormat = new SimpleDateFormat(format,Locale.ENGLISH);
    try{
        date = rawDateFormat.parse(logTime);
        dt = dateFormat2.format(date);
        log.debug("in getDateMessage dt is: " + dt);
    }catch(Exception ex){
        dt = "empty";
    }
    return dt;
  }  2.hdfs.path设置头即可
agent-server4.sinks.hdfs-sink2.type = hdfs
agent-server4.sinks.hdfs-sink2.hdfs.path = hdfs://xxx:8020/data/flume/mobile-ubt-all/%{eventdate}/%{eventhour}  最终日志:
flume-server4.log.3:09 Apr 2015 15:18:49,966 DEBUG [hdfs-hdfs-sink1-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter$2.call:276)  - Rolling file (hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp): Roll scheduled after 60 sec elapsed.
flume-server4.log.3:09 Apr 2015 15:18:49,969 INFO  [hdfs-hdfs-sink1-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:363)  - Closing hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp
flume-server4.log.3:09 Apr 2015 15:18:49,990 INFO  [hdfs-hdfs-sink1-call-runner-2] (org.apache.flume.sink.hdfs.BucketWriter$8.call:629)  - Renaming hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp to hdfs://192.168.101.6:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669451-1-1.html 上篇帖子: Flume NG 学习笔记(二)单机与集群Flume 配置 下篇帖子: 利用Flume将MySQL表数据准实时抽取到HDFS
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表