wfkjxy 发表于 2017-6-2 08:29:55

Kafka学习笔记-Java简单操作

  Maven依赖包:



view plain copy

[*]<dependency>
[*]      <groupId>org.apache.kafka</groupId>
[*]      <artifactId>kafka-clients</artifactId>
[*]      <version>0.8.2.1</version>
[*]</dependency>
[*]
[*]<dependency>
[*]    <groupId>org.apache.kafka</groupId>
[*]    <artifactId>kafka_2.11</artifactId>
[*]    <version>0.8.2.1</version>
[*]</dependency>
  
代码如下:



view plain copy

[*]import java.util.Properties;
[*]
[*]import org.apache.kafka.clients.producer.Callback;
[*]import org.apache.kafka.clients.producer.KafkaProducer;
[*]import org.apache.kafka.clients.producer.ProducerRecord;
[*]import org.apache.kafka.clients.producer.RecordMetadata;
[*]import org.slf4j.Logger;
[*]import org.slf4j.LoggerFactory;
[*]
[*]public class KafkaProducerTest {
[*]
[*]    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);
[*]
[*]    private static Properties properties = null;
[*]
[*]    static {
[*]      properties = new Properties();
[*]      properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
[*]      properties.put("producer.type", "sync");
[*]      properties.put("request.required.acks", "1");
[*]      properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
[*]      properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
[*]      properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
[*]//      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
[*]      properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
[*]//      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
[*]    }
[*]
[*]    public void produce() {
[*]      KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);
[*]      ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(
[*]                "test", "kkk".getBytes(), "vvv".getBytes());
[*]      kafkaProducer.send(kafkaRecord, new Callback() {
[*]            public void onCompletion(RecordMetadata metadata, Exception e) {
[*]                if(null != e) {
[*]                  LOG.info("the offset of the send record is {}", metadata.offset());
[*]                  LOG.error(e.getMessage(), e);
[*]                }
[*]                LOG.info("complete!");
[*]            }
[*]      });
[*]      kafkaProducer.close();
[*]    }
[*]
[*]    public static void main(String[] args) {
[*]      KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
[*]      for (int i = 0; i < 10; i++) {
[*]            kafkaProducerTest.produce();
[*]      }
[*]    }
[*]}



view plain copy

[*]import java.util.List;
[*]import java.util.Map;
[*]import java.util.Properties;
[*]
[*]import org.apache.kafka.clients.consumer.ConsumerConfig;
[*]import org.apache.kafka.clients.consumer.ConsumerRecord;
[*]import org.apache.kafka.clients.consumer.ConsumerRecords;
[*]import org.apache.kafka.clients.consumer.KafkaConsumer;
[*]import org.slf4j.Logger;
[*]import org.slf4j.LoggerFactory;
[*]
[*]public class KafkaConsumerTest {
[*]
[*]    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);
[*]
[*]    public static void main(String[] args) {
[*]      Properties properties = new Properties();
[*]      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
[*]                "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
[*]      properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");            
[*]      properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");            
[*]      properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
[*]      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
[*]//      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
[*]      properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");   
[*]      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
[*]                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
[*]      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
[*]                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
[*]
[*]      KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
[*]      kafkaConsumer.subscribe("test");
[*]//      kafkaConsumer.subscribe("*");
[*]      boolean isRunning = true;            
[*]      while(isRunning) {
[*]            Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);
[*]            if (null != results) {
[*]                for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {
[*]                  LOG.info("topic {}", entry.getKey());
[*]                  ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();
[*]                  List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();
[*]                  for (int i = 0, len = records.size(); i < len; i++) {
[*]                        ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
[*]                        LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());
[*]                        try {
[*]                            LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));
[*]                        } catch (Exception e) {
[*]                            LOG.error(e.getMessage(), e);
[*]                        }
[*]                  }
[*]                }
[*]            }
[*]      }
[*]
[*]      kafkaConsumer.close();
[*]
[*]    }
[*]
[*]}
  发现KafkaConsumer的poll方法未实现



view plain copy

[*]@Override
[*]public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
[*]   // TODO Auto-generated method stub
[*]   return null;
[*]}
  
后改为kafka.javaapi.consumer.SimpleConsumer实现,正常运行



view plain copy

