天天看點

Linux I/O多路複用select機制poll機制epoll

Linux中一切皆檔案,不論是我們存儲在磁盤上的字元檔案,可執行檔案還是我們的接入電腦的I/O裝置等都被VFS抽象成了檔案,比如标準輸入裝置預設是鍵盤,我們在操作标準輸入裝置的時候,其實操作的是預設打開的一個檔案描述符是0的檔案,而一切軟體操作硬體都需要通過OS,而OS操作一切硬體都需要相應的驅動程式,這個驅動程式裡配置了這個硬體的相應配置和使用方法。Linux的I/O分為阻塞I/O,非阻塞I/O,I/O多路複用,信号驅動I/O四種。對于I/O裝置的驅動,一般都會提供關于阻塞和非阻塞兩種配置。我們最常見的I/O裝置之一--鍵盤(标準輸入裝置)的驅動程式預設是阻塞的。

多路複用就是為了使程序能夠從多個阻塞I/O中獲得自己想要的資料并繼續執行接下來的任務。其主要的思路就是同時監視多個檔案描述符,如果有檔案描述符的設定狀态的被觸發,就繼續執行程序,如果沒有任何一個檔案描述符的設定狀态被觸發,程序進入sleep

多路複用的一個主要用途就是實作"I/O多路複用并發伺服器",和多線程并發或者多程序并發相比,這種伺服器的系統開銷更低,更适合做web伺服器,但是由于其并沒有實作真正的多任務,是以當壓力大的時候,部分使用者的請求響應會較慢

阻塞I/O

阻塞I/O,就是當程序試圖通路這個I/O裝置而這個裝置并沒有準備好的時候,裝置的驅動程式會通過核心讓這個試圖通路的程序進入sleep狀态。阻塞I/O的一個好處就是可以大大的節約CPU時間,因為一旦一個程序試圖通路一個沒有準備好的阻塞I/O,就會進入sleep狀态,而進入sleep狀态的程序是不在核心的程序排程連結清單中,直到目标I/O準備好了将其喚醒并加入排程連結清單,這樣就可以節約CPU時間。當然阻塞I/O也有其固有的缺點,如果程序試圖通路一個阻塞I/O,但是否通路成功并不對接下來的任務有決定性影響,那麼直接使其進入sleep狀态顯然會延誤其任務的完成。

  • 典型的預設阻塞IO有标準輸入裝置,socket裝置,管道裝置等,當我們使用

    gets()

    ,

    scanf()

    read()

    等操作請求這些IO時而IO并沒有資料流入,就會造成程序的sleep。 程序會一直阻塞下去直到接收緩沖區中有資料可讀,此時核心再去喚醒該程序,通過相應的函數從中擷取資料。如果阻塞過程中對方發生故障,那麼這個程序将會永遠阻塞下去。
  • 寫操作時發生阻塞的情況要比讀操作少,主要發生在要寫入的緩沖區的大小小于要寫入的資料量的情況下,這時寫操作将不進行任何任何拷貝工作,将發生阻塞。一旦發送緩沖區内有足夠的空間,核心将喚醒程序,将資料從使用者緩沖區中拷貝到相應的發送資料緩沖區。udp不用等待确認,沒有實際的發送緩沖區,是以udp協定中不存在發送緩沖區滿的情況,在udp套接字上執行的寫操作永遠都不會阻塞

現假設一個程序希望通過三個管道中任意一個中讀取資料并顯示,僞代碼如下

read(pipe_0,buf,sizeof(buf));       //sleep
print buf;
read(pipe_1,buf,sizeof(buf));
print buf;
read(pipe_2,buf,sizeof(buf));
print buf;           

由于管道是阻塞I/O,是以如果pipe_0沒有資料流入,程序就是在第一個

read()

處進入sleep狀态而即使pipe_1和pipe_2有資料流入也不會被讀取。

如果我們使用下述代碼重新設定管道的阻塞屬性,顯然,如果三個管道都沒有資料流入,那麼程序就無法獲得請求的資料而繼續執行,倘若這些資料很重要(是以我們才要用阻塞I/O),那結果就會十分的糟糕,改為輪詢卻又大量的占據CPU時間。

int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);           

如何讓程序同時監視三個管道,其中一個有資料就繼續執行而不會sleep,如果全部沒有資料流入再sleep,就是多路複用技術需要解決的問題。

非阻塞I/O

