這篇文章介紹下libevent在socket異步程式設計中的應用。在一些對性能要求較高的網絡應用程式中,為了防止程式阻塞在socket I/O操作上造成程式性能的下降,需要使用異步程式設計,即程式準備好讀寫的函數(或接口)并向系統注冊,然後在需要的時候隻向系統送出讀寫的請求之後就繼續做自己的事情,實際的讀寫操作由系統在合适的時候調用我們程式注冊的接口進行。異步程式設計會給一些程式猿帶來一些了解和編寫上的困難,因為我們通常寫的一些簡單的程式都是順序執行的,而異步程式設計将程式的執行順序打亂了,有些代碼什麼情況下執行往往不是太清晰,是以也使得程式設計的複雜度大大增加。
Note:這裡系統這個詞使用的不準确,實際上可以是自己封裝的異步調用機制,更常見的是一些可用的庫,比如libevent,ACE等
想了解libevent的工作原理可以自行查詢資料,網上相關的介紹一大堆,也可以自己閱讀源碼進行分析,本文僅從使用的角度做一個簡單的介紹,看如何快速的将libevent引入我們的程式中。任何應用都免不了需要承載其功能的底層OS,libevent也不例外,其内部是通過封裝作業系統的IO複用機制實作的,在linux系統上可能是epoll、kqueu之類的,取決于具體的OS所支援的IO複用方式,在我的系統上是epoll,是以可以了解為libevent提供了一個比epoll更為友好的操作接口,将程式猿從網絡IO處理的細節中解放出來,使其可以專注于目标問題的處理上。
首先,安裝libevent到任意目錄下
wget http://monkey.org/~provos/libevent-1.4.13-stable.tar.gz
tar –xzvf libevent-1.4.13-stable.tar.gz
cd libevent-1.4.13-stable
./configure --prefix=/home/mydir/libevent
make && make install
現在假定我們要設計一個伺服器程式,用于接收用戶端的資料,并将接收的資料回寫給用戶端。下面來構造該程式,由于本僅僅是展示一個Demo,是以程式中将不對錯誤進行處理,假設所有的調用都成功

2 #define PORT 25341
3 #define BACKLOG 5
4 #define MEM_SIZE 1024
5
6 struct event_base * base ;
7
8 int main( int argc, char * argv[])
9 {
10 struct sockaddr_in my_addr;
11 int sock;
12
13 sock = socket(AF_INET, SOCK_STREAM, 0 );
14 int yes = 1 ;
15 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, & yes, sizeof ( int ));
16 memset( & my_addr, 0 , sizeof (my_addr));
17 my_addr.sin_family = AF_INET;
18 my_addr.sin_port = htons(PORT);
19 my_addr.sin_addr.s_addr = INADDR_ANY;
20 bind(sock, ( struct sockaddr * ) & my_addr, sizeof ( struct sockaddr));
21 listen(sock, BACKLOG);
22
23 struct event listen_ev;
24 base = event_base_new();
25 event_set( & listen_ev, sock, EV_READ | EV_PERSIST, on_accept, NULL);
26 event_base_set( base , & listen_ev);
27 event_add( & listen_ev, NULL);
28 event_base_dispatch( base );
29
30 return 0 ;
31 }

