天天看點

基于Epoll的Reactor模式

Reactor模式

    • Reactor模式的定義
    • Reactor模式中的主要角色
    • Epoll (ET)伺服器
    • EventItem類的設計
    • Reactor類的設計
    • 回調函數
    • 套接字相關
    • 引入線程池

Reactor模式的定義

Reactor反應堆模式,也就做分發者模式也叫做通知者模式。它是一種設計模式将就緒事件派發給對應伺服器處理程式:其基本理念如下

基于Epoll的Reactor模式

Reactor模式中的主要角色

在Reactor模式當中有幾個關鍵的參與者,下面我們來看看他們分别是什麼?

  • Handle(句柄):由作業系統提供用來辨別每個事件如Socket描述符、檔案描述符等通常是用整數來辨別。
  • EventHandler(事件處理器):由一個或者多個回調函數組成,這些回調方法用來處理對應的事件
  • Dispatcher(派發器):當事件就緒時派發器配置設定任務給對應的事件處理器
  • ConcreteEvent:事件處理器中的各個回調方法的具體實作
  • SychronousEvent Demultiplexer(同步事件分離器):本質是系統調用,在Linux當中是指多路IO複用比如說:select、poll、epoll等。

Epoll (ET)伺服器

下面我們來看一下Reactor模式下的Epoll伺服器(ET模式).一起來感受Reactor模式設計吧。下面我們一起來看一下思路:

在Epoll ET伺服器當中我們需要處理一下事件:

  • 讀事件:如果是監聽套接字的讀事件就緒了那麼我們需要調用accept函數擷取連接配接。如果是其他套接字就緒了我們調用read方法将資料讀取上來
  • 寫事件:如果寫事件就緒了那麼我們需要将資料寫入到發送緩沖區當中
  • 異常事件:如果是異常事件在這裡我們直接将檔案描述符關閉即可。

當Epoll伺服器檢測到某一個事件就緒時,會将給事件交給對應的服務程式進行處理。這也是Epoll伺服器當中的五個角色:

1.任務配置設定器:Reactor類當中的Dispatcher函數

2.具體事件處理器:讀回調,寫回調、異常回調的具體實作

3.句柄:檔案描述符

4.同步事件分離器:IO多路複用此次指的是epoll

5.事件處理器:主要指的是讀回調、寫回調、異常回調

Reactor類當中的Dispatcher函數做的就是調用epoll_wait函數等待事件就緒當有事件就緒時調用對應事件的回調函數。将事件的處理交給對應的伺服器處理程式即可。下面讓我們來看看這個設計當中非常牛逼的地方

EventItem類的設計

在Reactor模式下Epoll伺服器設計當中我個人覺得非常好的地方就是EventItem類的設計。一個Eventem對于一個檔案描述符fd,并将Eventem類當中含有對于事件就緒的回調方法,并且Eventem類當中還有指向Reactor類的指針。所有的檔案描述符都指向同一個Reactor,當我們伺服器處理程式将資料處理完畢之後,可以通過這個指針找到Reactor類并添加寫事件。

類中還包含了一個輸入和輸出緩沖區,至于為什麼需要這個輸入和輸出緩沖區具體請看如下解釋:

  • 當一個檔案描述符的讀事件就緒時我們會調用recv函數進行讀取但是我們并不知道,讀取到的是一個完整的封包嗎?這個時候這個緩沖區的意義就來了。雖然我們不知道讀取到的是不是一個完整的封包但是我們可以将其放入到對于的緩沖區當中當inbuffer當中可以分離出一個完整的封包時。在對資料進行處理,本質上這個緩沖區的作用是為了解決粘包問題.
  • 當我們伺服器的應用程式将資料處理完畢之後,準備将資料發送給用戶端。但是此時就有一個很難受的問題就是我們并不知道TCP的發送緩沖區是否還有空間。這時候我們可以先将我們的資料寫入到緩沖區當中,當TCP的發送緩沖區有足夠的空間了也就是寫事件就緒了。我們在将outbuffer當中的資料發送出去

