天天看点

03_异步通知与异步I/O

异步通知: 一旦设备就绪, 则主动通知应用程序, 这样应用程序根本就不需要查询设备状态, 这一点非常类似于硬件上“中断”的概念, 比较准确的称谓是“信号驱动的异步I/O”。

阻塞I/O意味着一直等待设备可访问后再访问, 非阻塞I/O中使用poll() 意味着查询设备是否可访问, 而异步通知则意味着设备通知用户自身可访问, 之后用户再进行I/O处理。

03_异步通知与异步I/O

1、linux中的信号

信号进行进程间通信(IPC) 是UNIX中的一种传统机制, Linux也支持这种机制。

Linux中可用的信号及其定义如表

03_异步通知与异步I/O

除了SIGSTOP和SIGKILL两个信号外, 进程能够忽略或捕获其他的全部信号。 一个信号被捕获的意思是当一个信号到达时有相应的代码处理它。 如果一个信号没有被这个进程所捕获, 内核将采用默认行为处理。

范例:

在进程执行时, 按下“Ctrl+C”将向其发出SIGINT信号, 正在运行kill的进程将向其发出SIGTERM信号, 代码中的进程可捕获这两个信号并输出信号值

void sigterm_handler(int signo)
{
	printf("Have caught sig N.O. %d\n", signo);
	exit(0);
}

int main(void)
{
	signal(SIGINT, sigterm_handler);
	signal(SIGTERM, sigterm_handler);
	while(1);

	return 0;
}
           

sigaction() 函数可用于改变进程接收到特定信号后的行为

int sigaction(int signum,const struct sigaction *act,struct sigaction *oldact));

该函数的第一个参数为信号的值, 可以是除SIGKILL及SIGSTOP外的任何一个特定有效的信号。

第二个参数是指向结构体sigaction的一个实例的指针, 在结构体sigaction的实例中, 指定了对特定信号的处理函数, 若为空, 则进程会以缺省方式对信号处理;

第三个参数oldact指向的对象用来保存原来对相应信号的处理函数, 可指定oldact为NULL。 如果把第二、 第三个参数都设为NULL, 那么该函数可用于检查信号的有效性。

信号实现异步通信范例:

通过signal(SIGIO, input_handler) 对标准输入文件描述符STDIN_FILENO启动信号机制。 用户输入后, 应用程序将接收到SIGIO信号, 其处理函数input_handler() 将被调用, 代码如下:

#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#define MAX_LEN 100
void input_handler(int num)
{
	char data[MAX_LEN];
	int len;

	/* 读取并输出STDIN_FILENO上的输入 */
	len = read(STDIN_FILENO, &data, MAX_LEN);
	data[len] = 0;
	printf("input available:%s\n", data);
}

main()
{
	int oflags;

	/* 启动信号驱动机制 */
	signal(SIGIO, input_handler);
	fcntl(STDIN_FILENO, F_SETOWN, getpid());
	oflags = fcntl(STDIN_FILENO, F_GETFL);
	fcntl(STDIN_FILENO, F_SETFL, oflags | FASYNC);

	/* 最后进入一个死循环, 仅为保持进程不终止, 如果程序中
	没有这个死循会立即执行完毕 */
	while (1);
}
           

F_SETOWN:一般是使用语句 fcntl(common_fd, F_SETOWN, getpid()),对于某些多进程共用的文件描述符,比如标准输入输出,我们要让操作系统知道这些信号要发往哪个进程。因为每一个进程都有标准输入输出,所以我们需要让操作系统知道当前的标准输入输出属于哪个进程,从而可以对进程发送信号

测试如下:

03_异步通知与异步I/O

2、信号释放

为了使设备支持异步通知机制, 驱动程序中涉及3项工作。

1) 支持F_SETOWN命令, 能在这个控制命令处理中设置filp->f_owner为对应进程ID。 不过此项工作已由内核完成, 设备驱动无须处理。

2) 支持F_SETFL命令的处理, 每当FASYNC标志改变时, 驱动程序中的fasync() 函数将得以执行。因此, 驱动中应该实现fasync() 函数。

3) 在设备资源可获得时, 调用kill_fasync() 函数激发相应的信号

03_异步通知与异步I/O

异步通知结构体

fasync_struct

处理FASYNC标志变更的函数

int fasync_helper(int fd, struct file *filp, int mode, struct fasync_struct **fa);

释放信号用的函数

void kill_fasync(struct fasync_struct **fa, int sig, int band);

3、支持异步通知的globalfifo驱动

将异步结构体指针添加到globalfifo_dev设备结构体内

