天天看點

boost asio io_service學習筆記

構造函數

構造函數的主要動作就是調用CreateIoCompletionPort建立了一個初始iocp。

Dispatch和post的差別

Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之後執行。

Dispatch會首先檢查目前thread是不是io_service.run/runonce/poll/poll_once線程,如果是,則直接運作。

poll和run的差別

兩者代碼幾乎一樣,都是首先檢查是否有outstanding的消息,如果沒有直接傳回,否則調用do_one()。唯一的不同是在調用size_t do_one(bool block, boost::system::error_code& ec)時前者block = false,後者block = true。

該參數的作用展現在:

BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,

&completion_key, &overlapped, block ? timeout : 0);

是以可以看出,poll處理的是已經完成了的消息,也即GetQueuedCompletionStatus立刻能傳回的。而run則會導緻等待。

poll 的作用是依次處理目前已經完成了的消息,直到所有已經完成的消息處理完成為止。如果沒有已經完成了得消息,函數将退出。poll不會等待。這個函數有點類似于PeekMessage。鑒于PeekMessage很少用到,poll的使用場景我也有點疑惑。poll的一個應用場景是如果希望handler的處理有優先級,也即,如果消息完成速度很快,同時可能完成多個消息,而消息的處理過程可能比較耗時,那麼可以在完成之後的消息處理函數中不真正處理資料,而是把handler儲存在隊列中,然後按優先級統一處理。代碼如下:

while (io_service.run_one()) { 

    // The custom invocation hook adds the handlers to the priority queue 

    // rather than executing them from within the poll_one() call. 

    while (io_service.poll_one())      ;    pri_queue.execute_all(); }

循環執行poll_one讓已經完成的消息的wrap_handler處理完畢,也即插入一個隊列中,然後再統一處理之。這裡的wrap_handler是一個class,在post的時候,用如下代碼:

io_service.post(pri_queue.wrap(0, low_priority_handler));或者 acceptor.async_accept(server_socket, pri_queue.wrap(100, high_priority_handler));

template <typename Handler> wrapped_handler<Handler> handler_priority_queue::wrap(int priority, Handler handler) 

{    return wrapped_handler<Handler>(*this, priority, handler); }

參見boost_asio/example/invocation/prioritised_handlers.cpp

這個sample也同時表現了wrap的使用場景。

也即把handler以及參數都wrap成一個object,然後把object插入一個隊列,在pri_queue.execute_all中按優先級統一處理。

run的作用是處理消息,如果有消息未完成将一直等待到所有消息完成并處理之後才退出。

reset和stop

文檔中reset的解釋是重置io_service以便下一次調用。

當 run,run_one,poll,poll_one是被stop掉導緻退出,或者由于完成了所有任務(正常退出)導緻退出時,在調用下一次 run,run_one,poll,poll_one之前,必須調用此函數。reset不能在run,run_one,poll,poll_one正在運作時調用。如果是消息處理handler(使用者代碼)抛出異常,則可以在處理之後直接繼續調用 io.run,run_one,poll,poll_one。 例如:

boost::asio::io_service io_service;

...

for (;;)

{

try

io_service.run();

break; // run() exited normally

}

catch (my_exception& e)

// Deal with exception as appropriate.

}在抛出了異常的情況下,stopped_還沒來得及被asio設定為1,是以無需調用reset。

reset函數的代碼僅有一行:

void reset()

::InterlockedExchange(&stopped_, 0);

也即,當io.stop時,會設定stopped_=1。當完成所有任務時,也會設定。

總的來說,單線程情況下,不管io.run是如何退出的,在下一次調用io.run之前調用一次reset沒有什麼壞處。例如:

for(;;)

io.run();

catch(…)

io.reset();

如果是多線程在運作io.run,則應該小心,因為reset必須是所有的run,run_one,poll,poll_one退出後才能調用。

文檔中的stop的解釋是停止io_service的處理循環。

此函數不是阻塞函數,也即,它僅僅隻是給iocp發送一個退出消息而并不是等待其真正退出。因為poll和poll_one本來就不等待(GetQueuedCompletionStatus時timeout = 0),是以此函數對poll和poll_one無意義。對于run_one來說,如果該事件還未完成,則run_one會立刻傳回。如果該事件已經完成,并且還在進行中,則stop并無特殊意義(會等待handler完成後自然退出)。對于run來說,stop的調用會導緻run中的 GetQueuedCompletionStatus立刻傳回。并且由于設定了stopped = 1,此前完成的消息的handlers也不會被調用。考慮一下這種情況:在io.stop之前,有1k個消息已經完成但尚未處理,io.run正在依次從 GetQueuedCompletionStatus中獲得資訊并且調用handlers,調用io.stop設定stopped=1将導緻後許 GetQueuedCompletionStatus傳回的消息直接被丢棄,直到收到退出消息并退出io.run為止。

