天天看點

I/O多路複用---epoll函數測試

參考文章來源:

epoll使用詳解(精髓)

Epoll學習筆記

epoll是直到Linux2.6才出現了由核心直接支援的實作方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。

epoll可以同時支援水準觸發和邊緣觸發(Edge Triggered,隻告訴程序哪些檔案描述符剛剛變為就緒狀态,它隻說一遍,如果我們沒有采取行動,那麼它将不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實作相當複雜。

epoll的接口非常簡單,一共就三個函數:

1. intepoll_create(int size);

建立一個epoll的句柄,size用來告訴核心這個監聽的數目一共有多大。這個參數不同于select()中的第一個參數,給出最大監聽的fd+1的值。需要注意的是,當建立好epoll句柄後,它就是會占用一個fd值,在linux下如果檢視/proc/程序id/fd/,是能夠看到這個fd的,是以在使用完epoll後,必須調用close()關閉,否則可能導緻fd被耗盡。

2. intepoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注冊函數,它不同與select()是在監聽事件時告訴核心要監聽什麼類型的事件,而是在這裡先注冊要監聽的事件類型。第一個參數是epoll_create()的傳回值,第二個參數表示動作,用三個宏來表示:

EPOLL_CTL_ADD:注冊新的fd到epfd中;

EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件;

EPOLL_CTL_DEL:從epfd中删除一個fd;

第三個參數是需要監聽的fd;

第四個參數是告訴核心需要監聽什麼事,structepoll_event結構如下:

typedef unionepoll_data {

    void *ptr;

    int fd;

    __uint32_t u32;

    __uint64_t u64;

} epoll_data_t;

struct epoll_event {

    __uint32_t events;

    epoll_data_t data;

};

events可以是以下幾個宏的集合:

EPOLLIN:表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉);

EPOLLOUT:表示對應的檔案描述符可以寫;

EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來);

EPOLLERR:表示對應的檔案描述符發生錯誤;

EPOLLHUP:表示對應的檔案描述符被挂斷;

EPOLLET:将EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水準觸發(Level Triggered)來說的。

EPOLLONESHOT:隻監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列裡。

3. intepoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

函數功能:

傳回值:該函數傳回需要處理的事件數目,如傳回0表示已逾時。

參數含義:

參數events用來從核心得到事件的集合(儲存所有的讀寫事件);

maxevents告之核心這個events有多大,這個maxevents(所有socket句柄數)的值不能大于建立epoll_create()時的size;

參數timeout是逾時時間(毫秒,0會立即傳回,-1将不确定,也有說法說是永久阻塞)。

epoll還是poll的一種優化,傳回後不需要對所有的fd進行周遊,在核心中維持了fd的清單。select和poll是将這個核心清單維持在使用者态,然後傳遞到核心中。但是隻有在2.6的核心才支援。

epoll更适合于處理大量的fd ,且活躍fd不是很多的情況,畢竟fd較多還是一個串行的操作。

在許多測試中我們會看到如果沒有大量的idle-connection或者dead-connection,epoll的效率并不會比select/poll高很多,但是當我們遇到大量的idle-connection(例如WAN環境中存在大量的慢速連接配接),就會發現epoll的效率大大高于select/poll。

測試代碼:

epoll_server.cpp

#include <netinet/in.h>
#include <arpa/inet.h>
#include <memory.h>
#include <string.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <errno.h>

using namespace std;

class CTCPServer
{
public:
    CTCPServer(int nServerPort, int nLengthOfQueueOfListen = 100, const char *strBoundIP = NULL)
    {
	m_nServerPort = nServerPort;
	m_nLengthOfQueueOfListen = nLengthOfQueueOfListen;

	if(NULL == strBoundIP)
	{
	    m_strBoundIP = NULL;
	}
	else
	{
	    int length = strlen(strBoundIP);
	    m_strBoundIP = new char[length + 1];
	    memcpy(m_strBoundIP, strBoundIP, length + 1);
	}
    }

    virtual ~CTCPServer()
    {
	if(m_strBoundIP != NULL)
	{
	    delete [] m_strBoundIP;
	}
    }

public:
    int Run()
    {
	const int MAXEPOLLSIZE = 100;
	const int MAXEVENTSIZE = 50;	

	int nListenSocket = socket(AF_INET, SOCK_STREAM, 0);
	if(-1 == nListenSocket)
	{
	    cout << "socket error" << std::endl;
	    return -1;
	}
	
	SetNonBlock(nListenSocket);//非阻塞  recv函數 沒有資料就緒馬上傳回

	sockaddr_in ServerAddress;
	memset(&ServerAddress, 0, sizeof(sockaddr_in));
	ServerAddress.sin_family = AF_INET;

	if(NULL == m_strBoundIP)
	{
	    ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
	}
	else
	{
	    if(inet_pton(AF_INET, m_strBoundIP, &ServerAddress.sin_addr) != 1)
	    {
		cout << "inet_pton error" << endl;
		close(nListenSocket);
		return -1;
	    }
	}

	ServerAddress.sin_port = htons(m_nServerPort);
	int on = 1;
	setsockopt(nListenSocket,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));