struct globalfifo_dev {
	struct cdev cdev;
	unsigned int current_len;
	unsigned char mem[GLOBALFIFO_SIZE];
	struct mutex mutex;
	wait_queue_head_t r_wait;
	wait_queue_head_t w_wait;
	struct fasync_struct *async_queue;
};
           

globalfifo设备驱动fasync() 函数

static int globalfifo_fasync(int fd, struct file *filp, int mode)
{
	struct globalfifo_dev *dev = filp->private_data;
	return fasync_helper(fd, filp, mode, &dev->async_queue);
}
           

在globalfifo设备被正确写入之后, 它变得可读, 这个时候驱动应释放SIGIO信号, 以便应用程序捕获, 如下代码给出了支持异步通知的globalfifo设备驱动的写函数

static ssize_t globalfifo_write(struct file *filp, const char __user *buf, size_t count, loff_t *ppos)
{
	struct globalfifo_dev *dev = filp->private_data;
	int ret;
	DECLARE_WAITQUEUE(wait, current);

	mutex_lock(&dev->mutex);
	add_wait_queue(&dev->w_wait, &wait);

	while (dev->current_len == GLOBALFIFO_SIZE) {
		if (filp->f_flags & O_NONBLOCK) {
			ret = -EAGAIN;
			goto out;
		}
		__set_current_state(TASK_INTERRUPTIBLE);

		mutex_unlock(&dev->mutex);

		schedule();
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			goto out2;
		}

		mutex_lock(&dev->mutex);
	}
	if (count > GLOBALFIFO_SIZE - dev->current_len)
		count = GLOBALFIFO_SIZE - dev->current_len;

	if (copy_from_user(dev->mem + dev->current_len, buf, count)) {
		ret = -EFAULT;
		goto out;
	} else {
		dev->current_len += count;
		printk(KERN_INFO "written %d bytes(s),current_len:%d\n", count,
		dev->current_len);

		wake_up_interruptible(&dev->r_wait);

		if (dev->async_queue) {
			kill_fasync(&dev->async_queue, SIGIO, POLL_IN);
			printk(KERN_DEBUG "%s kill SIGIO\n", __func__);
		}

		ret = count;
	}

out:
	mutex_unlock(&dev->mutex);
out2:
	remove_wait_queue(&dev->w_wait, &wait);
	set_current_state(TASK_RUNNING);
	return ret;
}
           

用户空间验证globalfifo异步通知的程序, 这个程序在接收到由globalfifo发出的信号后将输出信号值, 代码如下:

static void signalio_handler(int signum)
{
	printf("receive a signal from globalfifo,signalnum:%d\n", signum);
}

void main(void)
{
	int fd, oflags;
	fd = open("/dev/globalfifo", O_RDWR, S_IRUSR | S_IWUSR);
	if (fd != -1) {
		signal(SIGIO, signalio_handler);
		fcntl(fd, F_SETOWN, getpid());
		oflags = fcntl(fd, F_GETFL);
		fcntl(fd, F_SETFL, oflags | FASYNC);
		while (1) {
			sleep(100);
		}
	} else {
		printf("device open failure\n");
	}
}
           

运行演示

03_异步通知与异步I/O

4、linux异步I/O

同步I/O

请求发出之后, 应用程序就会阻塞, 直到请求满足为止。

异步I/O

应用程序发起I/O动作后, 直接开始执行, 并不等待I/O结束, 它要么过一段时间来查询之前的I/O请求完成情况, 要么I/O请求完成了会自动被调用与I/O完成绑定的回调函数。

03_异步通知与异步I/O

NIO 是同步非阻塞方式的 I/O,而 AIO 是异步非阻塞方式的 I/O

用户空间异步读例程

#include <aio.h>
	...
	int fd, ret;
	struct aiocb my_aiocb;
	fd = open("file.txt", O_RDONLY);
	if (fd < 0)
		perror("open");

	/* 清零aiocb结构体 */
	bzero(&my_aiocb, sizeof(struct aiocb));

	/* 为aiocb请求分配数据缓冲区 */
	my_aiocb.aio_buf = malloc(BUFSIZE + 1);
	if (!my_aiocb.aio_buf)
		perror("malloc");

	/* 初始化aiocb的成员 */
	my_aiocb.aio_fildes = fd;
	my_aiocb.aio_nbytes = BUFSIZE;
	my_aiocb.aio_offset = 0;

	ret = aio_read(&my_aiocb);  // 限制性读操作
	if (ret < 0)
		perror("aio_read");

	while (aio_error(&my_aiocb) == EINPROGRESS) // 检查读操作是否被处理
		continue;

	if ((ret = aio_return(&my_iocb)) > 0) {  // 读成功
		/* 获得异步读的返回值 */
	} else {
		/* 读失败, 分析errorno */
	}
           

