天天看點

muduo網絡庫之net庫源碼分析(1)

<TCP網絡程式設計本質>

TCP網絡程式設計最本質是的處理三個半事件:

連接配接建立:伺服器accept(被動)接受連接配接,用戶端connect(主動)發起連接配接

連接配接斷開:主動斷開(close、shutdown),被動斷開(read傳回0)

消息到達:檔案描述符可讀

消息發送完畢:這算半個。對于低流量的服務,可不必關心這個事件;這裡的發送完畢是指資料寫入作業系統緩沖區(如果資料全部填到了核心緩沖區那麼調用回掉函數OnWriteComplete函數),将由TCP協定棧負責資料的發送與重傳,不代表對方已經接收到資料。對于高流量的服務就要關心這個事件了,因為核心緩沖區可能不足以容納資料,此時我們要将資料追加到應用層的發送緩沖區,如果核心緩沖區空閑了,那麼就把應用層緩沖區中的資料發送出去,發送完了之後,也會調用OnWriteComplete函數。

下圖示範了消息到達後和消息發送的資料處理流程:

muduo網絡庫之net庫源碼分析(1)

muduo網絡庫之net庫源碼分析(1)
muduo網絡庫之net庫源碼分析(1)

下面我們看一下TcpSerer如何設定上面三個半事件的回調函數的:

muduo網絡庫之net庫源碼分析(1)

EchoServer類是使用者自己實作的類,由于muduo使用的是基于對象程式設計思想,是以從上面可以看出EchoServer包含TcpServer執行個體,而EcpServer這個類中又包含了上面set*開頭的幾個函數,它門分别用于注冊連結建立和斷開、消息到達、資料發送完畢這三個回調函數,其中setConnectionCallback同時是連結建立和斷開的回調函數。是以我們如何使用EchoServer呢?很明顯我們可以先定義EchoServer類型,這個類型包含我們自己實作的事件回調函數,并且包含TcpServer執行個體,然後調用TcpServer執行個體中的函數注冊回調函數。

TcpServer使用執行個體如下:     EchoServer.h源碼如下:

class EchoServer
{
 public:
  EchoServer(muduo::net::EventLoop* loop,
             const muduo::net::InetAddress& listenAddr);

  void start();  // calls server_.start();

 private:
  void onConnection(const muduo::net::TcpConnectionPtr& conn);//連結建立或斷開回調函數

  void onMessage(const muduo::net::TcpConnectionPtr& conn,   //收到消息後回調函數
                 muduo::net::Buffer* buf,
                 muduo::Timestamp time);

  muduo::net::EventLoop* loop_;  
  muduo::net::TcpServer server_; //包含注冊三個半事件回調函數的方法
};
           

EchoServer.cpp源碼如下:

#include "echo.h"

#include <muduo/base/Logging.h>

#include <boost/bind.hpp>

// using namespace muduo;
// using namespace muduo::net;

EchoServer::EchoServer(muduo::net::EventLoop* loop,
                       const muduo::net::InetAddress& listenAddr)
  : loop_(loop),
    server_(loop, listenAddr, "EchoServer")
{//注冊回調函數
  server_.setConnectionCallback(
      boost::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      boost::bind(&EchoServer::onMessage, this, _1, _2, _3));
}

void EchoServer::start()
{
  server_.start();
}

void EchoServer::onConnection(const muduo::net::TcpConnectionPtr& conn)
{
  LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN"); //<span style="font-family: Arial, Helvetica, sans-serif;">conn->connected()判斷連結斷開還是建立連結 </span>
}

void EchoServer::onMessage(const muduo::net::TcpConnectionPtr& conn, //隻是簡單回射
                           muduo::net::Buffer* buf, 
                           muduo::Timestamp time)
{
  muduo::string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
           << "data received at " << time.toString();
  conn->send(msg);
}
           

main.cpp源碼如下:

#include "echo.h"

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>

// using namespace muduo;
// using namespace muduo::net;

