epoll
epoll
是Linux對
select
功能的改進,其性能大大提升,而且和監控的IO個數無關。
- API:
- epoll_create:
***`int epoll_create(int size);`*** 建立一個`epoll`執行個體,`size`參數是可監控IO的數量大小,但是Linux 2.6.8之後已不再使用。
- epoll_ctl:
***`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);`*** 對`epoll`執行個體進行操作,由`op`指定操作類型:
- EPOLL_CTL_ADD:添加一個檔案描述符到
執行個體中。epoll
- EPOLL_CTL_MOD:更新
執行個體中檔案描述符。epoll
- EPOLL_CTL_DEL:删除
執行個體中檔案描述符。epoll
- epoll_wait
***```int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);```*** 等待已經準備好的IO.`events`參數傳回的資料就是在`epoll_ctl`函數第三個參數設定的值。
- 限制:無
- 使用範例:
最多的用途就是socket程式設計,可以大大提高伺服器的性能,此處我們實作一個簡單的http伺服器。epoll
#define MAXFDS 128 #define EVENTS 100 #define PORT 8080 #define MAXEPOLLSIZE 1024*10 typedef enum { false, true }bool; /***************定義處理socket的回調函數類型***********/ typedef int (*socket_pro)(int fd,void *data); /***************定義回調函數的使用者資料***********/ typedef struct userdata { int fd; socket_pro cb; }userdata_t; static int epfd;//epoll句柄 /***************發送一個檔案資料的函數***********/ static void cws_client_request (int connfd,void *data) { struct epoll_event ev = {0}; char buffer[1024*8] = {0}; int ret; char *requestPath = NULL; char tmpPath[512] = {"./www/"}; int pagesize = 0; ret = recv (connfd, buffer, sizeof (buffer) -1, 0); if (ret > 0) { if (strncmp (buffer, "GET ", 4) != 0) { printf("bad request.\n"); } if (strncmp (buffer, "GET /", 5) == 0) { if (strncmp (buffer, "GET / ", 6) == 0) { strcat(tmpPath, "/index.html"); requestPath = tmpPath; } else { requestPath = buffer+5; char * pos = strstr(buffer+5, " "); strncat(tmpPath, requestPath, pos - requestPath); requestPath = tmpPath; } } char * badRequest = (char *)"<b>Error 404 Not Found.</b>"; char * httpStatus200 = (char *)"HTTP/1.0 200 OK\r\nContent-Type:text/html\r\n\r\n"; FILE * fp = fopen(requestPath, "r"); FILE * connfp = fdopen(connfd, "w"); if ( connfp == NULL ) { perror("fdopen error");//cout <<"bad connfp"<<endl; } if (fp == NULL) { setlinebuf(connfp); fwrite(badRequest, strlen(badRequest), 1, connfp); fclose(connfp); } else { setlinebuf(connfp); //fwrite(httpStatus200, strlen(httpStatus200), 1, connfp); //fflush(connfp); while ((ret = fread (buffer, 1, sizeof(buffer) -1, fp)) > 0) { fwrite(buffer, 1, ret, connfp); pagesize += ret; fflush(connfp); } printf("pagesize:%d\n",pagesize);//cout <<"pagesize:" << pagesize << endl; fclose(fp); } } //1 close(connfd); epoll_ctl (epfd, EPOLL_CTL_DEL, connfd, &ev); } /***************處理已經連接配接socket函數***********/ static int __process_data_fd(int fd,void *data) { struct epoll_event *ev = (struct epoll_event *)data; cws_client_request(fd,ev); free(data); return; } /***************處理監聽socket函數***********/ static int __process_listen_fd(int fd,void *data) { struct sockaddr_in caddr = {0}; struct epoll_event ev = {0}; int len = sizeof (caddr); int cfd = accept (fd, (struct sockaddr *) &caddr, (socklen_t *) & len); if (-1 == cfd) { perror("accpet error");//cout << "server has an error, now accept a socket fd" <<endl; break; } setNonBlock (cfd); userdata_t *cb_data = malloc(sizeof(userdata_t)); cb_data->fd = cfd; cb_data->cb = __process_data_fd; ev.data.ptr = cfd; ev.events = EPOLLIN | EPOLLET; epoll_ctl (epfd, EPOLL_CTL_ADD, cfd, &ev); return 0; } /***************設定描述符為非阻塞***********/ static bool setNonBlock (int fd) { int flags = fcntl (fd, F_GETFL, 0); flags |= O_NONBLOCK; if (-1 == fcntl (fd, F_SETFL, flags)) { return false; } return true; } /***************主函數***********/ int main (int argc, char *argv[]) { int listen_fd, nfds; int on = 1; char buffer[512]; struct sockaddr_in saddr, caddr; struct epoll_event ev, events[EVENTS]; signal(SIGPIPE, SIG_IGN); if (fork()) { exit(0); } if (-1 == (listen_fd = socket(AF_INET, SOCK_STREAM, 0))) { perror("socket error");//cout << "create socket error!" << endl; return -1; } epfd = epoll_create (MAXFDS); setsockopt (listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)); bzero (&saddr, sizeof (saddr)); saddr.sin_family = AF_INET; saddr.sin_port = htons ((short) (PORT)); saddr.sin_addr.s_addr = INADDR_ANY; if (-1 == bind(listen_fd, (struct sockaddr *) &saddr, sizeof (saddr))) { perror("bind error");//cout << " cann't bind socket on server " << endl; return -1; } if (-1 == listen (listen_fd, 32)) { perror("listen error");//cout << "listen error" << endl; return -1; } userdata_t *cb_data = (userdata_t *)malloc(sizeof(userdata_t)); cb_data->fd = listen_fd; cb_data->cb = __process_listen_fd; ev.data.ptr = cb_data; ev.events = EPOLLIN|EPOLLET; epoll_ctl (epfd, EPOLL_CTL_ADD, listen_fd, &ev); for (;;) { int i; nfds = epoll_wait (epfd, events, MAXFDS, -1); for (i = 0; i < nfds; ++i) { userdata_t *cb_data = (userdata_t *)events[i].data.ptr; cb_data.cb(cb_data.fd,cb_data); } } if (listen_fd > 0) { shutdown (listen_fd, SHUT_RDWR); close (listen_fd); } return 0; }
linux native aio
Linux native aio 有兩種API,一種是libaio提供的API,一種是利用系統調用封裝成的API,後者使用的較多,因為不需要額外的庫且簡單。
- API
- io_setup:
是用來設定一個異步請求的上下文,第一個參數是請求事件的個數,第二個參數唯一辨別一個異步請求。
- io_commit:
是用來送出一個異步io請求的,在送出之前,需要設定一下結構體`iocb`,該結構體有以下字段需要設定:
-
:使用者設定的資料,到時通過aio_data:
函數傳回。io_getevents
-
:異步操作碼,有以下幾種操作:aio_lio_opcode
- `IOCB_CMD_PREAD`:讀操作,相當于調用`pread` - `IOCB_CMD_PWRITE`:寫操作,相當于`pwrite` - `IOCB_CMD_FSYNC`:同步操作,相當于調用`fsync` - `IOCB_CMD_FDSYNC`:同步操作,相當于調用`fdatasync`
-
:使用者提供的存儲資料的bufferaio_buf
-
:檔案的中偏移量aio_offset
-
:IO操作的資料大小aio_nbytes
-
:該字段要麼不設定,要麼設定為aio_flags
,表示使用eventfd通知事件的完成。IOCB_FLAG_RESFD
-
:如果aio_resfd
字段設定為aio_flags
,該字段設定為IOCB_FLAG_RESFD
的傳回值。eventfd
-
- io_getevents:
用來擷取完成的io事件,參數`min_nr`是事件個數的的最小值,`nr`是事件個數的最大值,如果沒有足夠的事件發生,該函數會阻塞, `timeout`參數是阻塞的逾時時間。該函數會傳回一個`io_events`結構體,該結構體有以下字段: * `data`:這就是在`struct iocb`結構體`aio_data`設定的值。 * `obj`:傳回的`struct iocb`結構體,是`io_commit`第三個參數設定數組中的值。 * `res`和`res2`:傳回結果。
- io_destroy:
在所有時間處理完之後,調用此函數銷毀異步io請求。
-
限制:
aio隻能使用于正常的檔案IO,不能使用于socket,管道等IO。
-
使用範例
上面已經介紹過了,io_getevents在調用之後會阻塞直到有足夠的事件發生,是以要實作真正的異步IO,需要借助
和eventfd
達到目的。epoll
- 首先我們封裝一下系統調用來作為我們使用的API:
int io_setup(unsigned nr, aio_context_t *ctxp) { return syscall(__NR_io_setup, nr, ctxp); } int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) { return syscall(__NR_io_submit, ctx, nr, iocbpp); } int io_getevents(aio_context_t ctx, long min_nr, long max_nr, struct io_event *events, struct timespec *timeout) { return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout); } int io_destroy(aio_context_t ctx) { return syscall(__NR_io_destroy, ctx); } int eventfd2(unsigned int initval, int flags) { return syscall(__NR_eventfd2, initval, flags); }
- 定義自己的異步使用者資料和回調函數:
typedef void io_callback_t(aio_context_t ctx, struct iocb *iocb, long res);//回調函數類型 struct userdata//使用者資料 { int64_t offset; int64_t filesize; int64_t block_size; }; struct user_iocb//封裝結構體,以便異步傳回使用者資料。 { struct iocb iocb; struct userdata user_cb; }; void aio_callback(aio_context_t ctx, struct iocb *iocb, long res, long res2) { int64_t offset = iocb->aio_offset; struct custom_iocb *iocbp = (struct custom_iocb *) iocb; printf("data=%.*s\n",page_size, (char *) iocb->aio_buf); }
- 建立異步請求:
//異步讀取整個檔案資料 #define page_size 1024 char *path = "test.txt" //計算異步請求的個數 static int get_event_num(uint64_t len) { return len / page_size + (len % page_size != 0); } static int get_filesize(char *path) { struct stat buf = {0}; int iret = stat(path, &buf); return (iret >= 0) ? buf.st_size : iret; } int main(void) { int event_fd = 0; int file_size = get_filesize(path); int event_num = get_event_num(file_size); struct iocb *iocbs = malloc(event_num * sizeof (struct iocb)); struct iocb * iocbps[event_num] = {0}; aio_context_t ctx = 0; /****************建立使用的eventfd******************/ event_fd = eventfd2(0, O_NONBLOCK | O_CLOEXEC); /****************建立異步請求上下文*****************/ if (io_setup(event_num, &ctx)) { perror("io_setup"); return 4; } /*****************設定請求的資料********************/ int fd = open(path, O_RDWR, 0664); for (int i = 0; i < event_num; i++) { struct iocb *_ = iocbs + i; _->aio_buf = (__u64) ((char *) buf + (i * page_size));//設定存儲請求資料的buf _->aio_fildes = fd;//請求的檔案描述符 _->aio_offset = i * page_size;//讀檔案的偏移量,異步請求檔案指針不會移動的,自己設定 _->aio_nbytes = page_size;//每個異步請求請求資料的大小 _->aio_resfd = event_fd;//用來通知有事件完成的eventfd _->aio_flags = IOCB_FLAG_RESFD;//使用eventfd的标志 _->aio_data = (__u64) aio_callback;//設定使用者資料,這裡是個回調函數 iocbps[i] = _; } /*****************送出異步請求********************/ if (io_submit(ctx, event_num, iocbps) != event_num) { perror("io_submit"); return 6; } /***************利用epoll等待異步請求事件完成*****/ int epfd = 0; epfd = epoll_create(1); if (epfd == -1) { perror("epoll_create"); return 7; } struct epoll_event epevent = {0}; epevent.events = EPOLLIN | EPOLLET; epevent.data.ptr = NULL; if (epoll_ctl(epfd, EPOLL_CTL_ADD, event_fd, &epevent)) { perror("epoll_ctl"); return 8; } /*****************處理異步請求事件********/ int i = 0; while (i < event_num) { int64_t finished_aio = 0; if (epoll_wait(epfd, &epevent, 1, -1) != 1)//等待事件發生 { perror("epoll_wait"); return 9; } //讀取異步事件完成的個數,finished_aio接收 if (read(event_fd, &finished_aio, sizeof (finished_aio)) != sizeof (finished_aio)) { perror("read"); return 10; } printf("finished io number: %"PRIu64"\n", finished_aio); struct timespec tms; struct io_event events[event_num]; while (finished_aio > 0) { tms.tv_sec = 0; tms.tv_nsec = 0; int r = io_getevents(ctx, 1, event_num, events, &tms);//擷取發生的事件 if (r > 0) { int j = 0; for (j = 0; j < r; ++j) { ((io_callback_t *) (events[j].data))(ctx, (struct iocb *) events[j].obj, events[j].res, events[j].res2);//調用使用者callback } i += r; finished_aio -= r; } } } /*****************處理異步事件結束********/ close(epfd); io_destroy(ctx); close(fd); close(event_fd); return 0; }
- 定義自己的異步使用者資料和回調函數:
SIGIO
在Linux下,每一個檔案描述符都可以設定
O_ASYNC
辨別,當檔案可讀或者可寫時可以發送信号通知相關程序,預設信号是
SIGIO
,可以
使用
fcntl
函數改變這個預設信号。
- 限制:
-
在多個檔案描述符的情況下,信号驅動模式不能确定哪個描述符是可以讀或者寫的,是以必須周遊每一個描述符來判斷,一個
好的方法是使用
擷取哪個描述符可讀或者寫。epoll
- 隻能用于socket、管道和終端IO,不能用于普通檔案的IO。
-
- 使用範例
- 設定描述符的
辨別和描述符的歸屬程序。O_ASYNC
- 設定檔案描述符為非阻塞。
- 設定信号
的處理動作。SIGIO
void new_op(int signum, siginfo_t *info, void *myact)//信号處理函數的實作 { int i; for (i = 0; i < 1; i++) { printf("data:%u\n", info->si_int); } char buf[10] = {0}; while(read(STDIN_FILENO,buf,10) > 0) { printf("input:%s",buf); } printf("handle signal %d over;\n", signum); } int main(void) { int oflags; struct sigaction act; sigemptyset(&act.sa_mask); act.sa_sigaction = new_op; //信号處理函數 if (sigaction(SIGPOLL, &act, NULL) < 0) { printf("install sigal error\n"); } fcntl(STDIN_FILENO, F_SETOWN, getpid()); oflags = fcntl(STDIN_FILENO, F_GETFL); fcntl(STDIN_FILENO, F_SETFL, oflags | FASYNC|O_NONBLOCK); while (1){ sleep(1); }; }