在這裡說一下Eventem類當中的回指指針R就是指向Reactor對象的指針,有了這個指針我們可以快速找到到Reactor對象。當連接配接事件到來時我們需要通過Eventem對象當中的指針R調用類當中AddEvent方法将其添加到任務配置設定器當中

下面給出Eventem類的定義:

typedef int (*callback_t)(EventItem *ev); //函數指針類型用于

// 需要讓epoll管理的基本節點
class EventItem
{
public:
    //對應的檔案描述符
    int _sock;
    //對應的sock,對應的輸入緩沖區
    std::string inbuffer;
    //對應的sock,對應的輸出緩沖區
    std::string outbuffer;
    // sock設定回調
    callback_t _recver;  //讀回調
    callback_t _sender;  //寫回調
    callback_t _errorer; //異常回調

    // 試着Event回指Reactor的回指指針
    Reactor *R;

public:
    EventItem()
    {
        _sock = -1;
        _recver = nullptr;
        _sender = nullptr;
        _errorer = nullptr;
        R = nullptr;
    }
    //注冊回調
    void RegisterCallback(callback_t _recver, callback_t _sender, callback_t _errorer)
    {
        _recver = _recver;
        _sender = _sender;
        _errorer = _errorer;
    }
    ~EventItem()
    {
    }
};
           

Reactor類的設計

下面我們來談一下Reactor類的實作:

在Reactor類當中有一個_events.他是一個哈西表建立了檔案描述符和EventItem的映射關系通過檔案描述符我們能過找到對應的事件結構體,通過事件結構體我們就能找到對應的回調函數也能找到對應的Reactor類。Reactor當中還有一個成員變量_epfd用來建立epoll模型。

  • Reactor類提供了一個初始化方法該方法當中調用了epoll_create建立epoll模型并将對應的檔案描述符保證到_epfd當中友善後續對這個epoll模型進行操作
  • Reactor對象在析構之後會将這個檔案描述符給關閉掉

對應代碼如下:

class Reactor
{
private:
    int _epfd;
    std::unordered_map<int, EventItem *> _events; //我的Epoll類管理的所有的Event的集合
    //一個檔案描述符對應一個EventItem用來建立映射關系
public:
    Reactor() : _epfd(-1)
    {
    }

    void InitReactor()
    {
        _epfd = epoll_create(SIZE);
        if (_epfd < 0)
        {
            std::cerr << "epoll_create error" << std::endl;
            exit(2);
        }
        std::cout << "InitReactor success" << std::endl;
    }

    bool InsertEvent(EventItem *evp, uint32_t evs)
    {
        // 1. 将sock中的sock插入到epoll中
        struct epoll_event ev;
        ev.events = evs;
        ev.data.fd = evp->_sock;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, evp->_sock, &ev) < 0)
        {
            std::cerr << "epoll_ctl add event failed" << std::endl;
            return false;
        }
        // 2. 将ev本身插入到unordered_map中
        _events.insert({evp->_sock, evp});
    }

    void DeleteEvent(EventItem *evp)
    {
        int sock = evp->_sock;
        auto iter = _events.find(sock);
        if (iter != _events.end())
        {
            // 1. 将sock中的sock從epoll中删除它
            epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);

            // 2. 将特定的ev從 unordered_map中 移除
            _events.erase(iter);

            // 3. close
            close(sock);

            // 4. 删除event節點
            delete evp;
        }
    }

    //使能讀寫
    bool EnableRW(int sock, bool enbread, bool enbwrite)
    {
        struct epoll_event ev;
        ev.events = EPOLLET | (enbread ? EPOLLIN : 0) | (enbwrite ? EPOLLOUT : 0);
        ev.data.fd = sock;

        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0)
        {
            std::cerr << "epoll_ctl mod event failed" << std::endl;
            return false;
        }
    }
    //判斷是否是合法的檔案描述符
    bool IsSockOk(int sock)
    {
        auto iter = _events.find(sock);
        return iter != _events.end();
    }

    //就緒事件的派發器邏輯
    void Dispatcher(int timeout)
    {
        struct epoll_event revs[NUM];
        int n = epoll_wait(_epfd, revs, NUM, timeout);
        for (int i = 0; i < n; i++)
        {
            int sock = revs[i].data.fd;
            uint32_t revents = revs[i].events;
            //代表差錯處理, 将所有的錯誤問題全部轉化成為讓IO函數去解決
            if (revents & EPOLLERR)
                revents |= (EPOLLIN | EPOLLOUT);
            if (revents & EPOLLHUP)
                revents |= (EPOLLIN | EPOLLOUT);

            //讀資料就緒,可能會有bug,後面解決
            if (revents & EPOLLIN)
            {
                //直接調用回調方法,執行對應的讀取
                if (IsSockOk(sock) && _events[sock]->_recver)
                    _events[sock]->_recver(_events[sock]);
            }
            if (revents & EPOLLOUT)
            {
                if (IsSockOk(sock) && _events[sock]->_sender)
                    _events[sock]->_sender(_events[sock]);
            }
        }
    }

    ~Reactor() {}
};
           

