设为首页 收藏本站
查看: 1048|回复: 0

[经验分享] Kafka配置示例

[复制链接]

尚未签到

发表于 2017-5-23 16:54:04 | 显示全部楼层 |阅读模式
一、kafka消息系统是一个高吞吐分布式高性能的消息系统,支持在线实时消息处理,他可持久化消息到本地磁盘(有生命周期)支持离线消息处理,随着集群负载的增加可横向扩展机器来实现集群扩容,通过zookeeper来实现负载均衡及broker的管理等等。

平台消息系统已上线,可支持大批量消息的传输,支持平台日志系统采集的各种日志的消息订阅,现有土豆实时计算,大屏幕,无线push等项目在使用。

在使用消息系统之前需要先申请访问Token及对应消息通道,平台另外提供了对消息系统调用api封装包,具体权限的申请及api的调用方式大家可以参考一下几节的介绍。

二、KafKa的token账号申请
邮件示例:
Hi,*** 您好!
我是互动娱乐的**,申请KafKa系统的使用,用于业务的实时数据计算,现申请1种日志的通道!
1.Token:需要申请新Token
2.应用描述:
【消息量】每天约在百万条级别
业务:直播互动礼物实时排行榜日志:log_yule_gittop;
3.申请的通道权限(读或写或读写) :读权限
4.申请人联系方式(姓名,邮箱,手机)  :

三、KafKa日志可以用大数据的同事配置到kafka中

四、KafKa的使用
   1、引包
    <dependency>
   <groupId>com.youku.data</groupId>
   <artifactId>data-mq-common</artifactId>
   <version>0.8.0</version><!--如不使用新版本特性可以不升级成最新版本-->
</dependency>
   2、

五、KafKa 的使用
1、发送消息示例:

#创建生产者连接
#MqSystem.CHANNEL_MODE_WRITE 说明此通道为生产者模式
#topicName为消息通道名称,管理员创建
#token 用户访问通道的token,一个token可以读或写多个消息
#groupId 生产者组名(冗余字段,暂时保留)
IChannel writeChannel= MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_WRITE, "topicName","token", "groupId");

#为通道设置队列大小,如果不设置默认为1个,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().setMaxChannleQue(clients);

#设置是否启动metrics统计,如果不设置默认不统计,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().registerMeter(topicName, meterName);
其中meterName命名规范为:sysId_scenId_min_IP.name
说明  sysId: 用户申请时的分配给的sysId
  sceneId:用户申请时的分配给的场景Id
  min: 代表分钟类型
  IP: 用户运行程序的机器ip地址
  name:用户定义的维度名  格式形如:("log"、"ad.vv") 注意name的命名中请不需要含有下划线'_',同时也不要含有meter,timer,histograms字符

//发送消息,目前只支持字符串
writeChannel.putMsg("test msg");

2、消费消息示例:

#创建一个消费者连接
#MqSystem.CHANNEL_MODE_READ 说明通道为读取模式
#topicName为消息通道名称,管理员创建
#token 用户访问通道的token,一个token可以读或写多个消息
#groupId 消费者组名(一个或多个消费者在同一个组里)
IChannel readChannel= MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_READ, "topicName","token", "group");

#为通道设置队列大小,如果不设置默认为1个,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().setMaxChannleQue(clients);

#设置是否启动metrics统计,如果不设置默认不统计,此设置请在MqSystem.getMqSystem().open(...)方法前调用
MqSystem.getMqSystem().registerMeter(topicName, meterName);
其中meterName命名规范为:sysId_scenId_min_IP.name
说明  sysId: 用户申请时的分配给的sysId
  sceneId:用户申请时的分配给的场景Id
  min: 代表分钟类型
  IP: 用户运行程序的机器ip地址
  name:用户定义的维度名  格式形如:("log"、"ad.vv") 注意name的命名中请不需要含有下划线'_',同时也不要含有meter,timer,histograms字符
#接收消息

String msg = readChannel.getMsg();

3、生产者代码示例:
public class Producer {
   // ........
   public static void main(String[] args) {
       ...... // 你的代码业务逻辑
      
   //设置队列大小,可以不设置,默认为1个
       MqSystem.getMqSystem().setMaxChannleQue(1);
      
   //统计此topic的meter信息,名字为:meterName; 可以不设置,默认不统计
       MqSystem.getMqSystem().registerMeter(topic, meterName);
      
   new Thread(new Send()).start();
       ...........
   }
   
   class Send implements Runnable {
     private IChannel writeChannel;
     public Send() {
       try {
        // 初始化时获取通道连接
   
writeChannel = MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_WRITE, topic, token, group);
        
} catch (MqException e) {
    e.printStackTrace();
}
     }
     public void run() {
       ........ //你的业务逻辑
       writeChannel.putMsg(msg); //将生产的msg消息放入kafka
     }
   }
  }

4、消费者代码示例:
  public class Comsumer {
   // ........
   public static void main(String[] args) {
       ...... // 你的代码业务逻辑
        //设置队列大小,可以不设置,默认为1个
       //MqSystem.getMqSystem().setMaxChannleQue(1);
       统计此topic的meter信息,名字为:meterName; 可以不设置,默认不统计
       //MqSystem.getMqSystem().registerMeter(topic, meterName);
      
      new Thread(new Receive()).start();
       .......
      
   }
   
   class Receiveimplements Runnable {
     private IChannel readChannel;
     public Send() {
       try {
            // 初始化时获取通道连接
    readChannel= MqSystem.getMqSystem().open(MqSystem.CHANNEL_MODE_READ, topic, token, group);
      
   } catch (MqException e) {
    e.printStackTrace();
}
     }
     public void run() {
       ........ //你的业务逻辑
       readChannel.getMsg();
       ........
     }
   }
  }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379857-1-1.html 上篇帖子: Kafka(五) -- 数据导入导出 下篇帖子: Kafka单机环境开发示例
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表