int main()
{
  LOG_INFO << "pid = " << getpid();
  muduo::net::EventLoop loop;
  muduo::net::InetAddress listenAddr(2007);
  EchoServer server(&loop, listenAddr);
  server.start();
  loop.loop(); //監聽事件
}
           

<這裡首先分析什麼也不做的EventLoop類,後面再深入分析>

one loop per thread意思是說每個線程最多隻能有一個EventLoop對象。

EventLoop對象構造的時候,會檢查目前線程是否已經建立了其他EventLoop對象,如果已建立,終止程式(LOG_FATAL)

EventLoop構造函數會記住本對象所屬線程(threadId_)。

建立了EventLoop對象的線程稱為IO線程,其功能是運作事件循環(EventLoop::loop)

//EventLoop.cpp的代碼本身很複雜,這裡是把它精簡後的源碼
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

namespace
{
// 目前線程EventLoop對象指針
// __thread線程局部存儲,也就是每個線程都有一個這個變量
__thread EventLoop* t_loopInThisThread = 0;
}

EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
  return t_loopInThisThread;
}

EventLoop::EventLoop()
  : looping_(false),  //是否處于循環狀态
    threadId_(CurrentThread::tid()) //把線程id設定為EventLoop對象所屬的線程
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  // 如果目前線程已經建立了EventLoop對象,終止(LOG_FATAL)
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;  //設定目前線程EventLoop對象指針
  }
}

EventLoop::~EventLoop()
{
  t_loopInThisThread = NULL;
}

// 事件循環,該函數不能跨線程調用
// 隻能在建立該對象的線程中調用
void EventLoop::loop()
{
  assert(!looping_);
  // 斷言目前處于建立該對象的線程中
  assertInLoopThread();
  looping_ = true;
  LOG_TRACE << "EventLoop " << this << " start looping";

  ::poll(NULL, 0, 5*1000);  //什麼都沒做隻是簡單地休息5秒鐘

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " <<  CurrentThread::tid();
}
           
//EventLoop.h精簡後的源碼如下
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <boost/noncopyable.hpp>

#include <muduo/base/CurrentThread.h>
#include <muduo/base/Thread.h>

namespace muduo
{
namespace net
{

///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
 public:
  EventLoop();
  ~EventLoop();  // force out-line dtor, for scoped_ptr members.

  ///
  /// Loops forever.
  ///
  /// Must be called in the same thread as creation of the object.
  ///
  void loop();
  
  //斷言EventLoop是否處于建立它的線程當中
  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }
  
  //如果目前線程id和建立EventLoop的線程id一緻那麼傳回true
  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }  

  static EventLoop* getEventLoopOfCurrentThread();

 private:
  void abortNotInLoopThread();
  
  bool looping_; /* atomic */    //标志是否處于EventLoop狀态
  const pid_t threadId_;		// 目前對象所屬線程ID
};

}
}
#endif  // MUDUO_NET_EVENTLOOP_H
           

正确使用的執行個體:

#include <muduo/net/EventLoop.h>
#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void threadFunc()
{
	printf("threadFunc(): pid = %d, tid = %d\n",
		getpid(), CurrentThread::tid());

	EventLoop loop;
	loop.loop();
}

int main(void)
{
	printf("main(): pid = %d, tid = %d\n",
		getpid(), CurrentThread::tid());

	EventLoop loop;

	Thread t(threadFunc);
	t.start();

	loop.loop();//隻是等待5秒
	t.join();
	return 0;
}
           

錯誤的使用執行個體:

#include <muduo/net/EventLoop.h>
#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

EventLoop* g_loop;

void threadFunc()
{
	g_loop->loop();  //錯誤!!!在不是建立該EventLoop執行個體的線程中調用了該執行個體的loop函數,loop函數不能跨線程調用
}

int main(void)
{
	EventLoop loop;
	g_loop = &loop;
	Thread t(threadFunc);
	t.start();
	t.join();
	return 0;
}
           

繼續閱讀