1.Dispatcher函數的說明

在這裡對Dispatcher函數說明一下也就是事件分派器:事件分派器主要是調用epoll_wait等待事件就緒。當某個檔案描述上的事件就緒時我們可以通過Reactor類當中的_events通過哈西的方式找到檔案描述符對應的事件結構體找到對應的回調方法并調用。

在這裡需要說明的一下是此處沒有對epoll_wait的傳回值進行判斷,本質上是通過for的循環判斷條件進行判斷了如果epoll_wait的傳回值為-1不會進入到循環當中,如果傳回值為0(逾時)也不會進行到循環當中。隻有當真正有事件就緒時此時才會進入到循環當中

2.DeleteEvent函數說明:

Reactor類當中的DeleteEvent函數是用于進行事件删除。删除一個事件時我們隻需要将對應的檔案描述符傳入.調用epoll_ctl函數在epoll模型當中删除。并将将哈西表中的映射關系也給删除掉

3.AddEvent函數說明:

Reactor類當中的AddEvent函數是用來往epoll模型當中添加需要關心的事件。調用epoll_ctl函數往epoll模型當中添加需要核心關心對應檔案描述符上的那些事件。并建立檔案描述符和事件結構體的映射關系

4.EnableRW函數說明:

類中的EnableRW函數用于使能某個檔案描述符的讀寫事件。調用該函數時傳入對應的檔案描述符。

另外還有兩個bool類型的變量表示是使能讀還是使能寫還是讀寫。該函數内部調用了epoll_ctl函數修改該檔案描述符的監聽事件

回調函數

之前我們一直提到了回調函數下面我們來看看這些回掉函數

  • recver:讀事件就緒時可以調用該函數讀取用戶端發過來的資料并進行處理
  • sender:寫事件就緒時可以調用該回調函數發送資料給用戶端
  • errorer:當異常事件就緒時調用将對應的檔案描述符關閉
  • accepter:當連接配接事件就緒時調用該函數來擷取連接配接

當我們給某個檔案描述符建立EventItem時,需要調用EventItem類裡面的注冊回調函數。在這裡需要注意的是:

1.我們将監聽套接字對應的EventItem結構體當中的_recver設定為accpter這是因為監聽套接字對應的讀事件就緒了代表的是連接配接就緒了。這也就意味着監聽套接字隻關心讀事件,那麼也就意味着寫回調和異常回調我們可以置為空不用關心。

2.Dispatcher一旦檢測到某個檔案描述符的某個事件就緒了會調用EvetItem當中對應的回調函數。

3.對于用戶端建立連接配接的套接字我們會将其對應的EvetItem中的_recver、_sender、_errorer回調函數都會設定好

下面我們一起來看看這幾個回調函數首先我們來看看accepter回調

1.accepter回調

上面提到過accepter回調主要是用來處理連接配接事件。大緻的流程如下:

  • 首先調用accept函數将就緒的連接配接擷取上來
  • 将擷取上來的檔案描述符設定為非阻塞因為這個我們使用的是epoll的ET模式,事件就緒隻會通知一次是以我們不能讓檔案描述符為阻塞的
  • 建立與這個檔案描述符對應的EvetItem結構體并填寫響應的回調函數并将其想要關心的事件注冊到Dispatcher當中

當我們注冊之後,OS在底層就會給我們關心該套接字對應的事件,當事件就緒時執行我們之前設定好的回調方法。

