Here we take a configuration keyed on a custom header as the example (events from different businesses are written to different root directories).
Configuration:
source:
interceptors = i1
interceptors.i1.type = regex_extractor
interceptors.i1.regex = /apps/logs/(.*?)/
interceptors.i1.serializers = s1
interceptors.i1.serializers.s1.name = logtypename

sink:
hdfs.path = hdfs://xxxxxx/%{logtypename}/%Y%m%d/%H
hdfs.round = true
hdfs.roundValue = 30
hdfs.roundUnit = minute
hdfs.filePrefix = xxxxx

1. The source defines an interceptor of type regex_extractor, built from the org.apache.flume.interceptor.RegexExtractorInterceptor class. This interceptor extracts strings from the event with a regular expression and, through its serializers, stores them as header values. The sink can then read those header values and use them for further processing.
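As a minimal illustration of what this regex does (the input string below is made up; in practice the interceptor applies the regex to the event body), the group captured by /apps/logs/(.*?)/ is what serializer s1 stores under the logtypename header:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexHeaderDemo {
    public static void main(String[] args) {
        // Hypothetical input; the real content depends on what the source puts in the event body
        String body = "/apps/logs/app1/access.log";
        Pattern p = Pattern.compile("/apps/logs/(.*?)/");
        Matcher m = p.matcher(body);
        if (m.find()) {
            // The captured group becomes the value of the "logtypename" header
            System.out.println("logtypename = " + m.group(1)); // prints: logtypename = app1
        }
    }
}

With that header in place, %{logtypename} in hdfs.path resolves to app1 for this event, so logs from different applications end up under different root directories.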
For example, in the process method of the HDFS sink (HDFSEventSink):
// reconstruct the path name by substituting place holders
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
    timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
    timeZone, needRounding, roundUnit, roundValue, useLocalTime);

Several of the parameters:
useLocalTime comes from the hdfs.useLocalTimeStamp setting; the default is false
filePath comes from the hdfs.path setting and must not be empty
fileName comes from the hdfs.filePrefix setting; the default is FlumeData
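The sketch below imitates the substitution that escapeString performs, just to show how the %{logtypename} header and the time escapes combine into the final directory; it is not the actual BucketPath implementation, and the header value and timestamp are made up:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class PathEscapeDemo {
    public static void main(String[] args) {
        String filePath = "hdfs://xxxxxx/%{logtypename}/%Y%m%d/%H";
        // Header written by the regex_extractor interceptor (value assumed for the demo)
        Map<String, String> headers = new HashMap<>();
        headers.put("logtypename", "app1");

        // Replace %{key} placeholders with the corresponding header values
        String realPath = filePath;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            realPath = realPath.replace("%{" + e.getKey() + "}", e.getValue());
        }

        // Replace the time escapes with the (possibly rounded) event timestamp
        Date ts = new Date();
        realPath = realPath
            .replace("%Y%m%d", new SimpleDateFormat("yyyyMMdd").format(ts))
            .replace("%H", new SimpleDateFormat("HH").format(ts));

        System.out.println(realPath); // e.g. hdfs://xxxxxx/app1/20240101/10
    }
}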
Settings related to rounding (approximating the timestamp):
needRounding = context.getBoolean("hdfs.round", false);
// the hdfs.round setting, defaults to false
if (needRounding) {
  String unit = context.getString("hdfs.roundUnit", "second");
  // the hdfs.roundUnit setting, defaults to second
  if (unit.equalsIgnoreCase("hour")) {
    this.roundUnit = Calendar.HOUR_OF_DAY;
  } else if (unit.equalsIgnoreCase("minute")) {
    this.roundUnit = Calendar.MINUTE;
  } else if (unit.equalsIgnoreCase("second")) {
    this.roundUnit = Calendar.SECOND;
  } else {
    LOG.warn("Rounding unit is not valid, please set one of" +
        "minute, hour, or second. Rounding will be disabled");
    needRounding = false;
  }
  this.roundValue = context.getInteger("hdfs.roundValue", 1);
  // the hdfs.roundValue setting, defaults to 1
  if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
    // check that roundValue is reasonable for the chosen unit
    Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
        "Round value must be > 0 and <= 60");
  } else if (roundUnit == Calendar.HOUR_OF_DAY) {
    Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
        "Round value must be > 0 and <= 24");
  }
}
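With hdfs.round = true, hdfs.roundUnit = minute and hdfs.roundValue = 30 as in the configuration above, the timestamp used for the date escapes is rounded down to the nearest 30 minutes, so events land in half-hour buckets. A minimal sketch of that rounding (not Flume's own code, and the event time is made up):

import java.util.Calendar;

public class RoundDownDemo {
    public static void main(String[] args) {
        int roundUnit = Calendar.MINUTE; // hdfs.roundUnit = minute
        int roundValue = 30;             // hdfs.roundValue = 30

        Calendar cal = Calendar.getInstance();
        cal.set(2024, Calendar.JANUARY, 1, 10, 47, 23); // example event time 10:47:23

        // Round the minute field down to a multiple of roundValue and clear the smaller fields
        int minute = cal.get(roundUnit);
        cal.set(roundUnit, (minute / roundValue) * roundValue);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);

        System.out.println(cal.getTime()); // -> Jan 01 10:30:00, i.e. the 10:30 bucket
    }
}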