殇帝刘玢你 发表于 2016-12-27 08:29:39

nginx 自定义协议 扩展模块开发

原文地址:http://blog.iyunv.com/uid-26443921-id-3018781.html






公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
      在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议;
      在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
      看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
      废话少说,直接上代码:
       xdrive.rar   
      注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;


      nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:

CPP = g++
LINK = \$(CPP) ##采用g++来链接
##line=338 below was changed by kevin_zhong on 2011-11-14

      ngx_obj=`echo $ngx_obj \
            | sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
                  -e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
                  -e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
                  -e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`

      ngx_post_suffix=`echo $ngx_src \
            | sed -e "s#^.*\(\.c\)\\$#\1#g" \
               -e "s#^.*\(\.cc\)\\$#\1#g" \
               -e "s#^.*\(\.cpp\)\\$#\1#g"`

      if [ "$ngx_post_suffix"x = ".cpp"x ];then
            ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
      else
            ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
      fi
上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;


下面是具体代码分析:
/*
* Copyright (C) Igor Sysoev; kevin_zhong
* mail: qq2000zhong@gmail.com
* date: 2011-11-13
*/

//因是cpp文件,固包含c头文件需要 extern c
extern "C" {
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include "ngx_chain_util.h"
}

//与服务器内部通信二进制协议实现
#include "ngx_thrift_transport.h"
#include "ngx_xdrive_datagram.h"
#include "protocal/rc_updator_types.h"

using namespace xdrive::msg::rc_updator;
using namespace xdrive;

/*
* 扩展模块需要3个业务相关输入变量,uid,path,recusive
* 参考nginx.conf中的配置写法
*/

typedef struct
{
      ngx_http_upstream_conf_t upstream;

      //将uid和path以及recusive在配置中的index找出来,以后create request的时候需要
      ngx_int_t uid_index;
      ngx_int_t path_index;
      ngx_int_t recusive_index;
}
ngx_http_xdrive_rc_loc_conf_t;

/*
* 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:
*
* 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,
* 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接
* 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,
* 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;
*
* 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)
* 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client
*
* 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;
*/

typedef struct
{
      ngx_http_request_t *request;
      ngx_chain_pair_t body_buff;
      ngx_chain_t * tail_buff;
      uint64_t uid;
      ngx_str_t path;
      bool recusive;

      //后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的
      //开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n
      //upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确
      //的 content len,导致接受http包体数据异常...
      //参考 ngx_http_upstream.c:2391
      int rest_length;
}
ngx_http_xdrive_rc_ctx_t;


static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);
static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);
static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);
static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);

static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);


