天天看點

io_uring 之 liburing 的簡單使用例子 1例子 2

io_uring是Linux核心在v5.1引入的一套異步IO接口,和aio不同的是,它可以提供更高的性能

io_uring 具體有三個系統調用

分别是

io_uring_setup

,

io_uring_enter

,

io_uring_register

,我們可以通過這三個系統調用來完成異步事件送出,收割,自己處理的流程

異步io的優點在于,不用我們自己去等待 io操作的完成,我們隻需要告訴核心,我們的任務,核心來幫我們完成。這樣就可以讓我們的程序去幹其他的事情,實作更高的吞吐量。

io_uring 利用 mmap 開辟出一塊空間,讓使用者态和核心态的程式都可以共享的一塊區域

io_uring 分為 送出隊列 和 完成隊列

使用者送出的任務放在送出隊列中,由核心去處理,處理好的東西會放在完成隊列中。

核心如何處理,不是使用者關系的問題,核心可以回調、輪詢的方式都可以進行處理。

使用者隻需要設定好就可以使用

由于這三個系統調用要用好并不容易

開發作者也提供了一個liburing來給我們使用

注: 核心版本最好高一點,比如 Linux 5.4 不支援 read,但支援 readv (親身教訓)

除了 io_uring的結構要了解外,我們還需了解兩個用到的東西

這兩個分别代表了 完成隊列和送出隊列的一項元素

其中的user_data可以是一個可以由我們進行diy的指針

struct io_uring_cqe {
	__u64	user_data;	/* sqe->data submission passed back */
	__s32	res;		/* result code for this event */
	__u32	flags;

	/*
	 * If the ring is initialized with IORING_SETUP_CQE32, then this field
	 * contains 16-bytes of padding, doubling the size of the CQE.
	 */
	__u64 big_cqe[];
};
           
struct io_uring_sqe {
	__u8	opcode;		/* type of operation for this sqe */
	__u8	flags;		/* IOSQE_ flags */
	__u16	ioprio;		/* ioprio for the request */
	__s32	fd;		/* file descriptor to do IO on */
	union {
		__u64	off;	/* offset into file */
		__u64	addr2;
		struct {
			__u32	cmd_op;
			__u32	__pad1;
		};
	};
	union {
		__u64	addr;	/* pointer to buffer or iovecs */
		__u64	splice_off_in;
	};
	__u32	len;		/* buffer size or number of iovecs */
	union {
		__kernel_rwf_t	rw_flags;
		__u32		fsync_flags;
		__u16		poll_events;	/* compatibility */
		__u32		poll32_events;	/* word-reversed for BE */
		__u32		sync_range_flags;
		__u32		msg_flags;
		__u32		timeout_flags;
		__u32		accept_flags;
		__u32		cancel_flags;
		__u32		open_flags;
		__u32		statx_flags;
		__u32		fadvise_advice;
		__u32		splice_flags;
		__u32		rename_flags;
		__u32		unlink_flags;
		__u32		hardlink_flags;
		__u32		xattr_flags;
		__u32		msg_ring_flags;
	};
	__u64	user_data;	/* data to be passed back at completion time */
	/* pack this to avoid bogus arm OABI complaints */
	union {
		/* index into fixed buffers, if used */
		__u16	buf_index;
		/* for grouped buffer selection */
		__u16	buf_group;
	} __attribute__((packed));
	/* personality to use, if used */
	__u16	personality;
	union {
		__s32	splice_fd_in;
		__u32	file_index;
		struct {
			__u16	addr_len;
			__u16	__pad3[1];
		};
	};
	union {
		struct {
			__u64	addr3;
			__u64	__pad2[1];
		};
		/*
		 * If the ring is initialized with IORING_SETUP_SQE128, then
		 * this field is used for 80 bytes of arbitrary command data
		 */
		__u8	cmd[0];
	};
};
           

我們來看兩個使用 liburing的簡單例子

例子 1

第一個例子诠釋了 io_uring 最直接的一個流程
/**
* 讀取檔案
**/
#include <bits/stdc++.h>
#include <liburing.h>
#include <unistd.h>

char buf[1024] = {0};

int main() {
  int fd = open("1.txt", O_RDONLY, 0);
  
  io_uring ring;
  io_uring_queue_init(32, &ring, 0); // 初始化
  auto sqe = io_uring_get_sqe(&ring); // 從環中得到一塊空位
  io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0); // 為這塊空位準備好操作
  io_uring_submit(&ring); // 送出任務
  io_uring_cqe* res; // 完成隊列指針
  io_uring_wait_cqe(&ring, &res); // 阻塞等待一項完成的任務
  assert(res);
  std::cout << "read bytes: " << res->res << " \n";
  std::cout << buf << std::endl;
  io_uring_cqe_seen(&ring, res); // 将任務移出完成隊列
  io_uring_queue_exit(&ring); // 退出
  return 0;
}

           

io_uring 有三個東西

送出隊列

完成隊列

任務實體

送出隊列和完成隊列都可以看成持有一項指針

