天天看點

I/O複用

select

select系統調用的用途是:在一段指定時間内,監聽使用者感興趣的檔案描述符上的可讀、可寫和異常等事件。

#include<sys/select.h>
/* Check the first NFDS descriptors each in READFDS (if not NULL) for read
   readiness, in WRITEFDS (if not NULL) for write readiness, and in EXCEPTFDS
   (if not NULL) for exceptional conditions.  If TIMEOUT is not NULL, time out
   after waiting the interval specified therein.  Returns the number of ready
   descriptors, or -1 for errors.

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int select (int __nfds, fd_set *__restrict __readfds,
		   fd_set *__restrict __writefds,
		   fd_set *__restrict __exceptfds,
		   struct timeval *__restrict __timeout);
           
  1. nfds 參數指定被監聽的檔案描述符的總數。它通常被設定為sclect監聽的所有檔案描述符中的最大值加1,因為檔案描述符是從0開始計數的。
  2. readfds、writefds 和exceptfds參數分别指向可讀、可寫和異常等事件對應的檔案描述符集合。應用程式調用select函數時,通過這3個參數傳人自己感興趣的檔案描述符。select調用傳回時,核心将修改它們來通知應用程式哪些檔案描述符已經就緒。這3個參數是fd_set結構指針類型。fd_set 結構體的定義如下:
#
#include <typesizes.h>
/* Number of descriptors that can fit in an `fd_set'.  */
#define __FD_SETSIZE		1024

/* The fd_set member is required to be an array of longs.  */
typedef long int __fd_mask;

/* Some versions of <linux/posix_types.h> define this macros.  */
#undef	__NFDBITS
/* It's easier to assume 8-bit bytes than to get CHAR_BIT.  */
#define __NFDBITS	(8 * (int) sizeof (__fd_mask))
#define	__FD_ELT(d)	((d) / __NFDBITS)
#define	__FD_MASK(d)	((__fd_mask) (1UL << ((d) % __NFDBITS)))

/* fd_set for select and pselect.  */
typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
  } fd_set;
           

由以上定義可見,fd_set 結構體僅包含一個整型數組,該數組的每個元素的每一位(bit)标記一個檔案描述符。fd_set 能容納的檔案描述符數量由FD_SETSIZE指定,這就限制了select能同時處理的檔案描述符的總量。由于位操作過于煩瑣,我們應該使用下面的一系列宏來通路fd_set 結構體中的位:

/* Access macros for `fd_set'.  */
#define	FD_SET(fd, fdsetp)	__FD_SET (fd, fdsetp)	//設定fdset的位fd
#define	FD_CLR(fd, fdsetp)	__FD_CLR (fd, fdsetp)	//清除fdset的位fd
#define	FD_ISSET(fd, fdsetp)	__FD_ISSET (fd, fdsetp)		//測試fdset的位fd是否被設定
#define	FD_ZERO(fdsetp)		__FD_ZERO (fdsetp)	//清除fdset的所有位
           
  1. timeout 參數用來設定select函數的逾時時間。它是一個timeval結構類型的指針,采用指針參數是因為核心将修改它以告訴應用程式select等待了多久。不過我們不能完全信任sclect調用傳回後的timeout值,比如調用失敗時timeout值是不确定的。timeval 結構體的定由以上定義如下:
/* A time value that is accurate to the nearest
   microsecond but also has a range of years.  */
struct timeval
{
  __time_t tv_sec;		/* 秒.  */
  __suseconds_t tv_usec;	/* 微秒.  */
};
           

由以上定義可見,select 給我們提供了一個微秒級的定時方式。如果給timeout變量的tv_sec成員和tv_usec成員都傳遞0,則select将立即傳回。如果給timeout傳遞NULL,則select将一直阻塞, 直到某個檔案描述符就緒。

select成功時傳回就緒(可讀、可寫和異常)檔案描述符的總數。如果在逾時時間内沒有任何檔案描述符就緒,select 将傳回0. select 失敗時傳回-1并設定ermo。如果在select等待期間,程式接收到信号,則select立即傳回-1,并設定ermo為EINTR。

檔案描述符就緒條件