[*]import java.nio.ByteBuffer;
[*]import java.util.ArrayList;
[*]import java.util.Collections;
[*]import java.util.HashMap;
[*]import java.util.List;
[*]import java.util.Map;
[*]
[*]import kafka.api.FetchRequest;
[*]import kafka.api.FetchRequestBuilder;
[*]import kafka.api.PartitionOffsetRequestInfo;
[*]import kafka.cluster.Broker;
[*]import kafka.common.ErrorMapping;
[*]import kafka.common.TopicAndPartition;
[*]import kafka.javaapi.FetchResponse;
[*]import kafka.javaapi.OffsetRequest;
[*]import kafka.javaapi.OffsetResponse;
[*]import kafka.javaapi.PartitionMetadata;
[*]import kafka.javaapi.TopicMetadata;
[*]import kafka.javaapi.TopicMetadataRequest;
[*]import kafka.javaapi.TopicMetadataResponse;
[*]import kafka.javaapi.consumer.SimpleConsumer;
[*]import kafka.message.MessageAndOffset;
[*]
[*]public class KafkaSimpleConsumerTest {
[*]
[*]    private List<String> borkerList = new ArrayList<String>();   
[*]
[*]    public KafkaSimpleConsumerTest() {   
[*]      borkerList = new ArrayList<String>();   
[*]    }
[*]
[*]    public static void main(String args[]) {   
[*]      KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();   
[*]      // 最大读取消息数量   
[*]      long maxReadNum = Long.parseLong("3");   
[*]      // 订阅的topic   
[*]      String topic = "test";   
[*]      // 查找的分区   
[*]      int partition = Integer.parseInt("0");   
[*]      // broker节点
[*]      List<String> seeds = new ArrayList<String>();   
[*]      seeds.add("centos.master");   
[*]      seeds.add("centos.slave1");   
[*]      seeds.add("centos.slave2");   
[*]      // 端口   
[*]      int port = Integer.parseInt("9092");   
[*]      try {   
[*]            kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);
[*]      } catch (Exception e) {   
[*]            System.out.println("Oops:" + e);   
[*]            e.printStackTrace();
[*]      }
[*]    }
[*]
[*]    public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {   
[*]      // 获取指定topic partition的元数据   
[*]      PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
[*]      if (metadata == null) {   
[*]            System.out.println("can't find metadata for topic and partition. exit");   
[*]            return;   
[*]      }
[*]      if (metadata.leader() == null) {   
[*]            System.out.println("can't find leader for topic and partition. exit");   
[*]            return;   
[*]      }
[*]      String leadBroker = metadata.leader().host();
[*]      String clientName = "client_" + topic + "_" + partition;   
[*]
[*]      SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);   
[*]      long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);   
[*]      int numErrors = 0;   
[*]      while (maxReadNum > 0) {   
[*]            if (consumer == null) {   
[*]                consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);   
[*]            }
[*]            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();   
[*]            FetchResponse fetchResponse = consumer.fetch(req);
[*]
[*]            if (fetchResponse.hasError()) {   
[*]                numErrors++;
[*]                short code = fetchResponse.errorCode(topic, partition);   
[*]                System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);   
[*]                if (numErrors > 5)   
[*]                  break;   
[*]                if (code == ErrorMapping.OffsetOutOfRangeCode()) {   
[*]                  readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
[*]                  continue;   
[*]                }
[*]                consumer.close();
[*]                consumer = null;   
[*]                leadBroker = findNewLeader(leadBroker, topic, partition, port);
[*]                continue;   
[*]            }
[*]            numErrors = 0;   
[*]
[*]            long numRead = 0;   
[*]            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {   
[*]                long currentOffset = messageAndOffset.offset();   
[*]                if (currentOffset < readOffset) {   
[*]                  System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);   
[*]                  continue;   
[*]                }
[*]
[*]                readOffset = messageAndOffset.nextOffset();
[*]                ByteBuffer payload = messageAndOffset.message().payload();
[*]
[*]                byte[] bytes = new byte;   
[*]                payload.get(bytes);
[*]                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));   
[*]                numRead++;
[*]                maxReadNum--;
[*]            }
[*]
[*]            if (numRead == 0) {   
[*]                try {   
[*]                  Thread.sleep(1000);   
[*]                } catch (InterruptedException ie) {   
[*]                }
[*]            }
[*]      }
[*]      if (consumer != null)   
[*]            consumer.close();
[*]    }
[*]
[*]    /**
[*]   * 从活跃的Broker列表中找出指定Topic、Partition中的Leader Broker
[*]   * @param seedBrokers
[*]   * @param port
[*]   * @param topic
[*]   * @param partition
[*]   * @return
[*]   */
[*]    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {   
[*]      PartitionMetadata partitionMetadata = null;   
[*]      loop: for (String seedBroker : seedBrokers) {   
[*]            SimpleConsumer consumer = null;   
[*]            try {   
[*]                consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");   
[*]                List<String> topics = Collections.singletonList(topic);
[*]                TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);   
[*]                TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);
[*]
[*]                List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
[*]                for (TopicMetadata topicMetadata : topicMetadatas) {   
[*]                  for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {   
[*]                        if (pMetadata.partitionId() == partition) {   
[*]                            partitionMetadata = pMetadata;
[*]                            break loop;   
[*]                        }
[*]                  }
[*]                }
[*]            } catch (Exception e) {   
[*]                System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);   
[*]            } finally {   
[*]                if (consumer != null)   
[*]                  consumer.close();
[*]            }
[*]      }
[*]      if (partitionMetadata != null) {   
[*]            borkerList.clear();
[*]            for (Broker replica : partitionMetadata.replicas()) {   
[*]                borkerList.add(replica.host());
[*]            }
[*]      }
[*]      return partitionMetadata;   
[*]    }
[*]
[*]    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {   
[*]      TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);   
[*]      Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();   
[*]      requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));   
[*]      OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);   
[*]      OffsetResponse response = consumer.getOffsetsBefore(request);
[*]      if (response.hasError()) {   
[*]            System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));   
[*]            return 0;   
[*]      }
[*]      long[] offsets = response.offsets(topic, partition);   
[*]      return offsets;   
[*]    }
[*]
[*]    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {   
[*]      for (int i = 0; i < 3; i++) {   
[*]            boolean goToSleep = false;   
[*]            PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);
[*]            if (metadata == null) {   
[*]                goToSleep = true;   
[*]            } else if (metadata.leader() == null) {   
[*]                goToSleep = true;   
[*]            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {   
[*]                goToSleep = true;   
[*]            } else {   
[*]                return metadata.leader().host();   
[*]            }
[*]            if (goToSleep) {   
[*]                try {   
[*]                  Thread.sleep(1000);   
[*]                } catch (InterruptedException ie) {   
[*]                }
[*]            }
[*]      }
[*]      System.out.println("unable to find new leader after broker failure. exit");   
[*]      throw new Exception("unable to find new leader after broker failure. exit");   
[*]    }
[*]
[*]}
[*]

页: [1]
查看完整版本: Kafka学习笔记-Java简单操作