天天看點

【muduo源碼剖析】Channel設計分析什麼是Channel成員變量成員函數完整代碼參考

文章目錄

  • 什麼是Channel
  • 成員變量
  • 成員函數
    • 設定此 Channel 對于事件的回調函數
    • 設定 Channel 感興趣的事件到 Poller
    • 更新Channel關注的事件
    • 移除操作
    • 用于增加TcpConnection生命周期的tie方法(防止使用者誤删操作)
    • 根據相應事件執行Channel儲存的回調函數
  • 完整代碼
    • Channel.h
    • Channel.cc
  • 參考

什麼是Channel

參考muduo庫使用C++11重寫網絡庫

GitHub位址:Tiny C++ Network Library

Channel 對檔案描述符和事件進行了一層封裝。平常我們寫網絡程式設計相關函數,基本就是建立套接字,綁定位址,轉變為可監聽狀态(這部分我們在 Socket 類中實作過了,交給 Acceptor 調用即可),然後接受連接配接。

但是得到了一個初始化好的 socket 還不夠,我們是需要監聽這個 socket 上的事件并且處理事件的。比如我們在 Reactor 模型中使用了 epoll 監聽該 socket 上的事件,我們還需将需要被監視的套接字和監視的事件注冊到 epoll 對象中。

可以想到檔案描述符和事件和 IO 函數全都混在在了一起,極其不好維護。而 muduo 中的 Channel 類将檔案描述符和其感興趣的事件(需要監聽的事件)封裝到了一起。而事件監聽相關的代碼放到了 Poller/EPollPoller 類中。

成員變量

/**
* const int Channel::kNoneEvent = 0;
* const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
* const int Channel::kWriteEvent = EPOLLOUT;
*/
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;

EventLoop *loop_;   // 目前Channel屬于的EventLoop
const int fd_;      // fd, Poller監聽對象
int events_;        // 注冊fd感興趣的事件
int revents_;       // poller傳回的具體發生的事件
int index_;         // 在Poller上注冊的情況

std::weak_ptr<void> tie_;   // 弱指針指向TcpConnection(必要時更新為shared_ptr多一份引用計數,避免使用者誤删)
bool tied_;  // 标志此 Channel 是否被調用過 Channel::tie 方法

// 儲存着事件到來時的回調函數
ReadEventCallback readCallback_; 	// 讀事件回調函數
EventCallback writeCallback_;		// 寫事件回調函數
EventCallback closeCallback_;		// 連接配接關閉回調函數
EventCallback errorCallback_;		// 錯誤發生回調函數
           
  • int fd_

    :這個Channel對象照看的檔案描述符
  • int events_

    :代表fd感興趣的事件類型集合
  • int revents_

    :代表事件監聽器實際監聽到該fd發生的事件類型集合,當事件監聽器監聽到一個fd發生了什麼事件,通過

    Channel::set_revents()

    函數來設定revents值。
  • EventLoop* loop_

    :這個 Channel 屬于哪個EventLoop對象,因為 muduo 采用的是 one loop per thread 模型,是以我們有不止一個 EventLoop。我們的 manLoop 接收新連接配接,将新連接配接相關事件注冊到線程池中的某一線程的 subLoop 上(輪詢)。我們不希望跨線程的處理函數,是以每個 Channel 都需要記錄是哪個 EventLoop 在處理自己的事情,這其中還涉及到了線程判斷的問題。
  • read_callback_

    write_callback_

    close_callback_

    error_callback_

    :這些是 std::function 類型,代表着這個Channel為這個檔案描述符儲存的各事件類型發生時的處理函數。比如這個fd發生了可讀事件,需要執行可讀事件處理函數,這時候Channel類都替你保管好了這些可調用函數。到時候交給 EventLoop 執行即可。
  • index

    :我們使用 index 來記錄 channel 與 Poller 相關的幾種狀态,Poller 類會判斷目前 channel 的狀态然後處理不同的事情。
    • kNew

      :是否還未被poll監視
    • kAdded

      :是否已在被監視中
    • kDeleted

      :是否已被移除
  • kNoneEvent

    kReadEvent

    kWriteEvent

    :事件狀态設定會使用的變量

成員函數

設定此 Channel 對于事件的回調函數

// 設定回調函數對象
// 使用右值引用,延長了臨時cb對象的生命周期,避免了拷貝操作
void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setWriteCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setCloseCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setErrorCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
           

設定 Channel 感興趣的事件到 Poller

// 設定fd相應的事件狀态,update()其本質調用epoll_ctl
void enableReading() { events_ |= kReadEvent; update(); }     // 設定讀事件到poll對象中
void disableReading() { events_ &= ~kReadEvent; update(); }   // 從poll對象中移除讀時間
void enableWriting() { events_ |= kWriteEvent; update(); }    // 設定寫事件到poll對象中
void disableWriting() { events_ &= ~kWriteEvent; update(); }  // 從poll對象中移除寫時間
void disableAll() { events_ = kNoneEvent; update(); }         // 關閉所有事件
bool isWriting() const { return events_ & kWriteEvent; }      // 是否關注寫事件
bool isReading() const { return events_ & kReadEvent; }       // 是否關注讀事件
           

