天天看點

Boost::asio io_service 實作分析

io_servie 實作了一個任務隊列,這裡的任務就是void(void)的函數。Io_servie最常用的兩個接口是post和run,post向任務隊列中投遞任務,run是執行隊列中的任務,直到全部執行完畢,并且run可以被N個線程調用。Io_service是完全線程安全的隊列。

提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop

l  Io_servie是接口類,為實作跨平台,采用了政策模式,所有接口均有impl_type實作。根據平台不同impl_type分為

n  win_iocp_io_service Win版本的實作,這裡主要分析Linux版本。

n  task_io_service 非win平台下的實作,其代碼結構為:

u  detail/task_io_service_fwd.hpp 簡單聲明task_io_service名稱

u  detail/task_io_service.hpp 聲明task_io_service的方法和屬性

u  detail/impl/task_io_service.ipp 具體實作檔案

u  隊列中的任務類型為opertioan,原型其實是typedef task_io_service_operation operation,其實作檔案在detail/task_io_service_operation.hpp中,當隊列中的任務被執行時,就是task_io_service_operation:: complete被調用的時候。

Post向隊列中投遞任務,然後激活空閑線程執行任務。其實作流程如下:

l  Post接收handler作為參數,實際上是個仿函數,通過此仿函數構造出completion_handler對象,completion_handler繼承自operation。然後調用post_immediate_completion。

l  post_immediate_completion首先将outstanding_work_增加,然後調用post_deferred_completion。

l  post_deferred_completion首先加鎖将任務入列,然後調用wake_one_thread_and_unlock

l  wake_one_thread_and_unlock嘗試喚醒目前空閑的線程,其實作中特别之處在于,若沒有空閑線程,但是有線程在執行task->run,即阻塞在epoll_wait上,那麼先中斷epoll_wait執行任務隊列完成後再執行epoll_wait。

l  first_idle_thread_維護了所有目前空閑線程,實際上使用了Leader/Follower模式,每次喚醒時隻喚醒空閑線程的第一個。

         Run方法執行隊列中的所有任務,直到任務執行完畢。

l  run方法首先構造一個idle_thread_info,和first_idle_thread_類型相同,即通過first_idle_thread_将所有線程串聯起來,它這個串聯不是立即串聯的,當該線程無任務可做是加入到first_idle_thread_的首部,有任務執行時,從first_idle_thread_中斷開。這很正常,因為first_idle_thread_維護的是目前空閑線程。

l  加鎖,循環執行do_one方法,直到do_one傳回false

l  do_one每次執行一個任務。首先檢查隊列是否為空,若空将此線程追加到first_idle_thread_的首部,然後阻塞在條件變量上,直到被喚醒。

l  當被喚醒或是首次執行,若stopped_為true(即此時stop方法被調用了),傳回0

l  隊列非空,pop出一個任務,檢查隊列無任務那麼簡單的解鎖,若仍有,調用wake_one_thread_and_unlock嘗試喚醒其他空閑線程執行。然後執行該任務,傳回1.

l  實際上在執行隊列任務時有一個特别的判斷if (o == &task_operation_),那麼将會執行task_->run,task_變量類型為reactor,在linux平台實作為epoll_reactor,實作代碼檔案為detail/impl/epoll_reactor.ipp,run方法實際上執行的是epoll_wait,run阻塞在epoll_wait上等待事件到來,并且處理完事件後将需要回調的函數push到io_servie的任務隊列中,雖然epoll_wait是阻塞的,但是它提供了interrupt函數,該interrupt是如何實作的呢,它向epoll_wait添加一個檔案描述符,該檔案描述符中有8個位元組可讀,這個檔案描述符是專用于中斷epoll_wait的,他被封裝到select_interrupter中,select_interrupter實際上實作是eventfd_select_interrupter,在構造的時候通過pipe系統調用建立兩個檔案描述符,然後預先通過write_fd寫8個位元組,這8個位元組一直保留。在添加到epoll_wait中采用EPOLLET水準觸發,這樣,隻要select_interrupter的讀檔案描述符添加到epoll_wait中,立即中斷epoll_wait。很是巧妙。!!!實際上就是因為有了這個reactor,它才叫io_servie,否則就是一個純的任務隊列了。

l  Run方法的原則是:

n  有任務立即執行任務,盡量使所有的線程一起執行任務

n  若沒有任務,阻塞在epoll_wait上等待io事件

n  若有新任務到來,并且沒有空閑線程,那麼先中斷epoll_wait,先執行任務

n  若隊列中有任務,并且也需要epoll_wait監聽事件,那麼非阻塞調用epoll_wait(timeout字段設定為0),待任務執行完畢在阻塞在epoll_wait上。

n  幾乎對線程的使用上達到了極緻。

n  從這個函數中可以知道,在使用ASIO時,io_servie應該盡量多,這樣可以使其epoll_wait占用的時間片最多,這樣可以最大限度的響應IO事件,降低響應時延。但是每個io_servie::run占用一個線程,是以io_servie最佳應該和CPU的核數相同。

l  加鎖,調用stop_all_threads

l  設定stopped_變量為true,周遊所有的空閑線程,依次喚醒

l  task_interrupted_設定為true,調用task_的interrupt方法

l  task_的類型為reactor,在run方法中已經做了分析

繼續閱讀