出處:http://blog.csdn.net/luotuo44/article/details/39344743
和之前的《Libevent工作流程探究》一樣,這裡也是用一個例子來探究bufferevent的工作流程。具體的例子可以參考《Libevent使用例子,從簡單到複雜》,這裡就不列出了。其實要做的例子也就是bufferevent_socket_new、bufferevent_setcb、bufferevent_enable這幾個函數。
因為本文會用到《 Libevent工作流程探究 》中提到的說法,比如将一個event插入到event_base中。是以讀者最好先讀一下那篇博文。此外,因為bufferevent結構體本身會使用evbuffer結構體和還會調用相應的一些操作,是以讀者還應該先閱讀《 evbuffer結構與基本操作 》和《 更多evbuffer操作函數 》。
bufferevent結構體:
bufferevent其實也就是在event_base的基礎上再進行一層封裝,其本質還是離不開event和event_base,從bufferevent的結構體就可以看到這一點。
bufferevent結構體中有兩個event,分别用來監聽同一個fd的可讀事件和可寫事件。為什麼不用一個event同時監聽可讀和可寫呢?這是因為監聽可寫是困難的,下面會說到原因。讀者也可以自問一下,自己之前有沒有試過用最原始的event監聽一個fd的可寫。
由于socket 是全雙工的,是以在bufferevent結構體中,也有兩個evbuffer成員,分别是讀緩沖區和寫緩沖區。 bufferevent結構體定義如下:
[cpp] view plain copy

- //bufferevent_struct.h檔案
- struct bufferevent {
- struct event_base *ev_base;
- //操作結構體,成員有一些函數指針。類似struct eventop結構體
- const struct bufferevent_ops *be_ops;
- struct event ev_read;//讀事件event
- struct event ev_write;//寫事件event
- struct evbuffer *input;//讀緩沖區
- struct evbuffer *output; //寫緩沖區
- struct event_watermark wm_read;//讀水位
- struct event_watermark wm_write;//寫水位
- bufferevent_data_cb readcb;//可讀時的回調函數指針
- bufferevent_data_cb writecb;//可寫時的回調函數指針
- bufferevent_event_cb errorcb;//錯誤發生時的回調函數指針
- void *cbarg;//回調函數的參數
- struct timeval timeout_read;//讀事件event的逾時值
- struct timeval timeout_write;//寫事件event的逾時值
- short enabled;
- };
如果看過Libevent的參考手冊的話,應該還會知道bufferevent除了用于socket外,還可以用于socketpair 和 filter。如果用面向對象的思維,應從這個三個應用中抽出相同的部分作為父類,然後派生出三個子類。
Libevent雖然是用C語言寫的,不過它還是提取出一些公共部分,然後定義一個bufferevent_private結構體,用于儲存這些公共部分成員。從集合的角度來說,bufferevent_private應該是bufferevent的一個子集,即一部分。但在Libevent中,bufferevent确實bufferevent_private的一個成員。下面是bufferevent_private結構體。
[cpp] view plain copy

- //bufferevent-internal.h檔案
- struct bufferevent_private {
- struct bufferevent bev;
- //設定input evbuffer的高水位時,需要一個evbuffer回調函數配合工作
- struct evbuffer_cb_entry *read_watermarks_cb;
- //鎖是Libevent自動配置設定的,還是使用者配置設定的
- unsigned own_lock : 1;
- ...
- //這個socket是否處理正在連接配接伺服器狀态
- unsigned connecting : 1;
- //标志連接配接被拒絕
- unsigned connection_refused : 1;
- //标志是什麼原因把 讀 挂起來
- bufferevent_suspend_flags read_suspended;
- //标志是什麼原因把 寫 挂起來
- bufferevent_suspend_flags write_suspended;
- enum bufferevent_options options;
- int refcnt;// bufferevent的引用計數
- //鎖變量
- void *lock;
- };
建立一個bufferevent:
函數bufferevent_socket_new可以完成這個工作。
[cpp] view plain copy

