Kafka Study Notes: Simple Java Operations
Maven dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>
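Of these, kafka-clients provides the new Java producer and consumer API used below, while kafka_2.11 is the Scala core jar, which contains the old APIs, including the kafka.javaapi.consumer.SimpleConsumer that these notes fall back on at the end.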
The producer code is as follows:
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerTest {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);

    private static Properties properties = null;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
        // the new producer uses "acks"; old-producer configs such as
        // "producer.type", "request.required.acks", "serializer.class" and
        // "partitioner.class" are ignored by KafkaProducer
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
//      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
//      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public void produce() {
        // creating a producer per message is expensive; done here only for demonstration
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(properties);
        ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<byte[], byte[]>(
                "test", "kkk".getBytes(), "vvv".getBytes());
        kafkaProducer.send(kafkaRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (null != e) {
                    LOG.error(e.getMessage(), e);
                } else {
                    // metadata is only valid when the send succeeded
                    LOG.info("the offset of the sent record is {}", metadata.offset());
                }
                LOG.info("complete!");
            }
        });
        kafkaProducer.close();
    }

    public static void main(String[] args) {
        KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
        for (int i = 0; i < 10; i++) {
            kafkaProducerTest.produce();
        }
    }
}
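Note that KafkaProducer.send() is always asynchronous and returns a Future<RecordMetadata>; the old "producer.type=sync" setting has no effect on the new producer. If blocking behavior is wanted, one can wait on the returned Future. A minimal sketch under that assumption, reusing the topic and brokers above (the class name is just for illustration):

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaSyncSendSketch {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(properties);
        try {
            // send() returns immediately; get() blocks until the broker acks,
            // turning the asynchronous send into a synchronous one
            Future<RecordMetadata> future = producer.send(
                    new ProducerRecord<byte[], byte[]>("test", "kkk".getBytes(), "vvv".getBytes()));
            RecordMetadata metadata = future.get();
            System.out.println("offset: " + metadata.offset());
        } finally {
            producer.close();
        }
    }
}

Calling get() also rethrows any send failure as an ExecutionException, so errors surface at the call site instead of inside a callback.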
The consumer code, written against the new KafkaConsumer API:
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerTest {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
//      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
        kafkaConsumer.subscribe("test");
//      kafkaConsumer.subscribe("*");
        boolean isRunning = true;
        while (isRunning) {
            Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);
            if (null != results) {
                for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {
                    LOG.info("topic {}", entry.getKey());
                    ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();
                    List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();
                    for (int i = 0, len = records.size(); i < len; i++) {
                        ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
                        LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());
                        try {
                            LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));
                        } catch (Exception e) {
                            LOG.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }

        kafkaConsumer.close();
    }
}
It turns out that in 0.8.2.1 KafkaConsumer's poll method is not implemented; the client source is just a stub:
@Override
public Map<String, ConsumerRecords<K, V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}
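Besides SimpleConsumer, the other consumer that actually works in 0.8.x is the old ZooKeeper-based high-level consumer. A minimal sketch, assuming ZooKeeper listens on centos.master:2181 (the ZooKeeper address is an assumption, as is the class name):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaHighLevelConsumerSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // assumption: ZooKeeper on centos.master:2181; the old consumer talks
        // to ZooKeeper, not to the brokers directly
        properties.put("zookeeper.connect", "centos.master:2181");
        properties.put("group.id", "test-consumer-group");
        properties.put("auto.offset.reset", "smallest");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        // one stream for topic "test"
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test", 1));
        // the stream's iterator blocks until messages arrive
        for (MessageAndMetadata<byte[], byte[]> record : streams.get("test").get(0)) {
            System.out.println("offset " + record.offset() + " value " + new String(record.message()));
        }
    }
}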
I then rewrote the consumer against kafka.javaapi.consumer.SimpleConsumer, which runs correctly:
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
public class KafkaSimpleConsumerTest {

    private List<String> brokerList = new ArrayList<String>();

    public static void main(String args[]) {
        KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();
        // maximum number of messages to read
        long maxReadNum = Long.parseLong("3");
        // topic to consume from
        String topic = "test";
        // partition to read
        int partition = Integer.parseInt("0");
        // seed broker nodes
        List<String> seeds = new ArrayList<String>();
        seeds.add("centos.master");
        seeds.add("centos.slave1");
        seeds.add("centos.slave2");
        // broker port
        int port = Integer.parseInt("9092");
        try {
            kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
        // look up the metadata for the given topic and partition
        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
        if (metadata == null) {
            System.out.println("can't find metadata for topic and partition. exit");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("can't find leader for topic and partition. exit");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "client_" + topic + "_" + partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (maxReadNum > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                short code = fetchResponse.errorCode(topic, partition);
                System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, topic, partition, port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);
                    continue;
                }

                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                // allocate a buffer sized to the message payload
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                maxReadNum--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    /**
     * Find the leader broker for the given topic and partition from the list of seed brokers.
     * @param seedBrokers
     * @param port
     * @param topic
     * @param partition
     * @return the partition metadata, or null if no leader was found
     */
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata partitionMetadata = null;
        loop: for (String seedBroker : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
                TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);

                List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
                for (TopicMetadata topicMetadata : topicMetadatas) {
                    for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {
                        if (pMetadata.partitionId() == partition) {
                            partitionMetadata = pMetadata;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (partitionMetadata != null) {
            // remember the replica hosts so findNewLeader can query them after a leader failure
            brokerList.clear();
            for (Broker replica : partitionMetadata.replicas()) {
                brokerList.add(replica.host());
            }
        }
        return partitionMetadata;
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("error fetching offset data from the broker. reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        // we asked for a single offset, so the first entry is the one we want
        return offsets[0];
    }

    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(brokerList, port, topic, partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // on the first attempt the old leader may still be reported; give the cluster time to elect a new one
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("unable to find new leader after broker failure. exit");
        throw new Exception("unable to find new leader after broker failure. exit");
    }

}
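The trade-off with SimpleConsumer is that the application takes over everything the high-level consumer normally handles: locating the leader for each partition (findLeader), tracking consumed offsets itself (this example always restarts from EarliestTime rather than persisting offsets), and detecting and surviving leader changes (findNewLeader).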