天天看點

Windows下完成端口移植Linux下的epoll

     距離上一篇部落格都已經半個多月了,這麼多天一直在學習研究關于Windows的完成端口移植到Linux下epoll方面的内容。這兩方面以前都沒有太多的接觸,是以花費了較長的時間。在連續加班兩天後,用一個周末的代價換來了一個調試成功。下面就把最近的成果與各位網友分享一下。如有不正确之處,望指正。    先來說說Windows下的完成端口。完成端口号稱是Windows下面最複雜的異步IO操作。但是如果你想開發出具有高性能的、支援大量連接配接的網絡服務程式的話,就必須将它拿下。這裡假設你已經對完成端口有一定的了解了。        下面引用一下幽默講解Windows支援的五種Socket I/O模型的例子來通俗的說一下完成端口究竟是怎麼回事。     老陳有一個在外地工作的女兒,不能經常回來,老陳和她通過信件聯系。他們的信會被郵差投遞到他們的微軟信箱裡。      我們平時使用的select模型,老陳每隔幾分鐘便到樓下看看是否有信。這樣的方式會浪費老陳很多時間。同理,程式會阻塞在這裡等待資料的到來,使得該程序(線程)無法進行其他的操作,導緻性能的降低。      WSAAsyncSelect模型、WSAEventSelect模型同為事件觸發模型。此時,隻要有信到,微軟就會主動通知老陳。此時,老陳隻需要等待通知即可,在等待過程中可以做其他的事情。  而Overlapped I/O 事件通知模型基本和上面兩種類似。隻是,老陳不需要上下樓取信了,他隻需告訴微軟自己在幾樓幾号,微軟就會把信送到老陳家裡。      後來微軟推出了Overlapped I/O 完成例程模型,老陳将自己拆信—閱讀—回複的過程告訴微軟,微軟就會按照上述步驟去處理信件。      但是,由于微軟要處理的信件實在太多了,信箱經常崩潰。于是采用了新技術Completion Port來處理這些信件。        通過Win32的重疊I/O機制,應用程式可以提請一項I/O操作,重疊的操作請求在背景完成,而同一時間提請操作的線程去做其他的事情。等重疊操作完成後線程收到有關的通知。而一個完成端口其實就是一個通知隊列,由作業系統把已經完成的重疊I/O請求的通知放入其中。當某項I/O操作一旦完成,某個可以對該操作結果進行處理的工作者線程就會收到一則通知。        完成端口的使用主要分兩步。      首先,建立完成端口         HANDLE        hIocp;

hIocp = CreateIoCompletionPort(

        INVALID_HANDLE_VALUE,

        NULL,

        (ULONG_PTR)0,

        0);

if (hIocp == NULL) {

        // Error

}      完成端口建立後,要把将使用該完成端口的套接字與之關聯起來。方法是再次調用CreateIoCompletionPort ()函數,第一個參數FileHandle設為套接字的句柄,第二個參數ExistingCompletionPort 設為剛剛建立的那個完成端口的句柄。

以下代碼建立了一個套接字,并把它和前面建立的完成端口關聯起來: SOCKET        s;

s = socket(AF_INET, SOCK_STREAM, 0);