void stop()

if (::InterlockedExchange(&stopped_, 1) == 0)

if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))

DWORD last_error = ::GetLastError();

boost::system::system_error e(

boost::system::error_code(last_error,

boost::asio::error::get_system_category()),

"pqcs");

boost::throw_exception(e);

注意除了讓目前代碼退出之外還有一個副作用就是設定了stopped_=1。這個副作用導緻在stop之後如果不調用reset,所有run,run_one,poll,poll_one都将直接退出。

另一個需要注意的是,stop會導緻所有未完成的消息以及完成了但尚未處理得消息都直接被丢棄,不會導緻handlers倍調用。

注意這兩個函數都不會CloseHandle(iocp.handle_),那是析構函數幹的事情。

注意此處有個細節:一次PostQueuedCompletionStatus僅導緻一次 GetQueuedCompletionStatus傳回,那麼如果有多個thread此時都在io.run,并且block在 GetQueuedCompletionStatus時,調用io.stop将PostQueuedCompletionStatus并且導緻一個 thread的GetQueuedCompletionStatus傳回。那麼其他的thread呢?進入io_service的do_one(由run 函數調用)代碼可以看到,當GetQueuedCompletionStatus傳回并且發現是退出消息時,會再發送一次 PostQueuedCompletionStatus。代碼如下:

else

    // Relinquish responsibility for dispatching timers. If the io_service

    // is not being stopped then the thread will get an opportunity to

    // reacquire timer responsibility on the next loop iteration.

    if (dispatching_timers)

    {

      ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);

    }

    // The stopped_ flag is always checked to ensure that any leftover

    // interrupts from a previous run invocation are ignored.

    if (::InterlockedExchangeAdd(&stopped_, 0) != 0)

      // Wake up next thread that is blocked on GetQueuedCompletionStatus.

      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))

      {

        last_error = ::GetLastError();

        ec = boost::system::error_code(last_error,

            boost::asio::error::get_system_category());

        return 0;

      }

      ec = boost::system::error_code();

      return 0;

Wrap

這個函數是一個文法糖。

Void func(int a);

io_service.wrap(func)(a);

相當于io_service.dispatch(bind(func,a));

可以儲存io_service.wrap(func)到g,以便在稍後某些時候調用g(a);

例如:

socket_.async_read_some(boost::asio::buffer(buffer_),      strand_.wrap( 

        boost::bind(&connection::handle_read, shared_from_this(), 

          boost::asio::placeholders::error, 

          boost::asio::placeholders::bytes_transferred)));

這是一個典型的wrap用法。注意async_read_some要求的參數是一個handler,在read_some結束後被調用。由于希望真正被調用的handle_read是串行化的,在這裡再post一個消息給io_service。以上代碼類似于:

void A::func(error,bytes_transferred)

strand_.dispatch(boost::bind(handle_read,shared_from_this(),error,bytes_transferred);

socket_.async_read_some(boost::asio::buffer(buffer_), func);

注意1點:

io_service.dispatch(bind(func,a1,…an)),這裡面都是傳值,無法指定bind(func,ref(a1)…an)); 是以如果要用ref語義,則應該在傳入wrap時顯式指出。例如:

void func(int& i){i+=1;}

void main()

int i = 0;

boost::asio::io_service io;

io.wrap(func)(boost::ref(i));

printf("i=%d\n");

當然在某些場合下,傳遞shared_ptr也是可以的(也許更好)。

從handlers抛出的異常的影響

當handlers抛出異常時,該異常會傳遞到本線程最外層的io.run,run_one,poll,poll_one,不會影響其他線程。捕獲該異常是程式員自己的責任。

Thread1,2,3,4()

Void func(void)

throw 1;

Thread5()

io_service.post(func);

注意這種情況下無需調用io_service.reset()。

這種情況下也不能調用reset,因為調用reset之前必須讓所有其他線程正在調用的io_service.run退出。(reset調用時不能有任何run,run_one,poll,poll_one正在運作)

Work

有些應用程式希望在沒有pending的消息時,io.run也不退出。比如io.run運作于一個背景線程,該線程在程式的異步請求發出之前就啟動了。

可以通過如下代碼實作這種需求:

main()

boost::asio::io_service::work work(io_service);

Create thread

Getchar();

Thread()

Io_service.run();

這種情況下,如果work不被析構,該線程永遠不會退出。在work不被析構得情況下就讓其退出,可以調用io.stop。這将導緻 io.run立刻退出,所有未完成的消息都将丢棄。已完成的消息(但尚未進入handler的)也不會調用其handler函數(由于在stop中設定了 stopped_= 1)。

如果希望所有發出的異步消息都正常處理之後io.run正常退出,work對象必須析構,或者顯式的删除。

auto_ptr<boost::asio::io_service::work> work(

new boost::asio::io_service::work(io_service));

