Introduction to the Kafka Messaging System
http://dongxicheng.org/search-engine/kafka/
http://shift-alt-ctrl.iteye.com/blog/1930791
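Kafka can serve as a distributed log-collection or system-monitoring service, and it is worth using where it fits. A Kafka deployment consists of a ZooKeeper environment and the Kafka environment itself, plus some configuration work. The following describes how to set up and use Kafka.
We build a ZooKeeper ensemble from 3 ZooKeeper instances and a Kafka cluster from 2 Kafka brokers.
The versions used are Kafka 0.8 and ZooKeeper 3.4.5.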
I. Building the ZooKeeper Cluster
We have three ZooKeeper instances, zk-0, zk-1 and zk-2. If you are only testing, a single ZooKeeper instance is enough.
1) zk-0
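Edit the configuration file (conf/zoo.cfg, copied from conf/zoo_sample.cfg):
```properties
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## Keep the default values for the other settings
```
Because all three instances run on the same machine, each one also needs its own dataDir, and that directory must contain a file named myid holding the instance's number (0 for zk-0, matching server.0).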
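Writing three nearly identical zoo.cfg files by hand is easy to get wrong. The sketch below renders each instance's file from its id. tickTime, initLimit and syncLimit are the values from ZooKeeper's zoo_sample.cfg; the class name and the dataDir paths (/tmp/zookeeper/zk-N) are hypothetical placeholders, so substitute your own directories.

```java
// Hypothetical helper: renders zoo.cfg text for zk-0, zk-1 and zk-2 on one machine.
class ZkClusterConfig {
    // Quorum port and leader-election port of each instance (the server.N lines above)
    static final int[][] PEER_PORTS = { {2888, 3888}, {2889, 3889}, {2890, 3890} };

    static String render(int myId) {
        StringBuilder sb = new StringBuilder();
        // Values from ZooKeeper's conf/zoo_sample.cfg
        sb.append("tickTime=2000\n");
        sb.append("initLimit=10\n");
        sb.append("syncLimit=5\n");
        // Hypothetical path: each instance needs its own directory containing a myid file
        sb.append("dataDir=/tmp/zookeeper/zk-").append(myId).append('\n');
        // Client ports 2181, 2182, 2183 for zk-0, zk-1, zk-2
        sb.append("clientPort=").append(2181 + myId).append('\n');
        // Every instance lists the whole ensemble
        for (int i = 0; i < PEER_PORTS.length; i++) {
            sb.append("server.").append(i).append("=127.0.0.1:")
              .append(PEER_PORTS[i][0]).append(':').append(PEER_PORTS[i][1]).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (int id = 0; id < PEER_PORTS.length; id++) {
            System.out.println("## zk-" + id + ": write \"" + id + "\" into /tmp/zookeeper/zk-" + id + "/myid");
            System.out.print(render(id));
        }
    }
}
```

Only clientPort, dataDir and the content of myid change between instances; the server.N lines are identical everywhere.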
Start ZooKeeper:
```shell
./zkServer.sh start
```
2) zk-1
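Edit the configuration file. All other settings, including the server.N lines, are the same as for zk-0, except that dataDir must point to zk-1's own directory, whose myid file contains 1:
```properties
clientPort=2182
## Keep the other settings the same as zk-0
```
Start ZooKeeper:
```shell
./zkServer.sh start
```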
3) zk-2
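Edit the configuration file. All other settings, including the server.N lines, are the same as for zk-0, except that dataDir must point to zk-2's own directory, whose myid file contains 2:
```properties
clientPort=2183
## Keep the other settings the same as zk-0
```
Start ZooKeeper:
```shell
./zkServer.sh start
```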
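Once all three instances are started, you can check that each one answers on its client port. The sketch below sends ZooKeeper's four-letter command ruok, to which a running server replies imok. Note that imok only means the server process is up, not that it has joined the quorum (the stat command reports that). The class and method names are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative health check for the three local ZooKeeper instances.
class ZkHealthCheck {
    // Sends a four-letter command (e.g. "ruok", "stat") and returns the reply text
    static String fourLetterWord(String host, int port, String command) throws Exception {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), 3000); // 3 s connect timeout
            socket.setSoTimeout(3000);                               // 3 s read timeout
            OutputStream out = socket.getOutputStream();
            out.write(command.getBytes("US-ASCII"));
            out.flush();
            // The server closes the connection after answering, so read until end of stream
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                reply.write(buf, 0, n);
            }
            return reply.toString("US-ASCII");
        } finally {
            socket.close();
        }
    }

    // "imok" means the server process is running, not necessarily that it is in the quorum
    static boolean isRunning(String reply) {
        return reply != null && reply.trim().equals("imok");
    }

    public static void main(String[] args) {
        int[] clientPorts = {2181, 2182, 2183};
        for (int port : clientPorts) {
            String status;
            try {
                String reply = fourLetterWord("127.0.0.1", port, "ruok");
                status = isRunning(reply) ? "running" : "unexpected reply: " + reply;
            } catch (Exception e) {
                status = "unreachable: " + e;
            }
            System.out.println("127.0.0.1:" + port + " -> " + status);
        }
    }
}
```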
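II. Building the Kafka Cluster
Because the broker configuration refers to the ZooKeeper ensemble set up above, we show the broker configuration file first. The cluster consists of two Kafka brokers, kafka-0 and kafka-1.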
1) kafka-0
In the config directory, edit server.properties as follows:
```properties
# Unique id of this broker within the cluster
broker.id=0
# Port the broker listens on
port=9092
num.network.threads=2
num.io.threads=2
# 1 MiB socket send/receive buffers, 100 MiB maximum request size
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
# Default number of partitions per topic (used for automatically created topics)
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# Keep log data for 168 hours (7 days)
log.retention.hours=168
#log.retention.bytes=1073741824
# 512 MiB per log segment file
log.segment.bytes=536870912
num.replica.fetchers=2
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
```
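Kafka is written in Scala, so the Scala environment has to be prepared before Kafka can run; with the source release this means building it with the bundled sbt:
```shell
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
```
The last command may fail with an exception; you can ignore that for now. Start the Kafka broker: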
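```shell
> JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &
```
Since the ZooKeeper ensemble is already running, there is no need to start ZooKeeper through Kafka's scripts. If you deploy several Kafka brokers on one machine, give each one its own JMX_PORT; otherwise they may try to open the same JMX port and clash.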
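JMX_PORT is the port of the broker JVM's remote JMX agent, which exposes Kafka's metrics as MBeans. The sketch below connects to kafka-0's agent on port 9997 with the standard javax.management API and lists MBeans whose domain starts with "kafka". It assumes the agent was started without authentication or SSL, and the domain filter is an assumption about how the 0.8 metrics are named, so adjust it if nothing is printed. The class name is illustrative.

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Illustrative JMX probe for a broker started with JMX_PORT=9997.
class BrokerJmxProbe {
    // Standard RMI connector address of a JVM started with -Dcom.sun.management.jmxremote.port=<port>
    static String jmxUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(jmxUrl("127.0.0.1", 9997)); // kafka-0
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            Set<ObjectName> names = mbeans.queryNames(null, null); // all registered MBeans
            for (ObjectName name : names) {
                // Assumption: Kafka's metric domains start with "kafka" (possibly quoted)
                if (name.getDomain().replace("\"", "").startsWith("kafka")) {
                    System.out.println(name);
                }
            }
        } finally {
            connector.close();
        }
    }
}
```

Pointing the same code at port 9998 inspects kafka-1, which is why the two brokers need different JMX ports.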
2) kafka-1
```properties
broker.id=1
port=9093
## All other settings are the same as kafka-0
```
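Only broker.id and port differ between the two brokers. If you generate the files rather than editing them by hand, a sketch like the following copies kafka-0's settings and overrides just those two keys (the class and method names are hypothetical). log.dir=./logs can stay the same because a relative path is resolved against each broker's own working directory (kafka-0/ or kafka-1/).

```java
import java.util.Properties;

// Hypothetical helper: derives kafka-1's server.properties from kafka-0's.
class BrokerConfigs {
    static Properties deriveBroker(Properties base, int brokerId, int port) {
        Properties p = new Properties();
        p.putAll(base);                                        // copy every kafka-0 setting
        p.setProperty("broker.id", String.valueOf(brokerId));  // must be unique in the cluster
        p.setProperty("port", String.valueOf(port));           // must be unique on this host
        return p;                                              // base itself is left unchanged
    }

    public static void main(String[] args) {
        Properties kafka0 = new Properties();
        kafka0.setProperty("broker.id", "0");
        kafka0.setProperty("port", "9092");
        kafka0.setProperty("log.dir", "./logs");
        kafka0.setProperty("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
        Properties kafka1 = deriveBroker(kafka0, 1, 9093);
        kafka1.list(System.out);
    }
}
```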
Then build it with the same commands as kafka-0 and start this broker:
```shell
> JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &
```
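The environment is now up and running, so let's move on to the programming examples.
III. Project Setup
The project is built with Maven. Frankly, the Kafka Java client is painful to work with, and getting the build environment right causes a lot of trouble. Use the pom.xml below as a reference; the versions of all dependencies must be consistent with one another. If the Kafka client version does not match the Kafka server version, you will get many errors such as "broker id not exists", because the protocol between client and server changed when Kafka went from 0.7 to 0.8. Note that in the artifact name kafka_2.8.0 the suffix 2.8.0 is the Scala version the client was built against, not a Kafka version; the Kafka version is the one given in <version> (0.8.0-beta1).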
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>test-kafka</artifactId>
    <packaging>jar</packaging>

    <name>test-kafka</name>
    <url>http://maven.apache.org</url>
    <version>1.0.0</version>
    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.0</artifactId>
            <version>0.8.0-beta1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>test-kafka-1.0</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                    <encoding>gb2312</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <encoding>gbk</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
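To make the naming convention concrete, here is a small sketch (hypothetical class and method names) that reads a Kafka artifact's coordinates: the part of the artifactId after the underscore is the Scala version, and the <version> element carries the Kafka version. Comparing only the Kafka major.minor of client and broker is a rough check of my own, not an official compatibility rule.

```java
// Hypothetical helper for reading Kafka Maven coordinates such as kafka_2.8.0 / 0.8.0-beta1.
class KafkaArtifact {
    // "kafka_2.8.0" -> "2.8.0" (the Scala version the jar was built with)
    static String scalaVersion(String artifactId) {
        int i = artifactId.indexOf('_');
        if (i < 0) {
            throw new IllegalArgumentException("no Scala suffix: " + artifactId);
        }
        return artifactId.substring(i + 1);
    }

    // "0.8.0-beta1" -> "0.8"
    static String majorMinor(String version) {
        String[] parts = version.split("[.\\-]");
        if (parts.length < 2) {
            throw new IllegalArgumentException("unexpected version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    // Rough check: client and broker should be on the same Kafka major.minor line
    static boolean sameKafkaLine(String clientVersion, String brokerVersion) {
        return majorMinor(clientVersion).equals(majorMinor(brokerVersion));
    }

    public static void main(String[] args) {
        System.out.println(scalaVersion("kafka_2.8.0"));         // 2.8.0
        System.out.println(sameKafkaLine("0.8.0-beta1", "0.8")); // true
        System.out.println(sameKafkaLine("0.7.2", "0.8"));       // false
    }
}
```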
IV. Producer
1) producer.properties: place this file in the resources directory (src/main/resources)
```properties
#partitioner.class=
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
producer.type=sync
# 0 = no compression
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## Only takes effect when producer.type=async
#batch.num.messages=100
```
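partitioner.class, commented out above, names the class that maps a message key to a partition. The sketch below shows hash partitioning in plain Java. As far as I know, Kafka 0.8's default partitioner works along these lines (non-negative key.hashCode() modulo the partition count), but this code is written without the Kafka jar and does not implement kafka.producer.Partitioner, whose exact signature differs between 0.8 releases; check your jar before writing a real one. Messages sent without a key, as LogProducer does, do not go through the partitioner.

```java
// Illustrative hash partitioning, independent of the Kafka client jar.
class HashPartitionSketch {
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask off the sign bit instead of using Math.abs, which stays negative for Integer.MIN_VALUE
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // num.partitions=2 in server.properties
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The practical consequence is that all messages with the same key land in the same partition, so they are read in order by the same consumer stream.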
2) Sample LogProducer.java
```java
package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        // producer.properties is loaded from the classpath (src/main/resources)
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        // No key is given, so the producer chooses the partition itself
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        // Send the whole collection in a single call
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    /**
     * Sends a test message to "test-topic" every 2 seconds until an exception occurs.
     */
    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
```
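Both LogProducer and LogConsumer pass ClassLoader.getSystemResourceAsStream(...) straight to Properties.load(). If the file is not on the classpath, the stream is null and the constructor fails with an uninformative NullPointerException; the stream is also never closed. A small helper (the name ClasspathProperties is hypothetical) avoids both problems:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper: loads a properties file from the classpath with a clear error if it is missing.
class ClasspathProperties {
    static Properties load(String resourceName) throws IOException {
        InputStream in = ClasspathProperties.class.getClassLoader().getResourceAsStream(resourceName);
        if (in == null) {
            // getResourceAsStream returns null instead of throwing when the resource is absent
            throw new IllegalStateException(resourceName
                    + " not found on the classpath (expected under src/main/resources)");
        }
        try {
            Properties props = new Properties();
            props.load(in);
            return props;
        } finally {
            in.close();
        }
    }
}
```

With it, the LogProducer constructor body could become `inner = new Producer<String, String>(new ProducerConfig(ClasspathProperties.load("producer.properties")));`.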
V. Consumer
1) consumer.properties: also placed in the resources directory (src/main/resources)
```properties
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
# commit consumed offsets to ZooKeeper every 60 s
auto.commit.interval.ms=60000
```
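With auto.commit.enable=true the consumer saves its position in ZooKeeper only every auto.commit.interval.ms (60 s here). After a crash, a restarted consumer in the same group resumes from the last saved position and receives the messages after it a second time (at-least-once delivery). The simulation below is a hypothetical model of that effect, not Kafka code: it commits every N messages instead of on a timer, and treats the committed position as the next offset to read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of periodic offset commits; not part of the Kafka API.
class AutoCommitSimulation {
    /**
     * A consumer reads offsets 0..consumedCount-1, committing its position every
     * commitEvery messages, then crashes. Returns the offsets a restarted consumer
     * in the same group would receive again.
     */
    static List<Long> redeliveredAfterCrash(int consumedCount, int commitEvery) {
        long committed = 0; // next offset to read, as last saved
        for (long offset = 0; offset < consumedCount; offset++) {
            // ... the message at this offset is processed here ...
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1; // periodic commit (stand-in for the timer)
            }
        }
        List<Long> again = new ArrayList<Long>();
        for (long offset = committed; offset < consumedCount; offset++) {
            again.add(offset);
        }
        return again;
    }
}
```

For example, consuming offsets 0 to 6 while committing every 3 messages leaves the committed position at 6, so offset 6 is delivered again after a restart. A shorter interval, or manual commitOffsets() with auto.commit.enable=false, narrows that window at the cost of more ZooKeeper writes.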
2) Sample LogConsumer.java
```java
package com.test.kafka;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class LogConsumer {

    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public LogConsumer(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
        config = new ConsumerConfig(properties);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        // Request partitionsNum streams for the topic; each stream gets its own thread
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        // Shut down the connector first so the stream iterators can finish, then stop the threads
        if (connector != null) {
            connector.shutdown();
        }
        if (threadPool != null) {
            threadPool.shutdown();
            try {
                if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                    threadPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                threadPool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                // connector.commitOffsets(); commit offsets manually, only when auto.commit.enable=false
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                try {
                    // Decode explicitly as UTF-8, the producer StringEncoder's default, not the platform charset
                    executor.execute(new String(item.message(), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e); // UTF-8 is always supported by the JVM
                }
            }
        }
    }

    interface MessageExecutor {

        public void execute(String message);
    }

    /**
     * Prints every message of "test-topic", using two streams (one per partition).
     */
    public static void main(String[] args) {
        LogConsumer consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {

                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new LogConsumer("test-topic", 2, executor);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Not closed here: start() returns immediately while the worker threads keep consuming
            // if (consumer != null) {
            //     consumer.close();
            // }
        }
    }
}
```
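Note that the LogConsumer class above pays little attention to exceptions. In particular, you must handle the case where MessageExecutor.execute() throws an exception: the exception escapes MessageRunner.run(), that runner ends, and its stream is no longer consumed.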
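One way to handle such failures is to catch them per message, so a single bad message cannot end the loop. The sketch below is independent of Kafka: a plain Iterator<byte[]> stands in for the stream's ConsumerIterator, and the hypothetical MessageHandler interface stands in for MessageExecutor. It decodes each payload as UTF-8 explicitly and only logs and counts failures; what to do with a failed message (retry it, store it elsewhere) is left open.

```java
import java.util.Iterator;

// Hypothetical stand-in for LogConsumer.MessageExecutor that may throw.
interface MessageHandler {
    void execute(String message) throws Exception;
}

// Hypothetical variant of MessageRunner that survives handler failures.
class SafeMessageRunner implements Runnable {
    private final Iterator<byte[]> messages; // stand-in for ConsumerIterator
    private final MessageHandler handler;
    private int failures = 0;

    SafeMessageRunner(Iterator<byte[]> messages, MessageHandler handler) {
        this.messages = messages;
        this.handler = handler;
    }

    public void run() {
        while (messages.hasNext()) {
            byte[] raw = messages.next();
            try {
                handler.execute(new String(raw, "UTF-8"));
            } catch (Exception e) {
                // Log and continue so that the remaining messages are still consumed
                failures++;
                System.err.println("failed to handle message: " + e);
            }
        }
    }

    int failures() {
        return failures;
    }
}
```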
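When testing, start the consumer first and then the producer, so that you can watch the newest messages arrive in real time. A consumer group without committed offsets starts by default from the latest offset (auto.offset.reset=largest in 0.8), so messages sent before it first starts are not shown.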
Attachment: test-kafka.zip (14.7 KB)