天天看點

memcached I/O模型源碼分析

      memcached是基于多路複用實作事件驅動的伺服器,在事件處理方面,應用了master+works的線程模型。

      master線程負責監聽端口,當有新的連接配接,master線程負責接收連接配接,将連接配接加入到指定的work線程的隊列,

然後通過管道通知該線程處理該連接配接。

     master線程和work線程之間通過一個管道進行資訊互動。

     work線程監聽管道讀端,當master線程發送'c',work線程從隊列擷取用戶端連接配接進行監聽,負責用戶端所有

請求的處理。

     總的來說,master線程負責對listen fd的監聽,work線程負責對管道fd和client fd的監聽。

     memcached的線程處理邏輯圖如下所示:

memcached I/O模型源碼分析

    總的來說,master和work都需要實作事件監聽,master監聽listen fd,work監聽client fd。memcached實作事件

監聽是基于libevent網絡庫。主要的實作流程:

   1)main_base = event_init();初始化事件基地

   2)event_set(event, fd, event_flags, event_handler, args); 設定事件的監聽的fd,事件類型,事件處理函數

   3)event_base_set(event_base, event);設定事件指定的事件基地

   4)event_add(event, timeval);将事件加入事件基地的監聽隊列

   5)event_base_loop(event_base, flag); 進入事件循環

一、main函數

int main (int argc, char **argv) {
	……
	main_base = event_init();//初始化主線程的事件基地
	……
    memcached_thread_init(settings.num_threads, main_base);//建立工作線程
    ……
    if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
        exit(EX_OSERR);//監聽tcp端口
    }    
    if (settings.udpport && server_sockets(settings.udpport, udp_transport,
                                              portnumber_file)) {
        exit(EX_OSERR);//監聽udp端口
    }
    ……
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;//進入主線的的事件循環
    }
}
           

二、建立工作線程

 1、工作線程結構體

typedef struct {
    pthread_t thread_id;        /* 線程id*/
    struct event_base *base;    /* 每個線程自有的事件基地 */
    struct event notify_event;  /* 監聽與master線程的管道的事件 */
    int notify_receive_fd;      /* 互動管道的讀端 */
    int notify_send_fd;         /* 互動管道的寫端*/
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* 主線程分發過來未處理的新連接配接的隊列*/
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;//每個工作線程對應的結構體
           

2、建立工作線程池的主要流程

    1)建立LIBEVENT_THREAD數組

    2)建立工作線程與主線程的通信管道

    3)建立工作線程的事件基地,添加監聽管道的事件

    4)啟動工作線程

3、建立工作線程的主函數

void memcached_thread_init(int nthreads, struct event_base *main_base) {
    ……//初始化工作線程——LIBEVENT_THREAD數組
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    dispatcher_thread.base = main_base;//設定主線程的event_base
    dispatcher_thread.thread_id = pthread_self();//設定主線程的線程ID
    for (i = 0; i < nthreads; i++) {
        ……
        //設定工作線程與主線通信的管道
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];
        setup_thread(&threads[i]);//設定工作線程的配置
        stats.reserved_fds += 5;
    }//啟動工作線程
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);//等待所有work線程建立完畢
    pthread_mutex_unlock(&init_lock);
}
           

4、設定工作線程的事件模型的函數

static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();//建立工作線程的event_base;
    //設定與主線程管道讀端的讀事件
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    if (event_add(&me->notify_event, 0) == -1) {
        exit(1);//添加讀事件到線程的event_base
    }//初始化線程的接收新連接配接的隊列
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    cq_init(me->new_conn_queue);
    ……
}//工作線程notify_send_fd的可讀事件的事件處理函數
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    ……
    if (read(fd, buf, 1) != 1)
        ……
    switch (buf[0]) {
    case 'c'://從線程的新連接配接隊列中擷取client fd進行監聽
    item = cq_pop(me->new_conn_queue);
    if (NULL != item) {
        //監聽新的client fd
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        ……
        c->thread = me;
        cqi_free(item);
    }
        break;
    /* we were told to pause and report in */
    case 'p':
    register_thread_initialized();
        break;
    }
}
           

5、工作線程的邏輯函數

static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;
    register_thread_initialized();
    event_base_loop(me->base, 0);//進入事件循環,等待與主線程和client的通信
    return NULL;
}
           

三、網絡通信處理

1、建立socket,監聽listen fd

static int server_sockets(int port, enum network_transport transport,
                          FILE *portnumber_file) {
    if (settings.inter == NULL) {
        return server_socket(settings.inter, port, transport, portnumber_file);
    }
    …… 
}
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
	//套接字類型設定
	hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
	……
    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            ……
            continue;
        }
        ……//設定socket的選項
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                ……
                return 1;
            }
            ……
        }
        if (IS_UDP(transport)) {
            int c;//udp連接配接
            for (c = 0; c < settings.num_threads_per_udp; c++) {
                int per_thread_fd = c ? dup(sfd) : sfd;
                dispatch_conn_new(per_thread_fd, conn_read,
                                  EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {//主線程進行端口監聽,conn的狀态為conn_listening
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) { 
                exit(EXIT_FAILURE);
            }//監聽的端口的fd對應的conn結構體
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }
    return success == 0;
}
           

2、監聽fd和事件處理

conn_new主要是添加fd到事件基地,主線程監聽listen fd和工作線程監聽client fd

//添加sfd到事件基地,主要用于監聽listen fd和client fd
conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c;
    c = conns[sfd];
    //conn結構體的屬性初始化
    ……
    //注冊sfd的可讀事件
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }
    return c;
}
           

事件處理函數主要是根據c->state的狀态執行狀态機

/*監聽listen fd時,c->state為conn_listening
監聽client fd時,c->states比較多樣化*/
void event_handler(const int fd, const short which, void *arg) {
    conn *c;
    c = (conn *)arg;
    ……
    drive_machine(c);//關于c->state的狀态機處理io事件
    return;
}
           

狀态機函數,此處隻展開關于conn_listening的處理,工作線程中的狀态事件的處理會在後續memcached

請求進行中介紹。

static void drive_machine(conn *c) {
    ……
    while (!stop) {

        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            ……
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            ……
            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                ……
            } else {
            	//連接配接建立成功,配置設定連接配接給工作線程
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;
            ……     
        }
    }
    return;
}
           

主線程進行新連接配接的分發,選擇一個工作線程,将新連接配接封裝成CQ_ITEM添加到線程的new_conn_queue,

發送‘c’給工作線程,工作線程接收到‘c’會從new_conn_queue擷取CQ_ITEM,調用conn_new進行client fd的

監聽。

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    //選擇工作線程來管理新連接配接
    int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid;
    last_thread = tid;
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    //将新連接配接加入到工作線程的新連接配接隊列
    cq_push(thread->new_conn_queue, item);
    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    //發送消息給工作線程,notify_send_fd可讀調用thread_libevent_process
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
           

繼續閱讀