非阻塞I/O就是當一個程序試圖通路一個I/O裝置的時候,無論是否從中擷取了請求的資料都會傳回并繼續執行接下來的任務。,但非常适合請求是否成功對接下來的任務影響不大的I/O請求。但如果通路一個非阻塞I/O,但這個請求如果失敗對程序接下來的任務有緻命影響,最粗暴的就是使用

while(1){read()}

輪詢。顯然,這種方式會占用大量的CPU時間。對于非阻塞IO,除了直接傳回,一個更重要的應用就是利用IO多路複用機制同時監視多個非阻塞IO。

select機制

select是一種非常"古老"的同步I/O接口,但是提供了一種很好的I/O多路複用的思路

模型

fd_set      //建立fd_set對象,将來從中增減需要監視的fd
FD_ZERO()   //清空fd_set對象
FD_SET()    //将一個fd加入fd_set對象中  
select()    //監視fd_set對象中的檔案描述符
pselect()   //先設定信号屏蔽,再監視
FD_ISSET()  //測試fd是否屬于fd_set對象
FD_CLR()    //從fd_set對象中删除fd           

Note:

  • select的第一個參數

    nfds

    是指

    集合中的最大的檔案描述符+1

    ,因為select會無差别周遊整個檔案描述符表直到找到目标,而檔案描述符是從0開始的,是以一共是

    集合中的最大的檔案描述符+1

    次。
  • 上一條導緻了這種機制的低效,如果需要監視的檔案描述符是0和100那麼每一次都會周遊101次
  • select()每次傳回都會修改fd_set,如果要循環select(),需要先對初始的fd_set進行備

例子_I/O多路複用并發伺服器

關于server本身的程式設計模型,參見

tcp/ip協定伺服器模型

udp/ip協定伺服器模型

這裡僅是使用select實作僞并行的部分模型

#define BUFSIZE 100
#define MAXNFD  1024 

int main()
{
    /***********伺服器的listenfd已經準本好了**************/
    fd_set readfds;
    fd_set writefds;
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_SET(listenfd, &readfds);

    fd_set temprfds = readfds;
    fd_set tempwfds = writefds;
    int maxfd = listenfd;


    int nready;
    char buf[MAXNFD][BUFSIZE] = {0};
    while(1){
        temprfds = readfds;
        tempwfds = writefds;

        nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL)
        if(FD_ISSET(listenfd, &temprfds)){          
            //如果監聽到的是listenfd就進行accept
            int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);
            
            //将新accept的scokfd加入監聽集合,并保持maxfd為最大fd
            FD_SET(sockfd, &readfds);
            maxfd = maxfd>sockfd?maxfd:sockfd;
            
            //如果意見檢查了nready個fd,就沒有必要再等了,直接下一個循環
            if(--nready==0)
                continue;
        }
        
        int fd = 0;
        //周遊檔案描述符表,處理接收到的消息
        for(;fd<=maxfd; fd++){   
            if(fd == listenfd)
                continue;

            if(FD_ISSET(fd, &temprfds)){
                int ret = read(fd, buf[fd], sizeof buf[0]);
                if(0 == ret){    //用戶端連結已經斷開
                    close(fd);
                    FD_CLR(fd, &readfds);
                    if(maxfd==fd) 
                        --maxfd;
                    continue;
                }
                //将fd加入監聽可寫的集合
                FD_SET(fd, &writefds);  
            }
            //找到了接收消息的socket的fd,接下來将其加入到監視寫的fd_set中
            //将在下一次while()循環開始監視
            if(FD_ISSET(fd, &tempwfds)){
                int ret = write(fd, buf[fd], sizeof buf[0]);
                printf("ret %d: %d\n", fd, ret);
                FD_CLR(fd, &writefds);
            }
        }
    }
    close(listenfd);
}           

poll機制

poll是一種基于select的改良機制,其針對select的一些缺陷進行了重新設計,包括不需要備份fd_set等等,但是依然是周遊整個檔案描述符表,效率較低

struct pollfd   fds     //建立一個pollfd類型的數組
fds[0].fd               //向fds[0]中放入需要監視的fd
fds[0].events           //向fds[0]中放入需要監視的fd的觸發事件
    POLLIN              //I/O有輸入
    POLLPRI             //有緊急資料需要讀取
    POLLOUT             //I/O可寫
    POLLRDHUP           //流式套接字連接配接斷開或套接字處于半關閉狀态
    POLLERR             //錯誤條件(僅針對輸出)
    POLLHUP             //挂起(僅針對輸出)
    POLLNVAL            //無效的請求:fd沒有被打開(僅針對輸出)           