第13行說明建立的是一個TCP socket。第15行是伺服器程式的通常做法,設定了該選項後,在父子程序模型中,當子程序為客戶服務的時候如果父程序退出,可以重新啟動程式完成服務的無縫更新,否則在所有父子程序完全退出前再啟動程式會在該端口上綁定失敗,也即不能完成無縫更新的操作(更多資訊可以參考該函數說明或Steven先生的<網絡程式設計>)。第24行用于建立一個事件處理的全局變量,可以了解為這是一個負責集中處理各種出入IO事件的總管家,它負責接收和派發所有輸入輸出IO事件的資訊,這裡調用的是函數event_base_new(), 很多程式裡這裡用的是event_init(),差別就是前者是線程安全的、而後者是非線程安全的,後者在其官方說明中已經被标志為過時的函數、且建議用前者代替,libevent中還有很多類似的函數,比如建議用event_base_dispatch代替event_dispatch,用event_assign代替event_set和event_base_set等,關于libevent接口的詳細說明見其官方說明libevent_doc. 第25行說明在listen_en這個事件監聽sock這個描述字的讀操作,當讀消息到達是調用on_accept函數,EV_PERSIST參數告訴系統持續的監聽sock上的讀事件,如果不加該參數,每次要監聽該事件時就要重複的調用26行的event_add函數,從前面的代碼可知,sock這個描述字是bind到本地的socket端口上,是以其對應的可讀事件自然就是來自用戶端的連接配接到達,我們就可以調用accept無阻塞的傳回客戶的連接配接了。第26行将listen_ev注冊到base這個事件中,相當于告訴處理IO的管家請留意我的listen_ev上的事件。第27行相當于告訴處理IO的管家,當有我的事件到達時你發給我(調用on_accept函數),至此對listen_ev的初始化完畢。第28行正式啟動libevent的事件處理機制,使系統運作起來,運作程式的話會發現event_base_dispatch是一個無限循環。
下面是on_accept函數的内容
1: void on_accept(int sock, short event, void* arg)
2: {
3: struct sockaddr_in cli_addr;
4: int newfd, sin_size;
5: // read_ev must allocate from heap memory, otherwise the program would crash from segmant fault
6: struct event* read_ev = (struct event*)malloc(sizeof(struct event));;
7: sin_size = sizeof(struct sockaddr_in);
8: newfd = accept(sock, (struct sockaddr*)&cli_addr, &sin_size);
9: event_set(read_ev, newfd, EV_READ|EV_PERSIST, on_read, read_ev);
10: event_base_set(base, read_ev);
11: event_add(read_ev, NULL);
12: }
第9-12與前面main函數的24-26相同,即在代表客戶的描述字newfd上監聽可讀事件,當有資料到達是調用on_read函數。這裡有亮點需要注意,一是read_ev需要從堆裡malloc出來,如果是在棧上配置設定,那麼當函數傳回時變量占用的記憶體會被釋放,是以事件主循環event_base_dispatch會通路無效的記憶體而導緻程序崩潰(即crash);第二個要注意的是第9行read_ev作為參數傳遞給了on_read函數。
下面是on_read函數的内容
1: void on_read(int sock, short event, void* arg)
2: {
3: struct event* write_ev;
4: int size;
5: char* buffer = (char*)malloc(MEM_SIZE);
6: bzero(buffer, MEM_SIZE);
7: size = recv(sock, buffer, MEM_SIZE, 0);
8: printf("receive data:%s, size:%d\n", buffer, size);
9: if (size == 0) {
10: event_del((struct event*)arg);
11: free((struct event*)arg);
12: close(sock);
13: return;
14: }
15: write_ev = (struct event*) malloc(sizeof(struct event));;
16: event_set(write_ev, sock, EV_WRITE, on_write, buffer);
17: event_base_set(base, write_ev);
18: event_add(write_ev, NULL);
19: }
第9行,當從socket讀傳回0标志對方已經關閉了連接配接,是以這個時候就沒必要繼續監聽該套接口上的事件,由于EV_READ在on_accept函數裡是用EV_PERSIST參數注冊的,是以要顯示的調用event_del函數取消對該事件的監聽。第18-21行與on_accept函數的6-11行類似,當可寫時調用on_write函數,注意第19行将buffer作為參數傳遞給了on_write。這段程式還有比較嚴重的問題,後面進行說明。
on_write函數的實作

1 void on_write( int sock, short event , void * arg)
2 {
3 char * buffer = ( char * )arg;
4 send(sock, buffer, strlen(buffer), 0 );
5
6 free(buffer);

7 }
on_write函數中向用戶端回寫資料,然後釋放on_read函數中malloc出來的buffer。在很多書合程式設計指導中都很強調資源的所有權,經常要求誰配置設定資源、就由誰釋放資源,這樣對資源的管理指責就更明确,不容易出問題,但是通過該例子我們發現在異步程式設計中資源的配置設定與釋放往往是由不同的所有者操作的,是以也是比較容易出問題的地方。
其實在on_read函數中從socket讀取資料後程式就可以直接調用write/send接口向客戶回寫資料了,因為寫事件已經滿足,不存在異步不異步的問題,這裡進行on_write的異步操作僅僅是為了說明異步程式設計中資源的管理與釋放的問題,另外一方面,直接調用write/send函數向用戶端寫資料可能導緻程式較長時間阻塞在IO操作上,比如socket的輸出緩沖區已滿,則write/send操作阻塞到有可用的緩沖區之後才能進行實際的寫操作,而通過向寫事件注冊on_accept函數,那麼libevent會在合适的時間調用我們的callback函數,(比如對于會引起IO阻塞的情況比如socket輸出緩沖區滿,則由libevent設計算法來處理,如此當回調on_accept函數時我們在調用IO操作就不會發生真正的IO之外的阻塞)。注:前面括号中是我個人認為一個庫應該實作的功能,至于libevent是不是實作這樣的功能并不清楚也無意深究。
再來看看前面提到的on_read函數中存在的問題,首先write_ev是動态配置設定的記憶體,但是沒有釋放,是以存在記憶體洩漏,另外,on_read中進行malloc操作,那麼當多次調用該函數的時候就會造成記憶體的多次洩漏。這裡的解決方法是對socket的描述字可以封裝一個結構體來保護讀、寫的事件以及資料緩沖區,整理後的完整代碼如下

