天天看點

[網絡程式設計]微信協程庫libco研究(三):協程的事件管理1. 基礎資料結構2. 初始化3. 添加監聽事件4. 輪詢5. 總結

轉載自:https://segmentfault.com/a/1190000012834756

前面的兩篇文章中介紹了微信的libco庫如何hook系統函數和協程的建立和管理,本篇文章将介紹libco庫是進行事件管理的。

libco庫使用一種類似時間片的技術進行輪詢,使得每個注冊在其上的事件都有機會執行。

1. 基礎資料結構

在上一篇文章中介紹stCoRoutineEnv_t時,我們将stCoEpoll_t這個結構跳過了,現在我們來仔細分析下這個資料結構。

struct stCoEpoll_t
{
    int iEpollFd;
    static const int _EPOLL_SIZE =  * ;
    struct stTimeout_t *pTimeout; //用于儲存timeout item
    struct stTimeoutItemLink_t *pstTimeoutList; // 在後續的event_ loop中介紹
    struct stTimeoutItemLink_t *pstActiveList; 
    co_epoll_res *result; 
};
           

stCoEpoll_t中主要儲存了epoll監聽的fd,以及注冊在其中的逾時事件。

stTimeoutItem_t其實是libco庫實作的雙向連結清單,有prev和next指針,同時儲存了連結清單指針。後面在使用過程中再介紹stTimeout_t。

struct stTimeoutItem_t
{
    enum
    {
        eMaxTimeout =  *  //40s
    };
    stTimeoutItem_t *pPrev;
    stTimeoutItem_t *pNext;
    stTimeoutItemLink_t *pLink;

    unsigned long long ullExpireTime;

    OnPreparePfn_t pfnPrepare;
    OnProcessPfn_t pfnProcess;

    void *pArg; // routine 
    bool bTimeout;
};
struct stTimeoutItemLink_t
{
    stTimeoutItem_t *head;
    stTimeoutItem_t *tail;

};
struct stTimeout_t
{
    stTimeoutItemLink_t *pItems;
    int iItemSize;

    unsigned long long ullStart;
    long long llStartIdx;
};
           

2. 初始化

在上篇文章中,在初始化本線程的stCoRoutineEnv_t時,在co_init_curr_thread_env的最後,會調用AllocEpoll() => AllocTimeout() 方法,我們看一下AllocTimeout中具體做了哪些事情。

stTimeout_t *AllocTimeout( int iSize )
{
    stTimeout_t *lp = (stTimeout_t*)calloc( ,sizeof(stTimeout_t) );    

    lp->iItemSize = iSize;
    lp->pItems = (stTimeoutItemLink_t*)calloc( ,sizeof(stTimeoutItemLink_t) * lp->iItemSize );

    lp->ullStart = GetTickMS();
    lp->llStartIdx = ;

    return lp;
}
           
  1. 申請了60*1000個timeoutLink連結清單。
  2. 設定目前時間為起始時間。
  3. 設定目前遊标為0。

3. 添加監聽事件

下面以一個簡單的用戶端連結伺服器的例子在說明在libco中是如何添加監聽事件的。

fd = socket(PF_INET, SOCK_STREAM, );
    struct sockaddr_in addr;
    SetAddr(endpoint->ip, endpoint->port, addr);
    ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));
           

由于在libco庫中hook了socket和connect的函數,是以,這個邏輯會調用poll函數,最終将調用co_poll_inner。下面介紹co_poll_inner的具體邏輯。

第一步,先将epoll結構轉換成poll結構(不清楚為什麼一定要轉換成poll類型,難道是為了相容性嗎?)

//1.struct change
    stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));
    memset( &arg,,sizeof(arg) );

    arg.iEpollFd = epfd;
    arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));
    arg.nfds = nfds;

    stPollItem_t arr[];
    if( nfds < sizeof(arr) / sizeof(arr[]) && !self->cIsShareStack)
    {
        arg.pPollItems = arr;
    }    
    else
    {
        arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
    }
    memset( arg.pPollItems,,nfds * sizeof(stPollItem_t) );

    arg.pfnProcess = OnPollProcessEvent; //記住這個函數,後續有用
    arg.pArg = GetCurrCo( co_get_curr_thread_env() );//參數為目前Env指針
           

第二步,将poll結構加入到epoll的監聽事件中

第三步,添加timeout事件

