轉載自:https://segmentfault.com/a/1190000012834756
前面的兩篇文章中介紹了微信的libco庫如何hook系統函數和協程的建立和管理,本篇文章将介紹libco庫是進行事件管理的。
libco庫使用一種類似時間片的技術進行輪詢,使得每個注冊在其上的事件都有機會執行。
1. 基礎資料結構
在上一篇文章中介紹stCoRoutineEnv_t時,我們将stCoEpoll_t這個結構跳過了,現在我們來仔細分析下這個資料結構。
struct stCoEpoll_t
{
int iEpollFd;
static const int _EPOLL_SIZE = * ;
struct stTimeout_t *pTimeout; //用于儲存timeout item
struct stTimeoutItemLink_t *pstTimeoutList; // 在後續的event_ loop中介紹
struct stTimeoutItemLink_t *pstActiveList;
co_epoll_res *result;
};
stCoEpoll_t中主要儲存了epoll監聽的fd,以及注冊在其中的逾時事件。
stTimeoutItem_t其實是libco庫實作的雙向連結清單,有prev和next指針,同時儲存了連結清單指針。後面在使用過程中再介紹stTimeout_t。
struct stTimeoutItem_t
{
enum
{
eMaxTimeout = * //40s
};
stTimeoutItem_t *pPrev;
stTimeoutItem_t *pNext;
stTimeoutItemLink_t *pLink;
unsigned long long ullExpireTime;
OnPreparePfn_t pfnPrepare;
OnProcessPfn_t pfnProcess;
void *pArg; // routine
bool bTimeout;
};
struct stTimeoutItemLink_t
{
stTimeoutItem_t *head;
stTimeoutItem_t *tail;
};
struct stTimeout_t
{
stTimeoutItemLink_t *pItems;
int iItemSize;
unsigned long long ullStart;
long long llStartIdx;
};
2. 初始化
在上篇文章中,在初始化本線程的stCoRoutineEnv_t時,在co_init_curr_thread_env的最後,會調用AllocEpoll() => AllocTimeout() 方法,我們看一下AllocTimeout中具體做了哪些事情。
stTimeout_t *AllocTimeout( int iSize )
{
stTimeout_t *lp = (stTimeout_t*)calloc( ,sizeof(stTimeout_t) );
lp->iItemSize = iSize;
lp->pItems = (stTimeoutItemLink_t*)calloc( ,sizeof(stTimeoutItemLink_t) * lp->iItemSize );
lp->ullStart = GetTickMS();
lp->llStartIdx = ;
return lp;
}
- 申請了60*1000個timeoutLink連結清單。
- 設定目前時間為起始時間。
- 設定目前遊标為0。
3. 添加監聽事件
下面以一個簡單的用戶端連結伺服器的例子在說明在libco中是如何添加監聽事件的。
fd = socket(PF_INET, SOCK_STREAM, );
struct sockaddr_in addr;
SetAddr(endpoint->ip, endpoint->port, addr);
ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));
由于在libco庫中hook了socket和connect的函數,是以,這個邏輯會調用poll函數,最終将調用co_poll_inner。下面介紹co_poll_inner的具體邏輯。
第一步,先将epoll結構轉換成poll結構(不清楚為什麼一定要轉換成poll類型,難道是為了相容性嗎?)
//1.struct change
stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));
memset( &arg,,sizeof(arg) );
arg.iEpollFd = epfd;
arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));
arg.nfds = nfds;
stPollItem_t arr[];
if( nfds < sizeof(arr) / sizeof(arr[]) && !self->cIsShareStack)
{
arg.pPollItems = arr;
}
else
{
arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
}
memset( arg.pPollItems,,nfds * sizeof(stPollItem_t) );
arg.pfnProcess = OnPollProcessEvent; //記住這個函數,後續有用
arg.pArg = GetCurrCo( co_get_curr_thread_env() );//參數為目前Env指針
第二步,将poll結構加入到epoll的監聽事件中
第三步,添加timeout事件
//3.add timeout
unsigned long long now = GetTickMS();
arg.ullExpireTime = now + timeout;
int ret = AddTimeout( ctx->pTimeout,&arg,now ); // 将本事件加入到timeout的指定連結清單中
int iRaiseCnt = ;
if( ret != )
{
co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
ret,now,timeout,arg.ullExpireTime);
errno = EINVAL;
iRaiseCnt = -;
}
else
{
co_yield_env( co_get_curr_thread_env() );
iRaiseCnt = arg.iRaiseCnt;
}
在AllocTimeout隻初始化了60*1000(即60s)的連結清單數組,此時在AddTimeout中,将根據本監聽事件的逾時時間添加到對應的數組index中的連結清單中,是不是比較類似于java中的HashMap的實作方式?
這裡有個問題,如果逾時時間超過了60s,那麼逾時事件都會添加到目前index的前一個遊标處,相當于有可能61s,65s的逾時事件都會在同一個timeout連結清單中,那麼會不會出現,由于時間還沒到,而逾時事件被處理呢?
AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );
添加完逾時事件後,本協程調用co_yield_env放棄執行,stRoutineEnv_t将會調用其他的協程進行處理。
4. 輪詢
将事件都加入到timeout連結清單,以及注冊到epoll fd後,main 協程将調用co_eventloop進行輪詢。
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
if( !ctx->result )
{
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;
for(;;)
{
// 1. 調用epoll_wait
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
// 将timeout連結清單清空
memset( timeout,,sizeof(stTimeoutItemLink_t) );
// 處理poll事件
for(int i=;i<ret;i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare )
{
// 這個函數基本是 OnPollPreparePfn
// 在pollPreaprePfn中,将poll_inner中添加的timeout事件删除,并添加到active list中
item->pfnPrepare( item,result->events[i],active );
}
else
{
AddTail( active,item );
}
}
// 2. 将stTimeout_t中的timeout事件全部添加到timeout連結清單中
unsigned long long now = GetTickMS();
TakeAllTimeout( ctx->pTimeout,now,timeout );
// 設定其為timeout事件
stTimeoutItem_t *lp = timeout->head;
while( lp )
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}
// 3. 添加timeoutList 到 active list
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
// 4. 對active list進行周遊執行
lp = active->head;
while( lp )
{
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
// 這裡會對timeout事件進行判斷,若時間不逾時,仍然會将其加入到stTimeout_t的timeout數組隊列中
if (lp->bTimeout && now < lp->ullExpireTime)
{
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret)
{
lp->bTimeout = false;
lp = active->head;
continue;
}
}
if( lp->pfnProcess )
{
lp->pfnProcess( lp );
}
lp = active->head;
}
if( pfn )
{
if( - == pfn( arg ) )
{
break;
}
}
}
}
具體步驟如下:
- 調用epoll_wait等待監聽的事件。
- 将stTimeout_t中的timeout連結清單清空。
- 若epoll中有資料,則将對應的事件加入到stTimeout_t的active連結清單中;同時将timeout數組連結清單中删除本事件的逾時事件。
- 周遊timout數組連結清單,将已經逾時的事件加入到timeout連結清單中。
- 将timeout連結清單中的所有事件置為逾時事件,需要後續特殊處理;同時将timeout連結清單合并到active連結清單。
-
周遊active連結清單,對逾時事件且目前時間未超過逾時時間的,重新将其加入到timeout數組連結清單中,這就解決了上面逾時時間超過60s的問題;對其他的事件進行處理。
至此,整個libco庫的事件監聽的分析已經完成。
5. 總結
每個網絡架構都會有一個類似event_loop的函數,用于輪詢注冊的io事件,libco庫也不例外,輪詢就是比較簡單粗暴,但是又是很有效果。libco庫将socket相關的函數都進行了hook,使得調用者可以使用同步的方法進行編碼,卻能夠異步的執行。