天天看点

一个对Winsock 完成端口模型封装的类

2009-03-28 14:02

源代码说明:

在WINDOWS下进行网络服务端程序开发,毫无疑问,Winsock 完成端口模型是最高效的。Winsock的完成端口模型借助Widnows的重叠IO和完成端口来实现,完成端口模型懂了之后是比较简单的,但是要想掌握 Winsock完成端口模型,需要对WINDOWS下的线程、线程同步,Winsock API以及WINDOWS IO机制有一定的了解。如果不了解,推荐几本书:《Inside Windows 2000,《WINDOWS核心编程》,《WIN32多线程程序设计》、《WINDOWS网络编程技术》。在去年,我在C语言下用完成端口模型写了一个 WEBSERVER,前些天,我决定用C++重写这个WEBSERVER,给这个WEBSERVER增加了一些功能,并改进完成端口操作方法,比如采用 AcceptEx来代替accept和使用LOOKASIDE LIST来管理内存,使得WEBSERVER的性能有了比较大的提高。

在重写的开始,我决定把完成端口模型封装成一个比较通用的C++类,针对各种网络服务端程序的开发,只要简单地继承这个类,改写其中两个虚拟函数 就能满足各种需要。到昨天为止,WEBSERVER重写完毕,我就写了这篇文章对完成端口模型做一个总结,并介绍一下我的这个类。

DEMO就是一个ECHOSERVER,记得使用Release模式编译。

================================================================================

/*++

Copyright (c) 2004

模块名:

     iomodel.h

模块描述:

     Winsock 完成端口类头文件

作者:

开发环境:

     Visual C++ 6.0, Windows 2000.

修订记录:

     创建于: 2004.1.16

最后修改日期:

      2004.1.23

--*/

#ifndef      _IOMODEL_H

#define      _IOMODEL_H

//

//Head files

#include <winsock2.h>

#include <mswsock.h>

/////////////////////////////////////////////////////////////////////////////////////

#ifdef __cplusplus

extern "C" {

#endif

#define    BUFFER_SIZE           4096

#define    MAXTHREAD_COUNT       8

#define    PORT                  8080

#define    LOCALADDRESS          "172.29.90.96"

#define    IO_READ_COMPLETION    100

#define    IO_WRITE_COMPLETION   200

#define    IO_ACCEPT_COMPLETION 300

//自定义枚举数据类型,用来标识套接字IO动作类型

typedef enum _IO_OPERATION

{

        IoAccept, //AcceptEx/accept

IoRead,   //WSARecv/recv/ReadFile

IoWrite,   //WSASend/send/WriteFile

IoEnd

}IO_OPERATION, *PIO_OPERATION;

//自定义结构,即“完成键”(单句柄数据)

typedef struct _PER_HANDLE_CONTEXT

SOCKET                     IoSocket;

_PER_HANDLE_CONTEXT*       pNext;

}PER_HANDLE_CONTEXT, *PPER_HANDLE_CONTEXT;

//单IO数据,扩展的WSAOVERLAPPED

typedef struct _PER_IO_CONTEXT

WSAOVERLAPPED              ol;

char                       szBuffer[BUFFER_SIZE];

WSABUF                     wsaBuffer;

SOCKET                     sClient;

unsigned int               unId;

IO_OPERATION               IoOperation;

_PER_IO_CONTEXT*           pNext;

}PER_IO_CONTEXT, *PPER_IO_CONTEXT;

// global var

static GUID g_GUIDAcceptEx = WSAID_ACCEPTEX;

static GUID g_GUIDTransmitFile = WSAID_TRANSMITFILE;

DWORD __stdcall   CompletionRoutine(LPVOID);

//完成端口模型类

class CompletionPortModel

public:

CompletionPortModel();

~CompletionPortModel();

BOOL                Init();

BOOL                ThreadLoop();

BOOL                AllocEventMessage();

BOOL                PostAcceptEx();

virtual BOOL        HandleData(

         PPER_IO_CONTEXT lpPerIoContext,

            int nFlags

         );

virtual BOOL        DataAction(

            PPER_IO_CONTEXT lpPerIoContext,

            PPER_HANDLE_CONTEXT lpNewperHandletext

            );

void                InsertNode(PPER_IO_CONTEXT pNode, PPER_HANDLE_CONTEXT pHandleNode);