//3.add timeout
    unsigned long long now = GetTickMS();
    arg.ullExpireTime = now + timeout;
    int ret = AddTimeout( ctx->pTimeout,&arg,now ); // 将本事件加入到timeout的指定連結清單中
    int iRaiseCnt = ;
    if( ret !=  )
    {
        co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
                ret,now,timeout,arg.ullExpireTime);
        errno = EINVAL;
        iRaiseCnt = -;
    }
    else
    {
        co_yield_env( co_get_curr_thread_env() );
        iRaiseCnt = arg.iRaiseCnt;
    }
           

在AllocTimeout隻初始化了60*1000(即60s)的連結清單數組,此時在AddTimeout中,将根據本監聽事件的逾時時間添加到對應的數組index中的連結清單中,是不是比較類似于java中的HashMap的實作方式?

這裡有個問題,如果逾時時間超過了60s,那麼逾時事件都會添加到目前index的前一個遊标處,相當于有可能61s,65s的逾時事件都會在同一個timeout連結清單中,那麼會不會出現,由于時間還沒到,而逾時事件被處理呢?

AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );
           

添加完逾時事件後,本協程調用co_yield_env放棄執行,stRoutineEnv_t将會調用其他的協程進行處理。

4. 輪詢

将事件都加入到timeout連結清單,以及注冊到epoll fd後,main 協程将調用co_eventloop進行輪詢。

void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
    if( !ctx->result )
    {
        ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
    }
    co_epoll_res *result = ctx->result;


    for(;;)
    {
        // 1. 調用epoll_wait
        int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE,  );

        stTimeoutItemLink_t *active = (ctx->pstActiveList);
        stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);

        // 将timeout連結清單清空
        memset( timeout,,sizeof(stTimeoutItemLink_t) );

        // 處理poll事件
        for(int i=;i<ret;i++)
        {
            stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
            if( item->pfnPrepare ) 
            {
                // 這個函數基本是 OnPollPreparePfn
                // 在pollPreaprePfn中,将poll_inner中添加的timeout事件删除,并添加到active list中
                item->pfnPrepare( item,result->events[i],active );
            }
            else
            {
                AddTail( active,item );
            }
        }


        // 2. 将stTimeout_t中的timeout事件全部添加到timeout連結清單中
        unsigned long long now = GetTickMS();
        TakeAllTimeout( ctx->pTimeout,now,timeout );

        // 設定其為timeout事件
        stTimeoutItem_t *lp = timeout->head;
        while( lp )
        {
            //printf("raise timeout %p\n",lp);
            lp->bTimeout = true;
            lp = lp->pNext;
        }

        // 3. 添加timeoutList 到 active list
        Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

        // 4. 對active list進行周遊執行
        lp = active->head;
        while( lp )
        {

            PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
            // 這裡會對timeout事件進行判斷,若時間不逾時,仍然會将其加入到stTimeout_t的timeout數組隊列中
            if (lp->bTimeout && now < lp->ullExpireTime) 
            {
                int ret = AddTimeout(ctx->pTimeout, lp, now);
                if (!ret) 
                {
                    lp->bTimeout = false;
                    lp = active->head;
                    continue;
                }
            }
            if( lp->pfnProcess )
            {
                lp->pfnProcess( lp );
            }

            lp = active->head;
        }
        if( pfn )
        {
            if( - == pfn( arg ) )
            {
                break;
            }
        }
    }
}
           

具體步驟如下:

  1. 調用epoll_wait等待監聽的事件。
  2. 将stTimeout_t中的timeout連結清單清空。
  3. 若epoll中有資料,則将對應的事件加入到stTimeout_t的active連結清單中;同時将timeout數組連結清單中删除本事件的逾時事件。
  4. 周遊timout數組連結清單,将已經逾時的事件加入到timeout連結清單中。
  5. 将timeout連結清單中的所有事件置為逾時事件,需要後續特殊處理;同時将timeout連結清單合并到active連結清單。
  6. 周遊active連結清單,對逾時事件且目前時間未超過逾時時間的,重新将其加入到timeout數組連結清單中,這就解決了上面逾時時間超過60s的問題;對其他的事件進行處理。

    至此,整個libco庫的事件監聽的分析已經完成。

5. 總結

每個網絡架構都會有一個類似event_loop的函數,用于輪詢注冊的io事件,libco庫也不例外,輪詢就是比較簡單粗暴,但是又是很有效果。libco庫将socket相關的函數都進行了hook,使得調用者可以使用同步的方法進行編碼,卻能夠異步的執行。