代碼如下:

#pragma once

#include "Reator.hpp"
#include "sock.hpp"
#include "Service.hpp"
#include "Util.hpp"


int Accepter(EventItem *evp)
{
    std::cout << "有新的連結到來了,就緒的sock是: " << evp->_sock << std::endl;
    while(true)
    {
        int sock = Sock::Accept(evp->_sock);
        if(sock < 0)
        {
            std::cout << "Accept Done!" << std::endl;
            break;
        }
        std::cout << "Accept success: " << sock << std::endl;
        SetNonBlock(sock);
        //這裡呢?擷取連結成功,IO socket
        EventItem *other_ev = new EventItem();
        other_ev->_sock = sock;
        other_ev->R = evp->R; //為什麼要讓所有的Event指向自己所屬的Reactor??
        //recver, sender, errorer,就是我們代碼中的較頂層,隻負責真正的讀取!
        other_ev->RegisterCallback(Recver, Sender, Errorer);

        evp->R->InsertEvent(other_ev, EPOLLIN|EPOLLET);
    }
}
           

可能有老鐵注意到在這裡擷取連接配接是循環一次就給擷取完畢。這是因為Reactor模式是基于epoll的邊緣觸發。事件就緒隻會通知上層一次,是以我們在擷取連接配接時需要一次性擷取完畢并且需要将監聽套接字設定為非阻塞。

如果我們不将監聽套接字設定為非阻塞當我們循環調用accept函數擷取連接配接當連接配接擷取完畢之後,我們在調用accept函數時此時底層沒有連接配接就緒accept函數被阻塞住。同樣的accept擷取的新的套接字也需要将其設定為非阻塞防止後面調用read,write時因為事件不就緒而被阻塞住。

下面讓我們來看看如何将檔案描述符設定為非阻塞,其實非常的簡單使用fcntl函數即可。如果不太清楚的老鐵可以看看我之前的進階IO部落格在這裡将連結給出:

Linux下進階IO

在這裡直接給出這個代碼:

//設定一個檔案描述符為非阻塞
void SetNonBlock(int sock)
{
    int f1 = fcntl(sock, F_GETFL);
    //擷取對應檔案描述符的标記
    if (f1 < 0)
    {
        std::cerr << "fcntl faile" << std::endl;
        return;
    }
    //設定檔案描述符為非阻塞
    fcntl(sock, F_SETFL, f1 | O_NONBLOCK);
    //設定檔案描述符為非阻塞
}
           

2.recver回調

recver回調用來讀取用戶端發送過來的資料,大緻流程如下

  • 循環調用read函數進行讀取并将讀取到的資料放入到該套接字對應的EventItem當中的inbuffer緩沖區當中
  • 對應inbuferr當中的資料進行切分将完整的封包切分出來,剩下的放入緩沖區當中。直到湊齊一個完整的封包在将其拿出來
  • 将切分出來的封包進行業務處理
  • 形成響應封包并将響應的資料寫入到套接字對應的EventItem當中的outbuffer當中并将寫事件打開

對應代碼如下:

// 1代表本輪讀取成功,0代表對端關閉,-1代表失敗
static int RecverCore(int sock, std::string &inbuffer) //輸入輸出型參數
{

    while (true)
    {
        char buffer[ONCE_SIZE];
        ssize_t s = recv(sock, buffer, sizeof(ONCE_SIZE) - 1, 0);

        if (s > 0)
        {
            buffer[s] = '\0';
            inbuffer += buffer;
        }
        else if (s < 0)
        {
            //底層沒資料了,或者真的出錯了
            if (errno == ERANGE || errno == EWOULDBLOCK)
            {
                //讀取完了成功讀完
                return 1;
            }
            else if (errno == EINTR)
            {
                // IO被信号打斷
                continue;
            }
            return -1; //真正的出錯了
        }
        else //===0
        {
            //對端關閉連結
            return 0;
        }
    }
}