void                ReleaseNode(PPER_IO_CONTEXT pNode);

void                InsertToLookaside(PPER_IO_CONTEXT lpIoNode, PPER_HANDLE_CONTEXT lpHandleNode);

PPER_IO_CONTEXT     GetIoFromLookaside();

PPER_HANDLE_CONTEXT GetHandleFromLookaside();

HANDLE                        m_hCOP;

SOCKET                        m_ListenSocket;

CRITICAL_SECTION              m_ListCriSection;

CRITICAL_SECTION              m_HandleCriSection;

CRITICAL_SECTION              m_IoCriSection;

LPFN_TRANSMITFILE             lpTransmitFile;

volatile PPER_IO_CONTEXT      m_lpIoLookasideLists;

volatile PPER_HANDLE_CONTEXT m_lpHandleLOOKasideLists;

protected:

BOOL                InitWinsock();

BOOL                BindAndListenSocket();

BOOL                InitLinkListHead();

void                CloseThreadHandle();

void                GetAddressAndPort();

UINT                uPort;

char                szAddress[20];

HANDLE                        m_hThreadArray[MAXTHREAD_COUNT];

HANDLE                        m_hEvent;

volatile LONG                 m_lAcceptExCounter;

volatile PPER_IO_CONTEXT      m_lpConnectionListHead;

LPFN_ACCEPTEX                 lpAcceptEx;

private:

};

}

#endif //_IOMODEL_H

     iomodel.cpp

     Winsock 完成端口类实现文件

#include <iostream.h>

#include "iomodel.h"

CompletionPortModel::CompletionPortModel()

函数描述:

    构造函数,初始化线程句柄数组,初始化AcceptEx()调用的计数。初始化临界段代码变量。

Arguments:

     无。

Return Value:

for (int i=0; i< MAXTHREAD_COUNT; i++)

m_hThreadArray[i] = INVALID_HANDLE_VALUE;

m_lAcceptExCounter = 0;

InitializeCriticalSection(&m_ListCriSection);

InitializeCriticalSection(&m_HandleCriSection);

InitializeCriticalSection(&m_IoCriSection);

m_lpHandleLOOKasideLists = NULL;

m_lpIoLookasideLists = NULL;

#ifndef _DEBUG

GetAddressAndPort();

}//end of CompletionPortModel()

CompletionPortModel::~CompletionPortModel()

    析构函数,释放链表所有结点。

PPER_IO_CONTEXT lpIoNode;

while (m_lpConnectionListHead->pNext)

lpIoNode = m_lpConnectionListHead->pNext;

m_lpConnectionListHead->pNext = lpIoNode->pNext;

closesocket(lpIoNode->sClient);

HeapFree(GetProcessHeap(), 0, lpIoNode);

while(NULL != m_lpIoLookasideLists)

lpIoNode = m_lpIoLookasideLists;

m_lpIoLookasideLists = m_lpIoLookasideLists->pNext;

PPER_HANDLE_CONTEXT lpHandleNode;

while(NULL != m_lpHandleLOOKasideLists)

lpHandleNode = m_lpHandleLOOKasideLists;

m_lpHandleLOOKasideLists = m_lpHandleLOOKasideLists->pNext;

HeapFree(GetProcessHeap(), 0, lpHandleNode);

DeleteCriticalSection(&m_ListCriSection);

DeleteCriticalSection(&m_HandleCriSection);

DeleteCriticalSection(&m_IoCriSection);

}//end of ~CompletionPortModel()

BOOL CompletionPortModel::Init()

    初始化,创建完成端口、创建完成端口线程,并调用类成员函数InitWinsock初始化Winsock、

建立一个监听套接字m_ListenSocket,并将此套接字同完成端口关联起来,获取AcceptEx指针。

     函数调用成功返回TRUE,失败返回FALSE。

    BOOL bSuccess = InitLinkListHead();

if (FALSE == bSuccess)

return FALSE;

m_hCOP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);

if (NULL == m_hCOP)

cout << "CreateIoCompletionPort() failed: " << GetLastError() << endl;

//取得系统中CPU的数目,创建和CPU数目相等的线程,如果事先估计到完成端口处理线程会堵塞,

//可以考虑创建 SysInfo.dwNumberOfProcessors*2个线程。一般在单处理器上创建和CPU数目相等

//的线程就可以了

SYSTEM_INFO SysInfo;

