源碼可以到http://www.aoc.nrao.edu/php/tjuerges/ALMA/ACE-5.5.2/html/ace/上去找.
ACE_Select_Reactor_T主要是使用select來進行多路複用和分離。為了監測多個fd,ACE中新增了ACE_Handle_Set,就像類說明的說的,C++ wrapper facade for the socket @c fd_set abstraction.
用來作為fd_set的抽象。這個類是ACE_Dev_Poll_Reactor和ACE_WFMO_Reactor中不會使用到得。
其中ACE_Select_Reactor_T的繼承圖如下:

而ACE_Select_Reactor_T中最核心的當然是事件的多路複用函數handle_events_i,其源碼如下:
1414 template <class ACE_SELECT_REACTOR_TOKEN> int
1415 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
1416 (ACE_Time_Value *max_wait_time)
1417 {
1418 int result = -;
1419
1420 ACE_SEH_TRY
1421 {
1422 // We use the data member dispatch_set_ as the current dispatch
1423 // set.
1424
1425 // We need to start from a clean dispatch_set
1426 this->dispatch_set_.rd_mask_.reset ();
1427 this->dispatch_set_.wr_mask_.reset ();
1428 this->dispatch_set_.ex_mask_.reset ();
1429
1430 int number_of_active_handles =
1431 this->wait_for_multiple_events (this->dispatch_set_,
1432 max_wait_time);
1433
1434 result =
1435 this->dispatch (number_of_active_handles,
1436 this->dispatch_set_);
1437 }
1438 ACE_SEH_EXCEPT (this->release_token ())
1439 {
1440 // As it stands now, we catch and then rethrow all Win32
1441 // structured exceptions so that we can make sure to release the
1442 // <token_> lock correctly.
1443 }
1444
1445 return result;
1446 }
從這段代碼上看,ACE_Select_Reactor_T是可以監測到多個fd傳回出發的多個events的,這點與ACE_Dev_Poll_Reactor的監測多個fd但是一次隻能最多傳回一個event不同,那麼這些事件的處理又是怎麼樣的呢?
上述的dispatch 函數同步層層調用,最後會調到dispatch_io_set,這個函數也是處理多個觸發事件的函數,其實作為:
1179 template <class ACE_SELECT_REACTOR_TOKEN> int
1180 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set
1181 (int number_of_active_handles,
1182 int &number_of_handlers_dispatched,
1183 int mask,
1184 ACE_Handle_Set &dispatch_mask,
1185 ACE_Handle_Set &ready_mask,
1186 ACE_EH_PTMF callback)
1187 {
1188 ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_set");
1189 ACE_HANDLE handle;
1190
1191 ACE_Handle_Set_Iterator handle_iter (dispatch_mask);
1192
1193 while ((handle = handle_iter ()) != ACE_INVALID_HANDLE &&
1194 number_of_handlers_dispatched < number_of_active_handles)
1195 {
1196 ++number_of_handlers_dispatched;
1197
1198 this->notify_handle (handle,
1199 mask,
1200 ready_mask,
1201 this->handler_rep_.find (handle),
1202 callback);
1203
1204 // clear the bit from that dispatch mask,
1205 // so when we need to restart the iteration (rebuilding the iterator...)
1206 // we will not dispatch the already dispatched handlers
1207 this->clear_dispatch_mask (handle, mask);
1208
1209 if (this->state_changed_)
1210 {
1211
1212 handle_iter.reset_state ();
1213 this->state_changed_ = false;
1214 }
1215 }
1216
1217 return ;
1218 }
從實作中可以看出,在while循環中依次調用notify_handle來處理傳進來的所有的handles,那麼notify_handle又是如何實作的?
template <class ACE_SELECT_REACTOR_TOKEN> void
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle
(ACE_HANDLE handle,
ACE_Reactor_Mask mask,
ACE_Handle_Set &ready_mask,
ACE_Event_Handler *event_handler,
ACE_EH_PTMF ptmf)
{
ACE_TRACE ("ACE_Select_Reactor_T::notify_handle");
// Check for removed handlers.
if (event_handler == )
return;
int reference_counting_required =
event_handler->reference_counting_policy ().value () ==
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
// Call add_reference() if needed.
if (reference_counting_required)
{
event_handler->add_reference ();
}
int status = (event_handler->*ptmf) (handle);
if (status < )
this->remove_handler_i (handle, mask);
else if (status > )
ready_mask.set_bit (handle);
// Call remove_reference() if needed.
if (reference_counting_required)
{
event_handler->remove_reference ();
}
}
最終可以看到每個handle對應的事件處理也就是:
(event_handler->*ptmf) (handle)。
那麼這樣說的話,每次在調用handle_events,select監測所有fd的事件,若有觸發,則全部傳回。将這些觸發的io事件依次的調用handler對應的回調函數進行處理。
一個線程處理這麼多個fd的事件,而且還是要排隊處理,顯然不合理。那麼如何去分攤這些事件的處理呢?答案當然是多線程,這裡又引入了一個ACE_SELECT_REACTOR_TOKEN類來進行線程同步。
具體怎麼做?由于現在監測事件和處理事件都在同一個大的函數中,誰能執行函數,誰就有權利進行監測和處理事件。另一方面,在一個線程已經監測到事件并進行處理的時候,其他線程必須要防止沒有觸發事件的fd上此時有事件。ACE中具體實作方案就是,owner函數來替換舊的線程,誰能夠成為ACE_Select_Reactor的owner,誰就可以去執行監測并處理事件,而為了保證新舊owner之間處理時的安全以及序列化,就增加了ACE_SELECT_REACTOR_TOKEN。
下面來看owner函數的實作:
0118 template <class ACE_SELECT_REACTOR_TOKEN> int
0119 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::owner (ACE_thread_t tid,
0120 ACE_thread_t *o_id)
0121 {
0122 ACE_TRACE ("ACE_Select_Reactor_T::owner");
0123 ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -));
0124
0125 if (o_id)
0126 *o_id = this->owner_;
0127
0128 this->owner_ = tid;
0129
0130 return ;
0131 }
這段代碼說明了必須要先擷取到Token,目前調用的執行的線程也即新的線程才能成為owner。
這個條件在handle_events中也是有限定的,如果不是目前線程作為owner去執行,handle_events是無法執行的,會立刻退出。其代碼如下:
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
(ACE_Time_Value *max_wait_time)
{
ACE_TRACE ("ACE_Select_Reactor_T::handle_events");
#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
// Stash the current time -- the destructor of this object will
// automatically compute how much time elapsed since this method was
// called.
ACE_Countdown_Time countdown (max_wait_time);
ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -);
if (ACE_OS::thr_equal (ACE_Thread::self (),
this->owner_) == || this->deactivated_)
return -;
// Update the countdown to reflect time waiting for the mutex.
countdown.update ();
#else
if (this->deactivated_)
return -;
#endif /* ACE_MT_SAFE */
return this->handle_events_i (max_wait_time);
}
這樣大家應該比較清楚了。ACE_Select_Reactor_T的多路複用和分離就是這樣去實作的。