Integrating Flume NG with Kafka
1. Flume configuration for the producer side, with netcat as the source and Kafka as the sink
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
#producer.sources.s.type = seq
producer.sources.s.type = netcat
producer.sources.s.bind = localhost
producer.sources.s.port = 44444
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
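Every source and sink in the configuration above has to point at a channel declared on the agent's `channels` line. The following is a minimal sketch of how that wiring could be checked before starting the agent. The helper names `parse_properties` and `check_wiring` are hypothetical and not part of Flume, and the parser only handles simple `key = value` lines with `#` comments.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of channel-reference problems for one agent name."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    # Sources list their channels under "<agent>.sources.<name>.channels".
    for name in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{name}.channels", "").split()
        if not used:
            problems.append(f"source {name} has no channels")
        for ch in used:
            if ch not in declared:
                problems.append(f"source {name} uses undeclared channel {ch}")
    # Sinks name a single channel under "<agent>.sinks.<name>.channel".
    for name in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{name}.channel")
        if ch is None:
            problems.append(f"sink {name} has no channel")
        elif ch not in declared:
            problems.append(f"sink {name} uses undeclared channel {ch}")
    return problems


if __name__ == "__main__":
    # The path is the one passed to --conf-file in step 3.
    with open("conf/producer1.properties", encoding="utf-8") as fh:
        print(check_wiring(parse_properties(fh.read()), "producer"))
```

An empty list means every reference resolves. Note the singular `channel` key on sinks versus the plural `channels` key on sources, which is an easy thing to mistype.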
2. Configure the consumer: the source is Kafka and the sink is logger
consumer.sources = s
consumer.channels = c
consumer.sinks = r
#consumer.sources.s.type = seq
consumer.sources.s.channels = c
consumer.sinks.r.type = logger
consumer.sinks.r.channel = c
consumer.channels.c.type = memory
consumer.channels.c.capacity = 100
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.zookeeper.connect=127.0.0.1:2181
consumer.sources.s.group.id=testGroup
consumer.sources.s.zookeeper.session.timeout.ms=400
consumer.sources.s.zookeeper.sync.time.ms=200
consumer.sources.s.auto.commit.interval.ms=1000
consumer.sources.s.custom.topic.name=test
consumer.sources.s.custom.thread.per.consumer=4
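When a source type is swapped during testing, it is easy to leave the same key assigned twice in one file. Assuming the file is read like a standard Java properties file, where a later assignment to a key replaces an earlier one, only the last value takes effect. The sketch below lists keys that are assigned more than once. `duplicate_keys` is a hypothetical helper, not a Flume tool.

```python
from collections import Counter


def duplicate_keys(text):
    """Return the sorted keys that are assigned more than once."""
    keys = []
    for raw in text.splitlines():
        line = raw.strip()
        # Commented-out lines do not count as assignments.
        if not line or line.startswith("#"):
            continue
        key, sep, _ = line.partition("=")
        if sep:
            keys.append(key.strip())
    counts = Counter(keys)
    return sorted(k for k, n in counts.items() if n > 1)


if __name__ == "__main__":
    # The path is the one passed to --conf-file in step 3.
    with open("conf/consumer1.properties", encoding="utf-8") as fh:
        for key in duplicate_keys(fh.read()):
            print("assigned more than once:", key)
```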
3. Start the two agents separately
bin/flume-ng agent --conf conf --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/consumer1.properties --name consumer -Dflume.root.logger=INFO,console
4. Now telnet to port 44444
hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1111111111111111
OK
kak^Hfkakakkakakakkakkakkaakaknnnm
OK
abcdefghijklmnopqrstuvwxyz
OK
Both agents now print output on their consoles.
For the org.apache.flume.plugins code, see https://github.com/baniuyao/flume-kafka, which also has detailed usage instructions.
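The telnet session can also be scripted. Below is a minimal sketch of a client that sends lines to the netcat source and collects one reply per line. It assumes the source answers every line with a single reply line, as in the session above. `send_lines` is a hypothetical helper, and the host and port defaults are taken from the producer configuration.

```python
import socket


def send_lines(lines, host="localhost", port=44444, timeout=5.0):
    """Send each line to a netcat-style source and collect one reply per line."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        # Wrap the socket so replies can be read line by line.
        reader = sock.makefile("r", encoding="utf-8", newline="\n")
        for line in lines:
            sock.sendall((line + "\n").encode("utf-8"))
            replies.append(reader.readline().strip())
    return replies


if __name__ == "__main__":
    # Requires the producer agent from step 3 to be running.
    print(send_lines(["1111111111111111", "abcdefghijklmnopqrstuvwxyz"]))
```

Each line sent this way should show up in the consumer agent's logger output, just like lines typed into telnet.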