設定好該 Channel 的監視事件的類型,調用 update 私有函數向 Poller 注冊。實際調用 epoll_ctl

/**
 * 當改變channel所表示fd的events事件後
 * update負責在poller裡面更改fd相應的事件epoll_ctl 
 */
void Channel::update()
{
    // 通過該channel所屬的EventLoop,調用poller對應的方法,注冊fd的events事件
    loop_->updateChannel(this);
}
           

更新Channel關注的事件

// 更新該fd相關事件
void Channel::update()
{
  // 設定該channel狀态為已加入EventLoop
  addedToLoop_ = true;
  // 調用EventLoop::updateChannel,傳入channel指針
  // EventLoop::updateChannel => Poller::updateChannel
  loop_->updateChannel(this);
}
           

移除操作

// 從epoll對象中移除該fd
void Channel::remove()
{
  // 斷言無事件處理
  assert(isNoneEvent());  
  // 設定該Channel沒有被添加到eventLoop
  addedToLoop_ = false;   
  // 調用EventLoop::removeChannel,傳入channel指針
  // EventLoop::removeChannel => Poller::removeChannel
  loop_->removeChannel(this);
}
           

用于增加TcpConnection生命周期的tie方法(防止使用者誤删操作)

// 在TcpConnection建立得時候會調用
void Channel::tie(const std::shared_ptr<void> &obj)
{
    // weak_ptr 指向 obj
    tie_ = obj;
    // 設定tied_标志
    tied_ = true;
}
// fd得到poller通知以後,去處理事件
void Channel::handleEvent(Timestamp receiveTime)
{
    /**
     * 調用了Channel::tie得會設定tid_=true
     * 而TcpConnection::connectEstablished會調用channel_->tie(shared_from_this());
     * 是以對于TcpConnection::channel_ 需要多一份強引用的保證以免使用者誤删TcpConnection對象
     */
    if (tied_)
    {
        std::shared_ptr<void> guard = tie_.lock();
        if (guard)
        {
            handleEventWithGuard(receiveTime);
        }
        else 
        {
            handleEventWithGuard(receiveTime);
        }

    }
}
           

使用者使用muduo庫的時候,會利用到TcpConnection。使用者可以看見 TcpConnection,如果使用者注冊了要監視的事件和處理的回調函數,并在處理 subLoop 處理過程中「誤删」了 TcpConnection 的話會發生什麼呢?

總之,EventLoop 肯定不能很順暢的運作下去。畢竟它的生命周期小于 TcpConnection。為了防止使用者誤删的情況,TcpConnection 在建立之初

TcpConnection::connectEstablished

會調用此函數來提升對象生命周期。

實作方案是在處理事件時,如果對被調用了

tie()

方法的Channel對象,我們讓一個共享型智能指針指向它,在處理事件期間延長它的生命周期。哪怕外面「誤删」了此對象,也會因為多出來的引用計數而避免銷毀操作。

// 連接配接建立
void TcpConnection::connectEstablished()
{
    setState(kConnected); // 建立連接配接,設定一開始狀态為連接配接态
    /**
     * channel_->tie(shared_from_this());
     * tie相當于在底層有一個強引用指針記錄着,防止析構
     * 為了防止TcpConnection這個資源被誤删掉,而這個時候還有許多事件要處理
     * channel->tie 會進行一次判斷,是否将弱引用指針變成強引用,變成得話就防止了計數為0而被析構得可能
     */
    channel_->tie(shared_from_this());
    channel_->enableReading(); // 向poller注冊channel的EPOLLIN讀事件

    // 新連接配接建立 執行回調
    connectionCallback_(shared_from_this());
}
           

注意,傳遞的是 this 指針,是以是在 Channel 的内部增加對 TcpConnection 對象的引用計數(而不是 Channel 對象)。這裡展現了 shared_ptr 的一處妙用,可以通過引用計數來控制變量的生命周期。巧妙地在内部增加一個引用計數,假設在外面誤删,也不會因為引用計數為 0 而删除對象。

weak_ptr.lock()

會傳回

shared_ptr

(如果 weak_ptr 不為空)。

根據相應事件執行Channel儲存的回調函數

