io_uring是Linux核心在v5.1引入的一套異步IO接口,和aio不同的是,它可以提供更高的性能
io_uring 具體有三個系統調用
分别是
io_uring_setup
,
io_uring_enter
,
io_uring_register
,我們可以通過這三個系統調用來完成異步事件送出,收割,自己處理的流程
異步io的優點在于,不用我們自己去等待 io操作的完成,我們隻需要告訴核心,我們的任務,核心來幫我們完成。這樣就可以讓我們的程序去幹其他的事情,實作更高的吞吐量。
io_uring 利用 mmap 開辟出一塊空間,讓使用者态和核心态的程式都可以共享的一塊區域
io_uring 分為 送出隊列 和 完成隊列
使用者送出的任務放在送出隊列中,由核心去處理,處理好的東西會放在完成隊列中。
核心如何處理,不是使用者關系的問題,核心可以回調、輪詢的方式都可以進行處理。
使用者隻需要設定好就可以使用
由于這三個系統調用要用好并不容易
開發作者也提供了一個liburing來給我們使用
注: 核心版本最好高一點,比如 Linux 5.4 不支援 read,但支援 readv (親身教訓)
除了 io_uring的結構要了解外,我們還需了解兩個用到的東西
這兩個分别代表了 完成隊列和送出隊列的一項元素
其中的user_data可以是一個可以由我們進行diy的指針
struct io_uring_cqe {
__u64 user_data; /* sqe->data submission passed back */
__s32 res; /* result code for this event */
__u32 flags;
/*
* If the ring is initialized with IORING_SETUP_CQE32, then this field
* contains 16-bytes of padding, doubling the size of the CQE.
*/
__u64 big_cqe[];
};
struct io_uring_sqe {
__u8 opcode; /* type of operation for this sqe */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* file descriptor to do IO on */
union {
__u64 off; /* offset into file */
__u64 addr2;
struct {
__u32 cmd_op;
__u32 __pad1;
};
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
__u32 rename_flags;
__u32 unlink_flags;
__u32 hardlink_flags;
__u32 xattr_flags;
__u32 msg_ring_flags;
};
__u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
union {
__s32 splice_fd_in;
__u32 file_index;
struct {
__u16 addr_len;
__u16 __pad3[1];
};
};
union {
struct {
__u64 addr3;
__u64 __pad2[1];
};
/*
* If the ring is initialized with IORING_SETUP_SQE128, then
* this field is used for 80 bytes of arbitrary command data
*/
__u8 cmd[0];
};
};
我們來看兩個使用 liburing的簡單例子
例子 1
第一個例子诠釋了 io_uring 最直接的一個流程
/**
* 讀取檔案
**/
#include <bits/stdc++.h>
#include <liburing.h>
#include <unistd.h>
char buf[1024] = {0};
int main() {
int fd = open("1.txt", O_RDONLY, 0);
io_uring ring;
io_uring_queue_init(32, &ring, 0); // 初始化
auto sqe = io_uring_get_sqe(&ring); // 從環中得到一塊空位
io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0); // 為這塊空位準備好操作
io_uring_submit(&ring); // 送出任務
io_uring_cqe* res; // 完成隊列指針
io_uring_wait_cqe(&ring, &res); // 阻塞等待一項完成的任務
assert(res);
std::cout << "read bytes: " << res->res << " \n";
std::cout << buf << std::endl;
io_uring_cqe_seen(&ring, res); // 将任務移出完成隊列
io_uring_queue_exit(&ring); // 退出
return 0;
}
io_uring 有三個東西
送出隊列
完成隊列
任務實體
送出隊列和完成隊列都可以看成持有一項指針
我們得到一個 任務實體,通過
io_uring_prep_read
準備任務 和
io_uring_submit
送出任務
送出任務之後就到了送出隊列中去
在送出隊列裡面,核心操作完以後。
任務就到了完成隊列中去。
然後我們可以阻塞等待
io_uring_wait_cqe
一項任務
當然,我們也可以使用非阻塞的方式,去幹其他事情
在拿到這一項任務之後,我們就可以對其進行處理,處理完成記得 從完成隊列中清除
(至于 完成隊列和送出隊列是如何高效的且不出錯的并發執行 暫且不談)
例子 2
echo_server
上述 的 io_uring寫着還是比較長,我們可以把它封裝一下。
比如要用 read的操作,要用accpet的操作,都給他封裝一下
同時,我們在寫echo_server時,我們是 幾個不同的操作
可能是 ACCEPT 操作,可能是 READ 操作, 可能是 WRITE操作
并且READ和WRITE操作都要有自己的緩沖區
是以,我們定義一下我們在任務之間傳遞的結構體
然後把它放到 user_data 中。
__u64 user_data; /* data to be passed back at completion time */
結構體如下:
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
我們對可能用到的操作進行一下封裝
class IOuring {
io_uring ring;
public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }
~IOuring() { io_uring_queue_exit(&ring); }
void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }
int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }
int submit() { return io_uring_submit(&ring); }
void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}
void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};
整體代碼
#include <arpa/inet.h>
#include <bits/stdc++.h>
#include <liburing.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
const int BUFSIZE = 1024;
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
class IOuring {
io_uring ring;
public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }
~IOuring() { io_uring_queue_exit(&ring); }
void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }
int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }
int submit() { return io_uring_submit(&ring); }
void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}
void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};
int main() {
/*init socket*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in sock_addr;
sock_addr.sin_port = htons(8000);
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(sock_fd, (sockaddr*)&sock_addr, sizeof(sock_addr));
perror("");
listen(sock_fd, 10);
std::cout << "listen begin ..." << std::endl;
/*io_uring*/
IOuring ring(1024);
ring.accpet_asyn(sock_fd, new request);
ring.submit();
while (true) {
io_uring_cqe* cqe;
ring.wait(&cqe);
request* res = (request*)cqe->user_data;
switch (res->state) {
case request::ACCEPT:
if (cqe->res > 0) {
int client_fd = cqe->res;
ring.accpet_asyn(sock_fd, res);
ring.read_asyn(client_fd, new request);
ring.submit();
}
std::cout << cqe->res << std::endl;
break;
case request::READ:
if (cqe->res > 0) std::cout << res->buf << std::endl;
ring.write_asyn(res->fd, res);
ring.submit();
break;
case request::WRITE:
if (cqe->res > 0) {
close(res->fd);
delete res;
}
break;
default:
std::cout << "error " << std::endl;
break;
}
ring.seen(cqe);
}
return 0;
}
在這裡,我們的程式隻是簡單的單線程,對于任務,我們可以将其放入工作線程中進行操作
這樣我們的主線程主負責事件分發,工作線程負責處理邏輯
[amjieker]