天天看點

select epoll poll總結epollepoll的使用

epoll

一、從網卡接收資料說起

下圖是一個典型的計算機結構圖,計算機由CPU、存儲器(記憶體)、網絡接口等部件組成。了解epoll本質的第一步,要從硬體的角度看計算機怎樣接收網絡資料。
select epoll poll總結epollepoll的使用
下圖展示了網卡接收資料的過程。在①階段,網卡收到網線傳來的資料;經過②階段的硬體電路的傳輸;最終将資料寫入到記憶體中的某個位址上(③階段)。這個過程涉及到DMA傳輸、IO通路選擇等硬體有關的知識,但我們隻需知道:網卡會把接收到的資料寫入記憶體。
select epoll poll總結epollepoll的使用
通過硬體傳輸網卡接收的資料存放在記憶體中去,作業系統可以去讀取他們

二、如何知道接收了資料?

了解epoll本質的第二步,要從CPU的角度來看資料接收。要了解這個問題,要先了解一個概念——中斷。

計算機程式在執行程式的時候,會有優先級的需求。比如

當計算機收到斷電信号時(電容可以儲存少許電量,供CPU運作很短的一小段時間),它應立即去儲存資料,儲存資料的程式具有較高的優先級。

一般而言,由硬體産生的信号需要cpu立馬做出回應(不然資料可能就丢失),是以它的優先級很高。cpu理應中斷掉正在執行的程式,去做出響應;當cpu完成對硬體的響應後,再重新執行使用者程式。中斷的過程如下圖,和函數調用差不多。隻不過函數調用是事先定好位置,而中斷的位置由“信号”決定。

select epoll poll總結epollepoll的使用
以鍵盤為例,當使用者按下鍵盤某個按鍵時,鍵盤會給cpu的中斷引腳發出一個高電平。cpu能夠捕獲這個信号,然後執行鍵盤中斷程式。下圖展示了各種硬體通過中斷與cpu互動。
select epoll poll總結epollepoll的使用
現在可以回答本節提出的問題了:當網卡把資料寫入到記憶體後,網卡向cpu發出一個中斷信号,作業系統便能得知有新資料到來,再通過網卡中斷程式去處理資料。

三、程序阻塞為什麼不占用cpu資源?

了解epoll本質的第三步,要從作業系統程序排程的角度來看資料接收。阻塞是程序排程的關鍵一環,指的是程序在等待某事件(如接收到網絡資料)發生之前的等待狀态,recv、select和epoll都是阻塞方法。了解“程序阻塞為什麼不占用cpu資源?”,也就能夠了解這一步。
//建立socket   socket- bind-listen-recv
int s = socket(AF_INET, SOCK_STREAM, 0);   
//綁定
bind(s, ...)
//監聽
listen(s, ...)
//接受用戶端連接配接
int c = accept(s, ...)
//接收用戶端資料
recv(c, ...);
//将資料列印出來
printf(...)
           
這是一段最基礎的網絡程式設計代碼,先建立socket對象,依次調用bind、listen、accept,最後調用recv接收資料。recv是個阻塞方法,當程式運作到recv時,它會一直等待,直到接收到資料才往下執行。

那麼阻塞的原理是什麼?

工作隊列

作業系統為了支援多任務,實作程序排程的功能,會把程序分為運作和等待等幾種狀态,運作狀态是程序獲得CPU使用權,正在執行代碼的狀态,等待狀态是阻塞狀态,,比如上述程式運作到recv時,程式會從運作狀态變為等待狀态,接收到資料後又變回運作狀态。作業系統會分時執行各個運作狀态的程序,由于速度很快,看上去就像是同時執行多個任務。
下圖中的計算機中運作着A、B、C三個程序,其中程序A執行着上述基礎網絡程式,一開始,這3個程序都被作業系統的工作隊列所引用,處于運作狀态,會分時執行。
select epoll poll總結epollepoll的使用

等待隊列