if (s == INVALID_SOCKET) {

        // Error }

if (CreateIoCompletionPort((HANDLE)s, 

                                       hIocp, 

                                      (ULONG_PTR)0, 

                                                        0) == NULL)

{

// Error

}

... 這時就完成了套接字與完成端口的關聯操作。在這個套接字上進行的任何重疊操作都将通過完成端口發出完成通知。       其次,使用API函數GetQueuedCompletionStatus來不斷的監聽查詢某個完成端口的I/O操作的結果。通常來講,在主線程中都隻建立一個完成端口,将所有的套接字都與此完成端口關聯。而進行監聽查詢的線程數一般取CPU數量的兩倍。 BOOL GetQueuedCompletionStatus(

        HANDLE CompletionPort,             // handle to completion port

        LPDWORD lpNumberOfBytes,        // bytes transferred

        PULONG_PTR lpCompletionKey,     // file completion key

        LPOVERLAPPED *lpOverlapped,     // buffer

        DWORD dwMilliseconds              // optional timeout value

);   第一個參數指出了線程要監視哪一個完成端口。GetQueuedCompletionStatus使調用線程挂起,直到指定的端口的I/O完成隊列中出現了一項或直到逾時。同I/O完成端口相關聯的第3個資料結構是使線程得到完成I/O項中的資訊:傳輸的位元組數,完成鍵和OVERLAPPED結構的位址。該資訊是通過傳遞給GetQueuedCompletionSatatus的lpdwNumberOfBytesTransferred,lpdwCompletionKey和lpOverlapped參數傳回給線程的。      注意lpOverlapped,這是很重要的一個資料結構,從這裡你将獲得你想要的資料,并進行判斷處理。這裡你可能會問,這個lpOverlapped資料結構是哪裡來的,是什麼類型的呢?接下來你就明白了。       上面讨論了完成端口的使用,這其實是後期的處理,要想真正了解整個過程,還需要學習下面關于之前如何将發送和接收資料的I/O操作送出。     一個是API函數WSARecv從一個套接口接收資料。   int WSAAPI WSARecv ( SOCKET s, LPWSABUF lpBuffers,

  DWORD dwBufferCount, LPDWORD lpNumberOfBytesRecvd,

  LPINT lpFlags, LPWSAOVERLAPPED lpOverlapped,

  LPWSAOVERLAPPED_COMPLETION_ROUTINE

l     pCompletionRoutine );    lpOverlapped:一個指向WSAOVERLAPPED結構的指針,在這個參數中就可以設定你要接收的資料結構。   另一個是API函數WASSend在一個已連接配接的套接口上發送資料。   int WSAAPI WSASend (    

  SOCKET s,    

  LPWSABUF lpBuffers,

  DWORD dwBufferCount,    

  LPDWORD lpNumberOfBytesSent,

  int iFlags,    

  LPWSAOVERLAPPED lpOverlapped,

  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine

  );   同理,lpOverlapped用來設定發送是資料結構。   對完成端口來說,将一個套結字邦定到完成端口後,WSARecv和WSASend會立即傳回,提高了系統的效率。可以調用 GetQueuedCompletionStatus來判斷WSARecv和WSASend是否完成。主線程接受到一個連接配接後,調用WSARecv等待該連接配接發送的資料(不阻塞,由完成端口實作資料的接受完畢判斷)。線上程函數中接受完畢,然後用WSASend函數發送給客戶資料(同樣是不阻塞,直接傳回,由完成端口判斷資料是否發送完畢)。這樣線上程函數中需要程式員自己設定狀态來區分是發送完畢還是接受完畢。 注意WSARecv 隻是向系統送出一個異步接收請求,這個請求會在有資料到達之後傳回,并且放入完成隊列通知工作線程,這個異步接收請求到此完成,繼續送出請求是為了接收下一個資料包,也就是說,每次請求傳回之後必須再次送出。WSASend也隻是向系統送出一個異步發送請求,當發送成功後,需要送出WSARecv接收請求,因為發送是主動的,發送完畢後必然要等待接收對方的回複。如果不送出WSARecv接收請求,則對方發過來的資料後,完成端口不會監聽。   先寫這些,下一篇在寫關于Linux下面epoll的相關内容。       在上一篇中,我們主要讨論了Windows下關于完成端口的一些知識。對應于完成端口,Linux下面在2.5.44核心中有了epoll,這個是為處理大批量句柄而引進的。   先來看看為什麼要引進epoll以及它帶來的好處。 在Linux核心中,原有的select所用到的FD_SET是有限的,在核心中的參數_FD_SETSIZE來設定的。如果想要同時檢測1025個句柄的可讀(或可寫)狀态,則select無法滿足。而且,而且select是采用輪詢方法進行檢測的,也就是說每次檢測都要周遊所有FD_SET中的句柄。顯然,當随着FD_SET中的句柄數的增多,select的效率會不斷的下降。如今的伺服器,都是要滿足上萬甚至更多的連接配接的,顯然想要更高效的實作這一要求,必須采用新的方法。于是,不斷的修改後,終于形成了穩定的epoll。 epoll優點:(1)支援大數量的socket描述符(FD)。舉個例子來說,在1GB記憶體的機器上大約可以打開10萬個左右的socket,這個數字足以滿足一般的伺服器需求。(2)epoll的IO效率不會随着FD數量的增加而線性下降(多少肯定會下降的)。至于原理,可以查閱epoll的實作原理;(3)使用mmap加速核心與使用者空間的消息傳遞,無論是select,poll還是epoll都需要核心把FD消息通知給使用者空間,如何避免不必要的記憶體拷貝就很重要,在這點上,epoll是通過核心于使用者空間mmap同一塊記憶體實作的。當然epoll還有其他一些優點,這裡就不一一列舉了。   下面來重點說說epoll的使用,這也是大家最關心的部分。在2.6核心中epoll變的簡潔而強大。 先來弄清楚一個概念,即epoll的2種工作方式:LT和ET。 LT(level triggered)是預設的工作方式,同時支援block和non-block。其實這個有點像電路裡面的電平觸發方式。在這種模式下,核心會告訴你一個檔案描述符fd就緒了,然後你就可以對這個fd進行IO操作。如果你不做任何操作,核心會繼續通知你。是以,假如你讀取資料沒有讀取完時,核心會繼續通知你。其實傳統的select/poll就是這種模式。 ET(edge-triggered)是告訴工作方式,隻支援non-block。其實這個有點像電路裡面的邊沿觸發方式。這個是高效伺服器必選的方式。在這種模式下,當描述符從未就緒變為就緒時,核心通過epoll通知你。然後對這個fd隻通知你一次,因為之後一直為就緒态,沒有了狀态的變化,直到你做了某些操作導緻了那個fd不再為就緒态。但是注意,如果一直不對這個fd進行IO操作,核心不會發送更多的通知。 在弄清楚了上述兩種模式之後,接下來就可以使用epoll了。 主要用到三個函數epoll_create,epoll_ctl,epoll_wait。   int epoll_create(int size); 建立一個epoll的句柄,size用來告訴核心這個監聽的數目一共有多大。當建立好epoll句柄後,它就是會占用一個fd值,是以在使用完epoll後,必須調用close()關閉,否則可能導緻fd被耗盡。   int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); epoll的事件注冊函數第一個參數是epoll_create()的傳回值,第二個參數表示動作,用三個宏來表示:   EPOLL_CTL_ADD:注冊新的fd到epfd中;

  EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件;

  EPOLL_CTL_DEL:從epfd中删除一個fd;   第三個參數是需要監聽的fd,第四個參數是告訴核心需要監聽什麼事,struct epoll_event結構如下:   struct epoll_event {

     __uint32_t events;    

     epoll_data_t data;    

  };   events可以是以下幾個宏的集合:   EPOLLIN :表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉);

  EPOLLOUT:表示對應的檔案描述符可以寫;

  EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶                   外資料到來);

  EPOLLERR:表示對應的檔案描述符發生錯誤;

  EPOLLHUP:表示對應的檔案描述符被挂斷;

  EPOLLET:将EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水                 平觸發(Level Triggered)來說的。

  EPOLLONESHOT:隻監聽一次事件,當監聽完這次事件之後,如果還需要                          繼續監聽這個socket的話,需要再次把這個socket加入                          到EPOLL隊列裡   int epoll_wait(int epfd, struct epoll_event * events,                int maxevents, int timeout); 等待事件的産生,類似于select()調用。參數events用來從核心得到事件的集合,maxevents告之核心這個events有多大,這個maxevents的值不能大于建立epoll_create()時的size,參數timeout是逾時時間(毫秒,0會立即傳回,-1将永久阻塞)。該函數傳回需要處理的事件數目,如傳回0表示已逾時。     下面給出epoll編寫伺服器的模型: #include <stdio.h>