int Recver(EventItem *ep)
{
    std::cout<<"Recver been called"<<std::endl;
    //通過EventItem就可以拿到事件的所有内容
    //開始真正的讀取,分包我們隻想讀取一個或者多個封包
    //解決粘包問題,反序列化。針對一個封包提取有效參與計算存儲的資訊
    //業務邏輯----->得到結果。建構響應  ,嘗試直接間接進行發送----條件成熟了才能發送寫事件一般都是就緒的但是就是使用者不一定就就緒的
    //對于寫事件我們通常是按需設定不能EPOLLIN和EPOLLOUT一起設定否則寫事件一直就緒被派發而使用者的資料沒有就緒此時會導緻伺服器基本不會等待導緻伺服器
    // 壓力過大
    int ret = RecverCore(ep->_sock, ep->inbuffer);
    if (ret <= 0)
    {
        if (ep->_errorer)
        {
            ep->_errorer(ep);
        }
        return -1;
    }
    std::vector<std::string> tokens; //一個封包放到容器裡面
    std::string sep = "X";
    SplitSegment(ep->inbuffer, &tokens, sep);

    for (auto &seg : tokens)
    {
        std::string data1, data2;
        std::string op;
        if (Deserialize(seg, &data1, &data2,&op)) //反序列化和業務強相關
        {
         
             int x=std::stoi(data1);
             int y=std::stoi(data2);

             char Op=op[0];
             Task t(x,y,Op,ep);
             ThreadPool<Task>::GetInstance()->Push(t);
            //發送緩存區DFZGVzdccsd
        }
    }
    //将所有的響應添加到outbuffer當中
  

    return 1;
}
           

在這裡需要解釋一下的是:

我們使用一個輔助函數RecverCore來幫我們将資料讀取到inbuffer當中,在這裡需要注意的是當recv函數的傳回值為負數時此時并不一定是讀取出錯有可能是底層資料不就緒但是此時錯誤碼會被設定為

EAGIN或者EWOULDBLOCK此時說明底層事件已經讀取完畢,如果此時錯誤碼為EINTR說明讀取的過程當中被信号打斷此時我們繼續進行讀取即可

在上述過程當中涉及到了封包分隔其實就是為了防止粘包問題,粘包問題涉及到了協定的定制在這裡為了簡單起見我們就規定封包和封包之間以’X’進行分隔。是以我們可以将封包分隔好的資料放入到vector當中,而無法分割成一個完整封包的資料就留在緩沖區當中即可。

下面讓我們來看看分割的代碼:

void SplitSegment(std::string &inbuffer, std::vector<std::string> *tokens, std::string sep)
{
   //分隔封包
    while (true)
    {
        int pos = inbuffer.find(sep);
        if (pos == -1)
        {
            break;
        }
        std::string sub = inbuffer.substr(0, pos);
        tokens->push_back(std::move(sub));
        inbuffer.erase(0, pos + sep.size()); //将分割符也删掉
    }
}
           

其中還涉及到了反序列化,反序列化也根定制的協定強相關。在這裡我們的Epoll伺服器提供的是計算服務當然也可以定制其他的服務在這裡簡單起見。也就是發過來的資料類似于"a+bX"這種這樣我們分割起來就非常的簡單下面我們來看看是如何進行分割的。

bool Deserialize(std::string &seg,std::string *x,std::string *y,std::string*op)
{
    //進行反序列化
    std::cout<<seg<<std::endl;
    int pos=0;
    int N=seg.size();
    while(pos<N)
    {
        if(seg[pos]=='*'||seg[pos]=='/'||seg[pos]=='+'||seg[pos]=='-'||seg[pos]=='%'){
            break;
        }
        pos++;
    }
    if(pos<N)
    {
        
        std::string left=seg.substr(0,pos);
        std::string right=seg.substr(pos+1);
        *x=left;
        *y=right;
        *op=std::string(1,seg[pos]);
    
        return true;
    }
    return false;
   
}
           

處理完畢之後我們可以将處理完的結果寫入到該檔案描述符對應的outbuffer當中并打開該套接字對應的寫事件,當事件就緒後就會調用寫回調将資料發送給用戶端

3.sender回調

sender回調主要用來處理寫事件,大緻流程如下:

  • 循環調用send函數發送資料發送完畢之後将對應的緩沖區清空
  • 當我們将outbuffer當中的資料發送完畢之後我們需要将該套接字對應的寫事件關閉,但是如果outbuffer當中還有資料那麼寫事件就需要被打開

