设为首页 收藏本站
查看: 1009|回复: 0

[经验分享] 基于nodejs的消息中心

[复制链接]

尚未签到

发表于 2017-2-21 10:20:56 | 显示全部楼层 |阅读模式
需求很简单实时向客户端(目前只有浏览器)推送消息
核心为 rabbitmq + nodejs的socket.io + redis
做消息中心后端的消息中间件必不可少,当初考虑是从rabbitmq和redis选一个。
redis现在很火支持消息订阅性能也非常不错可惜它对消息这块支持的相对弱功能比较少,像消息的过期、ack功能都没有。rabbitmq做为老牌的消息中间件功能完善性能也不错也有很多监控插件可以选择,当然相对redis它也劣势做为企业级中间件占用资源比较多也没有redis那么有上升空间。
websocket的服务选择用nodejs是基本nodejs高效的事件驱动模型还有socket.io对所有浏览器的支持,劣势就是nodejs做为新生语言可参考的成功案例少,第三方模块的稳定性不高且本身服务也容易挂。
这里使用redis是原因socket.io的群集是基于redis的消息实现,我自己也使用它做了nodejs集群内部的消息广播。
各组件的安装:node rabbitmq
业务需求大概是对客户端进行分组解决多个页面的问题,消息内容包含前端所需调用js方法名和消息体(话说这方式我真心不认可)。
1.建立socket.io服务,使用express形式

var express=require('express')
,app = express()
, sio=require('socket.io')
, http = require('http').createServer(app)
, io = sio.listen(http);
//创建webscoket监听服务器
http.listen(port);
var msgSubClient = redisConnectionFactory.getConnectionToDB(config.redis.socketDB);
var msgKey="amsg:";
//监听连接成功事件
io.sockets.on('connection',function(socket){
//监听客户端请求注册
socket.on('register',function (data){
if(data.socketId){
//如果客户端有指定socketId就建立于sokcet.io内部id的map
socketManage.addIdMap(data.socketId,socket.id);
socket.set("socketId",data.socketId);
//订阅指定socketId消息
msgSubClient.subscribe(msgKey+data.socketId);
}
if(data.groupFlag){
var flagArray=[];
if(typeof data.groupFlag == 'string'){
flagArray.push(data.groupFlag);
}
if(data.groupFlag instanceof Array){
flagArray=data.groupFlag;
}
for(var i=0;i<flagArray.length;i++){
//使用socket.io的join方式建立组
socket.join(flagArray);
}
}
});
//关闭时清除连接的注册信息
socket.on('disconnect',function(){
socketManage.removeIdMap(socket.id);
socket.get("socketId",function (err,socketId){
if(!err){
//取消消息订阅
msgSubClient.unsubscribe(msgKey+socketId);
}
})
});
});
//接收redis 消息
msgSubClient.on("message", function (channel, message) {
var socketId=channel.substring(msgKey.length,channel.length);
var oldId=socketManage.getOldId(socketId);
var socket=io.sockets.sockets[oldId]; //得到socket客户端连接
if(socket){
message=JSON.parse(message);
var isVolatile=message.isVolatile || true;
//推送消息到客户端
emitMessageToClietn(socket,message,isVolatile);
//log.debug(appId+" emit "+oldId+" msg:"+message);
}
});
//加工消息
function processMessage(msg){
if(msg.data){
if(typeof msg.data == "string"){
msg.data=JSON.parse(msg.data);
}
}
return msg;
}
//推送到客户端消息
function emitMessageToClietn(socket,msg,isVolatile){
msg=processMessage(msg);
//消息是否瞬时
if(isVolatile){
//true不保证消息发送到客户端效率高
socket.volatile.emit('message', msg);
}else{
//false保证发送效率不高
socket.emit('message',msg);
}
}


2.接收rabbitmq 消息

