天天看點

Hasen的linux裝置驅動開發學習之旅--異步I/O

/**
 * Author:hasen
 * 參考 :《linux裝置驅動開發詳解》
 * 簡介:android小菜鳥的linux
 * 	         裝置驅動開發學習之旅
 * 主題:異步I/O
 * Date:2014-11-11
 */
           

linux中最常用的輸入/輸出(I/O)模型是同步I/O。在這個模型中,請求發出後,應用就會阻塞,知道請求滿足

為止。但是在某些情況下,I/O請求可能需要與其他的程序進行交疊。可移植作業系統接口(POSIX)異步I/O(AIO)

應用程式接口(API)就提供了這種功能。

AIO基本思想是允許程序發起很多的I/O操作,而不用阻塞或者等待任何操作完成。稍後或在接受到I/O操

作完成的通知時,程序再檢索I/O操作的結果。

在異步非阻塞I/O中,每個傳輸操作都有唯一的上下文,通過aiocb(AIO I/O Control Block)結構體進行區

分,包含了有關傳輸的所有資訊,以及為資料準備的使用者緩沖區。在産生I/O通知(稱為完成)時,該結構唯一标

識所完成的I/O操作。

AIO系列API被GNU C庫函數所包含,它被POSIX.1b所要求,主要有以下的函數:

(1)aio_read
    int aio_read(struct aiocb *aiocbp) ;/*對一個有效的檔案描述符進行異步操作*/
	檔案描述符可以表示檔案,套接字,管道,aio_read()在請求進行排隊後立即傳回,執行成功傳回0,
	出現錯誤傳回-1,并設定errno的值。
(2)aio_write   
    int aio_write(struct aiocb *aiocbp) ;/*用來請求一個異步寫操作*/
    aio_write()函數會立即傳回,說明請求已經進行排隊,成功傳回0,失敗傳回-1,并相應地設定errno。
(3)aio_error
	aio_error(struct aiocb *aiocbp) ;/*用來确定請求的狀态*/
	這個函數可以傳回以下内容。
        EINPROGRESS:說明請求尚未完成。
        ECANCELLED:說明請求被應用程式取消了。
        -1:說明發生了錯誤,具體錯誤原因由errno記錄。
(4)aio_return
	/*異步I/O和标準I/O方式之間的另一個差別是不能立即通路這個函數的傳回狀态,因為異步I/O并沒有阻塞
	在read()調用上。在标準的read()調用中,傳回狀态是在該函數傳回時提供的。但是在異步I/O中,使用
	aio_return()函數*/
	ssize_t aio_return(struct aiocb *aiocbp) ;
	隻有在aio_error()調用确定請求已經完成(可能成功,或者發生錯誤)之後,才會調用這個函數,
	aio_return()函數的傳回值就等價于同步情況中read()和write()的傳回值,成功傳回位元組數,
	錯誤傳回負值。
           

        下面的代碼示例給出了使用者空間進行異步讀操作,它首先打開文檔,然後準備aiocb結構體,之後調用aio_read(&my_aiocb)進行提出異步讀請求,當aio_error(&my_aiocb)==EINPROGRESS,即操作還在進行中,一直等待,結束後通過aio_return(&my_aiocb)獲得傳回值。

#include <aio.h>
...

int fd ,ret ;
struct aiocb my_aiocb ;
fd = open("file.txt",O_RDONLY) ;
if(fd<0)
	perror("open") ;

/*清零aiocb結構體*/
bzero((char *)&my_aiocb,sizeof(struct aiocb)) ;
/*為aiocb結構體請求配置設定資料緩沖區*/
my_aiocb.aio_buf = malloc(BUFSIZE + 1) ;
if(!my_aiocb.aio_buf)
	perror("malloc") ;

/*初始化aiocb的成員*/
my_aiocb.aio_fildes = fd ;
my_aiocb.aio_nbytes = BUFSIZE ;
my_aiocb.aio_offset = 0 ;

ret = aio_read(&my_aiocb) ;
if(ret < 0)
	perror("aio_read") ;

while(aio_error(&my_aiocb) == EINPROGRESS)
	continue ;

if((ret = aio_return(&my_aiocb)) > 0){
	/*獲得異步讀的傳回值*/
}else{
	/*讀失敗,分析errno*/
}
           
(5)aio_suspend
	使用者可以使用aio_suspend()函數來挂起(或阻塞)調用程序,直到異步請求完成為止,此時會産生一個信号,
	或者發生其他逾時操作。調用者提供了一個aiocb引用清單,其中任何一個完成都會導緻aio_suspend()傳回。
	int aio_suspend(const struct aiocb cblist[] ,int n,const struct timespec *timeout) ;
	注意,aio_suspend 的第二個參數是 cblist 中元素的個數,而不是 aiocb 引用的個數。cblist中任何
	NULL元素都會被 aio_suspend 忽略。如果為 aio_suspend 提供了逾時,而逾時情況的确發生了,那麼它
	就會傳回 -1,errno 中會包含 EAGAIN。
           

