http://www.cnblogs.com/hustcat/archive/2010/08/31/1814022.html
這兩天沒事,看了一下Memcached和libevent的源碼,做個小總結。
1、入門
1.1、概述
Libevent是一個用于開發可擴充性網絡伺服器的基于事件驅動(event-driven)模型的網絡庫。Libevent有幾個顯著的亮點:
(1)事件驅動(event-driven),高性能;
(2)輕量級,專注于網絡,不如 ACE 那麼臃腫龐大;
(3)源代碼相當精煉、易讀;
(4)跨平台,支援 Windows、Linux、*BSD和 Mac Os;
(5)支援多種 I/O多路複用技術, epoll、poll、dev/poll、select 和kqueue 等;
(6)支援 I/O,定時器和信号等事件;
(7)注冊事件優先級;
Libevent 已經被廣泛的應用,作為底層的網絡庫;比如 memcached、 Vomi t、 Nylon、 Netchat等等。
1.2、一個簡單示例
1 int lasttime;
2
3 static void
4 timeout_cb(int fd, short event, void *arg)
5 {
6 struct timeval tv;
7 struct event *timeout = arg;
8 int newtime = time(NULL);
9
10 //printf("%s: called at %d: %d\n", __func__, newtime,
11 printf("%s: called at %d: %d\n", "timeout_cb", newtime,
12 newtime - lasttime);
13 lasttime = newtime;
14
15 evutil_timerclear(&tv);
16 tv.tv_sec = 2;
17 //重新注冊event
18 event_add(timeout, &tv);
19 }
20
21 int
22 main (int argc, char **argv)
23 {
24 struct event timeout;
25 struct timeval tv;
26
27 /* Initalize the event library */
28 //初始化event環境
29 event_init();
30
31 /* Initalize one event */
32 //設定事件
33 evtimer_set(&timeout, timeout_cb, &timeout);
34
35 evutil_timerclear(&tv);
36 tv.tv_sec = 2;
37 //注冊事件
38 event_add(&timeout, &tv);
39
40 lasttime = time(NULL);
41
42 //等待,分發,處理事件
43 event_dispatch();
44
45 return (0);
46 }
這是一個簡單的基于libevent的定時器程式,運作結果:
用libevent程式設計非常簡單,隻需要調用event_init初始化環境,然後調用event_add注冊相應的事件,接着調用event_dispatch等待并處理相應的事件即可。
調用event_add注冊事件時,設定其回調函數。Libevent檢測到事件發生時,便會調用事件對應的回調用函數,執行相關的業務邏輯。
1.3、源代碼結構
Libevent 的源代碼雖然都在一層檔案夾下面,但是其代碼分類還是相當清晰的,主要可分為頭檔案、内部使用的頭檔案、輔助功能函數、日志、libevent 架構、對系統 I/O 多路複用機制的封裝、信号管理、定時事件管理、緩沖區管理、基本資料結構和基于 libevent的兩個實用庫等幾個部分,有些部分可能就是一個源檔案。
(1)頭檔案
主要就是 event.h:事件宏定義、接口函數聲明,主要結構體 event 的聲明;
(2)内部頭檔案
xxx-internal.h:内部資料結構和函數,對外不可見,以達到資訊隐藏的目的;
(3)libevent架構
event.c:event 整體架構的代碼實作;
(4)對系統 I/O多路複用機制的封裝
epoll.c:對 epoll 的封裝;
select.c:對 select 的封裝;
devpoll.c:對 dev/poll 的封裝;
kqueue.c:對kqueue 的封裝;
(5)定時事件管理
min-heap.h:其實就是一個以時間作為 key的小根堆結構;
(6)信号管理
signal.c:對信号事件的處理;
(7)輔助功能函數
evutil.h 和 evutil.c:一些輔助功能函數,包括建立 socket pair和一些時間操作函數:加、減和比較等。
(8)日志
log.h和 log.c:log 日志函數
(9)緩沖區管理
evbuffer.c 和buffer.c:libevent 對緩沖區的封裝;
(10)基本資料結構
compat\sys 下的兩個源檔案: queue.h是 libevent 基本資料結構的實作,包括連結清單,雙向連結清單,隊列等;_libevent_time.h:一些用于時間操作的結構體定義、函數和宏定義;
(11)實用網絡庫
http 和evdns:是基于 libevent 實作的http 伺服器和異步 dns 查詢庫;
2、核心對象
結構體event和event_base是libevent的兩個核心資料結構,前者代表一個事件對象,後者代表整個事件處理架構。
2.1、event(事件)
代碼
1 //event.h
2 struct event {
3 TAILQ_ENTRY (event) ev_next; //已注冊事件連結清單
4 TAILQ_ENTRY (event) ev_active_next;//就緒事件連結清單
5 TAILQ_ENTRY (event) ev_signal_next; //signal連結清單
6 unsigned int min_heap_idx; /* for managing timeouts,事件在堆中的下标 */
7
8 struct event_base *ev_base;
10 int ev_fd; //對于I/O事件,是綁定的檔案描述符;對于signal事件,是綁定的信号
11 short ev_events; //event關注的事件類型
12 short ev_ncalls; //事件就緒執行時,調用 ev_callback 的次數
13 short *ev_pncalls; /* Allows deletes in callback */
15 struct timeval ev_timeout; //timout事件的逾時值
16
17 int ev_pri; /* smaller numbers are higher priority,優先級 */
18
19 void (*ev_callback)(int, short, void *arg); //回調函數
20 void *ev_arg; //回調函數的參數
21
22 int ev_res; /* result passed to event callback */
23 int ev_flags; //event的狀态
24 };
25
Libevent通過event對象将I/O事件、信号事件和定時器事件封裝,進而統一處理,這也是libevent的精妙所有。
各個字段的具體含義:
(1) ev_events:event關注的事件類型,它可以是以下3種類型:
I/O事件: EV_WRITE和EV_READ
定時事件:EV_TIMEOUT
信号: EV_SIGNAL
輔助選項:EV_PERSIST,表明是一個永久事件
libevent中的定義為:
#define EV_TIMEOUT 0x01
#define EV_READ 0x02
#define EV_WRITE 0x04
#define EV_SIGNAL 0x08
#define EV_PERSIST 0x10 /* Persistant event*/
(2)ev_next,ev_active_next 和 ev_signal_next 都是雙向連結清單節點指針;它們是 libevent 對不同僚件類型和在不同的時期,對事件的管理時使用到的字段。
libevent 使用雙向連結清單儲存所有注冊的 I/O和 Signal 事件,ev_next 就是該I/O事件在連結清單中的位置;此連結清單可以稱為“已注冊事件連結清單”;
同樣 ev_signal_next 就是 signal 事件在 signal 事件連結清單中的位置;
ev_active_next:libevent 将所有的激活事件放入到連結清單 active list 中,然後周遊 active list 執
行排程,ev_active_next就指明了 event 在active list 中的位置;
(3)min_heap_idx 和 ev_timeout,如果是 timeout 事件,它們是 event 在小根堆中的索引和逾時值,libevent 使用小根堆來管理定時事件。
(4)ev_base指向事件架構執行個體。
(5)ev_fd,對于 I/O事件,是綁定的檔案描述符;對于 signal 事件,是事件對應的信号;
(6)eb_flags:libevent 用于标記 event資訊的字段,表明事件目前的狀态,可能的值有:
#define EVLIST_TIMEOUT 0x01 // event在time堆中
#define EVLIST_INSERTED 0x02 // event在已注冊事件連結清單中
#define EVLIST_SIGNAL 0x04 // 未見使用
#define EVLIST_ACTIVE 0x08 // event在激活連結清單中
#define EVLIST_INTERNAL 0x10 // 内部使用标記
#define EVLIST_INIT 0x80 // event 已被初始化
2.2、event_base(事件處理架構)
1 //evenet_internal.h
2 struct event_base {
3 const struct eventop *evsel; //底層具體I/O demultiplex操作函數集
4 void *evbase;
5 int event_count; /* counts number of total events,總的事件數量 */
6 int event_count_active; /* counts number of active events,就緒事件數量 */
8 int event_gotterm; /* Set to terminate loop */
9 int event_break; /* Set to terminate loop immediately */
10
11 /* active event management */
12 //就緒事件連結清單數組
13 struct event_list **activequeues;
14 int nactivequeues;//就緒事件隊列個數
15
16 /* signal handling info */
17 struct evsignal_info sig; //用于管理信号
19 struct event_list eventqueue; //注冊事件隊列
20 struct timeval event_tv;
22 struct min_heap timeheap; //管理定時器的小根堆
23 struct timeval tv_cache; //記錄時間緩存
(1)evsel:libevent支援Linux、Windows等多種平台,也支援epoll、poll、select、kqueue等多種I/O多路複用模型。如果把event_init、event_add看成高層抽象的統一事件操作接口,則evsel為這些函數在底層具體的I/O demultiplex的對應的操作函數集。eventop為函數指針的集合:
1 struct eventop {
2 const char *name;
3 void *(*init)(struct event_base *);
4 int (*add)(void *, struct event *);
5 int (*del)(void *, struct event *);
6 int (*dispatch)(struct event_base *, void *, struct timeval *);
7 void (*dealloc)(struct event_base *, void *);
8 /* set if we need to reinitialize the event base */
9 int need_reinit;
10 };
11
在初始化函數event_base_new中,libevent将evsel指向全局數組eventops的具體元素:
2.3、主要函數
2.3.1、event_int(初始化libevent執行個體)
struct event_base *
event_init(void)
初始化事件處理架構執行個體,内部調用event_base_new。
event_base_new的主要邏輯:
1 struct event_base *
2 event_base_new(void)
3 {
4
5 //初始化小根堆
6 min_heap_ctor(&base->timeheap);
8 //初始化注冊事件隊列
9 TAILQ_INIT(&base->eventqueue);
11 for (i = 0; eventops[i] && !base->evbase; i++) {
12 //I/O demultiplex機制執行個體
13 base->evsel = eventops[i];
15 //初始化I/O demultiplex執行個體(參見win32_init)
16 base->evbase = base->evsel->init(base);
17 }
19 //配置設定1個就緒事件隊列
20 event_base_priority_init(base, 1);
22 }
2.3.2、event_add(注冊事件)
//注冊事件
int
event_add(struct event *ev, const struct timeval *tv)
該函數主要将事件ev加入到事件架構event_base的注冊事件連結清單base->eventqueue。
2.3.3、event_del(删除事件)
//删除事件
event_del(struct event *ev)
該函數主要将事件ev從相應的連結清單上删除。
2.3.4、event_set(設定事件)
/*設定event對象
**ev:事件對象
**fd:事件對應的檔案描述符或信号,對于定時器設為-1
**events:事件類型,比如 EV_READ,EV_PERSIST, EV_WRITE, EV_SIGNAL
**callback:事件的回調函數
**arg:回調函數參數
*/
void
event_set(struct event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg)
在将事件注冊事件處理架構之前,應該先調用event_set對事件進行相關設定。
2.4、libevent對event的管理
event結構有3個連結清單結點域和一個小根堆索引,libevent通過3個連結清單和一個小根堆對I/O事件、signal事件和timer事件進行管理。
對于I/O事件,通過event_add将其加入event_base的注冊事件連結清單eventqueue ;就緒時會加入event_base的就緒連結清單activequeues[];
對于timer事件,event_add将其加入到event_base的小根堆timeheap;
Signale事件的管理相對複雜些,event_add将其加入到注冊事件連結清單,同時,event_add内部會調用I/O demultiplex的add函數(對于I/O事件也一樣),比如epoll_add。而add函數又會調用evsignal_add将其加入到evsignal_info的evsigevents[signo]連結清單(關于signal,後面會詳細介紹)。
3、事件處理架構主循環
Libevent将I/O事件、signal事件和timer事件用統一的模型進行處理,這是非常精妙的。libevent主循環函數不斷檢測注冊事件,如果有事件發生,則将其放入就緒連結清單,并調用事件的回調函數,完成業務邏輯處理。
3.1、event_dispatch
//事件處理主循環
event_dispatch(void)
這是呈現給外部的接口,它的實作很簡單,即調用event_loop,而event_loop調用event_base_loop,event_base_loop完成實際的主循環邏輯。
3.2、event_base_loop
主要算法:
1 done = 0;
2 while (!done) {
3
4 /*如果沒有就緒事件,根據timer heap中事件的最小逾時時間,計算I/O demultiplex的
5 **最大等待時間. 相反,如果有就緒事件,則清除tv,即I/O demultiplex不應該等待.
6 */
7 if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
8 timeout_next(base, &tv_p);
9 } else {
10 /*
11 * if we have active events, we just poll new events
12 * without waiting.
13 */
14 evutil_timerclear(&tv);
15 }
17 /* If we have no events, we just exit */
18 //沒有事件處理,則退出循環
19 if (!event_haveevents(base)) {
20 event_debug(("%s: no events registered.", __func__));
21 return (1);
23
24 //tv_p為I/O demultiplex的逾時時間
25 //處理signal事件和I/O事件
26 res = evsel->dispatch(base, evbase, tv_p);
27
28 //處理timeout事件,對于逾時的事件,将其放到就緒事件連結清單
29 timeout_process(base);
31 if (base->event_count_active) {
32 //處理就緒事件
33 event_process_active(base);
34 if (!base->event_count_active && (flags & EVLOOP_ONCE))
35 done = 1;
36 } else if (flags & EVLOOP_NONBLOCK)
37 done = 1;
38
39 }//end while
40
3.3、timeout_next
1 /*根據timer heap中事件的最小逾時時間,計算I/O demultiplex的最大等待時間.
2 **為了及時處理timer事件,I/O demultiplex的最大等待時間不應該超過timer事件中最小的逾時時間,
3 **否則,timer事件就不能得到及時處理
4 */
5 static int
6 timeout_next(struct event_base *base, struct timeval **tv_p)
7 {
8 struct timeval now;
9 struct event *ev;
10 struct timeval *tv = *tv_p;
12 //如果沒有timer事件,則直接傳回
13 if ((ev = min_heap_top(&base->timeheap)) == NULL) {
14 /* if no time-based events are active wait for I/O */
15 *tv_p = NULL;
16 return (0);
17 }
19 if (gettime(base, &now) == -1)
20 return (-1);
22 //如果最小的timer事件已經逾時,則清除tv,即I/O demultiplex不應該等待.
23 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
24 evutil_timerclear(tv);
25 return (0);
26 }
28 //更新I/O demultiplex可以等待的最大時間:ev->ev_timeout - now
29 evutil_timersub(&ev->ev_timeout, &now, tv);
30 return (0);
31 }
32
3.4、dispatch函數
調用底層I/O multiplex的dispatch函數,具體的實作可以參見epoll的實作epoll_dispatch。
3.5、event_process_active
1 /*處理就緒事件.
2 **就緒事件位于優先級隊列中,低優先級通常比高優先級隊列先處理,是以,
3 **高優先級隊列可能餓死.
5 static void
6 event_process_active(struct event_base *base)
8 struct event *ev;
9 struct event_list *activeq = NULL;
10 int i;
11 short ncalls;
12
13 for (i = 0; i < base->nactivequeues; ++i) {
14 if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
15 //一次隻處理一個就緒事件連結清單
16 activeq = base->activequeues[i];
17 break;
18 }
19 }
21 assert(activeq != NULL);
22
23 //處理就緒事件連結清單上的所有事件
24 for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
25
26 //先将事件從連結清單上删除
27 if (ev->ev_events & EV_PERSIST)
28 event_queue_remove(base, ev, EVLIST_ACTIVE);
29 else
30 event_del(ev); //删除事件
31
32 /* Allows deletes to work */
33 ncalls = ev->ev_ncalls;
34 ev->ev_pncalls = &ncalls;
35 while (ncalls) {
36 ncalls--; //調用次數減1
37 ev->ev_ncalls = ncalls;
38 //調用事件的回調函數
39 (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
40 if (base->event_break)
41 return;
42 }
43 }
44 }
45
4、Timer事件
Timer事件的處理本身比較簡單,不再贅述。
5、signal事件
5.1、socket pair
Libevent通過socketpair,将signal事件與I/O事件完美的統一起來。Socketpair,簡單的說就一對socket,一端用于寫,一端用于讀。工作方式如下:
為了與I/O事件統一起來,libevent内部使用了一個針對read socket的讀事件。
5.1.1、Socketpair的建立
與信号事件的初始化工作都是在evsignal_init中完成的,而evsignal_init通過調用evutil_socketpair建立socketpair。對于Unix平台,有socketpair系統調用;對于Windows,則相對複雜一些,具體見evutil_socketpair函數的實作。
5.2、evsignal_info
在event_base内部有一個evsignal_info類型的字段sig,它是用于管理signal事件的核心資料結構:
1 //evsignal.h
2 struct evsignal_info {
3 struct event ev_signal; //内部socket讀事件
4 int ev_signal_pair[2]; //對應socket pair的兩個socket描述符
5 int ev_signal_added; //内部socket讀事件是否已經加入注冊連結清單
6 volatile sig_atomic_t evsignal_caught; //是否有信号發生
7 //信号事件連結清單數組,evsigevents[signo]表示注冊信号signo的事件
8 struct event_list evsigevents[NSIG];
9 //具體記錄每個信号觸發的次數,evsigcaught[signo]是記錄信号 signo被觸發的次數
10 sig_atomic_t evsigcaught[NSIG];
11
12 //sh_old記錄了原來的 signal 處理函數指針,當信号 signo 注冊的 event 被清空時,需要重新設定其處理函數
13 #ifdef HAVE_SIGACTION
14 struct sigaction **sh_old;
15 #else
16 ev_sighandler_t **sh_old;
17 #endif
18 int sh_old_max;
19 };
5.3、主要函數
5.3.1、evsignal_init
主要完成evsignal_info的初始化,主要算法:
1 int
2 evsignal_init(struct event_base *base)
4 evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, base->sig.ev_signal_pair);
5 base->sig.sh_old = NULL;
6 base->sig.sh_old_max = 0;
8 //事件發生次數設為0
9 base->sig.evsignal_caught = 0;
10 memset(&base->sig.evsigcaught, 0, sizeof(sig_atomic_t)*NSIG);
11 /* initialize the queues for all events */
12 for (i = 0; i < NSIG; ++i)
13 TAILQ_INIT(&base->sig.evsigevents[i]);
15 evutil_make_socket_nonblocking(base->sig.ev_signal_pair[0]); //寫端
17 //設定内部讀事件
18 event_set(&base->sig.ev_signal, base->sig.ev_signal_pair[1],
19 EV_READ | EV_PERSIST, evsignal_cb, &base->sig.ev_signal); //讀端
20 base->sig.ev_signal.ev_base = base;
22 //sig.ev_signal == EV_READ | EV_PERSIST | EVLIST_INTERNAL
23 base->sig.ev_signal.ev_flags |= EVLIST_INTERNAL;
24 }
該函數的關鍵在于這裡會設定libevent用于管理信号事件的内部讀事件evsignal_info的ev_signal,并将該事件對應的檔案描述符設為socket pair的讀端。該函數由I/O multiplex的init函數調用。注:這裡隻是設定,而并沒有注冊socket pair的讀事件(見下一節)。
5.3.2、evsignal_add
當調用event_add注冊信号事件時,内部會先調用I/O multiplex的add函數,add函數又會調用evsignal_add,将事件加到evsignal_info内部的信号事件連結清單。然後再event_queue_insert将其添加到event_base的注冊事件連結清單。
這裡有兩個地方需要注意,一是調用_evsignal_set_handler設定外部注冊信号事件對應的信号的信号處理函數evsignal_handler:
1 static void
2 evsignal_handler(int sig)
4 int save_errno = errno;
5
6 //設定信号事件的發生次數
7 evsignal_base->sig.evsigcaught[sig]++;
8 evsignal_base->sig.evsignal_caught = 1;
10 #ifndef HAVE_SIGACTION
11 signal(sig, evsignal_handler);
12 #endif
13
14 /* Wake up our notification mechanism */
15 //向socket pair的寫端寫資料
16 send(evsignal_base->sig.ev_signal_pair[0], "a", 1, 0);
17 errno = save_errno;
18 }
當使用者注冊信号事件對應的信号發生時,OS轉到evsignal_handler函數,進而設定sig.evsignal_caught,并向socket pair的寫端發送資料。
二是通過調用event_add完成内部socket pair的讀事件sig->ev_signal的注冊。最後,将(外部)事件添加到信号事件連結清單。
5.3.2、與主循環結合
信号事件完成了注冊,libevent就會在主循環中,等待事件發生,并處理事件。為了了解,來看看具體I/O demultiplex的dispatch函數:
1 static int
2 epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
4 struct epollop *epollop = arg;
5 struct epoll_event *events = epollop->events;
6 struct evepoll *evep;
7 int i, res, timeout = -1;
8
9 if (tv != NULL)
10 timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;
12 if (timeout > MAX_EPOLL_TIMEOUT_MSEC) {
13 /* Linux kernels can wait forever if the timeout is too big;
14 * see comment on MAX_EPOLL_TIMEOUT_MSEC. */
15 timeout = MAX_EPOLL_TIMEOUT_MSEC;
16 }
17
18 //等待I/O事件
19 res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
21 if (res == -1) {
22 if (errno != EINTR) {
23 event_warn("epoll_wait");
24 return (-1);
25 }
26 //epoll_wait被信号中斷
27 evsignal_process(base);
28 return (0);
29 } else if (base->sig.evsignal_caught) {//發生了信号事件
30 //處理信号事件
31 evsignal_process(base);
32 }
33 //…
34 }
epoll_dispatch函數調用epoll_wait函數等待I/O發生。然後,如果有信号事件發生,則調用evsignal_process處理信号事件,evsignal_process的邏輯比較簡單,它隻是将事件從注冊事件連結清單轉移到就緒事件連結清單。
還記得evsignal_handler函數嗎?它是所有(外部)信号事件對應的信号的信号處理函數,将實際的信号發生時,OS會轉而執行evsignal_handler函數,而它便向socket pair的寫端寫資料,而讀端收到資料。而此時,libevent的内部socket pair讀事件已經完成注冊。libevent正阻塞在epoll_wait處,當socketp pair讀端收到資料時,libevent便從epoll_wait處傳回。總之,signal事件通過socket pair,與I/O事件實作完美的統一。
Libevent從epoll_wait傳回後,它調用evsignal_process處理信号事件,然後調用event_active将I/O事件(包括内部的socket pair讀事件)轉移到就緒事件連結清單。
Socket pair的讀事件回調函數:
2 evsignal_cb(int fd, short what, void *arg)
4 static char signals[1];
5 #ifdef WIN32
6 SSIZE_T n;
7 #else
8 ssize_t n;
9 #endif
10 //接收資料
11 n = recv(fd, signals, sizeof(signals), 0);
12 if (n == -1)
13 event_err(1, "%s: read", __func__);
14 }
6、libevent的應用
Libevent是一個非常優秀的開源網絡庫,它被許多其它開源程式使用。Memcache使用libevent作為底層的網絡處理元件,并采用主線程(main thread,單一)+工作線程(work thread,多個)的多線程模型(這将在Memcached的分析中詳細介紹)。
作者:arrowcat
出處:http://www.cnblogs.com/hustcat/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。