當程序A執行到建立socket的語句時,作業系統會建立一個由檔案系統管理的socket對象(如下圖)。這個socket對象包含了發送緩沖區、接收緩沖區、等待隊列等成員。等待隊列是個非常重要的結構,它指向所有需要等待該socket事件的程序。
select epoll poll總結epollepoll的使用
當程式執行到recv時,作業系統會将程序A從工作隊列移動到該socket的等待隊列中(如下圖)。由于工作隊列隻剩下了程序B和C,依據程序排程,cpu會輪流執行這兩個程序的程式,不會執行程序A的程式。是以程序A被阻塞,不會往下執行代碼,也不會占用cpu資源
select epoll poll總結epollepoll的使用
ps:作業系統添加等待隊列隻是添加了對這個“等待中”程序的引用,以便在接收到資料時擷取程序對象、将其喚醒,而非直接将程序管理納入自己之下。上圖為了友善說明,直接将程序挂到等待隊列之下。

喚醒程序

當socket接收到資料後,作業系統将該socket等待隊列上的程序重新放回到工作隊列,該程序變成了運作狀态,繼續執行代碼。也由于socket的接收緩沖區已經有了資料,recv可以傳回接收到的資料

四、核心接收網絡資料全過程

如下圖所示,程序在recv阻塞期間,計算機收到了對端傳送的資料(步驟①)。資料經由網卡傳送到記憶體(步驟②),然後網卡通過中斷信号通知cpu有資料到達,cpu執行中斷程式(步驟③)。此處的中斷程式主要有兩項功能,先将網絡資料寫入到對應socket的接收緩沖區裡面(步驟④),再喚醒程序A(步驟⑤),重新将程序A放入工作隊列中。
select epoll poll總結epollepoll的使用
喚醒程序的過程如下圖所示。
select epoll poll總結epollepoll的使用
以上是核心接收資料全過程

其一,作業系統如何知道網絡資料對應于哪個socket?

因為一個socket對應一個端口号,而網絡資料包中包含了IP和端口的資訊,核心可以通過端口号找到對應的socket。當然,為了提高處理速度,作業系統會維護端口号到socket的索引結構,以快速讀取。 建立套接字連接配接的時候要綁定端口号

五、同時監視多個socket的簡單方法

服務端需要多個用戶端連接配接,而recv隻能監視單個socket,這種沖突下,人們開始尋找監視多個socket的方法。epoll的要義是高效監聽多個socket.從曆史發展角度看,必然先出現一種不太高效的方法,人們再加以改進。隻有先了解了不太高效的方法,才能夠了解epoll的本質。

假如能夠預先傳入一個socket清單,如果清單中的socket都沒有資料,挂起程序,直到有一個socket收到資料,喚醒程序。這種方法很直接,也是select的設計思想。

為友善了解,我們先複習select的用法。在如下的代碼中,先準備一個數組(下面代碼中的fds),讓fds存放着所有需要監視的socket。然後調用select,如果fds中的所有socket都沒有資料,select會阻塞,直到有一個socket接收到資料,select傳回,喚醒程序。使用者可以周遊fds,通過FD_ISSET判斷具體哪個socket收到資料,然後做出處理。

int s = socket(AF_INET, SOCK_STREAM, 0);  
bind(s, ...)
listen(s, ...)

int fds[] =  存放需要監聽的socket

while(1){
    int n = select(..., fds, ...)
    for(int i=0; i < fds.count; i++){
        if(FD_ISSET(fds[i], ...)){
            //fds[i]的資料處理
        }
    }
}


           

select的流程

select的實作思路很直接。假如程式同時監視如下圖的sock1、sock2和sock3三個socket,那麼在調用select之後,作業系統把程序A分别加入這三個socket的等待隊列中。

select的底層實作是連結清單

select epoll poll總結epollepoll的使用
當任何一個socket收到資料後,中斷程式将喚起程序。下圖展示了sock2接收到了資料的處理流程。
select epoll poll總結epollepoll的使用
所謂喚起程序,就是将程序從所有的等待隊列中移除,加入到工作隊列裡面。如下圖所示。
select epoll poll總結epollepoll的使用

