memcached是基于多路複用實作事件驅動的伺服器,在事件處理方面,應用了master+works的線程模型。
master線程負責監聽端口,當有新的連接配接,master線程負責接收連接配接,将連接配接加入到指定的work線程的隊列,
然後通過管道通知該線程處理該連接配接。
master線程和work線程之間通過一個管道進行資訊互動。
work線程監聽管道讀端,當master線程發送'c',work線程從隊列擷取用戶端連接配接進行監聽,負責用戶端所有
請求的處理。
總的來說,master線程負責對listen fd的監聽,work線程負責對管道fd和client fd的監聽。
memcached的線程處理邏輯圖如下所示:

總的來說,master和work都需要實作事件監聽,master監聽listen fd,work監聽client fd。memcached實作事件
監聽是基于libevent網絡庫。主要的實作流程:
1)main_base = event_init();初始化事件基地
2)event_set(event, fd, event_flags, event_handler, args); 設定事件的監聽的fd,事件類型,事件處理函數
3)event_base_set(event_base, event);設定事件指定的事件基地
4)event_add(event, timeval);将事件加入事件基地的監聽隊列
5)event_base_loop(event_base, flag); 進入事件循環
一、main函數
int main (int argc, char **argv) {
……
main_base = event_init();//初始化主線程的事件基地
……
memcached_thread_init(settings.num_threads, main_base);//建立工作線程
……
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
exit(EX_OSERR);//監聽tcp端口
}
if (settings.udpport && server_sockets(settings.udpport, udp_transport,
portnumber_file)) {
exit(EX_OSERR);//監聽udp端口
}
……
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;//進入主線的的事件循環
}
}
二、建立工作線程
1、工作線程結構體
typedef struct {
pthread_t thread_id; /* 線程id*/
struct event_base *base; /* 每個線程自有的事件基地 */
struct event notify_event; /* 監聽與master線程的管道的事件 */
int notify_receive_fd; /* 互動管道的讀端 */
int notify_send_fd; /* 互動管道的寫端*/
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* 主線程分發過來未處理的新連接配接的隊列*/
cache_t *suffix_cache; /* suffix cache */
} LIBEVENT_THREAD;//每個工作線程對應的結構體
2、建立工作線程池的主要流程
1)建立LIBEVENT_THREAD數組
2)建立工作線程與主線程的通信管道
3)建立工作線程的事件基地,添加監聽管道的事件
4)啟動工作線程
3、建立工作線程的主函數
void memcached_thread_init(int nthreads, struct event_base *main_base) {
……//初始化工作線程——LIBEVENT_THREAD數組
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
dispatcher_thread.base = main_base;//設定主線程的event_base
dispatcher_thread.thread_id = pthread_self();//設定主線程的線程ID
for (i = 0; i < nthreads; i++) {
……
//設定工作線程與主線通信的管道
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
setup_thread(&threads[i]);//設定工作線程的配置
stats.reserved_fds += 5;
}//啟動工作線程
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);//等待所有work線程建立完畢
pthread_mutex_unlock(&init_lock);
}
4、設定工作線程的事件模型的函數
static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init();//建立工作線程的event_base;
//設定與主線程管道讀端的讀事件
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
exit(1);//添加讀事件到線程的event_base
}//初始化線程的接收新連接配接的隊列
me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->new_conn_queue);
……
}//工作線程notify_send_fd的可讀事件的事件處理函數
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
……
if (read(fd, buf, 1) != 1)
……
switch (buf[0]) {
case 'c'://從線程的新連接配接隊列中擷取client fd進行監聽
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
//監聽新的client fd
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
……
c->thread = me;
cqi_free(item);
}
break;
/* we were told to pause and report in */
case 'p':
register_thread_initialized();
break;
}
}
5、工作線程的邏輯函數
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
register_thread_initialized();
event_base_loop(me->base, 0);//進入事件循環,等待與主線程和client的通信
return NULL;
}
三、網絡通信處理
1、建立socket,監聽listen fd
static int server_sockets(int port, enum network_transport transport,
FILE *portnumber_file) {
if (settings.inter == NULL) {
return server_socket(settings.inter, port, transport, portnumber_file);
}
……
}
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
//套接字類型設定
hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
……
for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) {
……
continue;
}
……//設定socket的選項
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
continue;
} else {
success++;
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
……
return 1;
}
……
}
if (IS_UDP(transport)) {
int c;//udp連接配接
for (c = 0; c < settings.num_threads_per_udp; c++) {
int per_thread_fd = c ? dup(sfd) : sfd;
dispatch_conn_new(per_thread_fd, conn_read,
EV_READ | EV_PERSIST,
UDP_READ_BUFFER_SIZE, transport);
}
} else {//主線程進行端口監聽,conn的狀态為conn_listening
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
exit(EXIT_FAILURE);
}//監聽的端口的fd對應的conn結構體
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
}
}
return success == 0;
}
2、監聽fd和事件處理
conn_new主要是添加fd到事件基地,主線程監聽listen fd和工作線程監聽client fd
//添加sfd到事件基地,主要用于監聽listen fd和client fd
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c;
c = conns[sfd];
//conn結構體的屬性初始化
……
//注冊sfd的可讀事件
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
return c;
}
事件處理函數主要是根據c->state的狀态執行狀态機
/*監聽listen fd時,c->state為conn_listening
監聽client fd時,c->states比較多樣化*/
void event_handler(const int fd, const short which, void *arg) {
conn *c;
c = (conn *)arg;
……
drive_machine(c);//關于c->state的狀态機處理io事件
return;
}
狀态機函數,此處隻展開關于conn_listening的處理,工作線程中的狀态事件的處理會在後續memcached
請求進行中介紹。
static void drive_machine(conn *c) {
……
while (!stop) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
……
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
……
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
……
} else {
//連接配接建立成功,配置設定連接配接給工作線程
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
……
}
}
return;
}
主線程進行新連接配接的分發,選擇一個工作線程,将新連接配接封裝成CQ_ITEM添加到線程的new_conn_queue,
發送‘c’給工作線程,工作線程接收到‘c’會從new_conn_queue擷取CQ_ITEM,調用conn_new進行client fd的
監聽。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
char buf[1];
//選擇工作線程來管理新連接配接
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//将新連接配接加入到工作線程的新連接配接隊列
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
//發送消息給工作線程,notify_send_fd可讀調用thread_libevent_process
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}