天天看點

一個對Winsock 完成端口模型封裝的類

2009-03-28 14:02

源代碼說明:

在WINDOWS下進行網絡服務端程式開發,毫無疑問,Winsock 完成端口模型是最高效的。Winsock的完成端口模型借助Widnows的重疊IO和完成端口來實作,完成端口模型懂了之後是比較簡單的,但是要想掌握 Winsock完成端口模型,需要對WINDOWS下的線程、線程同步,Winsock API以及WINDOWS IO機制有一定的了解。如果不了解,推薦幾本書:《Inside Windows 2000,《WINDOWS核心程式設計》,《WIN32多線程程式設計》、《WINDOWS網絡程式設計技術》。在去年,我在C語言下用完成端口模型寫了一個 WEBSERVER,前些天,我決定用C++重寫這個WEBSERVER,給這個WEBSERVER增加了一些功能,并改進完成端口操作方法,比如采用 AcceptEx來代替accept和使用LOOKASIDE LIST來管理記憶體,使得WEBSERVER的性能有了比較大的提高。

在重寫的開始,我決定把完成端口模型封裝成一個比較通用的C++類,針對各種網絡服務端程式的開發,隻要簡單地繼承這個類,改寫其中兩個虛拟函數 就能滿足各種需要。到昨天為止,WEBSERVER重寫完畢,我就寫了這篇文章對完成端口模型做一個總結,并介紹一下我的這個類。

DEMO就是一個ECHOSERVER,記得使用Release模式編譯。

================================================================================

/*++

Copyright (c) 2004

子產品名:

     iomodel.h

子產品描述:

     Winsock 完成端口類頭檔案

作者:

開發環境:

     Visual C++ 6.0, Windows 2000.

修訂記錄:

     建立于: 2004.1.16

最後修改日期:

      2004.1.23

--*/

#ifndef      _IOMODEL_H

#define      _IOMODEL_H

//

//Head files

#include <winsock2.h>

#include <mswsock.h>

/////////////////////////////////////////////////////////////////////////////////////

#ifdef __cplusplus

extern "C" {

#endif

#define    BUFFER_SIZE           4096

#define    MAXTHREAD_COUNT       8

#define    PORT                  8080

#define    LOCALADDRESS          "172.29.90.96"

#define    IO_READ_COMPLETION    100

#define    IO_WRITE_COMPLETION   200

#define    IO_ACCEPT_COMPLETION 300

//自定義枚舉資料類型,用來辨別套接字IO動作類型

typedef enum _IO_OPERATION

{

        IoAccept, //AcceptEx/accept

IoRead,   //WSARecv/recv/ReadFile

IoWrite,   //WSASend/send/WriteFile

IoEnd

}IO_OPERATION, *PIO_OPERATION;

//自定義結構,即“完成鍵”(單句柄資料)

typedef struct _PER_HANDLE_CONTEXT

SOCKET                     IoSocket;

_PER_HANDLE_CONTEXT*       pNext;

}PER_HANDLE_CONTEXT, *PPER_HANDLE_CONTEXT;

//單IO資料,擴充的WSAOVERLAPPED

typedef struct _PER_IO_CONTEXT

WSAOVERLAPPED              ol;

char                       szBuffer[BUFFER_SIZE];

WSABUF                     wsaBuffer;

SOCKET                     sClient;

unsigned int               unId;

IO_OPERATION               IoOperation;

_PER_IO_CONTEXT*           pNext;

}PER_IO_CONTEXT, *PPER_IO_CONTEXT;

// global var

static GUID g_GUIDAcceptEx = WSAID_ACCEPTEX;

static GUID g_GUIDTransmitFile = WSAID_TRANSMITFILE;

DWORD __stdcall   CompletionRoutine(LPVOID);

//完成端口模型類

class CompletionPortModel

public:

CompletionPortModel();

~CompletionPortModel();

BOOL                Init();

BOOL                ThreadLoop();

BOOL                AllocEventMessage();

BOOL                PostAcceptEx();

virtual BOOL        HandleData(

         PPER_IO_CONTEXT lpPerIoContext,

            int nFlags

         );

virtual BOOL        DataAction(

            PPER_IO_CONTEXT lpPerIoContext,

            PPER_HANDLE_CONTEXT lpNewperHandletext

            );

void                InsertNode(PPER_IO_CONTEXT pNode, PPER_HANDLE_CONTEXT pHandleNode);

