一、 介紹
Epoll 是一種高效的管理socket的模型,相對于select和poll來說具有更高的效率和易用性。傳統的select以及poll的效率會因為 socket數量的線形遞增而導緻呈二次乃至三次方的下降,而epoll的性能不會随socket數量增加而下降。标準的linux-2.4.20核心不支援epoll,需要打patch。本文主要從linux-2.4.32和linux-2.6.10兩個核心版本介紹epoll。
select運作機制
select()的機制中提供一種
fd_set
的資料結構,實際上是一個long類型的數組,每一個數組元素都能與一打開的檔案句柄(不管是Socket句柄,還是其他檔案或命名管道或裝置句柄)建立聯系,建立聯系的工作由程式員完成,當調用select()時,由核心根據IO狀态修改fd_set的内容,由此來通知執行了select()的程序哪一Socket或檔案可讀。
select機制的問題
- 每次調用select,都需要把
集合從使用者态拷貝到核心态,如果fd_set
集合很大時,那這個開銷也很大fd_set
- 同時每次調用select都需要在核心周遊傳遞進來的所有
,如果fd_set
集合很大時,那這個開銷也很大fd_set
- 為了減少資料拷貝帶來的性能損壞,核心對被監控的
集合大小做了限制,并且這個是通過宏控制的,大小不可改變(限制為1024)fd_set
poll的機制與select類似,與select在本質上沒有多大差别,管理多個描述符也是進行輪詢,根據描述符的狀态進行處理,但是poll沒有最大檔案描述符數量的限制。也就是說,poll隻解決了上面的問題3,并沒有解決問題1,2的性能開銷問題。
二、 Epoll的使用
epoll用到的所有函數都是在頭檔案sys/epoll.h中聲明的,下面簡要說明所用到的資料結構和函數:
所用到的資料結構
typedef union epoll_data {
void ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; / Epoll events /
epoll_data_t data; / User data variable /
};
結構體epoll_event 被用于注冊所感興趣的事件和回傳所發生待處理的事件,其中epoll_data 聯合體用來儲存觸發事件的某個檔案描述符相關的資料,例如一個client連接配接到伺服器,伺服器通過調用accept函數可以得到于這個client對應的socket檔案描述符,可以把這檔案描述符賦給epoll_data的fd字段以便後面的讀寫操作在這個檔案描述符上進行。epoll_event 結構體的events字段是表示感興趣的事件和被觸發的事件可能的取值為:EPOLLIN :表示對應的檔案描述符可以讀;
EPOLLOUT:表示對應的檔案描述符可以寫;
EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀
EPOLLERR:表示對應的檔案描述符發生錯誤;
EPOLLHUP:表示對應的檔案描述符被挂斷;
EPOLLET:表示對應的檔案描述符設定為edge模式;
- 水準觸發(LT):預設工作模式,即當epoll_wait檢測到某描述符事件就緒并通知應用程式時,應用程式可以不立即處理該事件;下次調用epoll_wait時,會再次通知此事件
- 邊緣觸發(ET): 當epoll_wait檢測到某描述符事件就緒并通知應用程式時,應用程式必須立即處理該事件。如果不處理,下次調用epoll_wait時,不會再次通知此事件。(直到你做了某些操作導緻該描述符變成未就緒狀态了,也就是說邊緣觸發隻在狀态由未就緒變為就緒時隻通知一次)。
所用到的函數:
1、epoll_create函數
函數聲明:int epoll_create(int size)
該函數生成一個epoll專用的檔案描述符,其中的參數是指定生成描述符的最大範圍。在linux-2.4.32核心中根據size大小初始化哈希表的大小,在linux2.6.10核心中該參數無用,使用紅黑樹管理所有的檔案描述符,而不是hash。
2、epoll_ctl函數
函數聲明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
該函數用于控制某個檔案描述符上的事件,可以注冊事件,修改事件,删除事件。
參數:epfd:由 epoll_create 生成的epoll專用的檔案描述符;
op:要進行的操作例如注冊事件,可能的取值
如果調用成功傳回0,不成功傳回-1
EPOLL_CTL_ADD 注冊、
EPOLL_CTL_MOD 修改、
EPOLL_CTL_DEL 删除
fd:關聯的檔案描述符;
event:指向epoll_event的指針;
3、epoll_wait函數
函數聲明:int epoll_wait(int epfd,struct epoll_event events,int maxevents,int timeout)
該函數用于輪詢I/O事件的發生;
參數:
epfd:由epoll_create 生成的epoll專用的檔案描述符;
epoll_event:用于回傳代處理事件的數組;
maxevents:每次能處理的事件數;
timeout:等待I/O事件發生的逾時值(ms);-1永不逾時,直到有事件産生才觸發,0立即傳回。
傳回發生事件數。-1有錯誤。
有關EPOLLOUT(寫)監聽的使用,了解起來有點麻煩。因為監聽一般都是被動操作,用戶端有資料上來需要讀寫(被動的讀操作),EPOLIN監聽事件很好了解,但是伺服器給客戶發送資料是個主動的操作,寫操作如何監聽呢?
如果将用戶端的socket接口都設定成 EPOLLIN | EPOLLOUT(讀,寫)兩個操作都設定,那麼這個寫操作會一直監聽,有點影響效率。EPOLLOUT(寫)監聽的使用場,一般說明主要有以下三種使用場景:
-
對用戶端socket隻使用EPOLLIN(讀)監聽,不監聽EPOLLOUT(寫),寫操作一般使用socket的send操作。
如果發送遇到EAGAIN/EWOULDBLOCK 就去加入EPOLLOUT,等待直到socket輸出緩沖區可以寫,那麼epoll_wait就能觸發EPOLLOUT,再去寫資料。見示例2
- 用戶端的socket初始化為EPOLLIN(讀)監聽,有資料需要發送時,對用戶端的socket修改為EPOLLOUT(寫)操作,這時EPOLL機制會回調發送資料的函數,發送完資料之後,再将用戶端的socket修改為EPOLL(讀)監聽
- 對用戶端socket使用EPOLLIN 和 EPOLLOUT兩種操作,這樣每一輪epoll_wait循環都會回調讀,寫函數,這種方式效率不是很好
簡單的應用
跟網上的其它代碼修改了一下,可以直接編譯運作
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <memory.h>
using namespace std;
#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000
/*
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; // Epoll events
epoll_data_t data; // User data variable
};
*/
void setnonblocking(int sock)
{
int opts;
opts=fcntl(sock,F_GETFL);
if(opts<0)
{
perror("fcntl(sock,GETFL)");
exit(1);
}
opts = opts|O_NONBLOCK;
if(fcntl(sock,F_SETFL,opts)<0)
{
perror("fcntl(sock,SETFL,opts)");
exit(1);
}
}
int main(int argc, char* argv[])
{
int i, maxi, listenfd, connfd, sockfd,epfd,nfds, portnumber;
ssize_t n;
char line[MAXLINE];
socklen_t clilen;
if ( 2 == argc )
{
if( (portnumber = atoi(argv[1])) < 0 )
{
fprintf(stderr,"Usage:%s portnumber\n",argv[0]);
return 1;
}
}
else
{
fprintf(stderr,"Usage:%s portnumber\n",argv[0]);
return 1;
}
//聲明epoll_event結構體的變量,ev用于注冊事件,數組用于回傳要處理的事件
struct epoll_event ev, events[20];
//生成用于處理accept的epoll專用的檔案描述符
epfd=epoll_create(256);
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
//把socket設定為非阻塞方式
//setnonblocking(listenfd);
//設定與要處理的事件相關的檔案描述符
ev.data.fd=listenfd;
//設定要處理的事件類型
ev.events = EPOLLIN|EPOLLET;
//ev.events=EPOLLIN;
//注冊epoll事件
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
char *local_addr="127.0.0.1";
inet_aton(local_addr,&(serveraddr.sin_addr));//htons(portnumber);
serveraddr.sin_port=htons(portnumber);
bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
listen(listenfd, LISTENQ);
maxi = 0;
while ( 1 ) {
//等待epoll事件的發生
nfds=epoll_wait(epfd, events, 20, 500);
//處理所發生的所有事件
for(i=0;i<nfds;++i)
{
if(events[i].data.fd==listenfd)//如果新監測到一個SOCKET使用者連接配接到了綁定的SOCKET端口,建立新的連接配接。
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
if(connfd<0){
perror("connfd<0");
exit(1);
}
//setnonblocking(connfd);
char *str = inet_ntoa(clientaddr.sin_addr);
cout << "accapt a connection from " << str << endl;
//設定用于讀操作的檔案描述符
ev.data.fd=connfd;
//設定用于注測的讀操作事件
ev.events=EPOLLIN|EPOLLET;
//ev.events=EPOLLIN;
//注冊ev
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
}
else if(events[i].events&EPOLLIN)//如果是已經連接配接的使用者,并且收到資料,那麼進行讀入。
{
cout << "EPOLLIN" << endl;
if ( (sockfd = events[i].data.fd) < 0)
continue;
if ( (n = read(sockfd, line, MAXLINE)) < 0) {
if (errno == ECONNRESET) {
close(sockfd);
events[i].data.fd = -1;
} else
std::cout<<"readline error"<<std::endl;
} else if (n == 0) {
close(sockfd);
events[i].data.fd = -1;
}
line[n] = '/0';
cout << "read " << line << endl;
//設定用于寫操作的檔案描述符
ev.data.fd=sockfd;
//設定用于注測的寫操作事件
ev.events=EPOLLOUT|EPOLLET;
//修改sockfd上要處理的事件為EPOLLOUT
//epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
}
else if(events[i].events&EPOLLOUT) // 如果有資料發送
{
sockfd = events[i].data.fd;
write(sockfd, line, n);
//設定用于讀操作的檔案描述符
ev.data.fd=sockfd;
//設定用于注測的讀操作事件
ev.events=EPOLLIN|EPOLLET;
//修改sockfd上要處理的事件為EPOLIN
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
}
}
}
return 0;
}
示例2
void write_handler2(ev_data *ptr)
{
//判斷指針, 可自由發揮
if(!ptr || !ptr->buffer){
close(ptr->fd);
puts(" write_handler2 ptr is empty ");
return;
}
//如果沒資料,就直接傳回了
if(ptr->nread <= 0){
printf("buffer is empty , ptr->nread:%d \n",ptr->nread);
return;
}
static struct epoll_event ev ={0,{0}};
//要發送的位元組數
int left = ptr->nread - ptr->start_write_index;
//指針位置
char * pbuffer = ptr->buffer + ptr->start_write_index;
int nwriten = -1;
int write_bytes = 0;
printf("begin write , nread:%d, start_write_index:%d,left:%d\n",
ptr->nread,ptr->start_write_index,left);
while(left > 0){
nwriten = write(ptr->fd,pbuffer,left);
//如果有錯誤
if(nwriten <= 0){
/*
socket輸出緩沖區滿了
那就加入EPOLLOUT事件,等epoll_wait傳回通知你
*/
if(errno < 0 && ( EWOULDBLOCK == errno || EAGAIN == errno)){
//記錄一下, 下一次要從哪裡開始
ptr->start_write_index += write_bytes;
//這個純粹為了防止重複的epoll_ctl,額外增加負擔的,你不做判斷也沒事
if(EPOLLOUT & ptr->events)
return;
//增加EPOLLOUT事件
ptr->events |= EPOLLOUT;
ev.events = ptr->events;
ev.data.ptr = ptr;
//修改事件
epoll_ctl(ptr->epoll_fd,EPOLL_CTL_MOD,ptr->fd,&ev);
printf("socket buff is full , nread:%d,start_write_index:%d,left:%d\n",
ptr->nread,ptr->start_write_index,left);
return;
}
else{
//如果出錯了, 這裡你可以自由發揮
close(ptr->fd);
free_event(ptr);
perror("write error");
return;
}
}
pbuffer += nwriten;
left -= nwriten;
write_bytes += nwriten;
}
//到這裡,說明該寫的都寫了, 重置一下讀寫位置
ptr->start_write_index = ptr->nread = 0;
//這個判斷純粹為了防止每次都去epoll_ctl,不加判斷也行
if(EPOLLOUT & ptr->events) {
//把 EPOLLOUT 删了 ,這樣就跟原來一樣還是EPOLLIN|EPOLLET
ptr->events &= ~EPOLLOUT;
ev.events =ptr->events;
ev.data.ptr = ptr;
//修改一下
epoll_ctl(ptr->epoll_fd, EPOLL_CTL_MOD, ptr->fd, &ev);
}
}