天天看點

UDT中的epoll

epoll 是為處理大量句柄而改進的poll,在UDT中也有支援。UDT使用了核心提供的epoll,主要是epoll_create,epoll_wait,epoll_ctl,UDT定義了CEPollDesc這個結構來管理epoll的描述符和套接字。

// Bookkeeping for a single UDT epoll instance.
// CEPoll keeps these in a map keyed by epoll ID (m_mPolls). The "Socks*" sets
// record which UDT sockets are being watched; the m_sUDTWrites/Reads/Excepts
// sets record which of them currently have an event pending.
struct CEPollDesc

{

   int m_iID;                                // epoll ID

   std::set<UDTSOCKET> m_sUDTSocksOut;       // set of UDT sockets waiting for write events

   std::set<UDTSOCKET> m_sUDTSocksIn;        // set of UDT sockets waiting for read events

   std::set<UDTSOCKET> m_sUDTSocksEx;        // set of UDT sockets waiting for exceptions

   int m_iLocalID;                           // local system epoll ID (the descriptor handed to ::epoll_wait)

   std::set<SYSSOCKET> m_sLocals;            // set of local (non-UDT) descriptors

   std::set<UDTSOCKET> m_sUDTWrites;         // UDT sockets ready for write

   std::set<UDTSOCKET> m_sUDTReads;          // UDT sockets ready for read

   std::set<UDTSOCKET> m_sUDTExcepts;        // UDT sockets with exceptions (connection broken, etc.); wait() also copies these into the caller's read set

};

特別要提醒的是,當對端socket的連接中斷時,該socket也會被放進m_sUDTReads裡。

UDT還實作了一個類來進行各項操作,實作的有

create():建立一個epoll,調用了epoll_create

add_usock():添加一個UDT套接字到epoll

add_ssock():添加一個系統套接字到epoll,調用了epoll_ctl

remove_usock():從epoll中移除一個UDT套接字

remove_ssock():從epoll中移除一個系統套接字,調用了epoll_ctl

wait():等待epoll事件或者逾時,調用了epoll_wait

release():關閉并釋放一個epoll

UDT裡對epoll的調用是四段式的,比如add_usock這裡,調用順序是epoll_add_usock()->CUDT::epoll_add_usock()->s_UDTUnited.epoll_add_usock()->CEPoll::add_usock

int epoll_add_usock(int eid, UDTSOCKET u, const int* events)

   return CUDT::epoll_add_usock(eid, u, events);

}

int CUDT::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)

   try

   {

      return s_UDTUnited.epoll_add_usock(eid, u, events);

   }

   catch (CUDTException e)

      s_UDTUnited.setError(new CUDTException(e));

      return ERROR;

   catch (...)

      s_UDTUnited.setError(new CUDTException(-1, 0, 0));

int CUDTUnited::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)

   CUDTSocket* s = locate(u);

   int ret = -1;

   if (NULL != s)

      ret = m_EPoll.add_usock(eid, u, events);

      s->m_pUDT->addEPoll(eid);

   else

      throw CUDTException(5, 4);

   return ret;

int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)

   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);

   if (p == m_mPolls.end())

      throw CUDTException(5, 13);

   if (!events || (*events & UDT_EPOLL_IN))       //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以進行&運算

      p->second.m_sUDTSocksIn.insert(u);

   if (!events || (*events & UDT_EPOLL_OUT))

      p->second.m_sUDTSocksOut.insert(u);

   return 0;

UDT命名空間提供給應用程式調用的接口,可稱為API層;API層調用CUDT API,這一層主要做錯誤處理,捕捉下面兩層抛出的錯誤並儲存起來交給應用程式使用;CUDT API再調用CUDTUnited的實作。如果是UDT SOCKET的socket(),bind(),listen()等,到這層也就結束了,不過epoll還要多一個第四層,再調用CEPoll的實作。現在來看看CUDTUnited和CEPoll的實作。

CUDTSocket* s = locate(u);

調用CUDTUnited::locate(),根據SocketID(也就是UDT Socket handle)在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到對應的CUDTSocket結構。如果找不到,抛出錯誤;找到了就調用CEPoll的add_usock實作,根據傳入的events參數,將socket加入相應的集合(m_sUDTSocksIn、m_sUDTSocksOut)。之後調用s->m_pUDT->addEPoll(eid)。