- //bufferevent-internal.h檔案
- struct bufferevent_ops {
- const char *type;//類型名稱
- off_t mem_offset;//成員bev的偏移量
- //啟動。将event加入到event_base中
- int (*enable)(struct bufferevent *, short);
- //關閉。将event從event_base中删除
- int (*disable)(struct bufferevent *, short);
- //銷毀
- void (*destruct)(struct bufferevent *);
- //調整event的逾時值
- int (*adj_timeouts)(struct bufferevent *);
- int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);
- //擷取成員的值。具體看實作
- int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
- };
- //bufferevent_sock.c檔案
- const struct bufferevent_ops bufferevent_ops_socket = {
- "socket",
- evutil_offsetof(struct bufferevent_private, bev),
- be_socket_enable,
- be_socket_disable,
- be_socket_destruct,
- be_socket_adj_timeouts,
- be_socket_flush,
- be_socket_ctrl,
- };
- //由于有幾個不同類型的bufferevent,而且它們的enable、disable等操作是不同的。是以
- //需要的一些函數指針指明某個類型的bufferevent應該使用哪些操作函數。結構體bufferevent_ops_socket
- //就應運而生。對于socket,其操作函數如上。
- //bufferevent_sock.c檔案
- struct bufferevent *
- bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
- int options)
- {
- struct bufferevent_private *bufev_p;
- struct bufferevent *bufev;
- ...//win32
- //結構體記憶體清零,所有成員都為0
- if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
- return NULL;
- //如果options中需要線程安全,那麼就會申請鎖
- //會建立一個輸入和輸出緩存區
- if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
- options) < 0) {
- mm_free(bufev_p);
- return NULL;
- }
- bufev = &bufev_p->bev;
- //設定将evbuffer的資料向fd傳
- evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
- //将fd與event相關聯。同一個fd關聯兩個event
- event_assign(&bufev->ev_read, bufev->ev_base, fd,
- EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
- event_assign(&bufev->ev_write, bufev->ev_base, fd,
- EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
- //設定evbuffer的回調函數,使得外界給寫緩沖區添加資料時,能觸發
- //寫操作,這個回調對于寫事件的監聽是很重要的
- evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
- //當機讀緩沖區的尾部,未解凍之前不能往讀緩沖區追加資料
- //也就是說不能從socket fd中讀取資料
- evbuffer_freeze(bufev->input, 0);
- //當機寫緩沖區的頭部,未解凍之前不能把寫緩沖區的頭部資料删除
- //也就是說不能把資料寫到socket fd
- evbuffer_freeze(bufev->output, 1);
- return bufev;
- }
留意函數裡面的evbuffer_add_cb調用,後面會說到。
函數在最後面會當機兩個緩沖區。其實,雖然這裡當機了,但實際上Libevent在讀資料或者寫資料之前會解凍的讀完或者寫完資料後,又會馬上當機。這主要防止資料被意外修改。使用者一般不會直接調用evbuffer_freeze或者evbuffer_unfreeze函數。一切的當機和解凍操作都由Libevent内部完成。還有一點要注意,因為這裡隻是把寫緩沖區的頭部當機了。是以還是可以往寫緩沖區的尾部追加資料。同樣,此時也是可以從讀緩沖區讀取資料。這個是必須的。因為在Libevent内部不解凍的時候,使用者需要從讀緩沖區中擷取資料(這相當于從socket fd中讀取資料),使用者也需要把資料寫到寫緩沖區中(這相當于把資料寫入到socket fd中)。
在bufferevent_socket_new函數裡面會調用函數bufferevent_init_common完成公有部分的初始化。
[cpp] view plain copy

- //bufferevent.c檔案
- int
- bufferevent_init_common(struct bufferevent_private *bufev_private,
- struct event_base *base,
- const struct bufferevent_ops *ops,
- enum bufferevent_options options)
- {
- struct bufferevent *bufev = &bufev_private->bev;
- //配置設定輸入緩沖區
- if (!bufev->input) {
- if ((bufev->input = evbuffer_new()) == NULL)
- return -1;
- }
- //配置設定輸出緩沖區
- if (!bufev->output) {
- if ((bufev->output = evbuffer_new()) == NULL) {
- evbuffer_free(bufev->input);
- return -1;
- }
- }
- bufev_private->refcnt = 1;//引用次數為1
- bufev->ev_base = base;
- //預設情況下,讀和寫event都是不支援逾時的
- evutil_timerclear(&bufev->timeout_read);
- evutil_timerclear(&bufev->timeout_write);
- bufev->be_ops = ops;
- //可寫是預設支援的
- bufev->enabled = EV_WRITE;
- #ifndef _EVENT_DISABLE_THREAD_SUPPORT
- if (options & BEV_OPT_THREADSAFE) {
- //申請鎖。
- if (bufferevent_enable_locking(bufev, NULL) < 0) {
- evbuffer_free(bufev->input);
- evbuffer_free(bufev->output);
- bufev->input = NULL;
- bufev->output = NULL;
- return -1;
- }
- }
- #endif
- ...//延遲調用的初始化,一般不需要用到
- bufev_private->options = options;
- //将evbuffer和bufferevent相關聯
- evbuffer_set_parent(bufev->input, bufev);
- evbuffer_set_parent(bufev->output, bufev);
- return 0;
- }
代碼中可以看到,預設是enable EV_WRITE的。
設定回調函數:
函數bufferevent_setcb完成這個工作。該函數相當簡單,也就是進行一些指派操作。
[cpp] view plain copy

- //bufferevent.c檔案
- void
- bufferevent_setcb(struct bufferevent *bufev,
- bufferevent_data_cb readcb, bufferevent_data_cb writecb,
- bufferevent_event_cb eventcb, void *cbarg)
- {
- //bufferevent結構體内部有一個鎖變量
- BEV_LOCK(bufev);
- bufev->readcb = readcb;
- bufev->writecb = writecb;
- bufev->errorcb = eventcb;
- bufev->cbarg = cbarg;
- BEV_UNLOCK(bufev);
- }
如果不想設定某個操作的回調函數,直接設定為NULL即可。
令bufferevent可以工作:
相信讀者也知道,即使調用了bufferevent_socket_new和bufferevent_setcb,這個bufferevent還是不能工作,必須調用bufferevent_enable。為什麼會這樣的呢?
如果看過之前的那些博文,相信讀者知道,一個event能夠工作,不僅僅需要new出來,還要調用event_add函數,把這個event添加到event_base中。在本文前面的代碼中,并沒有看到event_add函數的調用。是以還需要調用一個函數,把event添加到event_base中。函數bufferevent_enable就是完成這個工作的。
[cpp] view plain copy

- //bufferevent.c檔案
- int
- bufferevent_enable(struct bufferevent *bufev, short event)
- {
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- short impl_events = event;
- int r = 0;
- //增加引用并加鎖
- //增加引用是為了防止其他線程調用bufferevent_free,釋放了bufferevent
- _bufferevent_incref_and_lock(bufev);
- //挂起了讀,此時不能監聽讀事件
- if (bufev_private->read_suspended)
- impl_events &= ~EV_READ;
- //挂起了寫,此時不能監聽寫事情
- if (bufev_private->write_suspended)
- impl_events &= ~EV_WRITE;
- bufev->enabled |= event;
- //調用對應類型的enbale函數。因為不同類型的bufferevent有不同的enable函數
- if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
- r = -1;
- //減少引用并解鎖
- _bufferevent_decref_and_unlock(bufev);
- return r;
- }
上面代碼可以看到,最終會調用對應bufferevent類型的enable函數,對于socket bufferevent,其enable函數是be_socket_enable,代碼如下:
[cpp] view plain copy

- //bufferevent.c檔案
- int
- _bufferevent_add_event(struct event *ev, const struct timeval *tv)
- {
- if (tv->tv_sec == 0 && tv->tv_usec == 0)
- return event_add(ev, NULL);
- else
- return event_add(ev, tv);
- }
- //bufferevent_sock.c檔案
- #define be_socket_add(ev, t) \
- _bufferevent_add_event((ev), (t))
- static int
- be_socket_enable(struct bufferevent *bufev, short event)
- {
- if (event & EV_READ) {
- if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
- return -1;
- }
- if (event & EV_WRITE) {
- if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
- return -1;
- }
- return 0;
- }
如果讀者熟悉Libevent的逾時事件,那麼可以知道Libevent是在event_add函數裡面确定一個event的逾時的。上面代碼也展示了這一點,如果讀或者寫event設定了逾時(即其逾時值不為0),那麼就會作為參數傳給event_add函數。如果讀者不熟悉的Libevent的逾時事件的話,可以參考《逾時event的處理》。
使用者可以調用函數bufferevent_set_timeouts,設定讀或者寫事件的逾時。代碼如下:
[cpp] view plain copy

- //bufferevent.c檔案
- int
- bufferevent_set_timeouts(struct bufferevent *bufev,
- const struct timeval *tv_read,
- const struct timeval *tv_write)
- {
- int r = 0;
- BEV_LOCK(bufev);
- if (tv_read) {
- bufev->timeout_read = *tv_read;
- } else {
- evutil_timerclear(&bufev->timeout_read);
- }
- if (tv_write) {
- bufev->timeout_write = *tv_write;
- } else {
- evutil_timerclear(&bufev->timeout_write);
- }
- if (bufev->be_ops->adj_timeouts)
- r = bufev->be_ops->adj_timeouts(bufev);
- BEV_UNLOCK(bufev);
- return r;
- }
- //bufferevent_sock.c檔案
- static int
- be_socket_adj_timeouts(struct bufferevent *bufev)
- {
- int r = 0;
- //使用者監聽了讀事件
- if (event_pending(&bufev->ev_read, EV_READ, NULL))
- if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
- r = -1;
- //使用者監聽了寫事件
- if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
- if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
- r = -1;
- }
- return r;
- }
從上面代碼可以看到:使用者不僅僅可以設定逾時值,還可以修改逾時值,也是通過這個函數進行修的。當然也是可以删除逾時的,直接把逾時參數設定成NULL即可。
至此,已經完成了bufferevent的初始化工作,隻需調用event_base_dispatch函數,啟動發動機就可以工作了。
處理讀事件:
接下來的任務:底層的socket fd接收資料後,bufferevent是怎麼工作的。
讀事件的水位:
在講解讀事件之前,先來看一下水位問題,函數bufferevent_setwatermark可以設定讀和寫的水位。這裡隻講解讀事件的水位。
水位有兩個:低水位和高水位。
低水位比較容易懂,就是當可讀的資料量到達這個低水位後,才會調用使用者設定的回調函數。比如使用者想每次讀取100位元組,那麼就可以把低水位設定為100。當可讀資料的位元組數小于100時,即使有資料都不會打擾使用者(即不會調用使用者設定的回調函數)。可讀資料大于等于100位元組後,才會調用使用者的回調函數。
高水位是什麼呢?其實,這和使用者的回調函數沒有關系。它的意義是:把讀事件的evbuffer的資料量限制在高水位之下。比如,使用者認為讀緩沖區不能太大(太大的話,連結清單會很長)。那麼使用者就會設定讀事件的高水位。當讀緩沖區的資料量達到這個高水位後,即使socket fd還有資料沒有讀,也不會讀進這個讀緩沖區裡面。一句話說,就是控制evbuffer的大小。
雖然控制了evbuffer的大小,但socket fd可能還有資料。有資料就會觸發可讀事件,但處理可讀的時候,又會發現設定了高水位,不能讀取資料evbuffer。socket fd的資料沒有被讀完,又觸發……。這個貌似是一個死循環。實際上是不會出現這個死循環的,因為Libevent發現evbuffer的資料量到達高水位後,就會把可讀事件給挂起來,讓它不能再觸發了。Libevent使用函數bufferevent_wm_suspend_read把監聽讀事件的event挂起來。下面看一下Libevent是怎麼把一個event挂起來的。
[cpp] view plain copy

