设为首页 收藏本站
查看: 409|回复: 0

[经验分享] limit_rate

[复制链接]

尚未签到

发表于 2016-12-23 10:38:49 | 显示全部楼层 |阅读模式
  记得去年的一个 enhance FTP 的项目里, 有一个需求是要求
  “针对每个IP进行限速 k ,然后对来自这个区域的所有IP 也要限速 K ”
  因为应用是多线程多进程的。 就用了一个比较搞笑的方法 我叫他为《水桶法》
  准备工作是创建一个消息队列 ,赶项目 就用了Posix的
  1 每个连接都对应一个 con_id(独一无二)
  2 每次该con_id收满 M 字节 就往队列中插入一个 segment,
  里面优先级是con_id buf 就是当前的时间戳

struct msgbuf{
long con_id;
char time_stamp[0x10];
};

  3 有一个专门的master线程 负责从队列中每隔 T s进行采样(OS 同步), 统计当前该IP 对应的链接速度求和 然后进行一些计算按照该 IP和链接上限k ,K 算出该con_id  需要usleep的时间片(有点类似TTL) ,放回一个sleep队列
  4 再该链接收消息的时候 就取出sleep队列里面的 usleep需要的时间 。
  大概就是这个样子,还有很多细节包括计算算法就不描述了。  当然经过多次调试 最后效果还凑合~
  但是很明显看到需要大量的 内核态处理 cpu_time会用的很多。 解决方法就是用ACE的MQ 而不是系统自带, 对于时间的获取改成计数器等 。(欢迎讨论)
  昨天公司要一个流媒体服务器 能自动限速的。 我就直接在 rtsp 服务器前面弄了个 nginx去自动proxy_server转向 来利用它的 limit_rate ,效果很好,于是就打算来看看它的实现,哈哈
  nginx  的主要配置都放在了

struct ngx_http_core_loc_conf_s {
ngx_str_t     name;          /* location name */   
.....
size_t        limit_rate;              /* limit_rate */
size_t        limit_rate_after;        /* limit_rate_after */
.....
}
  这个结构里面
  同时去初始化一个过滤模块(ngx_http_write_filter_module) 对应的模块指令:

struct ngx_command_s {
ngx_str_t             name; // 指令的字符串 一般是 nginx.conf 配置文件里面的名字
ngx_uint_t            type; // 表示 这个指令配置的范围 是upstream ? main ? server 和参数个数
char               *(*set)(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);//当然是用来设置模块配置的
ngx_uint_t            conf;//在nginx.conf中的位置
ngx_uint_t            offset;//结构体成员对应的偏移
void                 *post;
};
  limit_rate的配置就是:

    { ngx_string("limit_rate"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_core_loc_conf_t, limit_rate),
NULL },
  然后就是初始化重要的结构体 nginx的模块实现基本靠他了
  就像kernel的 module_init() moudle_eixt()一样
  只是他被C 代码遍历 后者用了gcc特性而已

typedef struct {
ngx_int_t   (*preconfiguration)(ngx_conf_t *cf);
ngx_int_t   (*postconfiguration)(ngx_conf_t *cf);
void       *(*create_main_conf)(ngx_conf_t *cf);
char       *(*init_main_conf)(ngx_conf_t *cf, void *conf);
void       *(*create_srv_conf)(ngx_conf_t *cf);
char       *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, void *conf);
void       *(*create_loc_conf)(ngx_conf_t *cf);
char       *(*merge_loc_conf)(ngx_conf_t *cf, void *prev, void *conf);
} ngx_http_module_t;

  从名字很容易看出他们用来干什么。 
  初始化如下

static ngx_http_module_t  ngx_http_write_filter_module_ctx = {
NULL,                                  /* preconfiguration */
ngx_http_write_filter_init,            /* postconfiguration */
NULL,                                  /* create main configuration */
NULL,                                  /* init main configuration */
NULL,                                  /* create server configuration */
NULL,                                  /* merge server configuration */
NULL,                                  /* create location configuration */
NULL,                                  /* merge location configuration */
};

  这样的话 就会在ngx_http_module_t  链表中 调用 ngx_http_write_filter_init
  函数很简单 其实就是

