
The memcache multithreaded model

Author: Linux碼農

memcache uses a single-process, multi-threaded network model and relies on the libevent event library internally to handle network requests.

It works as follows: the main thread accepts new client connections, then hands each new connection to a worker thread chosen in Round Robin fashion; the worker thread is responsible for servicing the requests on that connection.
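To make the pattern concrete, here is a stripped-down, self-contained sketch of the same accept-and-dispatch idea. It is not memcached code: real memcached keeps the new fd in a per-thread queue and only writes a one-byte wake-up through the pipe, and its workers run libevent loops, while this sketch passes the fd itself through the pipe and lets the workers block on read. All names and the port number are made up for illustration.

/* Minimal illustration of "main thread accepts, workers handle",
 * dispatched round-robin over per-worker pipes.
 * Build with: gcc dispatch_demo.c -o dispatch_demo -lpthread */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define NUM_WORKERS 4

static int notify_send_fd[NUM_WORKERS]; /* write ends, used by the main thread */

static void *worker_main(void *arg) {
    int receive_fd = *(int *)arg;
    int client_fd;
    /* Block until the main thread pushes a client fd through this worker's pipe. */
    while (read(receive_fd, &client_fd, sizeof(client_fd)) == (ssize_t)sizeof(client_fd)) {
        const char *msg = "hello from a worker thread\r\n";
        write(client_fd, msg, strlen(msg));
        close(client_fd);
    }
    return NULL;
}

int main(void) {
    static int receive_fd[NUM_WORKERS];
    pthread_t tid[NUM_WORKERS];

    /* One pipe per worker: main thread writes, worker reads. */
    for (int i = 0; i < NUM_WORKERS; i++) {
        int fds[2];
        if (pipe(fds) != 0) { perror("pipe"); exit(1); }
        receive_fd[i] = fds[0];
        notify_send_fd[i] = fds[1];
        pthread_create(&tid[i], NULL, worker_main, &receive_fd[i]);
    }

    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    int on = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(11311); /* arbitrary demo port */
    if (bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) { perror("bind"); exit(1); }
    listen(listen_fd, 128);

    int last_thread = -1;
    for (;;) {
        int client_fd = accept(listen_fd, NULL, NULL);
        if (client_fd < 0) continue;
        /* Round Robin: pick the next worker and hand it the fd. */
        int next = (last_thread + 1) % NUM_WORKERS;
        last_thread = next;
        write(notify_send_fd[next], &client_fd, sizeof(client_fd));
    }
}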

The thread object type is defined as follows:

typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
...

} LIBEVENT_THREAD;
           

Each thread has its own libevent instance (event_base), and each thread handles the events that fire inside its own event instance.

Each thread also has a connection queue. When a client connection arrives, the main thread pushes a node describing the new connection onto that thread's new_conn_queue; the worker thread pops nodes off its own queue, receives the messages, and processes them.

Each thread has a pipe that is used for communication between the main thread and the worker thread.

The thread's new_conn_queue is a linked list of CQ_ITEM nodes, and each node stores the information for one connection.

typedef struct conn_queue_item CQ_ITEM;

struct conn_queue_item {
int sfd;
enum conn_states init_state;
int event_flags;
int read_buffer_size;
enum network_transport transport;
CQ_ITEM *next;
};

typedef struct conn_queue CQ;

struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
};
           

Each queue carries a lock to guarantee mutually exclusive access.
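As a rough sketch of how such a queue is typically operated on (simplified, not the verbatim memcached helpers), the push and pop are just lock-protected linked-list operations on the structures defined above:

#include <pthread.h>
#include <stddef.h>

/* Simplified queue helpers, based on the conn_queue/CQ_ITEM definitions above. */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Called by the main thread: append an item at the tail under the lock. */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Called by a worker thread: pop the head item, or NULL if the queue is empty. */
static CQ_ITEM *cq_pop(CQ *cq) {
    pthread_mutex_lock(&cq->lock);
    CQ_ITEM *item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}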

Startup flow

Before analyzing the worker threads, let's walk through the main thread's startup flow.

[Figure: memcache multithreaded model, startup flow]

How the main thread creates the listening socket

static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file) {
int sfd;
struct linger ling = {0, 0};
...

hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

if (port == -1) {
port = 0;
}
snprintf(port_buf, sizeof(port_buf), "%d", port);
error= getaddrinfo(interface, port_buf, &hints, &ai);


for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
// create a socket
if ((sfd = new_socket(next)) == -1) {

if (errno == EMFILE) {
/* ...unless we're out of fds */
perror("server_socket");
exit(EX_OSERR);
}
continue;
}

// set socket options
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));

error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));

error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));

// bind the socket
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
if (errno != EADDRINUSE) {
...
return 1;
}
close(sfd);
continue;
} else {
success++;
// listen
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
...
return 1;
}
if (portnumber_file != NULL &&
(next->ai_addr->sa_family == AF_INET ||
next->ai_addr->sa_family == AF_INET6)) {
union {
struct sockaddr_in in;
struct sockaddr_in6 in6;
} my_sockaddr;
socklen_t len = sizeof(my_sockaddr);
if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
if (next->ai_addr->sa_family == AF_INET) {
...
} else {
...
}
}
}
}

// create the listening conn object for the main thread and register the listening socket with the main_base event loop
if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1,
transport, main_base, NULL))) {
...
}

listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;

}

freeaddrinfo(ai);

/* Return zero iff we detected no errors in starting up connections */
return success == 0;
}
           

The main thread creates the listening socket and, in the connection array, allocates a conn object for that socket; the conn object is essentially a wrapper around the network operations on the fd.

conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base, void *ssl) {
conn *c;

c = conns[sfd];

if (NULL == c) {
if (!(c = (conn *)calloc(1, sizeof(conn)))) {
...
return NULL;
}
MEMCACHED_CONN_CREATE(c);
c->read = NULL;
c->sendmsg = NULL;
c->write = NULL;
c->rbuf = NULL;

c->rsize = read_buffer_size;

...

STATS_LOCK();
stats_state.conn_structs++;
STATS_UNLOCK();

c->sfd = sfd;
conns[sfd] = c;
}

c->transport = transport;
c->protocol = settings.binding_protocol;

/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for UDP
* mode. */
if (!settings.socketpath) {
c->request_addr_size = sizeof(c->request_addr);
} else {
c->request_addr_size = 0;
}

if (transport == tcp_transport && init_state == conn_new_cmd) {
if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
&c->request_addr_size)) {
perror("getpeername");
memset(&c->request_addr, 0, sizeof(c->request_addr));
}
}

if (init_state == conn_new_cmd) {
LOGGER_LOG(NULL, LOG_CONNEVENTS, LOGGER_CONNECTION_NEW, NULL,
&c->request_addr, c->request_addr_size, c->transport, 0, sfd);
}

...
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
...

// register the socket with the given event_base
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;

if (event_add(&c->event, 0) == -1) {
return NULL;
}

STATS_LOCK();
stats_state.curr_conns++;
stats.total_conns++;
STATS_UNLOCK();

MEMCACHED_CONN_ALLOCATE(c->sfd);

return c;
}
           

The conn object created by the main thread is associated with the listening socket, and the listening socket is registered with the event loop. When a client initiates a connection to the main thread, the main thread's event_handler callback is triggered.

Initialization and creation of the worker threads

void thread_init(int nthreads, struct event_base *main_base) {
int i;
int power;

...

pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);


/* Want a wide lock table, but don't waste memory */
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else {
power = 13;
}

...

// allocate nthreads worker thread objects
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

dispatcher_thread.base = main_base; // record the main thread's event_base
dispatcher_thread.thread_id = pthread_self(); // record the main thread's thread id

// create a pipe for each worker thread to communicate with the main thread
for (i = 0; i < nthreads; i++) {

int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}

threads[i].notify_receive_fd = fds[0]; // read end of the worker's notify pipe
threads[i].notify_send_fd = fds[1]; // write end of the worker's notify pipe
// set up this worker thread's per-thread state
setup_thread(&threads[i]);

/* Reserve three fds for the libevent base, and two for the pipe */
stats_state.reserved_fds += 5;
}

/* Create threads after we've done all the libevent setup. */
for (i = 0; i < nthreads; i++) {
// create the thread and start it
create_worker(worker_libevent, &threads[i]);
}

/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads); // wait until every worker thread has started
pthread_mutex_unlock(&init_lock);
}

           

The main thread creates the worker thread pool and starts the threads; for each thread it creates a pipe used to communicate with the main thread, as well as a connection queue.

Once started, each worker thread loops inside its own event loop; when an event fires there, the thread_libevent_process callback is invoked.

static void setup_thread(LIBEVENT_THREAD *me) {

me->base = event_init();
// register the read end of the notify pipe with this thread's event loop
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
event_add(&me->notify_event, 0);
// allocate the connection queue and initialize it
me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->new_conn_queue);

pthread_mutex_init(&me->stats.mutex, NULL);

me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char *), NULL, NULL);

}
           

At this point, both the main thread and the worker threads are running inside their own event loops.
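For reference, here is a simplified sketch of the worker start-up path that gets each thread into that loop. It is condensed from what create_worker and worker_libevent do in memcached; the thread-registration handshake is reduced here to an assumed init_count counter guarded by the init_lock/init_cond shown in thread_init above, and it relies on the LIBEVENT_THREAD type defined earlier.

#include <event.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* Simplified sketch of the worker start-up path (not verbatim memcached source). */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Let thread_init()'s wait_for_thread_registration() know that this
     * worker has finished its setup (init_count is an assumed counter). */
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    /* Park here dispatching events: first only the notify pipe,
     * later also the client connections handed to this thread. */
    event_base_loop(me->base, 0);
    return NULL;
}

static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t attr;
    pthread_t thread;

    pthread_attr_init(&attr);
    if (pthread_create(&thread, &attr, func, arg) != 0) {
        perror("Can't create thread");
        exit(1);
    }
}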

A client initiates a connection

When a client initiates a connection request to the main thread, the main thread's event_handler callback is triggered.

void event_handler(const evutil_socket_t fd, const short which, void *arg) {
conn *c;

c = (conn *)arg; 
c->which = which;

/* sanity */
if (fd != c->sfd) {
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
// call drive_machine to run the connection state machine
drive_machine(c);

/* wait for next event */
return;
}
           

While the main thread is listening, its conn object is in the conn_listening state, so the main thread calls accept to take the client connection; only the TCP path is analyzed here.

static void drive_machine(conn *c) {

...

while (!stop) {

switch(c->state) {

case conn_listening:
addrlen = sizeof(addr);
// accept a new connection
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);

if (!use_accept4) {
// make the socket non-blocking
if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
...
}
}

bool reject;
if (settings.maxconns_fast) {
reject = sfd >= settings.maxconns - 1;
if (reject) {
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
}
} else {
reject = false;
}

if (reject) {
str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str));
close(sfd);
} else {
void *ssl_v = NULL;

// hand the newly accepted connection off to a worker thread
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
READ_BUFFER_SIZE, tcp_transport);
}

stop = true;
break;

case conn_waiting:
...
break;

case conn_read:

res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory */
/* State already set by try_read_network */
break;
}
break;

case conn_parse_cmd:
... 
break;

case conn_new_cmd:
...
break;

case conn_nread:
...
break;

case conn_swallow:
...
break;

case conn_write:
case conn_mwrite:
...
break;

case conn_closing:
... 
break;

case conn_closed:
... 
break;

case conn_watch:
... 
break;
case conn_io_queue:
... 
break;
case conn_max_state:
assert(false);
break;
}
}

return;
}
           

After the main thread accepts a client connection, it hands the connection over to the worker thread pool via dispatch_conn_new.

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {

char buf[1];
// pick the target worker thread in round-robin fashion
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;

CQ_ITEM *item = cqi_new(thread->new_conn_queue);
// fill in the item that will be pushed onto the worker's queue
item->sfd = sfd;
// init_state is conn_new_cmd at this point
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
// push the item onto the worker thread's connection queue
cq_push(thread->new_conn_queue, item);

MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
buf[0] = 'c';
// write a single 'c' character into the worker's notify pipe so the worker handles the new connection
write(thread->notify_send_fd, buf, 1); 
}
           

The worker thread's event fires

The main thread writes a single character into the pipe; this triggers a read event in the worker thread, which invokes the thread_libevent_process callback.

static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
conn *c;
char buf[1];

read(fd, buf, 1);

switch (buf[0]) {
case 'c':
// pop one item off this thread's queue
item = cq_pop(me->new_conn_queue);
if (item != NULL) {
// create a conn object for this connection
c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base);
if (c == NULL) {
...
} else {
c->thread = me;
} 
// free the queue item
cqi_free(item);
}
break;

case 'l':
...
break;

case 'g':
...
break;
}

}
           

The main job of this function is to have the worker thread pop an item off its own queue, read the connection information from it, and create a conn object for that connection, with its state set to conn_new_cmd.

The conn object is associated with the client connection, and the client socket is registered with the worker thread's own event base. When the client sends a command to the worker thread, the worker thread's event_handler callback is triggered.

The flow above is illustrated in the figure below.

[Figure: memcache multithreaded model, connection dispatch flow]

From this point on, the client connection is processed differently depending on how conn->state transitions from one state to the next.
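Those transitions all go through a small helper; a minimal sketch is shown below (the real conn_set_state also updates per-command statistics, which is omitted here, and it relies on the conn type and enum conn_states quoted above).

#include <assert.h>

/* Minimal sketch of the state-transition helper used by drive_machine(). */
static void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    assert(state >= conn_listening && state < conn_max_state);
    if (state != c->state) {
        c->state = state;
    }
}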

A simple diagram of main_thread and worker_thread is shown below.

[Figure: memcache multithreaded model, main_thread and worker_thread]