	if(bind(nListenSocket, (sockaddr *)&ServerAddress, sizeof(sockaddr_in)) == -1)
	{
	    cout << "bind error" << endl;
	    close(nListenSocket);
	    return -1;
	}

	if(listen(nListenSocket, m_nLengthOfQueueOfListen) == -1)
	{
	    cout << "listen error" << endl;
	    close(nListenSocket);
	    return -1;
	}
	
	int efd;
	struct epoll_event ev;//告訴核心要監聽的事件
	struct epoll_event events[MAXEPOLLSIZE];//傳回從核心得到的已經就緒的事件集合
	
	efd = epoll_create(MAXEPOLLSIZE);//create epoll handler
	ev.events = EPOLLIN|EPOLLET;
	ev.data.fd = nListenSocket;
	
	string recv_buf;//----
	if(epoll_ctl(efd,EPOLL_CTL_ADD,nListenSocket,&ev)<0)
	{
		cout<<"epoll_ctl() error"<<endl;
		return -1;
	}
	
	while(1)
	{
	int n,i;
	int len;
	int con_fd;
	char buf[256];
	cout << "epoll_wait()..." << endl;
	// 傳回需要處理的就緒事件的數目
	n = epoll_wait(efd,events,MAXEVENTSIZE,-1);
	cout << "n=" << n << endl;
	for(i=0;i<n;i++)
	{
		/*
		cout << "i=" << i << endl;
		cout << "n=" << n << endl;		
		cout << "events[i].data.fd: " << events[i].data.fd << endl;*/
		/*
		if((events[i].events&EPOLLERR)||
		(events[i].events&EPOLLHUP)||
		(!(events[i].events&EPOLLIN))) /* An error has occured on this fd, or the socket is not
                 ready for reading (why were we notified then?) */
		/*{
			cout<<"epoll error"<<endl;
			close(events[i].data.fd);
			continue;
		}*/
		//就緒事件的檔案描述符為 監聽套接字
		/*else*/ 
		if(nListenSocket == events[i].data.fd)
		{
			sockaddr_in ClientAddress;
			socklen_t LengthOfClientAddress = sizeof(sockaddr_in);
			int nConnectedSocket = accept(nListenSocket, (sockaddr *)&ClientAddress, &LengthOfClientAddress);
			if(-1 == nConnectedSocket)
			{
				cout << "accept error" << std::endl;
				close(nListenSocket);
				return -1;
			}
			cout << "Connection from :" << inet_ntoa(ClientAddress.sin_addr)<< ":" 
			<< ntohs(ClientAddress.sin_port) << endl;
			SetNonBlock(nConnectedSocket);//**設定連接配接套接字為非阻塞狀态
			cout << "nConnectedSocked: " << nConnectedSocket << endl;
			//ev.events = EPOLLIN|EPOLLOUT|EPOLLET;//read write edge_triggered
			ev.events = EPOLLIN|EPOLLOUT|EPOLLET;//
			ev.data.fd = nConnectedSocket;
			//注冊新的fd到efd句柄中
			if(epoll_ctl(efd,EPOLL_CTL_ADD,nConnectedSocket,&ev)<0)
			{
				cout<<"epoll_ctl() error"<<endl;
				return -1;
			}
			
		}
		else if(events[i].events&EPOLLIN)//readable
		{
			int res = 1;
			recv_buf = "";
			cout << i << ":epollin..." << endl;
			con_fd = events[i].data.fd;
			if(con_fd < 0)
			{
				cout << "con_fd < 0" << endl;
				break;
			}
			else
			{
				//cout << "con_fd = " << con_fd << endl;
			
				while(((len = recv(con_fd,buf,sizeof(buf)-1,0))!=-1)&&(errno != EAGAIN))
			  	{
					buf[len] = '\0';
					cout <<"len = " << len <<  ", buf: " << buf;
					
					recv_buf += buf;
					if(len == sizeof(buf) - 1)//has more data to read
						continue;
					else if((len < sizeof(buf) - 1)&& (len > 0))//the last data segment
						break;

					else if(len == 0)//the peer has closed the socket
					{
						cout << "the peer has closed the socket..." << endl;
						close(con_fd);
						ev.data.fd = con_fd;
						
						if(epoll_ctl(efd,EPOLL_CTL_DEL,con_fd,&ev) < 0)
						{
							cout<<"epoll_ctl() error"<<endl;
							return -1;
						}
						break;
					}
				}
				if(recv_buf != "")
					cout << "Recv:" << recv_buf << endl;
				else if(errno == EAGAIN)
					cout << "no data in the buffer to read..." << endl;
				
			}
				
			
		}
		else if(events[i].events&EPOLLOUT)//writeable
		{
			cout << i<< ":epollout..." << endl;
			con_fd = events[i].data.fd;
			if(con_fd < 0)
			{
				cout << "con_fd < 0" << endl;
				break;
			}
			else
			{
				//cout << "con_fd = " << con_fd << endl;
				strcpy(buf,"hello client...");
				if((len=send(con_fd,buf,sizeof(buf),0)) == -1)
				{
					perror("send");
					exit(1);
				}
			}
				
		}
		else if(events[i].events&EPOLLHUP)
		{
			cout << i << ":epollhup..." << endl;
		}
		else if(events[i].events&EPOLLERR)
		{
			cout << i << ":epollerr..." << endl;
		}
		else
		{
			cout << i << ":other events..." << endl;
		}
	}//end of "for"
	}//end of "while"

