天天看點

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:

出處:http://blog.csdn.net/luotuo44/article/details/39344743

        和之前的《Libevent工作流程探究》一樣,這裡也是用一個例子來探究bufferevent的工作流程。具體的例子可以參考《Libevent使用例子,從簡單到複雜》,這裡就不列出了。其實要做的例子也就是bufferevent_socket_new、bufferevent_setcb、bufferevent_enable這幾個函數。

        因為本文會用到《 Libevent工作流程探究 》中提到的說法,比如将一個event插入到event_base中。是以讀者最好先讀一下那篇博文。此外,因為bufferevent結構體本身會使用evbuffer結構體和還會調用相應的一些操作,是以讀者還應該先閱讀《 evbuffer結構與基本操作 》和《 更多evbuffer操作函數 》。

bufferevent結構體:

        bufferevent其實也就是在event_base的基礎上再進行一層封裝,其本質還是離不開event和event_base,從bufferevent的結構體就可以看到這一點。

        bufferevent結構體中有兩個event,分别用來監聽同一個fd的可讀事件和可寫事件。為什麼不用一個event同時監聽可讀和可寫呢?這是因為監聽可寫是困難的,下面會說到原因。讀者也可以自問一下,自己之前有沒有試過用最原始的event監聽一個fd的可寫。

        由于socket 是全雙工的,是以在bufferevent結構體中,也有兩個evbuffer成員,分别是讀緩沖區和寫緩沖區。 bufferevent結構體定義如下:

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent_struct.h檔案  
  2. struct bufferevent {  
  3.     struct event_base *ev_base;  
  4.     //操作結構體,成員有一些函數指針。類似struct eventop結構體  
  5.     const struct bufferevent_ops *be_ops;  
  6.     struct event ev_read;//讀事件event  
  7.     struct event ev_write;//寫事件event  
  8.     struct evbuffer *input;//讀緩沖區  
  9.     struct evbuffer *output; //寫緩沖區  
  10.     struct event_watermark wm_read;//讀水位  
  11.     struct event_watermark wm_write;//寫水位  
  12.     bufferevent_data_cb readcb;//可讀時的回調函數指針  
  13.     bufferevent_data_cb writecb;//可寫時的回調函數指針  
  14.     bufferevent_event_cb errorcb;//錯誤發生時的回調函數指針  
  15.     void *cbarg;//回調函數的參數  
  16.     struct timeval timeout_read;//讀事件event的逾時值  
  17.     struct timeval timeout_write;//寫事件event的逾時值  
  18.     short enabled;  
  19. };  

        如果看過Libevent的參考手冊的話,應該還會知道bufferevent除了用于socket外,還可以用于socketpair 和 filter。如果用面向對象的思維,應從這個三個應用中抽出相同的部分作為父類,然後派生出三個子類。

        Libevent雖然是用C語言寫的,不過它還是提取出一些公共部分,然後定義一個bufferevent_private結構體,用于儲存這些公共部分成員。從集合的角度來說,bufferevent_private應該是bufferevent的一個子集,即一部分。但在Libevent中,bufferevent确實bufferevent_private的一個成員。下面是bufferevent_private結構體。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent-internal.h檔案  
  2. struct bufferevent_private {  
  3.     struct bufferevent bev;  
  4.     //設定input evbuffer的高水位時,需要一個evbuffer回調函數配合工作  
  5.     struct evbuffer_cb_entry *read_watermarks_cb;  
  6.     //鎖是Libevent自動配置設定的,還是使用者配置設定的  
  7.     unsigned own_lock : 1;  
  8.     ...  
  9.     //這個socket是否處理正在連接配接伺服器狀态  
  10.     unsigned connecting : 1;  
  11.     //标志連接配接被拒絕  
  12.     unsigned connection_refused : 1;  
  13.     //标志是什麼原因把 讀 挂起來  
  14.     bufferevent_suspend_flags read_suspended;  
  15.         //标志是什麼原因把 寫 挂起來  
  16.     bufferevent_suspend_flags write_suspended;  
  17.     enum bufferevent_options options;  
  18.         int refcnt;// bufferevent的引用計數  
  19.     //鎖變量  
  20.     void *lock;  
  21. };  

