本編文章分析了wxSocket在Linux作業系統中的實作,并總結了相關使用方法。
wxSocket 屬于wxWidgets的wxNet子module。wxSocket是對系統socket API的簡單封裝,對外提供了wxEvent通知機制,屏蔽了作業系統相關的實作細節。使用者可以通過wxSocketClient/wxSocketServer來很友善的使用。
Note: 此處使用的wxWidgets 庫的版本是 v2.8.9,主要涉及到代碼檔案 src/common/sckstrm.cpp, src/common/socket.cpp 和 src/unix/gsocket.cpp。
1. wxSocket 實作
wxSocket相關類UML圖:

在wx的線上manual裡,可以看到wxNet相關的所有類。
1.1 主要操作接口
wxSocket主要的操作如下:
- basic IO
- - Close
- - Discard
- - Peek
- - Unread
- - Read
- - ReadMsg
- - Write
- - WriteMsg
- Socket state
- 2. Functions to retrieve current state and miscellaneous info.
-
- - Error
- - GetLocal/GetPeer
- - IsData
- - IsDisconnected/IsConnected
- - LastCount
- - LastError
- - IsOk
- - SaveState/RestoreState
-
- 2. Functions that perform a timed wait on a certain IO condition.
-
- - InterruptWait
- - Wait
- - WaitForLost
- - WaitForRead/WaitForWrite
- - and also:
- - wxSocketServer::WaitForAccept
- - wxSocketClient::WaitOnConnect
-
- 2. Functions that allow applications to customize socket IO as needed.
-
- - GetFlags/SetFlags
- - SetTimeout
- - SetLocal
- Handling socket events
- - Notify/SetNotify
- - GetClientData/SetClientData
- - SetEventHandler
使用者可以友善的通過SetNotify來訂閱想要通知的事件。基于此,可以完全把Read/Write操作事件化。
1.2 wxSocketFlags
wxSocketFlags用來控制socket的運作模式,主要的flag如下:
wxSocketFlags | 含義 |
wxSOCKET_BLOCK | Block the GUI (do not yield) while reading/writing data. |
wxSOCKET_NOWAIT | Read/write as much data as possible and return immediately. |
wxSOCKET_WAITALL | Wait for all required data to be read/written unless an error occurs. |
a. wxSOCKET_BLOCK表示在調用wxSocketBase::Read/Write的時候,不會觸發yield的操作。如果wxSocket本身在main thread裡面進行讀寫操作,程式此時會不響應UI消息。而且由于yield操作本身可能會導緻socket相關的操作重入(reentrant),這很可能不是使用者代碼想要的。是以一般推薦在wxSocket_BLOCK+thread環境下使用wxSocket,這樣socket的讀寫操作就不會阻塞UI thread。
b. wxSOCKET_WAITALL與wxSOCKET_NOWAIT正好相反,分别用于同步和異步讀寫。
1.3 wxSocket實作
wxSocket 對 系統socket函數進行封裝,内部的socket fd采用非阻塞模式。
src/unix/gsocket.cpp
- m_fd = socket(m_peer->m_realfamily,m_stream?SOCK_STREAM:SOCK_DGRAM,0);
- if(m_fd == INVALID_SOCKET)
- {
- m_error = GSOCKIOERR;
- return GSOCK_IOERR;
- }
- #ifdef SO_NOSIGPIPE
- setsockopt(m_fd,SOL_SOCKET,SO_NOSIGPIPE,(const char*)&arg,sizeof(arg));
- #endif
- #if defined(__EMX__)||defined(__VISAGECPP__)
- ioctl(m_fd,FIONBIO,(char*)&arg,sizeof(arg));
- #else
- ioctl(m_fd,FIONBIO,&arg);
- #endif
這裡arg的值為1,通過ioctl來設定socket fd為非阻塞。
2. wxSocketBase::Read/Write
wxSocket本質上是non-blocking socket。我們可以通過wxSocketBase::Read/Write直接讀寫資料。同時wxNet也提供了wxSocket stream (wxSocketInputStream, wxSocketOuputStream)來讀寫資料。
通過wxSocket stream來讀寫資料和直接通過wxSocket來讀寫基本一樣。但是wxSocket stream 的作用在于,我們可以利用wx庫提供的通用的stream操作類來處理socket相關的讀寫。比如: 利用wxBufferedInputStream 和 wxBufferedOutputStream來進行帶緩沖區的讀寫。
2.1 未設定wxSOCKET_WAITALL flag時的wxSocketBase::Read
問題: 假如直接調用wxSocketBase::Read或者wxSocketInputStream::Read讀取一大塊資料(e.g. 10MB),可能會發生什麼情況?
答案: wxSocketInputStream::Read傳回後,實際讀取的位元組數很随機,跟目前的系統狀态有很大關系。
當然這樣的行為和直接用系統函數recv(2)或read(2)從non-blocking socket 讀取資料類似。
這個通過檢視wxSocketBase::_Read的實作(src/common/socket.cpp),很容易弄清楚。
- wxUint32 wxSocketBase::_Read(void* buffer, wxUint32 nbytes)
- {
- // ...
- int ret;
- if (m_flags & wxSOCKET_NOWAIT)
- {
- m_socket->SetNonBlocking(1);
- ret = m_socket->Read((char *)buffer, nbytes);
- m_socket->SetNonBlocking(0);
- if (ret > 0)
- total += ret;
- }
- else
- {
- bool more = true;
- while (more)
- {
- if ( !(m_flags & wxSOCKET_BLOCK) && !WaitForRead() )
- break;
- ret = m_socket->Read((char *)buffer, nbytes);
- if (ret > 0)
- {
- total += ret;
- nbytes -= ret;
- buffer = (char *)buffer + ret;
- }
- // If we got here and wxSOCKET_WAITALL is not set, we can leave
- // now. Otherwise, wait until we recv all the data or until there
- // is an error.
- //
- more = (ret > 0 && nbytes > 0 && (m_flags & wxSOCKET_WAITALL));
- }
- }
- return total;
- }
_Read函數内的while循環的啟動條件 要求
“
ret > 0 && nbytes > 0 && (m_flags & wxSOCKET_WAITALL)”
,這意味着要求滿足如下條件:
- 設定 wxSOCKET_WAITALL
- 上次讀到資料
- 還有資料需要讀
即使設定了wxSOCKET_WAITALL, wxSocketBase::_Read也不能完全保證傳回時,請求的nbytes已經完全讀出,這是因為 代碼行 “ret = m_socket->Read((char *)buffer, nbytes);”的傳回值不能保證 “ret > 0”條件。
2.2 輔助調試類
這裡通過繼承wxSocketInputStream來列印更多調試資訊。
- class wxGDSSocketInputStream : public wxSocketInputStream
- {
- public:
- wxGDSSocketInputStream (wxSocketBase& s)
- : wxSocketInputStream (s)
- { }
- protected:
- size_t OnSysRead(void *buffer, size_t bufsize);
- DECLARE_NO_COPY_CLASS(wxGDSSocketInputStream)
- };
-
- static void PrintSocketError(wxSocketBase* sock)
- {
- if (sock->LastError() == wxSOCKET_WOULDBLOCK)
- {
- LOG4TY_DEBUG("socket error: wxSOCKET_WOULDBLOCK");
- }
- else if (sock->LastError() == wxSOCKET_TIMEDOUT)
- {
- LOG4TY_DEBUG("socket error: wxSOCKET_TIMEDOUT");
- }
- else
- {
- LOG4TY_DEBUG("socket error: " << sock->LastError());
- }
- }
-
- size_t wxGDSSocketInputStream::OnSysRead(void *buffer, size_t size)
- {
- size_t count = wxSocketInputStream::OnSysRead(buffer, size);
- wxLogTrace(GDS_STREAM_MASK, "[OnSysRead] request %lu, receive %lu", size, count);
- if (m_i_socket->Error()) ::PrintSocketError(m_i_socket);
- return count;
- }
通過調試發現,wxSocketBase::Read在讀取大資料塊時,很可能會提前傳回,而且此時wxSocket内可能發生了錯誤(具體錯誤及原因見2.3節)。
2.3 GSocket::Read分析
wxSocketBase::Read最終通過GSocket::Read實作讀資料(src/unix/gsocket.cpp)。
- int GSocket::Read(char *buffer, int size)
- {
- int ret;
- assert(this);
- if (m_fd == INVALID_SOCKET || m_server)
- {
- m_error = GSOCK_INVSOCK;
- return -1;
- }
- /* Disable events during query of socket status */
- Disable(GSOCK_INPUT);
- /* If the socket is blocking, wait for data (with a timeout) */
- if (Input_Timeout() == GSOCK_TIMEDOUT) {
- m_error = GSOCK_TIMEDOUT;
- /* Don't return here immediately, otherwise socket events would not be
- * re-enabled! */
- ret = -1;
- }
- else
- {
- /* Read the data */
- if (m_stream)
- ret = Recv_Stream(buffer, size);
- else
- ret = Recv_Dgram(buffer, size);
- /*
- * If recv returned zero for a TCP socket (if m_stream == NULL, it's an UDP
- * socket and empty datagrams are possible), then the connection has been
- * gracefully closed.
- *
- * Otherwise, recv has returned an error (-1), in which case we have lost
- * the socket only if errno does _not_ indicate that there may be more data
- * to read.
- */
- if ((ret == 0) && m_stream)
- {
- /* Make sure wxSOCKET_LOST event gets sent and shut down the socket */
- m_detected = GSOCK_LOST_FLAG;
- Detected_Read();
- return 0;
- }
- else if (ret == -1)
- {
- if ((errno == EWOULDBLOCK) || (errno == EAGAIN))
- m_error = GSOCK_WOULDBLOCK;
- else
- m_error = GSOCK_IOERR;
- }
- }
- /* Enable events again now that we are done processing */
- Enable(GSOCK_INPUT);
- return ret;
- }
GSocket::Read主要調用了兩個函數:Input_Timeout和Recv_Stream。
GSocket::Recv_Stream 通過調用system 函數 recv(2) 實作。
- int GSocket::Recv_Stream(char *buffer, int size)
- {
- int ret;
- do
- {
- ret = recv(m_fd, buffer, size, GSOCKET_MSG_NOSIGNAL);
- }
- while (ret == -1 && errno == EINTR); /* Loop until not interrupted */
- return ret;
- }
可以通過如下方式來檢視socket 預設緩沖區大小:
- pwang@p03bc ~$ cat /proc/sys/net/ipv4/tcp_rmem
- 4096 87380(85K 340B) 174760 //第一個表示最小值,第二個表示預設值,第三個表示最大值。
- pwang@p03bc ~$ cat /proc/sys/net/ipv4/tcp_wmem
- 4096 16384(16k) 131072
這裡read緩沖區最小4KB,最大170KB,預設約85KB。是以recv函數絕對不可能一次讀10MB資料。
GSocket::Input_Timeout函數(src/unix/gsocket.cpp)通過select(2)函數來計時。
- GSocketError GSocket::Input_Timeout()
- {
- struct timeval tv;
- fd_set readfds;
- int ret;
- /* Linux select() will overwrite the struct on return */
- tv.tv_sec = (m_timeout / 1000);
- tv.tv_usec = (m_timeout % 1000) * 1000;
- if (!m_non_blocking) // m_non_blocking預設為false
- {
- wxFD_ZERO(&readfds);
- wxFD_SET(m_fd, &readfds);
- ret = select(m_fd + 1, &readfds, NULL, NULL, &tv);
- if (ret == 0)
- {
- GSocket_Debug(( "GSocket_Input_Timeout, select returned 0\n" ));
- m_error = GSOCK_TIMEDOUT;
- return GSOCK_TIMEDOUT;
- }
- if (ret == -1)
- {
- GSocket_Debug(( "GSocket_Input_Timeout, select returned -1\n" ));
- if (errno == EBADF) { GSocket_Debug(( "Invalid file descriptor\n" )); }
- if (errno == EINTR) { GSocket_Debug(( "A non blocked signal was caught\n" )); }
- if (errno == EINVAL) { GSocket_Debug(( "The highest number descriptor is negative\n" )); }
- if (errno == ENOMEM) { GSocket_Debug(( "Not enough memory\n" )); }
- m_error = GSOCK_TIMEDOUT;
- return GSOCK_TIMEDOUT;
- }
- }
- return GSOCK_NOERROR;
- }
m_timeout的預設值是600秒,是以如果沒有資料可讀,等到timeout錯誤傳回,要等10分鐘。
如果設定了wxSOCKET_NOWAIT flag,GSocket::Input_Timeout 就會直接傳回。
2.4 wxSocketBase::Read/Write可能發生的錯誤
wxSocket 所有可能發生的錯誤如下:
error | Note |
wxSOCKET_NOERROR | No error happened. |
wxSOCKET_INVOP | Invalid operation. |
wxSOCKET_IOERR | Input/Output error. |
wxSOCKET_INVADDR | Invalid address passed to wxSocket. |
wxSOCKET_INVSOCK | Invalid socket (uninitialized). |
wxSOCKET_NOHOST | No corresponding host. |
wxSOCKET_INVPORT | Invalid port. |
wxSOCKET_WOULDBLOCK | The socket is non-blocking and the operation would block. |
wxSOCKET_TIMEDOUT | The timeout for this operation expired. |
wxSOCKET_MEMERR | Memory exhausted. |
調用wxSocketBase::Read可能會觸發哪些錯誤呢?
根據GSocket::Read的實作,如果socket連接配接沒有正常,則可能有如下錯誤:
error | Note |
wxSOCKET_WOULDBLOCK | 沒有資料可讀或者目前socket不可寫(buffer已滿)。 Recv_Stream函數發生錯誤,recv(2)傳回-1,錯誤碼是 EWOULDBLOCK 或 EAGAIN。 |
wxSOCKET_IOERR | Recv_Stream函數發生錯誤,recv(2)傳回-1。 |
wxSOCKET_TIMEDOUT | 一般由GSocket::Input\_Timeout觸發。 但并不見得就是真的timeout了,比如系統調用select出錯,提前傳回。 |
2.5 wxSOCKET_TIMEDOUT
通過SocketInputStream輔助類,當調用wxSocketBase::Read來讀大塊資料時,很可能發生wxSOCKET_TIMEDOUT錯誤。但是wxSocket的預設timeout是600秒,而Read函數明顯沒有等待那麼長時間,為什麼呢?
下面給出了一個程式發生timeout error時的堆棧:
- Breakpoint 2, GSocket::Input_Timeout (this=0x1599430)
- at ./src/unix/gsocket.cpp:1563
- 1563 m_error = GSOCK_TIMEDOUT;
- (gdb) p errno
- $10 = 4
- (gdb) bt 10
- #0 GSocket::Input_Timeout (this=0x1599430) at ./src/unix/gsocket.cpp:1563
- #1 0x0000002aa65c7739 in GSocket::Read (this=0x1599430,
- buffer=0x2ab4bec290 "", size=4117729) at ./src/unix/gsocket.cpp:1164
- #2 0x0000002aa65c21c5 in wxSocketBase::_Read (this=0x1436e30,
- buffer=0x2ab4bec290, nbytes=4117729) at ./src/common/socket.cpp:363
- #3 0x0000002aa65c2068 in wxSocketBase::Read (this=0x1436e30,
- buffer=0x2ab4bec290, nbytes=4117729) at ./src/common/socket.cpp:308
- #4 0x0000002aa65c10b5 in wxSocketInputStream::OnSysRead (this=0x7fbfffa8c0,
- buffer=0x2ab4bec290, size=4117729) at ./src/common/sckstrm.cpp:90
- #5 0x0000002aa3f08eb1 in wxGDSSocketInputStream::OnSysRead (
- this=0x7fbfffa8c0, buffer=0x2ab4bec290, size=4117729)
- at GUI/libComm/src/wxGDSStream.cpp:54
- #6 0x0000002aa655b9b1 in wxInputStream::Read (this=0x7fbfffa8c0,
- buf=0x2ab4ae9010, size=4117729) at ./src/common/stream.cpp:846
根據上面列出的GSocket::Input_Timeout函數的代碼,通過調試發現 select函數傳回值有時是-1, 此時發生系統錯誤EINTR (慢系統調用被中斷)。在這種情況下,Input_Timeout函數會提前傳回。
這說明wxSocketBase::Read來讀大塊資料時,有可能發生wxSOCKET_TIMEDOUT錯誤,但是這個wxSOCKET_TIMEDOUT error并不一定是真實的,很可能是由EINTR引起的。
2.6 wxSocketBase::LastCount函數
wxSocketBase::LastCount 會傳回前一個Read/Write操作中成功的位元組數。
根據wx manual,函數Discard(), Peek(), Read(), ReadMsg(), Unread(), Write(), WriteMsg() 都有可能修改LastCount的值。
LastCount函數内通過一個變量來記錄所有上述操作中成功的位元組數,是以LastCount不是線程安全的。
是以,禁止對于同一個socket:
- 一個線程調用Read,另一個線程調用Write
- 兩個線程同時Read或者Write
wx 3.0提供了接口wxSocketBase::LastReadCount 和 wxSocketBase::LastWriteCount 來解決這個問題。
3. 使用wxBufferedInputStream
3.1 同時使用wxBufferedInputStream 和wxSOCKET_WAITALL flag
假設我們按照下面的方式初始化wxSocket:
initialize wxSocket
- m_socket = new wxSocketClient();
- m_is = new wxGDSSocketInputStream(*m_socket);
- m_buf = new wxStreamBuffer(*m_is, wxStreamBuffer::read);
- m_buf->SetBufferIO(1000000);
- m_buf_is = new wxBufferedInputStream(*m_is, m_buf);
- m_socket->SetFlags(wxSOCKET_BLOCK|wxSOCKET_WAITALL);
- m_socket->SetNotify(wxSOCKET_LOST_FLAG);
- m_socket->Notify(true);
然後利用wxBufferedInputStream 讀資料,可能會發生什麼?
read through wxBufferedInputStream
- char ptr[5];
- memset(ptr, 0x00, 5);
- m_buf_is->Read(ptr,4);
答案是程式很可能會阻塞在m_buf_is->Read函數裡。
下面是tachyon GUI按照上面的方式設定,GUI hang在那裡後,列印的堆棧資訊。
call stack
- (gdb) bt
- #0 0x00000034cefbef86 in select () from /lib64/tls/libc.so.6
- #1 0x0000002aa65e7233 in GSocket::Input_Timeout (this=0x15ba580) at ./src/unix/gsocket.cpp:1548
- #2 0x0000002aa65e6739 in GSocket::Read (this=0x15ba580, buffer=0x14c727f "", size=994257) at ./src/unix/gsocket.cpp:1164
- #3 0x0000002aa65e11c5 in wxSocketBase::_Read (this=0x14c5b40, buffer=0x14c727f, nbytes=994257) at ./src/common/socket.cpp:363 //請求的位元組數減少5473
- #4 0x0000002aa65e1068 in wxSocketBase::Read (this=0x14c5b40, buffer=0x14c5c10, nbytes=1000000) at ./src/common/socket.cpp:308
- #5 0x0000002aa65e00b5 in wxSocketInputStream::OnSysRead (this=0x14c5a80, buffer=0x14c5c10, size=1000000) at ./src/common/sckstrm.cpp:90
- #6 0x0000002aa3f28eb1 in wxGDSSocketInputStream::OnSysRead (this=0x14c5a80, buffer=0x14c5c10, size=1000000) at GUI/libComm/src/wxGDSStream.cpp:54
- #7 0x0000002aa65793a9 in wxStreamBuffer::FillBuffer (this=0x14c5ac0) at ./src/common/stream.cpp:204
- #8 0x0000002aa6579539 in wxStreamBuffer::GetDataLeft (this=0x14c5ac0) at ./src/common/stream.cpp:241
- #9 0x0000002aa6579a02 in wxStreamBuffer::Read (this=0x14c5ac0, buffer=0x409fed50, size=4) at ./src/common/stream.cpp:398
- #10 0x0000002aa657bde6 in wxBufferedInputStream::Read (this=0x14c1e70, buf=0x409fed50, size=4) at ./src/common/stream.cpp:1230 //請求4Bytes
根據調用棧可知,在frame 10的位置,我們調用了 wxBufferedInputStream::Read (buf, 4), 但是此調用觸發了wxStreamBuffer::FillBuffer操作。于是,wxSocketInputStream::OnSysRead 比較野蠻的要求從socket讀整個buffer大小的内容。
實際上,socket端沒有這麼多資料,由于設定了 wxSOCKET_WAITALL flag,wxSocketBase::_Read 函數會一直等待,直到讀完要求的位元組,或者産生timeout錯誤後傳回。
3.2 安全使用socket Read
可見,wxBufferedInputStream和wxSOCKET_WAITALL不能同時使用,那我們的代碼該怎麼寫呢?
可行的辦法是在wxInputStream外封一個函數,就像下面的代碼:
- Uint32 wxGDSStream::Read( void *buffer, Uint32 size )
- {
- wxLogTrace(GDS_STREAM_MASK, "[wxGDSStream::Read] request %u", size);
- m_stream_impl.Read(buffer,size);
- size_t read_size = m_stream_impl.LastRead();
- while( read_size < size)
- {
- m_stream_impl.Read((char*)buffer + read_size, size - read_size);
- read_size += m_stream_impl.LastRead();
- wxLogTrace(GDS_STREAM_MASK, "[wxGDSStream::Read] continue read %lu, totally receive %lu",
- m_stream_impl.LastRead(), read_size);
- LOG4TY_DEBUG("continue read:" << m_stream_impl.LastRead() << ", totally receive " << read_size);
- if (!m_sock->IsConnected()) break;
- }
- return read_size;
- }
此函數通過一個while循環來實作讀取要求大小的資料,同時要考慮socket出錯的情況。隻要socket沒有斷開,我們就可以繼續循環讀取資料。
3.3 安全使用socket Write
寫操作也類似,但略有不同: 1. 對于寫操作來說,目前協定要發送的資料的大小是已知的,讀操作則一般是在讀取的過程中才知道目前協定剩餘的資料還有多少。 2. 我們調用Write的時候,一般都會把所有要發送的資料先按照協定組織好,然後再發送。這樣就可以做到盡量少調用Write,進而少進行系統調用。
為了提高socket讀寫的效率,對于Read操作,我們一般傾向于使用帶緩沖區的方式。對于Write操作,則不會使用緩沖區,盡量讓資料盡早發送出去,在外部代碼裡面來控制盡量少調用Write。
在Write時不使用緩沖區,是以相對安全的Write操作有兩種實作:
1. 利用wxSOCKET_WAITALL flag.
- int wxGDSSocket::Write( const void * buffer, Uint32 nbytes)
- {
- wxSocketFlags old_flag = m_socket->GetFlags();
- m_socket->SetFlags(old_flag | wxSOCKET_WAITALL);
- while (nbytes > 0)
- {
- int length = nbytes > m_max_write_length ? m_max_write_length : nbytes;
- m_socket->Write((char*)buffer + write_size, length);
- write_size += length;
- nbytes -= length;
- } // while
- m_socket->SetFlags(old_flag);
- }
根據2.1節的結論,這個Write的實作其實也是有潛在問題的(wxSocketBase::Write函數傳回值不能完全保證length長度的資料寫成功),雖然可能很少發生 2. 不使用wxSOCKET_WAITALL
- int wxGDSSocket::Write( const void * buffer, Uint32 nbytes)
- {
- //...
- int write_size = 0;
- while (nbytes > 0)
- {
- int length = nbytes > m_max_write_length ? m_max_write_length : nbytes;
- m_socket->Write((char*)buffer + write_size, length);
- length = m_socket->LastCount();
- write_size += length;
- nbytes -= length;
- if (m_socket->Error()) ::PrintSocketError(m_socket);
- if (!m_socket->IsConnected()) break;
- } // while
- return write_size;
- }
4. reference
- wx 3.0 wxSocketBase Class Reference
- wx 2.8.9 源碼