2009-03-28 14:02
源代碼說明:
在WINDOWS下進行網絡服務端程式開發,毫無疑問,Winsock 完成端口模型是最高效的。Winsock的完成端口模型借助Widnows的重疊IO和完成端口來實作,完成端口模型懂了之後是比較簡單的,但是要想掌握 Winsock完成端口模型,需要對WINDOWS下的線程、線程同步,Winsock API以及WINDOWS IO機制有一定的了解。如果不了解,推薦幾本書:《Inside Windows 2000,《WINDOWS核心程式設計》,《WIN32多線程程式設計》、《WINDOWS網絡程式設計技術》。在去年,我在C語言下用完成端口模型寫了一個 WEBSERVER,前些天,我決定用C++重寫這個WEBSERVER,給這個WEBSERVER增加了一些功能,并改進完成端口操作方法,比如采用 AcceptEx來代替accept和使用LOOKASIDE LIST來管理記憶體,使得WEBSERVER的性能有了比較大的提高。
在重寫的開始,我決定把完成端口模型封裝成一個比較通用的C++類,針對各種網絡服務端程式的開發,隻要簡單地繼承這個類,改寫其中兩個虛拟函數 就能滿足各種需要。到昨天為止,WEBSERVER重寫完畢,我就寫了這篇文章對完成端口模型做一個總結,并介紹一下我的這個類。
DEMO就是一個ECHOSERVER,記得使用Release模式編譯。
================================================================================
/*++
Copyright (c) 2004
子產品名:
iomodel.h
子產品描述:
Winsock 完成端口類頭檔案
作者:
開發環境:
Visual C++ 6.0, Windows 2000.
修訂記錄:
建立于: 2004.1.16
最後修改日期:
2004.1.23
--*/
#ifndef _IOMODEL_H
#define _IOMODEL_H
//
//Head files
#include <winsock2.h>
#include <mswsock.h>
/////////////////////////////////////////////////////////////////////////////////////
#ifdef __cplusplus
extern "C" {
#endif
#define BUFFER_SIZE 4096
#define MAXTHREAD_COUNT 8
#define PORT 8080
#define LOCALADDRESS "172.29.90.96"
#define IO_READ_COMPLETION 100
#define IO_WRITE_COMPLETION 200
#define IO_ACCEPT_COMPLETION 300
//自定義枚舉資料類型,用來辨別套接字IO動作類型
typedef enum _IO_OPERATION
{
IoAccept, //AcceptEx/accept
IoRead, //WSARecv/recv/ReadFile
IoWrite, //WSASend/send/WriteFile
IoEnd
}IO_OPERATION, *PIO_OPERATION;
//自定義結構,即“完成鍵”(單句柄資料)
typedef struct _PER_HANDLE_CONTEXT
SOCKET IoSocket;
_PER_HANDLE_CONTEXT* pNext;
}PER_HANDLE_CONTEXT, *PPER_HANDLE_CONTEXT;
//單IO資料,擴充的WSAOVERLAPPED
typedef struct _PER_IO_CONTEXT
WSAOVERLAPPED ol;
char szBuffer[BUFFER_SIZE];
WSABUF wsaBuffer;
SOCKET sClient;
unsigned int unId;
IO_OPERATION IoOperation;
_PER_IO_CONTEXT* pNext;
}PER_IO_CONTEXT, *PPER_IO_CONTEXT;
// global var
static GUID g_GUIDAcceptEx = WSAID_ACCEPTEX;
static GUID g_GUIDTransmitFile = WSAID_TRANSMITFILE;
DWORD __stdcall CompletionRoutine(LPVOID);
//完成端口模型類
class CompletionPortModel
public:
CompletionPortModel();
~CompletionPortModel();
BOOL Init();
BOOL ThreadLoop();
BOOL AllocEventMessage();
BOOL PostAcceptEx();
virtual BOOL HandleData(
PPER_IO_CONTEXT lpPerIoContext,
int nFlags
);
virtual BOOL DataAction(
PPER_IO_CONTEXT lpPerIoContext,
PPER_HANDLE_CONTEXT lpNewperHandletext
);
void InsertNode(PPER_IO_CONTEXT pNode, PPER_HANDLE_CONTEXT pHandleNode);
void ReleaseNode(PPER_IO_CONTEXT pNode);
void InsertToLookaside(PPER_IO_CONTEXT lpIoNode, PPER_HANDLE_CONTEXT lpHandleNode);
PPER_IO_CONTEXT GetIoFromLookaside();
PPER_HANDLE_CONTEXT GetHandleFromLookaside();
HANDLE m_hCOP;
SOCKET m_ListenSocket;
CRITICAL_SECTION m_ListCriSection;
CRITICAL_SECTION m_HandleCriSection;
CRITICAL_SECTION m_IoCriSection;
LPFN_TRANSMITFILE lpTransmitFile;
volatile PPER_IO_CONTEXT m_lpIoLookasideLists;
volatile PPER_HANDLE_CONTEXT m_lpHandleLOOKasideLists;
protected:
BOOL InitWinsock();
BOOL BindAndListenSocket();
BOOL InitLinkListHead();
void CloseThreadHandle();
void GetAddressAndPort();
UINT uPort;
char szAddress[20];
HANDLE m_hThreadArray[MAXTHREAD_COUNT];
HANDLE m_hEvent;
volatile LONG m_lAcceptExCounter;
volatile PPER_IO_CONTEXT m_lpConnectionListHead;
LPFN_ACCEPTEX lpAcceptEx;
private:
};
}
#endif //_IOMODEL_H
iomodel.cpp
Winsock 完成端口類實作檔案
#include <iostream.h>
#include "iomodel.h"
CompletionPortModel::CompletionPortModel()
函數描述:
構造函數,初始化線程句柄數組,初始化AcceptEx()調用的計數。初始化臨界段代碼變量。
Arguments:
無。
Return Value:
for (int i=0; i< MAXTHREAD_COUNT; i++)
m_hThreadArray[i] = INVALID_HANDLE_VALUE;
m_lAcceptExCounter = 0;
InitializeCriticalSection(&m_ListCriSection);
InitializeCriticalSection(&m_HandleCriSection);
InitializeCriticalSection(&m_IoCriSection);
m_lpHandleLOOKasideLists = NULL;
m_lpIoLookasideLists = NULL;
#ifndef _DEBUG
GetAddressAndPort();
}//end of CompletionPortModel()
CompletionPortModel::~CompletionPortModel()
析構函數,釋放連結清單所有結點。
PPER_IO_CONTEXT lpIoNode;
while (m_lpConnectionListHead->pNext)
lpIoNode = m_lpConnectionListHead->pNext;
m_lpConnectionListHead->pNext = lpIoNode->pNext;
closesocket(lpIoNode->sClient);
HeapFree(GetProcessHeap(), 0, lpIoNode);
while(NULL != m_lpIoLookasideLists)
lpIoNode = m_lpIoLookasideLists;
m_lpIoLookasideLists = m_lpIoLookasideLists->pNext;
PPER_HANDLE_CONTEXT lpHandleNode;
while(NULL != m_lpHandleLOOKasideLists)
lpHandleNode = m_lpHandleLOOKasideLists;
m_lpHandleLOOKasideLists = m_lpHandleLOOKasideLists->pNext;
HeapFree(GetProcessHeap(), 0, lpHandleNode);
DeleteCriticalSection(&m_ListCriSection);
DeleteCriticalSection(&m_HandleCriSection);
DeleteCriticalSection(&m_IoCriSection);
}//end of ~CompletionPortModel()
BOOL CompletionPortModel::Init()
初始化,建立完成端口、建立完成端口線程,并調用類成員函數InitWinsock初始化Winsock、
建立一個監聽套接字m_ListenSocket,并将此套接字同完成端口關聯起來,擷取AcceptEx指針。
函數調用成功傳回TRUE,失敗傳回FALSE。
BOOL bSuccess = InitLinkListHead();
if (FALSE == bSuccess)
return FALSE;
m_hCOP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
if (NULL == m_hCOP)
cout << "CreateIoCompletionPort() failed: " << GetLastError() << endl;
//取得系統中CPU的數目,建立和CPU數目相等的線程,如果事先估計到完成端口處理線程會堵塞,
//可以考慮建立 SysInfo.dwNumberOfProcessors*2個線程。一般在單處理器上建立和CPU數目相等
//的線程就可以了
SYSTEM_INFO SysInfo;
GetSystemInfo(&SysInfo);
if (MAXTHREAD_COUNT < SysInfo.dwNumberOfProcessors)
SysInfo.dwNumberOfProcessors = MAXTHREAD_COUNT;
for (int i=0; i<(int)SysInfo.dwNumberOfProcessors; i++)
m_hThreadArray[i] = CreateThread(NULL, 0, CompletionRoutine, (LPVOID)this, 0, NULL);
if (NULL == m_hThreadArray[i])
while (i>0)
{
CloseHandle(m_hThreadArray[i-1]);
m_hThreadArray[i-1] = INVALID_HANDLE_VALUE;
i--;
}//end of while
cout << "CreateThread() failed: " << GetLastError() << endl;
CloseHandle(m_hCOP);
HeapFree(GetProcessHeap(), 0, m_lpConnectionListHead);
return FALSE;
}//end of for
//調用InitWinsock函數初始化Winsock、建立一個監聽套接字m_ListenSocket,
//并将此套接字同完成端口關聯起來,擷取AcceptEx指針。
bSuccess = InitWinsock();
if (!bSuccess)
//給完成端口線程發送消息,訓示線程退出。
PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL);
CloseThreadHandle();
CloseHandle(m_hCOP);
HeapFree(GetProcessHeap(), 0, m_lpConnectionListHead);
//調用BindAndListenSocket()綁定套接字并将套接字置于監聽狀态
bSuccess = BindAndListenSocket();
return TRUE;
}//end of Init()
void CompletionPortModel::CloseThreadHandle()
對每一個建立的線程調用CloseHandle()。
if (INVALID_HANDLE_VALUE != m_hThreadArray[i])
CloseHandle(m_hThreadArray[i]);
m_hThreadArray[i] = INVALID_HANDLE_VALUE;
return;
}//end of CloseThreadHandle()
BOOL CompletionPortModel::InitWinsock()
初始化Winsock,建立一個監聽套接字,擷取AcceptEx函數指針,為監聽套接字配置設定一個單句柄
資料,并将監聽套接字與完成端口hCOP關聯。
WSADATA wsd;
int nResult = WSAStartup(MAKEWORD(2,2), &wsd);
if (0 != nResult)
cout << "WSAStartup() failed" << endl;
m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP,
NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == m_ListenSocket)
{
cout << "WSASocket() failed: " << WSAGetLastError() << endl;
WSACleanup();
return FALSE;
}
DWORD dwResult;
//擷取微軟SOCKET擴充函數指針
nResult = WSAIoctl(
m_ListenSocket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&g_GUIDAcceptEx,
sizeof(g_GUIDAcceptEx),
&lpAcceptEx,
sizeof(lpAcceptEx),
&dwResult,
NULL,
NULL
);
if (SOCKET_ERROR == nResult)
cout << "Get AcceptEx failed: " << WSAGetLastError() << endl;
closesocket(m_ListenSocket);
&g_GUIDTransmitFile,
sizeof(g_GUIDTransmitFile),
&lpTransmitFile,
sizeof(lpTransmitFile),
&dwResult,
cout << "Get TransmitFile failed: " << WSAGetLastError() << endl;
//為監聽套接字配置設定一個單句柄資料
PPER_HANDLE_CONTEXT lpListenHandleContext = (PPER_HANDLE_CONTEXT)HeapAlloc(
GetProcessHeap(), HEAP_ZERO_MEMORY,
sizeof(PER_HANDLE_CONTEXT)
);
if (NULL == lpListenHandleContext)
cout << "HeapAlloc() failed " << endl;
lpListenHandleContext->IoSocket = m_ListenSocket;
lpListenHandleContext->pNext = NULL;
//将監聽套接字m_ListenSocket和已經建立的完成端口關聯起來
HANDLE hrc = CreateIoCompletionPort(
(HANDLE)m_ListenSocket,
m_hCOP,
(ULONG_PTR)lpListenHandleContext,
);
if (NULL == hrc)
HeapFree(GetProcessHeap(), 0, lpListenHandleContext);
WSACleanup();
cout << "CreateIoCompletionPort failed: " << GetLastError() << endl;
}//end of InitWinsock()
BOOL CompletionPortModel::BindAndListenSocket()
private函數,供Init調用。
将監聽套接字m_ListenSocket綁定到本地IP位址,并置于監聽模式。
SOCKADDR_IN InternetAddr;
InternetAddr.sin_family = AF_INET;
#ifdef _DEBUG
InternetAddr.sin_addr.s_addr = inet_addr(LOCALADDRESS);
InternetAddr.sin_port = htons(PORT);
#else
InternetAddr.sin_addr.s_addr = inet_addr(szAddress);
InternetAddr.sin_port = htons(uPort);
int nResult = bind(m_ListenSocket, (PSOCKADDR)&InternetAddr, sizeof(InternetAddr));
if (SOCKET_ERROR == nResult)
cout << "bind() failed: " << WSAGetLastError() << endl;
nResult = listen(m_ListenSocket, 20);
cout << "listen() failed: " << WSAGetLastError() << endl;
}//end of BindAndListenSocket()
DWORD __stdcall CompletionRoutine(LPVOID Param)
完成端口處理線程,循環調用GetQueuedCompletionStatus來擷取IO操作結果。
線程退出代碼。
CompletionPortModel* pThis = (CompletionPortModel*)Param;
DWORD dwNumberBytes;
PPER_HANDLE_CONTEXT lpHandleContext = NULL;
LPWSAOVERLAPPED lpOverlapped = NULL;
int nResult;
BOOL bSuccess;
while (TRUE)
bSuccess = GetQueuedCompletionStatus(
pThis->m_hCOP,
&dwNumberBytes,
(PULONG_PTR )&lpHandleContext,
&lpOverlapped,
INFINITE
);
cout << "GetQueuedCompletionStatus() failed: " << GetLastError() << endl;
continue;
if (NULL == lpHandleContext)
//
//PostQueuedCompletionStatus發過來一個空的單句柄資料,表示線程要退出了。
return 0;
PPER_IO_CONTEXT lpPerIoContext = (PPER_IO_CONTEXT)lpOverlapped;
cout << "recv buffer data: " << lpPerIoContext->szBuffer << endl;
if(IoAccept != lpPerIoContext->IoOperation)
if((!bSuccess) || (bSuccess && (0 == dwNumberBytes)))
closesocket(lpPerIoContext->sClient);
lpPerIoContext->pNext = NULL;
pThis->InsertToLookaside(lpPerIoContext, NULL);
lpHandleContext->pNext = NULL;
pThis->InsertToLookaside(NULL, lpHandleContext);
continue;
}
HANDLE hResult;
PPER_HANDLE_CONTEXT lpNewperHandleContext;
switch(lpPerIoContext->IoOperation)
case IoAccept :
if (dwNumberBytes)
//
//第一次連接配接成功并且收到了資料,将這個結點從連結清單中解除
EnterCriticalSection(&pThis->m_ListCriSection);
pThis->ReleaseNode(lpPerIoContext);
LeaveCriticalSection(&pThis->m_ListCriSection);
nResult = setsockopt(
lpPerIoContext->sClient,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char *)&pThis->m_ListenSocket,
sizeof(pThis->m_ListenSocket)
);
if(SOCKET_ERROR == nResult)
cout << "SO_UPDATE_ACCEPT_CONTEXT failed to update accept socket" << endl;
lpNewperHandleContext = pThis->GetHandleFromLookaside();
if (NULL == lpNewperHandleContext)
lpNewperHandleContext = (PPER_HANDLE_CONTEXT)HeapAlloc(
GetProcessHeap(),
HEAP_ZERO_MEMORY,
sizeof(PER_HANDLE_CONTEXT)
);
if (NULL == lpNewperHandleContext)
cout << "HeapAlloc() for lpNewperHandlecontext failed" << endl;
closesocket(lpPerIoContext->sClient);
lpPerIoContext->pNext = NULL;
pThis->InsertToLookaside(lpPerIoContext, NULL);
continue;
}
lpNewperHandleContext->IoSocket = lpPerIoContext->sClient;
lpNewperHandleContext->pNext = NULL;
//将建立立的套接字關聯到完成端口
hResult = CreateIoCompletionPort(
(HANDLE)lpPerIoContext->sClient,/
(DWORD_PTR)lpNewperHandleContext,
if (NULL == hResult)
cout << "CreateIoCompletionPort() failed: " << GetLastError();
lpNewperHandleContext->pNext = NULL;
pThis->InsertToLookaside(NULL, lpNewperHandleContext);
//分析處理資料。
pThis->HandleData(lpPerIoContext, IO_READ_COMPLETION);
bSuccess = pThis->DataAction(lpPerIoContext, lpNewperHandleContext);
if (FALSE == bSuccess)
//如果連接配接成功但是沒有收到資料
else
pThis->HandleData(lpPerIoContext, IO_ACCEPT_COMPLETION);
break;//end of case IoAccept
case IoRead:
pThis->HandleData(lpPerIoContext, IO_READ_COMPLETION);
bSuccess = pThis->DataAction(lpPerIoContext, lpNewperHandleContext);
if (FALSE == bSuccess)
break;//end of case IoRead
case IoWrite:
pThis->HandleData(lpPerIoContext, IO_WRITE_COMPLETION);
break;
default:
return 0;
}//end of CompletionRoutine()
BOOL CompletionPortModel::PostAcceptEx()
Fucntion Description:
連續發出10個AcceptEx調用。
函數調用成功傳回TRUE,失敗傳回FALSE。
while (m_lAcceptExCounter < 10)
SOCKET AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, /
if (INVALID_SOCKET == AcceptSocket)
cout << "WSASocket failed " << endl;
return FALSE;
PPER_IO_CONTEXT lpAcceptExIoContext = GetIoFromLookaside();
if (NULL == lpAcceptExIoContext)
lpAcceptExIoContext = (PPER_IO_CONTEXT)HeapAlloc(/
GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PER_IO_CONTEXT));
if (NULL == lpAcceptExIoContext)
cout << "HeapAlloc() failed " << endl;
closesocket(AcceptSocket);
return FALSE;
ZeroMemory(&(lpAcceptExIoContext->ol), sizeof(lpAcceptExIoContext->ol));
lpAcceptExIoContext->sClient = AcceptSocket;
lpAcceptExIoContext->IoOperation = IoAccept;
lpAcceptExIoContext->pNext = NULL;
ZeroMemory(lpAcceptExIoContext->szBuffer, BUFFER_SIZE);
lpAcceptExIoContext->wsaBuffer.buf = lpAcceptExIoContext->szBuffer;
lpAcceptExIoContext->wsaBuffer.len = BUFFER_SIZE;
lpAcceptExIoContext->unId = lpAcceptExIoContext->sClient;
DWORD dwBytes;
BOOL bSuccess = lpAcceptEx(
m_ListenSocket,
lpAcceptExIoContext->sClient,
lpAcceptExIoContext->szBuffer,
lpAcceptExIoContext->wsaBuffer.len - ((sizeof(SOCKADDR_IN) + 16) * 2),
sizeof(SOCKADDR_IN) + 16,
&dwBytes,
&(lpAcceptExIoContext->ol));
int nResult = WSAGetLastError();
if (nResult != ERROR_IO_PENDING)
cout << "AcceptEx() failed :" << nResult << endl;
HeapFree(GetProcessHeap(), 0 , lpAcceptExIoContext);
InsertNode(lpAcceptExIoContext, NULL);
InterlockedExchangeAdd(&m_lAcceptExCounter, 1);
InterlockedExchangeAdd(&m_lAcceptExCounter, -10);
}//end of PostAccetExRoutine()
void CompletionPortModel::InsertNode(PPER_IO_CONTEXT pNode, PPER_HANDLE_CONTEXT pHandleNode)
根據參數類型将傳遞進來結點插入到相應的連結清單頭。
pNode - 要插傳入連結表中的結點
pHandleNode - 要插傳入連結表中的結點
無.
if (NULL != pNode)
EnterCriticalSection(&m_ListCriSection);
pNode->pNext = m_lpConnectionListHead->pNext;
m_lpConnectionListHead->pNext = pNode;
LeaveCriticalSection(&m_ListCriSection);
}//end of InsertNode