建立一個bufferevent:

        函數bufferevent_socket_new可以完成這個工作。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent-internal.h檔案  
  2. struct bufferevent_ops {  
  3.     const char *type;//類型名稱  
  4.     off_t mem_offset;//成員bev的偏移量  
  5.     //啟動。将event加入到event_base中  
  6.     int (*enable)(struct bufferevent *, short);  
  7.     //關閉。将event從event_base中删除  
  8.     int (*disable)(struct bufferevent *, short);  
  9.     //銷毀  
  10.     void (*destruct)(struct bufferevent *);  
  11.     //調整event的逾時值  
  12.     int (*adj_timeouts)(struct bufferevent *);  
  13.     int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);  
  14.     //擷取成員的值。具體看實作  
  15.     int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);  
  16. };  
  17. //bufferevent_sock.c檔案  
  18. const struct bufferevent_ops bufferevent_ops_socket = {  
  19.     "socket",  
  20.     evutil_offsetof(struct bufferevent_private, bev),  
  21.     be_socket_enable,  
  22.     be_socket_disable,  
  23.     be_socket_destruct,  
  24.     be_socket_adj_timeouts,  
  25.     be_socket_flush,  
  26.     be_socket_ctrl,  
  27. };  
  28. //由于有幾個不同類型的bufferevent,而且它們的enable、disable等操作是不同的。是以  
  29. //需要的一些函數指針指明某個類型的bufferevent應該使用哪些操作函數。結構體bufferevent_ops_socket  
  30. //就應運而生。對于socket,其操作函數如上。  
  31. //bufferevent_sock.c檔案  
  32. struct bufferevent *  
  33. bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,  
  34.     int options)  
  35. {  
  36.     struct bufferevent_private *bufev_p;  
  37.     struct bufferevent *bufev;  
  38.     ...//win32  
  39.     //結構體記憶體清零,所有成員都為0  
  40.     if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)  
  41.         return NULL;  
  42.     //如果options中需要線程安全,那麼就會申請鎖  
  43.     //會建立一個輸入和輸出緩存區  
  44.     if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,  
  45.                     options) < 0) {  
  46.         mm_free(bufev_p);  
  47.         return NULL;  
  48.     }  
  49.     bufev = &bufev_p->bev;  
  50.     //設定将evbuffer的資料向fd傳  
  51.     evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);  
  52.     //将fd與event相關聯。同一個fd關聯兩個event  
  53.     event_assign(&bufev->ev_read, bufev->ev_base, fd,  
  54.         EV_READ|EV_PERSIST, bufferevent_readcb, bufev);  
  55.     event_assign(&bufev->ev_write, bufev->ev_base, fd,  
  56.         EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);  
  57.     //設定evbuffer的回調函數,使得外界給寫緩沖區添加資料時,能觸發  
  58.     //寫操作,這個回調對于寫事件的監聽是很重要的  
  59.     evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);  
  60.     //當機讀緩沖區的尾部,未解凍之前不能往讀緩沖區追加資料  
  61.     //也就是說不能從socket fd中讀取資料  
  62.     evbuffer_freeze(bufev->input, 0);  
  63.     //當機寫緩沖區的頭部,未解凍之前不能把寫緩沖區的頭部資料删除  
  64.     //也就是說不能把資料寫到socket fd  
  65.     evbuffer_freeze(bufev->output, 1);  
  66.     return bufev;  
  67. }  

        留意函數裡面的evbuffer_add_cb調用,後面會說到。

        函數在最後面會當機兩個緩沖區。其實,雖然這裡當機了,但實際上Libevent在讀資料或者寫資料之前會解凍的讀完或者寫完資料後,又會馬上當機。這主要防止資料被意外修改。使用者一般不會直接調用evbuffer_freeze或者evbuffer_unfreeze函數。一切的當機和解凍操作都由Libevent内部完成。還有一點要注意,因為這裡隻是把寫緩沖區的頭部當機了。是以還是可以往寫緩沖區的尾部追加資料。同樣,此時也是可以從讀緩沖區讀取資料。這個是必須的。因為在Libevent内部不解凍的時候,使用者需要從讀緩沖區中擷取資料(這相當于從socket fd中讀取資料),使用者也需要把資料寫到寫緩沖區中(這相當于把資料寫入到socket fd中)。

        在bufferevent_socket_new函數裡面會調用函數bufferevent_init_common完成公有部分的初始化。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. int  
  3. bufferevent_init_common(struct bufferevent_private *bufev_private,  
  4.     struct event_base *base,  
  5.     const struct bufferevent_ops *ops,  
  6.     enum bufferevent_options options)  
  7. {  
  8.     struct bufferevent *bufev = &bufev_private->bev;  
  9.     //配置設定輸入緩沖區  
  10.     if (!bufev->input) {  
  11.         if ((bufev->input = evbuffer_new()) == NULL)  
  12.             return -1;  
  13.     }  
  14.     //配置設定輸出緩沖區  
  15.     if (!bufev->output) {  
  16.         if ((bufev->output = evbuffer_new()) == NULL) {  
  17.             evbuffer_free(bufev->input);  
  18.             return -1;  
  19.         }  
  20.     }  
  21.     bufev_private->refcnt = 1;//引用次數為1  
  22.     bufev->ev_base = base;  
  23.     //預設情況下,讀和寫event都是不支援逾時的  
  24.     evutil_timerclear(&bufev->timeout_read);  
  25.     evutil_timerclear(&bufev->timeout_write);  
  26.     bufev->be_ops = ops;  
  27.      //可寫是預設支援的  
  28.     bufev->enabled = EV_WRITE;  
  29. #ifndef _EVENT_DISABLE_THREAD_SUPPORT  
  30.     if (options & BEV_OPT_THREADSAFE) {  
  31.         //申請鎖。  
  32.         if (bufferevent_enable_locking(bufev, NULL) < 0) {  
  33.             evbuffer_free(bufev->input);  
  34.             evbuffer_free(bufev->output);  
  35.             bufev->input = NULL;  
  36.             bufev->output = NULL;  
  37.             return -1;  
  38.         }  
  39.     }  
  40. #endif  
  41.     ...//延遲調用的初始化,一般不需要用到  
  42.     bufev_private->options = options;  
  43.     //将evbuffer和bufferevent相關聯  
  44.     evbuffer_set_parent(bufev->input, bufev);  
  45.     evbuffer_set_parent(bufev->output, bufev);  
  46.     return 0;  
  47. }  

        代碼中可以看到,預設是enable  EV_WRITE的。

設定回調函數:

        函數bufferevent_setcb完成這個工作。該函數相當簡單,也就是進行一些指派操作。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. void  
  3. bufferevent_setcb(struct bufferevent *bufev,  
  4.     bufferevent_data_cb readcb, bufferevent_data_cb writecb,  
  5.     bufferevent_event_cb eventcb, void *cbarg)  
  6. {  
  7.     //bufferevent結構體内部有一個鎖變量  
  8.     BEV_LOCK(bufev);  
  9.     bufev->readcb = readcb;  
  10.     bufev->writecb = writecb;  
  11.     bufev->errorcb = eventcb;  
  12.     bufev->cbarg = cbarg;  
  13.     BEV_UNLOCK(bufev);  
  14. }  

        如果不想設定某個操作的回調函數,直接設定為NULL即可。

