本文旨在講述muduo網絡庫資料的發送流程。
首先明确一點,Muduo中的epoll使用了水準觸發方式,而不是邊緣觸發。
TcpConnection首先需要增加應用層的緩沖區
Buffer outputBuffer_;
我們看到在TcpConnection類内,提供了兩個資料發送函數,分别是TcpConnection::send,TcpConnection::sendInLoop。前者也是調用後者,保證資料的發送在IO線程内的EventLoop執行。(本部落格系列都是基于Muduo的示例教程代碼分析,個人認為這樣更能了解整個網絡庫的整體流程)
void TcpConnection::send(const std::string& message)
{
if (state_ == kConnected) {
if (loop_->isInLoopThread()) {
sendInLoop(message);
} else {
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop, this, message));
}
}
}
void TcpConnection::sendInLoop(const std::string& message)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
// if no thing in output queue, try writing directly
//如果channel沒有在寫,并且buffer沒有資料
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
//直接先盡力寫
nwrote = ::write(channel_->fd(), message.data(), message.size());
if (nwrote >= 0) {
//如果寫的長度小于消息長度
if (implicit_cast<size_t>(nwrote) < message.size()) {
LOG_TRACE << "I am going to write more data";
} else if (writeCompleteCallback_) {
//如果寫完了,回調writeCompleteCallback_
loop_->queueInLoop(
boost::bind(writeCompleteCallback_, shared_from_this()));
}
} else {
nwrote = 0;
if (errno != EWOULDBLOCK) {
LOG_SYSERR << "TcpConnection::sendInLoop";
}
}
}
assert(nwrote >= 0);
//如果消息沒有寫完,那麼将消息添加到Buffer中
if (implicit_cast<size_t>(nwrote) < message.size()) {
outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
if (!channel_->isWriting()) {
channel_->enableWriting();
}
}
}
我們可以看到sendInLoop()會先嘗試直接發送資料,如果一次發送完畢啟用WriteCallback;如果隻發送了部分資料,則把剩餘的資料放入outputBuffer_,并開始關注writable事件,以後在handlerWrite()中發送剩餘的資料。
channel::enableWriting的代碼如下,通知poller關心本channel的可讀事件
void enableWriting() { events_ |= kWriteEvent; update(); }
上面我們隻是寫了一部分的資料,剩下來的資料什麼時候寫呢?當socket變得可寫時,Channel會調用TcpConnection::handleWrite(),這裡我們繼續發送outputBuffer_中的資料。 這裡socket可寫,主要是指發送緩沖區可寫位元組數,低于低水位标志。
我們來看看TcpConnection::handleWrite(),當可讀時,就将Buffer中資料寫入,如果寫完了,需要關閉對套接字讀事件的關心(因為是水準觸發,如果不關閉,将一直觸發)。然後執行writeCompleteCallback_。在TcpConnection::sendInLoop中,當資料一次發送完成後,也調用了writeCompleteCallback_。這個就是使用者的寫完資料的回調函數。
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
//如果是可寫狀态
if (channel_->isWriting()) {
//将Buffer中資料寫入
ssize_t n = ::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
//如果寫成功
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
channel_->disableWriting();
if (writeCompleteCallback_) {
loop_->queueInLoop(
boost::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting) {
shutdownInLoop();
}
} else { //如果一次沒有寫完,下次還需要繼續寫
LOG_TRACE << "I am going to write more data";
}
} else {
LOG_SYSERR << "TcpConnection::handleWrite";
}
} else {
LOG_TRACE << "Connection is down, no more writing";
}
}
網絡程式中,資料的發送可能過快,在本地堆積,是以Muduo為了解決資料發送的困難,提供了兩個回調函數,分别是writeCompleteCallback_和HighWaterMarkCallback_,可以稱為低水位回調和高水位回調。如果目前緩沖區中資料大小超出上限值,那麼調用HighWaterMarkCallback_。高水位回調将停止從使用者接收資料。WriteCompleteCallback函數為發送緩沖區為空時調用,在這個函數重新開機開啟接收資料。
最後總結發送流程
1、調用TcpConnection::send
2、調用TcpConnection::sendInLoop
1、如果目前可以發送(channel沒有在寫,并且緩沖區沒有資料),先嘗試發送
2、如果沒有完全發送,将資料添加到buffer中,添加對channel對應的socket寫事件關心,等待socket可寫,然後調用 handlewrite
3、如果發送完全,調用使用者回調函數
3、如果沒有發送完全,并且socket可讀,調用HandleWrite。繼續發送,如果發送完全,就取消對socket可寫事件關系。如果沒有發送完全,等待下一次的發送。