|
记得去年的一个 enhance FTP 的项目里, 有一个需求是要求
“针对每个IP进行限速 k ,然后对来自这个区域的所有IP 也要限速 K ”
因为应用是多线程多进程的。 就用了一个比较搞笑的方法 我叫他为《水桶法》
准备工作是创建一个消息队列 ,赶项目 就用了Posix的
1 每个连接都对应一个 con_id(独一无二)
2 每次该con_id收满 M 字节 就往队列中插入一个 segment,
里面优先级是con_id buf 就是当前的时间戳
struct msgbuf{
long con_id;
char time_stamp[0x10];
};
3 有一个专门的master线程 负责从队列中每隔 T s进行采样(OS 同步), 统计当前该IP 对应的链接速度求和 然后进行一些计算按照该 IP和链接上限k ,K 算出该con_id 需要usleep的时间片(有点类似TTL) ,放回一个sleep队列
4 再该链接收消息的时候 就取出sleep队列里面的 usleep需要的时间 。
大概就是这个样子,还有很多细节包括计算算法就不描述了。 当然经过多次调试 最后效果还凑合~
但是很明显看到需要大量的 内核态处理 cpu_time会用的很多。 解决方法就是用ACE的MQ 而不是系统自带, 对于时间的获取改成计数器等 。(欢迎讨论)
昨天公司要一个流媒体服务器 能自动限速的。 我就直接在 rtsp 服务器前面弄了个 nginx去自动proxy_server转向 来利用它的 limit_rate ,效果很好,于是就打算来看看它的实现,哈哈
nginx 的主要配置都放在了
struct ngx_http_core_loc_conf_s {
ngx_str_t name; /* location name */
.....
size_t limit_rate; /* limit_rate */
size_t limit_rate_after; /* limit_rate_after */
.....
}
这个结构里面
同时去初始化一个过滤模块(ngx_http_write_filter_module) 对应的模块指令:
struct ngx_command_s {
ngx_str_t name; // 指令的字符串 一般是 nginx.conf 配置文件里面的名字
ngx_uint_t type; // 表示 这个指令配置的范围 是upstream ? main ? server 和参数个数
char *(*set)(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);//当然是用来设置模块配置的
ngx_uint_t conf;//在nginx.conf中的位置
ngx_uint_t offset;//结构体成员对应的偏移
void *post;
};
limit_rate的配置就是:
{ ngx_string("limit_rate"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_core_loc_conf_t, limit_rate),
NULL },
然后就是初始化重要的结构体 nginx的模块实现基本靠他了
就像kernel的 module_init() moudle_eixt()一样
只是他被C 代码遍历 后者用了gcc特性而已
typedef struct {
ngx_int_t (*preconfiguration)(ngx_conf_t *cf);
ngx_int_t (*postconfiguration)(ngx_conf_t *cf);
void *(*create_main_conf)(ngx_conf_t *cf);
char *(*init_main_conf)(ngx_conf_t *cf, void *conf);
void *(*create_srv_conf)(ngx_conf_t *cf);
char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, void *conf);
void *(*create_loc_conf)(ngx_conf_t *cf);
char *(*merge_loc_conf)(ngx_conf_t *cf, void *prev, void *conf);
} ngx_http_module_t;
从名字很容易看出他们用来干什么。
初始化如下
static ngx_http_module_t ngx_http_write_filter_module_ctx = {
NULL, /* preconfiguration */
ngx_http_write_filter_init, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
NULL, /* create location configuration */
NULL, /* merge location configuration */
};
这样的话 就会在ngx_http_module_t 链表中 调用 ngx_http_write_filter_init
函数很简单 其实就是
gx_http_top_body_filter = ngx_http_write_filter;
给函数指针对象赋值,让上一个模块能够来调用 ngx_http_write_filter
然后开看看关键的
ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
off_t size, sent, nsent, limit;
ngx_uint_t last, flush;
ngx_msec_t delay;
ngx_chain_t *cl, *ln, **ll, *chain;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
if (c->error) {
return NGX_ERROR;
}
size = 0;
flush = 0;
last = 0;
ll = &r->out;
首先是遍历 链接 r 中的 out (ngx_buf_t) 链表,找出保存在链表中的消息大小,刷新点 和恢复上一次链接情况(一些标志位)
for (cl = r->out; cl; cl = cl->next) {
ll = &cl->next;
...
#if 1
//如果内存或者后备缓冲中都没有消息了 并且呢 表明没有被刷新 标志位却表明也在内存里 说明
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
//这么奇异的情况 就需要你调试了,它会根据配置 等你来处理, 很爽吧
ngx_debug_point(); //为什么不加个宏 ? 万一卡死了呢。。。
return NGX_ERROR;
}
#endif
//计算消息总大小
size += ngx_buf_size(cl->buf);
//根据上一次的情况 ,设置正确的标志位
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
if (cl->buf->last_buf) {
last = 1;
}
}
ps : 记得去年刚毕业的时候 写代码我也喜欢这么做 系统是一个巨大的状态机 有很多标志位。
然后在可能矛盾的逻辑中 写一个check_point() 自动把异常的状态位恢复 。。 后来想想其实应该跟踪为啥这样,而不是强制修复,哈哈
继续: 这里就是把新的链表 一个一个加到 ll后面
for (ln = in; ln; ln = ln->next) {
cl = ngx_alloc_chain_link(r->pool);//到 pool去找一个节点,如果找到就使用,否则分配后使用
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = ln->buf;
*ll = cl; //把填充好的节点地址放到ll的当前位置
ll = &cl->next; //ll的地址继续指向新的地址等待填充
//同上
#if 1
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
ngx_debug_point();
return NGX_ERROR;
}
#endif
好了经过上面的工作 要对根据 正确的buf 和正确的标志位做处理了
//如果没有last buf也不是flush点 而且总大小又小于postpone_output 说明这次无事可做
if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
return NGX_OK;
}
//如果标记了延迟写 那么这次就不发送标记一下放到下次
if (c->write->delayed) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
//如果消息大小为0 而且又没有任何要缓冲的意思
//NGX_LOWLEVEL_BUFFERED 0x0f :每一位都是一个缓冲情况
if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
if (last) { //如果已经是最后一个
r->out = NULL;
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;//去位
return NGX_OK;
}
//如果需要flush 这里就把out遍历到最后一个 然后清除缓存位
if (flush) {
do {
r->out = r->out->next;
} while (r->out);
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
ngx_debug_point();
return NGX_ERROR;
}
下面就是限速的主要实现了
if (r->limit_rate) {
limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
- (c->sent - clcf->limit_rate_after);
if (limit <= 0) {
c->write->delayed = 1;
ngx_add_timer(c->write,
(ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
}
其实关键是一个 ngx_add_timer 函数, 这里利用了和操作系统 计算时间片类似的方法 比如linux kernel 里面都是每次分配默认的时间片(按优先级等等) 用完就从计算然后增加(比如CFS调度算法)
这里的思想就是 计算出超过的时间量 然后增加到他的事件时间里面 通过一个红黑数保存
其实很像libevent 那个小根堆 把时间放到一个数据结构中保存
----待续
其实慢慢的研究nginx发现,里面很多思想和设计都是参考了内核(module , 调度但是没有用和apache一样用伙伴分配算法,这点也体现了他的伟大) 。 |
|
|