文章目錄
- 什麼是Channel
- 成員變量
- 成員函數
-
- 設定此 Channel 對于事件的回調函數
- 設定 Channel 感興趣的事件到 Poller
- 更新Channel關注的事件
- 移除操作
- 用于增加TcpConnection生命周期的tie方法(防止使用者誤删操作)
- 根據相應事件執行Channel儲存的回調函數
- 完整代碼
-
- Channel.h
- Channel.cc
- 參考
什麼是Channel
參考muduo庫使用C++11重寫網絡庫
GitHub位址:Tiny C++ Network Library
Channel 對檔案描述符和事件進行了一層封裝。平常我們寫網絡程式設計相關函數,基本就是建立套接字,綁定位址,轉變為可監聽狀态(這部分我們在 Socket 類中實作過了,交給 Acceptor 調用即可),然後接受連接配接。
但是得到了一個初始化好的 socket 還不夠,我們是需要監聽這個 socket 上的事件并且處理事件的。比如我們在 Reactor 模型中使用了 epoll 監聽該 socket 上的事件,我們還需将需要被監視的套接字和監視的事件注冊到 epoll 對象中。
可以想到檔案描述符和事件和 IO 函數全都混在在了一起,極其不好維護。而 muduo 中的 Channel 類将檔案描述符和其感興趣的事件(需要監聽的事件)封裝到了一起。而事件監聽相關的代碼放到了 Poller/EPollPoller 類中。
成員變量
/**
* const int Channel::kNoneEvent = 0;
* const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
* const int Channel::kWriteEvent = EPOLLOUT;
*/
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop *loop_; // 目前Channel屬于的EventLoop
const int fd_; // fd, Poller監聽對象
int events_; // 注冊fd感興趣的事件
int revents_; // poller傳回的具體發生的事件
int index_; // 在Poller上注冊的情況
std::weak_ptr<void> tie_; // 弱指針指向TcpConnection(必要時更新為shared_ptr多一份引用計數,避免使用者誤删)
bool tied_; // 标志此 Channel 是否被調用過 Channel::tie 方法
// 儲存着事件到來時的回調函數
ReadEventCallback readCallback_; // 讀事件回調函數
EventCallback writeCallback_; // 寫事件回調函數
EventCallback closeCallback_; // 連接配接關閉回調函數
EventCallback errorCallback_; // 錯誤發生回調函數
-
:這個Channel對象照看的檔案描述符int fd_
-
:代表fd感興趣的事件類型集合int events_
-
:代表事件監聽器實際監聽到該fd發生的事件類型集合,當事件監聽器監聽到一個fd發生了什麼事件,通過int revents_
函數來設定revents值。Channel::set_revents()
-
:這個 Channel 屬于哪個EventLoop對象,因為 muduo 采用的是 one loop per thread 模型,是以我們有不止一個 EventLoop。我們的 manLoop 接收新連接配接,将新連接配接相關事件注冊到線程池中的某一線程的 subLoop 上(輪詢)。我們不希望跨線程的處理函數,是以每個 Channel 都需要記錄是哪個 EventLoop 在處理自己的事情,這其中還涉及到了線程判斷的問題。EventLoop* loop_
-
、read_callback_
、write_callback_
、close_callback_
:這些是 std::function 類型,代表着這個Channel為這個檔案描述符儲存的各事件類型發生時的處理函數。比如這個fd發生了可讀事件,需要執行可讀事件處理函數,這時候Channel類都替你保管好了這些可調用函數。到時候交給 EventLoop 執行即可。error_callback_
-
:我們使用 index 來記錄 channel 與 Poller 相關的幾種狀态,Poller 類會判斷目前 channel 的狀态然後處理不同的事情。index
-
-
:是否還未被poll監視kNew
-
:是否已在被監視中kAdded
-
:是否已被移除kDeleted
-
-
、kNoneEvent
、kReadEvent
:事件狀态設定會使用的變量kWriteEvent
成員函數
設定此 Channel 對于事件的回調函數
// 設定回調函數對象
// 使用右值引用,延長了臨時cb對象的生命周期,避免了拷貝操作
void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setWriteCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setCloseCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setErrorCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
設定 Channel 感興趣的事件到 Poller
// 設定fd相應的事件狀态,update()其本質調用epoll_ctl
void enableReading() { events_ |= kReadEvent; update(); } // 設定讀事件到poll對象中
void disableReading() { events_ &= ~kReadEvent; update(); } // 從poll對象中移除讀時間
void enableWriting() { events_ |= kWriteEvent; update(); } // 設定寫事件到poll對象中
void disableWriting() { events_ &= ~kWriteEvent; update(); } // 從poll對象中移除寫時間
void disableAll() { events_ = kNoneEvent; update(); } // 關閉所有事件
bool isWriting() const { return events_ & kWriteEvent; } // 是否關注寫事件
bool isReading() const { return events_ & kReadEvent; } // 是否關注讀事件
設定好該 Channel 的監視事件的類型,調用 update 私有函數向 Poller 注冊。實際調用 epoll_ctl
/**
* 當改變channel所表示fd的events事件後
* update負責在poller裡面更改fd相應的事件epoll_ctl
*/
void Channel::update()
{
// 通過該channel所屬的EventLoop,調用poller對應的方法,注冊fd的events事件
loop_->updateChannel(this);
}
更新Channel關注的事件
// 更新該fd相關事件
void Channel::update()
{
// 設定該channel狀态為已加入EventLoop
addedToLoop_ = true;
// 調用EventLoop::updateChannel,傳入channel指針
// EventLoop::updateChannel => Poller::updateChannel
loop_->updateChannel(this);
}
移除操作
// 從epoll對象中移除該fd
void Channel::remove()
{
// 斷言無事件處理
assert(isNoneEvent());
// 設定該Channel沒有被添加到eventLoop
addedToLoop_ = false;
// 調用EventLoop::removeChannel,傳入channel指針
// EventLoop::removeChannel => Poller::removeChannel
loop_->removeChannel(this);
}
用于增加TcpConnection生命周期的tie方法(防止使用者誤删操作)
// 在TcpConnection建立得時候會調用
void Channel::tie(const std::shared_ptr<void> &obj)
{
// weak_ptr 指向 obj
tie_ = obj;
// 設定tied_标志
tied_ = true;
}
// fd得到poller通知以後,去處理事件
void Channel::handleEvent(Timestamp receiveTime)
{
/**
* 調用了Channel::tie得會設定tid_=true
* 而TcpConnection::connectEstablished會調用channel_->tie(shared_from_this());
* 是以對于TcpConnection::channel_ 需要多一份強引用的保證以免使用者誤删TcpConnection對象
*/
if (tied_)
{
std::shared_ptr<void> guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
else
{
handleEventWithGuard(receiveTime);
}
}
}
使用者使用muduo庫的時候,會利用到TcpConnection。使用者可以看見 TcpConnection,如果使用者注冊了要監視的事件和處理的回調函數,并在處理 subLoop 處理過程中「誤删」了 TcpConnection 的話會發生什麼呢?
總之,EventLoop 肯定不能很順暢的運作下去。畢竟它的生命周期小于 TcpConnection。為了防止使用者誤删的情況,TcpConnection 在建立之初
TcpConnection::connectEstablished
會調用此函數來提升對象生命周期。
實作方案是在處理事件時,如果對被調用了
tie()
方法的Channel對象,我們讓一個共享型智能指針指向它,在處理事件期間延長它的生命周期。哪怕外面「誤删」了此對象,也會因為多出來的引用計數而避免銷毀操作。
// 連接配接建立
void TcpConnection::connectEstablished()
{
setState(kConnected); // 建立連接配接,設定一開始狀态為連接配接态
/**
* channel_->tie(shared_from_this());
* tie相當于在底層有一個強引用指針記錄着,防止析構
* 為了防止TcpConnection這個資源被誤删掉,而這個時候還有許多事件要處理
* channel->tie 會進行一次判斷,是否将弱引用指針變成強引用,變成得話就防止了計數為0而被析構得可能
*/
channel_->tie(shared_from_this());
channel_->enableReading(); // 向poller注冊channel的EPOLLIN讀事件
// 新連接配接建立 執行回調
connectionCallback_(shared_from_this());
}
注意,傳遞的是 this 指針,是以是在 Channel 的内部增加對 TcpConnection 對象的引用計數(而不是 Channel 對象)。這裡展現了 shared_ptr 的一處妙用,可以通過引用計數來控制變量的生命周期。巧妙地在内部增加一個引用計數,假設在外面誤删,也不會因為引用計數為 0 而删除對象。
weak_ptr.lock()
會傳回
shared_ptr
(如果 weak_ptr 不為空)。
根據相應事件執行Channel儲存的回調函數
我們的Channel裡面儲存了許多回調函數,這些都是在對應的事件下被調用的。使用者提前設定寫好此事件的回調函數,并綁定到Channel的成員裡。等到事件發生時,Channel自然的調用事件處理方法。借由回調操作實作了異步的操作。
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
// 标志,此時正在處理各個事件
eventHandling_ = true;
LOG_TRACE << reventsToString();
// 對端關閉事件
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
// 内部儲存function,這是判斷是否注冊了處理函數,有則直接調用
if (closeCallback_) closeCallback_();
}
// fd不是一個打開的檔案
if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}
// 發生了錯誤,且fd不可一個可以打開的檔案
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
// 讀事件 且是高優先級讀且發生了挂起
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
// 寫事件
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
完整代碼
Channel.h
class Channel : noncopyable
{
public:
// 儲存對應事件的回調函數
typedef std::function<void()> EventCallback;
// 讀事件的回調函數需要有 Timestamp 參數
typedef std::function<void(Timestamp)> ReadEventCallback;
// Channel需要指定EventLoop對象和要封裝的fd
Channel(EventLoop* loop, int fd);
~Channel();
void handleEvent(Timestamp receiveTime);
// 設定回調函數用到了移動操作,增加了cb對象的生命周期,否則還有拷貝和銷毀的操作
void setReadCallback(ReadEventCallback cb)
{ readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb)
{ writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb)
{ closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb)
{ errorCallback_ = std::move(cb); }
/// Tie this channel to the owner object managed by shared_ptr,
/// prevent the owner object being destroyed in handleEvent.
// 将此通道綁定到由shared_ptr管理的所有者對象,防止所有者對象在 handleEvent 中被銷毀。
void tie(const std::shared_ptr<void>&);
// 傳回管理的fd
int fd() const { return fd_; }
// 傳回感興趣的事件
int events() const { return events_; }
// 設定發生的事件
void set_revents(int revt) { revents_ = revt; } // used by pollers
// int revents() const { return revents_; }
// 判斷是否無監視事件
bool isNoneEvent() const { return events_ == kNoneEvent; }
void enableReading() { events_ |= kReadEvent; update(); } // 設定讀事件到poll對象中
void disableReading() { events_ &= ~kReadEvent; update(); } // 從poll對象中移除讀時間
void enableWriting() { events_ |= kWriteEvent; update(); } // 設定寫事件到poll對象中
void disableWriting() { events_ &= ~kWriteEvent; update(); } // 從poll對象中移除寫時間
void disableAll() { events_ = kNoneEvent; update(); } // 關閉所有事件
bool isWriting() const { return events_ & kWriteEvent; } // 是否關注寫事件
bool isReading() const { return events_ & kReadEvent; } // 是否關注讀事件
// for Poller
// index對應着幾種狀态 1.是否還未被poll監視 2.是否已在被監視中 3.是否是被移除
// const int kNew = -1;
// const int kAdded = 1;
// const int kDeleted = 2;
int index() { return index_; }
void set_index(int idx) { index_ = idx; }
// for debug
string reventsToString() const;
string eventsToString() const;
void doNotLogHup() { logHup_ = false; }
// 傳回所屬的 EventLoop
EventLoop* ownerLoop() { return loop_; }
// 從epoll對象中移除該fd
void remove();
private:
static string eventsToString(int fd, int ev);
// 更新事件
void update();
// 處理事件
void handleEventWithGuard(Timestamp receiveTime);
/**
* const int Channel::kNoneEvent = 0;
* const int Channel::kReadEvent = POLLIN | POLLPRI;
* const int Channel::kWriteEvent = POLLOUT;
*/
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* loop_; // 所屬的事件循環
const int fd_; // 封裝的fd
int events_; // 該fd關注事件
int revents_; // epoll或poll傳回的發生的事件 it's the received event types of epoll or poll
int index_; // Poller用于判斷的該fd的狀态 used by Poller.
bool logHup_;
std::weak_ptr<void> tie_; // 保證channel所在的類
bool tied_;
bool eventHandling_; // 是否處于事件循環中(正處理事件)
bool addedToLoop_; // 是否已被添加到事件循環
ReadEventCallback readCallback_; // fd發生讀事件的回調函數
EventCallback writeCallback_; // fd發生寫事件的回調函數
EventCallback closeCallback_; // fd發生關閉事件的回調函數
EventCallback errorCallback_; // fd發生錯誤事件的回調函數
};
Channel.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"
#include "muduo/net/EventLoop.h"
#include <sstream>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;
const int Channel::kNoneEvent = 0; // 無事件
const int Channel::kReadEvent = POLLIN | POLLPRI; // 讀事件
const int Channel::kWriteEvent = POLLOUT; // 寫事件
Channel::Channel(EventLoop* loop, int fd__)
: loop_(loop),
fd_(fd__),
events_(0),
revents_(0),
index_(-1),
logHup_(true),
tied_(false),
eventHandling_(false),
addedToLoop_(false)
{
}
Channel::~Channel()
{
assert(!eventHandling_); // 斷言此時不處理事件
assert(!addedToLoop_); // 斷言此時Channel并不在某個EventLoop中
// 斷言此時是建立此EventLoop的線程
if (loop_->isInLoopThread())
{
assert(!loop_->hasChannel(this));
}
}
// 調用此方法的Channel對象的tie_被置為true
// 是以調用handleEvent方法時會被一個共享指針所指向,增加引用計數進而延長其生命周期
void Channel::tie(const std::shared_ptr<void>& obj)
{
tie_ = obj;
tied_ = true;
}
// 更新該fd相關事件
void Channel::update()
{
// 設定該channel狀态為已加入EventLoop
addedToLoop_ = true;
// 調用EventLoop::updateChannel,傳入channel指針
// EventLoop::updateChannel => Poller::updateChannel
loop_->updateChannel(this);
}
// 從epoll對象中移除該fd
void Channel::remove()
{
// 斷言無事件處理
assert(isNoneEvent());
// 設定該Channel沒有被添加到eventLoop
addedToLoop_ = false;
// 調用EventLoop::removeChannel,傳入channel指針
// EventLoop::removeChannel => Poller::removeChannel
loop_->removeChannel(this);
}
void Channel::handleEvent(Timestamp receiveTime)
{
/**
* 如果調用了Channel::tie方法會設定tid_=true
* 而TcpConnection::connectEstablished會調用channel_->tie(shared_from_this());
* 因為TcpConnection::channel_ 需要多一份強引用的保證以免使用者誤删TcpConnection對象(它是對外可見的)
* 是以在執行任務期間,TcpConnection不會被銷毀
*/
std::shared_ptr<void> guard;
if (tied_)
{
// weak_ptr::lock() 傳回一個 shared_ptr
guard = tie_.lock();
if (guard)
{
// 處理事件
handleEventWithGuard(receiveTime);
}
}
else
{
// 處理事件
handleEventWithGuard(receiveTime);
}
}
/**
* struct pollfd{
*
* int fd; //檔案描述符
*
* short events; //請求的事件
*
* short revents; //傳回的事件
* };
*
*
* POLLIN 普通或優先級帶資料可讀
* POLLRDNORM 普通資料可讀
* POLLRDBAND 優先級帶資料可讀
* POLLPRI 高優先級資料可讀
* POLLOUT 普通資料可寫
* POLLWRNORM 普通資料可寫
* POLLWRBAND 優先級帶資料可寫
* POLLERR 發生錯誤
* POLLHUP 發生挂起
* POLLNVAL 描述字不是一個打開的檔案
*/
// 處理事件,調用各個事件對應的回調函數
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
// 标志,此時正在處理各個事件
eventHandling_ = true;
LOG_TRACE << reventsToString();
// 對端關閉事件
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
// 内部儲存function,這是判斷是否注冊了處理函數,有則直接調用
if (closeCallback_) closeCallback_();
}
// fd不是一個打開的檔案
if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}
// 發生了錯誤,且fd不可一個可以打開的檔案
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
// 讀事件 且是高優先級讀且發生了挂起
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
// 寫事件
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
string Channel::reventsToString() const
{
return eventsToString(fd_, revents_);
}
string Channel::eventsToString() const
{
return eventsToString(fd_, events_);
}
string Channel::eventsToString(int fd, int ev)
{
std::ostringstream oss;
oss << fd << ": ";
if (ev & POLLIN)
oss << "IN ";
if (ev & POLLPRI)
oss << "PRI ";
if (ev & POLLOUT)
oss << "OUT ";
if (ev & POLLHUP)
oss << "HUP ";
if (ev & POLLRDHUP)
oss << "RDHUP ";
if (ev & POLLERR)
oss << "ERR ";
if (ev & POLLNVAL)
oss << "NVAL ";
return oss.str();
}
參考
萬字長文梳理Muduo庫核心代碼及優秀程式設計細節思想剖析