static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {
      { ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
      { ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
      { ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
      { ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },
      { ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
      { ngx_null_string, 0 }
};

/*
* 参数设置,不可变,注意和变量的区别
*/
static ngx_command_t ngx_http_xdrive_rc_commands[] = {
      { ngx_string("xdrive_rc_pass"),
          NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
          ngx_http_xdrive_rc_pass,
          NGX_HTTP_LOC_CONF_OFFSET,
          0,
          NULL },

      { ngx_string("xdrive_rc_connect_timeout"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_msec_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),
          NULL },

      { ngx_string("xdrive_rc_send_timeout"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_msec_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),
          NULL },

      { ngx_string("xdrive_rc_buffer_size"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_size_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),
          NULL },

      { ngx_string("xdrive_rc_read_timeout"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
          ngx_conf_set_msec_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),
          NULL },

      { ngx_string("xdrive_rc_next_upstream"),
          NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,
          ngx_conf_set_bitmask_slot,
          NGX_HTTP_LOC_CONF_OFFSET,
          offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),
          &ngx_http_xdrive_rc_next_upstream_masks },

      ngx_null_command
};


static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {
      ngx_http_xdrive_rc_add_variables, /* preconfiguration */
      NULL, /* postconfiguration */

      NULL, /* create main configuration */
      NULL, /* init main configuration */

      NULL, /* create server configuration */
      NULL, /* merge server configuration */

      ngx_http_xdrive_rc_create_loc_conf, /* create location configration */
      ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */
};


ngx_module_t ngx_http_xdrive_rc_module = {
      NGX_MODULE_V1,
      &ngx_http_xdrive_rc_module_ctx, /* module context */
      ngx_http_xdrive_rc_commands, /* module directives */
      NGX_HTTP_MODULE, /* module type */
      NULL, /* init master */
      NULL, /* init module */
      NULL, /* init process */
      NULL, /* init thread */
      NULL, /* exit thread */
      NULL, /* exit process */
      NULL, /* exit master */
      NGX_MODULE_V1_PADDING
};

//业务相关变量,get_handler = NULL,因为这三个是从conf里面通过
//正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了
//因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果
//业务需要uid放在 query_string,这时候就只需要改配置即可了
//思路来源于 ngx_http_memcached_module.c

static ngx_http_variable_t ngx_http_proxy_vars[] = {
      { ngx_string("uid"), NULL,
          NULL, 0,
          NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
          0 },
      { ngx_string("path"), NULL,
          NULL, 0,
          NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
          0 },
      { ngx_string("recusive"), NULL,
          NULL, 0,
          NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
          0 },
      { ngx_null_string, NULL,NULL,0, 0, 0 }
};


static ngx_int_t
ngx_http_xdrive_rc_handler(ngx_http_request_t *r)
{
      ngx_int_t rc;
      ngx_http_upstream_t *u;
      ngx_http_xdrive_rc_ctx_t *ctx;
      ngx_http_xdrive_rc_loc_conf_t *mlcf;

      if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))
      {
                return NGX_HTTP_NOT_ALLOWED;
      }

      //get 请求,不需要包体
      rc = ngx_http_discard_request_body(r);

      if (rc != NGX_OK)
      {
                return rc;
      }

      if (ngx_http_set_content_type(r) != NGX_OK)
      {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
      }

      if (ngx_http_upstream_create(r) != NGX_OK)
      {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
      }

      u = r->upstream;

      ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用
      
      u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;

      mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

      u->conf = &mlcf->upstream;

      //设置回调,网上大都只讲这里
      u->create_request = ngx_http_xdrive_rc_create_request;
      u->reinit_request = ngx_http_xdrive_rc_reinit_request;
      u->process_header = ngx_http_xdrive_rc_process_header;
      u->abort_request = ngx_http_xdrive_rc_abort_request;
      u->finalize_request = ngx_http_xdrive_rc_finalize_request;

      //分配context内存
      ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));
      if (ctx == NULL)
      {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
      }
      ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));

      ctx->request = r;

      ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);

      u->input_filter_init = ngx_http_xdrive_rc_filter_init;

      /*
      * 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的
      * input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认
      * filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们
      * 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client
      */
      u->input_filter = ngx_http_xdrive_rc_filter;
      u->input_filter_ctx = ctx;

      u->buffering = 0; //note, no buffering...cause too complicated !!

      r->main->count++;

      //不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要
      //调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
      ngx_http_upstream_init(r);

      return NGX_DONE;
}


