天天看點

從linux核心看io_uring的實作

作者:linux技術棧

本文介紹一下io_uring在核心的實作,因為io_uring實作代碼量大,邏輯複雜,是以隻能慢慢分析。這一篇介紹io_uring初始化接口io_uring_setup的實作。

io_uring_setup的聲明非常簡單,但是實作的細節卻非常複雜,下面我們開始分析。

static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
	struct io_uring_params p;
	int i;

	if (copy_from_user(&p, params, sizeof(p)))
		return -EFAULT;
	// 支援的flag
	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
			IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ))
		return -EINVAL;

	return  io_uring_create(entries, &p, params);
}           

io_uring_setup是對io_uring_create的封裝。第一個參數entries指定請求隊列的長度,第二個參數params是用于調用方和核心通信的結構體。我們看一下定義。

struct io_uring_params {
	// 定義請求隊列長度(2的sq_entries次方),調用方定義
	__u32 sq_entries;
	// 完成隊列長度,預設是2 * 請求隊列長度
	__u32 cq_entries;
	// 控制核心行為的标記
	__u32 flags;
	// poll模式下開啟的核心線程綁定的cpu
	__u32 sq_thread_cpu;
	// poll模式下開啟的核心線程空閑時間,之後會挂起。
	__u32 sq_thread_idle;
	// 核心目前支援的能力,核心設定
	__u32 features;
	__u32 wq_fd;
	__u32 resv[3];
	// 記錄核心資料的結構體,調用方後續調用mmap需要用到。
	struct io_sqring_offsets sq_off;
	struct io_cqring_offsets cq_off;
};           

我們接着看io_uring_create。

static int io_uring_create(unsigned entries, struct io_uring_params *p,
struct io_uring_params __user *params)
{
struct user_struct *user = NULL;
struct io_ring_ctx *ctx;
bool limit_mem;
int ret;
p->sq_entries = roundup_pow_of_two(entries);
// 自定義完成隊列長度
if (p->flags & IORING_SETUP_CQSIZE) {
p->cq_entries = roundup_pow_of_two(p->cq_entries);
// 完成隊列不能小于請求隊列
if (p->cq_entries < p->sq_entries)
return -EINVAL;
// 超過門檻值則需要設定IORING_SETUP_CLAMP标記
if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
if (!(p->flags & IORING_SETUP_CLAMP))
return -EINVAL;
p->cq_entries = IORING_MAX_CQ_ENTRIES;
}
} else {
// 預設是兩倍的請求隊列長度
p->cq_entries = 2 * p->sq_entries;
}
// 使用者資訊
user = get_uid(current_user());
// 配置設定一個ctx記錄上下文,因為調用方隻能拿到fd,後續操作fd的時候會拿到關聯的上下文
ctx = io_ring_ctx_alloc(p);
ctx->user = user;
// 和poll模式相關的資料結構
ctx->sqo_task = get_task_struct(current);
// 配置設定一個io_rings
ret = io_allocate_scq_urings(ctx, p);
// 處理poll模式的邏輯
ret = io_sq_offload_start(ctx, p);
// 後面還有很多,一會分析
}           

io_uring_create代碼比較多,我們分步分析。首先配置設定了一個io_ring_ctx結構體,這是核心的資料結構,用于記錄io_uring執行個體的上下文,不過我們暫時不需要了解它具體的定義,因為實在太多,隻關注本文相關的字段。

從linux核心看io_uring的實作

1 配置設定一個io_rings結構體

接着調用io_allocate_scq_urings配置設定一個io_rings結構體,這是非常核心的邏輯,我們看一下io_rings的定義。

struct io_rings {
	struct io_uring		sq, cq;
	u32			sq_ring_mask, cq_ring_mask;
	u32			sq_ring_entries, cq_ring_entries;
	u32			sq_dropped;
	u32			sq_flags;
	u32         cq_flags;
	u32			cq_overflow;
	struct io_uring_cqe	cqes[];
};           

io_rings主要用于記錄請求和完成隊列的資訊。我們繼續看io_allocate_scq_urings。

static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
				  struct io_uring_params *p)
{
	struct io_rings *rings;
	size_t size, sq_array_offset;
	// 記錄請求和完成隊列大小到ctx
	ctx->sq_entries = p->sq_entries;
	ctx->cq_entries = p->cq_entries;
	/* 
		計算結構體和額外數組的大小,sq_array_offset儲存結構體大小,
		size儲存結構體+額外數組+另一個額外數組的大小
	*/
	size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
	// 配置設定記憶體
	rings = io_mem_alloc(size);
	// ...
}           

io_allocate_scq_urings細節比較多,我們分開分析,我們看一下rings_size的邏輯。

