天天看點

Linux 異步IO介紹--相關函數



epoll

epoll

是Linux對

select

功能的改進,其性能大大提升,而且和監控的IO個數無關。

  • API:
  • epoll_create:
    ***`int epoll_create(int size);`***
    
    建立一個`epoll`執行個體,`size`參數是可監控IO的數量大小,但是Linux 2.6.8之後已不再使用。
               
  • epoll_ctl:
    ***`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);`***
    
    對`epoll`執行個體進行操作,由`op`指定操作類型:
               
  • EPOLL_CTL_ADD:添加一個檔案描述符到

    epoll

    執行個體中。
  • EPOLL_CTL_MOD:更新

    epoll

    執行個體中檔案描述符。
  • EPOLL_CTL_DEL:删除

    epoll

    執行個體中檔案描述符。
  • epoll_wait
    ***```int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);```***
    
    等待已經準備好的IO.`events`參數傳回的資料就是在`epoll_ctl`函數第三個參數設定的值。
               
  • 限制:無
  • 使用範例:

    epoll

    最多的用途就是socket程式設計,可以大大提高伺服器的性能,此處我們實作一個簡單的http伺服器。
    #define MAXFDS 128
    #define EVENTS 100
    #define PORT 8080
    #define MAXEPOLLSIZE 1024*10
    
    typedef enum
    {
        false,
        true
    }bool;
    
    /***************定義處理socket的回調函數類型***********/
    typedef int (*socket_pro)(int fd,void *data);
    
    /***************定義回調函數的使用者資料***********/
    typedef struct userdata
    {
        int fd;
        socket_pro cb;
    }userdata_t;
    
    static int epfd;//epoll句柄
    
    /***************發送一個檔案資料的函數***********/
    static void cws_client_request (int connfd,void *data)
    {
        struct epoll_event ev = {0};
        char buffer[1024*8] = {0};
        int ret;
        char *requestPath = NULL;
        char tmpPath[512] = {"./www/"};
        int pagesize = 0;
        ret = recv (connfd, buffer, sizeof (buffer) -1, 0);
        if (ret > 0)
        {
            if (strncmp (buffer, "GET ", 4) != 0)
            {
                printf("bad request.\n");
            }  
            if (strncmp (buffer, "GET /", 5) == 0)
            {
    
                if (strncmp (buffer, "GET / ", 6) == 0)
                {
                    strcat(tmpPath, "/index.html");
                    requestPath = tmpPath;
                }
                else
                {
                    requestPath = buffer+5;
                    char * pos = strstr(buffer+5, " ");
                    strncat(tmpPath, requestPath, pos - requestPath);
                    requestPath = tmpPath;
                }
            }
            char * badRequest = (char *)"<b>Error 404 Not Found.</b>";
            char * httpStatus200 = (char *)"HTTP/1.0 200 OK\r\nContent-Type:text/html\r\n\r\n";
    
            FILE * fp = fopen(requestPath, "r");
            FILE * connfp = fdopen(connfd, "w");
            if ( connfp == NULL )
            {
                perror("fdopen error");//cout <<"bad connfp"<<endl; 
            }    
            if (fp == NULL)
            {
                setlinebuf(connfp);
                fwrite(badRequest, strlen(badRequest), 1, connfp);
                fclose(connfp);
            }
            else
            {
                setlinebuf(connfp);
                //fwrite(httpStatus200, strlen(httpStatus200), 1, connfp);
                //fflush(connfp);
                while ((ret = fread (buffer, 1, sizeof(buffer) -1, fp)) > 0)
                {
                    fwrite(buffer, 1, ret, connfp);
                    pagesize += ret;
                    fflush(connfp);
                }
                printf("pagesize:%d\n",pagesize);//cout <<"pagesize:" << pagesize << endl;
                fclose(fp);
            }
        }
    
        //1
        close(connfd);
        epoll_ctl (epfd, EPOLL_CTL_DEL, connfd, &ev);
    }
    
    /***************處理已經連接配接socket函數***********/
    static int __process_data_fd(int fd,void *data)
    {
       struct epoll_event *ev = (struct epoll_event *)data;
       cws_client_request(fd,ev);
       free(data);
    
       return;  
    }
    
    /***************處理監聽socket函數***********/
    static int __process_listen_fd(int fd,void *data)
    {
         struct sockaddr_in caddr = {0};
         struct epoll_event ev = {0};
         int len = sizeof (caddr);
         int cfd = accept (fd, (struct sockaddr *) &caddr, (socklen_t *) & len);
         if (-1 == cfd)
         {
            perror("accpet error");//cout << "server has an error, now accept a socket fd" <<endl;
            break;
         }
         setNonBlock (cfd);
    
         userdata_t *cb_data =  malloc(sizeof(userdata_t));
         cb_data->fd = cfd;
         cb_data->cb = __process_data_fd; 
    
         ev.data.ptr = cfd;
         ev.events = EPOLLIN | EPOLLET;
         epoll_ctl (epfd, EPOLL_CTL_ADD, cfd, &ev);
    
         return 0;
    }
    
    /***************設定描述符為非阻塞***********/
    static bool setNonBlock (int fd)
    {
        int flags = fcntl (fd, F_GETFL, 0);
        flags |= O_NONBLOCK;
        if (-1 == fcntl (fd, F_SETFL, flags))
        {
            return false;
        }    
        return true;
    }
    
    /***************主函數***********/
    int main (int argc, char *argv[])
    {
        int listen_fd, nfds;
        int on = 1;
        char buffer[512];
        struct sockaddr_in saddr, caddr;
        struct epoll_event ev, events[EVENTS];
    
        signal(SIGPIPE, SIG_IGN);
    
        if (fork())
        {
            exit(0);
        }
    
    
        if (-1 == (listen_fd = socket(AF_INET, SOCK_STREAM, 0)))
        {
            perror("socket error");//cout << "create socket error!" << endl;
            return -1;
        }
    
        epfd = epoll_create (MAXFDS);
        setsockopt (listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on));
        bzero (&saddr, sizeof (saddr));
        saddr.sin_family = AF_INET;
        saddr.sin_port = htons ((short) (PORT));
        saddr.sin_addr.s_addr = INADDR_ANY;
        if (-1 == bind(listen_fd, (struct sockaddr *) &saddr, sizeof (saddr)))
        {
            perror("bind error");//cout << " cann't bind socket on server " << endl;
            return -1;
        }
    
        if (-1 == listen (listen_fd, 32))
        {
            perror("listen error");//cout << "listen error" << endl;
            return -1;
        }
        userdata_t *cb_data =  (userdata_t *)malloc(sizeof(userdata_t));
        cb_data->fd = listen_fd;
        cb_data->cb = __process_listen_fd; 
        ev.data.ptr = cb_data;
        ev.events = EPOLLIN|EPOLLET;
        epoll_ctl (epfd, EPOLL_CTL_ADD, listen_fd, &ev);
        for (;;)
        {   
            int i;
            nfds = epoll_wait (epfd, events, MAXFDS, -1);
            for (i = 0; i < nfds; ++i)
            {
                userdata_t *cb_data = (userdata_t *)events[i].data.ptr;
                cb_data.cb(cb_data.fd,cb_data);
            }
        }
        if (listen_fd > 0)
        {
            shutdown (listen_fd, SHUT_RDWR);
            close (listen_fd);
        }
    
        return 0;
    }
               

