kafka producer实例及原理分析
1.前言首先,描述下应用场景:
假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大。将日志数据插入数据库然后再进行分析,已经满足不了。最好的办法是存日志,然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。
步骤如下:
[*] 搭建KAFKA系统运行环境
如果你还没有搭建起来,可以参考我的博客:
http://zhangfengzhe.blog.运维网.com/8855103/1556650
[*] 设计数据存储格式
[*]Producer端获取数据,并对数据按上述设计的格式进行编码
[*]Producer将已经编码的数据发送到broker上,在broker上进行存储
[*]Consumer端从broker中获取数据,分析计算。
2.实现过程
为了快速实现,我们简化日志消息格式。
在eclipse新建JAVA PROJECT,将kafka/libs下*.jar配置到项目build path即可。
Step 1 : 简单的POJO对象(MobileGameLog)
private String actionType;
private String appKey;
private String guid;
private String time;
说明:
actionType 代表行为类型
appKey 代表游戏ID
guid 代表角色
time 代表时间
提供getter/setter方法,并override toString()
Step 2 : 提供serializer
需要注意的是,POJO对象需要序列化转化成KAFKA识别的消息存储格式--byte[]
public class MobileGameKafkaMessage implements kafka.serializer.Encoder{
@Override
public byte[] toBytes(MobileGameLog mobileGameLog) {
return mobileGameLog.toString().getBytes();
}
public MobileGameKafkaMessage(VerifiableProperties props){
}
}
Step 3 : 提供Partitioner
我们可以提供Partitioner,这样可以使得数据按照我们的策略来存储在brokers中。
http://s3.运维网.com/wyfs02/M00/4B/CC/wKioL1QzrbnROV9UAAKDYIN8EWw025.jpg
这里,我根据appKey来进行分区。
Step 4 : 提供Producer
[*] 提供配置
http://s3.运维网.com/wyfs02/M02/4B/CC/wKioL1QzrsiiG8CyAAGOsHs0upE284.jpg
[*] 运行kafka环境
启动zookeeper:
# bin/zookeeper-server-start.sh
config/zookeeper.properties &
启动kafka broker(id=0):
# bin/kafka-server-start.sh
config/server.properties &
启动kafka broker(id=1)
# bin/kafka-server-start.sh
config/server-1.properties &
上述过程,在我的博客【搭建kafka运行环境】里面都有详细记录,大家可以参考。
创建一个topic:
# bin/kafka-topics.sh --zookeeper localhost:2181
--create --topic log_1 --replication-factor 2 --partitions 3 注意topic:log_1有3个分区,2个复制。
[*] 制造数据并发送
// Producer
// V: type of the message
// K: type of the optional key associated with the message
kafka.javaapi.producer.Producer producer
= new Producer(
config);
List list
= new ArrayList();
// 5条tlbb数据
for (int i = 1; i
页:
[1]