epoll
一、從網卡接收資料說起
下圖是一個典型的計算機結構圖,計算機由CPU、存儲器(記憶體)、網絡接口等部件組成。了解epoll本質的第一步,要從硬體的角度看計算機怎樣接收網絡資料。下圖展示了網卡接收資料的過程。在①階段,網卡收到網線傳來的資料;經過②階段的硬體電路的傳輸;最終将資料寫入到記憶體中的某個位址上(③階段)。這個過程涉及到DMA傳輸、IO通路選擇等硬體有關的知識,但我們隻需知道:網卡會把接收到的資料寫入記憶體。![]()
select epoll poll總結epollepoll的使用
通過硬體傳輸網卡接收的資料存放在記憶體中去,作業系統可以去讀取他們
二、如何知道接收了資料?
了解epoll本質的第二步,要從CPU的角度來看資料接收。要了解這個問題,要先了解一個概念——中斷。
計算機程式在執行程式的時候,會有優先級的需求。比如
當計算機收到斷電信号時(電容可以儲存少許電量,供CPU運作很短的一小段時間),它應立即去儲存資料,儲存資料的程式具有較高的優先級。
一般而言,由硬體産生的信号需要cpu立馬做出回應(不然資料可能就丢失),是以它的優先級很高。cpu理應中斷掉正在執行的程式,去做出響應;當cpu完成對硬體的響應後,再重新執行使用者程式。中斷的過程如下圖,和函數調用差不多。隻不過函數調用是事先定好位置,而中斷的位置由“信号”決定。
以鍵盤為例,當使用者按下鍵盤某個按鍵時,鍵盤會給cpu的中斷引腳發出一個高電平。cpu能夠捕獲這個信号,然後執行鍵盤中斷程式。下圖展示了各種硬體通過中斷與cpu互動。
現在可以回答本節提出的問題了:當網卡把資料寫入到記憶體後,網卡向cpu發出一個中斷信号,作業系統便能得知有新資料到來,再通過網卡中斷程式去處理資料。
三、程序阻塞為什麼不占用cpu資源?
了解epoll本質的第三步,要從作業系統程序排程的角度來看資料接收。阻塞是程序排程的關鍵一環,指的是程序在等待某事件(如接收到網絡資料)發生之前的等待狀态,recv、select和epoll都是阻塞方法。了解“程序阻塞為什麼不占用cpu資源?”,也就能夠了解這一步。
//建立socket socket- bind-listen-recv
int s = socket(AF_INET, SOCK_STREAM, 0);
//綁定
bind(s, ...)
//監聽
listen(s, ...)
//接受用戶端連接配接
int c = accept(s, ...)
//接收用戶端資料
recv(c, ...);
//将資料列印出來
printf(...)
這是一段最基礎的網絡程式設計代碼,先建立socket對象,依次調用bind、listen、accept,最後調用recv接收資料。recv是個阻塞方法,當程式運作到recv時,它會一直等待,直到接收到資料才往下執行。
那麼阻塞的原理是什麼?
工作隊列
作業系統為了支援多任務,實作程序排程的功能,會把程序分為運作和等待等幾種狀态,運作狀态是程序獲得CPU使用權,正在執行代碼的狀态,等待狀态是阻塞狀态,,比如上述程式運作到recv時,程式會從運作狀态變為等待狀态,接收到資料後又變回運作狀态。作業系統會分時執行各個運作狀态的程序,由于速度很快,看上去就像是同時執行多個任務。
下圖中的計算機中運作着A、B、C三個程序,其中程序A執行着上述基礎網絡程式,一開始,這3個程序都被作業系統的工作隊列所引用,處于運作狀态,會分時執行。![]()
select epoll poll總結epollepoll的使用
等待隊列
當程序A執行到建立socket的語句時,作業系統會建立一個由檔案系統管理的socket對象(如下圖)。這個socket對象包含了發送緩沖區、接收緩沖區、等待隊列等成員。等待隊列是個非常重要的結構,它指向所有需要等待該socket事件的程序。
當程式執行到recv時,作業系統會将程序A從工作隊列移動到該socket的等待隊列中(如下圖)。由于工作隊列隻剩下了程序B和C,依據程序排程,cpu會輪流執行這兩個程序的程式,不會執行程序A的程式。是以程序A被阻塞,不會往下執行代碼,也不會占用cpu資源ps:作業系統添加等待隊列隻是添加了對這個“等待中”程序的引用,以便在接收到資料時擷取程序對象、将其喚醒,而非直接将程序管理納入自己之下。上圖為了友善說明,直接将程序挂到等待隊列之下。![]()
select epoll poll總結epollepoll的使用
喚醒程序
當socket接收到資料後,作業系統将該socket等待隊列上的程序重新放回到工作隊列,該程序變成了運作狀态,繼續執行代碼。也由于socket的接收緩沖區已經有了資料,recv可以傳回接收到的資料
四、核心接收網絡資料全過程
如下圖所示,程序在recv阻塞期間,計算機收到了對端傳送的資料(步驟①)。資料經由網卡傳送到記憶體(步驟②),然後網卡通過中斷信号通知cpu有資料到達,cpu執行中斷程式(步驟③)。此處的中斷程式主要有兩項功能,先将網絡資料寫入到對應socket的接收緩沖區裡面(步驟④),再喚醒程序A(步驟⑤),重新将程序A放入工作隊列中。
喚醒程序的過程如下圖所示。
以上是核心接收資料全過程
其一,作業系統如何知道網絡資料對應于哪個socket?
因為一個socket對應一個端口号,而網絡資料包中包含了IP和端口的資訊,核心可以通過端口号找到對應的socket。當然,為了提高處理速度,作業系統會維護端口号到socket的索引結構,以快速讀取。 建立套接字連接配接的時候要綁定端口号
五、同時監視多個socket的簡單方法
服務端需要多個用戶端連接配接,而recv隻能監視單個socket,這種沖突下,人們開始尋找監視多個socket的方法。epoll的要義是高效監聽多個socket.從曆史發展角度看,必然先出現一種不太高效的方法,人們再加以改進。隻有先了解了不太高效的方法,才能夠了解epoll的本質。
假如能夠預先傳入一個socket清單,如果清單中的socket都沒有資料,挂起程序,直到有一個socket收到資料,喚醒程序。這種方法很直接,也是select的設計思想。
為友善了解,我們先複習select的用法。在如下的代碼中,先準備一個數組(下面代碼中的fds),讓fds存放着所有需要監視的socket。然後調用select,如果fds中的所有socket都沒有資料,select會阻塞,直到有一個socket接收到資料,select傳回,喚醒程序。使用者可以周遊fds,通過FD_ISSET判斷具體哪個socket收到資料,然後做出處理。
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...)
listen(s, ...)
int fds[] = 存放需要監聽的socket
while(1){
int n = select(..., fds, ...)
for(int i=0; i < fds.count; i++){
if(FD_ISSET(fds[i], ...)){
//fds[i]的資料處理
}
}
}
select的流程
select的實作思路很直接。假如程式同時監視如下圖的sock1、sock2和sock3三個socket,那麼在調用select之後,作業系統把程序A分别加入這三個socket的等待隊列中。
select的底層實作是連結清單
![]()
select epoll poll總結epollepoll的使用
當任何一個socket收到資料後,中斷程式将喚起程序。下圖展示了sock2接收到了資料的處理流程。![]()
select epoll poll總結epollepoll的使用
所謂喚起程序,就是将程序從所有的等待隊列中移除,加入到工作隊列裡面。如下圖所示。
經由這些步驟,當程序A被喚醒後,它知道至少有一個socket接收了資料。程式隻需周遊一遍socket清單,就可以得到就緒的socket。
這種簡單方式行之有效,在幾乎所有作業系統都有對應的實作。
缺點
其一,每次調用select都需要将程序加入到所有監視socket的等待隊列,每次喚醒都需要從每個隊列中移除。這裡涉及了兩次周遊,而且每次都要将整個fds清單傳遞給核心,有一定的開銷。正是因為周遊操作開銷大,出于效率的考量,才會規定select的最大監視數量,預設隻能監視1024個socket。
其二,程序被喚醒後,程式并不知道哪些socket收到資料,還需要周遊一次。
6 epoll的設計思路
措施一:功能分離
select低效的原因之一是将“維護等待隊列”和“阻塞程序”兩個步驟合二為一。如下圖所示,每次調用select都需要這兩步操作,然而大多數應用場景中,需要監視的socket相對固定,并不需要每次都修改。epoll将這兩個操作分開,先用epoll_ctl維護等待隊列,再調用epoll_wait阻塞程序。顯而易見的,效率就能得到提升
。
為友善了解後續的内容,我們先複習下epoll的用法。如下的代碼中,先用epoll_create建立一個epoll對象epfd,再通過epoll_ctl将需要監視的socket添加到epfd中,最後調用epoll_wait等待資料。
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, …)
listen(s, …)
int epfd = epoll_create(…);
epoll_ctl(epfd, …); //将所有需要監聽的socket添加到epfd中
while(1){
int n = epoll_wait(…)
for(接收到資料的socket){
//處理
}
}
措施二:就緒清單
select低效的另一個原因在于程式不知道哪些socket收到資料,隻能一個個周遊。如果核心維護一個“就緒清單”,引用收到資料的socket,就能避免周遊。如下圖所示,計算機共有三個socket,收到資料的sock2和sock3被rdlist(就緒清單)所引用。當程序被喚醒後,隻要擷取rdlist的内容,就能夠知道哪些socket收到資料。
對于epoll:
大學女生宿舍,你想找你對象,于是你找到舍管大媽,把你女朋友的名字和宿舍房号報給舍管大媽,大媽直接就可以幫你找到你女朋友。
對于select:
還是女生宿舍,你告訴舍管大媽你女朋友的名字,但是舍管大媽,是帶着你從一樓101開始一件一件宿舍去找你女朋友…
7 epoll的原理和流程
如下圖所示,當某個程序調用epoll_create方法時,核心會建立一個eventpoll對象(也就是程式中epfd所代表的對象)。eventpoll對象也是檔案系統中的一員,和socket一樣,它也會有等待隊列。
建立一個代表該epoll的eventpoll對象是必須的,因為核心要維護“就緒清單”等資料,“就緒清單”可以作為eventpoll的成員。
維護監視清單
建立epoll對象後,可以用epoll_ctl添加或删除所要監聽的socket,以添加socket位列,下圖,如果通過epoll_ctl添加sock1、sock2和sock3的監視,核心會将eventpoll添加到這三個socket的等待隊列中。
當socket收到資料後,中斷程式會操作eventpoll對象,而不是直接操作程序。
接收資料
當socket收到資料後,中斷程式會給eventpoll的“就緒清單”添加socket引用。如下圖展示的是sock2和sock3收到資料後,中斷程式讓rdlist引用這兩個socket。
這裡是引用
eventpoll對象相當于是socket和程序之間的中介,socket的資料接收并不直接影響程序,而是通過改變eventpoll的就緒清單來改變程序狀态。
當程式執行到epoll_wait時,如果rdlist已經引用了socket,那麼epoll_wait直接傳回,如果rdlist為空,阻塞程序。
阻塞和喚醒程序
假設計算機中正在運作程序A和程序B,在某時刻程序A運作到了epoll_wait語句。如下圖所示,核心會将程序A放入eventpoll的等待隊列中,阻塞程序。當socket接收到資料,中斷程式一方面修改rdlist,另一方面喚醒eventpoll等待隊列中的程序,程序A再次進入運作狀态(如下圖)。也因為rdlist的存在,程序A可以知道哪些socket發生了變化。![]()
select epoll poll總結epollepoll的使用
八、epoll的實作細節
,相信讀者對epoll的本質已經有一定的了解。但我們還留有一個問題,eventpoll的資料結構是什麼樣子?
再留兩個問題,就緒隊列應該應使用什麼資料結構?eventpoll應使用什麼資料結構來管理通過epoll_ctl添加或删除的socket?
如下圖所示,eventpoll包含了lock、mtx、wq(等待隊列)、rdlist等成員。rdlist和rbr是我們所關心的。
![]()
select epoll poll總結epollepoll的使用 就緒清單的資料結構
就緒清單引用了就緒的socket,是以它能快速的插入資料
程式可能随時調用epoll_ctl添加socket,也可能随時删除
當删除時,若該socket已經存放在就緒清單中,它也應該被移除。
是以就緒清單應是一種能夠快速插入和删除的資料結構。雙向連結清單就是這樣一種資料結構,epoll使用雙向連結清單來實作就緒隊列(對應上圖的rdllist)。
索引結構
既然epoll将“維護監視隊列”和“程序阻塞”分離,也意味着需要有個資料結構來儲存監視的socket。至少要友善的添加和移除,還要便于搜尋,以避免重複添加。紅黑樹是一種自平衡二叉查找樹,搜尋、插入和删除時間複雜度都是O(log(N)),效率較好。epoll使用了紅黑樹作為索引結構(對應上圖的rbr)。
ps:因為作業系統要兼顧多種功能,以及由更多需要儲存的資料,rdlist并非直接引用socket,而是通過epitem間接引用,紅黑樹的節點也是epitem對象。同樣,檔案系統也并非直接引用着socket。為友善了解,本文中省略了一些間接結構。
9結論
epoll在select和poll(poll和select基本一樣,有少量改進)的基礎引入了eventpoll作為中間層,使用了先進的資料結構,是一種高效的多路複用技術。
epoll的使用
1 Epoll和select的差別對比
e1poll不存在集合的覆寫 epoll_create會傳回一個fd,指向空間包含全部的事件(結構體)
2epoll把要監聽的每一個fd都包裝成一個事件,并把這個事件記入epollfd 讓epollfd來監聽
3select産生動靜是吧fd放入集合 但是epoll通過epoll_wait 把産生動靜的fd所包裝好的事件放入結構體數組
4select需要備份,需要重新建立數組放fd循環比對,epoll直接通過包裝好的事件(結構體)就能獲得fd,效率也更快(差别主要展現在這)
5兩者的差別是的select适合使用者客服端不多的情況,而epoll沒有用戶端的上限
Epoll模型的三個函數
** 函數原型+功能說明**
#include <sys/epoll.h>
int epoll_create(int size);
作用:建立一個epoll句柄,告訴他需要監聽的數目(也可以了解成申請一片空間,用于存放監聽的套接字)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
作用:控制某個epoll監控的檔案描述符上的事件:注冊,修改、删除(也就是增添 删除 修改一個事件)
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
2.2 epoll_create()參數使用
int epoll_create(int size); 參數一:通知核心監聽size個fd,隻是個建議值并與硬體有關系。(從 Linux
核心 2.6.8 版本起,size 這個參數就被忽略了,隻要求 size 大于 0 即可) 傳回值:傳回epoll句柄(fd)
2.3 epoll_ctl()參數使用 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
參數一:int epfd:epoll_create()的傳回值
參數二:int op: 表示動作,用三個宏來表示
EPOLL_CTL_ADD(注冊新的fd到epfd) EPOLL_CTL_MOD(修改已經注冊的fd監聽事件)
EPOLL_CTL_DEL(從epfd删除一個fd)
參數三:int fd 操作對象(socket)
參數四:struct epoll_evevt* evevt; 告訴核心需要監聽的事件
結構體如下:
struct epoll_event {
__uint32_t events; 宏定義讀和寫EPOLLIN讀EPOLLOUT寫 epoll_data_t data; 聯合體 }; 聯合體如下:
typedef union epoll_data { void *ptr; int fd;
__uint32_t u32;
__uint64_t u64; } epoll_data_t;
傳回值:成功傳回0,不成功傳回-1
特别注意參數四的使用
2.4 epoll_wait()參數使用
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
參數一:int epfd:epoll_create()函數傳回值
參數二:struct epoll_events events用于回傳代處理事件的數組(也就是存放産生動靜的事件)
參數三:int maxevents 同時最多産生多少事件,告訴核心events有多大,該值必須大于0
參數四:int timeout表示 -1相當于阻塞,0相當于非阻塞,逾時時間(機關:毫秒) 傳回值:成功傳回産生動靜事件的個數*
補充說明: 産生動靜是指
有新的用戶端需要連接配接 或者
已連接配接的用戶端發送了資訊
3 Epoll模型封裝成類
頭檔案
const int MAXEPOLLSIZE = 10;
class Epoll
{
public:
Epoll();
bool Add(int fd,int eventsOption);//添加事件
//Returns the number of triggered events
int Wait();//等待事件觸發
bool Del(int fd);
//bool Delete(const int eventIndex);//删除事件
int GetEventOccurfd(const int eventIndex) const;//得到事件數組某個值的fd
int GetEvents(const int eventIndex) const;//得到事件數組的宏//可讀可寫,觸發方式
private:
int epollfd;//epoll專用檔案描述符
int fdNumber;//epollfd裡面用戶端有多少
struct epoll_event event;//事件
struct epoll_event events[MAXEPOLLSIZE];
struct rlimit rt;
};
源檔案
#include "Epoll.h"
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
Epoll::Epoll():fdNumber(0)
{
rt.rlim_max= rt.rlim_cur = MAXEPOLLSIZE;
if(::setrlimit(RLIMIT_NOFILE,&rt) == -1){
std::cout << "setlimit failed\n";
exit(1);
}
epollfd = epoll_create(MAXEPOLLSIZE);
//建立epoll句柄,上限為MAXEPOLLSIZE
}
bool Epoll::Add(int fd,int eventsOption)//把監聽到的fd放到epoll句柄裡面
{
event.data.fd = fd;
event.events = eventsOption; //EPOLLIN | EPOLLET
if(epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event) < 0){
return false;
}
fdNumber++;//檔案描述符的數量+1
return true;
}
bool Epoll::Del(int fd)
{
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
int ret = epoll_ctl(epollfd, EPOLL_CTL_DEL,fd, &event);
if (ret<0)
{
return false;
}
close(event.data.fd);
printf("client is close ,fd = %d\n", event.data.fd);
return true;
}
int Epoll::Wait()
{
int eventNumber;//初始化事件響應數量
eventNumber = epoll_wait(epollfd,events,fdNumber,-1);
if(eventNumber < 0){
std::cout << "epoll_wait failed \n";
exit(1);
}
return eventNumber;
}
int Epoll::GetEventOccurfd(const int eventIndex) const
{
return events[eventIndex].data.fd;
}
int Epoll::GetEvents(const int eventIndex) const
{
return events[eventIndex].events;
}
4 Epoll簡單實作列印 代碼+解釋
服務端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#define MAXLINE 80
#define SERV_PORT 8000 //端口号
#define OPEN_MAX 1024 //最多連接配接數
int main(int argc,char *argv[])
{
int maxi,sockfd,connfd;//sockfd 用來監聽 connfd用來連接配接的· 監聽套接字要自己建立 連接配接套接字等待傳回就行不需要socket
int nready,efd; //efd是 epoll模型辨別符 nready 産生的動靜數
struct sockaddr_in clientaddr, serveraddr;
struct epoll_event event, events[OPEN_MAX]; //一個果籃 一個結構體
sockfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&serveraddr, sizeof(serveraddr));
bzero(&clientaddr, sizeof(clientaddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(SERV_PORT);
int on =1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) ;//設定為可重複使用的端口
bind(sockfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr));
listen(sockfd, OPEN_MAX);//伺服器套接字
efd=epoll_create(OPEN_MAX);
/*包裝伺服器fd為事件*/
event.events=EPOLLIN|EPOLLET;
event.data.fd=sockfd;
epoll_ctl(efd,EPOLL_CTL_ADD,sockfd,&event);//把伺服器fd包裝成事件放在紅黑樹上
while(1)
{
//參數一 epollfd 參數二 産生動靜的結構體數組 參數三 同時最多産生多少動靜 參數四 超市時間
nready=epoll_wait(efd,events,OPEN_MAX,-1);//傳回值為動靜數量
for(int i=0;i<nready;i++)
{
if(!(events[i].events & EPOLLIN))//判斷為可讀事件 不是立刻傳回循環 與select不同
continue;
if (events[i].data.fd==sockfd)//表示有新的連接配接
{
int len=sizeof(clientaddr);
char ipstr[128];//列印用到
connfd=accept(sockfd,(struct sockaddr *)&clientaddr,&len);
printf("client ip%s ,port %d\n",inet_ntop(AF_INET,&clientaddr.sin_addr.s_addr,ipstr,sizeof(ipstr)),
ntohs(clientaddr.sin_port));
event.events = EPOLLIN|EPOLLET;
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);
}
else//表示舊的資料産生可讀事件(1 用戶端發來資料 2 用戶端斷開連結)
{
connfd=events[i].data.fd;
char buf [1024];
memset(buf,0,1024);
int nread; nread=read(connfd,buf,sizeof(buf));
if(nread==0)
{
printf("client is close..\n"); //列印
epoll_ctl(efd, EPOLL_CTL_DEL, connfd, NULL);//删除果子 select是從集合 和 數組 删除
close(connfd);//關閉客服端 select一樣
}
else
{
printf("%s",buf);
}
}
}
}
}
客服端
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
int main(int argc,char* argv[])
{
int sfd;
struct sockaddr_in sfdaddr; //指定要連接配接伺服器的結構體 ip 端口
int len;
char wbuf[1024];
//1.socket 通信用套接字,建立一個socket
sfd = socket(AF_INET,SOCK_STREAM,0);
char ipstr[] = "127.0.0.1"; //要連上的ip位址
//初始化位址
bzero(&sfdaddr,sizeof(sfdaddr));
sfdaddr.sin_family = AF_INET;
sfdaddr.sin_port = htons(8000);
int on =1;
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) ;//設定為可重複使用的端口
inet_pton(AF_INET,ipstr,&sfdaddr.sin_addr.s_addr); //轉換ip 儲存到結構體内
//2.connect 主動連接配接伺服器
connect(sfd,(struct sockaddr *)&sfdaddr,sizeof(sfdaddr));
memset(wbuf,0,1024);
printf("please input\n");
while(1)
{
fgets(wbuf, 100, stdin);//stdin 意思是鍵盤輸入
write(sfd,wbuf,sizeof(wbuf));
memset(wbuf,0,1024);
}
close(sfd);
return 0;
}