GetSystemInfo(&SysInfo);

if (MAXTHREAD_COUNT < SysInfo.dwNumberOfProcessors)

SysInfo.dwNumberOfProcessors = MAXTHREAD_COUNT;

for (int i=0; i<(int)SysInfo.dwNumberOfProcessors; i++)

m_hThreadArray[i] = CreateThread(NULL, 0, CompletionRoutine, (LPVOID)this, 0, NULL);

if (NULL == m_hThreadArray[i])

   while (i>0)

   {

    CloseHandle(m_hThreadArray[i-1]);

    m_hThreadArray[i-1] = INVALID_HANDLE_VALUE;

    i--;

   }//end of while

   cout << "CreateThread() failed: " << GetLastError() << endl;

   CloseHandle(m_hCOP);

   HeapFree(GetProcessHeap(), 0, m_lpConnectionListHead);

   return FALSE;

}//end of for

//调用InitWinsock函数初始化Winsock、建立一个监听套接字m_ListenSocket,

//并将此套接字同完成端口关联起来,获取AcceptEx指针。

bSuccess = InitWinsock();

if (!bSuccess)

//给完成端口线程发送消息,指示线程退出。

PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL);

CloseThreadHandle();

CloseHandle(m_hCOP);

HeapFree(GetProcessHeap(), 0, m_lpConnectionListHead);

//调用BindAndListenSocket()绑定套接字并将套接字置于监听状态

bSuccess = BindAndListenSocket();

return TRUE;

}//end of Init()

void CompletionPortModel::CloseThreadHandle()

    对每一个创建的线程调用CloseHandle()。

if (INVALID_HANDLE_VALUE != m_hThreadArray[i])

   CloseHandle(m_hThreadArray[i]);

   m_hThreadArray[i] = INVALID_HANDLE_VALUE;

return;

}//end of CloseThreadHandle()

BOOL CompletionPortModel::InitWinsock()

    初始化Winsock,创建一个监听套接字,获取AcceptEx函数指针,为监听套接字分配一个单句柄

数据,并将监听套接字与完成端口hCOP关联。

WSADATA wsd;

int nResult = WSAStartup(MAKEWORD(2,2), &wsd);

if (0 != nResult)

cout << "WSAStartup() failed" << endl;

m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP,

          NULL, 0, WSA_FLAG_OVERLAPPED);

    if (INVALID_SOCKET == m_ListenSocket)

    {

cout << "WSASocket() failed: " << WSAGetLastError() << endl;

        WSACleanup();

        return FALSE;

    }

DWORD dwResult;

//获取微软SOCKET扩展函数指针

nResult = WSAIoctl(

      m_ListenSocket,

      SIO_GET_EXTENSION_FUNCTION_POINTER,

      &g_GUIDAcceptEx,

      sizeof(g_GUIDAcceptEx),

      &lpAcceptEx,

      sizeof(lpAcceptEx),

        &dwResult,

      NULL,

      NULL

      );

if (SOCKET_ERROR == nResult)

cout << "Get AcceptEx failed: " << WSAGetLastError() << endl;

closesocket(m_ListenSocket);

      &g_GUIDTransmitFile,

      sizeof(g_GUIDTransmitFile),

      &lpTransmitFile,

      sizeof(lpTransmitFile),

      &dwResult,

cout << "Get TransmitFile failed: " << WSAGetLastError() << endl;

//为监听套接字分配一个单句柄数据

PPER_HANDLE_CONTEXT lpListenHandleContext = (PPER_HANDLE_CONTEXT)HeapAlloc(

                    GetProcessHeap(), HEAP_ZERO_MEMORY,

                    sizeof(PER_HANDLE_CONTEXT)

                      );

if (NULL == lpListenHandleContext)

cout << "HeapAlloc() failed " << endl;

lpListenHandleContext->IoSocket = m_ListenSocket;

lpListenHandleContext->pNext = NULL;

//将监听套接字m_ListenSocket和已经建立的完成端口关联起来

HANDLE hrc = CreateIoCompletionPort(

          (HANDLE)m_ListenSocket,

          m_hCOP,

          (ULONG_PTR)lpListenHandleContext,

          );

if (NULL == hrc)

HeapFree(GetProcessHeap(), 0, lpListenHandleContext);

WSACleanup();

cout << "CreateIoCompletionPort failed: " << GetLastError() << endl;

}//end of InitWinsock()