代碼如下:

int SendCore(int sock, std::string &outbuffer)
{
    while (true)
    {
        int toatl = 0; //本輪累計發送的資料量
        const char *start = outbuffer.c_str();
        int size = outbuffer.size();
        ssize_t cur = send(sock, start + toatl, size - toatl, 0);

        if (cur > 0)
        {
            toatl += cur;
            if (toatl == size)
            {
                outbuffer.clear();
                //全部發送完成
                break;
            }
        }

        else
        {
            //有可能沒有發完但是不能發完
            if (errno == EINTR)
            {
                continue;
            }
            else if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                outbuffer.erase(0, toatl);
                return 0;
            }
            else
            {
                //發送失敗
                return -1;
            }
        }
    }4

    return 1;
}

int Sender(EventItem *ep)
{
    std::cout<<"Sender been called"<<std::endl;
    int ret = SendCore(ep->_sock, ep->outbuffer);

    if (ret == 1)
    {
        ep->R->EnableRW(ep->_sock, true, false);
    }
    else if (ret == 0)
    {
        //可以什麼都不做,在這裡我們再次打開
        ep->R->EnableRW(ep->_sock, true, true);
    }
    else
    {
        //出錯了之間調用該檔案描述符對應的異常回調
        ep->_errorer(ep);
    }
    return 0;
}
           

3.errorer回調

errorer回調主要是用來處理異常事件:

1.對應異常事件在這裡我們不做過多的處理,我們隻是簡單将對應的檔案描述符給關閉掉

2.在調用recver和sender回調之前我們需要判斷一下Evetem當中的檔案描述符是否有效,如果是無效的根本就沒必要進行下一步處理。

對應代碼:

int Errorer(EventItem *ep)
{
     std::cout<<"Error been called"<<std::endl;
     ep->R->DeleteEvent(ep);

}
           

套接字相關

#pragma once
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include<cstring>


    class Sock
    {
    public:
        static int Socket()
        {
            int sock = socket(AF_INET, SOCK_STREAM, 0);//建立套接字
            int opt=1;
            setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
            //端口複用
            if (sock < 0)
            {
                std::cerr << "socket error" << std::endl;
                exit(1);
            }
            return sock;
        }
        static bool Bind(int sock,unsigned short port)
        {
             struct sockaddr_in local;
             memset(&local,0,sizeof(local));
             local.sin_addr.s_addr=INADDR_ANY;
             local.sin_family=AF_INET;
             local.sin_port=htons(port);
             if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
             {
                std::cerr<<"bind error"<<std::endl;
                exit(2);
             }
             

             return true;
        }

        static bool Listen(int sock,int backlog)
        {
            if(listen(sock,backlog)<0)
            {
               std::cerr<<"listen error"<<std::endl;
            }
            return true;
        }
        static int Accept(int sock)
        {
             struct sockaddr_in peer;
             socklen_t len=sizeof(peer);
             int fd=accept(sock,(struct sockaddr*)&peer,&len);
            return fd>=0?fd:-1;
        }
    };

           

在這裡我們封裝一個Socket類對一些接口進行封裝讓外部能直接調用Socket類當中的函數

引入線程池

我們目前的Epoll伺服器比較簡單是以了單程序的epoll伺服器看上去并沒有什麼壓力,但是如果我們的業務比較複雜這時候處理業務的時間就可能比較長。此時epoll伺服器無法接受其他新的連接配接,這樣就會使得伺服器的效率降低。是以我們可以在伺服器當中引入線程池,當recver回調将資料反序列化之後将反序列化得到的資料封裝成一個任務将其放入到線程池當中。線程池部落客之前有部落格已經介紹過了再這裡就不再介紹

下面直接給出代碼:

ThreadPool.hpp