經由這些步驟,當程序A被喚醒後,它知道至少有一個socket接收了資料。程式隻需周遊一遍socket清單,就可以得到就緒的socket。

這種簡單方式行之有效,在幾乎所有作業系統都有對應的實作。

缺點

其一,每次調用select都需要将程序加入到所有監視socket的等待隊列,每次喚醒都需要從每個隊列中移除。這裡涉及了兩次周遊,而且每次都要将整個fds清單傳遞給核心,有一定的開銷。正是因為周遊操作開銷大,出于效率的考量,才會規定select的最大監視數量,預設隻能監視1024個socket。

其二,程序被喚醒後,程式并不知道哪些socket收到資料,還需要周遊一次。

6 epoll的設計思路

措施一:功能分離

select低效的原因之一是将“維護等待隊列”和“阻塞程序”兩個步驟合二為一。如下圖所示,每次調用select都需要這兩步操作,然而大多數應用場景中,需要監視的socket相對固定,并不需要每次都修改。epoll将這兩個操作分開,先用epoll_ctl維護等待隊列,再調用epoll_wait阻塞程序。顯而易見的,效率就能得到提升

select epoll poll總結epollepoll的使用

為友善了解後續的内容,我們先複習下epoll的用法。如下的代碼中,先用epoll_create建立一個epoll對象epfd,再通過epoll_ctl将需要監視的socket添加到epfd中,最後調用epoll_wait等待資料。

int s = socket(AF_INET, SOCK_STREAM, 0);

bind(s, …)

listen(s, …)

int epfd = epoll_create(…);

epoll_ctl(epfd, …); //将所有需要監聽的socket添加到epfd中

while(1){

int n = epoll_wait(…)

for(接收到資料的socket){

//處理

}

}

措施二:就緒清單

select低效的另一個原因在于程式不知道哪些socket收到資料,隻能一個個周遊。如果核心維護一個“就緒清單”,引用收到資料的socket,就能避免周遊。如下圖所示,計算機共有三個socket,收到資料的sock2和sock3被rdlist(就緒清單)所引用。當程序被喚醒後,隻要擷取rdlist的内容,就能夠知道哪些socket收到資料。
select epoll poll總結epollepoll的使用

對于epoll:

大學女生宿舍,你想找你對象,于是你找到舍管大媽,把你女朋友的名字和宿舍房号報給舍管大媽,大媽直接就可以幫你找到你女朋友。

對于select:

還是女生宿舍,你告訴舍管大媽你女朋友的名字,但是舍管大媽,是帶着你從一樓101開始一件一件宿舍去找你女朋友…

7 epoll的原理和流程

如下圖所示,當某個程序調用epoll_create方法時,核心會建立一個eventpoll對象(也就是程式中epfd所代表的對象)。eventpoll對象也是檔案系統中的一員,和socket一樣,它也會有等待隊列。
select epoll poll總結epollepoll的使用
建立一個代表該epoll的eventpoll對象是必須的,因為核心要維護“就緒清單”等資料,“就緒清單”可以作為eventpoll的成員。

維護監視清單

建立epoll對象後,可以用epoll_ctl添加或删除所要監聽的socket,以添加socket位列,下圖,如果通過epoll_ctl添加sock1、sock2和sock3的監視,核心會将eventpoll添加到這三個socket的等待隊列中。
select epoll poll總結epollepoll的使用
當socket收到資料後,中斷程式會操作eventpoll對象,而不是直接操作程序。

接收資料

當socket收到資料後,中斷程式會給eventpoll的“就緒清單”添加socket引用。如下圖展示的是sock2和sock3收到資料後,中斷程式讓rdlist引用這兩個socket。

這裡是引用

select epoll poll總結epollepoll的使用
eventpoll對象相當于是socket和程序之間的中介,socket的資料接收并不直接影響程序,而是通過改變eventpoll的就緒清單來改變程序狀态。
當程式執行到epoll_wait時,如果rdlist已經引用了socket,那麼epoll_wait直接傳回,如果rdlist為空,阻塞程序。

阻塞和喚醒程序