linux native aio

Linux native aio 有兩種API,一種是libaio提供的API,一種是利用系統調用封裝成的API,後者使用的較多,因為不需要額外的庫且簡單。

  • API
  • io_setup:
    是用來設定一個異步請求的上下文,第一個參數是請求事件的個數,第二個參數唯一辨別一個異步請求。
               
  • io_commit:
    是用來送出一個異步io請求的,在送出之前,需要設定一下結構體`iocb`,該結構體有以下字段需要設定:
               
    • aio_data:

      :使用者設定的資料,到時通過

      io_getevents

      函數傳回。
    • aio_lio_opcode

      :異步操作碼,有以下幾種操作:
      - `IOCB_CMD_PREAD`:讀操作,相當于調用`pread`
      - `IOCB_CMD_PWRITE`:寫操作,相當于`pwrite`
      - `IOCB_CMD_FSYNC`:同步操作,相當于調用`fsync`
      - `IOCB_CMD_FDSYNC`:同步操作,相當于調用`fdatasync`
                 
    • aio_buf

      :使用者提供的存儲資料的buffer
    • aio_offset

      :檔案的中偏移量
    • aio_nbytes

      :IO操作的資料大小
    • aio_flags

      :該字段要麼不設定,要麼設定為

      IOCB_FLAG_RESFD

      ,表示使用eventfd通知事件的完成。
    • aio_resfd

      :如果

      aio_flags

      字段設定為

      IOCB_FLAG_RESFD

      ,該字段設定為

      eventfd

      的傳回值。
  • io_getevents:
    用來擷取完成的io事件,參數`min_nr`是事件個數的的最小值,`nr`是事件個數的最大值,如果沒有足夠的事件發生,該函數會阻塞,
    
    `timeout`參數是阻塞的逾時時間。該函數會傳回一個`io_events`結構體,該結構體有以下字段:
    
    * `data`:這就是在`struct iocb`結構體`aio_data`設定的值。
    * `obj`:傳回的`struct iocb`結構體,是`io_commit`第三個參數設定數組中的值。
    * `res`和`res2`:傳回結果。
               
  • io_destroy:
    在所有時間處理完之後,調用此函數銷毀異步io請求。
               
  • 限制:

    aio隻能使用于正常的檔案IO,不能使用于socket,管道等IO。

  • 使用範例

    上面已經介紹過了,io_getevents在調用之後會阻塞直到有足夠的事件發生,是以要實作真正的異步IO,需要借助

    eventfd

    epoll

    達到目的。
  • 首先我們封裝一下系統調用來作為我們使用的API:
    int io_setup(unsigned nr, aio_context_t *ctxp)
        {
            return syscall(__NR_io_setup, nr, ctxp);
        }
    
        int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
        {
            return syscall(__NR_io_submit, ctx, nr, iocbpp);
        }
    
        int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
                         struct io_event *events, struct timespec *timeout)
        {
            return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
        }
    
        int io_destroy(aio_context_t ctx)
        {
            return syscall(__NR_io_destroy, ctx);
        }
    
        int eventfd2(unsigned int initval, int flags)
        {
            return syscall(__NR_eventfd2, initval, flags);
        }
               
    1. 定義自己的異步使用者資料和回調函數:
      typedef void io_callback_t(aio_context_t ctx, struct iocb *iocb, long res);//回調函數類型
      
      struct userdata//使用者資料
      {
          int64_t offset;
          int64_t filesize;
          int64_t block_size;
      };
      
      struct user_iocb//封裝結構體,以便異步傳回使用者資料。
      {
          struct iocb iocb;
          struct userdata user_cb;
      };
      
      void aio_callback(aio_context_t ctx, struct iocb *iocb, long res, long res2)
      {
          int64_t offset = iocb->aio_offset;
          struct custom_iocb *iocbp = (struct custom_iocb *) iocb;
          printf("data=%.*s\n",page_size, (char *) iocb->aio_buf);
      }
                 
    2. 建立異步請求:
      //異步讀取整個檔案資料
      #define page_size 1024
      char *path = "test.txt"
      
      //計算異步請求的個數
      static int get_event_num(uint64_t len)
      {
          return len / page_size + (len % page_size != 0);
      }
      
      static int get_filesize(char *path)
      {
          struct stat buf = {0};
          int iret = stat(path, &buf);
          return (iret >= 0) ? buf.st_size : iret;
      }
      int main(void)
      {
          int event_fd = 0;
          int file_size = get_filesize(path);
          int event_num = get_event_num(file_size);
          struct iocb *iocbs = malloc(event_num * sizeof (struct iocb));
          struct iocb * iocbps[event_num] = {0};
          aio_context_t ctx = 0;
      
          /****************建立使用的eventfd******************/
          event_fd = eventfd2(0, O_NONBLOCK | O_CLOEXEC);
      
          /****************建立異步請求上下文*****************/
          if (io_setup(event_num, &ctx))
          {
              perror("io_setup");
              return 4;
          }
      
          /*****************設定請求的資料********************/
          int fd = open(path, O_RDWR, 0664);
          for (int i = 0; i < event_num; i++)
          {
              struct iocb *_ = iocbs + i;
              _->aio_buf = (__u64) ((char *) buf + (i * page_size));//設定存儲請求資料的buf
              _->aio_fildes = fd;//請求的檔案描述符
              _->aio_offset = i * page_size;//讀檔案的偏移量,異步請求檔案指針不會移動的,自己設定
              _->aio_nbytes = page_size;//每個異步請求請求資料的大小
              _->aio_resfd = event_fd;//用來通知有事件完成的eventfd
              _->aio_flags = IOCB_FLAG_RESFD;//使用eventfd的标志
              _->aio_data = (__u64) aio_callback;//設定使用者資料,這裡是個回調函數
              iocbps[i] = _;
          }
      
          /*****************送出異步請求********************/
          if (io_submit(ctx, event_num, iocbps) != event_num)
          {
              perror("io_submit");
              return 6;
          }
      
          /***************利用epoll等待異步請求事件完成*****/
          int epfd = 0;
          epfd = epoll_create(1);
          if (epfd == -1)
          {
              perror("epoll_create");
              return 7;
          }
          struct epoll_event epevent = {0};
          epevent.events = EPOLLIN | EPOLLET;
          epevent.data.ptr = NULL;
          if (epoll_ctl(epfd, EPOLL_CTL_ADD, event_fd, &epevent))
          {
              perror("epoll_ctl");
              return 8;
          }
      
          /*****************處理異步請求事件********/
          int i = 0;
          while (i < event_num)
          {
              int64_t finished_aio = 0;
              if (epoll_wait(epfd, &epevent, 1, -1) != 1)//等待事件發生
              {
                  perror("epoll_wait");
                  return 9;
              }
      
              //讀取異步事件完成的個數,finished_aio接收
              if (read(event_fd, &finished_aio, sizeof (finished_aio)) != sizeof (finished_aio))
              {
                  perror("read");
                  return 10;
              }
              printf("finished io number: %"PRIu64"\n", finished_aio);
      
              struct timespec tms;
              struct io_event events[event_num];
              while (finished_aio > 0)
              {
                  tms.tv_sec = 0;
                  tms.tv_nsec = 0;
                  int r = io_getevents(ctx, 1, event_num, events, &tms);//擷取發生的事件
                  if (r > 0)
                  {
                      int j = 0;
                      for (j = 0; j < r; ++j)
                      {
                          ((io_callback_t *) (events[j].data))(ctx, (struct iocb *) events[j].obj, events[j].res,                                                             events[j].res2);//調用使用者callback
                      }
                      i += r;
                      finished_aio -= r;
                  }
              }
          }
      
          /*****************處理異步事件結束********/
          close(epfd);
          io_destroy(ctx);
          close(fd);
          close(event_fd);
      
          return 0;
      
      }
                 