work.reset(); // Allow run() to normal exit.

work是一個很小的輔助類,隻支援構造函數和析構函數。(還有一個get_io_service傳回所關聯的io_service)

代碼如下:

inline io_service::work::work(boost::asio::io_service& io_service)

: io_service_(io_service)

io_service_.impl_.work_started();

inline io_service::work::work(const work& other)

: io_service_(other.io_service_)

inline io_service::work::~work()

io_service_.impl_.work_finished();

void work_started()

::InterlockedIncrement(&outstanding_work_);

// Notify that some work has finished.

void work_finished()

if (::InterlockedDecrement(&outstanding_work_) == 0)

stop();

可以看出構造一個work時,outstanding_work_+1,使得io.run在完成所有異步消息後判斷outstanding_work_時不會為0,是以會繼續調用GetQueuedCompletionStatus并阻塞在這個函數上。

而析構函數中将其-1,并判斷其是否為0,如果是,則post退出消息給GetQueuedCompletionStatus讓其退出。

是以work如果析構,則io.run會在處理完所有消息之後正常退出。work如果不析構,則io.run會一直運作不退出。如果使用者直接調用io.stop,則會讓io.run立刻退出。

特别注意的是,work提供了一個拷貝構造函數,是以可以直接在任意地方使用。對于一個io_service來說,有多少個work執行個體關聯,則outstanding_work_就+1了多少次,隻有關聯到同一個io_service的work全被析構之後,io.run才會在所有消息處理結束之後正常退出。

strand

strand是另一個輔助類,提供2個接口dispatch和post,語義和io_service的dispatch和post類似。差別在于,同一個strand所發出的dispatch和post絕對不會并行執行,dispatch和post所包含的handlers也不會并行。是以如果希望串行處理每一個tcp連接配接,則在accept之後應該在該連接配接的資料結構中構造一個strand,并且所有dispatch/post(recv /send)操作都由該strand發出。strand的作用巨大,考慮如下場景:有多個thread都在執行async_read_some,那麼由于線程排程,很有可能後接收到的包先被處理,為了避免這種情況,就隻能收完資料後放入一個隊列中,然後由另一個線程去統一處理。

void connection::start() 

socket_.async_read_some(boost::asio::buffer(buffer_), 

strand_.wrap( 

boost::bind(&connection::handle_read, shared_from_this(), 

boost::asio::placeholders::error, 

boost::asio::placeholders::bytes_transferred))); 

不使用strand的處理方式:

前端tcp iocp收包,并且把同一個tcp連接配接的包放入一個list,如果list以前為空,則post一個消息給後端vnn iocp。後端vnn iocp收到post的消息後循環從list中擷取資料,并且處理,直到list為空為止。處理結束後重新調用 GetQueuedCompletionStatus進入等待。如果前端tcp iocp發現list過大,意味着處理速度小于接收速度,則不再調用iocpRecv,并且設定标志,當vnn iocp thread處理完了目前所有積壓的資料包後,檢查這個标志,重新調用一次iocpRecv。

使用strand的處理方式:

前端tcp iocp收包,收到包後直接通過strand.post(on_recved)發給後端vnn iocp。後端vnn iocp處理完之後再調用一次strand.async_read_some。

這兩種方式我沒看出太大差別來。如果對資料包的處理的确需要阻塞操作,例如db query,那麼使用後端iocp以及後端thread是值得考慮的。這種情況下,前端iocp由于僅用來異步收發資料,是以1個thread就夠了。在确定使用2級iocp的情況下,前者似乎更為靈活,也沒有增加什麼開銷。

值得讨論的是,如果後端多個thread都處于db query狀态,那麼實際上此時依然沒有thread可以提供資料處理服務,是以2級iocp意義其實就在于在這種情況下,前端tcp iocp依然可以accept,以及recv第一次資料,不會導緻使用者connect不上的情況。在後端thread空閑之後會處理這期間的recv到的資料并在此async_read_some。

如果是單級iocp(假定handlers沒有阻塞操作),多線程,那麼strand的作用很明顯。這種情況下,很明顯應該讓一個tcp連接配接的資料處理過程串行化。

Strand的實作原理

Strand内部實作機制稍微有點複雜。每次發出strand請求(例如 async_read(strand_.wrap(funobj1))),strand再次包裹了一次成為funobj2。在async_read完成時,系統調用funobj2,檢查是否正在執行該strand所發出的完成函數(檢查該strand的一個标志位),如果沒有,則直接調用 funobj2。如果有,則檢查是否就是目前thread在執行,如果是,則直接調用funobj2(這種情況可能發生在嵌套調用的時候,但并不産生同步問題,就像同一個thread可以多次進入同一個critical_session一樣)。如果不是,則把該funobj2插入到strand内部維護的一個隊列中。

繼續閱讀