我們的Channel裡面儲存了許多回調函數,這些都是在對應的事件下被調用的。使用者提前設定寫好此事件的回調函數,并綁定到Channel的成員裡。等到事件發生時,Channel自然的調用事件處理方法。借由回調操作實作了異步的操作。

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    // 标志,此時正在處理各個事件
    eventHandling_ = true;
    LOG_TRACE << reventsToString();
    // 對端關閉事件
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (logHup_)
        {
            LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
        }
        // 内部儲存function,這是判斷是否注冊了處理函數,有則直接調用
        if (closeCallback_) closeCallback_();
    }
    // fd不是一個打開的檔案
    if (revents_ & POLLNVAL)
    {
        LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
    }
    // 發生了錯誤,且fd不可一個可以打開的檔案
    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }
    // 讀事件 且是高優先級讀且發生了挂起
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        if (readCallback_) readCallback_(receiveTime);
    }
    // 寫事件
    if (revents_ & POLLOUT)
    {
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}
           

完整代碼

Channel.h

class Channel : noncopyable
{
 public:
  // 儲存對應事件的回調函數
  typedef std::function<void()> EventCallback;
  // 讀事件的回調函數需要有 Timestamp 參數
  typedef std::function<void(Timestamp)> ReadEventCallback; 

  // Channel需要指定EventLoop對象和要封裝的fd
  Channel(EventLoop* loop, int fd);
  ~Channel();

  void handleEvent(Timestamp receiveTime);
  // 設定回調函數用到了移動操作,增加了cb對象的生命周期,否則還有拷貝和銷毀的操作
  void setReadCallback(ReadEventCallback cb)
  { readCallback_ = std::move(cb); }
  void setWriteCallback(EventCallback cb)
  { writeCallback_ = std::move(cb); }
  void setCloseCallback(EventCallback cb)
  { closeCallback_ = std::move(cb); }
  void setErrorCallback(EventCallback cb)
  { errorCallback_ = std::move(cb); }

  /// Tie this channel to the owner object managed by shared_ptr,
  /// prevent the owner object being destroyed in handleEvent.
  // 将此通道綁定到由shared_ptr管理的所有者對象,防止所有者對象在 handleEvent 中被銷毀。
  void tie(const std::shared_ptr<void>&);

  // 傳回管理的fd
  int fd() const { return fd_; }
  // 傳回感興趣的事件
  int events() const { return events_; }
  // 設定發生的事件
  void set_revents(int revt) { revents_ = revt; } // used by pollers
  // int revents() const { return revents_; }
  // 判斷是否無監視事件
  bool isNoneEvent() const { return events_ == kNoneEvent; }

  void enableReading() { events_ |= kReadEvent; update(); }     // 設定讀事件到poll對象中
  void disableReading() { events_ &= ~kReadEvent; update(); }   // 從poll對象中移除讀時間
  void enableWriting() { events_ |= kWriteEvent; update(); }    // 設定寫事件到poll對象中
  void disableWriting() { events_ &= ~kWriteEvent; update(); }  // 從poll對象中移除寫時間
  void disableAll() { events_ = kNoneEvent; update(); }         // 關閉所有事件
  bool isWriting() const { return events_ & kWriteEvent; }      // 是否關注寫事件
  bool isReading() const { return events_ & kReadEvent; }       // 是否關注讀事件

  // for Poller
  // index對應着幾種狀态 1.是否還未被poll監視 2.是否已在被監視中 3.是否是被移除
  // const int kNew = -1;
  // const int kAdded = 1;
  // const int kDeleted = 2;
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }

  // for debug
  string reventsToString() const;
  string eventsToString() const;

  void doNotLogHup() { logHup_ = false; }

  // 傳回所屬的 EventLoop
  EventLoop* ownerLoop() { return loop_; }
  // 從epoll對象中移除該fd
  void remove();

 private:
  static string eventsToString(int fd, int ev);

  // 更新事件
  void update();
  // 處理事件
  void handleEventWithGuard(Timestamp receiveTime);

  /**
   * const int Channel::kNoneEvent = 0;
   * const int Channel::kReadEvent = POLLIN | POLLPRI;
   * const int Channel::kWriteEvent = POLLOUT;
   */
  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  EventLoop* loop_;     // 所屬的事件循環
  const int  fd_;       // 封裝的fd
  int        events_;   // 該fd關注事件
  int        revents_;  // epoll或poll傳回的發生的事件 it's the received event types of epoll or poll
  int        index_;    // Poller用于判斷的該fd的狀态 used by Poller.
  bool       logHup_;   

  std::weak_ptr<void> tie_; // 保證channel所在的類
  bool tied_;
  bool eventHandling_;      // 是否處于事件循環中(正處理事件)
  bool addedToLoop_;        // 是否已被添加到事件循環
  ReadEventCallback readCallback_;  // fd發生讀事件的回調函數
  EventCallback writeCallback_;     // fd發生寫事件的回調函數
  EventCallback closeCallback_;     // fd發生關閉事件的回調函數
  EventCallback errorCallback_;     // fd發生錯誤事件的回調函數
};
           