假設計算機中正在運作程序A和程序B,在某時刻程序A運作到了epoll_wait語句。如下圖所示,核心會将程序A放入eventpoll的等待隊列中,阻塞程序。
select epoll poll總結epollepoll的使用
當socket接收到資料,中斷程式一方面修改rdlist,另一方面喚醒eventpoll等待隊列中的程序,程序A再次進入運作狀态(如下圖)。也因為rdlist的存在,程序A可以知道哪些socket發生了變化。
select epoll poll總結epollepoll的使用

八、epoll的實作細節

,相信讀者對epoll的本質已經有一定的了解。但我們還留有一個問題,eventpoll的資料結構是什麼樣子?

再留兩個問題,就緒隊列應該應使用什麼資料結構?eventpoll應使用什麼資料結構來管理通過epoll_ctl添加或删除的socket?

如下圖所示,eventpoll包含了lock、mtx、wq(等待隊列)、rdlist等成員。rdlist和rbr是我們所關心的。

select epoll poll總結epollepoll的使用

就緒清單的資料結構

就緒清單引用了就緒的socket,是以它能快速的插入資料

程式可能随時調用epoll_ctl添加socket,也可能随時删除

當删除時,若該socket已經存放在就緒清單中,它也應該被移除。

是以就緒清單應是一種能夠快速插入和删除的資料結構。雙向連結清單就是這樣一種資料結構,epoll使用雙向連結清單來實作就緒隊列(對應上圖的rdllist)。

索引結構

既然epoll将“維護監視隊列”和“程序阻塞”分離,也意味着需要有個資料結構來儲存監視的socket。至少要友善的添加和移除,還要便于搜尋,以避免重複添加。紅黑樹是一種自平衡二叉查找樹,搜尋、插入和删除時間複雜度都是O(log(N)),效率較好。epoll使用了紅黑樹作為索引結構(對應上圖的rbr)。
ps:因為作業系統要兼顧多種功能,以及由更多需要儲存的資料,rdlist并非直接引用socket,而是通過epitem間接引用,紅黑樹的節點也是epitem對象。同樣,檔案系統也并非直接引用着socket。為友善了解,本文中省略了一些間接結構。

9結論

epoll在select和poll(poll和select基本一樣,有少量改進)的基礎引入了eventpoll作為中間層,使用了先進的資料結構,是一種高效的多路複用技術。

epoll的使用

1 Epoll和select的差別對比

e1poll不存在集合的覆寫 epoll_create會傳回一個fd,指向空間包含全部的事件(結構體)

2epoll把要監聽的每一個fd都包裝成一個事件,并把這個事件記入epollfd 讓epollfd來監聽

3select産生動靜是吧fd放入集合 但是epoll通過epoll_wait 把産生動靜的fd所包裝好的事件放入結構體數組

4select需要備份,需要重新建立數組放fd循環比對,epoll直接通過包裝好的事件(結構體)就能獲得fd,效率也更快(差别主要展現在這)

5兩者的差別是的select适合使用者客服端不多的情況,而epoll沒有用戶端的上限

Epoll模型的三個函數

** 函數原型+功能說明**

#include <sys/epoll.h>

int epoll_create(int size);
作用:建立一個epoll句柄,告訴他需要監聽的數目(也可以了解成申請一片空間,用于存放監聽的套接字)

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
作用:控制某個epoll監控的檔案描述符上的事件:注冊,修改、删除(也就是增添 删除 修改一個事件)

int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
           

2.2 epoll_create()參數使用

int epoll_create(int size); 參數一:通知核心監聽size個fd,隻是個建議值并與硬體有關系。(從 Linux

核心 2.6.8 版本起,size 這個參數就被忽略了,隻要求 size 大于 0 即可) 傳回值:傳回epoll句柄(fd)

2.3 epoll_ctl()參數使用 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

參數一:int epfd:epoll_create()的傳回值

參數二:int op: 表示動作,用三個宏來表示

EPOLL_CTL_ADD(注冊新的fd到epfd) EPOLL_CTL_MOD(修改已經注冊的fd監聽事件)