- //bufferevent-internal.h檔案
- #define bufferevent_wm_suspend_read(b) \
- bufferevent_suspend_read((b), BEV_SUSPEND_WM)
- //bufferevent.c檔案
- void
- bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
- {
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- BEV_LOCK(bufev);
- if (!bufev_private->read_suspended)//不能挂多次
- bufev->be_ops->disable(bufev, EV_READ);//實際調用be_socket_disable函數
- bufev_private->read_suspended |= what;//因何而被挂起
- BEV_UNLOCK(bufev);
- }
- //bufferevent_sock.c檔案
- static int
- be_socket_disable(struct bufferevent *bufev, short event)
- {
- struct bufferevent_private *bufev_p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- if (event & EV_READ) {
- if (event_del(&bufev->ev_read) == -1)
- return -1;
- }
- if ((event & EV_WRITE) && ! bufev_p->connecting) {
- if (event_del(&bufev->ev_write) == -1)//删掉這個event
- return -1;
- }
- return 0;
- }
居然是直接删除這個監聽讀事件的event,真的是挂了!!!
看來不能随便設定高水位,因為它會暫停讀。如果隻想設定低水位而不想設定高水位,那麼在調用bufferevent_setwatermark函數時,高水位的參數設為0即可。
那麼什麼時候取消挂起,讓bufferevent可以繼續讀socket 資料呢?從高水位的意義來說,當然是當evbuffer裡面的資料量小于高水位時,就能再次讀取socket資料了。現在來看一下Libevent是怎麼恢複讀的。看一下設定水位的函數bufferevent_setwatermark吧,它進行了一些為高水位埋下了一個回調函數。對,就是evbuffer的回調函數。前一篇博文說到,當evbuffer裡面的資料添加或者删除時,是會觸發一些回調函數的。當使用者移除evbuffer的一些資料量時,Libevent就會檢查這個evbuffer的資料量是否小于高水位,如果小于的話,那麼就恢複 讀事件。
不說這麼多了,上代碼。
[cpp] view plain copy

