[Experience Sharing] Kafka publish/subscribe messaging

Posted 2017-5-23 16:46:39
1. Kafka fundamentals

1-1 Basic components

[Figure: basic components of Kafka]

1-2 How it works

[Figure: Kafka working principle]

① Each partition is replicated 3 times (--replication-factor 3), and the replicas are distributed across the broker cluster;
② ZooKeeper tracks the liveness of consumers, producers, and brokers;
③ Each replica is assigned to the broker whose id matches the replica id;
④ For each partition, one broker is elected as the leader of that partition;
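A topic laid out this way can be created with Kafka's bundled topic tool. A sketch, assuming the kafka_2.10-0.8.2.0 `bin` directory is on PATH and ZooKeeper is on localhost:2181 (the topic name matches the constants used later in this post; the partition count of 1 is an assumption):

```shell
kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 3 \
  --partitions 1 \
  --topic mengka-broker-3
```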
 

1-3 Use cases

① Tracking site user activity: page views, searches, clicks;
② Monitoring user/administrator operations on the site;
③ Log processing;

2. Kafka publish/subscribe messaging

2-1 Start ZooKeeper

sh zk-jiqun-start.sh
 

2-2 Start Kafka

sh jiqun-start.sh

#!/bin/bash

###
#  start a 3-broker kafka cluster
###

sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_1.properties &
sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_2.properties &
sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/bin/kafka-server-start.sh /Users/hyy044101331/java_tools/kafka_2.10-0.8.2.0/config/server_3.properties
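For three brokers to coexist on one machine, each server_N.properties needs a distinct broker id, port, and log directory. The exact values below are assumptions, not taken from the post, except that the ports should line up with the --broker-list localhost:9093,localhost:9094,localhost:9095 used later:

```properties
# server_1.properties (server_2/server_3 would use broker.id 2/3,
# ports 9094/9095, and their own log.dirs)
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181
```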
 

2-3 Add the Maven dependencies

<!-- *** kafka *** -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.0</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.3</version>
</dependency>
<!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId>
     <version>0.8.2.0</version> </dependency> -->
<dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.1.2</version>
</dependency>
<!-- *************** -->

2-4 Start the consumer

import com.mengka.kafka.KafkaConstant;

public class Taa {
    public static void main(String[] args) throws Exception {
        ConsumerGroup consumerGroup = new ConsumerGroup(
                KafkaConstant.KAFKA_ZOOKEEPER_CONNECT, "pv",
                KafkaConstant.KAFKA_TOPIC);
        consumerGroup.consumer();
        Thread.sleep(10000);
        consumerGroup.shutdown();
    }
}

 

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Consumer setup. <br>
 * <br>
 * Kafka's high-level consumer API already takes care of:
 * <ul>
 * <li>maintaining consumer state;</li>
 * <li>load balancing across consumers;</li>
 * </ul>
 *
 * @author mengka.hyy
 */
public class ConsumerGroup {
    private static final Log log = LogFactory.getLog(ConsumerGroup.class);
    private final ConsumerConnector consumer;
    private final String topic;

    public ConsumerGroup(String zk, String groupId, String topic) {
        log.info("---------, init consumer zk = " + zk + " , groupId = "
                + groupId + " , topic = " + topic);
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig(zk, groupId));
        this.topic = topic;
    }

    /**
     * Consume messages as Strings.
     */
    public void consumer_String() {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(
                new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                .createMessageStreams(topicMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            log.info("----------------, receive message = " + it.next().message());
        }
    }

    /**
     * Consume messages as raw bytes.
     */
    public void consumer() {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        KafkaStream<byte[], byte[]> stream = streams.get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message());
            log.info("----------------, receive message = " + message);
        }
    }

    /**
     * Consumer configuration.
     * <ul>
     * <li>
     * "zookeeper.session.timeout.ms": <br>
     * ZooKeeper session timeout (default 6000 ms), used to detect dead
     * consumers; when a consumer dies, the others must wait this long before
     * the failure is detected and a rebalance is triggered;</li>
     * <li>
     * "group.id":<br>
     * the consumer group;</li>
     * <li>
     * "zookeeper.sync.time.ms":<br>
     * retry interval when a consumer rebalance fails;</li>
     * <li>
     * "auto.commit.interval.ms":<br>
     * offset auto-commit interval, default 60 * 1000 ms;</li>
     * </ul>
     *
     * @param a_zookeeper
     * @param a_groupId
     * @return
     */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
            String a_groupId) {
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest"); // required if old data should be read
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "10000");
        // props.put("zookeeper.connection.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        return config;
    }

    /**
     * Release resources.
     */
    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
    }
}
 

public class KafkaConstant {
    /**
     * kafka cluster:
     *   --broker-list localhost:9093,localhost:9094,localhost:9095
     */
    public static final String KAFKA_BROKER_LIST = "localhost:9093,localhost:9094,localhost:9095";
    /**
     * topic name:
     *   --topic mengka-broker-3
     */
    public static final String KAFKA_TOPIC = "mengka-broker-3";
    /**
     * zookeeper:
     *   --zookeeper localhost:2181
     */
    public static final String KAFKA_ZOOKEEPER_CONNECT = "localhost:2181";
    public static final String KAFKA_CONSUMER_ZOOKEEPER_CONNECT = "localhost:2181/config/mobile/mq/mafka";
}
 

2-5 Start the producer

import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TaaProducer {
    public static void main(String[] args) {
        /**
         * step01: configure the producer
         */
        Properties props = new Properties();
        props.put("metadata.broker.list", KafkaConstant.KAFKA_BROKER_LIST);
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // value serializer
        props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // key serializer
        props.put("partitioner.class",
                "com.mengka.kafka.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        /**
         * step02: send kafka messages
         */
        KeyedMessage<String, String> data1 = new KeyedMessage<String, String>(
                KafkaConstant.KAFKA_TOPIC, "12.13.14.15", "baicai AAA.."
                        + TimeUtil.toDate(new Date(), TimeUtil.format_1));
        KeyedMessage<String, String> data2 = new KeyedMessage<String, String>(
                KafkaConstant.KAFKA_TOPIC, "12.13.14.16", "baicai BBB.."
                        + TimeUtil.toDate(new Date(), TimeUtil.format_1));
        Producer<String, String> producer = new Producer<String, String>(config);
        producer.send(data1);
        producer.send(data2);

        /**
         * step03: release resources
         */
        producer.close();
    }
}

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1))
                    % a_numPartitions;
        }
        return partition;
    }
}
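To see where SimplePartitioner routes the keys that TaaProducer sends, its rule (the number after the key's last dot, modulo the partition count) can be exercised in isolation. A standalone sketch with no Kafka dependency; the partition count of 3 is an assumption for illustration:

```java
// Standalone reimplementation of SimplePartitioner's routing rule,
// runnable without a Kafka cluster.
public class SimplePartitionerDemo {

    // Same logic as SimplePartitioner.partition(): parse the number after
    // the last '.' in the key and map it onto the available partitions.
    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        // The two keys sent by TaaProducer, assuming 3 partitions:
        System.out.println(partitionFor("12.13.14.15", 3)); // 15 % 3 = 0
        System.out.println(partitionFor("12.13.14.16", 3)); // 16 % 3 = 1
    }
}
```

Keys without a dot fall through to partition 0, which is why the producer uses IP-like keys: the last octet spreads messages across partitions deterministically.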
  

2-6 Result

[Figure: consumer log output for the received messages]
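The result can also be cross-checked from the command line with Kafka's bundled console consumer. A sketch, assuming the same running cluster and topic as above:

```shell
# read the topic from the beginning -- requires the cluster started in 2-2
kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic mengka-broker-3 --from-beginning
```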
 

 

Source: https://www.yunweiku.com/thread-379849-1-1.html