天天看點

異步IO、APC、IO完成端口、線程池與高性能伺服器 (三)

異步IO、APC、IO完成端口、線程池與高性能伺服器 (三)  作者 Fang([email protected])

關鍵字 異步IO APC IO完成端口 線程池 高性能 可伸縮 伺服器

原作者姓名 Fang([email protected])

正文

異步IO、APC、IO完成端口、線程池與高性能伺服器之三 IO完成端口

IO完成端口

I/O完成端口是一種機制,通過這個機制,應用程式在啟動時會首先建立一個線程池,然後該應用程式使用線程池處理異步I/O請求。這些線程被建立的唯一目的就是用于處理I/O請求。對于處理大量并發異步I/O請求的應用程式來說,相比于在I/O請求發生時建立線程來說,使用完成端口(s)它就可以做的更快且更有效率。

CreateIoCompletionPort函數會使一個I/O完成端口與一個或多個檔案句柄發生關聯。當與一個完成端口相關的檔案句柄上啟動的異步I/O操作完成時,一個I/O完成包就會進入到該完成端口的隊列中。對于多個檔案句柄來說,這種機制可以用來把多檔案句柄的同步點放在單個對象中。(言下之意,如果我們需要對每個句柄檔案進行同步,一般而言我們需要多個對象(如:Event來同步),而我們使用IO Complete Port 來實作異步操作,我們可以同多個檔案相關聯,每當一個檔案中的異步操作完成,就會把一個complete package放到隊列中,這樣我們就可以使用這個來完成所有檔案句柄的同步)

調用GetQueuedCompletionStatus函數,某個線程就會等待一個完成包進入到完成端口的隊列中,而不是直接等待異步I/O請求完成。線程(們)就會阻塞于它們的運作在完成端口(按照後進先出隊列順序的被釋放)。這就意味着當一個完成包進入到完成端口的隊列中時,系統會釋放最近被阻塞在該完成端口的線程。

調用GetQueuedCompletionStatus,線程就會将會與某個指定的完成端口建立聯系,一直延續其該線程的存在周期,或被指定了不同的完成端口,或者釋放了與完成端口的聯系。一個線程隻能與最多不超過一個的完成端口發生聯系。

完成端口最重要的特性就是并發量。完成端口的并發量可以在建立該完成端口時指定。該并發量限制了與該完成端口相關聯的可運作線程的數目。當與該完成端口相關聯的可運作線程的總數目達到了該并發量,系統就會阻塞任何與該完成端口相關聯的後續線程的執行,直到與該完成端口相關聯的可運作線程數目下降到小于該并發量為止。最有效的假想是發生在有完成包在隊列中等待,而沒有等待被滿足,因為此時完成端口達到了其并發量的極限。此時,一個正在運作中的線程調用GetQueuedCompletionStatus時,它就會立刻從隊列中取走該完成包。這樣就不存在着環境的切換,因為該處于運作中的線程就會連續不斷地從隊列中取走完成包,而其他的線程就不能運作了。

對于并發量最好的挑選值就是您計算機中CPU的數目。如果您的事務處理需要一個漫長的計算時間,一個比較大的并發量可以允許更多線程來運作。雖然完成每個事務處理需要花費更長的時間,但更多的事務可以同時被處理。對于應用程式來說,很容易通過測試并發量來獲得最好的效果。

PostQueuedCompletionStatus函數允許應用程式可以針對自定義的專用I/O完成包進行排隊,而無需啟動一個異步I/O操作。這點對于通知外部事件的工作者線程來說很有用。

在沒有更多的引用針對某個完成端口時,需要釋放該完成端口。該完成端口句柄以及與該完成端口相關聯的所有檔案句柄都需要被釋放。調用CloseHandle可以釋放完成端口的句柄。

下面的代碼利用IO完成端口做了一個簡單的線程池。

/************************************************************************/

/* Test IOCompletePort.                                                 */

DWORD WINAPI IOCPWorkThread(PVOID pParam)

{

    HANDLE CompletePort = (HANDLE)pParam;

    PVOID UserParam;

    WORK_ITEM_PROC UserProc;

    LPOVERLAPPED pOverlapped;

    for(;;)

    {

        BOOL bRet = GetQueuedCompletionStatus(

            CompletePort,

            (LPDWORD)&UserParam,

            (LPDWORD)&UserProc,

            &pOverlapped,

            INFINITE);

        _ASSERT(bRet);

        if(UserProc == NULL) // Quit signal.

            break;

        // execute user's proc.        

        UserProc(UserParam);        

    }

    return 0;

}

void TestIOCompletePort(BOOL bWaitMode, LONG ThreadNum)

    HANDLE CompletePort;

    OVERLAPPED Overlapped = {0, 0, 0, 0, NULL};

    CompletePort = CreateIoCompletionPort(

        INVALID_HANDLE_VALUE,

        NULL,

        0);

    // Create threads.

    for(int i=0; i<ThreadNum; i++)

        HANDLE hThread = CreateThread(NULL,

            0,

            IOCPWorkThread,

            NULL);

        CloseHandle(hThread);

    CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);

    BeginTime = GetTickCount();

    ItemCount = 20;

    for(i=0; i<20; i++)

        PostQueuedCompletionStatus(

            (DWORD)bWaitMode,

            (DWORD)UserProc1,

            &Overlapped);

    WaitForSingleObject(CompleteEvent, INFINITE);

    CloseHandle(CompleteEvent);

    // Destroy all threads.

    for(i=0; i<ThreadNum; i++)

            NULL,

    Sleep(1000); // wait all thread exit.

    CloseHandle(CompletePort);

參考書目

1,    MSDN Library

2,    《Windows進階程式設計指南》

3,    《Windows核心程式設計》

4,    《Windows 2000 裝置驅動程式設計指南》