static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
				size_t *sq_offset)
{
	struct io_rings *rings;
	size_t off, sq_array_size;
	// 計算結構體和格外數組的大小,見io_rings定義
	off = struct_size(rings, cqes, cq_entries);
	// sq_offset記錄結構體大小
	if (sq_offset)
		*sq_offset = off;
	// 計算多個u32元素的數組的大小
	sq_array_size = array_size(sizeof(u32), sq_entries);
	// 計算結構體大小 + sq_array_size的大小儲存到off
	if (check_add_overflow(off, sq_array_size, &off))
		return SIZE_MAX;
	return off;
}           

struct_size是計算結構體和額外字段大小的宏,我們剛才看到io_rings結構體的定義中,最後一個字段是struct io_uring_cqe cqes[],看起來是個空數組,其實他的記憶體是緊跟着結構體後面配置設定的,結構如下。

從linux核心看io_uring的實作

下面我們看struct_size是如何計算的。

#define struct_size(p, member, count)					\
	__ab_c_size(count,						\
		    sizeof(*(p)->member) + __must_be_array((p)->member),\
		    sizeof(*(p)))

static inline __must_check size_t __ab_c_size(size_t a, size_t b, size_t c)
{
	size_t bytes;
	// 計算a * b儲存到bytes
	if (check_mul_overflow(a, b, &bytes))
		return SIZE_MAX;
	// 計算bytes + c儲存搭配bytes
	if (check_add_overflow(bytes, c, &bytes))
		return SIZE_MAX;

	return bytes;
}           

我們看到計算方式就是數組元素大小*元素個數+結構體本身的大小。計算完結構體大小後又通過array_size計算了另一個數組的大小并加起來,是以io_rings的結構體如下所示。

從linux核心看io_uring的實作

配置設定了io_rings之後我們繼續看接下來的邏輯。

static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{
// ...
// 記錄到ctx中
ctx->rings = rings;
// sq_array記錄rings結構體中,u32數組的首位址
ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
// 用于回環處理
rings->sq_ring_mask = p->sq_entries - 1;
rings->cq_ring_mask = p->cq_entries - 1;
// 隊列長度
rings->sq_ring_entries = p->sq_entries;
rings->cq_ring_entries = p->cq_entries;
ctx->sq_mask = rings->sq_ring_mask;
ctx->cq_mask = rings->cq_ring_mask;
// 請求隊列的數組大小
size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
// 配置設定記憶體并記錄到sq_sqes
ctx->sq_sqes = io_mem_alloc(size);
return 0;
}           

進行了一系列設定後,架構如下。

從linux核心看io_uring的實作

建立完io_rings結構體後,我們繼續回到io_uring_create中。

相關視訊推薦

io_uring 新起之秀的linux io模式,是如何媲美epoll的

linux下的epoll實戰揭秘——支撐億級IO的底層基石

學習位址:C/C++Linux伺服器開發/背景架構師【零聲教育】-學習視訊教程-騰訊課堂

需要C/C++ Linux伺服器架構師學習資料加qun812855908擷取(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享

從linux核心看io_uring的實作

2 設定io_uring_params

核心申請完系列結構體後,需要通過io_uring_params結構體傳回給調用方。

static int io_uring_create(unsigned entries, struct io_uring_params *p,
struct io_uring_params __user *params) {
ret = io_allocate_scq_urings(ctx, p);
// 初始化poll模式相關邏輯,如果開啟了的話
ret = io_sq_offload_start(ctx, p);
memset(&p->sq_off, 0, sizeof(p->sq_off));
// 記錄字段在結構體的偏移
p->sq_off.head = offsetof(struct io_rings, sq.head);
p->sq_off.tail = offsetof(struct io_rings, sq.tail);
p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
p->sq_off.flags = offsetof(struct io_rings, sq_flags);
p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;

memset(&p->cq_off, 0, sizeof(p->cq_off));
p->cq_off.head = offsetof(struct io_rings, cq.head);
p->cq_off.tail = offsetof(struct io_rings, cq.tail);
p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
p->cq_off.cqes = offsetof(struct io_rings, cqes);
p->cq_off.flags = offsetof(struct io_rings, cq_flags);
// 核心支援的屬性
p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
IORING_FEAT_POLL_32BITS;
copy_to_user(params, p, sizeof(*p))
// 擷取fd
ret = io_uring_get_fd(ctx);
return ret;
}           

io_uring_create繼續進行了一系列指派,指派完後架構如下。

從linux核心看io_uring的實作

3 擷取檔案描述符

核心通過io_uring_get_fd擷取檔案描述符傳回給調用方。

static int io_uring_get_fd(struct io_ring_ctx *ctx)
{
struct file *file;
// 擷取一個可用fd
int ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
// 配置設定一個file結構體,設定函數集為io_uring_fops,并關聯上下文ctx
file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
O_RDWR | O_CLOEXEC);
// 關聯fd和file結構體
fd_install(ret, file);
return ret;
}           