static ngx_int_t
ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)
{
      size_t len;
      ngx_buf_t *b;
      ngx_chain_t *cl;
      ngx_http_xdrive_rc_ctx_t *ctx;
      ngx_http_variable_value_t *vv;
      ngx_http_xdrive_rc_loc_conf_t *mlcf;

      mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

      //根据配置文件uid index号从变量中获取uid的变量值
      vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);

      if (vv == NULL || vv->not_found || vv->len == 0)
      {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "the \"$uid\" variable is not set");
                return NGX_ERROR;
      }

      ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
      ctx->uid = ngx_atoof(vv->data, vv->len);
      if (ctx->uid == (off_t)NGX_ERROR)
      {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "the \"$uid\" variable is err %s set", vv->data);
                return NGX_ERROR;
      }

      //根据配置文件path index号从变量中获取path的变量值
      vv = ngx_http_get_indexed_variable(r, mlcf->path_index);
      if (vv == NULL || vv->not_found || vv->len == 0)
      {
                ngx_str_set(&ctx->path, "/");
      }
      else {
                ctx->path.data = vv->data;
                ctx->path.len = vv->len;
      }

      vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);
      if (vv == NULL || vv->not_found || vv->len == 0)
      {
                ctx->recusive = false;
      }
      else {
                ctx->recusive = ngx_atoi(vv->data, vv->len);
      }

      RcUpdateReq list_req;
      list_req._user_id = ctx->uid;
      list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);
      list_req._recursive = ctx->recusive;

      static uint32_t seq = ngx_time();

      //编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;
      //细节见具体代码,不表
      cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,
                                 &list_req, ++seq, 0xC01);
      if (cl == NULL)
                return NGX_ERROR;

      //准备发送
      r->upstream->request_bufs = cl;

      ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                     "http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",
                     ctx->uid, &ctx->path, ctx->recusive);

      return NGX_OK;
}


static ngx_int_t
ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)
{
      return NGX_OK;
}

/*
* 读取二进制包体头部
*/
static ngx_int_t
ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)
{
      ngx_http_upstream_t *u;
      ngx_http_xdrive_rc_ctx_t *ctx;

      u = r->upstream;

      //因包头固定长度,所以很好判断
      if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)
                return NGX_AGAIN;

      ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);

      ngx_xdrive_datagram_header_t header;
      //解包头,获取最重要参数 : 包体长度,根据包体长度收包
      if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,
                              &header, r->connection->log) != NGX_OK)
      {
                return NGX_HTTP_UPSTREAM_INVALID_HEADER;
      }

      //业务代码
      if (header._type != 0x08C01)
      {
                ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
                              "xdrive_rc ret type not legal = %d", header._type);

                return NGX_HTTP_UPSTREAM_INVALID_HEADER;
      }

      //业务代码
      if (header._status != 0)
      {
                ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
                              "xdrive_rc ret status not ok in response = %d", header._status);

                return NGX_HTTP_UPSTREAM_INVALID_HEADER;
      }

      //非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;
      ngx_http_clear_content_length(r);

      //因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)
      //所以我们必须自己处理记录剩余包体长度;
      ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;

      u->headers_in.status_n = NGX_HTTP_OK;
      u->state->status = NGX_HTTP_OK;

      //包头数据已经处理完毕,必须丢弃;
      u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;

      return NGX_OK;
}


//其实没啥用
static ngx_int_t
ngx_http_xdrive_rc_filter_init(void *data)
{
      ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

      ngx_http_upstream_t *u;

      u = ctx->request->upstream;

      return NGX_OK;
}