gx_http_top_body_filter = ngx_http_write_filter;
  给函数指针对象赋值,让上一个模块能够来调用  ngx_http_write_filter
  然后开看看关键的

ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)

    off_t                      size, sent, nsent, limit;
ngx_uint_t                 last, flush;
ngx_msec_t                 delay;
ngx_chain_t               *cl, *ln, **ll, *chain;
ngx_connection_t          *c;
ngx_http_core_loc_conf_t  *clcf;
c = r->connection;
if (c->error) {
return NGX_ERROR;
}
size = 0;
flush = 0;
last = 0;
ll = &r->out;
  首先是遍历 链接 r 中的 out (ngx_buf_t) 链表,找出保存在链表中的消息大小,刷新点 和恢复上一次链接情况(一些标志位)

for (cl = r->out; cl; cl = cl->next) {
ll = &cl->next;
...
#if 1
//如果内存或者后备缓冲中都没有消息了 并且呢 表明没有被刷新 标志位却表明也在内存里 说明
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
//这么奇异的情况 就需要你调试了,它会根据配置 等你来处理, 很爽吧
ngx_debug_point(); //为什么不加个宏 ? 万一卡死了呢。。。
return NGX_ERROR;
}
#endif
//计算消息总大小
size += ngx_buf_size(cl->buf);
//根据上一次的情况 ,设置正确的标志位
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
if (cl->buf->last_buf) {
last = 1;
}
}
  ps : 记得去年刚毕业的时候 写代码我也喜欢这么做 系统是一个巨大的状态机 有很多标志位。
  然后在可能矛盾的逻辑中 写一个check_point() 自动把异常的状态位恢复 。。 后来想想其实应该跟踪为啥这样,而不是强制修复,哈哈
  继续: 这里就是把新的链表 一个一个加到 ll后面

for (ln = in; ln; ln = ln->next) {
cl = ngx_alloc_chain_link(r->pool);//到 pool去找一个节点,如果找到就使用,否则分配后使用
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = ln->buf;
*ll = cl;   //把填充好的节点地址放到ll的当前位置
ll = &cl->next;  //ll的地址继续指向新的地址等待填充
//同上      
#if 1
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
ngx_debug_point();
return NGX_ERROR;
}
#endif
  好了经过上面的工作 要对根据 正确的buf 和正确的标志位做处理了

//如果没有last buf也不是flush点 而且总大小又小于postpone_output 说明这次无事可做
if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
return NGX_OK;
}
//如果标记了延迟写 那么这次就不发送标记一下放到下次
if (c->write->delayed) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
//如果消息大小为0 而且又没有任何要缓冲的意思
//NGX_LOWLEVEL_BUFFERED 0x0f :每一位都是一个缓冲情况
if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
if (last) { //如果已经是最后一个
r->out = NULL;
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;//去位
return NGX_OK;
}
//如果需要flush 这里就把out遍历到最后一个 然后清除缓存位
if (flush) {
do {
r->out = r->out->next;
} while (r->out);
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
ngx_debug_point();
return NGX_ERROR;
}
  下面就是限速的主要实现了

    if (r->limit_rate) {
limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
- (c->sent - clcf->limit_rate_after);
if (limit <= 0) {
c->write->delayed = 1;
ngx_add_timer(c->write,
(ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
}
  其实关键是一个  ngx_add_timer 函数, 这里利用了和操作系统 计算时间片类似的方法 比如linux kernel 里面都是每次分配默认的时间片(按优先级等等) 用完就从计算然后增加(比如CFS调度算法)
  这里的思想就是 计算出超过的时间量 然后增加到他的事件时间里面 通过一个红黑数保存
  其实很像libevent 那个小根堆 把时间放到一个数据结构中保存
  ----待续
  其实慢慢的研究nginx发现,里面很多思想和设计都是参考了内核(module , 调度但是没有用和apache一样用伙伴分配算法,这点也体现了他的伟大) 。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-318351-1-1.html 上篇帖子: Nginx+tomcat配置负载均衡 下篇帖子: NGINX 502错误排查
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表