Channel.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"
#include "muduo/net/EventLoop.h"

#include <sstream>

#include <poll.h>

using namespace muduo;
using namespace muduo::net;

const int Channel::kNoneEvent = 0;                // 無事件
const int Channel::kReadEvent = POLLIN | POLLPRI; // 讀事件
const int Channel::kWriteEvent = POLLOUT;         // 寫事件

Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1),
    logHup_(true),
    tied_(false),
    eventHandling_(false),
    addedToLoop_(false)
{
}

Channel::~Channel()
{
  assert(!eventHandling_);  // 斷言此時不處理事件
  assert(!addedToLoop_);    // 斷言此時Channel并不在某個EventLoop中
  // 斷言此時是建立此EventLoop的線程
  if (loop_->isInLoopThread())
  {
    assert(!loop_->hasChannel(this));
  }
}

// 調用此方法的Channel對象的tie_被置為true
// 是以調用handleEvent方法時會被一個共享指針所指向,增加引用計數進而延長其生命周期
void Channel::tie(const std::shared_ptr<void>& obj)
{
  tie_ = obj;
  tied_ = true;
}

// 更新該fd相關事件
void Channel::update()
{
  // 設定該channel狀态為已加入EventLoop
  addedToLoop_ = true;
  // 調用EventLoop::updateChannel,傳入channel指針
  // EventLoop::updateChannel => Poller::updateChannel
  loop_->updateChannel(this);
}

// 從epoll對象中移除該fd
void Channel::remove()
{
  // 斷言無事件處理
  assert(isNoneEvent());  
  // 設定該Channel沒有被添加到eventLoop
  addedToLoop_ = false;   
  // 調用EventLoop::removeChannel,傳入channel指針
  // EventLoop::removeChannel => Poller::removeChannel
  loop_->removeChannel(this);
}

void Channel::handleEvent(Timestamp receiveTime)
{
  /**
  * 如果調用了Channel::tie方法會設定tid_=true
  * 而TcpConnection::connectEstablished會調用channel_->tie(shared_from_this());
  * 因為TcpConnection::channel_ 需要多一份強引用的保證以免使用者誤删TcpConnection對象(它是對外可見的)
  * 是以在執行任務期間,TcpConnection不會被銷毀
  */
  std::shared_ptr<void> guard;
  if (tied_)
  {
    // weak_ptr::lock() 傳回一個 shared_ptr
    guard = tie_.lock();
    if (guard)
    {
      // 處理事件
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    // 處理事件
    handleEventWithGuard(receiveTime);
  }
}

/**
 * struct pollfd{
 *
 *   int fd;          //檔案描述符
 *
 *  short events;    //請求的事件
 *
 *  short revents;   //傳回的事件
 * };
 * 
 * 
 * POLLIN	普通或優先級帶資料可讀
 * POLLRDNORM	普通資料可讀
 * POLLRDBAND	優先級帶資料可讀
 * POLLPRI	高優先級資料可讀
 * POLLOUT	普通資料可寫
 * POLLWRNORM	普通資料可寫
 * POLLWRBAND	優先級帶資料可寫
 * POLLERR	發生錯誤
 * POLLHUP	發生挂起
 * POLLNVAL	描述字不是一個打開的檔案
*/
// 處理事件,調用各個事件對應的回調函數
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    // 标志,此時正在處理各個事件
    eventHandling_ = true;
    LOG_TRACE << reventsToString();
    // 對端關閉事件
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (logHup_)
        {
            LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
        }
        // 内部儲存function,這是判斷是否注冊了處理函數,有則直接調用
        if (closeCallback_) closeCallback_();
    }
    // fd不是一個打開的檔案
    if (revents_ & POLLNVAL)
    {
        LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
    }
    // 發生了錯誤,且fd不可一個可以打開的檔案
    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }
    // 讀事件 且是高優先級讀且發生了挂起
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        if (readCallback_) readCallback_(receiveTime);
    }
    // 寫事件
    if (revents_ & POLLOUT)
    {
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}

string Channel::reventsToString() const
{
  return eventsToString(fd_, revents_);
}

string Channel::eventsToString() const
{
  return eventsToString(fd_, events_);
}

string Channel::eventsToString(int fd, int ev)
{
  std::ostringstream oss;
  oss << fd << ": ";
  if (ev & POLLIN)
    oss << "IN ";
  if (ev & POLLPRI)
    oss << "PRI ";
  if (ev & POLLOUT)
    oss << "OUT ";
  if (ev & POLLHUP)
    oss << "HUP ";
  if (ev & POLLRDHUP)
    oss << "RDHUP ";
  if (ev & POLLERR)
    oss << "ERR ";
  if (ev & POLLNVAL)
    oss << "NVAL ";

  return oss.str();
}
           

參考

萬字長文梳理Muduo庫核心代碼及優秀程式設計細節思想剖析