令bufferevent可以工作:

        相信讀者也知道,即使調用了bufferevent_socket_new和bufferevent_setcb,這個bufferevent還是不能工作,必須調用bufferevent_enable。為什麼會這樣的呢?

        如果看過之前的那些博文,相信讀者知道,一個event能夠工作,不僅僅需要new出來,還要調用event_add函數,把這個event添加到event_base中。在本文前面的代碼中,并沒有看到event_add函數的調用。是以還需要調用一個函數,把event添加到event_base中。函數bufferevent_enable就是完成這個工作的。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. int  
  3. bufferevent_enable(struct bufferevent *bufev, short event)  
  4. {  
  5.     struct bufferevent_private *bufev_private =  
  6.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  7.     short impl_events = event;  
  8.     int r = 0;  
  9.     //增加引用并加鎖  
  10.     //增加引用是為了防止其他線程調用bufferevent_free,釋放了bufferevent  
  11.     _bufferevent_incref_and_lock(bufev);  
  12.     //挂起了讀,此時不能監聽讀事件  
  13.     if (bufev_private->read_suspended)  
  14.         impl_events &= ~EV_READ;  
  15.     //挂起了寫,此時不能監聽寫事情  
  16.     if (bufev_private->write_suspended)  
  17.         impl_events &= ~EV_WRITE;  
  18.     bufev->enabled |= event;  
  19.     //調用對應類型的enbale函數。因為不同類型的bufferevent有不同的enable函數  
  20.     if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)  
  21.         r = -1;  
  22.     //減少引用并解鎖  
  23.     _bufferevent_decref_and_unlock(bufev);  
  24.     return r;  
  25. }  

        上面代碼可以看到,最終會調用對應bufferevent類型的enable函數,對于socket bufferevent,其enable函數是be_socket_enable,代碼如下:

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. int  
  3. _bufferevent_add_event(struct event *ev, const struct timeval *tv)  
  4. {  
  5.     if (tv->tv_sec == 0 && tv->tv_usec == 0)  
  6.         return event_add(ev, NULL);  
  7.     else  
  8.         return event_add(ev, tv);  
  9. }  
  10. //bufferevent_sock.c檔案  
  11. #define be_socket_add(ev, t)            \  
  12.     _bufferevent_add_event((ev), (t))  
  13. static int  
  14. be_socket_enable(struct bufferevent *bufev, short event)  
  15. {  
  16.     if (event & EV_READ) {  
  17.         if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)  
  18.             return -1;  
  19.     }  
  20.     if (event & EV_WRITE) {  
  21.         if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)  
  22.             return -1;  
  23.     }  
  24.     return 0;  
  25. }  

        如果讀者熟悉Libevent的逾時事件,那麼可以知道Libevent是在event_add函數裡面确定一個event的逾時的。上面代碼也展示了這一點,如果讀或者寫event設定了逾時(即其逾時值不為0),那麼就會作為參數傳給event_add函數。如果讀者不熟悉的Libevent的逾時事件的話,可以參考《逾時event的處理》。

        使用者可以調用函數bufferevent_set_timeouts,設定讀或者寫事件的逾時。代碼如下:

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. int  
  3. bufferevent_set_timeouts(struct bufferevent *bufev,  
  4.              const struct timeval *tv_read,  
  5.              const struct timeval *tv_write)  
  6. {  
  7.     int r = 0;  
  8.     BEV_LOCK(bufev);  
  9.     if (tv_read) {  
  10.         bufev->timeout_read = *tv_read;  
  11.     } else {  
  12.         evutil_timerclear(&bufev->timeout_read);  
  13.     }  
  14.     if (tv_write) {  
  15.         bufev->timeout_write = *tv_write;  
  16.     } else {  
  17.         evutil_timerclear(&bufev->timeout_write);  
  18.     }  
  19.     if (bufev->be_ops->adj_timeouts)  
  20.         r = bufev->be_ops->adj_timeouts(bufev);  
  21.     BEV_UNLOCK(bufev);  
  22.     return r;  
  23. }  
  24. //bufferevent_sock.c檔案  
  25. static int  
  26. be_socket_adj_timeouts(struct bufferevent *bufev)  
  27. {  
  28.     int r = 0;  
  29.     //使用者監聽了讀事件  
  30.     if (event_pending(&bufev->ev_read, EV_READ, NULL))  
  31.         if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)  
  32.             r = -1;  
  33.     //使用者監聽了寫事件  
  34.     if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {  
  35.         if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)  
  36.             r = -1;  
  37.     }  
  38.     return r;  
  39. }  

        從上面代碼可以看到:使用者不僅僅可以設定逾時值,還可以修改逾時值,也是通過這個函數進行修的。當然也是可以删除逾時的,直接把逾時參數設定成NULL即可。

        至此,已經完成了bufferevent的初始化工作,隻需調用event_base_dispatch函數,啟動發動機就可以工作了。

處理讀事件:

        接下來的任務:底層的socket fd接收資料後,bufferevent是怎麼工作的。