void                ReleaseNode(PPER_IO_CONTEXT pNode);

void                InsertToLookaside(PPER_IO_CONTEXT lpIoNode, PPER_HANDLE_CONTEXT lpHandleNode);

PPER_IO_CONTEXT     GetIoFromLookaside();

PPER_HANDLE_CONTEXT GetHandleFromLookaside();

HANDLE                        m_hCOP;

SOCKET                        m_ListenSocket;

CRITICAL_SECTION              m_ListCriSection;

CRITICAL_SECTION              m_HandleCriSection;

CRITICAL_SECTION              m_IoCriSection;

LPFN_TRANSMITFILE             lpTransmitFile;

volatile PPER_IO_CONTEXT      m_lpIoLookasideLists;

volatile PPER_HANDLE_CONTEXT m_lpHandleLOOKasideLists;

protected:

BOOL                InitWinsock();

BOOL                BindAndListenSocket();

BOOL                InitLinkListHead();

void                CloseThreadHandle();

void                GetAddressAndPort();

UINT                uPort;

char                szAddress[20];

HANDLE                        m_hThreadArray[MAXTHREAD_COUNT];

HANDLE                        m_hEvent;

volatile LONG                 m_lAcceptExCounter;

volatile PPER_IO_CONTEXT      m_lpConnectionListHead;

LPFN_ACCEPTEX                 lpAcceptEx;

private:

};

}

#endif //_IOMODEL_H

     iomodel.cpp

     Winsock 完成端口類實作檔案

#include <iostream.h>

#include "iomodel.h"

CompletionPortModel::CompletionPortModel()

函數描述:

    構造函數,初始化線程句柄數組,初始化AcceptEx()調用的計數。初始化臨界段代碼變量。

Arguments:

     無。

Return Value:

for (int i=0; i< MAXTHREAD_COUNT; i++)

m_hThreadArray[i] = INVALID_HANDLE_VALUE;

m_lAcceptExCounter = 0;

InitializeCriticalSection(&m_ListCriSection);

InitializeCriticalSection(&m_HandleCriSection);

InitializeCriticalSection(&m_IoCriSection);

m_lpHandleLOOKasideLists = NULL;

m_lpIoLookasideLists = NULL;

#ifndef _DEBUG

GetAddressAndPort();

}//end of CompletionPortModel()

CompletionPortModel::~CompletionPortModel()

    析構函數,釋放連結清單所有結點。

PPER_IO_CONTEXT lpIoNode;

while (m_lpConnectionListHead->pNext)

lpIoNode = m_lpConnectionListHead->pNext;

m_lpConnectionListHead->pNext = lpIoNode->pNext;

closesocket(lpIoNode->sClient);

HeapFree(GetProcessHeap(), 0, lpIoNode);

while(NULL != m_lpIoLookasideLists)

lpIoNode = m_lpIoLookasideLists;

m_lpIoLookasideLists = m_lpIoLookasideLists->pNext;

PPER_HANDLE_CONTEXT lpHandleNode;

while(NULL != m_lpHandleLOOKasideLists)

lpHandleNode = m_lpHandleLOOKasideLists;

m_lpHandleLOOKasideLists = m_lpHandleLOOKasideLists->pNext;

HeapFree(GetProcessHeap(), 0, lpHandleNode);

DeleteCriticalSection(&m_ListCriSection);

DeleteCriticalSection(&m_HandleCriSection);

DeleteCriticalSection(&m_IoCriSection);

}//end of ~CompletionPortModel()

BOOL CompletionPortModel::Init()

    初始化,建立完成端口、建立完成端口線程,并調用類成員函數InitWinsock初始化Winsock、

建立一個監聽套接字m_ListenSocket,并将此套接字同完成端口關聯起來,擷取AcceptEx指針。

     函數調用成功傳回TRUE,失敗傳回FALSE。

    BOOL bSuccess = InitLinkListHead();

if (FALSE == bSuccess)

return FALSE;

m_hCOP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);

if (NULL == m_hCOP)

cout << "CreateIoCompletionPort() failed: " << GetLastError() << endl;

//取得系統中CPU的數目,建立和CPU數目相等的線程,如果事先估計到完成端口處理線程會堵塞,

//可以考慮建立 SysInfo.dwNumberOfProcessors*2個線程。一般在單處理器上建立和CPU數目相等

//的線程就可以了