其中:

在aio_error() 调用确定请求已经完成(可能成功, 也可能发生了错误) 之后, 才会调用 aio_return()函数。 aio_return() 的返回值就等价于同步情况中read() 或write() 系统调用的返回值(所传输的字节数如果发生错误, 返回值为负数)

返回值:

EINPROGRESS: 说明请求尚未完成。

ECANCELED: 说明请求被应用程序取消了。

-1: 说明发生了错误, 具体错误原因由errno记录。

lio_listio()

lio_listio() 函数可用于同时发起多个传输。它使得用户可以在一个系统调用中启动大量的I/O操作。原型如下:

int lio_listio( int mode, struct aiocb *list[], int nent, struct sigevent *sig );

mode参数可以是LIO_WAIT或LIO_NOWAIT。 LIO_WAIT会阻塞这个调用, 直到所有的I/O都完成为止。 但是若是LIO_NOWAIT模型, 在I/O操作进行排队之后, 该函数就会返回。 list是一个aiocb引用的列表, 最大元素的个数是由nent定义的。 如果list的元素为NULL, lio_listio() 会将其忽略。

示例:

struct aiocb aiocb1, aiocb2;
struct aiocb *list[MAX_LIST];
...
/* 准备第一个aiocb */
aiocb1.aio_fildes = fd;
aiocb1.aio_buf = malloc( BUFSIZE+1 );
aiocb1.aio_nbytes = BUFSIZE;
aiocb1.aio_offset = next_offset;
aiocb1.aio_lio_opcode = LIO_READ; /* 异步读操作*/
... /* 准备多个aiocb */
bzero( (char *)list, sizeof(list) );

/* 将aiocb填入链表*/
list[0] = &aiocb1;
list[1] = &aiocb2;
...
ret = lio_listio( LIO_WAIT, list, MAX_LIST, NULL ); /* 发起大量I/O操作*/
           

一个简单的利用libaio向内核发起AIO请求的模版

#define _GNU_SOURCE /* O_DIRECT is not POSIX */
#include <stdio.h> /* for perror() */
#include <unistd.h> /* for syscall() */
#include <fcntl.h> /* O_RDWR */
#include <string.h> /* memset() */
#include <inttypes.h> /* uint64_t */
#include <stdlib.h>

#include <libaio.h>

#define BUF_SIZE 4096

int main(int argc, char **argv)
{
	io_context_t ctx = 0;
	struct iocb cb;
	struct iocb *cbs[1];
	unsigned char *buf;
	struct io_event events[1];
	int ret;
	int fd;

	if (argc < 2) {
		printf("the command format: aior [FILE]\n");
		exit(1);
	}

	fd = open(argv[1], O_RDWR | O_DIRECT);
	if (fd < 0) {
		perror("open error");
		goto err;
	}

	/* Allocate aligned memory */
	ret = posix_memalign((void **)&buf, 512, (BUF_SIZE + 1));
	if (ret < 0) {
		perror("posix_memalign failed");
		goto err1;
	}
	memset(buf, 0, BUF_SIZE + 1);

	ret = io_setup(128, &ctx);
	if (ret < 0) {
		printf("io_setup error:%s", strerror(-ret));
		goto err2;
	}

	/* setup I/O control block */
	io_prep_pread(&cb, fd, buf, BUF_SIZE, 0);

	cbs[0] = &cb;
	ret = io_submit(ctx, 1, cbs);
	if (ret != 1) {
	if (ret < 0) {
		printf("io_submit error:%s", strerror(-ret));
	} else {
		fprintf(stderr, "could not sumbit IOs");
	}
		goto err3;
	}

	/* get the reply */
	ret = io_getevents(ctx, 1, 1, events, NULL);
	if (ret != 1) {
		if (ret < 0) {
			printf("io_getevents error:%s", strerror(-ret));
		} else {
			fprintf(stderr, "could not get Events");
		}
		goto err3;
	}
	if (events[0].res2 == 0) {
		printf("%s\n", buf);
	} else {
		printf("AIO error:%s", strerror(-events[0].res));
		goto err3;
	}

	if ((ret = io_destroy(ctx)) < 0) {
		printf("io_destroy error:%s", strerror(-ret));
		goto err2;
	}

	free(buf);
	close(fd);
	return 0;

err3:
	if ((ret = io_destroy(ctx)) < 0)
		printf("io_destroy error:%s", strerror(-ret));
err2:
	free(buf);
err1:
	close(fd);
err:
	return -1;
}
           

继续阅读