- //bufferevent.c檔案
- void
- bufferevent_setwatermark(struct bufferevent *bufev, short events,
- size_t lowmark, size_t highmark)
- {
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- BEV_LOCK(bufev);
- if (events & EV_READ) {
- bufev->wm_read.low = lowmark;
- bufev->wm_read.high = highmark;
- if (highmark) {//高水位
- //還沒設定高水位的回調函數
- if (bufev_private->read_watermarks_cb == NULL) {
- bufev_private->read_watermarks_cb =
- evbuffer_add_cb(bufev->input,
- bufferevent_inbuf_wm_cb,
- bufev);//添加回調函數
- }
- evbuffer_cb_set_flags(bufev->input,
- bufev_private->read_watermarks_cb,
- EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
- //設定(修改)高水位時,evbuffer的資料量已經超過了水位值
- //可能是把之前的高水位調高或者調低
- //挂起操作和取消挂起操作都是幂等的(即多次挂起的作用等同于挂起一次)
- if (evbuffer_get_length(bufev->input) > highmark)
- bufferevent_wm_suspend_read(bufev);
- else if (evbuffer_get_length(bufev->input) < highmark)//調低了
- bufferevent_wm_unsuspend_read(bufev);
- } else {
- //高水位值等于0,那麼就要取消挂起 讀事件
- //取消挂起操作是幂等的
- if (bufev_private->read_watermarks_cb)
- evbuffer_cb_clear_flags(bufev->input,
- bufev_private->read_watermarks_cb,
- EVBUFFER_CB_ENABLED);
- bufferevent_wm_unsuspend_read(bufev);
- }
- }
- BEV_UNLOCK(bufev);
- }
這個函數,不僅僅為高水位設定回調函數,還會檢查目前evbuffer的資料量是否超過了高水位。因為這個設定水位函數可能是在bufferevent工作一段時間後才添加的,是以evbuffer是有可能已經有資料的了,是以需要檢查。如果超過了水位值,那麼就需要挂起讀。當然也存在另外一種可能:使用者之前設定過了一個比較大的高水位,挂起了讀。現在發現錯了,就把高水位調低一點,此時就需要恢複讀。
現在假設使用者移除了一些evbuffer的資料,進而觸發了evbuffer的回調函數,當然也就調用了函數bufferevent_inbuf_wm_cb。下面看一下這個函數是怎麼恢複讀的。
[cpp] view plain copy

- //bufferevent.c檔案
- static void
- bufferevent_inbuf_wm_cb(struct evbuffer *buf,
- const struct evbuffer_cb_info *cbinfo,
- void *arg)
- {
- struct bufferevent *bufev = arg;
- size_t size;
- size = evbuffer_get_length(buf);
- if (size >= bufev->wm_read.high)
- bufferevent_wm_suspend_read(bufev);
- else
- bufferevent_wm_unsuspend_read(bufev);
- }
- //bufferevent-internal.h檔案
- #define bufferevent_wm_unsuspend_read(b) \
- bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)
- //bufferevent.c檔案
- void
- bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
- {
- struct bufferevent_private *bufev_private =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- BEV_LOCK(bufev);
- bufev_private->read_suspended &= ~what;
- if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
- bufev->be_ops->enable(bufev, EV_READ);//重新把event插入到event_base中
- BEV_UNLOCK(bufev);
- }
因為使用者可以手動為這個evbuffer添加資料,此時也會調用bufferevent_inbuf_wm_cb函數。此時就要檢查evbuffer的資料量是否已經超過高水位了,而不能僅僅檢查是否低于高水位。
高水位導緻讀的挂起和之後讀的恢複,一切工作都是由Libevent内部完成的,使用者不用做任何工作。
從socket中讀取資料:
從前面的一系列博文可以知道,如果一個socket可讀了,那麼監聽可讀事件的event的回調函數就會被調用。這個回調函數是在bufferevent_socket_new函數中被Libevent内部設定的,設定為bufferevent_readcb函數,使用者并不知情。
當socket有資料可讀時,Libevent就會監聽到,然後調用bufferevent_readcb函數處理。該函數會調用evbuffer_read函數,把資料從socket fd中讀取到evbuffer中。然後再調用使用者在bufferevent_setcb函數中設定的讀事件回調函數。是以,當使用者的讀事件回調函數被調用時,資料已經在evbuffer中了,使用者拿來就用,無需調用read這類會阻塞的函數。
下面看一下bufferevent_readcb函數的具體實作。
[cpp] view plain copy