/*
* 缓存包体,等待包体接受完毕,解码,然后一次回复给client
*/
static ngx_int_t
ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)
{
      ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

      u_char *last;
      ngx_buf_t *b;
      ngx_chain_t *cl, **ll;
      ngx_http_upstream_t *u;

      ngx_http_xdrive_rc_loc_conf_t *mlcf;

      mlcf = (ngx_http_xdrive_rc_loc_conf_t *)
               ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);

      u = ctx->request->upstream;
      b = &u->buffer;

      size_t buff_size = mlcf->upstream.buffer_size;
      //assert(bytes <= buff_size);

      ctx->rest_length -= bytes;

      ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
                     "recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);

      //特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候
      //直接交互内存即可,不再需要内存拷贝,否则...
      if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)
      {
                cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
                ctx->body_buff._chain_head = cl;

                cl->buf->flush = 1;
                cl->buf->memory = 1;

                last = b->last;
                cl->buf->pos = last;
                b->last += bytes;
                cl->buf->last = b->last;
                cl->buf->tag = u->output.tag;
      }
      else {
                //做一次内存拷贝到 body buf 中去
                if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,
                                    b->last, bytes) != NGX_OK)
                        return NGX_ERROR;

                b->last += bytes;
      }

      //判断upstream包体是否收完整
      if (ctx->rest_length > 0)
      {
                return NGX_OK;
      }

      //包体收完,进行解码
      RcUpdateResp list_resp;
      if (ngx_datagram_decode_body(ctx->body_buff._chain_head,
                                     ctx->request->connection->log,
                                     &list_resp) != NGX_OK)
      {
                ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
                              "xdrive_rc RcUpdateResp decode failed");

                return NGX_ERROR;
      }

      ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,
                      "xdrive_rc RcUpdateResp list num=%d",
                      list_resp._action_list.size());

      //内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用
      ngx_chain_t *busy_bufs = NULL;
      ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);

      //transfer...
      ngx_chain_pair_t chain_pair;
      ngx_memzero(&chain_pair, sizeof(chain_pair));

      //转成 json 格式
      if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
                                        "uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",
                                        ctx->uid, &ctx->path, ctx->recusive,
                                        list_resp._weak_dcid.c_str(),
                                        list_resp._used_space,
                                        list_resp._action_list.size()
                                        ))
                return NGX_ERROR;

      //转成 json 格式
      for (size_t i = 0; i < list_resp._action_list.size(); ++i)
      {
                ActionThrft *ac = &list_resp._action_list;
                if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
                                                "\n",
                                                ac->m_path.c_str(), ac->m_node_type, ac->m_status,
                                                ac->m_gcid.c_str(), ac->m_file_size
                                                ))
                        return NGX_ERROR;
      }

      //这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕
      //关后端连接,回前端包
      chain_pair._chain_last->buf->last_buf = 1;

      for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)
      {
                ll = &cl->next;
      }
      *ll = chain_pair._chain_head;

      return NGX_OK;
}


static void
ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)
{
      ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                     "abort http xdrive_rc request");
      return;
}


static void
ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
      ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                     "finalize http xdrive_rc request");
      return;
}


static void *
ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)
{
      ngx_http_xdrive_rc_loc_conf_t *conf;

      conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,
                                                            sizeof(ngx_http_xdrive_rc_loc_conf_t));
      if (conf == NULL)
      {
                return NULL;
      }

      conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
      conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
      conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;

      conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;

      /* the hardcoded values */
      conf->upstream.cyclic_temp_file = 0;
      conf->upstream.buffering = 0;
      conf->upstream.ignore_client_abort = 0;
      conf->upstream.send_lowat = 0;
      conf->upstream.bufs.num = 0;
      conf->upstream.busy_buffers_size = 0;
      conf->upstream.max_temp_file_size = 0;
      conf->upstream.temp_file_write_size = 0;
      conf->upstream.intercept_errors = 1;
      conf->upstream.intercept_404 = 1;
      conf->upstream.pass_request_headers = 0;
      conf->upstream.pass_request_body = 0;

      conf->uid_index = NGX_CONF_UNSET;
      conf->path_index = NGX_CONF_UNSET;
      conf->recusive_index = NGX_CONF_UNSET;
      
      return conf;
}


