Posted by ywg on 2019-1-31 09:34:57

Fix for the Flume-to-Kafka topic override problem

  Topology:

  nginx → flume → kafka → flume → kafka (a Flume agent was inserted between the two Kafka clusters to bridge two data centers, which is a pain...)
  

  Symptom:
  At the second hop, events are written back to the same Kafka topic they were read from; the manually configured sink topic has no effect.
  

  With debug logging enabled:
  Source instantiation:
21 Apr 2015 19:24:03,146 INFO (org.apache.flume.source.DefaultSourceFactory.create:41) - Creating instance of source kafka1, type org.apache.flume.source.kafka.KafkaSource
21 Apr 2015 19:24:03,146 DEBUG (org.apache.flume.source.DefaultSourceFactory.getClass:61)- Source type org.apache.flume.source.kafka.KafkaSource is a custom type
21 Apr 2015 19:24:03,152 INFO (org.apache.flume.source.kafka.KafkaSourceUtil.getKafkaProperties:37)- context={ parameters:{topic=bigdata_api_ele_me_access, batchDurationMillis=5000, groupId=nginx, zookeeperConnect=xxx, channels=bigdata_api_ele_me_access-channel4, batchSize=2000, type=org.apache.flume.source.kafka.KafkaSource} }

  Sink instantiation:
21 Apr 2015 19:24:03,185 INFO (org.apache.flume.sink.DefaultSinkFactory.create:42)- Creating instance of sink: web-sink2, type: org.apache.flume.sink.kafka.KafkaSink
21 Apr 2015 19:24:03,185 DEBUG (org.apache.flume.sink.DefaultSinkFactory.getClass:63)- Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type
21 Apr 2015 19:24:03,190 DEBUG (org.apache.flume.sink.kafka.KafkaSink.configure:220)- Using batch size: 2000
21 Apr 2015 19:24:03,190 INFO (org.apache.flume.sink.kafka.KafkaSink.configure:229)- Using the static topic: nginx-access this may be over-ridden by event headers
21 Apr 2015 19:24:03,191 INFO (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)- context={ parameters:{topic=nginx-access, brokerList=1xxx, requiredAcks=1, batchSize=2000, type=org.apache.flume.sink.kafka.KafkaSink, channel=bigdata_api_ele_me_access-channel4} }
21 Apr 2015 19:24:03,191 DEBUG (org.apache.flume.sink.kafka.KafkaSink.configure:236)- Kafka producer properties: {metadata.broker.list=192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092, request.required.acks=1, key.serializer.class=kafka.serializer.StringEncoder, serializer.class=kafka.serializer.DefaultEncoder}

  The configuration context at source and sink creation time carries the topics exactly as configured, but one log line stands out:

Using the static topic: nginx-access this may be over-ridden by event headers

  Analyzing the KafkaSink source, in the org.apache.flume.sink.kafka.KafkaSink.process method:
public static final String KEY_HDR = "key";
public static final String TOPIC_HDR = "topic";
...
      if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
      } // eventTopic comes from the event headers; the configured topic is used only when no "topic" header is present
      ...
      eventKey = headers.get(KEY_HDR);
      ...
      KeyedMessage data = new KeyedMessage
          (eventTopic, eventKey, eventBody);
      messageList.add(data);

  The topic field itself is assigned in configure:
    topic = context.getString(KafkaSinkConstants.TOPIC,
      KafkaSinkConstants.DEFAULT_TOPIC); // read the topic from the Flume configuration; if unset, the default "default-flume-topic" is used
    if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
      logger.warn("The Property 'topic' is not set. " +
      "Using the default topic name: " +
      KafkaSinkConstants.DEFAULT_TOPIC);
    } else {
      logger.info("Using the static topic: " + topic +
      " this may be over-ridden by event headers"); // warns that event headers may override the configured topic
    }

  Where the "topic" header comes from:
  1) Kafka messages themselves have no notion of a header.
  2) Flume events do: each event is split into a header and a body.
  In this topology, data enters Flume through the KafkaSource, which attaches header information, and then flows into the KafkaSink.
  The headers are added in the org.apache.flume.source.kafka.KafkaSource.process method:
      if (iterStatus) {
          // get next message
          MessageAndMetadata messageAndMetadata = it.next();
          kafkaMessage = messageAndMetadata.message();
          kafkaKey = messageAndMetadata.key();
          // Add headers to event (topic, timestamp, and key)
          headers = new HashMap();
          headers.put(KafkaSourceConstants.TIMESTAMP,
                  String.valueOf(System.currentTimeMillis()));
          headers.put(KafkaSourceConstants.TOPIC, topic);

  Since Kafka itself does not need the header, the fix is to comment out the header lookup in org.apache.flume.sink.kafka.KafkaSink.process:
      /*
      if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
      }
      */
      eventTopic = topic; // add this assignment, otherwise eventTopic stays null and a NullPointerException follows
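Viewed in isolation, the rule the patch disables is simply "header wins over config". A minimal, self-contained sketch of that behavior (the TopicResolution class and resolveTopic helper are hypothetical, written only to illustrate what KafkaSink.process does, and are not Flume API):

```java
import java.util.HashMap;
import java.util.Map;

public class TopicResolution {
    static final String TOPIC_HDR = "topic";

    // Same rule as KafkaSink.process: a "topic" event header,
    // if present, silently overrides the statically configured topic.
    static String resolveTopic(Map<String, String> headers, String configuredTopic) {
        String eventTopic = headers.get(TOPIC_HDR);
        return eventTopic != null ? eventTopic : configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        // No header: the sink's configured topic is used.
        System.out.println(resolveTopic(headers, "nginx-access"));
        // KafkaSource attached the upstream topic as a header: it wins,
        // which is exactly the override seen in the logs above.
        headers.put(TOPIC_HDR, "bigdata_api_ele_me_access");
        System.out.println(resolveTopic(headers, "nginx-access"));
    }
}
```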


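As an alternative to patching and rebuilding the sink, the same effect may be achievable from configuration alone: Flume's built-in static interceptor can overwrite the topic header on the source side before events reach the sink. A sketch, assuming the agent/component names from the logs (a1 is illustrative, nginx-access is the desired destination topic):

```
a1.sources.kafka1.interceptors = i1
a1.sources.kafka1.interceptors.i1.type = static
a1.sources.kafka1.interceptors.i1.key = topic
a1.sources.kafka1.interceptors.i1.value = nginx-access
a1.sources.kafka1.interceptors.i1.preserveExisting = false
```

With preserveExisting = false the interceptor replaces the topic header that KafkaSource set, so the sink's header lookup resolves to the intended topic without touching Flume's code.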