BOOL CompletionPortModel::BindAndListenSocket()

    private函数,供Init调用。

将监听套接字m_ListenSocket绑定到本地IP地址,并置于监听模式。

    SOCKADDR_IN InternetAddr;

    InternetAddr.sin_family = AF_INET;

#ifdef _DEBUG

    InternetAddr.sin_addr.s_addr = inet_addr(LOCALADDRESS);

    InternetAddr.sin_port = htons(PORT);  

#else

    InternetAddr.sin_addr.s_addr = inet_addr(szAddress);

    InternetAddr.sin_port = htons(uPort);   

    int nResult = bind(m_ListenSocket, (PSOCKADDR)&InternetAddr, sizeof(InternetAddr));

    if (SOCKET_ERROR == nResult)

cout << "bind() failed: " << WSAGetLastError() << endl;

nResult = listen(m_ListenSocket, 20);

cout << "listen() failed: " << WSAGetLastError() << endl;

}//end of BindAndListenSocket()

DWORD __stdcall CompletionRoutine(LPVOID Param)

    完成端口处理线程,循环调用GetQueuedCompletionStatus来获取IO操作结果。

     线程退出代码。

CompletionPortModel* pThis = (CompletionPortModel*)Param;

DWORD dwNumberBytes;

PPER_HANDLE_CONTEXT lpHandleContext = NULL;

LPWSAOVERLAPPED lpOverlapped = NULL;

int nResult;

BOOL bSuccess;

while (TRUE)

bSuccess = GetQueuedCompletionStatus(

           pThis->m_hCOP,

           &dwNumberBytes,

           (PULONG_PTR )&lpHandleContext,

           &lpOverlapped,

           INFINITE

           );

   cout << "GetQueuedCompletionStatus() failed: " << GetLastError() << endl;

   continue;

if (NULL == lpHandleContext)

   //

   //PostQueuedCompletionStatus发过来一个空的单句柄数据,表示线程要退出了。

   return 0;

PPER_IO_CONTEXT lpPerIoContext = (PPER_IO_CONTEXT)lpOverlapped;

cout << "recv buffer data: " << lpPerIoContext->szBuffer << endl;

if(IoAccept != lpPerIoContext->IoOperation)

   if((!bSuccess) || (bSuccess && (0 == dwNumberBytes)))

    closesocket(lpPerIoContext->sClient);

    lpPerIoContext->pNext = NULL;

    pThis->InsertToLookaside(lpPerIoContext, NULL);

    lpHandleContext->pNext = NULL;

    pThis->InsertToLookaside(NULL, lpHandleContext);

    continue;

   }

HANDLE hResult;

PPER_HANDLE_CONTEXT lpNewperHandleContext;

switch(lpPerIoContext->IoOperation)

case IoAccept :

   if (dwNumberBytes)

    //

    //第一次连接成功并且收到了数据,将这个结点从链表中解除

    EnterCriticalSection(&pThis->m_ListCriSection);

    pThis->ReleaseNode(lpPerIoContext);

    LeaveCriticalSection(&pThis->m_ListCriSection);

   nResult = setsockopt(

        lpPerIoContext->sClient,

        SOL_SOCKET,

        SO_UPDATE_ACCEPT_CONTEXT,

        (char *)&pThis->m_ListenSocket,

        sizeof(pThis->m_ListenSocket)

        );

   if(SOCKET_ERROR == nResult)

    cout << "SO_UPDATE_ACCEPT_CONTEXT failed to update accept socket" << endl;

   lpNewperHandleContext = pThis->GetHandleFromLookaside();

   if (NULL == lpNewperHandleContext)

    lpNewperHandleContext = (PPER_HANDLE_CONTEXT)HeapAlloc(

     GetProcessHeap(),

     HEAP_ZERO_MEMORY,

     sizeof(PER_HANDLE_CONTEXT)

     );

    if (NULL == lpNewperHandleContext)

     cout << "HeapAlloc() for lpNewperHandlecontext failed" << endl;

     closesocket(lpPerIoContext->sClient);

     lpPerIoContext->pNext = NULL;

     pThis->InsertToLookaside(lpPerIoContext, NULL);

     continue;

    }   

   lpNewperHandleContext->IoSocket = lpPerIoContext->sClient;

   lpNewperHandleContext->pNext = NULL;

   //将新建立的套接字关联到完成端口

   hResult = CreateIoCompletionPort(

           (HANDLE)lpPerIoContext->sClient,/

           (DWORD_PTR)lpNewperHandleContext,

   if (NULL == hResult)

    cout << "CreateIoCompletionPort() failed: " << GetLastError();

    lpNewperHandleContext->pNext = NULL;

    pThis->InsertToLookaside(NULL, lpNewperHandleContext);

    //分析处理数据。

    pThis->HandleData(lpPerIoContext, IO_READ_COMPLETION);

    bSuccess = pThis->DataAction(lpPerIoContext, lpNewperHandleContext);

    if (FALSE == bSuccess)

   //如果连接成功但是没有收到数据

   else

    pThis->HandleData(lpPerIoContext, IO_ACCEPT_COMPLETION);

   break;//end of case IoAccept

case IoRead:

   pThis->HandleData(lpPerIoContext, IO_READ_COMPLETION);

   bSuccess = pThis->DataAction(lpPerIoContext, lpNewperHandleContext);

   if (FALSE == bSuccess)

   break;//end of case IoRead

case IoWrite:

   pThis->HandleData(lpPerIoContext, IO_WRITE_COMPLETION);

   break;

default:

return 0;

}//end of CompletionRoutine()

