arongsoft 发表于 2015-9-1 14:07:16

memcached源码阅读笔记一

  
  从main函数开始,位于memcached.c
  



1 int main (int argc, char **argv) {
2   //.......................
3   //.......................
4   //.......................
5   
6   
7   /* handle SIGINT 注册信号处理函数,目前sig_handler是空函数*/
8   signal(SIGINT, sig_handler);
9   
10   settings_init();
11   /* init settings
12          初始化默认设置,其中
13          settings.port = 11211;                     默认端口
14          settings.maxbytes = 64 * 1024 * 1024;      默认使用64MB内存
15          settings.num_threads = 4;                  默认启动4个工作线程
16          settings.item_size_max = 1024 * 1024;      默认每对 key value的value最大1MB
17      */
18   
19   
20   //.......................
21   //.......................
22   //.......................
23   
24   
25   /* 初始化主线程的libevent实例 */
26   main_base = event_init();
27   
28   
29   //.......................
30   //.......................
31   //.......................
32   
33   
34   /* 启动工作线程 */
35   thread_init(settings.num_threads, main_base);
36   /*
37          // 主线程是分配线程,分配工作给工作线程
38          dispatcher_thread.base = main_base;
39          dispatcher_thread.thread_id = pthread_self();
40          //设置工作线程的属性以及它们各自的libevent实例初始化
41          for (i = 0; i < nthreads; i++) {
42          int fds;
43          pipe(fds)
44          threads.notify_receive_fd = fds;
45          threads.notify_send_fd = fds;
46          setup_thread(&threads);
47          }
48          //启动线程,线程处理函数为worker_libevent, 每个线程有各自的event_base_loop
49          for (i = 0; i < nthreads; i++) {
50          create_worker(worker_libevent, &threads);
51          }
52   */
53   
54   
55   //.......................
56   //.......................
57   //.......................
58   
59   
60   /* 创建服务端的socket */
61   server_sockets(settings.port, tcp_transport, portnumber_file))
62   
63   //.......................
64   //.......................
65   //.......................
66   
67   
68   /* 主线程的libevent消息循环 */
69   if (event_base_loop(main_base, 0) != 0) {
70         retval = EXIT_FAILURE;
71   }
72   
73   
74   //.......................
75   //.......................
76   //.......................
77   
78   
79   return retval;
80 }
  
  主线程



1 server_sockets(settings.port, tcp_transport, portnumber_file)
2 --->
3 server_socket(settings.inter, port, transport, portnumber_file);
4 --->
5 listen(sfd, settings.backlog)
6 listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base)
7 --->   
8 //设置sfd的消息响应函数event_handler
9 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
10 event_base_set(base, &c->event);
11 --->
12 void event_handler(const int fd, const short which, void *arg) {
13   conn *c;
14   c = (conn *)arg;
15   
16   //.......................
17   //.......................
18   //.......................
19   
20   drive_machine(c);
21   
22   return;
23 }
24 --->
25 //drive_machine
26 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)
27 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
28                   DATA_BUFFER_SIZE, tcp_transport);
29
30 --->
31 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
32                        int read_buffer_size, enum network_transport transport) {
33   char buf;
34   int tid = (last_thread + 1) % settings.num_threads;
35   
36   LIBEVENT_THREAD *thread = threads + tid;
37   
38   //往其中一个worker线程的管道写,分配这个链接给worker
39   buf = 'c';
40   if (write(thread->notify_send_fd, buf, 1) != 1) {
41         perror("Writing to thread notify pipe");
42   }
43 }
  
  
  函数补充:



1 static void setup_thread(LIBEVENT_THREAD *me) {
2   me->base = event_init();
3   if (! me->base) {
4         fprintf(stderr, "Can't allocate event base\n");
5         exit(1);
6   }
7   
8   // 把管道的读事件注册到libevent对象中
9   // 分配线程会往工作线程的 notify_send_fd 写数据,然后触发工作线程的 notify_receive_fd 的读事件
10   event_set(&me->notify_event, me->notify_receive_fd,
11               EV_READ | EV_PERSIST, thread_libevent_process, me);
12   event_base_set(me->base, &me->notify_event);
13   
14   if (event_add(&me->notify_event, 0) == -1) {
15         fprintf(stderr, "Can't monitor libevent notify pipe\n");
16         exit(1);
17   }
18   
19   //.......................
20   //.......................
21   //.......................
22 }
  
  工作线程
  thread_libevent_process -> conn_new -> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);event_base_set(base, &c->event);
  ->event_handler -> drive_machine
  工作线程的消息libevent事件处理就由drive_machine负责
  
  尝试去理解它的工作流程
  drive_machine
  -->
  case conn_read:
  case READ_DATA_RECEIVED:
    conn_set_state(c, conn_parse_cmd);
  -->
  case conn_parse_cmd :
    if (try_read_command(c) == 0) {
  -->
  static int try_read_command(conn *c) {
  //..............
  if (c->protocol == binary_prot) {
  }
  else {
  //............
  process_command(c, c->rcurr);
  }
  -->
  static void process_command(conn *c, char *command) {
  //假设是set   
  } else if ((ntokens == 6 || ntokens == 7) &&
                 ((strcmp(tokens.value, "add") == 0 && (comm = NREAD_ADD)) ||
                  (strcmp(tokens.value, "set") == 0 && (comm = NREAD_SET)) ||
                  (strcmp(tokens.value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                  (strcmp(tokens.value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                  (strcmp(tokens.value, "append") == 0 && (comm = NREAD_APPEND)) )) {
  process_update_command(c, tokens, ntokens, comm, false);
  -->
  process_update_command(c, tokens, ntokens, comm, false);
{
  //...........
         //其实我不懂为什么要分配一个出来
         it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
         //................
         conn_set_state(c, conn_nread);
}
  -->
  //drive_machine
case conn_nread:
        if (c->rlbytes == 0) {
                complete_nread(c);
  -->
  complete_nread_ascii(c);
{
      ret = store_item(it, comm, c);
}
  
  不知道理解得对不对
  
  
  -----------------------------------------------------------------------------------------------------------------
  用户连接memcached成功后,主要逻辑在drive_machine这个函数
  我尝试打日志,理解它的流程
http://images.cnitblog.com/blog/468469/201401/081946595048.png
  A:表示memcachedB:表示用户
  A:   ./memcached -m 512 -p 14444 -vv
B:   telnet 127.0.0.1 14444
  
A:   ##############################state = conn_listening
B:   add id 1 0 4
A:   
##############################state = conn_new_cmd
##############################state = conn_waiting
##############################state = conn_read
##############################state = conn_parse_cmd
32: Client using the ascii protocol
<32 add id 1 0 4
##############################state = conn_nread

B:1234
A:
##############################state = conn_nread
##############################state = conn_nread
>32 STORED
##############################state = conn_write
##############################state = conn_mwrite
##############################state = conn_write
##############################state = conn_mwrite
##############################state = conn_new_cmd
##############################state = conn_waiting
B: 收到
STORED
  
  B: get id
  A:
  ############################## state = conn_read
##############################state = conn_parse_cmd
<32 get id
>32 sending key id
>32 END
##############################state = conn_mwrite
##############################state = conn_mwrite
##############################state = conn_new_cmd
##############################state = conn_waiting
  B: 收到
  VALUE id 1 4
1234
END
  
  B: quit
  A:
  ############################## state = conn_read
##############################state = conn_parse_cmd
<32 quit
##############################state = conn_closing
<32 connection closed.
  
  
页: [1]
查看完整版本: memcached源码阅读笔记一