天天看點

【Muduo源碼分析】 解讀muduo如何發送資料 TcpConnection Buffer

本文旨在講述muduo網絡庫資料的發送流程。

首先明确一點,Muduo中的epoll使用了水準觸發方式,而不是邊緣觸發。

TcpConnection首先需要增加應用層的緩沖區

Buffer outputBuffer_; 
           

我們看到在TcpConnection類内,提供了兩個資料發送函數,分别是TcpConnection::send,TcpConnection::sendInLoop。前者也是調用後者,保證資料的發送在IO線程内的EventLoop執行。(本部落格系列都是基于Muduo的示例教程代碼分析,個人認為這樣更能了解整個網絡庫的整體流程)

void TcpConnection::send(const std::string& message)
{
  if (state_ == kConnected) {
    if (loop_->isInLoopThread()) {
      sendInLoop(message);
    } else {
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop, this, message));
    }
  }
}

void TcpConnection::sendInLoop(const std::string& message)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  // if no thing in output queue, try writing directly
  //如果channel沒有在寫,并且buffer沒有資料
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
    //直接先盡力寫
    nwrote = ::write(channel_->fd(), message.data(), message.size());
    if (nwrote >= 0) {
      //如果寫的長度小于消息長度
      if (implicit_cast<size_t>(nwrote) < message.size()) {
        LOG_TRACE << "I am going to write more data";
      } else if (writeCompleteCallback_) {
       //如果寫完了,回調writeCompleteCallback_
        loop_->queueInLoop(
            boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    } else {
      nwrote = 0;
      if (errno != EWOULDBLOCK) {
        LOG_SYSERR << "TcpConnection::sendInLoop";
      }
    }
  }
   
  assert(nwrote >= 0);
  //如果消息沒有寫完,那麼将消息添加到Buffer中
  if (implicit_cast<size_t>(nwrote) < message.size()) {
    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
    if (!channel_->isWriting()) {
      channel_->enableWriting();
    }
  }
}
           

我們可以看到sendInLoop()會先嘗試直接發送資料,如果一次發送完畢啟用WriteCallback;如果隻發送了部分資料,則把剩餘的資料放入outputBuffer_,并開始關注writable事件,以後在handlerWrite()中發送剩餘的資料。

channel::enableWriting的代碼如下,通知poller關心本channel的可讀事件

void enableWriting() { events_ |= kWriteEvent; update(); }
           

上面我們隻是寫了一部分的資料,剩下來的資料什麼時候寫呢?當socket變得可寫時,Channel會調用TcpConnection::handleWrite(),這裡我們繼續發送outputBuffer_中的資料。 這裡socket可寫,主要是指發送緩沖區可寫位元組數,低于低水位标志。

我們來看看TcpConnection::handleWrite(),當可讀時,就将Buffer中資料寫入,如果寫完了,需要關閉對套接字讀事件的關心(因為是水準觸發,如果不關閉,将一直觸發)。然後執行writeCompleteCallback_。在TcpConnection::sendInLoop中,當資料一次發送完成後,也調用了writeCompleteCallback_。這個就是使用者的寫完資料的回調函數。

void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  //如果是可寫狀态
  if (channel_->isWriting()) {
    //将Buffer中資料寫入
    ssize_t n = ::write(channel_->fd(),
                        outputBuffer_.peek(),
                        outputBuffer_.readableBytes());
   //如果寫成功
    if (n > 0) {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0) {
        channel_->disableWriting();
        if (writeCompleteCallback_) {
          loop_->queueInLoop(
              boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting) {
          shutdownInLoop();
        }
      } else {   //如果一次沒有寫完,下次還需要繼續寫
        LOG_TRACE << "I am going to write more data";
      }
    } else {
      LOG_SYSERR << "TcpConnection::handleWrite";
    }
  } else {
    LOG_TRACE << "Connection is down, no more writing";
  }
}
           

網絡程式中,資料的發送可能過快,在本地堆積,是以Muduo為了解決資料發送的困難,提供了兩個回調函數,分别是writeCompleteCallback_和HighWaterMarkCallback_,可以稱為低水位回調和高水位回調。如果目前緩沖區中資料大小超出上限值,那麼調用HighWaterMarkCallback_。高水位回調将停止從使用者接收資料。WriteCompleteCallback函數為發送緩沖區為空時調用,在這個函數重新開機開啟接收資料。

最後總結發送流程

1、調用TcpConnection::send

2、調用TcpConnection::sendInLoop

      1、如果目前可以發送(channel沒有在寫,并且緩沖區沒有資料),先嘗試發送

       2、如果沒有完全發送,将資料添加到buffer中,添加對channel對應的socket寫事件關心,等待socket可寫,然後調用          handlewrite

       3、如果發送完全,調用使用者回調函數

3、如果沒有發送完全,并且socket可讀,調用HandleWrite。繼續發送,如果發送完全,就取消對socket可寫事件關系。如果沒有發送完全,等待下一次的發送。

繼續閱讀