EPOLL_CTL_DEL(從epfd删除一個fd)

參數三:int fd 操作對象(socket)

參數四:struct epoll_evevt* evevt; 告訴核心需要監聽的事件

結構體如下:

struct epoll_event {

__uint32_t events; 宏定義讀和寫EPOLLIN讀EPOLLOUT寫 epoll_data_t data; 聯合體 }; 聯合體如下:

typedef union epoll_data { void *ptr; int fd;

__uint32_t u32;

__uint64_t u64; } epoll_data_t;

傳回值:成功傳回0,不成功傳回-1

特别注意參數四的使用

2.4 epoll_wait()參數使用

int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)

參數一:int epfd:epoll_create()函數傳回值

參數二:struct epoll_events events用于回傳代處理事件的數組(也就是存放産生動靜的事件)

參數三:int maxevents 同時最多産生多少事件,告訴核心events有多大,該值必須大于0

參數四:int timeout表示 -1相當于阻塞,0相當于非阻塞,逾時時間(機關:毫秒) 傳回值:成功傳回産生動靜事件的個數*

補充說明: 産生動靜是指

有新的用戶端需要連接配接 或者

已連接配接的用戶端發送了資訊

3 Epoll模型封裝成類

頭檔案

const int  MAXEPOLLSIZE = 10;

class Epoll
{
   public:
       Epoll();
       bool Add(int fd,int eventsOption);//添加事件

       //Returns the number of triggered events
       int Wait();//等待事件觸發
	   bool Del(int fd);
       //bool Delete(const int eventIndex);//删除事件
       int  GetEventOccurfd(const int eventIndex) const;//得到事件數組某個值的fd
       int  GetEvents(const int eventIndex) const;//得到事件數組的宏//可讀可寫,觸發方式

   private:
        int epollfd;//epoll專用檔案描述符
        int fdNumber;//epollfd裡面用戶端有多少
        
        struct epoll_event event;//事件
        struct epoll_event events[MAXEPOLLSIZE];
        struct rlimit  rt;

};
           

源檔案

#include "Epoll.h"
#include <stdio.h>
#include <stdlib.h>
#include <iostream>

Epoll::Epoll():fdNumber(0)
{
    rt.rlim_max= rt.rlim_cur = MAXEPOLLSIZE; 
    if(::setrlimit(RLIMIT_NOFILE,&rt) == -1){
        std::cout << "setlimit failed\n";
        exit(1);
    }

    epollfd = epoll_create(MAXEPOLLSIZE);
    //建立epoll句柄,上限為MAXEPOLLSIZE
}

bool Epoll::Add(int fd,int eventsOption)//把監聽到的fd放到epoll句柄裡面
{
     event.data.fd = fd;
     event.events = eventsOption; //EPOLLIN | EPOLLET
     if(epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event) < 0){
         return false;
     }

     fdNumber++;//檔案描述符的數量+1
     return true;
}


bool Epoll::Del(int fd)
{
	event.data.fd = fd;
	event.events = EPOLLIN | EPOLLET;
	int ret = epoll_ctl(epollfd, EPOLL_CTL_DEL,fd, &event);
	if (ret<0)
	{
		return false;
     }
	close(event.data.fd);
	printf("client is close ,fd = %d\n", event.data.fd);
	return true;
}
int Epoll::Wait()
{
    int eventNumber;//初始化事件響應數量
    eventNumber = epoll_wait(epollfd,events,fdNumber,-1);
    if(eventNumber < 0){
        std::cout << "epoll_wait  failed \n";
        exit(1);
    }
    return eventNumber;
}


int Epoll::GetEventOccurfd(const int eventIndex) const
{
    return events[eventIndex].data.fd;
}

int Epoll::GetEvents(const int eventIndex) const
{
    return events[eventIndex].events;
}
           

4 Epoll簡單實作列印 代碼+解釋

服務端

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#define MAXLINE 80
#define SERV_PORT 8000 //端口号
#define OPEN_MAX 1024  //最多連接配接數