void CUDT::addEPoll(const int eid)

   CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);                      //這種通過類來實作加鎖解鎖的,我第一次見,還挺友善。

   m_sPollID.insert(eid);

   CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);

   if (!m_bConnected || m_bBroken || m_bClosing)

      return;

   if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||

      ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))

      s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);

   if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())

      s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

這裡已經開始更新m_sUDTWrites,m_sUDTReads,通過update_events(),如前所述,update_events()也是CEPoll的成員函數。

int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)

   map<int, CEPollDesc>::iterator p;

   vector<int> lost;

   for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)

      p = m_mPolls.find(*i);

      if (p == m_mPolls.end())

      {

         lost.push_back(*i);

      }

      else

         if ((events & UDT_EPOLL_IN) != 0)

            update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);   //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以進行&運算

         if ((events & UDT_EPOLL_OUT) != 0)

            update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);

         if ((events & UDT_EPOLL_ERR) != 0)

            update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);

   for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)

      eids.erase(*i);

void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)

   if (enable && (watch.find(uid) != watch.end()))

      result.insert(uid);

   else if (!enable)

      result.erase(uid);

最後說下wait函數的實作,一樣是四段式,就跳過前面三段,直接看第四段

int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)

   // if all fields is NULL and waiting time is infinite, then this would be a deadlock   都空的的話,會死鎖,抛出異常

   if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))

      throw CUDTException(5, 3, 0);

   // Clear these sets in case the app forget to do it.  清空

   if (readfds) readfds->clear();

   if (writefds) writefds->clear();

   if (lrfds) lrfds->clear();

   if (lwfds) lwfds->clear();

   int total = 0;

   int64_t entertime = CTimer::getTime();

   while (true)

      CGuard::enterCS(m_EPollLock);

      map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);

         CGuard::leaveCS(m_EPollLock);

         throw CUDTException(5, 13);

      if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))

         // no socket is being monitored, this may be a deadlock  都空的的話,會死鎖,抛出異常

         throw CUDTException(5, 3);

      // Sockets with exceptions are returned to both read and write sets.          把m_sUDTReads和m_sUDTExcepts都讀到readfds裡

      if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))

         *readfds = p->second.m_sUDTReads;

         for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)

            readfds->insert(*i);

         total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();

      if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))          //把m_sUDTWrites和m_sUDTExcepts都讀到writefds裡

         *writefds = p->second.m_sUDTWrites;

            writefds->insert(*i);

         total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();

      if (lrfds || lwfds)     //讀系統套接字

         #ifdef LINUX

         const int max_events = p->second.m_sLocals.size();

         epoll_event ev[max_events];

         int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);

         for (int i = 0; i < nfds; ++ i)

         {

            if ((NULL != lrfds) && (ev[i].events & EPOLLIN))

           {

               lrfds->insert(ev[i].data.fd);

               ++ total;

            }

            if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))

            {

               lwfds->insert(ev[i].data.fd);

         }

         #else

         //currently "select" is used for all non-Linux platforms.

         //faster approaches can be applied for specific systems in the future.

         //"select" has a limitation on the number of sockets

         fd_set readfds;

         fd_set writefds;

         FD_ZERO(&readfds);

         FD_ZERO(&writefds);

         for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)

            if (lrfds)

               FD_SET(*i, &readfds);

            if (lwfds)

               FD_SET(*i, &writefds);

         timeval tv;

         tv.tv_sec = 0;

         tv.tv_usec = 0;

         if (::select(0, &readfds, &writefds, NULL, &tv) > 0)

            for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)

               if (lrfds && FD_ISSET(*i, &readfds))

               {

                  lrfds->insert(*i);

                  ++ total;

               }

               if (lwfds && FD_ISSET(*i, &writefds))

                  lwfds->insert(*i);

         #endif

      CGuard::leaveCS(m_EPollLock);

      if (total > 0)

         return total;

      if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))

         throw CUDTException(6, 3, 0);

      CTimer::waitForEvent();

繼續閱讀