這篇文章承接于我的上兩篇文章:
muduo核心元件分析_小豬快快跑的部落格-CSDN部落格
有用戶端連接配接後muduo是怎麼運作的_小豬快快跑的部落格-CSDN部落格
如果有不正确的地方,歡迎朋友們指正。
用 telnet 指令給伺服器發送消息。
下面的顔色代表子線程。
下面是 main.cc:
// main.cc
class EchoServer
{
public:
EchoServer(muduo::net::EventLoop* loop,
const muduo::net::InetAddress& listenAddr)
: server_(loop, listenAddr, "EchoServer")
{
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, _1, _2, _3));
}
void start()
{
server_.start();
}
private:
void onConnection(const muduo::net::TcpConnectionPtr& conn)
{
}
void onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp time)
{
muduo::string msg(buf->retrieveAllAsString());
conn->send(msg);
conn->shutdown();
}
muduo::net::TcpServer server_;
};
int main()
{
muduo::net::EventLoop loop;
muduo::net::InetAddress listenAddr(2007);
EchoServer server(&loop, listenAddr);
server.start();
loop.loop();
}
在用戶端沒有發消息的時候,子線程阻塞在下面的函數(epoll_wait可以設定逾時時間,這樣就不阻塞了):
// EPollPoller.cc
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size() * 2);
}
}
else if (numEvents == 0)
{}
else
{
if (savedErrno != EINTR)
{
errno = savedErrno;
}
}
return now;
}
此時用戶端發送了一條資訊,就從 epoll_wait 執行到 fillActiveChannels 函數了。
下面跳轉到 fillActiveChannels:
// EPollPoller.cc
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
傳回到 EPollPoller::poll。由于 EPollPoller::poll 是 EventLoop::loop 調用的。那麼再從 EPollPoller::poll 傳回到 EventLoop::loop 的 while 循環裡:
// EventLoop.cc
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
looping_ = false;
}
進入 for 循環調用 Channel::handleEvent:
// Channel.cc
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
進入第二個 if 裡的 handleEventWithGuard(receiveTime):
// Channel.cc
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
發生了來自用戶端的讀事件,進入 readCallback_:
在上一篇文章的主線程裡設定了 Channel::readCallback_,回憶一下:
TcpServer::newConnection 構造 TcpConnection。跳轉到 TcpConnection 的構造函數:// TcpConnection.cc TcpConnection::TcpConnection(EventLoop* loop, const string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), state_(kConnecting), reading_(true), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64 * 1024 * 1024) { channel_->setReadCallback( std::bind(&TcpConnection::handleRead, this, _1)); channel_->setWriteCallback( std::bind(&TcpConnection::handleWrite, this)); channel_->setCloseCallback( std::bind(&TcpConnection::handleClose, this)); channel_->setErrorCallback( std::bind(&TcpConnection::handleError, this)); socket_->setKeepAlive(true); }
于是跳到 TcpConnection.cc::handleRead:
// TcpConnection.cc
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
handleError();
}
}
下面這行讀取資料:
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
然後執行 messageCallback_ 回調函數,也就是 main.cc::onMessage()。
這是在上一篇文章的 TcpServer::newConnection 裡設定的:// TcpServer.cc void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { EventLoop* ioLoop = threadPool_->getNextLoop(); char buf[64]; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; string connName = name_ + buf; InetAddress localAddr(sockets::getLocalAddr(sockfd)); TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback( std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }
下面跳轉到 main.cc::onMessage():
// main.cc
void onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp time)
{
muduo::string msg(buf->retrieveAllAsString());
conn->send(msg);
conn->shutdown();
}
先來執行:
conn->send(msg);
跳轉到 TcpConnection::send:
// TcpConnection.cc
void TcpConnection::send(Buffer* buf)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{
sendInLoop(buf->peek(), buf->readableBytes());
buf->retrieveAll();
}
else
{
void (TcpConnection:: * fp)(const StringPiece & message) = &TcpConnection::sendInLoop;
loop_->runInLoop(
std::bind(fp,
this, // FIXME
buf->retrieveAllAsString()));
//std::forward<string>(message)));
}
}
}
回到 main.cc::onMessage(),執行:
conn->shutdown();
跳轉到 TcpConnection::shutdown:
// TcpConnection.cc
void TcpConnection::shutdown()
{
// FIXME: use compare and swap
if (state_ == kConnected)
{
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}
由
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
跳轉到:
// EventLoop.cc
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
下面執行 cb(),cb 是傳進來的參數,即 TcpConnection::shutdownInLoop:
// TcpConnection.cc
void TcpConnection::shutdownInLoop()
{
loop_->assertInLoopThread();
if (!channel_->isWriting())
{
// we are not writing
socket_->shutdownWrite();
}
}
if (!channel_->isWriting()) 是判斷輸出緩沖區的資料有沒有發送完畢。我這裡是已經發送完畢了,于是跳轉到 Socket.cc:shutdownWrite:
// Socket.cc
void Socket::shutdownWrite()
{
sockets::shutdownWrite(sockfd_);
}
再轉到:
// sockets.cc
void sockets::shutdownWrite(int sockfd)
{
if (::shutdown(sockfd, SHUT_WR) < 0)
{
}
}
回到 EventLoop::loop:
// EventLoop.cc
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
looping_ = false;
}
上面都是在處理:
currentActiveChannel_->handleEvent(pollReturnTime_);
下面執行:
doPendingFunctors();
轉到 EventLoop::doPendingFunctors():
// EventLoop.cc
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
functors 為空,不執行這個函數。
到目為止這個函數隻在有用戶端建立新連接配接後子線程執行過,做的是 epoll_ctl。
回到 EventLoop::loop() 的 while 循環的開頭,進入 EPollPoller::poll():
// EPollPoller.cc
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size() * 2);
}
}
else if (numEvents == 0)
{}
else
{
if (savedErrno != EINTR)
{
errno = savedErrno;
}
}
return now;
}
此時 epoll_wait 監測到事件發生,不阻塞。執行 fillActiveChannels(numEvents, activeChannels):
// EPollPoller.cc
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
回到 Eventloop::loop,執行 currentActiveChannel_->handleEvent(pollReturnTime_):
// Channel.cc
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
下面執行第二個 if 裡的 handleEventWithGuard:
// Channel.cc
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
執行 readCallback_。此時的 readCallback_ 是 TcpConnection::handleRead。也是在上一篇文章的主線程裡設定了 Channel::readCallback_。那麼轉到 TcpConnection::handleRead:
// TcpConnection.cc
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
handleError();
}
}
此時沒讀到資料,進入 handleClose():
// TcpConnection.cc
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);
// must be the last line
closeCallback_(guardThis);
}
channel_->disableAll() 又是一系列的 update 操作。
然後執行 connectionCallback_。這裡的 connectionCallback_ 是 main::onConnection。在 TcpServer::newConnection 裡設定的。這裡沒有為 main::onConnection 函數寫一些代碼,可以用 if (conn->connected()) 判斷是否斷開。
最後執行 closeCallback_ 回調函數,即 TcpServer::removeConnection,也是在 TcpServer::newConnection 裡設定的:
// TcpServer.cc::newConnection
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1));
下面轉到:
// TcpServer.cc
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
調用 EventLoop::runInLoop::
// EventLoop.cc
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
調用 queueInLoop,因為要在主線程删除一些和這個關閉的用戶端有關的東西:
// EventLoop.cc
void EventLoop::queueInLoop(Functor cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
上面的 cb 是 removeConnectionInLoop。
然後進入 wakeup(),由子線程通過 wakeupFd 給主線程發消息。
又回到 EventLoop.loop:
// EventLoop.cc
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
looping_ = false;
}
上面處理了 currentActiveChannel_->handleEvent(pollReturnTime_)。
下面處理 doPendingFunctors(),此時子線程沒什麼好處理的。因為之前已經 update 操作過了。
既然子線程給主線程 wakeup 了,那麼輪到主線程了。顔色換回黑色。
EPollPoller::poll 裡的 epoll_wait 傳回了發生的事件,又是執行:EPollPoller.::fillActiveChannels。
然後傳回 EventLoop::loop,執行 channel->handleEvent(pollReturnTime_):
// Channel.cc
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
執行 else 裡的 handleEventWithGuard(receiveTime):
// Channel.cc
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
主線程讀到了子線程發的 one,是以執行 readCallback_,也就是 EventLoop::handleRead。在第一篇文章的過程四:server.start() 裡設定的。那時候啟動線程池,每個線程建立 wakeupFd 的時候設定的。readCallback_ 就是 EventLoop::handleRead:
// EventLoop.cc
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
}
再回到 EventLoop,執行 doPendingFunctors():
// EventLoop.cc
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
pendingFunctors_ 是在子線程的 queueInLoop 裡添加的。要執行的回調函數是 TcpServer::removeConnectionInLoop。
// removeConnectionInLoop.cc
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
size_t n = connections_.erase(conn->name());
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
又要調用 queueInLoop(cb)。這裡的 cb 是 TcpConnection::connectDestroyed。在 queueInLoop 裡主線程給子線程發消息。
子線程又回到 loop 的 while 循環裡了。
注意主線程沒有 updata 操作,也就沒有 EPOLL_CTL_DEL 操作。
主線程給子線程發送了消息,又到子線程了。
子線程又要到 loop 裡的 doPendingFunctors。
doPendingFunctors 裡的回調操作就是 TcpConnection::connectDestroyed:
// TcpConnection.cc
void TcpConnection::connectDestroyed()
{
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this());
}
channel_->remove();
}
connectionCallback_ 和前面一樣沒做任何操作,就是 main.cc 裡定義的。
然後執行 channel_->remove():
// Channel.cc
void Channel::remove()
{
addedToLoop_ = false;
loop_->removeChannel(this);
}
再:
// EventLoop.cc
void EventLoop::removeChannel(Channel* channel)
{
poller_->removeChannel(channel);
}
再:
// EPollPoller.cc
void EPollPoller::removeChannel(Channel* channel)
{
int index = channel->index();
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
channel->set_index(kNew);
}
然後再 update。
後面就是析構了。