//推送消息给组, io.sockets.in 表示关联组
function emitMessageToGroup(groupFlag,msg,isVolatile){
msg=processMessage(msg);
if(isVolatile){
io.sockets.volatile.in(groupFlag).emit("message",msg);
}else{
io.sockets.in(groupFlag).emit("message",msg);
}
}
var msgPubClient = redisConnectionFactory.getConnectionToDB(config.redis.socketDB);
var rabbitConnection=rabbitConnectionFactory.getConnection();
rabbitConnection.on('ready', function () {
//x-message-ttl 为消息的默认过期时间
var q=rabbitConnection.queue(config.rabbit.receiveQueueName,
{durable:true,autoDelete:false,'arguments': {'x-message-ttl': 600000}});
//订阅消息,prefetchCount:1是为了设置订阅的负载均衡
q.subscribe({ ack: true, prefetchCount: 1 },function (json, headers, deliveryInfo, m) {
try{
var socketSessionId=json.socketSessionId;
var groupFlag=json.socketGroupFlag;
var isVolatile=json.isVolatile || true;
//如有group 就按组推送
if(groupFlag && groupFlag !=""){
emitMessageToGroup(groupFlag,json,isVolatile);
}
//如有socketSessionId 就按单客户推送
if(socketSessionId && socketSessionId!=""){
var oldId=socketManage.getOldId(socketSessionId);
var socket=io.sockets.sockets[oldId];
if(socket){
//推送到客户端消息
emitMessageToClietn(socket,json,isVolatile);
}else{
//推送到队列
msgPubClient.publish(msgKey+socketSessionId,JSON.stringify(json));
}
}
}catch(e){
log.error(e);
}
try{
//确认消息完成
m.acknowledge(true);
}catch(e){
log.error("ack msg error:",e);
}
});
});


3.建立nodejs的集群
socket.io设置以redis方式共享客户端

var config={
redis:{
host:'192.168.1.226',
port:6379,
debug_mode:false
}};
io.configure(function () {
//使用redis存储会话
io.set('store', new RedisStore({
redisPub:config.redis,
redisSub:config.redis,
redisClient:config.redis
}
));
});


使用基于nodejs cluster的mixture来完成单机多进程充分利用多核cpu资源(mixture很简单就两文件)

var mix = require('mixture').mix("balanced")
,bouncy = require('bouncy');
var config=require('./config.js')
,log=require('./logger.js').getLogger("system");
var count =  require('os').cpus().length
, port = config.socket.port
,portmap = [];
mix.debug(log.info);
// socket.io instances
var socketio = mix.task('socket.io', { filename: 'app.js' })
//var socketio = mix.task('socket.io', { filename: 'app_cached.js' })
//进程总数
var workerTatol=config.service.workerTatol || (config.service.isProxy == true ? count-1:count);
for (var i = 0; i < workerTatol; i++) {
port++;
portmap.push(port)
var worker = socketio.fork({ args: [port, "node_"+createNodeId()] })
}
if(config.service.isAnew == true){
//如果线程死亡,尝试重启
mix.on("death",function (worker, task){
setTimeout(function (){
task.fork({args:worker.args});
},config.service.anewTime)
});
}
if(config.service.isProxy == true){
//代理请求
bouncy(function (req, bounce) {
bounce(portmap[Math.random()*portmap.length|0])
}).listen(config.socket.port)
}
function createNodeId() {
// by default, we generate a random id
return Math.abs(Math.random() * Math.random() * Date.now() | 0);
};

上述代码还有个bouncy是用来做类似nginx的请求代理,这个只是简单的解决方案不完美。
接下来是较为好的请求代理、负载均衡方案使用nginx,nginx本身不支持tcp连接这里就需要用到某国人写的nginx模块https://github.com/LearnBoost/socket.io/wiki/Nginx-and-Socket.io
安装nginx可看这里
只需要在步骤中记得加上这些就行拉,patch 命令可能需要yum install一下
patch -p1 < /path/to/nginx_tcp_proxy_module/tcp.patch
./configure --add-module=/path/to/nginx_tcp_proxy_module
我是nginx配制

worker_processes  6;
#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;
#pid        logs/nginx.pid;