/* ... */

int main()
{
    /* ... */
    struct pollfd myfds[MAXNFD] = {0};
    myfds[0].fd = listenfd;
    myfds[0].events = POLLIN;
    int maxnum = 1;
    
    int nready;
    //準備二維數組buf,每個fd使用buf的一行,資料幹擾
    char buf[MAXNFD][BUFSIZE] = {0};
    while(1){
        //poll直接傳回event被觸發的fd的個數
        nready = poll(myfds, maxnum, -1)
        int i = 0;
        for(;i<maxnum; i++){
            //poll通過将相應的二進制位置一來表示已經設定
            //如果下面的條件成立,表示revent[i]裡的POLLIN位已經是1了
            if(myfds[i].revents & POLLIN){
                if(myfds[i].fd == listenfd){
                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);
                    //将新accept的scokfd加入監聽集合
                    myfds[maxnum].fd = sockfd;
                    myfds[maxnum].events = POLLIN;
                    maxnum++;
                    
                    //如果意見檢查了nready個fd,就直接下一個循環
                    if(--nready==0)
                        continue;
                }
                else{
                    int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
                    if(0 == ret){    //如果連接配接斷開了
                        close(myfds[i].fd);
                        
                         //初始化将檔案描述符表所有的檔案描述符标記為-1
                         //close的檔案描述符也标記為-1
                         //打開新的描述符時從表中搜尋第一個-1
                         //open()就是這樣實作始終使用最小的fd
                         //這裡為了示範并沒有使用這種機制
                         myfds[i].fd = -1;  
                        continue;
                    }
                    myfds[i].events = POLLOUT;
                }
            }
            else if(myfds[i].revents & POLLOUT){
                int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
                myfds[i].events = POLLIN;
            }
        }
    }
    close(listenfd);
}           

epoll

epoll在poll基礎上實作的更為健壯的接口,它每次隻會周遊我們關心的檔案描述符,也是現在主流的web伺服器使用的多路複用技術,epoll一大特色就是支援

EPOLLET(邊沿觸發)

EPOLLLT (水準觸發)

,前者表示如果讀取之後緩沖區還有資料,那麼隻要讀取結束,剩餘的資料也會丢棄,而後者表示裡面的資料不會丢棄,下次讀的時候還在,預設是

EPOLLLT

epoll_create()          //建立epoll對象
struct epoll_event      //準備事件結構體和事件結構體數組
    event.events
    event.data.fd ...
epoll_ctl()             //配置epoll對象
epoll_wait()            //監控epoll對象中的fd及其相應的event
           

/* ... */

int main()
{
    /* ... */
    /* 建立epoll對象 */
    int epoll_fd = epoll_create(1024);
    
    //準備一個事件結構體
    struct epoll_event event = {0};
    event.events = EPOLLIN;
    event.data.fd = listenfd;   //data是一個共用體,除了fd還可以傳回其他資料
    
    //ctl是監控listenfd是否有event被觸發
    //如果發生了就把event通過wait帶出。
    //是以,如果event裡不标明fd,我們将來擷取就不知道哪個fd
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event);
    
    struct epoll_event revents[MAXNFD] = {0};
    int nready;
    char buf[MAXNFD][BUFSIZE] = {0};
    while(1){
        //wait傳回等待的event發生的數目
        //并把相應的event放到event類型的數組中
        nready = epoll_wait(epoll_fd, revents, MAXNFD, -1)
        int i = 0;
        for(;i<nready; i++){
            //wait通過在events中設定相應的位來表示相應事件的發生
            //如果輸入可用,那麼下面的這個結果應該為真
            if(revents[i].events & EPOLLIN){
                //如果是listenfd有資料輸入
                if(revents[i].data.fd == listenfd){
                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);
                    struct epoll_event event = {0};
                    event.events = EPOLLIN;
                    event.data.fd = sockfd;
                    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);
                }
                else{
                    int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
                    if(0 == ret){
                        close(revents[i].data.fd);
                        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]);
                    }
                    
                    revents[i].events = EPOLLOUT;
                    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
                }
            }
            else if(revents[i].events & EPOLLOUT){
                int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
                revents[i].events = EPOLLIN;
                epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
            }
        }
    }
    close(listenfd);
}           

繼續閱讀