天天看點

ACE_Reactor(三)ACE_Select_Reactor_T

源碼可以到http://www.aoc.nrao.edu/php/tjuerges/ALMA/ACE-5.5.2/html/ace/上去找.

ACE_Select_Reactor_T主要是使用select來進行多路複用和分離。為了監測多個fd,ACE中新增了ACE_Handle_Set,就像類說明的說的,C++ wrapper facade for the socket @c fd_set abstraction.

用來作為fd_set的抽象。這個類是ACE_Dev_Poll_Reactor和ACE_WFMO_Reactor中不會使用到得。

其中ACE_Select_Reactor_T的繼承圖如下:

ACE_Reactor(三)ACE_Select_Reactor_T

而ACE_Select_Reactor_T中最核心的當然是事件的多路複用函數handle_events_i,其源碼如下:

1414 template <class ACE_SELECT_REACTOR_TOKEN> int
1415 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
1416   (ACE_Time_Value *max_wait_time)
1417 {
1418   int result = -;
1419 
1420   ACE_SEH_TRY
1421     {
1422       // We use the data member dispatch_set_ as the current dispatch
1423       // set.
1424 
1425       // We need to start from a clean dispatch_set
1426       this->dispatch_set_.rd_mask_.reset ();
1427       this->dispatch_set_.wr_mask_.reset ();
1428       this->dispatch_set_.ex_mask_.reset ();
1429 
1430       int number_of_active_handles =
1431         this->wait_for_multiple_events (this->dispatch_set_,
1432                                         max_wait_time);
1433 
1434       result =
1435         this->dispatch (number_of_active_handles,
1436                         this->dispatch_set_);
1437     }
1438   ACE_SEH_EXCEPT (this->release_token ())
1439     {
1440       // As it stands now, we catch and then rethrow all Win32
1441       // structured exceptions so that we can make sure to release the
1442       // <token_> lock correctly.
1443     }
1444 
1445   return result;
1446 }
           

從這段代碼上看,ACE_Select_Reactor_T是可以監測到多個fd傳回出發的多個events的,這點與ACE_Dev_Poll_Reactor的監測多個fd但是一次隻能最多傳回一個event不同,那麼這些事件的處理又是怎麼樣的呢?

上述的dispatch 函數同步層層調用,最後會調到dispatch_io_set,這個函數也是處理多個觸發事件的函數,其實作為:

1179 template <class ACE_SELECT_REACTOR_TOKEN> int
1180 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set
1181   (int number_of_active_handles,
1182    int &number_of_handlers_dispatched,
1183    int mask,
1184    ACE_Handle_Set &dispatch_mask,
1185    ACE_Handle_Set &ready_mask,
1186    ACE_EH_PTMF callback)
1187 {
1188   ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_set");
1189   ACE_HANDLE handle;
1190 
1191   ACE_Handle_Set_Iterator handle_iter (dispatch_mask);
1192 
1193   while ((handle = handle_iter ()) != ACE_INVALID_HANDLE &&
1194          number_of_handlers_dispatched < number_of_active_handles)
1195     {
1196       ++number_of_handlers_dispatched;
1197 
1198       this->notify_handle (handle,
1199                            mask,
1200                            ready_mask,
1201                            this->handler_rep_.find (handle),
1202                            callback);
1203 
1204       // clear the bit from that dispatch mask,
1205       // so when we need to restart the iteration (rebuilding the iterator...)
1206       // we will not dispatch the already dispatched handlers
1207       this->clear_dispatch_mask (handle, mask);
1208 
1209       if (this->state_changed_)
1210         {
1211 
1212           handle_iter.reset_state ();
1213           this->state_changed_ = false;
1214         }
1215     }
1216 
1217   return ;
1218 }
           

從實作中可以看出,在while循環中依次調用notify_handle來處理傳進來的所有的handles,那麼notify_handle又是如何實作的?

template <class ACE_SELECT_REACTOR_TOKEN> void
 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle
   (ACE_HANDLE handle,
    ACE_Reactor_Mask mask,
    ACE_Handle_Set &ready_mask,
    ACE_Event_Handler *event_handler,
    ACE_EH_PTMF ptmf)
 {
   ACE_TRACE ("ACE_Select_Reactor_T::notify_handle");
   // Check for removed handlers.
   if (event_handler == )
     return;
 
   int reference_counting_required =
     event_handler->reference_counting_policy ().value () ==
     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
 
   // Call add_reference() if needed.
   if (reference_counting_required)
     {
       event_handler->add_reference ();
     }
 
   int status = (event_handler->*ptmf) (handle);
 
   if (status < )
     this->remove_handler_i (handle, mask);
   else if (status > )
     ready_mask.set_bit (handle);
 
   // Call remove_reference() if needed.
   if (reference_counting_required)
     {
       event_handler->remove_reference ();
     }
 }
           

最終可以看到每個handle對應的事件處理也就是:

(event_handler->*ptmf) (handle)。

那麼這樣說的話,每次在調用handle_events,select監測所有fd的事件,若有觸發,則全部傳回。将這些觸發的io事件依次的調用handler對應的回調函數進行處理。

一個線程處理這麼多個fd的事件,而且還是要排隊處理,顯然不合理。那麼如何去分攤這些事件的處理呢?答案當然是多線程,這裡又引入了一個ACE_SELECT_REACTOR_TOKEN類來進行線程同步。

具體怎麼做?由于現在監測事件和處理事件都在同一個大的函數中,誰能執行函數,誰就有權利進行監測和處理事件。另一方面,在一個線程已經監測到事件并進行處理的時候,其他線程必須要防止沒有觸發事件的fd上此時有事件。ACE中具體實作方案就是,owner函數來替換舊的線程,誰能夠成為ACE_Select_Reactor的owner,誰就可以去執行監測并處理事件,而為了保證新舊owner之間處理時的安全以及序列化,就增加了ACE_SELECT_REACTOR_TOKEN。

下面來看owner函數的實作:

0118 template <class ACE_SELECT_REACTOR_TOKEN> int
0119 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::owner (ACE_thread_t tid,
0120                                                        ACE_thread_t *o_id)
0121 {
0122   ACE_TRACE ("ACE_Select_Reactor_T::owner");
0123   ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -));
0124 
0125   if (o_id)
0126     *o_id = this->owner_;
0127 
0128   this->owner_ = tid;
0129 
0130   return ;
0131 }
           

這段代碼說明了必須要先擷取到Token,目前調用的執行的線程也即新的線程才能成為owner。

這個條件在handle_events中也是有限定的,如果不是目前線程作為owner去執行,handle_events是無法執行的,會立刻退出。其代碼如下:

template <class ACE_SELECT_REACTOR_TOKEN> int
 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
   (ACE_Time_Value *max_wait_time)
 {
   ACE_TRACE ("ACE_Select_Reactor_T::handle_events");
 
 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
 
   // Stash the current time -- the destructor of this object will
   // automatically compute how much time elapsed since this method was
   // called.
   ACE_Countdown_Time countdown (max_wait_time);
 
   ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -);
 
   if (ACE_OS::thr_equal (ACE_Thread::self (),
                          this->owner_) ==  || this->deactivated_)
     return -;
 
   // Update the countdown to reflect time waiting for the mutex.
   countdown.update ();
 #else
   if (this->deactivated_)
     return -;
 #endif /* ACE_MT_SAFE */
 
   return this->handle_events_i (max_wait_time);
 }
           

這樣大家應該比較清楚了。ACE_Select_Reactor_T的多路複用和分離就是這樣去實作的。

繼續閱讀