天天看點

epoll介紹及使用

一、 介紹

    Epoll 是一種高效的管理socket的模型,相對于select和poll來說具有更高的效率和易用性。傳統的select以及poll的效率會因為 socket數量的線形遞增而導緻呈二次乃至三次方的下降,而epoll的性能不會随socket數量增加而下降。标準的linux-2.4.20核心不支援epoll,需要打patch。本文主要從linux-2.4.32和linux-2.6.10兩個核心版本介紹epoll。

select運作機制

select()的機制中提供一種

fd_set

的資料結構,實際上是一個long類型的數組,每一個數組元素都能與一打開的檔案句柄(不管是Socket句柄,還是其他檔案或命名管道或裝置句柄)建立聯系,建立聯系的工作由程式員完成,當調用select()時,由核心根據IO狀态修改fd_set的内容,由此來通知執行了select()的程序哪一Socket或檔案可讀。

select機制的問題

  1. 每次調用select,都需要把

    fd_set

    集合從使用者态拷貝到核心态,如果

    fd_set

    集合很大時,那這個開銷也很大
  2. 同時每次調用select都需要在核心周遊傳遞進來的所有

    fd_set

    ,如果

    fd_set

    集合很大時,那這個開銷也很大
  3. 為了減少資料拷貝帶來的性能損壞,核心對被監控的

    fd_set

    集合大小做了限制,并且這個是通過宏控制的,大小不可改變(限制為1024)

poll的機制與select類似,與select在本質上沒有多大差别,管理多個描述符也是進行輪詢,根據描述符的狀态進行處理,但是poll沒有最大檔案描述符數量的限制。也就是說,poll隻解決了上面的問題3,并沒有解決問題1,2的性能開銷問題。

二、 Epoll的使用

epoll用到的所有函數都是在頭檔案sys/epoll.h中聲明的,下面簡要說明所用到的資料結構和函數:

所用到的資料結構

typedef union epoll_data {
                void ptr;
                int fd;
                __uint32_t u32;
                __uint64_t u64;
      } epoll_data_t;
 
      struct epoll_event {
                __uint32_t events;    / Epoll events /
                epoll_data_t data;    / User data variable /
      };
           

    結構體epoll_event 被用于注冊所感興趣的事件和回傳所發生待處理的事件,其中epoll_data 聯合體用來儲存觸發事件的某個檔案描述符相關的資料,例如一個client連接配接到伺服器,伺服器通過調用accept函數可以得到于這個client對應的socket檔案描述符,可以把這檔案描述符賦給epoll_data的fd字段以便後面的讀寫操作在這個檔案描述符上進行。epoll_event 結構體的events字段是表示感興趣的事件和被觸發的事件可能的取值為:EPOLLIN :表示對應的檔案描述符可以讀;

EPOLLOUT:表示對應的檔案描述符可以寫;
EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀
EPOLLERR:表示對應的檔案描述符發生錯誤;
EPOLLHUP:表示對應的檔案描述符被挂斷;
EPOLLET:表示對應的檔案描述符設定為edge模式;
           
  • 水準觸發(LT):預設工作模式,即當epoll_wait檢測到某描述符事件就緒并通知應用程式時,應用程式可以不立即處理該事件;下次調用epoll_wait時,會再次通知此事件
  • 邊緣觸發(ET): 當epoll_wait檢測到某描述符事件就緒并通知應用程式時,應用程式必須立即處理該事件。如果不處理,下次調用epoll_wait時,不會再次通知此事件。(直到你做了某些操作導緻該描述符變成未就緒狀态了,也就是說邊緣觸發隻在狀态由未就緒變為就緒時隻通知一次)。

所用到的函數:

1、epoll_create函數

    函數聲明:int epoll_create(int size)

    該函數生成一個epoll專用的檔案描述符,其中的參數是指定生成描述符的最大範圍。在linux-2.4.32核心中根據size大小初始化哈希表的大小,在linux2.6.10核心中該參數無用,使用紅黑樹管理所有的檔案描述符,而不是hash。

2、epoll_ctl函數

    函數聲明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)

    該函數用于控制某個檔案描述符上的事件,可以注冊事件,修改事件,删除事件。

    參數:epfd:由 epoll_create 生成的epoll專用的檔案描述符;

                op:要進行的操作例如注冊事件,可能的取值

如果調用成功傳回0,不成功傳回-1

EPOLL_CTL_ADD 注冊、
EPOLL_CTL_MOD 修改、
EPOLL_CTL_DEL 删除
fd:關聯的檔案描述符;
event:指向epoll_event的指針;
           