讀事件的水位:

        在講解讀事件之前,先來看一下水位問題,函數bufferevent_setwatermark可以設定讀和寫的水位。這裡隻講解讀事件的水位。

        水位有兩個:低水位和高水位。

        低水位比較容易懂,就是當可讀的資料量到達這個低水位後,才會調用使用者設定的回調函數。比如使用者想每次讀取100位元組,那麼就可以把低水位設定為100。當可讀資料的位元組數小于100時,即使有資料都不會打擾使用者(即不會調用使用者設定的回調函數)。可讀資料大于等于100位元組後,才會調用使用者的回調函數。

        高水位是什麼呢?其實,這和使用者的回調函數沒有關系。它的意義是:把讀事件的evbuffer的資料量限制在高水位之下。比如,使用者認為讀緩沖區不能太大(太大的話,連結清單會很長)。那麼使用者就會設定讀事件的高水位。當讀緩沖區的資料量達到這個高水位後,即使socket fd還有資料沒有讀,也不會讀進這個讀緩沖區裡面。一句話說,就是控制evbuffer的大小。

        雖然控制了evbuffer的大小,但socket fd可能還有資料。有資料就會觸發可讀事件,但處理可讀的時候,又會發現設定了高水位,不能讀取資料evbuffer。socket fd的資料沒有被讀完,又觸發……。這個貌似是一個死循環。實際上是不會出現這個死循環的,因為Libevent發現evbuffer的資料量到達高水位後,就會把可讀事件給挂起來,讓它不能再觸發了。Libevent使用函數bufferevent_wm_suspend_read把監聽讀事件的event挂起來。下面看一下Libevent是怎麼把一個event挂起來的。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent-internal.h檔案  
  2. #define bufferevent_wm_suspend_read(b) \  
  3.     bufferevent_suspend_read((b), BEV_SUSPEND_WM)  
  4. //bufferevent.c檔案  
  5. void  
  6. bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)  
  7. {  
  8.     struct bufferevent_private *bufev_private =  
  9.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  10.     BEV_LOCK(bufev);  
  11.     if (!bufev_private->read_suspended)//不能挂多次  
  12.         bufev->be_ops->disable(bufev, EV_READ);//實際調用be_socket_disable函數  
  13.     bufev_private->read_suspended |= what;//因何而被挂起  
  14.     BEV_UNLOCK(bufev);  
  15. }  
  16. //bufferevent_sock.c檔案  
  17. static int  
  18. be_socket_disable(struct bufferevent *bufev, short event)  
  19. {  
  20.     struct bufferevent_private *bufev_p =  
  21.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  22.     if (event & EV_READ) {  
  23.         if (event_del(&bufev->ev_read) == -1)  
  24.             return -1;  
  25.     }  
  26.     if ((event & EV_WRITE) && ! bufev_p->connecting) {  
  27.         if (event_del(&bufev->ev_write) == -1)//删掉這個event  
  28.             return -1;  
  29.     }  
  30.     return 0;  
  31. }  

        居然是直接删除這個監聽讀事件的event,真的是挂了!!!

        看來不能随便設定高水位,因為它會暫停讀。如果隻想設定低水位而不想設定高水位,那麼在調用bufferevent_setwatermark函數時,高水位的參數設為0即可。

        那麼什麼時候取消挂起,讓bufferevent可以繼續讀socket 資料呢?從高水位的意義來說,當然是當evbuffer裡面的資料量小于高水位時,就能再次讀取socket資料了。現在來看一下Libevent是怎麼恢複讀的。看一下設定水位的函數bufferevent_setwatermark吧,它進行了一些為高水位埋下了一個回調函數。對,就是evbuffer的回調函數。前一篇博文說到,當evbuffer裡面的資料添加或者删除時,是會觸發一些回調函數的。當使用者移除evbuffer的一些資料量時,Libevent就會檢查這個evbuffer的資料量是否小于高水位,如果小于的話,那麼就恢複 讀事件。

        不說這麼多了,上代碼。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. void  
  3. bufferevent_setwatermark(struct bufferevent *bufev, short events,  
  4.     size_t lowmark, size_t highmark)  
  5. {  
  6.     struct bufferevent_private *bufev_private =  
  7.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  8.     BEV_LOCK(bufev);  
  9.     if (events & EV_READ) {  
  10.         bufev->wm_read.low = lowmark;  
  11.         bufev->wm_read.high = highmark;  
  12.         if (highmark) {//高水位  
  13.             //還沒設定高水位的回調函數  
  14.             if (bufev_private->read_watermarks_cb == NULL) {  
  15.                 bufev_private->read_watermarks_cb =  
  16.                     evbuffer_add_cb(bufev->input,  
  17.                             bufferevent_inbuf_wm_cb,  
  18.                             bufev);//添加回調函數  
  19.             }  
  20.             evbuffer_cb_set_flags(bufev->input,  
  21.                       bufev_private->read_watermarks_cb,  
  22.                       EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);  
  23.             //設定(修改)高水位時,evbuffer的資料量已經超過了水位值  
  24.             //可能是把之前的高水位調高或者調低  
  25.             //挂起操作和取消挂起操作都是幂等的(即多次挂起的作用等同于挂起一次)  
  26.             if (evbuffer_get_length(bufev->input) > highmark)  
  27.                 bufferevent_wm_suspend_read(bufev);  
  28.             else if (evbuffer_get_length(bufev->input) < highmark)//調低了  
  29.                 bufferevent_wm_unsuspend_read(bufev);  
  30.         } else {  
  31.             //高水位值等于0,那麼就要取消挂起 讀事件  
  32.             //取消挂起操作是幂等的  
  33.             if (bufev_private->read_watermarks_cb)  
  34.                 evbuffer_cb_clear_flags(bufev->input,  
  35.                     bufev_private->read_watermarks_cb,  
  36.                     EVBUFFER_CB_ENABLED);  
  37.             bufferevent_wm_unsuspend_read(bufev);  
  38.         }  
  39.     }  
  40.     BEV_UNLOCK(bufev);  
  41. }  

        這個函數,不僅僅為高水位設定回調函數,還會檢查目前evbuffer的資料量是否超過了高水位。因為這個設定水位函數可能是在bufferevent工作一段時間後才添加的,是以evbuffer是有可能已經有資料的了,是以需要檢查。如果超過了水位值,那麼就需要挂起讀。當然也存在另外一種可能:使用者之前設定過了一個比較大的高水位,挂起了讀。現在發現錯了,就把高水位調低一點,此時就需要恢複讀。

        現在假設使用者移除了一些evbuffer的資料,進而觸發了evbuffer的回調函數,當然也就調用了函數bufferevent_inbuf_wm_cb。下面看一下這個函數是怎麼恢複讀的。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. static void  
  3. bufferevent_inbuf_wm_cb(struct evbuffer *buf,  
  4.     const struct evbuffer_cb_info *cbinfo,  
  5.     void *arg)  
  6. {  
  7.     struct bufferevent *bufev = arg;  
  8.     size_t size;  
  9.     size = evbuffer_get_length(buf);  
  10.     if (size >= bufev->wm_read.high)  
  11.         bufferevent_wm_suspend_read(bufev);  
  12.     else  
  13.         bufferevent_wm_unsuspend_read(bufev);  
  14. }  
  15. //bufferevent-internal.h檔案  
  16. #define bufferevent_wm_unsuspend_read(b) \  
  17.     bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)  
  18. //bufferevent.c檔案  
  19. void  
  20. bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)  
  21. {  
  22.     struct bufferevent_private *bufev_private =  
  23.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  24.     BEV_LOCK(bufev);  
  25.     bufev_private->read_suspended &= ~what;  
  26.     if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))  
  27.         bufev->be_ops->enable(bufev, EV_READ);//重新把event插入到event_base中  
  28.     BEV_UNLOCK(bufev);  
  29. }  

        因為使用者可以手動為這個evbuffer添加資料,此時也會調用bufferevent_inbuf_wm_cb函數。此時就要檢查evbuffer的資料量是否已經超過高水位了,而不能僅僅檢查是否低于高水位。

        高水位導緻讀的挂起和之後讀的恢複,一切工作都是由Libevent内部完成的,使用者不用做任何工作。

