Kafka Consumer Interfaces

Kafka provides two versions of the consumer API:

high-level

The high-level version is the simpler one: you don't manage offsets yourself; the consumer automatically picks up the consumer group's last committed offset from ZooKeeper.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
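By default the high-level consumer commits its offsets to ZooKeeper periodically (controlled by auto.commit.interval.ms in the example below). If you want to commit only after a batch has actually been processed, auto-commit can be disabled; a minimal sketch, assuming the 0.8-era auto.commit.enable property and ConsumerConnector.commitOffsets() (the class and method names here are hypothetical):

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.Properties;

public class ManualCommitSketch {
    public static ConsumerConnector connectWithManualCommit(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("auto.commit.enable", "false"); // disable periodic auto-commit
        return kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
    // After processing a batch of messages from the streams:
    //     consumer.commitOffsets(); // record this group's current offsets in ZooKeeper
}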



A few caveats when you have multiple partitions and multiple consumers:
1. If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent reads within a single partition, so never run more consumers than there are partitions.
2. If there are fewer consumers than partitions, one consumer serves several partitions. Balance the consumer and partition counts carefully, or data will be drained from the partitions unevenly. Ideally the partition count is an integer multiple of the consumer count, which is why the partition count matters: a count like 24 makes it easy to pick a matching consumer count.
3. If a consumer reads from several partitions, ordering between partitions is not guaranteed: Kafka only guarantees order within one partition, and across partitions the result depends on the order you read them in.
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to a consumer may change.
5. The high-level API blocks when no data is available; see the sketch after this list for bounding the wait.
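On point 5: if blocking forever is unacceptable, the 0.8-era high-level consumer can be told to time out instead. A minimal sketch, assuming the consumer.timeout.ms property and the ConsumerTimeoutException it triggers (TimeoutSketch and drainOnce are hypothetical names):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

public class TimeoutSketch {
    // Requires props.put("consumer.timeout.ms", "5000") when building the ConsumerConfig,
    // so that hasNext() throws instead of blocking indefinitely.
    static void drainOnce(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // no message arrived within consumer.timeout.ms; retry or shut down here
        }
    }
}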



import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        // Create the connector; see createConsumerConfig below for the configuration
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) { // launch concurrent consumers
        // Which topic to read, and how many threads should read it
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        // Create the streams; each thread gets its own KafkaStream
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // start a consumer thread
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}
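run() submits ConsumerTest tasks that are not defined above; this is the worker class from the same wiki example, which just iterates its stream and prints each message:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        // hasNext() blocks until a message arrives (or consumer.timeout.ms expires, if set)
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}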

SimpleConsumer

The other version is SimpleConsumer. Despite the name suggesting a simple interface, it is actually the low-level consumer, and the more complex of the two.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
When should you use this interface?


  • Read a message multiple times
  • Consume only a subset of the partitions in a topic in a process
  • Manage transactions to make sure a message is processed once and only once

This interface comes at a cost: partitions, brokers, and offsets are no longer transparent to you. You have to manage them yourself, and you also have to handle broker leader changes, which is tedious:
So don't use it unless you have to; prefer the high-level API.




  • You must keep track of the offsets in your application to know where you left off consuming.
  • You must figure out which Broker is the lead Broker for a topic and partition
  • You must handle Broker leader changes

The steps for using SimpleConsumer:


  • Find an active Broker and find out which Broker is the leader for your topic and partition
  • Determine who the replica Brokers are for your topic and partition
  • Build the request defining what data you are interested in
  • Fetch the data
  • Identify and recover from leader changes

First, you must know which partition of which topic you want to read.
Then, find the broker leading that partition, and from its metadata the brokers holding the partition's replicas.
Next, build the fetch request yourself and fetch the data.
Finally, detect and handle broker leader changes.



Step by step:

Finding the Lead Broker for a Topic and Partition

The idea: loop over the seed brokers, fetch the topic's metadata from each, and walk the partition metadata inside it; as soon as we find the partition we want, return it.
PartitionMetadata.leader().host() on the result then gives the leader broker.



private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : a_seedBrokers) { // try each seed broker
        SimpleConsumer consumer = null;
        try {
            // Create a SimpleConsumer:
            // class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int,
            //                      val bufferSize: Int, val clientId: String)
            consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(a_topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            // Send the TopicMetadataRequest and read back the topic's metadata
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) { // walk each partition's metadata
                    if (part.partitionId() == a_partition) { // is this the partition we want?
                        returnMetaData = part;
                        break loop; // found it, stop searching
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                    + ", " + a_partition + "] Reason: " + e);
        } finally {
            if (consumer != null) consumer.close();
        }
    }
    return returnMetaData;
}

Finding Starting Offset for Reads
The core of the request is a Map<TopicAndPartition, PartitionOffsetRequestInfo>.

TopicAndPartition simply wraps a topic and a partition.
PartitionOffsetRequestInfo is defined as:
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
The time parameter says where to start reading data and takes one of two values:
kafka.api.OffsetRequest.EarliestTime(): the beginning of the data in the logs
kafka.api.OffsetRequest.LatestTime(): will only stream new messages

Do not assume the starting offset is always 0: messages expire and are deleted, so the earliest available offset moves forward over time.
The second parameter, maxNumOffsets, caps how many offsets the broker returns (the offsets of log segments before the given time); the code below asks for just one and starts reading from it.

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    // Build the offset request info
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request); // fetch the offsets

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: "
                + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition); // the returned offsets
    return offsets[0]; // start reading from the first one
}
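For example, to start from the oldest message still available on the broker (as the full example below does), pass EarliestTime():

long readOffset = getLastOffset(consumer, a_topic, a_partition,
        kafka.api.OffsetRequest.EarliestTime(), clientName);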

Reading the Data

First add a fetch to the FetchRequest via the builder, specifying the topic, partition, starting offset, and fetch size.
If the producer writes very large messages, the fetch size used here (100000 bytes) may not be enough and the broker will return an empty message set; in that case, increase the value until you get a non-empty message set.



// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000) // fetch size: 100000 bytes
        .build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
    // See Error Handling
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    // Necessary check: for compressed messages the broker returns the whole block,
    // which may contain messages older than the requested offset
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset(); // advance to the next offset to read
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}

Error Handling

if (fetchResponse.hasError()) {
    numErrors++;
    // Something went wrong!
    short code = fetchResponse.errorCode(a_topic, a_partition);
    System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
    if (numErrors > 5) break;
    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
        // The requested offset was invalid; recover by resetting to the latest offset
        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
        continue;
    }
    consumer.close();
    consumer = null;
    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); // look up the new leader broker
    continue;
}

There is no special logic in findNewLeader: it simply calls findLeader again to get the new leader broker, with a sleep added so that leader information that is briefly unavailable during the switchover has time to appear.



private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
            // first time through, if the leader hasn't changed, give ZooKeeper a second to recover;
            // second time, assume the broker did recover before failover, or it was a non-Broker issue
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
}

Full Source Code

package com.test.simple;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
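To run the full example, pass maxReads, the topic, the partition, one seed broker, and its port. A hypothetical invocation (assuming the Kafka 0.8 jars are on the classpath and broker1 hosts the topic):

java com.test.simple.SimpleExample 10 my-topic 0 broker1 9092

This reads at most 10 messages from partition 0 of my-topic, using broker1:9092 to bootstrap the leader lookup.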
