天天看點

從網上找到的聊天室代碼看epoll相關的API

拿一個聊天室的demo來講一下Socket網絡程式設計中的epoll相關的api的使用

server端代碼:

//server.cpp
#include <iostream>
#include <list>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

using namespace std;

//選用list存放sockfd
list<int> clients_list;

//server port
#define SERVER_PORT 8888

//epoll支援的最大并發量
#define EPOLL_SIZE 5000

//message buf size
#define BUF_SIZE 0xFFFF

#define SERVER_WELCOME "Welcome you join  to the chat room! Your chat ID is: Client #%d"

#define SERVER_MESSAGE "ClientID %d say >> %s"

// exit
#define EXIT "EXIT"

#define CAUTION "There is only one int the char room!"

int setnonblocking(int sockfd){
    fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK);
    return 0;
}

void addfd(int epollfd, int fd, bool enable_et){
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    if(enable_et){
        ev.events = EPOLLIN | EPOLLET;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
    setnonblocking(fd);
}

int sendBroadcastmessage(int clientfd){
    char buf[BUF_SIZE], message[BUF_SIZE];
    //清零
    bzero(buf, BUF_SIZE);
    bzero(message, BUF_SIZE);

    printf("read from client(clientID = %d)\n", clientfd);
    int len = recv(clientfd, buf, BUF_SIZE, 0);
    //len=0 client關閉了連接配接
    if(len == 0){
        close(clientfd);
        clients_list.remove(clientfd);
        printf("ClientID = %d closed.\n now there are %d client in the char room\n", clientfd, (int)clients_list.size());
    }else{//進行廣播
        if(clients_list.size() == 1){
            send(clientfd, CAUTION, strlen(CAUTION), 0);
            return len;
        }
        sprintf(message, SERVER_MESSAGE, clientfd, buf);

        list<int>::iterator iter;
        for(iter = clients_list.begin(); iter != clients_list.end(); iter++){
            if(*iter != clientfd){
                if(send(*iter, message, BUF_SIZE, 0) < 0){
                     perror("error"); 
                     exit(-1);
                }
            }
        }
    }
}

int main(int argc, char* argv[]){
	//伺服器IP + port
    struct sockaddr_in serverAddr;
    serverAddr.sin_family = PF_INET;
    serverAddr.sin_port = htons(SERVER_PORT);
    serverAddr.sin_addr.s_addr = htonl (INADDR_ANY);

    //建立監聽socket
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    if(listenfd < 0){
    	perror("listenfd");
    	exit(-1);
    }
    printf("listen socket created");

    //綁定位址
    if( bind(listenfd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
        perror("bind error");
        exit(-1);
    }

    //監聽
    int ret = listen(listenfd, 5);
    if(ret < 0) { 
        perror("listen error"); exit(-1);
    }

    //在核心中建立事件表
    int epfd = epoll_create(EPOLL_SIZE);
    if(epfd < 0){
        perror("epfd error");
        exit(-1);
    }
    printf("epoll created, epoll size = %d\n", epfd);

    static struct epoll_event events[EPOLL_SIZE];

    //往核心事件表裡添加事件
    addfd(epfd, listenfd, true);

    //主循環
    while(1){
        int epoll_events_count = epoll_wait(epfd, events, EPOLL_SIZE, -1);
        if(epoll_events_count < 0){
            perror("epoll failure");
            break;
        }
        printf("epoll event counts = %d\n", epoll_events_count);

        for(int i = 0; i < epoll_events_count; i++){
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd){
                struct sockaddr_in client_address;
                socklen_t client_addrLength = sizeof(struct sockaddr_in);
                int clientfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrLength);
                printf("client connection from: %s : % d(IP : port), clientfd = %d \n", inet_ntoa(client_address.sin_addr),
                ntohs(client_address.sin_port),
                clientfd);

                addfd(epfd, clientfd, true);

                //服務端用list儲存使用者連接配接
                clients_list.push_back(clientfd);
                printf("Add new clientfd = %d to epoll\n", clientfd);
                printf("Now there are %d clients int the chat room\n", (int)clients_list.size());

                //服務端發送歡迎消息
                char message[BUF_SIZE];
                bzero(message, BUF_SIZE);
                sprintf(message, SERVER_WELCOME, clientfd);
                int ret = send(clientfd, message, BUF_SIZE, 0);
                if(ret < 0){
                    perror("error");
                    exit(-1);
                }
            }
            else{
                int ret = sendBroadcastmessage(sockfd);
                if(ret < 0){
                    perror("error");
                    exit(-1);
                }
            }
        }
    }

    close(listenfd); //關閉socket
    close(epfd);    //關閉核心
    return 0;
}
           