從socket中讀取資料:

        從前面的一系列博文可以知道,如果一個socket可讀了,那麼監聽可讀事件的event的回調函數就會被調用。這個回調函數是在bufferevent_socket_new函數中被Libevent内部設定的,設定為bufferevent_readcb函數,使用者并不知情。

        當socket有資料可讀時,Libevent就會監聽到,然後調用bufferevent_readcb函數處理。該函數會調用evbuffer_read函數,把資料從socket fd中讀取到evbuffer中。然後再調用使用者在bufferevent_setcb函數中設定的讀事件回調函數。是以,當使用者的讀事件回調函數被調用時,資料已經在evbuffer中了,使用者拿來就用,無需調用read這類會阻塞的函數。

        下面看一下bufferevent_readcb函數的具體實作。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. static void  
  2. bufferevent_readcb(evutil_socket_t fd, short event, void *arg)  
  3. {  
  4.     struct bufferevent *bufev = arg;  
  5.     struct bufferevent_private *bufev_p =  
  6.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  7.     struct evbuffer *input;  
  8.     int res = 0;  
  9.     short what = BEV_EVENT_READING;  
  10.     ev_ssize_t howmuch = -1, readmax=-1;  
  11.     _bufferevent_incref_and_lock(bufev);  
  12.     if (event == EV_TIMEOUT) {  
  13.         what |= BEV_EVENT_TIMEOUT;  
  14.         goto error;  
  15.     }  
  16.     input = bufev->input;  
  17.     //使用者設定了高水位  
  18.     if (bufev->wm_read.high != 0) {  
  19.         howmuch = bufev->wm_read.high - evbuffer_get_length(input);  
  20.         if (howmuch <= 0) {  
  21.             bufferevent_wm_suspend_read(bufev);  
  22.             goto done;  
  23.         }  
  24.     }  
  25.     //因為使用者可以限速,是以這麼要檢測最大的可讀大小。  
  26.     //如果沒有限速的話,那麼将傳回16384位元組,即16K  
  27.     //預設情況下是沒有限速的。  
  28.     readmax = _bufferevent_get_read_max(bufev_p);  
  29.     if (howmuch < 0 || howmuch > readmax)   
  30.         howmuch = readmax;  
  31.     //一些原因導緻讀 被挂起,比如加鎖了。  
  32.     if (bufev_p->read_suspended)  
  33.         goto done;  
  34.     //解凍,使得可以在input的後面追加資料  
  35.     evbuffer_unfreeze(input, 0);  
  36.     res = evbuffer_read(input, fd, (int)howmuch); //從socket fd中讀取資料  
  37.     evbuffer_freeze(input, 0);//當機  
  38.     if (res == -1) {  
  39.         int err = evutil_socket_geterror(fd);  
  40.         if (EVUTIL_ERR_RW_RETRIABLE(err))//EINTER or EAGAIN  
  41.             goto reschedule;  
  42.         //不是 EINTER or EAGAIN 這兩個可以重試的錯誤,那麼就應該是其他緻命的錯誤  
  43.         //此時,應該報告給使用者  
  44.         what |= BEV_EVENT_ERROR;  
  45.     } else if (res == 0) {//斷開了連接配接  
  46.         what |= BEV_EVENT_EOF;  
  47.     }  
  48.     if (res <= 0)  
  49.         goto error;  
  50.     //速率相關的操作  
  51.     _bufferevent_decrement_read_buckets(bufev_p, res);  
  52.     //evbuffer的資料量大于低水位值。  
  53.     if (evbuffer_get_length(input) >= bufev->wm_read.low)  
  54.         _bufferevent_run_readcb(bufev);//調用使用者設定的回調函數  
  55.     goto done;  
  56.  reschedule:  
  57.     goto done;  
  58.  error:  
  59.     //把監聽可讀事件的event從event_base的事件隊列中删除掉.event_del  
  60.     bufferevent_disable(bufev, EV_READ);//會調用be_socket_disable函數  
  61.     _bufferevent_run_eventcb(bufev, what);//會調用使用者設定的錯誤處理函數  
  62.  done:  
  63.     _bufferevent_decref_and_unlock(bufev);  
  64. }  

        細心的讀者可能會發現:對使用者的讀事件回調函數的觸發是邊緣觸發的。這也就要求,在回調函數中,使用者應該盡可能地把evbuffer的所有資料都讀出來。如果想等到下一次回調時再讀,那麼需要等到下一次socketfd接收到資料才會觸發使用者的回調函數。如果之後socket fd一直收不到任何資料,那麼即使evbuffer還有資料,使用者的回調函數也不會被調用了。