#include <sys/socket.h>

#include <sys/epoll.h>

#include <fcntl.h>

#include <string.h>

#include <errno.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#define MAXSIZE         64000

#define MAXEPS            256

//#define EVENTS            100

#define LISTENQ         32

#define SERV_PORT     8000

int setnonblock(int sock)

{

     int flags = fcntl(sock, F_GETFL, 0);

     if(-1 == flags) {

            perror("fcntl(sock, F_GETFL)");

            return -1;

     }

     flags |= O_NONBLOCK;

     if(-1 == fcntl(sock, F_SETFL, flags)) {

            perror("fcntl(sock, F_SETFT, flags)");

            return -2;

     }

     return 0;

}

int main(int argc, char *argv[])

{

     int i, maxi, listenfd, connfd, sockfd, epfd, nfds;

     ssize_t n;

     char buf[MAXSIZE];        

     socklen_t clilen;

     struct epoll_event ev, events[20];

     epfd = epoll_create(MAXEPS);

     struct sockaddr_in clientaddr;

     struct sockaddr_in serveraddr;

     listenfd = socket(AF_INET, SOCK_STREAM, 0);

     setnonblock(listenfd);

     ev.data.fd = listenfd;

     ev.events = EPOLLIN | EPOLLET;

     epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

     memset(&serveraddr, 0, sizeof(serveraddr));

     serveraddr.sin_family = AF_INET;

     serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

     serveraddr.sin_port = htons(SERV_PORT);

     if(bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) != 0) {

            perror("bind failed");

            return -1;

     }

     if(listen(listenfd, LISTENQ) != 0) {

            perror("listen failed");

            return -2;

     }

     maxi = 0;

     printf("began to accept...\n");

     for(;;) {

            nfds = epoll_wait(epfd, events, 32, 10000);

            if(-1 == m_nfds)    {

                 if(EINTR == errno) {

            continue;

    }

    return -1;

            }

         for(i=0; i<nfds; ++i) {

                 if(events[i].data.fd == listenfd) {

                        connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clilen);

                        if(connfd < 0) {

                             perror("accept failed");

                             return -3;

                        }

         printf("accepted..\n");

                        setnonblock(connfd);

                        ev.data.fd = connfd;

                        ev.events = EPOLLIN | EPOLLET;

                        epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

                 }

                 else if(events[i].events & EPOLLIN) {

                        if((sockfd = events[i].data.fd) < 0)

                             continue;

                        if((n = read(sockfd, buf, MAXSIZE)) < 0) {

                             if(errno == ECONNRESET) {    

                                    close(sockfd);

                                    events[i].data.fd = -1;

                             } else {

                                    perror("read failed");

                             }

                        }

                        else if(0 == n) {

                             close(sockfd);

                             events[i].data.fd = -1;

                        }

         printf("Read the buf: %s\n", buf);

                        ev.data.fd = sockfd;

                        ev.events = EPOLLOUT | EPOLLET;

                        epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);

                 }

                 else if(events[i].events & EPOLLOUT) {

                        sockfd = events[i].data.fd;

                        char *sndbuf = "I get your message!";

                        write(sockfd,sndbuf, 10);

                        ev.data.fd = sockfd;

                        ev.events = EPOLLIN | EPOLLET;

                        epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);

                 }

            }

     }

     return 0;

}     個人總結: 一般epoll伺服器模型基本就是上面那個。有人你加入線程池來進行處理,即在epoll-wait和收發處理分開到兩個線程中,這樣在處理收發資料時,仍然不影響epoll進行監聽。也有人建議不使用線程池,因為線程切換等帶來的開銷不亞于資料處理。這裡本人沒有進行測試,不便下結論。 主要先想說的,很多在上述模型之外,還需要注意的就是如何讀寫的問題。因為上述模型隻是小量的資料收發,比較簡單。 由于epoll工作在ET模式的時候,必須使用非阻塞套接口,以避免由于一個檔案句柄的阻塞讀/阻塞寫操作把處理多個檔案描述符的任務餓死。隻有當read(2)或者write(2)傳回EAGAIN時才需要挂起,等待。那麼如何保證讀取資料或者是發送資料都已經結束了呢?(即全部讀完或全部發送)。 讀資料的時候需要考慮的是當read()傳回的大小如果等于請求的大小,那麼很有可能是緩沖區還有資料未讀完,也意味着該次事件還沒有處理完,是以還需要再次讀取,直到傳回的大小小于請求的大小。也可以在一個while(true)循環當中不斷的read,直到傳回EAGAIN位置。建議使用前一種方法。因為很長時間沒有給對方回複後,對方可能會認為此次資料包丢失,接着再次發送同樣的資料包,伺服器此時可以控制退出此次讀取,重新讀取。 同理,如果發送端流量大于接收端的流量(意思是epoll所在的程式讀比轉發的socket要快),由于是非阻塞的socket,那麼send()函數雖然傳回,但實際緩沖區的資料并未真正發給接收端,這樣不斷的讀和發,當緩沖區滿後會産生EAGAIN錯誤,同時不理會這次請求發送的資料。這時就需要你對send進行一些改動,等待片刻後繼續發送,并檢檢視發送的資料量是否正确。

本文出自 “飛雪待劍” 部落格,請務必保留此出處http://jazka.blog.51cto.com/809003/252620

繼續閱讀