
How muduo runs after the client sends a message

This article follows on from my previous two articles:

Analysis of muduo's core components (小豬快快跑的部落格, CSDN)

How muduo runs after a client connects (小豬快快跑的部落格, CSDN)

If anything here is incorrect, corrections from readers are welcome.

Send the server a message with the telnet command, e.g. telnet <server-ip> 2007 (2007 is the port that main.cc below listens on).

The steps that follow run in the connection's child (I/O) thread; I will note explicitly whenever execution switches to the main thread.

Here is main.cc:

// main.cc
class EchoServer
{
public:
    EchoServer(muduo::net::EventLoop* loop,
             const muduo::net::InetAddress& listenAddr)
      : server_(loop, listenAddr, "EchoServer")
    {
      server_.setConnectionCallback(
          std::bind(&EchoServer::onConnection, this, _1));
      server_.setMessageCallback(
          std::bind(&EchoServer::onMessage, this, _1, _2, _3));
    }
 
    void start() 
    {
        server_.start();
    }
private:
    void onConnection(const muduo::net::TcpConnectionPtr& conn)
    {
    }
 
    void onMessage(const muduo::net::TcpConnectionPtr& conn,
                 muduo::net::Buffer* buf,
                 muduo::Timestamp time)
    {
        muduo::string msg(buf->retrieveAllAsString());
        conn->send(msg);
        conn->shutdown();
    }
 
    muduo::net::TcpServer server_;
};
 
 
int main()
{
    muduo::net::EventLoop loop;
    muduo::net::InetAddress listenAddr(2007);
    EchoServer server(&loop, listenAddr);
    server.start();
    loop.loop();
}
           

While the client has not sent anything, the child thread sits blocked in the function below (epoll_wait is given a timeout, so it does not block indefinitely):

// EPollPoller.cc
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
    int numEvents = ::epoll_wait(epollfd_,
        &*events_.begin(),
        static_cast<int>(events_.size()),
        timeoutMs);
    int savedErrno = errno;
    Timestamp now(Timestamp::now());
    if (numEvents > 0)
    {
        fillActiveChannels(numEvents, activeChannels);
        if (implicit_cast<size_t>(numEvents) == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
    else if (numEvents == 0)
    {}
    else
    {
        if (savedErrno != EINTR)
        {
            errno = savedErrno;
        }
    }
    return now;
}
           

Now the client sends a message; epoll_wait returns and execution reaches fillActiveChannels.

Jump to fillActiveChannels:

// EPollPoller.cc
void EPollPoller::fillActiveChannels(int numEvents,
    ChannelList* activeChannels) const
{
    for (int i = 0; i < numEvents; ++i)
    {
        Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);
    }
}
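Where did events_[i].data.ptr get a Channel*? It was stored when the channel was registered via epoll_ctl. Below is a simplified sketch of EPollPoller::update, based on muduo's implementation (logging and error handling omitted):

// EPollPoller.cc (simplified sketch)
void EPollPoller::update(int operation, Channel* channel)
{
    struct epoll_event event;
    memset(&event, 0, sizeof event);
    event.events = channel->events();   // events this channel is interested in
    event.data.ptr = channel;           // this pointer is what fillActiveChannels gets back
    int fd = channel->fd();
    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
        // muduo logs the error here (and aborts for EPOLL_CTL_ADD/MOD failures)
    }
}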
           

Return to EPollPoller::poll. Since EPollPoller::poll was called from EventLoop::loop, control goes back to the while loop in EventLoop::loop:

// EventLoop.cc
void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;  
    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_;
        eventHandling_ = true;
        for (Channel* channel : activeChannels_)
        {
            currentActiveChannel_ = channel;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;
        doPendingFunctors();
    }
 
    looping_ = false;
}
           

The for loop then calls Channel::handleEvent on each active channel:

// Channel.cc
void Channel::handleEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if (tied_)
    {
        guard = tie_.lock();
        if (guard)
        {
            handleEventWithGuard(receiveTime);
        }
    }
    else
    {
        handleEventWithGuard(receiveTime);
    }
}
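The tied_ / tie_ guard exists because the Channel's owner (the TcpConnection) could otherwise be destroyed while a callback is still running; tying the channel to a weak_ptr of its owner and locking it here keeps the TcpConnection alive for the duration of handleEventWithGuard. The tie is set up when the connection is established, roughly like this (a sketch based on muduo, asserts and logging trimmed):

// Channel.cc (sketch)
void Channel::tie(const std::shared_ptr<void>& obj)
{
    tie_ = obj;       // weak_ptr to the owning TcpConnection
    tied_ = true;
}

// TcpConnection.cc (sketch)
void TcpConnection::connectEstablished()
{
    loop_->assertInLoopThread();
    setState(kConnected);
    channel_->tie(shared_from_this());   // so handleEvent can lock a guard
    channel_->enableReading();           // registers the fd for read events
    connectionCallback_(shared_from_this());
}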
           

Because the channel was tied to its TcpConnection (see the sketch above), tied_ is true and locking tie_ succeeds, so the handleEventWithGuard(receiveTime) inside the inner if (guard) is the one that runs:

// Channel.cc
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    eventHandling_ = true;
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (closeCallback_) closeCallback_();
    }
 
    if (revents_ & POLLNVAL)
    {
    }
 
    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        if (readCallback_) readCallback_(receiveTime);
    }
    if (revents_ & POLLOUT)
    {
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}
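One detail worth noting: revents_ was filled from epoll_event.events, yet it is tested against poll-style constants (POLLIN, POLLOUT, ...). This works because the flag values are numerically identical on Linux, which muduo checks at compile time; recent versions of EPollPoller.cc contain assertions along these lines:

// EPollPoller.cc (as found in recent muduo versions)
static_assert(EPOLLIN == POLLIN,       "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI,     "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT,     "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP, "epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR,     "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP,     "epoll uses same flag values as poll");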
           

A read event from the client has occurred, so readCallback_ is invoked:

Channel::readCallback_ was set in the main thread in the previous article; to recall:

TcpServer::newConnection constructs a TcpConnection. Jump to the TcpConnection constructor:
// TcpConnection.cc
TcpConnection::TcpConnection(EventLoop* loop,
    const string& nameArg,
    int sockfd,
    const InetAddress& localAddr,
    const InetAddress& peerAddr)
    : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64 * 1024 * 1024)
{
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead, this, _1));
    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite, this));
    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose, this));
    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError, this));
    socket_->setKeepAlive(true);
}
           

So readCallback_ takes us to TcpConnection::handleRead:

// TcpConnection.cc
void TcpConnection::handleRead(Timestamp receiveTime)
{
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        handleError();
    }
}
           

This line reads the incoming data from the socket into inputBuffer_:

ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
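Buffer::readFd is worth a look: it uses readv with a 64 KB scratch buffer on the stack as a second destination, so a single call can usually drain everything the kernel has buffered without pre-allocating a huge Buffer. A simplified sketch of it, based on muduo's Buffer.cc:

// Buffer.cc (simplified sketch)
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
    char extrabuf[65536];                       // 64 KB scratch space on the stack
    struct iovec vec[2];
    const size_t writable = writableBytes();
    vec[0].iov_base = begin() + writerIndex_;   // free space inside the Buffer
    vec[0].iov_len = writable;
    vec[1].iov_base = extrabuf;                 // overflow goes here
    vec[1].iov_len = sizeof extrabuf;
    const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
    const ssize_t n = ::readv(fd, vec, iovcnt);
    if (n < 0)
    {
        *savedErrno = errno;
    }
    else if (static_cast<size_t>(n) <= writable)
    {
        writerIndex_ += n;                      // everything fit in the Buffer
    }
    else
    {
        writerIndex_ = buffer_.size();
        append(extrabuf, n - writable);         // copy the overflow in, growing the Buffer
    }
    return n;
}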
           

Then the messageCallback_ callback runs, which is onMessage() from main.cc.

It was set in TcpServer::newConnection in the previous article:
// TcpServer.cc
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
    EventLoop* ioLoop = threadPool_->getNextLoop();
    char buf[64];
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    string connName = name_ + buf;
 
    InetAddress localAddr(sockets::getLocalAddr(sockfd));
    TcpConnectionPtr conn(new TcpConnection(ioLoop,
        connName,
        sockfd,
        localAddr,
        peerAddr));
    connections_[connName] = conn;
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
           

Now jump to onMessage() in main.cc:

// main.cc
void onMessage(const muduo::net::TcpConnectionPtr& conn,
    muduo::net::Buffer* buf,
    muduo::Timestamp time)
{
    muduo::string msg(buf->retrieveAllAsString());
    conn->send(msg);
    conn->shutdown();
}
           

First, this line runs:

conn->send(msg);
           

Jump to TcpConnection::send. (Strictly speaking, since msg is a string, the send(const StringPiece&) overload is the one actually selected; it has the same structure as the Buffer* overload shown here.)

// TcpConnection.cc
void TcpConnection::send(Buffer* buf)
{
    if (state_ == kConnected)
    {
        if (loop_->isInLoopThread())
        {
            sendInLoop(buf->peek(), buf->readableBytes());
            buf->retrieveAll();
        }
        else
        {
            void (TcpConnection:: * fp)(const StringPiece & message) = &TcpConnection::sendInLoop;
            loop_->runInLoop(
                std::bind(fp,
                    this,     // FIXME
                    buf->retrieveAllAsString()));
            //std::forward<string>(message)));
        }
    }
}
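onMessage was invoked from handleRead, so we are already in this connection's loop thread: isInLoopThread() is true and sendInLoop runs synchronously. A simplified sketch of sendInLoop, based on muduo's TcpConnection.cc (logging, the high-water-mark callback, and error details trimmed):

// TcpConnection.cc (simplified sketch)
void TcpConnection::sendInLoop(const void* data, size_t len)
{
    loop_->assertInLoopThread();
    ssize_t nwrote = 0;
    size_t remaining = len;
    // if we are not already waiting for POLLOUT and the output buffer is empty,
    // try to write directly to the socket
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = ::write(channel_->fd(), data, len);
        if (nwrote >= 0)
        {
            remaining = len - nwrote;
            if (remaining == 0 && writeCompleteCallback_)
            {
                loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
            }
        }
        else
        {
            nwrote = 0;   // EWOULDBLOCK etc.: fall through and buffer the data
        }
    }
    // whatever did not fit goes into outputBuffer_, and we start watching POLLOUT
    if (remaining > 0)
    {
        outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
        if (!channel_->isWriting())
        {
            channel_->enableWriting();   // handleWrite will flush the rest later
        }
    }
}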
           

Back in onMessage(), the next line runs:

conn->shutdown();
           

Jump to TcpConnection::shutdown:

// TcpConnection.cc
void TcpConnection::shutdown()
{
    // FIXME: use compare and swap
    if (state_ == kConnected)
    {
        setState(kDisconnecting);
        // FIXME: shared_from_this()?
        loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
    }
}
           

loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
           

This goes to:

// EventLoop.cc
void EventLoop::runInLoop(Functor cb)
{
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(std::move(cb));
    }
}
           

We are still in the loop thread, so cb() runs directly; cb is the functor passed in, i.e. TcpConnection::shutdownInLoop:

// TcpConnection.cc
void TcpConnection::shutdownInLoop()
{
    loop_->assertInLoopThread();
    if (!channel_->isWriting())
    {
        // we are not writing
        socket_->shutdownWrite();
    }
}
           

if (!channel_->isWriting()) checks whether the output buffer has already been fully flushed. If data were still pending, the shutdown would be deferred until handleWrite drains the buffer (a sketch of that follows the sockets.cc code below). In my run everything had been sent, so we go to Socket::shutdownWrite:

// Socket.cc
void Socket::shutdownWrite()
{
  sockets::shutdownWrite(sockfd_);
}
           

which in turn calls:

// sockets.cc
void sockets::shutdownWrite(int sockfd)
{
    if (::shutdown(sockfd, SHUT_WR) < 0)
    {
    }
}
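For completeness: if channel_->isWriting() had been true in shutdownInLoop, i.e. the output buffer still held unsent data, the shutdown would simply be postponed. Once handleWrite drains the buffer it checks the state and, seeing kDisconnecting, performs the deferred shutdown. A sketch of the relevant part of TcpConnection::handleWrite, based on muduo (error branch omitted):

// TcpConnection.cc (simplified sketch)
void TcpConnection::handleWrite()
{
    loop_->assertInLoopThread();
    if (channel_->isWriting())
    {
        ssize_t n = ::write(channel_->fd(),
                            outputBuffer_.peek(),
                            outputBuffer_.readableBytes());
        if (n > 0)
        {
            outputBuffer_.retrieve(n);
            if (outputBuffer_.readableBytes() == 0)
            {
                channel_->disableWriting();      // stop watching POLLOUT
                if (writeCompleteCallback_)
                {
                    loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
                }
                if (state_ == kDisconnecting)
                {
                    shutdownInLoop();            // the shutdown that was postponed earlier
                }
            }
        }
    }
}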
           

Back to EventLoop::loop:

// EventLoop.cc
void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;  
    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_;
        eventHandling_ = true;
        for (Channel* channel : activeChannels_)
        {
            currentActiveChannel_ = channel;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;
        doPendingFunctors();
    }
 
    looping_ = false;
}
           

Everything so far happened inside:

currentActiveChannel_->handleEvent(pollReturnTime_);
           

Next comes:

doPendingFunctors();
           

Go to EventLoop::doPendingFunctors():

// EventLoop.cc
void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        MutexLockGuard lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (const Functor& functor : functors)
    {
        functor();
    }
    callingPendingFunctors_ = false;
}
           

pendingFunctors_ is empty at this point, so after the swap the loop body runs zero times and nothing is executed.

So far this function has only done real work in the child thread once, right after the client connection was established, when the queued functor (TcpConnection::connectEstablished) ended up performing epoll_ctl.

Back at the top of the while loop in EventLoop::loop(), we enter EPollPoller::poll() again:

// EPollPoller.cc
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
    int numEvents = ::epoll_wait(epollfd_,
        &*events_.begin(),
        static_cast<int>(events_.size()),
        timeoutMs);
    int savedErrno = errno;
    Timestamp now(Timestamp::now());
    if (numEvents > 0)
    {
        fillActiveChannels(numEvents, activeChannels);
        if (implicit_cast<size_t>(numEvents) == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
    else if (numEvents == 0)
    {}
    else
    {
        if (savedErrno != EINTR)
        {
            errno = savedErrno;
        }
    }
    return now;
}
           

This time epoll_wait does not block: having received the echo and our FIN (from shutdownWrite), the telnet client closes its end of the connection, which makes the fd readable again (EOF). fillActiveChannels(numEvents, activeChannels) runs:

// EPollPoller.cc
void EPollPoller::fillActiveChannels(int numEvents,
    ChannelList* activeChannels) const
{
    for (int i = 0; i < numEvents; ++i)
    {
        Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);
    }
}
           

Back in EventLoop::loop, execute currentActiveChannel_->handleEvent(pollReturnTime_):

// Channel.cc
void Channel::handleEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if (tied_)
    {
        guard = tie_.lock();
        if (guard)
        {
            handleEventWithGuard(receiveTime);
        }
    }
    else
    {
        handleEventWithGuard(receiveTime);
    }
}
           

Again the tied path is taken and the handleEventWithGuard inside if (guard) runs:

// Channel.cc
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    eventHandling_ = true;
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (closeCallback_) closeCallback_();
    }
 
    if (revents_ & POLLNVAL)
    {
    }
 
    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        if (readCallback_) readCallback_(receiveTime);
    }
    if (revents_ & POLLOUT)
    {
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}
           

readCallback_ fires again. As before, readCallback_ is TcpConnection::handleRead, set in the main thread in the previous article. Go to TcpConnection::handleRead:

// TcpConnection.cc
void TcpConnection::handleRead(Timestamp receiveTime)
{
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        handleError();
    }
}
           

This time readFd returns 0, meaning the peer has closed the connection, so we enter handleClose():

// TcpConnection.cc
void TcpConnection::handleClose()
{
    loop_->assertInLoopThread();
    // we don't close fd, leave it to dtor, so we can find leaks easily.
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr guardThis(shared_from_this());
    connectionCallback_(guardThis);
    // must be the last line
    closeCallback_(guardThis);
}
           

channel_->disableAll() again goes through the update chain (Channel::update, then EventLoop::updateChannel, then EPollPoller::updateChannel), which this time ends in EPOLL_CTL_DEL because the channel no longer has any interested events.
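A sketch of that chain, based on muduo (asserts omitted). disableAll clears the interest set and funnels into EPollPoller::updateChannel, whose "no events left" branch issues EPOLL_CTL_DEL and marks the channel kDeleted:

// Channel.cc (sketch)
void Channel::disableAll()
{
    events_ = kNoneEvent;
    update();                    // -> loop_->updateChannel(this)
}

// EPollPoller.cc (simplified sketch)
void EPollPoller::updateChannel(Channel* channel)
{
    const int index = channel->index();
    if (index == kNew || index == kDeleted)
    {
        if (index == kNew)
        {
            channels_[channel->fd()] = channel;
        }
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else // kAdded
    {
        if (channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL, channel);   // the branch taken by disableAll()
            channel->set_index(kDeleted);
        }
        else
        {
            update(EPOLL_CTL_MOD, channel);
        }
    }
}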

Then connectionCallback_ runs. Here connectionCallback_ is EchoServer::onConnection from main.cc, set in TcpServer::newConnection. Its body is empty in this example; inside it you could use if (conn->connected()) to tell a newly established connection from one being torn down.

Finally closeCallback_ runs, i.e. TcpServer::removeConnection, also set in TcpServer::newConnection:

// TcpServer.cc::newConnection
conn->setCloseCallback(
    std::bind(&TcpServer::removeConnection, this, _1));
           

Now go to:

// TcpServer.cc
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
    loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
           

This calls EventLoop::runInLoop:

// EventLoop.cc
void EventLoop::runInLoop(Functor cb)
{
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(std::move(cb));
    }
}
           

Here loop_ is the TcpServer's own loop, i.e. the main loop, while we are currently running in the child thread, so isInLoopThread() is false and queueInLoop is taken: the bookkeeping for the closed connection has to happen in the main thread that owns the TcpServer:

// EventLoop.cc
void EventLoop::queueInLoop(Functor cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(std::move(cb));
    }

    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}
           

The cb here is TcpServer::removeConnectionInLoop.

Then wakeup() is entered: the child thread pokes the main thread through the main loop's wakeupFd.
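wakeupFd_ is an eventfd created when the EventLoop was constructed; the loop watches it like any other fd, so writing 8 bytes to it makes the target loop's epoll_wait return. A sketch of EventLoop::wakeup, based on muduo (error handling reduced to a comment):

// EventLoop.cc (simplified sketch)
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = ::write(wakeupFd_, &one, sizeof one);   // makes wakeupFd_ readable
    if (n != sizeof one)
    {
        // muduo logs an error here
    }
}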

The child thread then unwinds back into EventLoop::loop:

// EventLoop.cc
void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;  
    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_;
        eventHandling_ = true;
        for (Channel* channel : activeChannels_)
        {
            currentActiveChannel_ = channel;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;
        doPendingFunctors();
    }
 
    looping_ = false;
}
           

Everything above was the handling of currentActiveChannel_->handleEvent(pollReturnTime_).

Next it runs doPendingFunctors(); the child thread has nothing queued here, the update (epoll_ctl) work having already been done earlier.

Since the child thread has woken up the main thread, it is now the main thread's turn.

In the main thread, epoll_wait inside EPollPoller::poll returns with the wakeupFd event, and EPollPoller::fillActiveChannels runs again.

Then, back in EventLoop::loop, channel->handleEvent(pollReturnTime_) runs:

// Channel.cc
void Channel::handleEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if (tied_)
    {
        guard = tie_.lock();
        if (guard)
        {
            handleEventWithGuard(receiveTime);
        }
    }
    else
    {
        handleEventWithGuard(receiveTime);
    }
}
           

The wakeup channel is not tied to anything, so tied_ is false and the handleEventWithGuard(receiveTime) in the else branch runs:

// Channel.cc
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    eventHandling_ = true;
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (closeCallback_) closeCallback_();
    }
 
    if (revents_ & POLLNVAL)
    {
    }
 
    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        if (readCallback_) readCallback_(receiveTime);
    }
    if (revents_ & POLLOUT)
    {
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}
           

The main thread reads the "one" written by the child thread, so readCallback_ fires; for the wakeup channel that is EventLoop::handleRead. It was set up in step four of the first article, during server.start(), when the thread pool started and each EventLoop created its wakeupFd. So readCallback_ here is EventLoop::handleRead:

// EventLoop.cc
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
}
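For reference, the wakeupFd and its read callback were wired up roughly like this when each EventLoop was constructed (a sketch based on muduo's EventLoop.cc; the constructor lines are shown as comments):

// EventLoop.cc (simplified sketch)
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        abort();   // muduo logs LOG_SYSERR and aborts here
    }
    return evtfd;
}

// in EventLoop's constructor initializer list and body:
//   wakeupFd_(createEventfd()),
//   wakeupChannel_(new Channel(this, wakeupFd_)),
//   ...
//   wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
//   wakeupChannel_->enableReading();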
           

Back in EventLoop::loop, doPendingFunctors() runs:

// EventLoop.cc
void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
 
    {
        MutexLockGuard lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    for (const Functor& functor : functors)
    {
        functor();
    }
    callingPendingFunctors_ = false;
}
           

pendingFunctors_ was filled by the child thread's queueInLoop call; the functor to run is TcpServer::removeConnectionInLoop.

// TcpServer.cc
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
    size_t n = connections_.erase(conn->name());
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(
        std::bind(&TcpConnection::connectDestroyed, conn));
}
           

queueInLoop(cb) is called again, this time on the child thread's ioLoop; cb is TcpConnection::connectDestroyed, and the std::bind captures the TcpConnectionPtr conn, which keeps the TcpConnection alive until connectDestroyed has run. Inside queueInLoop the main thread wakes the child thread.

Meanwhile the child thread is back in the while loop of loop().

Note that the main thread does no update here, hence no EPOLL_CTL_DEL; it only erases the connection from the connections_ map and hands the rest back to the child thread.

The main thread has now signaled the child thread, so it is the child thread's turn again.

The child thread again reaches doPendingFunctors in its loop.

The callback it runs there is TcpConnection::connectDestroyed:

// TcpConnection.cc
void TcpConnection::connectDestroyed()
{
    if (state_ == kConnected)
    {
        setState(kDisconnected);
        channel_->disableAll();

        connectionCallback_(shared_from_this());
    }
    channel_->remove();
}
           

Because handleClose() already set the state to kDisconnected, the if (state_ == kConnected) branch is skipped here, so disableAll() and connectionCallback_ do not run a second time. Execution goes straight to channel_->remove():

// Channel.cc
void Channel::remove()
{
    addedToLoop_ = false;
    loop_->removeChannel(this);
}
           

Then:

// EventLoop.cc
void EventLoop::removeChannel(Channel* channel)
{
    poller_->removeChannel(channel);
}
           

And then:

// EPollPoller.cc
void EPollPoller::removeChannel(Channel* channel)
{
    int index = channel->index();
    if (index == kAdded)
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(kNew);
}
           

In this trace, disableAll() in handleClose() already issued EPOLL_CTL_DEL and left the index at kDeleted, so the if (index == kAdded) branch is skipped and removeChannel simply resets the index to kNew; the fd is already out of the epoll set.

After that comes destruction: once the std::bind inside connectDestroyed releases the last TcpConnectionPtr, the TcpConnection is destroyed, and with it the Socket member, whose destructor finally closes the fd.
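A tiny sketch of that last step, based on muduo's Socket.cc:

// Socket.cc (sketch)
Socket::~Socket()
{
    sockets::close(sockfd_);   // the connection fd is finally closed here
}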