io_uring_get_fd申請了一個fd和file,這是遵循vfs的設計,最重要的是把io_uring的函數集挂在到file上,後續通過fd操作的io_uring執行個體的時候,經過vfs後就會執行對應的函數,另外還需要把ctx和file關聯起來,因為後續通過fd操作io_uring時,需要拿到fd對應的io_uring上下文。至此。

io_uring_setup就分析完了,但是還不能使用。io_uring在設計中,為了減少系統調用和使用者、核心資料通信的成本,實作了使用者、核心共享資料結構的方式,這樣使用者和核心就可以操作同一份資料結構達到通信目的,而不用通過系統調用,更不需要設計來回複制。為了達到這個目的,使用者拿到io_uring執行個體後,還需要調用mmap擷取對應的記憶體映射。我們通過liburing庫的邏輯來分析。

4 從liburing庫看io_uring的使用

int io_uring_queue_init_params(unsigned entries, struct io_uring *ring,
struct io_uring_params *p)
{
int fd, ret;
// 調用io_uring_setup,拿到fd
fd = __sys_io_uring_setup(entries, p);
if (fd < 0)
return -errno;
// 記憶體映射
ret = io_uring_queue_mmap(fd, p, ring);
// 儲存系統支援的屬性
ring->features = p->features;
return 0;
}           

我們重點看一下io_uring_queue_mmap。

int io_uring_queue_mmap(int fd, struct io_uring_params *p, struct io_uring *ring)
{
int ret;
memset(ring, 0, sizeof(*ring));
ret = io_uring_mmap(fd, p, &ring->sq, &ring->cq);
// 記錄flags和fd
if (!ret) {
ring->flags = p->flags;
ring->ring_fd = fd;
}
return ret;
}           

繼續看io_uring_mmap。

static int io_uring_mmap(int fd, struct io_uring_params *p,
struct io_uring_sq *sq, struct io_uring_cq *cq)
{
size_t size;
int ret;
// 請求隊列需要映射的記憶體大小,即整個結構體struct io_rings結構體的大小
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
// 請求隊列和完成隊列映射的記憶體大小一樣,等于請求隊列的
cq->ring_sz = sq->ring_sz;
// 映射并拿到虛拟位址,大小是sq->ring_sz
sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
cq->ring_ptr = sq->ring_ptr;
// 通過首位址和偏移拿到對應字段的位址
sq->khead = sq->ring_ptr + p->sq_off.head;
sq->ktail = sq->ring_ptr + p->sq_off.tail;
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
sq->kflags = sq->ring_ptr + p->sq_off.flags;
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
sq->array = sq->ring_ptr + p->sq_off.array;
// 映射儲存請求隊列節點的記憶體
size = p->sq_entries * sizeof(struct io_uring_sqe);
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd,
IORING_OFF_SQES);
// 同上
cq->khead = cq->ring_ptr + p->cq_off.head;
cq->ktail = cq->ring_ptr + p->cq_off.tail;
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
if (p->cq_off.flags)
cq->kflags = cq->ring_ptr + p->cq_off.flags;
return 0;
}           

io_uring_mmap除了儲存一些常用的字段資訊外,最重要的是做了記憶體映射。我們看看mmap的最後一個參數分别是IORING_OFF_SQ_RING和IORING_OFF_SQES,接下來我們看看io_uring的mmap鈎子的實作。

static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
{
size_t sz = vma->vm_end - vma->vm_start;
unsigned long pfn;
void *ptr;
ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);

pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
}
static void *io_uring_validate_mmap_request(struct file *file,
loff_t pgoff, size_t sz)
{
struct io_ring_ctx *ctx = file->private_data;
loff_t offset = pgoff << PAGE_SHIFT;
struct page *page;
void *ptr;
switch (offset) {
case IORING_OFF_SQ_RING:
case IORING_OFF_CQ_RING:
ptr = ctx->rings;
break;
case IORING_OFF_SQES:
ptr = ctx->sq_sqes;
break;
default:
return ERR_PTR(-EINVAL);
}
page = virt_to_head_page(ptr);
if (sz > page_size(page))
return ERR_PTR(-EINVAL);
return ptr;
}           

這裡設計的内容涉及到了複雜的記憶體管理,從代碼中我們大概知道,傳回的位址分别是ctx->rings和ctx->sq_sqes。即我們操作mmap傳回的虛拟位址時,映射到核心的資料結構是ctx的字段。這樣就完成了資料共享。最後形成的架構圖如下。

從linux核心看io_uring的實作

至此,分析就告一段落,io_uring的實作實在是複雜,需要反複閱讀和思考,才能慢慢了解和了解它的原理。

後記:io_uring作為新一代IO架構,未來應該會在各大軟體中使用,尤其是對性能有極高要求的伺服器,是以是非常值得關注和學習的。

繼續閱讀