events {
worker_connections  10000;
}
tcp {
upstream websocket {
server 192.168.0.187:8031;
server 192.168.0.15:8031;
server 192.168.0.187:8032;
server 192.168.0.187:8033;
server 192.168.0.187:8034;
check interval=3000 rise=2 fall=5 timeout=1000;
}
server {
listen 80;
proxy_pass websocket;
}
}


4.接下来就剩下启动了,介绍一个管理nodejs服务的模块 forever
forever start -a -l /opt/log.log startup.js 启动
forever stop startup.js 停止
这玩意还有web服务的,我暂时没去弄了
最后服务跑起来拉..当然要做压力测试,好吧到目前我都还没有找到最好的方式!!目前我是使用socket.io的一个java客户端做的

public class JavaTest {
List<SocketIO> list=new ArrayList<SocketIO>();
volatile AtomicInteger count;
@Test
public void testJava() throws Exception {
count=new AtomicInteger(0);
for (int i = 0; i < 1500; i++) {
//   final SocketIO socket = new SocketIO("http://172.19.1.104:8040/");
final SocketIO socket = new SocketIO("http://172.19.0.15/");
//    final SocketIO socket = new SocketIO("http://172.19.0.187:8030/");
socket.connect(new IOCallback() {
@Override
public void onMessage(JSONObject json, IOAcknowledge ack) {
try {
System.out.println("Server said:" + json.toString(2));
} catch (JSONException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(String data, IOAcknowledge ack) {
}
@Override
public void onError(SocketIOException socketIOException) {
socketIOException.printStackTrace();
}
@Override
public void onDisconnect() {
}
@Override
public void onConnect() {
JSONObject msg=new JSONObject();
try {
//msg.put("sessionId", "");
msg.put("groupFlag", "3");
socket.emit("register", msg);
} catch (JSONException e) {
e.printStackTrace();
}
}
@Override
public void on(String event, IOAcknowledge ack, Object... args) {
count.addAndGet(1);
// System.out.println("Server triggered event '" + event + "'");
}
});
list.add(socket);
Thread.sleep(20);
}
int old=0;
for (int i = 0; i < 1000; i++) {
int cr=count.intValue();
float ca=0f;
if(old != cr ){
ca=cr-old;
old=cr;
}
System.out.println("list size="+list.size()+"  msg count="+count.intValue()+" TPS="+ca);
Thread.sleep(1000);
}
}
@After
public void after(){
for (SocketIO socket : list) {
socket.disconnect();
}
}
}

好吧吐槽一下这个客户端占用cpu很高开不了很多客户端,测试结果:
服务2台、4核虚拟机在服务器上和2核PC机共启动6进程、nginx一台2核PC、redis一台2核PC、rabbitmq在4核虚拟机
消息类型为非volatile
3000客户端:每秒5W消息
4000客户端:每秒3W消息
受限于机器我没法弄更多客户端,这个结果并不是我想要的我觉得标准应该是1W客户端:每秒2W消息。
另压测时如果需要发送的消息大于所能发送消息的最大值时整个集群会在一段时间内崩溃,这里指的消息类型为非volatile如果为volatile就不会出现崩溃我想处理不了的socket.io应该是丢弃了。
总结:
1.一个高负载的redis集群肯定是这个消息中心最需要的,一个客户端从登陆到接收消息到关闭都要通过redis传递各种消息。
2.压测应该还有更加好的方式去做,我试过用socket.io-client在nodejs上跑结果是一开多个连接就各种异常。
3.介于服务崩溃的问题我想需要在接收rabbitmq的消息时做流量控制,目前还没有想好怎样做才好
4.希望能有机会用到现网去..嘿嘿
有用的一些小模块:
log4js 用于日志记录
eventproxy 防止深度嵌套
date-utils 日期的工具类
想了很久还是决定上传源码,也没什么牛B的代码就是用了很多开源的模块求大牛指导...

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-345133-1-1.html 上篇帖子: nodejs+express的session使用 下篇帖子: NodeJS的调试工具
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表