static char *
ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
      ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;
      ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;

      ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
                                  prev->upstream.connect_timeout, 60000);

      ngx_conf_merge_msec_value(conf->upstream.send_timeout,
                                  prev->upstream.send_timeout, 60000);

      ngx_conf_merge_msec_value(conf->upstream.read_timeout,
                                  prev->upstream.read_timeout, 60000);

      ngx_conf_merge_size_value(conf->upstream.buffer_size,
                                  prev->upstream.buffer_size,
                                  (size_t)ngx_pagesize);

      ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
                                     prev->upstream.next_upstream,
                                     (NGX_CONF_BITMASK_SET
                                    | NGX_HTTP_UPSTREAM_FT_ERROR
                                    | NGX_HTTP_UPSTREAM_FT_TIMEOUT));

      if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)
      {
                conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
                                             | NGX_HTTP_UPSTREAM_FT_OFF;
      }

      if (conf->upstream.upstream == NULL)
      {
                conf->upstream.upstream = prev->upstream.upstream;
      }

      if (conf->uid_index == NGX_CONF_UNSET) {
                conf->uid_index = prev->uid_index;
      }
      if (conf->path_index == NGX_CONF_UNSET) {
                conf->path_index = prev->path_index;
      }
      if (conf->recusive_index == NGX_CONF_UNSET) {
                conf->recusive_index = prev->recusive_index;
      }

      return NGX_CONF_OK;
}


static char *
ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
      ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;

      ngx_str_t *value;
      ngx_url_t u;
      ngx_http_core_loc_conf_t *clcf;

      if (mlcf->upstream.upstream)
      {
                return "is duplicate";
      }

      value = (ngx_str_t *)cf->args->elts;

      ngx_memzero(&u, sizeof(ngx_url_t));

      u.url = value;
      u.no_resolve = 1;

      mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
      if (mlcf->upstream.upstream == NULL)
      {
                return (char *)(NGX_CONF_ERROR);
      }

      clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);

      clcf->handler = ngx_http_xdrive_rc_handler;

      if (clcf->name.data == '/')
      {
                clcf->auto_redirect = 1;
      }

      //保存变量index用
      mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars.name);
      if (mlcf->uid_index == NGX_ERROR)
      {
                return (char *)(NGX_CONF_ERROR);
      }
      mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars.name);
      if (mlcf->path_index == NGX_ERROR)
      {
                return (char *)(NGX_CONF_ERROR);
      }
      mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars.name);
      if (mlcf->recusive_index == NGX_ERROR)
      {
                return (char *)(NGX_CONF_ERROR);
      }

      return NGX_CONF_OK;
}



static ngx_int_t
ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)
{
      ngx_http_variable_t *var, *v;

      for (v = ngx_http_proxy_vars; v->name.len; v++)
      {
                var = ngx_http_add_variable(cf, &v->name, v->flags);
                if (var == NULL)
                {
                        return NGX_ERROR;
                }

                var->get_handler = v->get_handler;
                var->data = v->data;
      }

      return NGX_OK;
}

代码中一些有意思的地方:
//和buf差不多的思想的 buf chain
typedefstruct
{
      ngx_chain_t* _chain_head;
      ngx_chain_t* _chain_pos;
      ngx_chain_t* _chain_last;
      ngx_chain_t* _chain_tail;
}
ngx_chain_pair_t;

//从buf chain中读取len长内存出来
size_t ngx_cdecl
ngx_chain_read(ngx_chain_pair_t* chain_pair
                , uint8_t *buf, uint32_t len);
//将buf写入到buf chain中
ngx_int_t ngx_cdecl
ngx_chain_write(ngx_pool_t* pool
                , ngx_chain_t** free_bufs
                , ngx_chain_pair_t* chain_pair
                , size_t write_chunk_size
                , const uint8_t *buf, uint32_t len);
//写json或者xml之类回复有用
ngx_int_t ngx_cdecl
ngx_chain_sprintf(ngx_pool_t *pool
             , ngx_chain_t **free_bufs
             , ngx_chain_pair_t *chain_pair
             , size_t write_chunk_size
             , const char *fmt, ...);

下面是nginx配置文件中的关键部分
location ~* /rc_list/(+).html$ {
                xdrive_rc_buffer_size 4096;
                set $uid $1;
                set $path /;
                set $recusive 0;
                if ($query_string ~* (|&)recusive=(0|1)(|&)) {
                        set $recusive $2;
                }
                xdrive_rc_pass 127.0.0.1:11001;
      }
解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;
页: [1]
查看完整版本: nginx 自定义协议 扩展模块开发