QQ叫紫珊 发表于 2019-1-31 09:20:03

kafka producer实例及原理分析

  1.前言
  首先,描述下应用场景:

  假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大。将日志数据插入数据库然后再进行分析,已经满足不了。最好的办法是存日志,然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。
  步骤如下:


[*]  搭建KAFKA系统运行环境
  

  如果你还没有搭建起来,可以参考我的博客:
  http://zhangfengzhe.blog.运维网.com/8855103/1556650
  


[*]  设计数据存储格式
  

  


[*]Producer端获取数据,并对数据按上述设计的格式进行编码





[*]Producer将已经编码的数据发送到broker上,在broker上进行存储
  

[*]Consumer端从broker中获取数据,分析计算。
  

  

  

  2.实现过程

  

  为了快速实现,我们简化日志消息格式。
  在eclipse新建JAVA PROJECT,将kafka/libs下*.jar配置到项目build path即可。
  

  Step 1 : 简单的POJO对象(MobileGameLog)
private String actionType;
private String appKey;
private String guid;
private String time;  

  说明:
  actionType 代表行为类型
  appKey   代表游戏ID
  guid       代表角色
  time       代表时间
  

  提供getter/setter方法,并override toString()
  

  

  Step 2 : 提供serializer
  

  需要注意的是,POJO对象需要序列化转化成KAFKA识别的消息存储格式--byte[]

  

  

public class MobileGameKafkaMessage implements kafka.serializer.Encoder{
@Override
public byte[] toBytes(MobileGameLog mobileGameLog) {
return mobileGameLog.toString().getBytes();
}
public MobileGameKafkaMessage(VerifiableProperties props){
}
}  

  

  

  Step 3 : 提供Partitioner
  

  我们可以提供Partitioner,这样可以使得数据按照我们的策略来存储在brokers中。
  

http://s3.运维网.com/wyfs02/M00/4B/CC/wKioL1QzrbnROV9UAAKDYIN8EWw025.jpg
  这里,我根据appKey来进行分区。
  

  

  Step 4 : 提供Producer
  

  


[*]  提供配置
  

http://s3.运维网.com/wyfs02/M02/4B/CC/wKioL1QzrsiiG8CyAAGOsHs0upE284.jpg
  


[*]  运行kafka环境
  
启动zookeeper:
# bin/zookeeper-server-start.sh
config/zookeeper.properties &  

  启动kafka broker(id=0):
# bin/kafka-server-start.sh
config/server.properties &  

  启动kafka broker(id=1)
# bin/kafka-server-start.sh
config/server-1.properties &  

  上述过程,在我的博客【搭建kafka运行环境】里面都有详细记录,大家可以参考。
  

  

  创建一个topic:
# bin/kafka-topics.sh --zookeeper localhost:2181
--create --topic log_1 --replication-factor 2 --partitions 3  注意topic:log_1有3个分区,2个复制。
  

  

  


[*]  制造数据并发送
  

// Producer
// V: type of the message
// K: type of the optional key associated with the message
kafka.javaapi.producer.Producer producer
= new Producer(
config);
List list
= new ArrayList();
// 5条tlbb数据
for (int i = 1; i
页: [1]
查看完整版本: kafka producer实例及原理分析