Client端代碼:

//client.cpp
#include <iostream>
#include <list>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

using namespace std;

// server port
#define SERVER_PORT 8888

//epoll 支援的最大并發量
#define EPOLL_SIZE 5000

//message buffer size
#define BUF_SIZE 0xFFFF

// exit
#define EXIT "EXIT"

//設定sockfd,pipefd非阻塞
int setnonblocking(int sockfd){
	fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK);
	return 0;
}

int addfd(int epollfd, int fd, bool enable_et){
	struct epoll_event ev;
	ev.data.fd = fd;
	ev.events = EPOLLIN; //輸入出發epoll-event
	if(enable_et){
		ev.events = EPOLLIN | EPOLLET;
	}
	epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
	setnonblocking(fd);
}

int main(int argc, char* argv[]){
	//使用者連接配接的伺服器IP、端口
	struct sockaddr_in serverAddr;
	serverAddr.sin_family = PF_INET;
	serverAddr.sin_port = htons(SERVER_PORT);
	const char* servInetAddr = "127.0.0.1";
	inet_pton(AF_INET, servInetAddr, &serverAddr.sin_addr);

	//建立socket
	int sock = socket(PF_INET, SOCK_STREAM, 0);
	if(sock < 0){
		perror("sock error");
		exit(-1);
	}

	// 連接配接服務端
	if(connect(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0){
		perror("connect error");
		exit(-1);
	}

	//建立管道,其中fd[0]用于父程序讀,fd[1]用于子程序寫
	int pipe_fd[2];
	if(pipe(pipe_fd) < 0){
		perror("pipe error");
		exit(-1);
	}

	// 1 建立epoll
	int epfd = epoll_create(EPOLL_SIZE);
	if(epfd < 0) { perror("epfd error"); exit(-1); }
	static struct epoll_event events[2];
	//将sock和管道讀端都加到核心事件表中
	addfd(epfd, sock, true);
	addfd(epfd, pipe_fd[0], true);

	// 表示用戶端是否正常工作
    bool isClientwork = true;

    // 聊天資訊緩沖區
    char message[BUF_SIZE];


   	//Fork
   	int pid = fork();
   	if(pid < 0) { perror("fork error"); exit(-1); }
   	else if(pid == 0){  //子程序
   		//子程序負責寫入管道,是以先關閉讀端
   		close(pipe_fd[0]);
   		printf("Please input 'exit' to exit the chat room\n");

   		while(isClientwork){
   			bzero(&message, BUF_SIZE);
   			fgets(message, BUF_SIZE, stdin);

   			//用戶端輸出exit,退出
   			if(strncasecmp(message, EXIT, strlen(EXIT)) == 0){
   				isClientwork = 0;
   			}else{
   				//子程序将資訊寫入管道
   				if(write(pipe_fd[1], message, strlen(message) - 1) < 0){
   					{ perror("fork error"); exit(-1); }
   				}
   			} 
   		}
   	}else{		//pid > 0 父程序
   		//父程序負責讀管道資料,是以先關閉寫端
   		close(pipe_fd[1]);

  		while(isClientwork){
  			int epoll_events_count = epoll_wait(epfd, events, 2, -1);
   		//處理就緒事件
   		for(int i = 0; i < epoll_events_count; i++){
   			bzero(&message, BUF_SIZE);
   			//服務端發來消息
   			if(events[i].data.fd == sock){
   				//接受服務端消息
   				int ret = recv(sock, message, BUF_SIZE, 0);

   				//ret = 0  服務端關閉
   				if(ret == 0){
   					printf("Server closed connection: %d\n", sock);
                    close(sock);
                    isClientwork = 0;
   				}else{
   					printf("%s\n", message);
   				}
   			}else{
   				//子程序寫入事件發生,父程序處理并發送資料
   				int ret = read(events[i].data.fd, message, BUF_SIZE);
   				if(ret = 0){
   					isClientwork = 0;
   				}else{
   					send(sock, message, BUF_SIZE, 0);
   				}
   			}
   		}
  		}
   		
   	}

   	if(pid){
   		close(pipe_fd[1]);
   		close(sock);
   	}else{
   		close(pipe_fd[0]);
   	}

   	return 0;
}
           

代碼寫得十分精煉,一般來說TCP服務端通信的正常步驟是:

使用socket()建立TCP套接字(socket)

将建立的套接字綁定到一個本地位址和端口上(bind)

将套接字設定成監聽模式,準備接受用戶端請求(listen)

等待用戶端請求到來:當請求到來後,接受連接配接請求,傳回一個對應此次連接配接的新的套接字(accept)

用accept傳回的套接字和用戶端進行通信(這裡使用send()和recv())

傳回,等待又一個客戶請求

關閉套接字

下面簡單說一下這裡使用epoll的相關API:

int epoll_create(int size);    

參數size:用來告訴核心要監聽的數目一共有多少個。

傳回值:成功時,傳回一個非負整數的檔案描述符,作為建立好的epoll句柄。調用失敗時,傳回-1,錯誤資訊可以通過errno獲得。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

參數epfd:epoll_create()函數傳回的epoll句柄。

參數op:操作選項,OP可選的值有三個:EPOLL_CTL_ADD(注冊新的fd到epfd上)、EPOLL_CTL_MOD(修改已經注冊的fd的監聽事件)、EPOLL_CTL_DEL(從epfd中删除一個fd)。

參數fd:要進行操作的目标檔案描述符。

參數event:struct epoll_event結構指針,将fd和要進行的操作關聯起來。

傳回值:成功時,傳回0,作為建立好的epoll句柄。調用失敗時,傳回-1,錯誤資訊可以通過errno獲得。

說明:epoll的事件注冊函數,它不同與select()是在監聽事件時告訴核心要監聽什麼類型的事件,而是在這裡先注冊要監聽的事件類型。

另外,epoll_event的結構如下:

typedef union epoll_data {  
   void *ptr;  
   int fd;  
   __uint32_t u32;  
   __uint64_t u64;  
} epoll_data_t;  
  
struct epoll_event {  
   __uint32_t events; /* Epoll events */  
   epoll_data_t data; /* User data variable */  
}; 
           

events可以是以下幾個宏的集合:

EPOLLIN :表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉);

EPOLLOUT:表示對應的檔案描述符可以寫;

EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來);

EPOLLERR:表示對應的檔案描述符發生錯誤;

EPOLLHUP:表示對應的檔案描述符被挂斷;

EPOLLET: 将EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水準觸發(Level Triggered)來說的。

EPOLLONESHOT:隻監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列裡

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

參數epfd:epoll_create()函數傳回的epoll句柄。

參數events:struct epoll_event結構指針,用來從核心得到事件的集合。

參數 maxevents:告訴核心這個events有多大

參數 timeout: 等待時的逾時時間,以毫秒為機關。

傳回值:成功時,傳回需要處理的事件數目。調用失敗時,傳回0,表示等待逾時。

繼續閱讀