處理寫事件:

        對一個可讀事件進行監聽是比較容易的,但對于一個可寫事件進行監聽則比較困難。為什麼呢?因為可讀監聽是監聽fd的讀緩沖區是否有資料了,如果沒有資料那麼就一直等待。對于可寫,首先要明白“什麼是可寫”,可寫就是fd的寫緩沖區(這個緩沖區在核心)還沒滿,可以往裡面放資料。這就有一個問題,如果寫緩沖區沒有滿,那麼就一直是可寫狀态。如果一個event監聽了可寫事件,那麼這個event就會一直被觸發(死循環)。因為一般情況下,如果不是發大量的資料這個寫緩沖區是不會滿的。

        也就是說,不能監聽可寫事件。但我們确實要往fd中寫資料,那怎麼辦?Libevent的做法是:當我們确實要寫入資料時,才監聽可寫事件。也就是說我們調用bufferevent_write寫入資料時,Libevent才會把監聽可寫事件的那個event注冊到event_base中。當Libevent把資料都寫入到fd的緩沖區後,Libevent又會把這個event從event_base中删除。比較煩瑣。        

        bufferevent_writecb函數不僅僅要處理上面說到的那個問題,還要處理另外一個坑爹的問題。那就是:判斷socket fd是不是已經連接配接上伺服器了。這是因為這個socket fd是非阻塞的,是以它調用connect時,可能還沒連接配接上就傳回了。對于非阻塞socket fd,一般是通過判斷這個socket是否可寫,進而得知這個socket是否已經連接配接上伺服器。如果可寫,那麼它就已經成功連接配接上伺服器了。這個問題,這裡先提一下,後面會詳細講。

        同前面的監聽可讀一樣,Libevent是在bufferevent_socket_new函數設定可寫的回調函數,為bufferevent_writecb。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent_sock.c檔案  
  2. static void  
  3. bufferevent_writecb(evutil_socket_t fd, short event, void *arg)  
  4. {  
  5.     struct bufferevent *bufev = arg;  
  6.     struct bufferevent_private *bufev_p =  
  7.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  8.     int res = 0;  
  9.     short what = BEV_EVENT_WRITING;  
  10.     int connected = 0;  
  11.     ev_ssize_t atmost = -1;  
  12.     _bufferevent_incref_and_lock(bufev);  
  13.     if (event == EV_TIMEOUT) {  
  14.         what |= BEV_EVENT_TIMEOUT;  
  15.         goto error;  
  16.     }  
  17.     ...//判斷這個socket是否已經連接配接上伺服器了  
  18.     //使用者可能設定了限速,如果沒有限速,那麼atmost将傳回16384(16K)  
  19.     atmost = _bufferevent_get_write_max(bufev_p);  
  20.     //一些原因導緻寫被挂起來了  
  21.     if (bufev_p->write_suspended)  
  22.         goto done;  
  23.     //如果evbuffer有資料可以寫到sockfd中  
  24.     if (evbuffer_get_length(bufev->output)) {  
  25.         //解凍連結清單頭  
  26.         evbuffer_unfreeze(bufev->output, 1);  
  27.         //将output這個evbuffer的資料寫到socket fd 的緩沖區中  
  28.         //會把已經寫到socket fd緩沖區的資料,從evbuffer中删除  
  29.         res = evbuffer_write_atmost(bufev->output, fd, atmost);  
  30.         evbuffer_freeze(bufev->output, 1);  
  31.         if (res == -1) {  
  32.             int err = evutil_socket_geterror(fd);  
  33.             if (EVUTIL_ERR_RW_RETRIABLE(err))//可以恢複的錯誤。一般是EINTR或者EAGAIN  
  34.                 goto reschedule;  
  35.             what |= BEV_EVENT_ERROR;  
  36.         } else if (res == 0) {//該socket已經斷開連接配接了  
  37.             what |= BEV_EVENT_EOF;  
  38.         }  
  39.         if (res <= 0)  
  40.             goto error;  
  41.     }  
  42.     //如果把寫緩沖區的資料都寫完成了。為了防止event_base不斷地觸發可寫  
  43.     //事件,此時要把這個監聽可寫的event删除。  
  44.     //前面的atmost限制了一次最大的可寫資料。如果還沒寫所有的資料  
  45.     //那麼就不能delete這個event,而是要繼續監聽可寫事情,知道把所有的  
  46.     //資料都寫到socket fd中。  
  47.     if (evbuffer_get_length(bufev->output) == 0) {  
  48.         event_del(&bufev->ev_write);  
  49.     }  
  50.     //如果evbuffer裡面的資料量已經寫得七七八八了,小于設定的低水位值,那麼  
  51.     //就會調用使用者設定的寫事件回調函數  
  52.     if ((res || !connected) &&  
  53.         evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {  
  54.         _bufferevent_run_writecb(bufev);  
  55.     }  
  56.     goto done;  
  57.  reschedule:  
  58.     if (evbuffer_get_length(bufev->output) == 0) {  
  59.         event_del(&bufev->ev_write);  
  60.     }  
  61.     goto done;  
  62.  error:  
  63.     bufferevent_disable(bufev, EV_WRITE);//有錯誤。把這個寫event删除  
  64.     _bufferevent_run_eventcb(bufev, what);  
  65.  done:  
  66.     _bufferevent_decref_and_unlock(bufev);  
  67. }  

        上面代碼的邏輯比較清晰,調用evbuffer_write_atmost函數把資料從evbuffer中寫到evbuffer緩沖區中,此時要注意函數的傳回值,因為可能寫的時候發生錯誤。如果發生了錯誤,就要調用使用者設定的event回調函數(網上也有人稱其為錯誤處理函數)。

        之後,還要判斷evbuffer的資料是否已經全部寫到socket 的緩沖區了。如果已經全部寫了,那麼就要把監聽寫事件的event從event_base的插入隊列中删除。如果還沒寫完,那麼就不能删除,因為還要繼續監聽可寫事件,下次接着寫。

        現在來看一下,把監聽寫事件的event從event_base的插入隊列中删除後,如果下次使用者有資料要寫的時候,怎麼把這個event添加到event_base的插入隊列。

        使用者一般是通過bufferevent_write函數把資料寫入到evbuffer(寫入evbuffer後,接着就會被寫入socket,是以調用bufferevent_write就相當于把資料寫入到socket。)。而這個bufferevent_write函數是直接調用evbuffer_add函數的。函數evbuffer_add沒有調用什麼可疑的函數,能夠把監聽可寫的event添加到event_base中。唯一的可能就是那個回調函數。對就是evbuffer的回調函數。關于evbuffer的回調函數,可以參考這裡。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent.c檔案  
  2. int  
  3. bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)  
  4. {  
  5.     if (evbuffer_add(bufev->output, data, size) == -1)  
  6.         return (-1);  
  7.     return 0;  
  8. }  
  9. //buffer.c檔案  
  10. int  
  11. evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)  
  12. {  
  13.     ...  
  14. out:  
  15.     evbuffer_invoke_callbacks(buf);//調用回調函數  
  16.     result = 0;  
  17. done:  
  18.     return result;  
  19. }  

        還記得本文前面的bufferevent_socket_new函數嗎?該函數裡面會有

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. evbuffer_add_cb(bufev->output,bufferevent_socket_outbuf_cb, bufev);  

        當bufferevent的寫緩沖區output的資料發生變化時,函數bufferevent_socket_outbuf_cb就會被調用。現在馬上飛到這個函數。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent_sock.c檔案  
  2. static void  
  3. bufferevent_socket_outbuf_cb(struct evbuffer *buf,  
  4.     const struct evbuffer_cb_info *cbinfo,  
  5.     void *arg)  
  6. {  
  7.     struct bufferevent *bufev = arg;  
  8.     struct bufferevent_private *bufev_p =  
  9.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  10.     if (cbinfo->n_added && //evbuffer添加了資料  
  11.         (bufev->enabled & EV_WRITE) && //預設情況下是enable EV_WRITE的  
  12.         !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&//這個event已經被踢出event_base了  
  13.         !bufev_p->write_suspended) {//這個bufferevent的寫并沒有被挂起  
  14.         //把這個event添加到event_base中  
  15.         if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {  
  16.         }  
  17.     }  
  18. }  

        這個函數首先進行一些判斷,滿足條件後就會把這個監聽寫事件的event添加到event_base中。其中event_pending函數就是判斷這個bufev->ev_write是否已經被event_base删除了。關于event_pending,可以參考這裡。

        對于bufferevent_write,初次使用該函數的讀者可能會有疑問:調用該函數後,參數data指向的記憶體空間能不能馬上釋放,還是要等到Libevent把data指向的資料都寫到socket 緩存區才能删除?其實,從前一篇博文可以看到,evbuffer_add是直接複制一份使用者要發送的資料到evbuffer緩存區的。是以,調用完bufferevent_write,就可以馬上釋放參數data指向的記憶體空間。

        網上的關于Libevent的一些使用例子,包括我寫的《 Libevent使用例子,從簡單到複雜》,都是在主線程中調用bufferevent_write函數寫入資料的。從上面的分析可以得知,是可以馬上把監聽可寫事件的event添加到event_base中。如果是在次線程調用該函數寫入資料呢?此時,主線程可能還睡眠在poll、epoll這類的多路IO複用函數上。這種情況下能不能及時喚醒主線程呢?其實是可以的,隻要你的Libevent在一開始使用了線程功能。具體的分析過程可以參考《evthread_notify_base通知主線程》。上面代碼中的be_socket_add會調用event_add,而在次線程調用event_add就會調用evthread_notify_base通知主線程。

