设为首页 收藏本站
查看: 877|回复: 0

[经验分享] nginx-push-stream模块源码学习(三)——发布

[复制链接]

尚未签到

发表于 2016-12-28 10:12:38 | 显示全部楼层 |阅读模式
一、概述
    发布:发布者将MSG post到某一特定通道上,channel将信息缓存
在说明发布流程之前有必要说明下channel和msg的数据结构。
二、数据结构
2.1 MSG
    发布时,模块先将消息转化为ngx_http_push_stream_msg_t的数据结构进行存储。

// message queue
typedef struct {
ngx_queue_t                     queue; // this MUST be first
time_t                          expires;//消息过期时间
time_t                          time;//消息创建时间
ngx_flag_t                      deleted;//是否已删除
ngx_int_t                       id;
ngx_str_t                      *raw;//纯文本
ngx_int_t                       tag;
ngx_str_t                      *event_id;//支持event source
ngx_str_t                      *event_id_message;
ngx_str_t                      *formatted_messages;//格式化后消息
ngx_int_t                       workers_ref_count;//待发送该消息ngx worker计数
} ngx_http_push_stream_msg_t;

    @queue:每个channel会维护一个消息链表,每向channel发布一条消息,channel将其添加到自身的消息链表中。
    @expires,@deleted:消息的过期时间,待介绍过订阅流程后,我会整理出一条消息的生产周期,到时会详细阐述该字段的意义。
    @raw,@formatted_messages:该模块允许自定义三种消息模板——header模板:当收到订阅请求后发送模板消息;message模板:对消息体格式化;footer模板:断开连接时发送该模板。raw为消息原始内容,后者为应用message模板格式化后的信息
2.2 channel
    channel作为发布订阅的中间载体,理解理解它的存储至关重要。

typedef struct {
ngx_rbtree_node_t                   node; // this MUST be first
ngx_str_t                           id;
ngx_uint_t                          last_message_id;
time_t                              last_message_time;
ngx_int_t                           last_message_tag;
ngx_uint_t                          stored_messages;//# of messages
ngx_uint_t                          subscribers;//# of subscribers
ngx_http_push_stream_pid_queue_t    workers_with_subscribers;//处理该channel上订阅者的ngx worker进程链表
ngx_http_push_stream_msg_t          message_queue;//消息链表
time_t                              expires;//过期时间
ngx_flag_t                          deleted;//是否已删除
ngx_flag_t                          broadcast;//是否为广播通道
ngx_http_push_stream_msg_t         *channel_deleted_message;//已删除消息链表
} ngx_http_push_stream_channel_t;

2.3 worker msg

// messages to worker processes
typedef struct {
ngx_queue_t                         queue;
ngx_http_push_stream_msg_t         *msg; // ->shared memory
ngx_pid_t                           pid;
ngx_http_push_stream_channel_t     *channel; // ->shared memory
ngx_http_push_stream_queue_elem_t  *subscribers_sentinel; // ->a worker's local pool
} ngx_http_push_stream_worker_msg_t;
  
    模块初始化时为每个ngx worker分配一片独立的工作区,工作区中维护一份消息链表。
三、流程
发布流程总的流程图如图所示:
DSC0000.png
    对于删除channel和获取channel info的流程比较简单,不做阐述,具体说明下发布消息流程,流程图如图所示:
DSC0001.png

    需要说明的是“向所有订阅者发送MSG”的过程:

  • 向每个有该channel订阅者的worker(workers_with_subscriber)的消息链表中插入一条消息
  • 向上述worker发送CHECK_MESSAGES指令,触发msg发送流程(ngx_http_push_stream_process_worker_message)

MSG发送(ngx_http_push_stream_process_worker_message):

// now let's respond to some requests!
//对于该channel上的所有订阅者
while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != subscribers_sentinel) {
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
//如果订阅者为longpolling模式
if (subscriber->longpolling) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur-
>queue);
//发送longpolling头(last Modified/Etag)
ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->reque
st->pool);
ngx_http_send_header(subscriber->request);
//发送模块配置header模板
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(
subscriber->request, ngx_http_push_stream_module));
//发送响应MSG
ngx_http_push_stream_send_response_message(subscriber->request, channel, msg);
//发送footer模板,last chunck("\0"CRLF CRLF)
ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev;
} else {//stream或polling模式
if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg) == NGX_ERROR) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&
cur->queue);
ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev;
}
}

说明:
    可以看出push stream模块在发布过程中针对longpolling和stream两种模式的不同:

  • Longpolling模式下,每次发布消息时会发送longpolling头:last modified和etag,使得客户端下次请求时可据此判断服务端是否有更新的消息待发布。
  • longpolling模式下,订阅者每次请求都会在获得数据后断开重连,所以每次发布时都会发送header模板
  • ngx_http_push_stream_send_response_finalize同时会清理订阅者

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-320548-1-1.html 上篇帖子: 编译安装spawn-fcgi(nginx通过FastCGI方式调用PHP) 下篇帖子: 安装nginx发生的异常之未使用变量
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表