SYSTEM_INFO SysInfo;

GetSystemInfo(&SysInfo);

if (MAXTHREAD_COUNT < SysInfo.dwNumberOfProcessors)

SysInfo.dwNumberOfProcessors = MAXTHREAD_COUNT;

for (int i=0; i<(int)SysInfo.dwNumberOfProcessors; i++)

m_hThreadArray[i] = CreateThread(NULL, 0, CompletionRoutine, (LPVOID)this, 0, NULL);

if (NULL == m_hThreadArray[i])

   while (i>0)

   {

    CloseHandle(m_hThreadArray[i-1]);

    m_hThreadArray[i-1] = INVALID_HANDLE_VALUE;

    i--;

   }//end of while

   cout << "CreateThread() failed: " << GetLastError() << endl;

   CloseHandle(m_hCOP);

   HeapFree(GetProcessHeap(), 0, m_lpConnectionListHead);

   return FALSE;

}//end of for

//調用InitWinsock函數初始化Winsock、建立一個監聽套接字m_ListenSocket,

//并将此套接字同完成端口關聯起來,擷取AcceptEx指針。

bSuccess = InitWinsock();

if (!bSuccess)

//給完成端口線程發送消息,訓示線程退出。

PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL);

CloseThreadHandle();

CloseHandle(m_hCOP);

HeapFree(GetProcessHeap(), 0, m_lpConnectionListHead);

//調用BindAndListenSocket()綁定套接字并将套接字置于監聽狀态

bSuccess = BindAndListenSocket();

return TRUE;

}//end of Init()

void CompletionPortModel::CloseThreadHandle()

    對每一個建立的線程調用CloseHandle()。

if (INVALID_HANDLE_VALUE != m_hThreadArray[i])

   CloseHandle(m_hThreadArray[i]);

   m_hThreadArray[i] = INVALID_HANDLE_VALUE;

return;

}//end of CloseThreadHandle()

BOOL CompletionPortModel::InitWinsock()

    初始化Winsock,建立一個監聽套接字,擷取AcceptEx函數指針,為監聽套接字配置設定一個單句柄

資料,并将監聽套接字與完成端口hCOP關聯。

WSADATA wsd;

int nResult = WSAStartup(MAKEWORD(2,2), &wsd);

if (0 != nResult)

cout << "WSAStartup() failed" << endl;

m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP,

          NULL, 0, WSA_FLAG_OVERLAPPED);

    if (INVALID_SOCKET == m_ListenSocket)

    {

cout << "WSASocket() failed: " << WSAGetLastError() << endl;

        WSACleanup();

        return FALSE;

    }

DWORD dwResult;

//擷取微軟SOCKET擴充函數指針

nResult = WSAIoctl(

      m_ListenSocket,

      SIO_GET_EXTENSION_FUNCTION_POINTER,

      &g_GUIDAcceptEx,

      sizeof(g_GUIDAcceptEx),

      &lpAcceptEx,

      sizeof(lpAcceptEx),

        &dwResult,

      NULL,

      NULL

      );

if (SOCKET_ERROR == nResult)

cout << "Get AcceptEx failed: " << WSAGetLastError() << endl;

closesocket(m_ListenSocket);

      &g_GUIDTransmitFile,

      sizeof(g_GUIDTransmitFile),

      &lpTransmitFile,

      sizeof(lpTransmitFile),

      &dwResult,

cout << "Get TransmitFile failed: " << WSAGetLastError() << endl;

//為監聽套接字配置設定一個單句柄資料

PPER_HANDLE_CONTEXT lpListenHandleContext = (PPER_HANDLE_CONTEXT)HeapAlloc(

                    GetProcessHeap(), HEAP_ZERO_MEMORY,

                    sizeof(PER_HANDLE_CONTEXT)

                      );

if (NULL == lpListenHandleContext)

cout << "HeapAlloc() failed " << endl;

lpListenHandleContext->IoSocket = m_ListenSocket;

lpListenHandleContext->pNext = NULL;

//将監聽套接字m_ListenSocket和已經建立的完成端口關聯起來

HANDLE hrc = CreateIoCompletionPort(

          (HANDLE)m_ListenSocket,

          m_hCOP,

          (ULONG_PTR)lpListenHandleContext,

          );

if (NULL == hrc)

HeapFree(GetProcessHeap(), 0, lpListenHandleContext);

WSACleanup();