bufferevent_socket_connect:

        使用者可以在調用bufferevent_socket_new函數時,傳一個-1作為socket的檔案描述符,然後調用bufferevent_socket_connect函數連接配接伺服器,無需自己寫代碼調用connect函數連接配接伺服器。

        bufferevent_socket_connect函數會調用socket函數申請一個套接字fd,然後把這個fd設定成非阻塞的(這就導緻了一些坑爹的事情)。接着就connect伺服器,因為該socket fd是非阻塞的,是以不會等待,而是馬上傳回,連接配接這工作交給核心來完成。是以,傳回後這個socket還沒有真正連接配接上伺服器。那麼什麼時候連接配接上呢?核心又是怎麼通知通知使用者呢?

        一般來說,當可以往socket fd寫東西了,那就說明已經連接配接上了。也就是說這個socket fd變成可寫狀态,就連接配接上了。

        是以,對于“非阻塞connect”比較流行的做法是:用select或者poll這類多路IO複用函數監聽該socket的可寫事件。當這個socket觸發了可寫事件,然後再對這個socket調用getsockopt函數,做進一步的判斷。

        Libevent也是這樣實作的,下面來看一下bufferevent_socket_connect函數。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent_sock.c檔案  
  2. int  
  3. bufferevent_socket_connect(struct bufferevent *bev,  
  4.     struct sockaddr *sa, int socklen)  
  5. {  
  6.     struct bufferevent_private *bufev_p =  
  7.         EVUTIL_UPCAST(bev, struct bufferevent_private, bev);  
  8.     evutil_socket_t fd;  
  9.     int r = 0;  
  10.     int result=-1;  
  11.     int ownfd = 0;  
  12.     _bufferevent_incref_and_lock(bev);  
  13.     if (!bufev_p)  
  14.         goto done;  
  15.     fd = bufferevent_getfd(bev);  
  16.     if (fd < 0) {//該bufferevent還沒有設定fd  
  17.         if (!sa)  
  18.             goto done;  
  19.         fd = socket(sa->sa_family, SOCK_STREAM, 0);  
  20.         if (fd < 0)  
  21.             goto done;  
  22.         if (evutil_make_socket_nonblocking(fd)<0)//設定為非阻塞  
  23.             goto done;  
  24.         ownfd = 1;  
  25.     }  
  26.     if (sa) {  
  27.         r = evutil_socket_connect(&fd, sa, socklen);//非阻塞connect  
  28.         if (r < 0)  
  29.             goto freesock;  
  30.     }  
  31.     ...  
  32.     //為bufferevent裡面的兩個event設定監聽的fd  
  33.     //後面會調用bufferevent_enable  
  34.     bufferevent_setfd(bev, fd);  
  35.     if (r == 0) {//暫時還沒連接配接上,因為fd是非阻塞的  
  36.         //此時需要監聽可寫事件,當可寫了,并且沒有錯誤的話,就成功連接配接上了  
  37.         if (! be_socket_enable(bev, EV_WRITE)) {  
  38.             bufev_p->connecting = 1;//标志這個sockfd正在連接配接  
  39.             result = 0;  
  40.             goto done;  
  41.         }  
  42.     } else if (r == 1) {//已經連接配接上了  
  43.         result = 0;  
  44.         bufev_p->connecting = 1;   
  45.         event_active(&bev->ev_write, EV_WRITE, 1);//手動激活這個event  
  46.     } else {// connection refused  
  47.         bufev_p->connection_refused = 1;  
  48.         bufev_p->connecting = 1;  
  49.         result = 0;  
  50.         event_active(&bev->ev_write, EV_WRITE, 1);//手動激活這個event  
  51.     }  
  52.     goto done;  
  53. freesock:  
  54.     _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);//出現錯誤  
  55.     if (ownfd)  
  56.         evutil_closesocket(fd);  
  57. done:  
  58.     _bufferevent_decref_and_unlock(bev);  
  59.     return result;  
  60. }  

        這個函數比較多錯誤處理的代碼,大緻看一下就行了。有幾個地方要注意,即使connect的時候被拒絕,或者已經連接配接上了,都會手動激活這個event。一個event即使沒有加入event_base,也是可以手動激活的。具體原理參考這裡。

        無論是手動激活event,或者監聽到這個event可寫了,都是會調用bufferevent_writecb函數。現在再次看一下該函數,隻看connect部分。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //bufferevent_sock.c檔案  
  2. static void  
  3. bufferevent_writecb(evutil_socket_t fd, short event, void *arg)  
  4. {  
  5.     struct bufferevent_private *bufev_p =  
  6.         EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  
  7.     int connected = 0;  
  8.     _bufferevent_incref_and_lock(bufev);  
  9.     ...  
  10.     //正在連接配接。因為這個sockfd可能是非阻塞的,是以可能之前的connect還沒  
  11.     //連接配接上。而判斷該sockfd是否成功連接配接上了的一個方法是判斷這個sockfd是否可寫  
  12.     if (bufev_p->connecting) {  
  13.         //c等于1,說明已經連接配接成功  
  14.         //c等于0,說明還沒連接配接上  
  15.         //c等于-1,說明發生錯誤  
  16.         int c = evutil_socket_finished_connecting(fd);  
  17.         if (bufev_p->connection_refused) {//在bufferevent_socket_connect中被設定  
  18.           bufev_p->connection_refused = 0;  
  19.           c = -1;  
  20.         }  
  21.         if (c == 0)//還沒連接配接上,繼續監聽可寫吧  
  22.             goto done;  
  23.         //錯誤,或者已經連接配接上了  
  24.         bufev_p->connecting = 0;//修改标志值  
  25.         if (c < 0) {//錯誤  
  26.             event_del(&bufev->ev_write);  
  27.             event_del(&bufev->ev_read);  
  28.             _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);  
  29.             goto done;  
  30.         } else {//連接配接上了。  
  31.             connected = 1;  
  32.             ...//win32  
  33.             //居然會調用使用者設定的錯誤處理函數。太神奇了  
  34.             _bufferevent_run_eventcb(bufev,  
  35.                     BEV_EVENT_CONNECTED);  
  36.             if (!(bufev->enabled & EV_WRITE) || //預設都是enable EV_WRITE的  
  37.                 bufev_p->write_suspended) {  
  38.                 event_del(&bufev->ev_write);//不再需要監聽可寫。因為已經連接配接上了  
  39.                 goto done;  
  40.             }  
  41.         }  
  42.     }  
  43.     ...  
  44.  done:  
  45.     _bufferevent_decref_and_unlock(bufev);  
  46. }  

        可以看到無論是connect被拒絕、發生錯誤或者連接配接上了,都在這裡做統一的處理。

        如果已經連接配接上了,那麼會調用使用者設定event回調函數(網上也稱之為錯誤處理函數),通知使用者已經連接配接上了。并且,還會把監聽可寫事件的event從event_base中删除,其理由在前面已經說過了。

        函數evutil_socket_finished_connecting會檢查這個socket,進而得知這個socket是處于什麼狀态。在bufferevent_socket_connect函數中,出現的一些錯誤,比如被拒絕,也是能通過這個函數檢查出來的。是以可以在這裡做統一的處理。該函數的内部是使用。貼一下這個函數的代碼吧。

[cpp]  view plain  copy  

Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
Libevent源碼分析-----bufferevent工作流程探究bufferevent結構體:建立一個bufferevent:設定回調函數:令bufferevent可以工作:處理讀事件:處理寫事件:bufferevent_socket_connect:
  1. //evutil.c檔案  
  2. //Return 1 for connected, 0 for not yet, -1 for error.  
  3. int  
  4. evutil_socket_finished_connecting(evutil_socket_t fd)  
  5. {  
  6.     int e;  
  7.     ev_socklen_t elen = sizeof(e);  
  8.     //用來檢測這個fd是否已經連接配接上了,這個fd是非阻塞的  
  9.     //如果e的值被設為0,那麼就說明連接配接上了。  
  10.     //否則e被設定為對應的錯誤值。  
  11.     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&e, &elen) < 0)  
  12.         return -1;  
  13.     if (e) {  
  14.         if (EVUTIL_ERR_CONNECT_RETRIABLE(e))//還沒連接配接上  
  15.             return 0;  
  16.         EVUTIL_SET_SOCKET_ERROR(e);  
  17.         return -1;  
  18.     }  
  19.     return 1;  
  20. }