#include < sys / socket.h >
#include < sys / types.h >
#include < netinet / in .h >
#include < stdio.h >
#include < event .h >
#define PORT 25341
#define BACKLOG 5
#define MEM_SIZE 1024
struct event_base * base ;
struct sock_ev {
struct event * read_ev;
struct event * write_ev;
char * buffer;
};
void release_sock_event( struct sock_ev * ev)
{
event_del(ev -> read_ev);
free(ev -> read_ev);
free(ev -> write_ev);
free(ev -> buffer);
free(ev);
}
void on_write( int sock, short event , void * arg)
{
char * buffer = ( char * )arg;
send(sock, buffer, strlen(buffer), 0 );
free(buffer);
}
void on_read( int sock, short event , void * arg)
{
struct event * write_ev;
int size;
struct sock_ev * ev = ( struct sock_ev * )arg;
ev -> buffer = ( char * )malloc(MEM_SIZE);
bzero(ev -> buffer, MEM_SIZE);
size = recv(sock, ev -> buffer, MEM_SIZE, 0 );
printf( " receive data:%s, size:%d\n " , ev -> buffer, size);
if (size == 0 ) {
release_sock_event(ev);
close(sock);
return ;
}
event_set(ev -> write_ev, sock, EV_WRITE, on_write, ev -> buffer);
event_base_set( base , ev -> write_ev);
event_add(ev -> write_ev, NULL);
}
void on_accept( int sock, short event , void * arg)
{
struct sockaddr_in cli_addr;
int newfd, sin_size;
struct sock_ev * ev = ( struct sock_ev * )malloc( sizeof ( struct sock_ev));
ev -> read_ev = ( struct event * )malloc( sizeof ( struct event ));
ev -> write_ev = ( struct event * )malloc( sizeof ( struct event ));
sin_size = sizeof ( struct sockaddr_in);
newfd = accept(sock, ( struct sockaddr * ) & cli_addr, & sin_size);
event_set(ev -> read_ev, newfd, EV_READ | EV_PERSIST, on_read, ev);
event_base_set( base , ev -> read_ev);
event_add(ev -> read_ev, NULL);
}
int main( int argc, char * argv[])
{
struct sockaddr_in my_addr;
int sock;
sock = socket(AF_INET, SOCK_STREAM, 0 );
int yes = 1 ;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, & yes, sizeof ( int ));
memset( & my_addr, 0 , sizeof (my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(PORT);
my_addr.sin_addr.s_addr = INADDR_ANY;
bind(sock, ( struct sockaddr * ) & my_addr, sizeof ( struct sockaddr));
listen(sock, BACKLOG);
struct event listen_ev;
base = event_base_new();
event_set( & listen_ev, sock, EV_READ | EV_PERSIST, on_accept, NULL);
event_base_set( base , & listen_ev);
event_add( & listen_ev, NULL);
event_base_dispatch( base );
return 0 ;

}
程式編譯的時候要加 -levent 連接配接選項,以連接配接libevent的共享庫,但是執行的時候依然爆出如下錯誤:error while loading shared libraries: libevent-1.4.so.2: cannot open shared object file: No such file or directory, 這個是程式找不到共享庫的位置,通過執行echo $LD_LIBRARY_PATH可以看到系統庫的環境變量裡沒有我們安裝的路徑,即由--prefix制定的路徑,執行export LD_LIBRARY_PATH=/home/mydir/libevent/lib/:$LD_LIBRARY_PATH将該路徑加入系統環境變量裡,再執行程式就可以了。