bmwm3 发表于 2017-5-23 18:23:08

linkedin高吞吐量分布式消息系统kafka使用手记

  kafka使用例子导入eclipse:https://cwiki.apache.org/KAFKA/developer-setup.html

  kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:
  通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
  设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。
  kakfa的consumer使用拉的方式工作。
  安装kafka
下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
  > tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。
  使用kafka

[*]importjava.util.Arrays;
[*]importjava.util.List;
[*]importjava.util.Properties;
[*]importkafka.javaapi.producer.SyncProducer;
[*]importkafka.javaapi.message.ByteBufferMessageSet;
[*]importkafka.message.Message;
[*]importkafka.producer.SyncProducerConfig;
[*]
[*]...
[*]
[*]Propertiesprops=newProperties();
[*]props.put(“zk.connect”,“127.0.0.1:2181”);
[*]props.put("serializer.class","kafka.serializer.StringEncoder");
[*]ProducerConfigconfig=newProducerConfig(props);
[*]Producer<String,String>producer=newProducer<String,String>(config);
[*]
[*]Sendasinglemessage
[*]
[*]//ThemessageissenttoarandomlyselectedpartitionregisteredinZK
[*]ProducerData<String,String>data=newProducerData<String,String>("test-topic","test-message");
[*]producer.send(data);
[*]
[*]producer.close();

  这样就是一个标准的producer。
  consumer的代码

[*]//specifysomeconsumerproperties
[*]Propertiesprops=newProperties();
[*]props.put("zk.connect","localhost:2181");
[*]props.put("zk.connectiontimeout.ms","1000000");
[*]props.put("groupid","test_group");
[*]
[*]//Createtheconnectiontothecluster
[*]ConsumerConfigconsumerConfig=newConsumerConfig(props);
[*]ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);
[*]
[*]//create4partitionsofthestreamfortopic“test”,toallow4threadstoconsume
[*]Map<String,List<KafkaMessageStream<Message>>>topicMessageStreams=
[*]consumerConnector.createMessageStreams(ImmutableMap.of("test",4));
[*]List<KafkaMessageStream<Message>>streams=topicMessageStreams.get("test");
[*]
[*]//createlistof4threadstoconsumefromeachofthepartitions
[*]ExecutorServiceexecutor=Executors.newFixedThreadPool(4);
[*]
[*]//consumethemessagesinthethreads
[*]for(finalKafkaMessageStream<Message>stream:streams){
[*]executor.submit(newRunnable(){
[*]publicvoidrun(){
[*]for(Messagemessage:stream){
[*]//processmessage
[*]}
[*]}
[*]});
[*]}


原创文章如转载,请注明:转载自五四陈科学院
页: [1]
查看完整版本: linkedin高吞吐量分布式消息系统kafka使用手记