BOOL CompletionPortModel::PostAcceptEx()

Fucntion Description:

连续发出10个AcceptEx调用。

函数调用成功返回TRUE,失败返回FALSE。

while (m_lAcceptExCounter < 10)

   SOCKET AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, /

   if (INVALID_SOCKET == AcceptSocket)

    cout << "WSASocket failed " << endl;

    return FALSE;

   PPER_IO_CONTEXT lpAcceptExIoContext = GetIoFromLookaside();

   if (NULL == lpAcceptExIoContext)

    lpAcceptExIoContext = (PPER_IO_CONTEXT)HeapAlloc(/

     GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PER_IO_CONTEXT));

    if (NULL == lpAcceptExIoContext)

     cout << "HeapAlloc() failed " << endl;

     closesocket(AcceptSocket);

     return FALSE;

   ZeroMemory(&(lpAcceptExIoContext->ol), sizeof(lpAcceptExIoContext->ol));

   lpAcceptExIoContext->sClient = AcceptSocket;

   lpAcceptExIoContext->IoOperation = IoAccept;

   lpAcceptExIoContext->pNext = NULL;

   ZeroMemory(lpAcceptExIoContext->szBuffer, BUFFER_SIZE);

   lpAcceptExIoContext->wsaBuffer.buf = lpAcceptExIoContext->szBuffer;

   lpAcceptExIoContext->wsaBuffer.len = BUFFER_SIZE;

   lpAcceptExIoContext->unId = lpAcceptExIoContext->sClient;

   DWORD dwBytes;

   BOOL bSuccess = lpAcceptEx(

     m_ListenSocket,

     lpAcceptExIoContext->sClient,

     lpAcceptExIoContext->szBuffer,

     lpAcceptExIoContext->wsaBuffer.len - ((sizeof(SOCKADDR_IN) + 16) * 2),

     sizeof(SOCKADDR_IN) + 16,

     &dwBytes,

     &(lpAcceptExIoContext->ol));

    int nResult = WSAGetLastError();

    if (nResult != ERROR_IO_PENDING)

     cout << "AcceptEx() failed :" << nResult << endl;

     HeapFree(GetProcessHeap(), 0 , lpAcceptExIoContext);

    InsertNode(lpAcceptExIoContext, NULL);

    InterlockedExchangeAdd(&m_lAcceptExCounter, 1);

InterlockedExchangeAdd(&m_lAcceptExCounter, -10);

}//end of PostAccetExRoutine()

void CompletionPortModel::InsertNode(PPER_IO_CONTEXT pNode, PPER_HANDLE_CONTEXT pHandleNode)

根据参数类型将传递进来结点插入到相应的链表头。

    pNode       - 要插入链表中的结点

pHandleNode - 要插入链表中的结点

无.

if (NULL != pNode)

EnterCriticalSection(&m_ListCriSection);

pNode->pNext = m_lpConnectionListHead->pNext;

m_lpConnectionListHead->pNext = pNode;

LeaveCriticalSection(&m_ListCriSection);

}//end of InsertNode