處理大并發之五 使用libevent利器bufferevent
首先來翻譯一段文章
你可能注意到随着我們代碼變得越來越高效,程式也變得更加複雜。當我們産生一個程序的時候,我們沒有必要為每一個連結管理一個buffer,我們隻需要每個處理獨立棧配置設定緩沖區就可以了。在讀和寫的時候,我們不必明确的跟蹤每一個socket,這在我們的代碼裡是一個暗示,我們沒有必要定義一個結構體去跟蹤每一個操作什麼時候完成,我們隻需要使用循環棧變量就可以了。
此外,如果你在windows網絡程式設計方面有着豐富的經驗,當你在使用上一篇部落格中的例子時,你可能認識到libevent可能達不到最理想的性能。在windows上,你做的快速異步IO不是用的select,它使用的IOCP API。和其他的快速網絡API不同,當你的程式執行完成,sock準備完成,IOCP不會通知你的程式,取而代之的是,程式告訴windows網絡棧開啟一個網絡操作,并且當操作執行完成時,IOCP會告訴程式。
幸運的是,libevent2 的bufferevents接口解決了上面的這些沖突,它使得程式更加容易寫,并且為windows和unix提供了有效的接口。
分析:
libevent的bufferevent在event的基礎上自己維護了一個buffer,這樣的話,就不需要再自己管理一個buffer了,上一篇部落格是自己維護一個buffer,維護過程複雜,且過程難以了解,既然libevent自己提供了bufferevent這個神器,且有API,何必自己維護呢?
先看看struct bufferevent這個結構體
[cpp] view plain copy
- struct bufferevent {
- struct event_base *ev_base;
- const struct bufferevent_ops *be_ops;
- struct event ev_read;
- struct event ev_write;
- struct evbuffer *input;
- struct evbuffer *output;
- ……
- bufferevent_data_cb readcb;
- bufferevent_data_cb writecb;
- bufferevent_event_cb errorcb;
- ……
- }
可以看出struct bufferevent内置了兩個event(讀/寫)和對應的緩沖區。當有資料被讀入(input)的時候,readcb被調用,當output被輸出完成的時候,writecb被調用,當網絡I/O出現錯誤,如連結中斷,逾時或其他錯誤時,errorcb被調用。
使用bufferevent的過程:
1. 設定sock為非阻塞的
[cpp] view plain copy
- eg: evutil_make_socket_nonblocking(fd);
2. 使用bufferevent_socket_new建立一個structbufferevent *bev,關聯該sockfd,托管給event_base
函數原型為:
[cpp] view plain copy
- struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options)
- eg: struct bufferevent *bev;
- bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
3. 設定讀寫對應的回調函數
函數原型為:
[cpp] view plain copy
- void bufferevent_setcb(struct bufferevent *bufev,
- bufferevent_data_cb readcb, bufferevent_data_cb writecb,
- bufferevent_event_cb eventcb, void *cbarg)
- eg. bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
4. 啟用讀寫事件,其實是調用了event_add将相應讀寫事件加入事件監聽隊列poll。正如文檔所說,如果相應事件不置為true,bufferevent是不會讀寫資料的
函數原型:
[cpp] view plain copy
- int bufferevent_enable(struct bufferevent *bufev, short event)
- eg. bufferevent_enable(bev, EV_READ|EV_WRITE);
5. 進入bufferevent_setcb回調函數:
在readcb裡面從input中讀取資料,處理完畢後填充到output中;
writecb對于服務端程式,隻需要readcb就可以了,可以置為NULL;
errorcb用于處理一些錯誤資訊。
針對這些使用過程進入源碼進行分析:
1. bufferevent_socket_new
(1)在bufferevent_init_common中調用evbuffer_new()初始化input和output
(2)在event_assign中初始化bufferevent中的ev_read和ev_write事件。
(3)在evbuffer_add_cb中給output添加了一個callback bufferevent_socket_outbuf_cb
2. bufferevent_setcb
該函數的作用主要是指派,把該函數後面的參數,指派給第一個參數struct bufferevent *bufev定義的變量
3. bufferevent_enable
調用event_add将讀寫事件加入到事件監聽隊列中。
對bufferevent常用的幾個函數進行分析:
[cpp] view plain copy
- char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum evbuffer_eol_style eol_style);
含義:Read a single line from an evbuffer.
傳回值:讀到的一行内容
[cpp] view plain copy
- int evbuffer_add(struct evbuffer *buf,const void *data, size_t datlen);
含義:将資料添加到evbuffer的結尾
傳回值:成功傳回0,失敗傳回-1
[cpp] view plain copy
- int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);
含義:從evbuffer讀取資料到data
傳回值:成功傳回0,失敗傳回-1
[cpp] view plain copy
- size_t evbuffer_get_length(const structevbuffer *buf);
含義:傳回evbuffer中存儲的位元組長度
暫時先分析到這裡,下面是代碼,用戶端發送消息:HTTP/1.0, Client 0 send Message:
Request: Hello Server! over,服務端一條消息收完成後,會回複:Response ok! Hello Client!
服務端從bufferevent中取出消息是按行取的。代碼可能有不完善的地方,由于才疏學淺,研究時間短(3天),希望高手提出寶貴意見。
libevent_eventbuffer_server.c
[cpp] view plain copy
- #include <netinet/in.h>
- #include <sys/socket.h>
- #include <fcntl.h>
- #include <event2/event.h>
- #include <event2/buffer.h>
- #include <event2/bufferevent.h>
- #include <assert.h>
- #include <unistd.h>
- #include <string.h>
- #include <stdlib.h>
- #include <stdio.h>
- #include <errno.h>
- void do_read(evutil_socket_t fd, short events, void *arg);
- //struct bufferevent内建了兩個event(read/write)和對應的緩沖區(struct evbuffer *input, *output),并提供相應的函數用來操作>
- 緩沖區(或者直接操作bufferevent)
- //接收到資料後,判斷是不一樣一條消息的結束,結束标志為"over"字元串
- void readcb(struct bufferevent *bev, void *ctx)
- {
- printf("called readcb!\n");
- struct evbuffer *input, *output;
- char *request_line;
- size_t len;
- input = bufferevent_get_input(bev);//其實就是取出bufferevent中的input
- output = bufferevent_get_output(bev);//其實就是取出bufferevent中的output
- size_t input_len = evbuffer_get_length(input);
- printf("input_len: %d\n", input_len);
- size_t output_len = evbuffer_get_length(output);
- printf("output_len: %d\n", output_len);
- while(1)
- {
- request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);//從evbuffer前面取出一行,用一個新配置設定的空字元結束
- 的字元串傳回這一行,EVBUFFER_EOL_CRLF表示行尾是一個可選的回車,後随一個換行符
- if(NULL == request_line)
- {
- printf("The first line has not arrived yet.\n");
- free(request_line);//之是以要進行free是因為 line = mm_malloc(n_to_copy+1)),在這裡進行了malloc
- break;
- }
- else
- <span style="white-space: pre;"> </span>{
- printf("Get one line date: %s\n", request_line);
- if(strstr(request_line, "over") != NULL)//用于判斷是不是一條消息的結束
- {
- char *response = "Response ok! Hello Client!\r\n";
- evbuffer_add(output, response, strlen(response));//Adds data to an event buffer
- printf("服務端接收一條資料完成,回複用戶端一條消息: %s\n", response);
- free(request_line);
- break;
- }
- }
- free(request_line);
- }
- size_t input_len1 = evbuffer_get_length(input);
- printf("input_len1: %d\n", input_len1);
- size_t output_len1 = evbuffer_get_length(output);
- printf("output_len1: %d\n\n", output_len1);
- }
- void errorcb(struct bufferevent *bev, short error, void *ctx)
- {
- if (error & BEV_EVENT_EOF)
- {
- printf("connection closed\n");
- }
- else if (error & BEV_EVENT_ERROR)
- {
- printf("some other error\n");
- }
- else if (error & BEV_EVENT_TIMEOUT)
- {
- printf("Timed out\n");
- }
- bufferevent_free(bev);
- }
- void do_accept(evutil_socket_t listener, short event, void *arg)
- {
- struct event_base *base = arg;
- struct sockaddr_storage ss;
- socklen_t slen = sizeof(ss);
- int fd = accept(listener, (struct sockaddr*)&ss, &slen);
- if (fd < 0)
- {
- perror("accept");
- }
- else if (fd > FD_SETSIZE)
- {
- close(fd);
- }
- else
- {
- struct bufferevent *bev;
- evutil_make_socket_nonblocking(fd);
- //使用bufferevent_socket_new建立一個struct bufferevent *bev,關聯該sockfd,托管給event_base
- BEV_OPT_CLOSE_ON_FREE表示釋放bufferevent時關閉底層傳輸端口。這将關閉底層套接字,釋放底層bufferevent等。
- bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
- //設定讀寫對應的回調函數
- bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
- // bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
- //啟用讀寫事件,其實是調用了event_add将相應讀寫事件加入事件監聽隊列poll。正如文檔所說,如果相應事件不置為true,buf
- ferevent是不會讀寫資料的
- bufferevent_enable(bev, EV_READ|EV_WRITE);
- }
- }
- void run(void)
- {
- evutil_socket_t listener;
- struct sockaddr_in sin;
- struct event_base *base;
- struct event *listener_event;
- base = event_base_new();
- if (!base)
- return;
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = 0;
- sin.sin_port = htons(8000);
- listener = socket(AF_INET, SOCK_STREAM, 0);
- evutil_make_socket_nonblocking(listener);
- #ifndef WIN32
- {
- int one = 1;
- setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
- }
- #endif
- if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0)
- {
- perror("bind");
- return;
- }
- if (listen(listener, 16)<0)
- {
- perror("listen");
- return;
- }
- listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
- event_add(listener_event, NULL);
- event_base_dispatch(base);
- }
- int main(int argc, char **argv)
- {
- setvbuf(stdout, NULL, _IONBF, 0);
- run();
- return 0;
- }
編譯:gcc -I/usr/include-o test libevent_eventbuffer_server.c -L/usr/local/lib –levent
運作:
服務端:

用戶端:
今晚部落格暫時寫完了,時間比較倉促,錯誤估計不會少,關于bufferevent很多API都還不是很熟悉,還有libevent 添加事件event_add是非線程安全的,如果使用多線程,需要保證event_add不能出現在多個線程中,以後有時間慢慢研究。
體會:關于源碼,還需要好好研究,其實今晚挺郁悶的,弄了半天,沒有什麼進展,現在自己還有很多疑問,主要是自己太急了,不過3天時間做到基本了解,自己還算滿意,下一步有時間多研究下吧。晚安,北京
如是轉載,請指明原出處: http://blog.csdn.net/feitianxuxue ,謝謝合作!