int main(int argc,char *argv[])
{	
	int maxi,sockfd,connfd;//sockfd 用來監聽 connfd用來連接配接的· 監聽套接字要自己建立  連接配接套接字等待傳回就行不需要socket
	int nready,efd; //efd是 epoll模型辨別符 nready 産生的動靜數
	struct sockaddr_in clientaddr, serveraddr;
	struct epoll_event event, events[OPEN_MAX]; //一個果籃 一個結構體
	
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    bzero(&serveraddr, sizeof(serveraddr));
	bzero(&clientaddr, sizeof(clientaddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(SERV_PORT);
	int on =1;
	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) ;//設定為可重複使用的端口
    bind(sockfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr));
    listen(sockfd, OPEN_MAX);//伺服器套接字
	efd=epoll_create(OPEN_MAX);
	/*包裝伺服器fd為事件*/
	event.events=EPOLLIN|EPOLLET;
	event.data.fd=sockfd;
    epoll_ctl(efd,EPOLL_CTL_ADD,sockfd,&event);//把伺服器fd包裝成事件放在紅黑樹上	
    while(1)
	{
		//參數一 epollfd 參數二 産生動靜的結構體數組 參數三 同時最多産生多少動靜 參數四 超市時間
		nready=epoll_wait(efd,events,OPEN_MAX,-1);//傳回值為動靜數量
		for(int i=0;i<nready;i++)
		{        	
			if(!(events[i].events & EPOLLIN))//判斷為可讀事件 不是立刻傳回循環 與select不同
				continue;
			if (events[i].data.fd==sockfd)//表示有新的連接配接
			{
				int len=sizeof(clientaddr);
				char ipstr[128];//列印用到
				connfd=accept(sockfd,(struct sockaddr *)&clientaddr,&len);
				printf("client ip%s ,port %d\n",inet_ntop(AF_INET,&clientaddr.sin_addr.s_addr,ipstr,sizeof(ipstr)),
					ntohs(clientaddr.sin_port));
				event.events = EPOLLIN|EPOLLET; 
				event.data.fd = connfd;  
				epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);			
			}
			else//表示舊的資料産生可讀事件(1 用戶端發來資料 2 用戶端斷開連結)
			{				
     			connfd=events[i].data.fd;
				char buf [1024];
				memset(buf,0,1024);
				int nread;			nread=read(connfd,buf,sizeof(buf));
				
				if(nread==0)
				{
					printf("client is close..\n"); //列印
					epoll_ctl(efd, EPOLL_CTL_DEL, connfd, NULL);//删除果子 select是從集合 和 數組 删除
					close(connfd);//關閉客服端 select一樣
                }
				else
				{
					printf("%s",buf);
				}			
			}		
		}	
	}	
}
           

客服端

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>

int main(int argc,char* argv[])
{
	int sfd;
	struct sockaddr_in sfdaddr; //指定要連接配接伺服器的結構體 ip 端口
	int len;	
	char wbuf[1024];	
	//1.socket  通信用套接字,建立一個socket
	sfd = socket(AF_INET,SOCK_STREAM,0);         	
	char ipstr[] = "127.0.0.1";           //要連上的ip位址
	//初始化位址
	bzero(&sfdaddr,sizeof(sfdaddr));	
	sfdaddr.sin_family = AF_INET;
	sfdaddr.sin_port = htons(8000);	
	int on =1;
	setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) ;//設定為可重複使用的端口
	inet_pton(AF_INET,ipstr,&sfdaddr.sin_addr.s_addr); //轉換ip 儲存到結構體内 
	
	//2.connect  主動連接配接伺服器
	connect(sfd,(struct sockaddr *)&sfdaddr,sizeof(sfdaddr));
	memset(wbuf,0,1024);
	printf("please input\n"); 
	while(1)
	{			   		  
		fgets(wbuf, 100, stdin);//stdin 意思是鍵盤輸入
		write(sfd,wbuf,sizeof(wbuf));	
		memset(wbuf,0,1024);					 
	}	
	close(sfd);
	return 0;
}
           

繼續閱讀