3、epoll_wait函數

函數聲明:int epoll_wait(int epfd,struct epoll_event   events,int maxevents,int timeout)

該函數用于輪詢I/O事件的發生;

參數:

epfd:由epoll_create 生成的epoll專用的檔案描述符;

epoll_event:用于回傳代處理事件的數組;

maxevents:每次能處理的事件數;

timeout:等待I/O事件發生的逾時值(ms);-1永不逾時,直到有事件産生才觸發,0立即傳回。

傳回發生事件數。-1有錯誤。

有關EPOLLOUT(寫)監聽的使用,了解起來有點麻煩。因為監聽一般都是被動操作,用戶端有資料上來需要讀寫(被動的讀操作),EPOLIN監聽事件很好了解,但是伺服器給客戶發送資料是個主動的操作,寫操作如何監聽呢?

 如果将用戶端的socket接口都設定成 EPOLLIN | EPOLLOUT(讀,寫)兩個操作都設定,那麼這個寫操作會一直監聽,有點影響效率。EPOLLOUT(寫)監聽的使用場,一般說明主要有以下三種使用場景:

  1. 對用戶端socket隻使用EPOLLIN(讀)監聽,不監聽EPOLLOUT(寫),寫操作一般使用socket的send操作。

    如果發送遇到EAGAIN/EWOULDBLOCK 就去加入EPOLLOUT,等待直到socket輸出緩沖區可以寫,那麼epoll_wait就能觸發EPOLLOUT,再去寫資料。見示例2

  2. 用戶端的socket初始化為EPOLLIN(讀)監聽,有資料需要發送時,對用戶端的socket修改為EPOLLOUT(寫)操作,這時EPOLL機制會回調發送資料的函數,發送完資料之後,再将用戶端的socket修改為EPOLL(讀)監聽
  3. 對用戶端socket使用EPOLLIN 和 EPOLLOUT兩種操作,這樣每一輪epoll_wait循環都會回調讀,寫函數,這種方式效率不是很好

簡單的應用

跟網上的其它代碼修改了一下,可以直接編譯運作

#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <memory.h>

using namespace std;

#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000

/*
typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;

struct epoll_event {
    __uint32_t events; // Epoll events 
    epoll_data_t data; // User data variable 
};
*/

void setnonblocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts<0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts|O_NONBLOCK;
    if(fcntl(sock,F_SETFL,opts)<0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

int main(int argc, char* argv[])
{
    int i, maxi, listenfd, connfd, sockfd,epfd,nfds, portnumber;
    ssize_t n;
    char line[MAXLINE];
    socklen_t clilen;


    if ( 2 == argc )
    {
        if( (portnumber = atoi(argv[1])) < 0 )
        {
            fprintf(stderr,"Usage:%s portnumber\n",argv[0]);
            return 1;
        }
    }
    else
    {
        fprintf(stderr,"Usage:%s portnumber\n",argv[0]);
        return 1;
    }



    //聲明epoll_event結構體的變量,ev用于注冊事件,數組用于回傳要處理的事件

    struct epoll_event ev, events[20];
    //生成用于處理accept的epoll專用的檔案描述符

    epfd=epoll_create(256);
    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    //把socket設定為非阻塞方式
    //setnonblocking(listenfd);

    //設定與要處理的事件相關的檔案描述符
    ev.data.fd=listenfd;
	
    //設定要處理的事件類型
    ev.events = EPOLLIN|EPOLLET;
    //ev.events=EPOLLIN;

    //注冊epoll事件
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
	
    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    char *local_addr="127.0.0.1";
    inet_aton(local_addr,&(serveraddr.sin_addr));//htons(portnumber);

    serveraddr.sin_port=htons(portnumber);
    bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    maxi = 0;
    while ( 1 ) {
        //等待epoll事件的發生

        nfds=epoll_wait(epfd, events, 20, 500);
        //處理所發生的所有事件

        for(i=0;i<nfds;++i)
        {
            if(events[i].data.fd==listenfd)//如果新監測到一個SOCKET使用者連接配接到了綁定的SOCKET端口,建立新的連接配接。

            {
                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
                if(connfd<0){
                    perror("connfd<0");
                    exit(1);
                }
                //setnonblocking(connfd);

                char *str = inet_ntoa(clientaddr.sin_addr);
                cout << "accapt a connection from " << str << endl;
                //設定用于讀操作的檔案描述符

                ev.data.fd=connfd;
                //設定用于注測的讀操作事件

                ev.events=EPOLLIN|EPOLLET;
                //ev.events=EPOLLIN;

                //注冊ev

                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
            }
            else if(events[i].events&EPOLLIN)//如果是已經連接配接的使用者,并且收到資料,那麼進行讀入。

            {
                cout << "EPOLLIN" << endl;
                if ( (sockfd = events[i].data.fd) < 0)
                    continue;
                if ( (n = read(sockfd, line, MAXLINE)) < 0) {
                    if (errno == ECONNRESET) {
                        close(sockfd);
                        events[i].data.fd = -1;
                    } else
                        std::cout<<"readline error"<<std::endl;
                } else if (n == 0) {
                    close(sockfd);
                    events[i].data.fd = -1;
                }
                line[n] = '/0';
                cout << "read " << line << endl;
                //設定用于寫操作的檔案描述符

                ev.data.fd=sockfd;
                //設定用于注測的寫操作事件

                ev.events=EPOLLOUT|EPOLLET;
                //修改sockfd上要處理的事件為EPOLLOUT

                //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);

            }
            else if(events[i].events&EPOLLOUT) // 如果有資料發送

            {
                sockfd = events[i].data.fd;
                write(sockfd, line, n);
                //設定用于讀操作的檔案描述符

                ev.data.fd=sockfd;
                //設定用于注測的讀操作事件

                ev.events=EPOLLIN|EPOLLET;
                //修改sockfd上要處理的事件為EPOLIN

                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
            }
        }
    }
    return 0;
}
           

示例2

void write_handler2(ev_data *ptr)
{
 
    //判斷指針, 可自由發揮
    if(!ptr || !ptr->buffer){
        close(ptr->fd);
        puts(" write_handler2 ptr is empty ");
        return;
    }
 
     //如果沒資料,就直接傳回了
    if(ptr->nread <= 0){
        printf("buffer is empty , ptr->nread:%d \n",ptr->nread);
        return;
    }
    static struct epoll_event ev ={0,{0}};
 
 
    //要發送的位元組數
    int left = ptr->nread - ptr->start_write_index;
 
    //指針位置
    char * pbuffer = ptr->buffer + ptr->start_write_index;
    int nwriten = -1;
    int write_bytes = 0;
    printf("begin write , nread:%d, start_write_index:%d,left:%d\n",
    ptr->nread,ptr->start_write_index,left);
 
 
    while(left > 0){
        nwriten  = write(ptr->fd,pbuffer,left);
 
        //如果有錯誤
        if(nwriten <= 0){
 
            /*
                socket輸出緩沖區滿了
                那就加入EPOLLOUT事件,等epoll_wait傳回通知你
            */
            if(errno < 0 && ( EWOULDBLOCK == errno || EAGAIN == errno)){
 
                //記錄一下, 下一次要從哪裡開始
                ptr->start_write_index += write_bytes;
 
                //這個純粹為了防止重複的epoll_ctl,額外增加負擔的,你不做判斷也沒事
                if(EPOLLOUT & ptr->events)
                    return;
            
                //增加EPOLLOUT事件
                ptr->events |= EPOLLOUT;
                ev.events = ptr->events;
                ev.data.ptr = ptr;
                
                //修改事件
                epoll_ctl(ptr->epoll_fd,EPOLL_CTL_MOD,ptr->fd,&ev);
                printf("socket buff is full , nread:%d,start_write_index:%d,left:%d\n",
                ptr->nread,ptr->start_write_index,left);
                return;
            }
            else{
 
                //如果出錯了, 這裡你可以自由發揮
                close(ptr->fd);
                free_event(ptr);
                perror("write error");
                return;
            }
        }
        pbuffer += nwriten;
        left -= nwriten;
        write_bytes += nwriten;
    }
 
    //到這裡,說明該寫的都寫了, 重置一下讀寫位置
    ptr->start_write_index = ptr->nread = 0;
 
    //這個判斷純粹為了防止每次都去epoll_ctl,不加判斷也行
    if(EPOLLOUT & ptr->events) {
 
        //把 EPOLLOUT 删了 ,這樣就跟原來一樣還是EPOLLIN|EPOLLET
        ptr->events &= ~EPOLLOUT;
        ev.events =ptr->events;
        ev.data.ptr = ptr;
        //修改一下
        epoll_ctl(ptr->epoll_fd, EPOLL_CTL_MOD, ptr->fd, &ev);
    }
}
           

繼續閱讀