哪些情況下檔案描述符可以被認為是可讀、可寫或者出現異常,對于select的使用非常關鍵。在網絡程式設計中,下列情況下socket可讀:

  • socket核心接收緩存區中的位元組數大于或等于其低水位标記SO_RCVLOWAT。此時我們可以無阻塞地讀該socket,并且讀操作傳回的位元組數大于0。
  • socket通信的對方關閉連接配接。此時對該socket的讀操作将傳回0。口監聽socket上有新的連接配接請求。
  • socket上有未處理的錯誤。此時我們可以使用getsockopt 來讀取和清除該錯誤。下列情況下socket可寫:
    • socket核心發送緩存區中的可用位元組數大于或等于其低水位标記SO_SNDLOWAT.此時我們可以無阻塞地寫該socket,并且寫操作傳回的位元組數大于0。
    • socket的寫操作被關閉。對寫操作被關閉的socket執行寫操作将觸發一個 SIGPIPE信号。
    • socket使用非阻塞connect連接配接成功或者失敗(逾時)之後。
    • socket上有未處理的錯誤。此時我們可以使用getsockopt來讀取和清除該錯誤。

poll

poll系統調用和select類似,也是在指定時間内輪詢一定數量的檔案描述符,以測試其中是否有就緒者。poll 的原型如下:

int poll(pollfd *__fds, nfds_t __nfds, int __timeout)
Poll the file descriptors described by the NFDS structures starting at
FDS. If TIMEOUT is nonzero and not -1, allow TIMEOUT milliseconds for
an event to occur; if TIMEOUT is -1, block until an event occurs.
Returns the number of file descriptors with events, zero if timed out,
or -1 for errors.

This function is a cancellation point and therefore not marked with
__THROW.
           
  • fds 參數是一個polfd結構類型的數組,它指定所有檔案描述符上發生的可讀、可寫和異常等事件。pollfd 結構體的定義如下:
/* Data structure describing a polling request.  */
struct pollfd
  {
    int fd;			/* 檔案描述符。  */
    short int events;		/* 注冊的事件類型。  */
    short int revents;		/* 實際發生的事件類型,由核心填充。  */
  };
           

其中,fd 成員指定檔案描述符; events 成員告訴poll監聽fd上的哪些事件,它是一系列事件的按位或; revents 成員則由核心修改,以通知應用程式fd上實際發生了哪些事件。poll支援的事件類型如表所示。

I/O複用
  • nfds參數指定被監聽事件集合fds的大小。其類型nfds_t的定義如下:
/* Type used for the number of file descriptors.  */
typedef unsigned long int nfds_t;
           
  • timcout 參數指定poll的逾時值,機關是毫秒。當timeout為-1時,poll 調用将永遠阻塞,直到某個事件發生;當timcout為0時,poll調用将立即傳回。poll系統調用的傳回值的含義與select相同。

epoll

epoll是Linux特有的I/O複用函數。它在實作和使用上與select、poll 有很大差異。首先,epoll 使用一組函數來完成任務,而不是單個函數。其次,epoll 把使用者關心的檔案描述符上的事件放在核心裡的一個事件表中,進而無須像select和poll那樣每次調用都要重複傳人檔案描述符集或事件集。但epoll需要使用一個額外的檔案描述符,來唯一辨別核心中的這個事件表。這個檔案描述符使用如下epoll_create 函數來建立:

#include<sys/epoll.h>

/* Creates an epoll instance.  Returns an fd for the new instance.
   The "size" parameter is a hint specifying the number of file
   descriptors to be associated with the new instance.  The fd
   returned by epoll_create() should be closed with close().  */
extern int epoll_create (int __size) __THROW;
           

size參數現在并不起作用,隻是給核心一個提示, 告訴它事件表需要多大。該函數傳回的檔案描述符将用作其他所有epoll 系統調用的第一個參 數,以指定要通路的核心事件表。下面的函數用來操作epoll的核心事件表:

#include<sys/epoll.h>

/* Manipulate an epoll instance "epfd". Returns 0 in case of success,
   -1 in case of error ( the "errno" variable will contain the
   specific error code ) The "op" parameter is one of the EPOLL_CTL_*
   constants defined above. The "fd" parameter is the target of the
   operation. The "event" parameter describes which events the caller
   is interested in and any associated user data.  */
extern int epoll_ctl (int __epfd, int __op, int __fd,
		      struct epoll_event *__event) __THROW;
//epoll_ctl 成功時傳回0,失敗則傳回-1并設定ermo.
           

fd參數是要操作的檔案描述符,op 參數則指定操作類型。操作類型有如下3種:

/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl().  */
#define EPOLL_CTL_ADD 1	/* Add a file descriptor to the interface.  */
#define EPOLL_CTL_DEL 2	/* Remove a file descriptor from the interface.  */
#define EPOLL_CTL_MOD 3	/* Change file descriptor epoll_event structure.  */
           