SIGIO

在Linux下,每一個檔案描述符都可以設定

O_ASYNC

辨別,當檔案可讀或者可寫時可以發送信号通知相關程序,預設信号是

SIGIO

,可以

使用

fcntl

函數改變這個預設信号。

  • 限制:
    1. 在多個檔案描述符的情況下,信号驅動模式不能确定哪個描述符是可以讀或者寫的,是以必須周遊每一個描述符來判斷,一個

      好的方法是使用

      epoll

      擷取哪個描述符可讀或者寫。
    2. 隻能用于socket、管道和終端IO,不能用于普通檔案的IO。
  • 使用範例
  • 設定描述符的

    O_ASYNC

    辨別和描述符的歸屬程序。
  • 設定檔案描述符為非阻塞。
  • 設定信号

    SIGIO

    的處理動作。
    void new_op(int signum, siginfo_t *info, void *myact)//信号處理函數的實作
        {
            int i;
            for (i = 0; i < 1; i++)
            {
                printf("data:%u\n", info->si_int);
            }
            char buf[10] = {0};
            while(read(STDIN_FILENO,buf,10) > 0)
            {
               printf("input:%s",buf);
            }
            printf("handle signal %d over;\n", signum);
        }
    
    
        int main(void)
        {
            int oflags;
            struct sigaction act;
    
            sigemptyset(&act.sa_mask);
            act.sa_sigaction = new_op; //信号處理函數
            if (sigaction(SIGPOLL, &act, NULL) < 0)
            {
                printf("install sigal error\n");
            }
    
            fcntl(STDIN_FILENO, F_SETOWN, getpid());
            oflags = fcntl(STDIN_FILENO, F_GETFL);
            fcntl(STDIN_FILENO, F_SETFL, oflags | FASYNC|O_NONBLOCK);
    
            while (1){
                sleep(1);
            };
        }