示例:使用者空間異步I/O aio_suspend()函數使用

struct aioct *cblist[MAX_LIST]
	/*清零aioct結構體連結清單*/
	bzero((char *)cblist,sizeof(cblist)) ;
	/*将一個或更多的aiocb放入aioct結構體連結清單*/
	cblist[0] = &my_aiocb ;
	ret = aio_read(&my_aiocb) ;
	ret = aio_suspend(cblist,MAX_LIST,NULL) ;
           
(6)aio_cancel
		aio_cancel()函數允許使用者取消對某個檔案描述符執行的一個或者所有的I/O請求,原型如下:
		
		int aio_cancel(int fd ,struct aiocb *aiocbp) ;

		要取消一個請求,我們需要提供檔案描述符和 aiocb 引用。如果這個請求被成功取消了,那麼這個函數
	就會傳回AIO_CANCELED。如果請求完成了,這個函數就會傳回 AIO_NOTCANCELED。
		要取消對某個給定檔案描述符的所有請求,我們需要提供這個檔案的描述符,以及一個對 aiocbp的 NULL
	引用。如果所有的請求都取消了,這個函數就會傳回 AIO_CANCELED;如果至少有一個請求沒有被取消,那麼這
	個函數就會傳回 AIO_NOT_CANCELED;如果沒有一個請求可以被取消,那麼這個函數就會傳回 AIO_ALLDONE。
	我們然後可以使用 aio_error 來驗證每個 AIO 請求。如果這個請求已經被取消了,那麼 aio_error 就會返
	回-1,并且errno 會被設定為 ECANCELED。
(7)lio_listio
		最後,AIO 提供了一種方法使用 lio_listio API 函數同時發起多個傳輸。這個函數非常重要,因為這意
	味着我們可以在一個系統調用(一次核心上下文切換)中啟動大量的 I/O 操作。從性能的角度來看,這非常重
	要,是以值得我們花點時間探索一下。lio_listio API 函數的原型如下:
	
		int lio_listio( int mode, struct aiocb *list[], int nent,struct sigevent *sig );
		
		mode 參數可以是 LIO_WAIT 或 LIO_NOWAIT。LIO_WAIT 會阻塞這個調用,直到所有的 I/O 都完成為
	止。在操作進行排隊之後,LIO_NOWAIT 就會傳回。list 是一個 aiocb 引用的清單,最大元素的個數是由
	nent定義的。注意 list 的元素可以為 NULL,lio_listio 會将其忽略。sigevent 引用定義了在所有 I/O
	操作都完成時産生信号的方法。
           

示例:使用者空間異步I/O lio_listio()的使用

struct aiocb aiocb1,aiocb2 ;
struct aiocb *list[MAX_LIST] ;
...
/*準備第一個aiocb*/
aiocb1.aio_fildes = fd ;
aiocb1.aio_buf = malloc(BUFSIZE + 1) ;
aiocb1.aio_nbytes = BUFSIZE ;
aiocb1.aio_offset = next_offset ;
/*LIO_READ讀操作,LIO_WRITE寫操作,LIO_NOP空操作*/
aiocb1.aio_lio_opcode = LIO_READ;/*異步讀操作*/
... /*準備多個aiocb*/
bzero((char *)list,sizeof(list)) ;

/*将aiocb填傳入連結表*/
list[0] = aiocb1 ;
list[1] = aiocb2 ;
...
ret = lio_listio(LIO_WAIT,list,MAX_LIST,NULL) ;/*發起大量I/O操作*/
           

使用回調函數作為AIO的通知

除了信号之外,應用程式還可以提供一個回調(Callback)函數給核心,以便AIO的請求完成後核心調用這個函數。

示例:使用回調函數作為AIO異步I/O通知機制

/*設定異步I/O請求*/
void setup_io(...)
{
	int fd ;
	struct aiocb my_aiocb ;
	...
	/*設定AIO請求*/
	bzero((char *)&my_aiocb,sizeof(struct aiocb)) ;
	aiocb.aio_fildes = fd ;
	aiocb.aio_buf = malloc(BUFSIZE + 1) ;
	aiocb.aio_nbytes = BUFSIZE ;
	aiocb.aio_offset = next_offset ;
	
	/*連接配接AIO請求和線程回調函數*/
	my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD ;
	my_aiocb.aio_sigevent.notify_function = aio_completion_handler ;
	/*設定回調函數*/
	my_aiocb.aio_sigevent.notify_attributes = NULL ;
	my_aiocb.aio_sigevent.sigev_value_sival_ptr = &my_aiocb ;
	...
	ret = aio_read(&my_aiocb) ;
}

