C++釋出訂閱模式
釋出訂閱模式主要包含三個部分:消息釋出、消息訂閱者、消息進行中心。與觀察者模式相比多出了消息進行中心子產品,這樣在結構上可以解耦訂閱者與釋出者,功能上更加的豐富。
觀察者模式
結構設計
- 有一個消息list,主線程向這個list尾部追加消息,同時另一個子線程從消息list頭部不斷取出第一個消息
- 查找消息訂閱map,訂閱者與消息設計為n:n關系,一個消息可被多個訂閱者訂閱,是以需依次執行訂閱了這個消息的函數
- 如:Stocks Trade消息需被Suber1和Suber4兩個訂閱者處理
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiInBnauYGMwYWOwETNjRTMmFGZ3EDOhZTN1cDZllTM1kzMiZ2LcNXZyVHdjlGcvw1cn9Gbi9CXulWYtB0bnNWaw9CXlJXam1CZsl2dtcmbpx2ayFGcz9CXod2LcRXZu5ic2lGblR2cq5ibkN2Lc9CX6MHc0RHaiojIsJye.jpg)
簡單來說就是:EventDeta*就是一個資料,并為之取了一個别名 * Trade,稱之為消息。而很多類函數(普通函數也一樣的)的正常執行需要這個EventData,是以這個類将其某個函數和EventData通過訂閱清單綁定,當這個EventData也即是消息成為消息清單的第一個元素時,就執行和這個EventData綁定的所有類函數。
代碼
[知乎]](https://zhuanlan.zhihu.com/p/484171260)
消息釋出
EventData.h:
/*
定義有哪些消息、消息對應的資料
當想要發自己的消息,可以在這裡進行添加
*/
// 消息句柄用于辨別消息
char * event_data1 = "Stocks Trade";
// 消息主體,可做為訂閱者的入參,以對此消息進行處理
struct EventData1
{
int e = 0;
};
char * event_data2 = "Bonds Trade";
struct EventData2
{
char * i = "receiver";
};
char * event_data3 = "Funds Trade";
struct EventData3
{
EventData1 data1;
EventData2 data2;
};
消息訂閱者
EventSuber.h: 訂閱者消息處理函數一般接收兩個參數,一個是自己的參數,一個是傳過來的消息
這裡可以不申明類,直接寫三個消息處理函數也可以
#include"EventData.h"
#include<iostream>
#include<string>
using namespace std; // string是标準卡函數
/*訂閱消息處理子產品 3個 */
class Subscriber1
{
public:
// 利用靜态成員函數,保留上一次消息處理結果
// 不能通路非靜态成員變量與非靜态成員函數 => 傳這個類的指針 void* _this
// 強轉_this指針,調用此類的成員函數、成員變量 _this=&(Subscriber1執行個體)
// 給這個訂閱器傳一個消息
static void HandleReceiveEvent (void * _this, void * data)
{
Subscriber1 * th = (Subscriber1 *)_this;
EventData1* d = (EventData1 *)data;
d->e += th->_data;
printf("%s, data:%d \n",__FUNCTION__,d->e);
}
int _data = 10;
};
class Subscriber2
{
public:
static void HandleReceiveEvent (void * _this, void * data)
{
Subscriber2 * th = (Subscriber2 *)_this;
EventData2* d = (EventData2 *)data;
string str(d->i);
str += th->_data;
printf("%s, data:%s \n",__FUNCTION__,str.c_str());
}
char * _data = " Subscriber 2";
};
class Subscriber3
{
public:
static void HandleReceiveEvent (void * _this, void * data)
{
Subscriber3 * th = (Subscriber3 *)_this;
EventData3* d = (EventData3 *)data;
d->data1.e += th->_data1;
string str(d->data2.i);
str += th->_data2;
printf("%s, data:%d , %s \n",__FUNCTION__,d->data1.e,str.c_str());
}
int _data1 = 30;
char * _data2 = " Subscriber 3";
};
消息進行中心
EventDeal.h: 開啟一個子線程,不斷去消息list取消息,并将該消息将由所有訂閱者進行處理
#include<pthread.h>
#include<map>
#include<list>
#include "EventSuber.h"
/*消息進行中心*/
class EventMsgCentre
{
public:
typedef void(*HandleEvent)(void * , void *);
/* 訂閱消息節點 */
// 消息處理函數設定兩個入參:第一個_this指針是訂閱器者,第二個是消息EventData
// HandleReceiveEvent(suber,msg)形式
struct EventSubscriberNode
{
void * _this; // 隻是普通的指針變量
HandleEvent func; // 函數指針,func無傳回類型,且雙入參都為void*
};
/* 釋出消息節點 */
struct EventPublishNode
{
char * event;
void * data;
};
public:
// 初始化消息釋出、訂閱鎖
EventMsgCentre()
{
pthread_mutex_init(&_mutexSubscriber, nullptr);
pthread_mutex_init(&_mutexPublish, nullptr);
}
// 釋放鎖空間
~EventMsgCentre()
{
pthread_mutex_destroy(&_mutexPublish);
pthread_mutex_destroy(&_mutexSubscriber);
}
// 訂閱消息 =》 訂閱map追加元素
void SubscriberEvent(char * event, EventSubscriberNode node)
{
pthread_mutex_lock(&_mutexSubscriber);
sMap[event].push_back(node);
pthread_mutex_unlock(&_mutexSubscriber);
}
// 删除消息訂閱
void releaseSubscriberEvent(char * event, EventSubscriberNode node)
{
pthread_mutex_lock(&_mutexSubscriber);
auto it = sMap.find(event); // 訂閱map中找到這個消息的訂閱清單
if(it != sMap.end())
{
for(auto ite = it->second.begin(); ite != it->second.end(); )
{
// 删除這個消息對應訂閱清單的某個訂閱
if(ite->_this == node._this && ite->func == node.func)
{
ite = it->second.erase(ite);
}
else
{
++ite;
}
}
}
pthread_mutex_unlock(&_mutexSubscriber);
}
// 釋出消息,往消息清單添加一個消息節點
void PublishEvent(char * event, void *data)
{
EventPublishNode node;
node.event = event;
node.data = data;
pthread_mutex_lock(&_mutexPublish);
plist.push_back(node);
pthread_mutex_unlock(&_mutexPublish);
}
/* 消息進行中心 */
static void *EventProcess(void * _this)
{
EventMsgCentre * th = (EventMsgCentre *)_this;
while (1)
{
EventPublishNode cur {nullptr, nullptr}; // 申明消息
pthread_mutex_lock(&th->_mutexPublish); // 消息釋出鎖
// 取出第一個消息
if (th->plist.empty())
{
pthread_mutex_unlock(&th->_mutexPublish);
continue;
}
cur = th->plist.front();
th->plist.pop_front();
pthread_mutex_unlock(&th->_mutexPublish);
// 消息訂閱鎖
// 找到這個消息對應的訂閱
pthread_mutex_lock(&th->_mutexSubscriber);
auto it = th->sMap.find(cur.event);
if(it != th->sMap.end())
{
for(auto ite = it->second.begin(); ite != it->second.end(); ++ite)
{
ite->func(ite->_this, cur.data); // 執行所有訂閱者的消息動作
}
}
pthread_mutex_unlock(&th->_mutexSubscriber);
}
}
// 建立線程
void theardProc()
{
/*
1.向調用者傳遞子線程的線程号
2.線程屬性設定,一個結構體,包括線程優先級,線程棧大小
3.指定子線程允許的函數,需要一個函數指針
4.子線程運作的函數參數值
*/
int ret = pthread_create(&pt_id, nullptr, EventProcess, this);
}
public:
pthread_mutex_t _mutexSubscriber; // 消息訂閱互斥鎖
pthread_mutex_t _mutexPublish; // 消息釋出互斥鎖
pthread_t pt_id; // 線程id
list<EventPublishNode> plist; // 消息釋出清單
map<char *, list<EventSubscriberNode>> sMap; // 消息訂閱map,一個訂閱者可訂閱多個消息
};
main線程
添加三個消息,并分别添加如頂圖所示的訂閱者(訂閱者4換成訂閱者1)
#include<iostream>
#include<map>
#include<list>
#include <windows.h>
#include <process.h>
#include"EventDeal.h"
/* 測試。 */
int main()
{
EventMsgCentre eveMsg; /* 初始化 */
eveMsg.theardProc(); /* 啟線程 */
/* 訂閱消息 */
Subscriber1 ber;
ber._data = 100;
EventMsgCentre::EventSubscriberNode node1;
// 節點的_this就是一個訂閱器
node1._this = &ber;
// 節點的函數指針就是訂閱器的HandleReceiveEvent
node1.func = Subscriber1::HandleReceiveEvent;
eveMsg.SubscriberEvent(event_data1, node1); // 為消息1("Stocks Trade")添加一個訂閱器1
// 由于消息2和消息3不能直接轉換成消息1,是以給消息1添加了一個重複的訂閱者1
eveMsg.SubscriberEvent(event_data1, node1);
Subscriber2 ber2;
ber2._data = " Subscriber2_200";
EventMsgCentre::EventSubscriberNode node2;
node2._this = &ber2;
node2.func = Subscriber2::HandleReceiveEvent;
eveMsg.SubscriberEvent(event_data2, node2);
Subscriber3 ber3;
ber3._data1 = 300;
ber3._data2 = " Subscriber3_300";
EventMsgCentre::EventSubscriberNode node3;
node3._this = &ber3;
node3.func = Subscriber3::HandleReceiveEvent;
eveMsg.SubscriberEvent(event_data3, node3);
/* 釋出消息 */
EventData1 d1{1};
eveMsg.PublishEvent(event_data1, &d1);
EventData2 d2 {"event_data2"};
eveMsg.PublishEvent(event_data2, &d2);
EventData3 d3 {3,"event_data3"};
eveMsg.PublishEvent(event_data3, &d3);
// Sleep(1000); // 休眠一秒,主線程退出,子線程消息處理函數随之退出
// eveMsg.~EventMsgCentre();
while(1){} // 等待EventProcess執行完成
return 0;
}