- static void
- bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
- {
- struct bufferevent *bufev = arg;
- struct bufferevent_private *bufev_p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- struct evbuffer *input;
- int res = 0;
- short what = BEV_EVENT_READING;
- ev_ssize_t howmuch = -1, readmax=-1;
- _bufferevent_incref_and_lock(bufev);
- if (event == EV_TIMEOUT) {
- what |= BEV_EVENT_TIMEOUT;
- goto error;
- }
- input = bufev->input;
- //使用者設定了高水位
- if (bufev->wm_read.high != 0) {
- howmuch = bufev->wm_read.high - evbuffer_get_length(input);
- if (howmuch <= 0) {
- bufferevent_wm_suspend_read(bufev);
- goto done;
- }
- }
- //因為使用者可以限速,是以這麼要檢測最大的可讀大小。
- //如果沒有限速的話,那麼将傳回16384位元組,即16K
- //預設情況下是沒有限速的。
- readmax = _bufferevent_get_read_max(bufev_p);
- if (howmuch < 0 || howmuch > readmax)
- howmuch = readmax;
- //一些原因導緻讀 被挂起,比如加鎖了。
- if (bufev_p->read_suspended)
- goto done;
- //解凍,使得可以在input的後面追加資料
- evbuffer_unfreeze(input, 0);
- res = evbuffer_read(input, fd, (int)howmuch); //從socket fd中讀取資料
- evbuffer_freeze(input, 0);//當機
- if (res == -1) {
- int err = evutil_socket_geterror(fd);
- if (EVUTIL_ERR_RW_RETRIABLE(err))//EINTER or EAGAIN
- goto reschedule;
- //不是 EINTER or EAGAIN 這兩個可以重試的錯誤,那麼就應該是其他緻命的錯誤
- //此時,應該報告給使用者
- what |= BEV_EVENT_ERROR;
- } else if (res == 0) {//斷開了連接配接
- what |= BEV_EVENT_EOF;
- }
- if (res <= 0)
- goto error;
- //速率相關的操作
- _bufferevent_decrement_read_buckets(bufev_p, res);
- //evbuffer的資料量大于低水位值。
- if (evbuffer_get_length(input) >= bufev->wm_read.low)
- _bufferevent_run_readcb(bufev);//調用使用者設定的回調函數
- goto done;
- reschedule:
- goto done;
- error:
- //把監聽可讀事件的event從event_base的事件隊列中删除掉.event_del
- bufferevent_disable(bufev, EV_READ);//會調用be_socket_disable函數
- _bufferevent_run_eventcb(bufev, what);//會調用使用者設定的錯誤處理函數
- done:
- _bufferevent_decref_and_unlock(bufev);
- }
細心的讀者可能會發現:對使用者的讀事件回調函數的觸發是邊緣觸發的。這也就要求,在回調函數中,使用者應該盡可能地把evbuffer的所有資料都讀出來。如果想等到下一次回調時再讀,那麼需要等到下一次socketfd接收到資料才會觸發使用者的回調函數。如果之後socket fd一直收不到任何資料,那麼即使evbuffer還有資料,使用者的回調函數也不會被調用了。
處理寫事件:
對一個可讀事件進行監聽是比較容易的,但對于一個可寫事件進行監聽則比較困難。為什麼呢?因為可讀監聽是監聽fd的讀緩沖區是否有資料了,如果沒有資料那麼就一直等待。對于可寫,首先要明白“什麼是可寫”,可寫就是fd的寫緩沖區(這個緩沖區在核心)還沒滿,可以往裡面放資料。這就有一個問題,如果寫緩沖區沒有滿,那麼就一直是可寫狀态。如果一個event監聽了可寫事件,那麼這個event就會一直被觸發(死循環)。因為一般情況下,如果不是發大量的資料這個寫緩沖區是不會滿的。
也就是說,不能監聽可寫事件。但我們确實要往fd中寫資料,那怎麼辦?Libevent的做法是:當我們确實要寫入資料時,才監聽可寫事件。也就是說我們調用bufferevent_write寫入資料時,Libevent才會把監聽可寫事件的那個event注冊到event_base中。當Libevent把資料都寫入到fd的緩沖區後,Libevent又會把這個event從event_base中删除。比較煩瑣。
bufferevent_writecb函數不僅僅要處理上面說到的那個問題,還要處理另外一個坑爹的問題。那就是:判斷socket fd是不是已經連接配接上伺服器了。這是因為這個socket fd是非阻塞的,是以它調用connect時,可能還沒連接配接上就傳回了。對于非阻塞socket fd,一般是通過判斷這個socket是否可寫,進而得知這個socket是否已經連接配接上伺服器。如果可寫,那麼它就已經成功連接配接上伺服器了。這個問題,這裡先提一下,後面會詳細講。
同前面的監聽可讀一樣,Libevent是在bufferevent_socket_new函數設定可寫的回調函數,為bufferevent_writecb。
[cpp] view plain copy