/*異步I/O完成回調函數*/
void aio_completion_handler(sigval_t sigval)
{
	struct aiocb *req ;
	req = (struct aiocb *)sigval.sival_ptr ;
	
	/*AIO請求完成*/
	if(aio_error(req) == 0)
		/*請求完成,獲得傳回值*/
		ret = aio_return (req) ;
}
           
上述程式在建立aiocb請求之後,使用了SIGEV_THREAD請求了一個線程回調函數作為通知方法。在回調

函數中,通過(struct aiocb*)sigval.sival_ptr可以獲得對應的aiocb指針,使用AIO函數可驗證請求是否已經完成。

proc檔案系統包含了兩個虛拟檔案,他們可以用來對異步I/O的性能進行優化。
(1)/proc/sys/fs/aio-nr 檔案提供了系統範圍異步I/O請求現在的數目。
(2)/proc/sys/fs/aio-max-nr 檔案是允許的并發請求的最大個數,最大個數通常是64KB,這對于大部分應用
程式來說都已經足夠了。

AIO與裝置驅動

在核心中,每個I/O請求都對應于一個kiocb結構體,其ki_filp成員指向對應的file指針,通過is_sync_kiocb()

可以判斷某kiocb是否為同步I/O請求,如果傳回非真,表示為異步I/O請求。

塊裝置和網絡裝置本身是異步的,隻有字元裝置必須明确表明應支援AIO,AIO對于大多數字元裝置而言都

不是必須的,隻有極少數裝置需要,比如,對于錄音帶機,由于I/O操作很慢,這時候使用異步I/O可以改善性能。

字元裝置驅動程式中,file_operations包含3個與AIO相關的成員函數:
ssize_t (*aio_read)(struct kiocb *iocb,char *buffer,size_t count,loff_t offset);
	ssize_t (*aio_write)(struct kiocb *iocb,char *buffer,size_t count,loff_t offset);
	ssize_t (*aio_fsync)(struct kiocb *iocb,int datasync) ;
           
aio_read()和aio_write()與filp_operations中的read()和write()中的offset參數不一樣,它直接傳值,

而後者是傳遞的指針,這是因為AIO從來不需要改變檔案的位置。

aio_read()和aio_write()函數本身不一定完成了讀和想寫操作,它隻是發起,初始化讀和寫操作。

示例:裝置驅動中的I/O函數

struct async_work()
{
	struct kiocb *iocb ;/*kiocb結構體指針*/
	int result  ;/*執行結果*/
	struct work_struct work ;/*工作結構體*/
};
...
/*異步讀*/
static ssize_t xxx_aio_read(struct kiocb *iocb,char *buf,size_t count,loff_t pos)
{
	return xxx_defer_op(0,iocb,count,pos) ;
}

/*異步寫*/
static ssize_t xxx_aio_write(struct kiocb *iocb,const char *buf,size_t count,loff_t pos)
{
	return xxx_defer_op(1,iocb,count,pos) ;
}

/*初始化異步I/O*/
static int xxx_defer_op(int write,struct kiocb *iocb,char *buf,size_t count,loff_t pos)
{
	struct async_work *async_wk ;
	int result ;
	/*當我們能通路buffer時進行copy*/
	if(write)
		result = xxx_write(iocb->ki_filp,buf,count,&pos) ;
	else
		result = xxx_read(iocb->filp,buf,count,&pos) ;
	/*如果是同步IOCB,立即傳回狀态*/
	if(is_sync_kiocb(iocb))
		return result ;
	
	/*否則,推後幾秒執行*/
	async_wk = kmalloc(sizeof(*async_wk),GFP_KERNEL) ;
	if(async_wk == NULL)
		return result ;
	/*排程延遲的工作*/
	async_wk->iocb = iocb ;
	async_wk->result = result ;
	INIT_WORK(&async_wk->work,xxx_do_deferred_op,async_wk) ;
	schedule_delay_work(&async_wk->work ,Hz/100) ;
	return -EIOCBQUEUED ;/*控制權傳回使用者空間*/
}

/*延遲後執行*/
static void xxx_do_deferred_op(void *p)
{
	struct async_work *async_wk = (struct async_work *)p ;
	... /*執行I/O操作*/
	aio_complete(async_wk->iocb,async_wk->result,0) ;
	kfree(async_wk) ;
}
           
上述代碼最核心的是使用work_struct機制通過schedule_delay_work()函數将I/O操作延遲執行,而在具體的

I/O操作執行完成後,調用aio_complete()通知核心驅動程式已經完成了I/O操作。

通常而言,具體的字元裝置驅動一般不許呀AIO支援,而核心中僅有fs/direct-io.c,driver/usb/gadget/inode.c,

fs/nfs/direct.c等少量地方使用了AIO。

繼續閱讀