#pragma once
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <queue>
static const int ThreadNUM =5;
template <class T>
class ThreadPool
{
private:
  void LockQueue()
  {
    pthread_mutex_lock(&_mtx);
  }
  void UnlockQueue()
  {
    pthread_mutex_unlock(&_mtx);
  }
  void Wait()
  {
    pthread_cond_wait(&_cond, &_mtx);
  }
  void WakeUp()
  {
    pthread_cond_signal(&_cond);
  }
  bool IsEmpty()
  {
    return _task_queue.size() == 0;
  }

public:
  static void *Routine(void *arg) //為什麼必須是static方法在部落格中細說
  {
    auto *self = (ThreadPool<T> *)arg;
    pthread_detach(pthread_self()); //分離線程
    while (true)
    {
      self->LockQueue();
      while (self->IsEmpty()) //看此時的任務隊列當中是否有任務
      {
        self->Wait();
      }
      T task;
      self->pop(&task);
      self->UnlockQueue();
      task.Run(); //處理任務
    }
  }

  void Push(const T &val)
  {
    LockQueue();
    _task_queue.push(val);
    UnlockQueue();
    WakeUp(); //喚醒在條件變量下等待的一個線程
  }

  void pop(T *out)
  {

    *out = _task_queue.front();
    _task_queue.pop();
  }

  void InitThreadPool()
  {
    //初始化線程池并建立線程
    pthread_t tid;
    for (int i = 0; i < _num; i++)
    {
      pthread_create(&tid, NULL, Routine, this);
    }
  }
  //銷毀信号量
  ~ThreadPool()
  {
    pthread_mutex_destroy(&_mtx);
    pthread_cond_destroy(&_cond);
  }
  //擷取單例
  static ThreadPool<T> *GetInstance()
  {
    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    if (NULL == tp)
    {
      pthread_mutex_lock(&mtx); //多個線程可能同時進來
      if (NULL == tp)
      {
        tp = new ThreadPool<T>();
      }
      pthread_mutex_unlock(&mtx);
    }
    return tp;
  }

private:
  ThreadPool(int num = ThreadNUM) : _num(num)
  {
    pthread_mutex_init(&_mtx, NULL);
    pthread_cond_init(&_cond, NULL);
  }
  //禁掉拷貝構造和指派
  ThreadPool(const ThreadPool<T> &tp) = delete;
  ThreadPool &operator=(const ThreadPool<T> &tp) = delete;

  static ThreadPool<T> *tp;
  pthread_mutex_t _mtx;
  pthread_cond_t _cond;
  int _num;                  //線程的數量
  std::queue<T> _task_queue; //任務隊列
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = NULL;

           

對應Task.hpp

#pragma once
#include <iostream>
#include"Reator.hpp"
//任務類
class Task
{
public:
	Task(int x = 0, int y = 0, char op = 0,EventItem*e=nullptr)
		: _x(x), _y(y), _op(op),ep(e)
	{}
	~Task()
	{}

	//處理任務的方法
	void Run()
	{
		int result = 0;
		switch (_op)
		{
		case '+':
			result = _x + _y;
			break;
		case '-':
			result = _x - _y;
			break;
		case '*':
			result = _x * _y;
			break;
		case '/':
			if (_y == 0){
				std::cerr << "Error: div zero!" << std::endl;
				return;
			}
			else{
				result = _x / _y;
			}
			break;
		case '%':
			if (_y == 0){
				std::cerr << "Error: mod zero!" << std::endl;
				return;
			}
			else{
				result = _x % _y;
			}
			break;
		default:
			std::cerr << "operation error!" << std::endl;
			return;
		}
		std::cout << "thread[" << pthread_self() << "]:" << _x << _op << _y << "=" << result << std::endl;
		std::string response;
		response+=std::to_string(_x);
		response+=_op;
		response+=std::to_string(_y);
		response+=" = ";
		response+=std::to_string(result);
		response+="X";//分隔符
		ep->outbuffer+=response;
		//開啟使能讀寫
		if(!ep->outbuffer.empty()){
			ep->R->EnableRW(ep->_sock,true,true);
		}
	}
private:
	int _x;
	int _y;
	char _op;
	EventItem*ep;
};



           

線程池接入完畢之後我們來測試一下我們的伺服器:

基于Epoll的Reactor模式

我們再看看線程的個數

基于Epoll的Reactor模式

對應代碼位址

代碼位址

繼續閱讀