編寫 Windows Socket TCP 用戶端其實并不困難,Windows 提供了6種 I/O 通信模型供大家選擇。但本座看過很多用戶端程式都把 Socket 通信和業務邏輯混在一起,剪不斷理還亂。每個程式都 Copy / Parse 類似的代碼再進行修改,實在有點情何以堪。是以本座利用一些閑暇時光寫了一個 基于 IOCP 的通用異步 Windows Socket TCP 高性能服務端元件 和一個通用異步 Windows Socket TCP 用戶端元件供各位看官參詳參詳,希望能激發下大家的靈感。本篇文章講述用戶端元件。閑話少說,我們現在步入正題。
- 最重要的第一個問題:如何才能達到通用?
答:很簡單。
1、限制元件的職能,說白了,通信元件的唯一職責就是接受和發送位元組流,絕對不能參與上層協定解析等工作。不在其位不謀其政就是這個意思。
2、與上層使用者解耦、互不依賴,元件與使用者通過接口方法進行互動,元件實作 ISocketClient 接口為上層提供操作方法;使用者通過 IClientSocketListener 接口把自己注冊為元件的 Listener,接收元件通知。是以,任何使用者隻要實作了 IClientSocketListener 接口都可以使用元件;另一方面,你甚至可以自己重新寫一個實作方式完全不同的元件實作給使用者調用,隻要該元件遵從 ISocketClient 接口。這也是 DIP 設計原則的展現(
若想了解更多關于設計原則的内容請猛擊這裡 ^_^)。
- 最重要的第二個問題:可用性如何,也就是說使用起來是否是否友善?
答:這個問題問得很好,可用性對所有通用元件都是至關重要的,如果太難用還不如自己重頭寫一個來得友善。是以,ISocketClient 和 IClientSocketListener 接口設計得盡量簡單易用(通俗來說就是“傻瓜化”),這兩個接口的主要方法均不超過 5 個。
- 最重要的第三個問題:元件的性能如何?
作為底層的通用元件,性能問題是必須考慮的,絕對不能成為系統的瓶頸。而另一方面,從實際出發,畢竟隻是一個用戶端元件,它的并發性要求遠沒有服務端那麼高。是以,元件在設計上充分考慮了性能、現實使用情景、可用性和實作複雜性等因素,確定滿足性能要求的同時又不會寫得太複雜。做出以下兩點設計決策:
-
- 在單獨線程中實作 Socket 通信互動。這樣可以避免與主線程或其他線程互相幹擾。
- I/O 模型選擇 WSAEventSelect。細說一下選擇這種 I/O 模型的原因:(各種 I/O 模型的性能比較可以參考:《Windows 網絡程式設計(中文第二版)》第 154 頁)
- 阻塞模型:(不解析,你懂的^_^)
- 非阻塞模型:(性能太低)
- WSAAsyncSelect: (兩個原因:a、性能太低;b、對于純 Console 程式還要背負 HWND 實在是傷不起呀!)
- 重疊 I/O:(有點複雜了)
- 完成端口:(何必呢?)
唉,理論的東西就先别吹那麼多了,直接上代碼吧,求你了 !!
OK!先看看 ISocketClient 和 IClientSocketListener 的接口定義:
// 元件操作類型
enum EnSocketOperation
{
SO_UNKNOWN = 0,
SO_ACCEPT = 1,
SO_CONNECT = 2,
SO_SEND = 3,
SO_RECEIVE = 4,
};
// 元件監聽器基接口
class ISocketListener
{
public:
enum EnHandleResult
{
HR_OK = 0,
HR_IGNORE = 1,
HR_ERROR = 2,
};
public:
// 已發出資料通知
virtual EnHandleResult OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;
// 已接收資料通知
virtual EnHandleResult OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;
// 關閉連接配接通知
virtual EnHandleResult OnClose(DWORD dwConnectionID) = 0;
// 通信錯誤通知
virtual EnHandleResult OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode) = 0;
public:
virtual ~ISocketListener() {}
};
// 服務端元件監聽器接口(暫時無視之)
class IServerSocketListener : public ISocketListener
{
public:
// 接收連接配接通知
virtual EnHandleResult OnAccept(DWORD dwConnectionID) = 0;
// 服務關閉通知
virtual EnHandleResult OnServerShutdown() = 0;
};
// 用戶端元件監聽器接口
class IClientSocketListener : public ISocketListener
{
public:
// 連接配接完成通知
virtual EnHandleResult OnConnect(DWORD dwConnectionID) = 0;
};
// 服務端元件接口(暫時無視之)
class ISocketServer
{
public:
enum En_ISS_Error
{
ISS_OK = 0,
ISS_SOCKET_CREATE = 1,
ISS_SOCKET_BIND = 2,
ISS_SOCKET_LISTEN = 3,
ISS_CP_CREATE = 4,
ISS_WORKER_THREAD_CREATE = 5,
ISS_SOCKE_ATTACH_TO_CP = 6,
ISS_ACCEPT_THREAD_CREATE = 7,
};
public:
virtual BOOL Start (LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount) = 0;
virtual BOOL Stop () = 0;
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen) = 0;
virtual BOOL HasStarted () = 0;
virtual En_ISS_Error GetLastError () = 0;
virtual LPCTSTR GetLastErrorDesc() = 0;
virtual BOOL GetConnectionAddress(DWORD dwConnID, CString& strAddress, USHORT& usPort) = 0;
public:
virtual ~ISocketServer() {}
};
// 服務端元件接口智能指針
typedef auto_ptr<ISocketServer> ISocketServerPtr;
// 用戶端元件接口
class ISocketClient
{
public:
// 操作結果碼
enum En_ISC_Error
{
ISC_OK = 0,
ISC_CLIENT_HAD_STARTED = 1,
ISC_CLIENT_NOT_STARTED = 2,
ISC_SOCKET_CREATE_FAIL = 3,
ISC_CONNECT_SERVER_FAIL = 4,
ISC_WORKER_CREATE_FAIL = 5,
ISC_NETWORK_ERROR = 6,
ISC_PROTOCOL_ERROR = 7,
};
public:
// 啟動通信
virtual BOOL Start (LPCTSTR pszRemoteAddress, USHORT usPort) = 0;
// 關閉通信
virtual BOOL Stop () = 0;
// 發送資料
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen) = 0;
// 是否已啟動
virtual BOOL HasStarted () = 0;
// 擷取錯誤碼
virtual En_ISC_Error GetLastError () = 0;
// 擷取錯誤描述
virtual LPCTSTR GetLastErrorDesc() = 0;
public:
virtual ~ISocketClient() {}
};
// 用戶端元件接口智能指針
typedef auto_ptr<ISocketClient> ISocketClientPtr;
ISocketClient 接口主要有以下三個方法:
- Start():啟動通信
- Send():發送資料
- Stop():停止通信
IClientSocketListener 接口有以下五個通知方法:
- OnConnect()
- OnSend()
- OnReceive()
- OnClose()
- OnError()
夠簡單了吧^_^,使用者隻需通過三個方法操作元件,然後處理五個元件通知。下面我們再看看元件的具體實作,先看元件類定義:
/* 元件實作類 */
class CSocketClient : public ISocketClient
{
// ISocketClient 接口方法
public:
virtual BOOL Start (LPCTSTR pszRemoteAddress, USHORT usPortt);
virtual BOOL Stop ();
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen);
virtual BOOL HasStarted () {return m_bStarted;}
virtual En_ISC_Error GetLastError () {return sm_enLastError;}
virtual LPCTSTR GetLastErrorDesc();
private:
BOOL CreateClientSocket();
BOOL ConnectToServer(LPCTSTR pszRemoteAddress, USHORT usPort);
BOOL CreateWorkerThread();
// 網絡事件處理方法
BOOL ProcessNetworkEvent();
void WaitForWorkerThreadEnd();
BOOL ReadData();
BOOL SendData();
void SetLastError(En_ISC_Error code, LPCTSTR func, int ec);
// 通信線程函數
static
#ifndef _WIN32_WCE
UINT
#else
DWORD
#endif
WINAPI WorkerThreadProc(LPVOID pv);
private:
static const int RECEIVE_BUFFER_SIZE = 8 * 1024;
static const int WORKER_THREAD_END_TIME = 3 * 1000;
static const long DEFALUT_KEEPALIVE_TIMES = 3;
static const long DEFALUT_KEEPALIVE_INTERVAL = 10 * 1000;
// 構造函數
public:
CSocketClient(IClientSocketListener* pListener)
: m_pListener(pListener) // 設定監聽器對象
, m_soClient(INVALID_SOCKET)
, m_evSocket(NULL)
, m_dwConnID(0)
, m_hWorker(NULL)
, m_dwWorkerID(0)
, m_bStarted(FALSE)
#ifdef _WIN32_WCE
, sm_enLastError(ISC_OK)
#endif
{
ASSERT(m_pListener);
}
virtual ~CSocketClient() {if(HasStarted()) Stop();}
private:
// 這是神馬 ???
CInitSocket m_wsSocket;
SOCKET m_soClient;
HANDLE m_evSocket;
DWORD m_dwConnID;
CCriSec m_scStop;
CEvt m_evStop;
HANDLE m_hWorker;
#ifndef _WIN32_WCE
UINT
#else
DWORD
#endif
m_dwWorkerID;
CBufferPtr m_sndBuffer;
CCriSec m_scBuffer;
CEvt m_evBuffer;
volatile BOOL m_bStarted;
private:
// 監聽器對象指針
IClientSocketListener* m_pListener;
#ifndef _WIN32_WCE
__declspec(thread) static En_ISC_Error sm_enLastError;
#else
volatile En_ISC_Error sm_enLastError;
#endif
};
從上面的定義可以看出,元件實作類本身并沒有提供額外的公共方法,它完全是可以被替換的。元件在構造函數中接收監聽器對象,并且儲存為其成員屬性,是以可以在需要的時候向監聽器發送事件通知。
另外,不知各位看官是否注意到一個奇怪的成員屬性:“CInitSocket m_wsSocket; ”,這個屬性在其它地方從來都不會用到,那麼它是幹嘛的呢?在回答這個問題之前,首先想問問大家:Windows Socket 操作的整個操作過程中,第一個以及最後一個被調用的方法是什麼?是 socket()、connect()、bind()、還是 closesocket() 嗎?都錯!答案是 —— ::WSAStartup() 和 ::WSACleanup()。每個程式都要調用一下這兩個方法确實是很煩的,又不雅觀。 其實,m_wsSocket 的唯一目的就是為了避免手工調用者兩個方法,看看它的定義就明白了:
class CInitSocket
{
public:
CInitSocket(LPWSADATA lpWSAData = NULL, BYTE minorVersion = 2, BYTE majorVersion = 2)
{
LPWSADATA lpTemp = lpWSAData;
if(!lpTemp)
lpTemp = (LPWSADATA)_alloca(sizeof(WSADATA));
m_iResult = ::WSAStartup(MAKEWORD(minorVersion, majorVersion), lpTemp);
}
~CInitSocket()
{
if(IsValid())
::WSACleanup();
}
int GetResult() {return m_iResult;}
BOOL IsValid() {return m_iResult == 0;}
private:
int m_iResult;
};
現在我們看看元件類實作檔案中幾個重要方法的定義:
// 元件事件觸發宏定義
#define FireConnect(id) m_pListener->OnConnect(id)
#define FireSend(id, data, len) (m_bStarted ? m_pListener->OnSend(id, data, len) : ISocketListener::HR_IGNORE)
#define FireReceive(id, data, len) (m_bStarted ? m_pListener->OnReceive(id, data, len) : ISocketListener::HR_IGNORE)
#define FireClose(id) (m_bStarted ? m_pListener->OnClose(id) : ISocketListener::HR_IGNORE)
#define FireError(id, op, code) (m_bStarted ? m_pListener->OnError(id, op, code) : ISocketListener::HR_IGNORE)
// 啟動元件
BOOL CSocketClient::Start(LPCTSTR pszRemoteAddress, USHORT usPort)
{
BOOL isOK = FALSE;
if(HasStarted())
{
SetLastError(ISC_CLIENT_HAD_STARTED, _T(__FUNCTION__), 0);
return isOK;
}
// 建立 socket
if(CreateClientSocket())
{
// 連接配接伺服器(内部會調用 FireConnect() )
if(ConnectToServer(pszRemoteAddress, usPort))
{
// 建立工作線程
if(CreateWorkerThread())
isOK = TRUE;
else
SetLastError(ISC_WORKER_CREATE_FAIL, _T(__FUNCTION__), 0);
}
else
SetLastError(ISC_CONNECT_SERVER_FAIL, _T(__FUNCTION__), ::WSAGetLastError());
}
else
SetLastError(ISC_SOCKET_CREATE_FAIL, _T(__FUNCTION__), ::WSAGetLastError());
isOK ? m_bStarted = TRUE : Stop();
return isOK;
}
// 關閉元件
BOOL CSocketClient::Stop()
{
{
CCriSecLock locallock(m_scStop);
m_bStarted = FALSE;
if(m_hWorker != NULL)
{
// 停止工作線程
if(::GetCurrentThreadId() != m_dwWorkerID)
WaitForWorkerThreadEnd();
::CloseHandle(m_hWorker);
m_hWorker = NULL;
m_dwWorkerID = 0;
}
if(m_evSocket != NULL)
{
// 關閉 WSAEvent
::WSACloseEvent(m_evSocket);
m_evSocket = NULL;
}
if(m_soClient != INVALID_SOCKET)
{
// 關閉socket
shutdown(m_soClient, SD_SEND);
closesocket(m_soClient);
m_soClient = INVALID_SOCKET;
}
m_dwConnID = 0;
}
// 釋放其它資源
m_sndBuffer.Free();
m_evBuffer.Reset();
m_evStop.Reset();
return TRUE;
}
// 發送資料
BOOL CSocketClient::Send(DWORD dwConnID, const BYTE* pBuffer, int iLen)
{
ASSERT(iLen > 0);
if(!HasStarted())
{
SetLastError(ISC_CLIENT_NOT_STARTED, _T(__FUNCTION__), 0);
return FALSE;
}
CCriSecLock locallock(m_scBuffer);
// 把資料存入緩沖器
m_sndBuffer.Cat(pBuffer, iLen);
// 喚醒工作現場,發送資料
m_evBuffer.Set();
return TRUE;
}
// 工作線程函數
#ifndef _WIN32_WCE
UINT
#else
DWORD
#endif
WINAPI CSocketClient::WorkerThreadProc(LPVOID pv)
{
CSocketClient* pClient = (CSocketClient*)pv;
TRACE0("---------------> 啟動工作線程 <---------------\n");
HANDLE hEvents[] = {pClient->m_evSocket, pClient->m_evBuffer, pClient->m_evStop};
while(pClient->HasStarted())
{
// 等待 socket 事件、發送資料事件和停止通信事件
DWORD retval = ::MsgWaitForMultipleObjectsEx(3, hEvents, WSA_INFINITE, QS_ALLINPUT, MWMO_INPUTAVAILABLE);
if(retval == WSA_WAIT_EVENT_0)
{
// 處理網絡消息
if(!pClient->ProcessNetworkEvent())
{
if(pClient->HasStarted())
pClient->Stop();
break;
}
}
else if(retval == WSA_WAIT_EVENT_0 + 1)
{
// 發送資料(内部調用 FireSend() )
if(!pClient->SendData())
{
if(pClient->HasStarted())
pClient->Stop();
break;
}
}
else if(retval == WSA_WAIT_EVENT_0 + 2)
break;
else if(retval == WSA_WAIT_EVENT_0 + 3)
// 消息循環
::PeekMessageLoop();
else
ASSERT(FALSE);
}
TRACE0("---------------> 退出工作線程 <---------------\n");
return 0;
}
// 處理網絡消息
BOOL CSocketClient::ProcessNetworkEvent()
{
::WSAResetEvent(m_evSocket);
WSANETWORKEVENTS events;
int rc = ::WSAEnumNetworkEvents(m_soClient, m_evSocket, &events);
if(rc == SOCKET_ERROR)
{
int code = ::WSAGetLastError();
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), code);
FireError(m_dwConnID, SO_UNKNOWN, code);
return FALSE;
}
/* 可讀取 */
if(events.lNetworkEvents & FD_READ)
{
int iCode = events.iErrorCode[FD_READ_BIT];
if(iCode == 0)
// 讀取資料(内部調用 FireReceive() )
return ReadData();
else
{
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), iCode);
FireError(m_dwConnID, SO_RECEIVE, iCode);
return FALSE;
}
}
/* 可發送 */
if(events.lNetworkEvents & FD_WRITE)
{
int iCode = events.iErrorCode[FD_WRITE_BIT];
if(iCode == 0)
// 發送資料(内部調用 FireSend() )
return SendData();
else
{
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), iCode);
FireError(m_dwConnID, SO_SEND, iCode);
return FALSE;
}
}
/* socket 已關閉 */
if(events.lNetworkEvents & FD_CLOSE)
{
int iCode = events.iErrorCode[FD_CLOSE_BIT];
if(iCode == 0)
FireClose(m_dwConnID);
else
{
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), iCode);
FireError(m_dwConnID, SO_UNKNOWN, iCode);
}
return FALSE;
}
return TRUE;
}
從上面的代碼可以看出:通信過程中,元件的使用者不需要對通信過程進行任何幹預,整個底層通信過程對使用者來說是透明的,使用隻需集中精力處理好幾個元件通知。下面來看看元件的一個使用示例:
/* 元件使用者:實作 IClientSocketListener */
class CMainClient : public IClientSocketListener
{
// 這些方法會操作元件
public:
bool Login(LPCTSTR pszAddress, USHORT usPort, const T_101_Data* pData);
bool Logout(const T_201_Data* pData);
BOOL SendData(EnCommandType enCmdType, const TCommandData* pCmdData, WORD wCmdDataLen);
long GetLastError();
LPCTSTR GetLastErrorDesc();
// 實作 IClientSocketListener
public:
virtual EnHandleResult OnConnect(DWORD dwConnectionID);
virtual EnHandleResult OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength);
virtual EnHandleResult OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLen);
virtual EnHandleResult OnClose(DWORD dwConnectionID);
virtual EnHandleResult OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode);
private:
BOOL ParseReceiveBuffer();
// 其它方法 。。。
// 構造函數
public:
CMainClient()
// 建立元件,并把自己設定為元件的監聽器
: m_pscClient(new CSocketClient(this))
, m_dwConnID(0)
{
}
virtual ~CMainClient() {}
private:
// 元件屬性
ISocketClientPtr m_pscClient;
DWORD m_dwConnID;
// 其它屬性 。。。
};
BOOL CMainClient::Login(LPCTSTR pszAddress, USHORT usPort, const T_101_Data* pData)
{
// 啟動通信
return m_pscClient->Start(pszAddress, usPort) &&
SendData(CS_C_LOGIN_REQ, pData, sizeof(T_101_Data));
}
BOOL CMainClient::Logout(const T_201_Data* pData)
{
if(pData)
{
SendData(CS_C_SET_STATUS, pData, sizeof(T_201_Data));
::WaitWithMessageLoop(LOGOUT_WAIT_TIME);
}
// 停止通信
return m_pscClient->Stop();
}
BOOL CMainClient::SendData(EnCommandType enCmdType, const TCommandData* pCmdData, WORD wCmdDataLen)
{
const WORD wBufferLen = CMD_ADDITIVE_SIZE + wCmdDataLen;
CPrivateHeapByteBuffer buffer(m_hpPrivate, wBufferLen);
BYTE* pBuffer = buffer;
memcpy(pBuffer, &wBufferLen, CMD_LEN_SIZE);
pBuffer += CMD_LEN_SIZE;
memcpy(pBuffer, &enCmdType, CMD_TYPE_SIZE);
pBuffer += CMD_TYPE_SIZE;
memcpy(pBuffer, pCmdData, wCmdDataLen);
pBuffer += wCmdDataLen;
memcpy(pBuffer, &CMD_FLAG, CMD_FLAG_SIZE);
// 發送資料
return m_pscClient->Send(m_dwConnID, buffer, wBufferLen);
}
long CMainClient::GetLastError()
{
// 擷取通信錯誤碼
return m_pscClient->GetLastError();
}
LPCTSTR CMainClient::GetLastErrorDesc()
{
// 擷取通信錯誤描述
return m_pscClient->GetLastErrorDesc();
}
/* 處理連接配接成功事件 */
ISocketListener::EnHandleResult CMainClient::OnConnect(DWORD dwConnectionID)
{
TRACE1("<CNNID: %d> 已連接配接\n", dwConnectionID);
m_dwConnID = dwConnectionID;
return HR_OK;
}
/* 處理資料已發出事件 */
ISocketListener::EnHandleResult CMainClient::OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength)
{
TRACE2("<CNNID: %d> 發出資料包 (%d bytes)\n", dwConnectionID, iLength);
return HR_OK;
}
/* 處理接收到資料事件*/
ISocketListener::EnHandleResult CMainClient::OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLen)
{
TRACE2("<CNNID: %d> 接收資料包 (%d bytes)\n", dwConnectionID, iLen);
ASSERT(pData != NULL && iLen > 0);
// 儲存資料
m_rcBuffer.Cat(pData, iLen);
// 解析資料
return ParseReceiveBuffer() ? HR_OK : HR_ERROR;;
}
/* 處理通信關閉事件*/
ISocketListener::EnHandleResult CMainClient::OnClose(DWORD dwConnectionID)
{
TRACE1("CNNID: %d> 關閉連接配接\n", dwConnectionID);
// 清理緩沖區
m_rcBuffer.Realloc(0);
return HR_OK;
}
/* 處理通信錯誤事件 */
ISocketListener::EnHandleResult CMainClient::OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode)
{
TRACE3("<CNNID: %d> 網絡錯誤 (OP: %d, CO: %d)\n", dwConnectionID, enOperation, iErrorCode);
// 清理緩沖區
m_rcBuffer.Realloc(0);
return HR_OK;
}
好了,碼了一個晚上的字,累啊!到此為止吧,感謝收看~
敬請繼續收看《
基于 IOCP 的通用異步 Windows Socket TCP 高性能服務端元件的設計與實作》,晚安 ^_^
(
想看源代碼的朋友請狂點這裡)
原文出處:
項目首頁 怪獸的部落格 怪獸的GitHub 怪獸樂園Q群