我們得到一個 任務實體,通過

io_uring_prep_read

準備任務 和

io_uring_submit

送出任務

送出任務之後就到了送出隊列中去

在送出隊列裡面,核心操作完以後。

任務就到了完成隊列中去。

然後我們可以阻塞等待

io_uring_wait_cqe

一項任務

當然,我們也可以使用非阻塞的方式,去幹其他事情

在拿到這一項任務之後,我們就可以對其進行處理,處理完成記得 從完成隊列中清除

(至于 完成隊列和送出隊列是如何高效的且不出錯的并發執行 暫且不談)

例子 2

echo_server

上述 的 io_uring寫着還是比較長,我們可以把它封裝一下。

比如要用 read的操作,要用accpet的操作,都給他封裝一下

同時,我們在寫echo_server時,我們是 幾個不同的操作

可能是 ACCEPT 操作,可能是 READ 操作, 可能是 WRITE操作

并且READ和WRITE操作都要有自己的緩沖區

是以,我們定義一下我們在任務之間傳遞的結構體

然後把它放到 user_data 中。

__u64	user_data;	/* data to be passed back at completion time */
           

結構體如下:

struct request {
  enum STATE { ACCEPT, READ, WRITE };
  int fd;
  STATE state;
  union {
    struct {
      sockaddr_in ipv4_addr;
      socklen_t lens;
    } addr;
    char buf[BUFSIZE];
  };
};
           

我們對可能用到的操作進行一下封裝

class IOuring {
  io_uring ring;

 public:
  IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }

  ~IOuring() { io_uring_queue_exit(&ring); }

  void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }

  int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }

  int submit() { return io_uring_submit(&ring); }

  void accpet_asyn(int sock_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::ACCEPT;
    body->fd = sock_fd;
    body->addr.lens = sizeof(sockaddr_in);
    io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
                         &(body->addr.lens), 0);
    io_uring_sqe_set_data(sqe, body);
  }

  void read_asyn(int client_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::READ;
    body->fd = client_fd;
    io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
    io_uring_sqe_set_data(sqe, body);
  }

  void write_asyn(int client_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::WRITE;
    body->fd = client_fd;
    io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
    io_uring_sqe_set_data(sqe, body);
  }
};

           

整體代碼

#include <arpa/inet.h>
#include <bits/stdc++.h>
#include <liburing.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

const int BUFSIZE = 1024;
struct request {
  enum STATE { ACCEPT, READ, WRITE };
  int fd;
  STATE state;
  union {
    struct {
      sockaddr_in ipv4_addr;
      socklen_t lens;
    } addr;
    char buf[BUFSIZE];
  };
};
class IOuring {
  io_uring ring;

 public:
  IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }

  ~IOuring() { io_uring_queue_exit(&ring); }

  void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }

  int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }

  int submit() { return io_uring_submit(&ring); }

  void accpet_asyn(int sock_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::ACCEPT;
    body->fd = sock_fd;
    body->addr.lens = sizeof(sockaddr_in);
    io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
                         &(body->addr.lens), 0);
    io_uring_sqe_set_data(sqe, body);
  }

  void read_asyn(int client_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::READ;
    body->fd = client_fd;
    io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
    io_uring_sqe_set_data(sqe, body);
  }

  void write_asyn(int client_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::WRITE;
    body->fd = client_fd;
    io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
    io_uring_sqe_set_data(sqe, body);
  }
};

int main() {
  /*init socket*/
  int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in sock_addr;
  sock_addr.sin_port = htons(8000);
  sock_addr.sin_family = AF_INET;
  sock_addr.sin_addr.s_addr = INADDR_ANY;
  int ret = bind(sock_fd, (sockaddr*)&sock_addr, sizeof(sock_addr));
  perror("");
  listen(sock_fd, 10);

  std::cout << "listen begin ..." << std::endl;

  /*io_uring*/
  IOuring ring(1024);

  ring.accpet_asyn(sock_fd, new request);
  ring.submit();

  while (true) {
    io_uring_cqe* cqe;
    ring.wait(&cqe);
    request* res = (request*)cqe->user_data;
    switch (res->state) {
      case request::ACCEPT:
        if (cqe->res > 0) {
          int client_fd = cqe->res;
          ring.accpet_asyn(sock_fd, res);
          ring.read_asyn(client_fd, new request);
          ring.submit();
        }
        std::cout << cqe->res << std::endl;
        break;
      case request::READ:
        if (cqe->res > 0) std::cout << res->buf << std::endl;
        ring.write_asyn(res->fd, res);
        ring.submit();
        break;
      case request::WRITE:
        if (cqe->res > 0) {
          close(res->fd);
          delete res;
        }
        break;
      default:
        std::cout << "error " << std::endl;
        break;
    }
    ring.seen(cqe);
  }

  return 0;
}
           

在這裡,我們的程式隻是簡單的單線程,對于任務,我們可以将其放入工作線程中進行操作

這樣我們的主線程主負責事件分發,工作線程負責處理邏輯

[amjieker]

繼續閱讀