cout << "CreateIoCompletionPort failed: " << GetLastError() << endl;

}//end of InitWinsock()

BOOL CompletionPortModel::BindAndListenSocket()

    private函數,供Init調用。

将監聽套接字m_ListenSocket綁定到本地IP位址,并置于監聽模式。

    SOCKADDR_IN InternetAddr;

    InternetAddr.sin_family = AF_INET;

#ifdef _DEBUG

    InternetAddr.sin_addr.s_addr = inet_addr(LOCALADDRESS);

    InternetAddr.sin_port = htons(PORT);  

#else

    InternetAddr.sin_addr.s_addr = inet_addr(szAddress);

    InternetAddr.sin_port = htons(uPort);   

    int nResult = bind(m_ListenSocket, (PSOCKADDR)&InternetAddr, sizeof(InternetAddr));

    if (SOCKET_ERROR == nResult)

cout << "bind() failed: " << WSAGetLastError() << endl;

nResult = listen(m_ListenSocket, 20);

cout << "listen() failed: " << WSAGetLastError() << endl;

}//end of BindAndListenSocket()

DWORD __stdcall CompletionRoutine(LPVOID Param)

    完成端口處理線程,循環調用GetQueuedCompletionStatus來擷取IO操作結果。

     線程退出代碼。

CompletionPortModel* pThis = (CompletionPortModel*)Param;

DWORD dwNumberBytes;

PPER_HANDLE_CONTEXT lpHandleContext = NULL;

LPWSAOVERLAPPED lpOverlapped = NULL;

int nResult;

BOOL bSuccess;

while (TRUE)

bSuccess = GetQueuedCompletionStatus(

           pThis->m_hCOP,

           &dwNumberBytes,

           (PULONG_PTR )&lpHandleContext,

           &lpOverlapped,

           INFINITE

           );

   cout << "GetQueuedCompletionStatus() failed: " << GetLastError() << endl;

   continue;

if (NULL == lpHandleContext)

   //

   //PostQueuedCompletionStatus發過來一個空的單句柄資料,表示線程要退出了。

   return 0;

PPER_IO_CONTEXT lpPerIoContext = (PPER_IO_CONTEXT)lpOverlapped;

cout << "recv buffer data: " << lpPerIoContext->szBuffer << endl;

if(IoAccept != lpPerIoContext->IoOperation)

   if((!bSuccess) || (bSuccess && (0 == dwNumberBytes)))

    closesocket(lpPerIoContext->sClient);

    lpPerIoContext->pNext = NULL;

    pThis->InsertToLookaside(lpPerIoContext, NULL);

    lpHandleContext->pNext = NULL;

    pThis->InsertToLookaside(NULL, lpHandleContext);

    continue;

   }

HANDLE hResult;

PPER_HANDLE_CONTEXT lpNewperHandleContext;

switch(lpPerIoContext->IoOperation)

case IoAccept :

   if (dwNumberBytes)

    //

    //第一次連接配接成功并且收到了資料,将這個結點從連結清單中解除

    EnterCriticalSection(&pThis->m_ListCriSection);

    pThis->ReleaseNode(lpPerIoContext);

    LeaveCriticalSection(&pThis->m_ListCriSection);

   nResult = setsockopt(

        lpPerIoContext->sClient,

        SOL_SOCKET,

        SO_UPDATE_ACCEPT_CONTEXT,

        (char *)&pThis->m_ListenSocket,

        sizeof(pThis->m_ListenSocket)

        );

   if(SOCKET_ERROR == nResult)

    cout << "SO_UPDATE_ACCEPT_CONTEXT failed to update accept socket" << endl;

   lpNewperHandleContext = pThis->GetHandleFromLookaside();

   if (NULL == lpNewperHandleContext)

    lpNewperHandleContext = (PPER_HANDLE_CONTEXT)HeapAlloc(

     GetProcessHeap(),

     HEAP_ZERO_MEMORY,

     sizeof(PER_HANDLE_CONTEXT)

     );

    if (NULL == lpNewperHandleContext)

     cout << "HeapAlloc() for lpNewperHandlecontext failed" << endl;

     closesocket(lpPerIoContext->sClient);

     lpPerIoContext->pNext = NULL;

     pThis->InsertToLookaside(lpPerIoContext, NULL);

     continue;

    }   

   lpNewperHandleContext->IoSocket = lpPerIoContext->sClient;

   lpNewperHandleContext->pNext = NULL;

   //将建立立的套接字關聯到完成端口

   hResult = CreateIoCompletionPort(

           (HANDLE)lpPerIoContext->sClient,/

           (DWORD_PTR)lpNewperHandleContext,

   if (NULL == hResult)

    cout << "CreateIoCompletionPort() failed: " << GetLastError();

    lpNewperHandleContext->pNext = NULL;

    pThis->InsertToLookaside(NULL, lpNewperHandleContext);

    //分析處理資料。

    pThis->HandleData(lpPerIoContext, IO_READ_COMPLETION);

    bSuccess = pThis->DataAction(lpPerIoContext, lpNewperHandleContext);

    if (FALSE == bSuccess)

   //如果連接配接成功但是沒有收到資料

   else

    pThis->HandleData(lpPerIoContext, IO_ACCEPT_COMPLETION);

   break;//end of case IoAccept

case IoRead:

   pThis->HandleData(lpPerIoContext, IO_READ_COMPLETION);

   bSuccess = pThis->DataAction(lpPerIoContext, lpNewperHandleContext);

   if (FALSE == bSuccess)

   break;//end of case IoRead

case IoWrite:

   pThis->HandleData(lpPerIoContext, IO_WRITE_COMPLETION);

   break;

default:

return 0;

}//end of CompletionRoutine()