- //bufferevent_sock.c檔案
- static void
- bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
- {
- struct bufferevent *bufev = arg;
- struct bufferevent_private *bufev_p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- int res = 0;
- short what = BEV_EVENT_WRITING;
- int connected = 0;
- ev_ssize_t atmost = -1;
- _bufferevent_incref_and_lock(bufev);
- if (event == EV_TIMEOUT) {
- what |= BEV_EVENT_TIMEOUT;
- goto error;
- }
- ...//判斷這個socket是否已經連接配接上伺服器了
- //使用者可能設定了限速,如果沒有限速,那麼atmost将傳回16384(16K)
- atmost = _bufferevent_get_write_max(bufev_p);
- //一些原因導緻寫被挂起來了
- if (bufev_p->write_suspended)
- goto done;
- //如果evbuffer有資料可以寫到sockfd中
- if (evbuffer_get_length(bufev->output)) {
- //解凍連結清單頭
- evbuffer_unfreeze(bufev->output, 1);
- //将output這個evbuffer的資料寫到socket fd 的緩沖區中
- //會把已經寫到socket fd緩沖區的資料,從evbuffer中删除
- res = evbuffer_write_atmost(bufev->output, fd, atmost);
- evbuffer_freeze(bufev->output, 1);
- if (res == -1) {
- int err = evutil_socket_geterror(fd);
- if (EVUTIL_ERR_RW_RETRIABLE(err))//可以恢複的錯誤。一般是EINTR或者EAGAIN
- goto reschedule;
- what |= BEV_EVENT_ERROR;
- } else if (res == 0) {//該socket已經斷開連接配接了
- what |= BEV_EVENT_EOF;
- }
- if (res <= 0)
- goto error;
- }
- //如果把寫緩沖區的資料都寫完成了。為了防止event_base不斷地觸發可寫
- //事件,此時要把這個監聽可寫的event删除。
- //前面的atmost限制了一次最大的可寫資料。如果還沒寫所有的資料
- //那麼就不能delete這個event,而是要繼續監聽可寫事情,知道把所有的
- //資料都寫到socket fd中。
- if (evbuffer_get_length(bufev->output) == 0) {
- event_del(&bufev->ev_write);
- }
- //如果evbuffer裡面的資料量已經寫得七七八八了,小于設定的低水位值,那麼
- //就會調用使用者設定的寫事件回調函數
- if ((res || !connected) &&
- evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
- _bufferevent_run_writecb(bufev);
- }
- goto done;
- reschedule:
- if (evbuffer_get_length(bufev->output) == 0) {
- event_del(&bufev->ev_write);
- }
- goto done;
- error:
- bufferevent_disable(bufev, EV_WRITE);//有錯誤。把這個寫event删除
- _bufferevent_run_eventcb(bufev, what);
- done:
- _bufferevent_decref_and_unlock(bufev);
- }
上面代碼的邏輯比較清晰,調用evbuffer_write_atmost函數把資料從evbuffer中寫到evbuffer緩沖區中,此時要注意函數的傳回值,因為可能寫的時候發生錯誤。如果發生了錯誤,就要調用使用者設定的event回調函數(網上也有人稱其為錯誤處理函數)。
之後,還要判斷evbuffer的資料是否已經全部寫到socket 的緩沖區了。如果已經全部寫了,那麼就要把監聽寫事件的event從event_base的插入隊列中删除。如果還沒寫完,那麼就不能删除,因為還要繼續監聽可寫事件,下次接着寫。
現在來看一下,把監聽寫事件的event從event_base的插入隊列中删除後,如果下次使用者有資料要寫的時候,怎麼把這個event添加到event_base的插入隊列。
使用者一般是通過bufferevent_write函數把資料寫入到evbuffer(寫入evbuffer後,接着就會被寫入socket,是以調用bufferevent_write就相當于把資料寫入到socket。)。而這個bufferevent_write函數是直接調用evbuffer_add函數的。函數evbuffer_add沒有調用什麼可疑的函數,能夠把監聽可寫的event添加到event_base中。唯一的可能就是那個回調函數。對就是evbuffer的回調函數。關于evbuffer的回調函數,可以參考這裡。
[cpp] view plain copy

