设为首页 收藏本站
查看: 932|回复: 0

[经验分享] kafka入门示例(Java)

[复制链接]

尚未签到

发表于 2017-5-23 17:13:06 | 显示全部楼层 |阅读模式
  上篇说到kafka在window环境下的搭建之后,这篇我们就开始尝试写一个简单的producer和consumer来测试了
  依次启动bin/windows下的zookeeper-start.bat和kafak-start.bat(这2个.bat是我自己为了方便启动而写的)。下面就开始测试了:kafak jar包版本:kafak_2.9.2-0.8.1.jar
  Produce端:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducerTest {
String topic = "test";
public static void main(String[] args) {
Properties props = new Properties();
//props.put("zookeeper.connect", "10.16.0.200:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "async");//默认是sync
props.put("compression.codec", "1");
props.put("metadata.broker.list", "127.0.0.1:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<String, Object> producer = new Producer<String, Object>(config);
KeyedMessage<String, Object> message =
new KeyedMessage<String, Object>("test", "hello world");
producer.send(message);
}
  其中ProducerConfig是Producer端的属性配置类,更多属性可参见kakfa的jar包反编译后的kafka.producer.ProducerConfig类,该类的属性定义里面有很多Producer端的必须或可选属性。上述代码中的属性不过
  是必须要配置的属性而已。
  注意kafak的jar包里会有2个Producer类,我们必须引用的是kafka.javaapi.producer包下面的才行。
  KeyedMessage即表明消息。它的构造函数有以下几个:

public KeyedMessage(String topic, K key, Object partKey, V message) { Product.class.$init$(this);
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");  }
public KeyedMessage(String topic, V message) {
this(topic, null, null, message);
}
public KeyedMessage(String topic, K key, V message) { this(topic, key, key, message); }
  第一个参数是消息的topic,第二个参数是消息的内容
  metadata.broker.list为Producer端配置的用于指定元数据节点的属性,节点与节点之间用,隔开。关于集群式节点的配置这里不再祥述。
  Consumer端:

public class KafkaConsumerTest {  
public static void main(String[] args) {  
// specify some consumer properties  
Properties props = new Properties();  
props.put("zookeeper.connect", "127.0.0.1:2181");  
props.put("zookeeper.connectiontimeout.ms", "1000000");  
props.put("group.id", "test_group");
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
// Create the connection to the cluster  
ConsumerConfig consumerConfig = new ConsumerConfig(props);  
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
// create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume  
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
topicCountMap.put("test", new Integer(4));  
//key--topic  
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
StringBuffer sb = new StringBuffer();
while(it.hasNext()) {
try {
String msg = new String(it.next().message(), "utf-8").trim();
System.out.println("receive:" + msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
  while(it.hasNext())循环里可以进一步改进成这样:

while (it.hasNext()) {
String msg= "";
byte[] packs= it.next().message();
InputStream is = new ByteArrayInputStream(bPack);
//如果阻塞了,也就是该流不可以被读取,那么ready()就返回false.
BufferedReader ioBuffRead = new BufferedReader(new InputStreamReader(is, charset));
while(ioBuffRead.ready()){
msg= ioBuffRead.readLine();
//接下来的代码略。。
}
}
  同样注意kafak的jar包里会有2个Consumer类,我们必须引用的是kafka.javaapi.consumer包下面的才行;而关于Consumer端具体的属性配置的也可在ConsumerConfig类的属性定义下面找到。
  Producer端的代码比较简单。我们需要好好理解的是Consumer端的代码——
  1)ConsumerConnector类是消费端根据消费配置的连接类
  2)topicCountMap为·topic的map,key为topic,value为该topic消息流的分区数
  3)consumerMap为消费端的map,key为topic,value为该topic对应的消息队列,表现为一个List集合,头元素即为该队列的头部元素,就是为什么consumerMap.get("test").get(0);
  的原因,该集合的大小是动态的,因为队列中有元素不停地进出;
  4)ConsumerIterator为该topic消息流的迭代器,用于迭代从而取出里面的消息。
  运行之后,会发现在Consumer端打印:hello world。你也可以在控制台运行命令查看该topic的消息。
  至此,kafka的一个入门级生产-消费测试就完毕了。
  在成功启动windows下的kafka或Linux版本的kafka之后,运行该简单的demo示例,你很可能会出现如下异常:
  java.nio.channels.UnresolvedAddressException
  at sun.nio.ch.Net.checkAddress(Net.java:29)
  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
  at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
  at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
  at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
  解决这个异常也是比较诡异,多亏F哥的帮助。这是由于你缺少必要的地址的dns解析,修改C盘下的hosts文件,加上:
  服务器ip地址 服务器域名
  即可。其中很有可能你访问windows本地的kafka也会出现该异常,则加上127.0.0.1 localhost即可
  注:1个topic对应1个offset(消费内容的位置); 1个group.id可对应多个topic
  最后再次衷心的感谢F哥的帮助!!!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379882-1-1.html 上篇帖子: Kafka 中文文档 下篇帖子: kafka参数转
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表