天天看點

C++釋出訂閱模式C++釋出訂閱模式

C++釋出訂閱模式

釋出訂閱模式主要包含三個部分:消息釋出、消息訂閱者、消息進行中心。與觀察者模式相比多出了消息進行中心子產品,這樣在結構上可以解耦訂閱者與釋出者,功能上更加的豐富。

觀察者模式

結構設計

  • 有一個消息list,主線程向這個list尾部追加消息,同時另一個子線程從消息list頭部不斷取出第一個消息
  • 查找消息訂閱map,訂閱者與消息設計為n:n關系,一個消息可被多個訂閱者訂閱,是以需依次執行訂閱了這個消息的函數
    • 如:Stocks Trade消息需被Suber1和Suber4兩個訂閱者處理
C++釋出訂閱模式C++釋出訂閱模式

簡單來說就是:EventDeta*就是一個資料,并為之取了一個别名 * Trade,稱之為消息。而很多類函數(普通函數也一樣的)的正常執行需要這個EventData,是以這個類将其某個函數和EventData通過訂閱清單綁定,當這個EventData也即是消息成為消息清單的第一個元素時,就執行和這個EventData綁定的所有類函數。

代碼

[知乎]](https://zhuanlan.zhihu.com/p/484171260)

消息釋出

EventData.h:

/* 
    定義有哪些消息、消息對應的資料
    當想要發自己的消息,可以在這裡進行添加
*/
// 消息句柄用于辨別消息
char * event_data1  = "Stocks Trade";  
// 消息主體,可做為訂閱者的入參,以對此消息進行處理 
struct EventData1
{
    int e = 0;
};

char * event_data2 = "Bonds Trade";
struct EventData2
{
    char * i = "receiver";
};

char * event_data3 = "Funds Trade";
struct EventData3
{
    EventData1  data1;
    EventData2 data2;
};
           

消息訂閱者

EventSuber.h: 訂閱者消息處理函數一般接收兩個參數,一個是自己的參數,一個是傳過來的消息

這裡可以不申明類,直接寫三個消息處理函數也可以

#include"EventData.h"
#include<iostream>
#include<string>

using namespace std;   // string是标準卡函數

/*訂閱消息處理子產品 3個 */
class Subscriber1
{
public:
    // 利用靜态成員函數,保留上一次消息處理結果
    // 不能通路非靜态成員變量與非靜态成員函數 => 傳這個類的指針 void* _this
    // 強轉_this指針,調用此類的成員函數、成員變量 _this=&(Subscriber1執行個體)

    // 給這個訂閱器傳一個消息
    static void HandleReceiveEvent (void * _this, void * data)
    {
        Subscriber1 * th = (Subscriber1 *)_this;
        EventData1* d = (EventData1 *)data;  
        d->e += th->_data;
        printf("%s, data:%d \n",__FUNCTION__,d->e);
    }
    int _data = 10;
};

class Subscriber2
{
public:
    static void HandleReceiveEvent (void * _this, void * data)
    {
        Subscriber2 * th = (Subscriber2 *)_this;
        EventData2* d = (EventData2 *)data;
        
        string str(d->i);
        str += th->_data;
        printf("%s, data:%s \n",__FUNCTION__,str.c_str());
    }
    char * _data = " Subscriber 2";
};

class Subscriber3
{
public:
    static void HandleReceiveEvent (void * _this, void * data)
    {
        Subscriber3 * th = (Subscriber3 *)_this;
        EventData3* d = (EventData3 *)data;
        d->data1.e += th->_data1;
        
        string str(d->data2.i);
        str += th->_data2;
        
        printf("%s, data:%d , %s \n",__FUNCTION__,d->data1.e,str.c_str());
    }
    int _data1 = 30;
    char * _data2 = " Subscriber 3";
};
           

消息進行中心

EventDeal.h: 開啟一個子線程,不斷去消息list取消息,并将該消息将由所有訂閱者進行處理

#include<pthread.h>
#include<map>
#include<list>
#include "EventSuber.h"

/*消息進行中心*/
class EventMsgCentre
{
public:
    typedef  void(*HandleEvent)(void * , void *);
    /* 訂閱消息節點 */
    // 消息處理函數設定兩個入參:第一個_this指針是訂閱器者,第二個是消息EventData
    // HandleReceiveEvent(suber,msg)形式
    struct EventSubscriberNode
    {
        void * _this;    // 隻是普通的指針變量
        HandleEvent func;   // 函數指針,func無傳回類型,且雙入參都為void*
    };

    /* 釋出消息節點 */
    struct EventPublishNode
    {
        char * event;
        void * data;
    };
    
public:
    // 初始化消息釋出、訂閱鎖
    EventMsgCentre()
    {
        pthread_mutex_init(&_mutexSubscriber, nullptr);
        pthread_mutex_init(&_mutexPublish, nullptr);
    }

    // 釋放鎖空間
    ~EventMsgCentre()
    {
        pthread_mutex_destroy(&_mutexPublish);
        pthread_mutex_destroy(&_mutexSubscriber);
    }
    
    // 訂閱消息 =》 訂閱map追加元素
    void SubscriberEvent(char * event, EventSubscriberNode node)
    {
        pthread_mutex_lock(&_mutexSubscriber);
        sMap[event].push_back(node);    
        pthread_mutex_unlock(&_mutexSubscriber);
    }
    
    // 删除消息訂閱
    void releaseSubscriberEvent(char * event, EventSubscriberNode node)
    {
        pthread_mutex_lock(&_mutexSubscriber);
        auto it = sMap.find(event);   // 訂閱map中找到這個消息的訂閱清單
        if(it != sMap.end())
        {
            for(auto ite = it->second.begin(); ite != it->second.end(); )
            {
                // 删除這個消息對應訂閱清單的某個訂閱
                if(ite->_this == node._this && ite->func == node.func)
                {
                    ite = it->second.erase(ite);
                }
                else
                {
                    ++ite;
                }
            }
        }
        pthread_mutex_unlock(&_mutexSubscriber);
    }

    // 釋出消息,往消息清單添加一個消息節點
    void PublishEvent(char * event, void *data)
    {
        EventPublishNode  node;
        node.event = event;
        node.data = data;
        
        pthread_mutex_lock(&_mutexPublish);
        plist.push_back(node);
        pthread_mutex_unlock(&_mutexPublish);
    }
    
    /* 消息進行中心  */
    static void *EventProcess(void * _this)
    {
        EventMsgCentre * th = (EventMsgCentre *)_this;  
        while (1)
        {
            EventPublishNode cur {nullptr, nullptr};  // 申明消息
            pthread_mutex_lock(&th->_mutexPublish);  // 消息釋出鎖
            // 取出第一個消息
            if (th->plist.empty())
            {
                pthread_mutex_unlock(&th->_mutexPublish);
                continue;
            }
            cur = th->plist.front();
            th->plist.pop_front();
            pthread_mutex_unlock(&th->_mutexPublish);
            
            // 消息訂閱鎖
            // 找到這個消息對應的訂閱 
            pthread_mutex_lock(&th->_mutexSubscriber);
            auto it = th->sMap.find(cur.event);
            if(it != th->sMap.end())
            {
                for(auto ite = it->second.begin(); ite != it->second.end(); ++ite)
                {
                    ite->func(ite->_this, cur.data);   // 執行所有訂閱者的消息動作
                }
            }
            pthread_mutex_unlock(&th->_mutexSubscriber);
        }
    }
    
    // 建立線程
    void theardProc()
    {
        /* 
        1.向調用者傳遞子線程的線程号 
        2.線程屬性設定,一個結構體,包括線程優先級,線程棧大小
        3.指定子線程允許的函數,需要一個函數指針
        4.子線程運作的函數參數值
        */
        int ret = pthread_create(&pt_id, nullptr, EventProcess, this);
    }
    
public:
    pthread_mutex_t _mutexSubscriber;   // 消息訂閱互斥鎖
    pthread_mutex_t _mutexPublish;   // 消息釋出互斥鎖
    pthread_t pt_id;   // 線程id
    list<EventPublishNode> plist;   // 消息釋出清單
    map<char *, list<EventSubscriberNode>> sMap;   // 消息訂閱map,一個訂閱者可訂閱多個消息
};
           

main線程

添加三個消息,并分别添加如頂圖所示的訂閱者(訂閱者4換成訂閱者1)

#include<iostream>
#include<map>
#include<list>
#include <windows.h>
#include <process.h>
#include"EventDeal.h"

/* 測試。 */
int main()
{
    EventMsgCentre eveMsg; /* 初始化 */
    eveMsg.theardProc(); /* 啟線程 */    
    
    /* 訂閱消息 */
    Subscriber1  ber;
    ber._data = 100;     
    EventMsgCentre::EventSubscriberNode node1;
    // 節點的_this就是一個訂閱器
    node1._this = &ber;      
    // 節點的函數指針就是訂閱器的HandleReceiveEvent
    node1.func = Subscriber1::HandleReceiveEvent;  
    eveMsg.SubscriberEvent(event_data1, node1);  // 為消息1("Stocks Trade")添加一個訂閱器1
    // 由于消息2和消息3不能直接轉換成消息1,是以給消息1添加了一個重複的訂閱者1
    eveMsg.SubscriberEvent(event_data1, node1);  
    
    Subscriber2 ber2;
    ber2._data = " Subscriber2_200";
    EventMsgCentre::EventSubscriberNode node2;
    node2._this = &ber2;
    node2.func = Subscriber2::HandleReceiveEvent;
    eveMsg.SubscriberEvent(event_data2, node2);
    
    Subscriber3 ber3;
    ber3._data1 = 300;
    ber3._data2 = " Subscriber3_300";
    EventMsgCentre::EventSubscriberNode node3;
    node3._this = &ber3;
    node3.func = Subscriber3::HandleReceiveEvent;
    eveMsg.SubscriberEvent(event_data3, node3);
    
    /* 釋出消息 */
    EventData1 d1{1};
    eveMsg.PublishEvent(event_data1, &d1);
    
    EventData2 d2 {"event_data2"};
    eveMsg.PublishEvent(event_data2, &d2);
    
    EventData3 d3 {3,"event_data3"};
    eveMsg.PublishEvent(event_data3, &d3);

    
    // Sleep(1000);    // 休眠一秒,主線程退出,子線程消息處理函數随之退出
    // eveMsg.~EventMsgCentre();

    while(1){}     // 等待EventProcess執行完成

    return 0;
}
           
C++釋出訂閱模式C++釋出訂閱模式

繼續閱讀