- //bufferevent.c檔案
- int
- bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
- {
- if (evbuffer_add(bufev->output, data, size) == -1)
- return (-1);
- return 0;
- }
- //buffer.c檔案
- int
- evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
- {
- ...
- out:
- evbuffer_invoke_callbacks(buf);//調用回調函數
- result = 0;
- done:
- return result;
- }
還記得本文前面的bufferevent_socket_new函數嗎?該函數裡面會有
[cpp] view plain copy

- evbuffer_add_cb(bufev->output,bufferevent_socket_outbuf_cb, bufev);
當bufferevent的寫緩沖區output的資料發生變化時,函數bufferevent_socket_outbuf_cb就會被調用。現在馬上飛到這個函數。
[cpp] view plain copy

- //bufferevent_sock.c檔案
- static void
- bufferevent_socket_outbuf_cb(struct evbuffer *buf,
- const struct evbuffer_cb_info *cbinfo,
- void *arg)
- {
- struct bufferevent *bufev = arg;
- struct bufferevent_private *bufev_p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- if (cbinfo->n_added && //evbuffer添加了資料
- (bufev->enabled & EV_WRITE) && //預設情況下是enable EV_WRITE的
- !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&//這個event已經被踢出event_base了
- !bufev_p->write_suspended) {//這個bufferevent的寫并沒有被挂起
- //把這個event添加到event_base中
- if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
- }
- }
- }
這個函數首先進行一些判斷,滿足條件後就會把這個監聽寫事件的event添加到event_base中。其中event_pending函數就是判斷這個bufev->ev_write是否已經被event_base删除了。關于event_pending,可以參考這裡。
對于bufferevent_write,初次使用該函數的讀者可能會有疑問:調用該函數後,參數data指向的記憶體空間能不能馬上釋放,還是要等到Libevent把data指向的資料都寫到socket 緩存區才能删除?其實,從前一篇博文可以看到,evbuffer_add是直接複制一份使用者要發送的資料到evbuffer緩存區的。是以,調用完bufferevent_write,就可以馬上釋放參數data指向的記憶體空間。
網上的關于Libevent的一些使用例子,包括我寫的《 Libevent使用例子,從簡單到複雜》,都是在主線程中調用bufferevent_write函數寫入資料的。從上面的分析可以得知,是可以馬上把監聽可寫事件的event添加到event_base中。如果是在次線程調用該函數寫入資料呢?此時,主線程可能還睡眠在poll、epoll這類的多路IO複用函數上。這種情況下能不能及時喚醒主線程呢?其實是可以的,隻要你的Libevent在一開始使用了線程功能。具體的分析過程可以參考《evthread_notify_base通知主線程》。上面代碼中的be_socket_add會調用event_add,而在次線程調用event_add就會調用evthread_notify_base通知主線程。
bufferevent_socket_connect:
使用者可以在調用bufferevent_socket_new函數時,傳一個-1作為socket的檔案描述符,然後調用bufferevent_socket_connect函數連接配接伺服器,無需自己寫代碼調用connect函數連接配接伺服器。
bufferevent_socket_connect函數會調用socket函數申請一個套接字fd,然後把這個fd設定成非阻塞的(這就導緻了一些坑爹的事情)。接着就connect伺服器,因為該socket fd是非阻塞的,是以不會等待,而是馬上傳回,連接配接這工作交給核心來完成。是以,傳回後這個socket還沒有真正連接配接上伺服器。那麼什麼時候連接配接上呢?核心又是怎麼通知通知使用者呢?
一般來說,當可以往socket fd寫東西了,那就說明已經連接配接上了。也就是說這個socket fd變成可寫狀态,就連接配接上了。
是以,對于“非阻塞connect”比較流行的做法是:用select或者poll這類多路IO複用函數監聽該socket的可寫事件。當這個socket觸發了可寫事件,然後再對這個socket調用getsockopt函數,做進一步的判斷。
Libevent也是這樣實作的,下面來看一下bufferevent_socket_connect函數。
[cpp] view plain copy