	close(nListenSocket);

	return 0;
    }//end of int Run()

private:
    virtual void ServerFunction(int nConnectedSocket, int nListenSocket)
    {
    }

    static int SetNonBlock(int fd)
    {
	int flags = fcntl(fd,F_GETFL,0);
	if(flags == -1)
	{
		cout<<"fcntl error"<<endl;
		return -1;
	}
	flags |= O_NONBLOCK;

	if(fcntl(fd,F_SETFL,flags) == -1)
	{
		cout<<"fcntl error"<<endl;
		return -1;
	}
	
	return 0;
   }

private:
    int m_nServerPort;
    char* m_strBoundIP;
    int m_nLengthOfQueueOfListen;
};

class CMyTCPServer : public CTCPServer
{
public:
    CMyTCPServer(int nServerPort, int nLengthOfQueueOfListen = 100, const char *strBoundIP = NULL) : CTCPServer(nServerPort, nLengthOfQueueOfListen, strBoundIP)
    {
    }

    virtual ~CMyTCPServer()
    {
    }

private:
    virtual void ServerFunction(int nConnectedSocket, int nListenSocket)
    {
	char buf[14];		
	write(nConnectedSocket, "Hello World\n", 13);
	read(nConnectedSocket,buf,14);
	cout<<buf<<endl;
	close(nConnectedSocket);
    }
};

int main()
{
// CTCPServer(int nServerPort, int nLengthOfQueueOfListen = 100, const char *strBoundIP = NULL)
    CMyTCPServer myserver(4002);
    myserver.Run();
    return 0;
}
           

test_client.cpp

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
/* 伺服器程式監聽的端口号 */
//#define PORT 1240
/* 我們一次所能夠接收的最大位元組數 */
#define MAXDATASIZE 100
int main(int argc, char *argv[])
{
/* 套接字描述符 */
int sockfd, numbytes;
char buf[MAXDATASIZE];
int port;
struct hostent *he;
/* 連接配接者的主機資訊 */
struct sockaddr_in their_addr;
/* 檢查參數資訊 */
if(argc!= 3)
{
/* 如果沒有參數,則給出使用方法後退出 */
fprintf(stderr,"usage: server_host server_port\n");
exit(1);
}
/* 取得主機資訊 */
if ((he=gethostbyname(argv[1])) == NULL)
{
/* 如果 gethostbyname()發生錯誤,則顯示錯誤資訊并退出 */
herror("gethostbyname");
exit(1);
}
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
{
/* 如果 socket()調用出現錯誤則顯示錯誤資訊并退出 */
perror("socket");
exit(1);
}

port = atoi(argv[2]);//

/* 主機位元組順序 */
their_addr.sin_family = AF_INET;
/* 網絡位元組順序,短整型 */
their_addr.sin_port = htons(port);
their_addr.sin_addr = *((struct in_addr *)he->h_addr);
/* 将結構剩下的部厘清零*/
bzero(&(their_addr.sin_zero), 8);
if(connect(sockfd, (struct sockaddr *)&their_addr, sizeof(struct sockaddr)) == -1)
{
/* 如果 connect()建立連接配接錯誤,則顯示出錯誤資訊,退出 */
perror("connect");
exit(1);
}



if((numbytes=recv(sockfd, buf, MAXDATASIZE, 0)) == -1)
{
// 如果接收資料錯誤,則顯示錯誤資訊并退出 
perror("recv");
exit(1);
}
buf[numbytes] = '\0';
printf("Received: %s\n",buf);


int count;
for(count = 0;count < 2;count++)
{
strcpy(buf,"hello server,i'm client!\n");
send(sockfd,buf,strlen(buf),0);
}

sleep(10);

strcpy(buf,"hello server,10 s has passed, i've come back now\n");
send(sockfd,buf,strlen(buf),0);

/*
sleep(100);
strcpy(buf,"Received your message2!\n");
send(sockfd,buf,strlen(buf),0);
*/
sleep(10);

close(sockfd);


return 0;
}
           

運作結果:

Server端:

I/O多路複用---epoll函數測試

Client端:

I/O多路複用---epoll函數測試

繼續閱讀