event參數指定事件,它是epoll_event 結構指針類型。epoll_event 的定義如下:

struct epoll_event
{
  uint32_t events;	/* Epoll events */
  epoll_data_t data;	/* User data variable */
} __EPOLL_PACKED;
           

其中events成員描述事件類型。epoll 支援的事件類型和poll基本相同。表示epoll事件類型的宏是在poll對應的宏前加上“E",比如epoll的資料可讀事件是EPOLLIN但epoll有兩個額外的事件類型一EPOLLET 和EPOLLONESHOT它們對于epoll的高效運作非常關鍵,我們将在後面讨論它們。data 成員用于存儲使用者資料,其類型epoll_data_t的定義如下:

typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;
           

epoll_data_t是一個聯合體,其4個成員中使用最多的是fd,它指定事件所從屬的目标檔案描述符。ptr成員可用來指定與fd相關的使用者資料。但由于epoll_data_t 是一個聯合體,我們不能同時使用其ptr成員和fd成員,是以,如果要将檔案描述符和使用者資料關聯起來, 以實作快速的資料通路,隻能使用其他手段,比如放棄使用epoll_data_t的fd成員,而在ptr指向的使用者資料中包含fd。

epoll_wait

epoll系列系統調用的主要接口是epoll_wait函數。它在一段逾時時間内等待一組檔案描述符上的事件,其原型如下:

/* Wait for events on an epoll instance "epfd". Returns the number of
   triggered events returned in "events" buffer. Or -1 in case of
   error with the "errno" variable set to the specific error code. The
   "events" parameter is a buffer that will contain triggered
   events. The "maxevents" is the maximum number of events to be
   returned ( usually size of "events" ). The "timeout" parameter
   specifies the maximum wait time in milliseconds (-1 == infinite).

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int epoll_wait (int __epfd, struct epoll_event *__events,
		       int __maxevents, int __timeout);
//成功時傳回就緒的檔案描述符的個數,失敗時傳回-1并設定ermo。
           

timeout 參數的含義與poll接口的timeout 參數相同。maxevents 參數指定最多監聽多少個事件,它必須大于0。

epoll_ wait 函數如果檢測到事件,就将所有就緒的事件從核心事件表(由epfd 參數指定)中複制到它的第二個參數events指向的數組中。這個數組隻用于輸出epoll_wait 檢測到的就緒事件,而不像seleet和poll的數組參數那樣既用于傳入使用者注冊的事件,又用于輸出核心檢測到的就緒事件。這就極大地提高了應用程式索引就緒檔案描述符的效率。

LT和ET

epoll對檔案描述符的操作有兩種模式: LT (Level Trigger,電平觸發)模式和eT ( Edge Trigger,邊沿觸發)模式。LT 模式是預設的工作模式,這種模式下epoll相當于一個效率高的poll當往epoll核心事件表中注冊一個檔案描述符上的EPOLLET事件時,epoll 将以ET模式來操作該檔案描述符。ET 模式是epoll的高效工作模式。

對于采用LT工作模式的檔案描述符,當epoll_wait檢測到其上有事件發生并将此事件通知應用程式後,應用程式可以不立即處理該事件。這樣,當應用程式下一次調用epoll_wait 時,epoll_wait 還會再次向應用程式通告此事件,直到該事件被處理。而對于采用ET工作模式的檔案描述符,當epoll_wait檢測到其上有事件發生并将此事件通知應用程式後,應用程式必須立即處理該事件,因為後續的epoll wait 調用将不再向應用程式通知這一事件。 可見,ET模式在很大程度上降低了同一個epoll事件被重複觸發的次數,是以效率要比LT模式高。

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

void addfd(int epollfd, int fd, bool enable_et)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if (enable_et)
    {
        event.events |= EPOLLET;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

void lt(epoll_event *events, int number, int epollfd, int listenfd)
{
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; i++)
    {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd)
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
            addfd(epollfd, connfd, false);
        }
        else if (events[i].events & EPOLLIN)
        {
            printf("event trigger once\n");
            memset(buf, '\0', BUFFER_SIZE);
            int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
            if (ret <= 0)
            {
                close(sockfd);
                continue;
            }
            printf("get %d bytes of content: %s\n", ret, buf);
        }
        else
        {
            printf("something else happened \n");
        }
    }
}

void et(epoll_event *events, int number, int epollfd, int listenfd)
{
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; i++)
    {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd)
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
            addfd(epollfd, connfd, true);
        }
        else if (events[i].events & EPOLLIN)
        {
            printf("event trigger once\n");
            while (1)
            {
                memset(buf, '\0', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                if (ret < 0)
                {
                    if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                    {
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                }
                else if (ret == 0)
                {
                    close(sockfd);
                }
                else
                {
                    printf("get %d bytes of content: %s\n", ret, buf);
                }
            }
        }
        else
        {
            printf("something else happened \n");
        }
    }
}

int main(int argc, char *argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd, true);

    while (1)
    {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if (ret < 0)
        {
            printf("epoll failure\n");
            break;
        }
        lt(events, ret, epollfd, listenfd);
        //et( events, ret, epollfd, listenfd );
    }
    close(listenfd);
    return 0;
}
           

EPOLLONESHOT

即使我們使用ET模式,一個socket上的某個事件還是可能被觸發多次。這在并發程式中就會引起一個問題。比如一個線程(或程序,下同)在讀取完某個socket.上的資料後開始處理這些資料,而在資料的處理過程中該socket上又有新資料可讀(EPOLLIN 再次被觸發),此時另外一個線程被喚醒來讀取這些新的資料。于是就出現了兩個線程同時操作一個socket的局面。這當然不是我們期望的。我們期望的是一個socket連接配接在任一時 刻都隻被一個線程處理。這-一點可以使用epoll的EPOLLONESHOT事件實作。

對于注冊了EPOLLONESHOT事件的檔案描述符,作業系統最多觸發其上注冊的一個可讀、可寫或者異常事件,且隻觸發一次, 除非我們使用epoll_ctl 函數重置該檔案描述符上注冊的EPOLLONESHOT事件。這樣,當一個線程在處理某個socket時,其他線程是不可能有機會操作該socket的。但反過來思考,注冊了EPOLLONESHOT事件的socket一旦被某個線程處理完畢,該線程就應該立即重置這個socket上的EPOLLONESHOT事件,以確定這個socket下一次可讀時,其EPOLLIN事件能被觸發,進而讓其他工作線程有機會繼續處理這個socket。

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

struct fds
{
    int epollfd;
    int sockfd;
};

int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

void addfd(int epollfd, int fd, bool oneshot)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if (oneshot)
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

void reset_oneshot(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

void *worker(void *arg)
{
    int sockfd = ((fds *)arg)->sockfd;
    int epollfd = ((fds *)arg)->epollfd;
    printf("start new thread to receive data on fd: %d\n", sockfd);
    char buf[BUFFER_SIZE];
    memset(buf, '\0', BUFFER_SIZE);
    while (1)
    {
        int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
        if (ret == 0)
        {
            close(sockfd);
            printf("foreiner closed the connection\n");
            break;
        }
        else if (ret < 0)
        {
            if (errno == EAGAIN)
            {
                reset_oneshot(epollfd, sockfd);
                printf("read later\n");
                break;
            }
        }
        else
        {
            printf("get content: %s\n", buf);
            sleep(5);
        }
    }
    printf("end thread receiving data on fd: %d\n", sockfd);
}

int main(int argc, char *argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd, false);

    while (1)
    {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if (ret < 0)
        {
            printf("epoll failure\n");
            break;
        }

        for (int i = 0; i < ret; i++)
        {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
                addfd(epollfd, connfd, true);
            }
            else if (events[i].events & EPOLLIN)
            {
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);
            }
            else
            {
                printf("something else happened \n");
            }
        }
    }
    close(listenfd);
    return 0;
}
           

從工作線程函數worker來看,如果一個工作線程處理完某個socket上的一次請求(我們用休眠5s來模拟這個過程)之後,又接收到該socket上新的客戶請求,則該線程将繼續為這個socket服務。并且因為該socket上注冊了EPOLLONESHOT事件,其他線程沒有機會接觸這個socket,如果工作線程等待5s後仍然沒收到該socket.上的下一批客戶資料,則它将放棄為該socket服務。同時,它調用reset_oneshot 函數來重置該socket上的注冊事件,這将使epoll有機會再次檢測到該socket上的EPOLLIN事件,進而使得其他線程有機會為該socket服務。

由此看來,盡管一個socket在不同時間可能被不同的線程處理,但同一時刻肯定隻有一個線程在為它服務。這就保證了連接配接的完整性,進而避免了很多可能的競态條件。

三種I/O複用函數的比較

繼續閱讀