- //bufferevent_sock.c檔案
- int
- bufferevent_socket_connect(struct bufferevent *bev,
- struct sockaddr *sa, int socklen)
- {
- struct bufferevent_private *bufev_p =
- EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
- evutil_socket_t fd;
- int r = 0;
- int result=-1;
- int ownfd = 0;
- _bufferevent_incref_and_lock(bev);
- if (!bufev_p)
- goto done;
- fd = bufferevent_getfd(bev);
- if (fd < 0) {//該bufferevent還沒有設定fd
- if (!sa)
- goto done;
- fd = socket(sa->sa_family, SOCK_STREAM, 0);
- if (fd < 0)
- goto done;
- if (evutil_make_socket_nonblocking(fd)<0)//設定為非阻塞
- goto done;
- ownfd = 1;
- }
- if (sa) {
- r = evutil_socket_connect(&fd, sa, socklen);//非阻塞connect
- if (r < 0)
- goto freesock;
- }
- ...
- //為bufferevent裡面的兩個event設定監聽的fd
- //後面會調用bufferevent_enable
- bufferevent_setfd(bev, fd);
- if (r == 0) {//暫時還沒連接配接上,因為fd是非阻塞的
- //此時需要監聽可寫事件,當可寫了,并且沒有錯誤的話,就成功連接配接上了
- if (! be_socket_enable(bev, EV_WRITE)) {
- bufev_p->connecting = 1;//标志這個sockfd正在連接配接
- result = 0;
- goto done;
- }
- } else if (r == 1) {//已經連接配接上了
- result = 0;
- bufev_p->connecting = 1;
- event_active(&bev->ev_write, EV_WRITE, 1);//手動激活這個event
- } else {// connection refused
- bufev_p->connection_refused = 1;
- bufev_p->connecting = 1;
- result = 0;
- event_active(&bev->ev_write, EV_WRITE, 1);//手動激活這個event
- }
- goto done;
- freesock:
- _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);//出現錯誤
- if (ownfd)
- evutil_closesocket(fd);
- done:
- _bufferevent_decref_and_unlock(bev);
- return result;
- }
這個函數比較多錯誤處理的代碼,大緻看一下就行了。有幾個地方要注意,即使connect的時候被拒絕,或者已經連接配接上了,都會手動激活這個event。一個event即使沒有加入event_base,也是可以手動激活的。具體原理參考這裡。
無論是手動激活event,或者監聽到這個event可寫了,都是會調用bufferevent_writecb函數。現在再次看一下該函數,隻看connect部分。
[cpp] view plain copy

- //bufferevent_sock.c檔案
- static void
- bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
- {
- struct bufferevent_private *bufev_p =
- EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- int connected = 0;
- _bufferevent_incref_and_lock(bufev);
- ...
- //正在連接配接。因為這個sockfd可能是非阻塞的,是以可能之前的connect還沒
- //連接配接上。而判斷該sockfd是否成功連接配接上了的一個方法是判斷這個sockfd是否可寫
- if (bufev_p->connecting) {
- //c等于1,說明已經連接配接成功
- //c等于0,說明還沒連接配接上
- //c等于-1,說明發生錯誤
- int c = evutil_socket_finished_connecting(fd);
- if (bufev_p->connection_refused) {//在bufferevent_socket_connect中被設定
- bufev_p->connection_refused = 0;
- c = -1;
- }
- if (c == 0)//還沒連接配接上,繼續監聽可寫吧
- goto done;
- //錯誤,或者已經連接配接上了
- bufev_p->connecting = 0;//修改标志值
- if (c < 0) {//錯誤
- event_del(&bufev->ev_write);
- event_del(&bufev->ev_read);
- _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
- goto done;
- } else {//連接配接上了。
- connected = 1;
- ...//win32
- //居然會調用使用者設定的錯誤處理函數。太神奇了
- _bufferevent_run_eventcb(bufev,
- BEV_EVENT_CONNECTED);
- if (!(bufev->enabled & EV_WRITE) || //預設都是enable EV_WRITE的
- bufev_p->write_suspended) {
- event_del(&bufev->ev_write);//不再需要監聽可寫。因為已經連接配接上了
- goto done;
- }
- }
- }
- ...
- done:
- _bufferevent_decref_and_unlock(bufev);
- }
可以看到無論是connect被拒絕、發生錯誤或者連接配接上了,都在這裡做統一的處理。
如果已經連接配接上了,那麼會調用使用者設定event回調函數(網上也稱之為錯誤處理函數),通知使用者已經連接配接上了。并且,還會把監聽可寫事件的event從event_base中删除,其理由在前面已經說過了。
函數evutil_socket_finished_connecting會檢查這個socket,進而得知這個socket是處于什麼狀态。在bufferevent_socket_connect函數中,出現的一些錯誤,比如被拒絕,也是能通過這個函數檢查出來的。是以可以在這裡做統一的處理。該函數的内部是使用。貼一下這個函數的代碼吧。
[cpp] view plain copy

- //evutil.c檔案
- //Return 1 for connected, 0 for not yet, -1 for error.
- int
- evutil_socket_finished_connecting(evutil_socket_t fd)
- {
- int e;
- ev_socklen_t elen = sizeof(e);
- //用來檢測這個fd是否已經連接配接上了,這個fd是非阻塞的
- //如果e的值被設為0,那麼就說明連接配接上了。
- //否則e被設定為對應的錯誤值。
- if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&e, &elen) < 0)
- return -1;
- if (e) {
- if (EVUTIL_ERR_CONNECT_RETRIABLE(e))//還沒連接配接上
- return 0;
- EVUTIL_SET_SOCKET_ERROR(e);
- return -1;
- }
- return 1;
- }