BOOL CompletionPortModel::PostAcceptEx()

Fucntion Description:

連續發出10個AcceptEx調用。

函數調用成功傳回TRUE,失敗傳回FALSE。

while (m_lAcceptExCounter < 10)

   SOCKET AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, /

   if (INVALID_SOCKET == AcceptSocket)

    cout << "WSASocket failed " << endl;

    return FALSE;

   PPER_IO_CONTEXT lpAcceptExIoContext = GetIoFromLookaside();

   if (NULL == lpAcceptExIoContext)

    lpAcceptExIoContext = (PPER_IO_CONTEXT)HeapAlloc(/

     GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PER_IO_CONTEXT));

    if (NULL == lpAcceptExIoContext)

     cout << "HeapAlloc() failed " << endl;

     closesocket(AcceptSocket);

     return FALSE;

   ZeroMemory(&(lpAcceptExIoContext->ol), sizeof(lpAcceptExIoContext->ol));

   lpAcceptExIoContext->sClient = AcceptSocket;

   lpAcceptExIoContext->IoOperation = IoAccept;

   lpAcceptExIoContext->pNext = NULL;

   ZeroMemory(lpAcceptExIoContext->szBuffer, BUFFER_SIZE);

   lpAcceptExIoContext->wsaBuffer.buf = lpAcceptExIoContext->szBuffer;

   lpAcceptExIoContext->wsaBuffer.len = BUFFER_SIZE;

   lpAcceptExIoContext->unId = lpAcceptExIoContext->sClient;

   DWORD dwBytes;

   BOOL bSuccess = lpAcceptEx(

     m_ListenSocket,

     lpAcceptExIoContext->sClient,

     lpAcceptExIoContext->szBuffer,

     lpAcceptExIoContext->wsaBuffer.len - ((sizeof(SOCKADDR_IN) + 16) * 2),

     sizeof(SOCKADDR_IN) + 16,

     &dwBytes,

     &(lpAcceptExIoContext->ol));

    int nResult = WSAGetLastError();

    if (nResult != ERROR_IO_PENDING)

     cout << "AcceptEx() failed :" << nResult << endl;

     HeapFree(GetProcessHeap(), 0 , lpAcceptExIoContext);

    InsertNode(lpAcceptExIoContext, NULL);

    InterlockedExchangeAdd(&m_lAcceptExCounter, 1);

InterlockedExchangeAdd(&m_lAcceptExCounter, -10);

}//end of PostAccetExRoutine()

void CompletionPortModel::InsertNode(PPER_IO_CONTEXT pNode, PPER_HANDLE_CONTEXT pHandleNode)

根據參數類型将傳遞進來結點插入到相應的連結清單頭。

    pNode       - 要插傳入連結表中的結點

pHandleNode - 要插傳入連結表中的結點

無.

if (NULL != pNode)

EnterCriticalSection(&m_ListCriSection);

pNode->pNext = m_lpConnectionListHead->pNext;

m_lpConnectionListHead->pNext = pNode;

LeaveCriticalSection(&m_ListCriSection);

}//end of InsertNode