ZeroMQ的訂閱釋出模式是一種單向的資料釋出,當用戶端向服務端訂閱消息之後,服務端便會将産生的消息源源不斷的推送給訂閱者,本文的示例代碼來源于文獻[1]示例代碼的修改。
釋出-訂閱圖示
釋出者使用PUB套接字将消息發送到隊列中,訂閱者使用SUB套接字從隊列中源源不斷的接收消息。新的訂閱者可以随時加入,但之前的消息是無法接收到的;已有的訂閱者可以随時退出;訂閱者還可以添加“過濾器”用來有選擇性的接收消息。
使用方法簡介
首先要建立一個上下文環境,然後使用它建立套接字:
對于服務端來說,使用”ZMQ_PUB”建立socket,并且綁定到一個周知的端口,然後便可以不斷的廣播消息了:
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
如果使用TCP連接配接并且訂閱者是慢速的,那麼消息将在釋出方排隊;可以使用高水位标記(High-Water Marks,HWM)來定義緩沖區的大小,在ZeroMQ v2.x版本中HWM預設是無限制的,而在v3.x中預設情況下它是1000。對于PUB套接字,當到達HWM時,将丢棄資料。設定HWM參數:
zmq_setsockopt(publisher, ZMQ_SNDHWM, &nMaxNum, sizeof(nMaxNum));
對于用戶端來說,要使用”ZMQ_SUB”建立socket,并且連結(zmq_connect)到待訂閱的服務端;此外,要想接收到服務推送的消息,還必須使用zmq_setsockopt和ZMQ_SUBSCRIBE來配置該訂閱。zmq_setsockopt的ZMQ_SUBSCRIBE選項可以帶有一個”過濾器“,用以選擇性的接收來自服務端的消息。該”過濾器”為空,則接收全部的消息;”過濾器”還可以有多個,它們之間是”or”的關系,即接收滿足任一條件的消息。當然也可以使用zmq_setsockopt配置選項ZMQ_UNSUBSCRIBE來取消訂閱,示例如下:
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");
char filter1[] = "10001 ";
char filter2[] = "20002 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1)); //接收消息的字首為filter1的消息
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2)); //接收消息的字首為filter2的消息
接收和發送消息:此處使用的方法是zmq_recv()和zmq_send(),相對于zmq_msg_send()和zmq_msg_recv(),它們會自己調用消息發送和接收的初始化方法等。
int zmq_recv (void *s, void *buf, size_t len, int flags);
int zmq_send (void *s, const void *buf, size_t len, int flags);
示例代碼
//服務端:
#include <iostream>
#include <string>
#include <sstream>
#include <cstring>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <assert.h>
#include <zmq.h>
using namespace std;
int main ()
{
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
assert (rc == );
// Initialize random number generator
srand(time());
while () {
int zipcode, temperature, relhumidity;
zipcode = rand() % ;
temperature = rand() % - ;
relhumidity = rand() % + ;
ostringstream os;
os << setw() << setfill('0')<< zipcode <<" "
<< temperature <<" "<< relhumidity << "\n";
zmq_send(publisher, os.str().c_str(), strlen(os.str().c_str()), );
}
zmq_close (publisher);
zmq_ctx_destroy (context);
return ;
}
//用戶端:
#include <iostream>
#include <string>
#include <cstring>
#include <assert.h>
#include <zmq.h>
using namespace std;
int main (int argc, char *argv [])
{
// Socket to talk to server
printf ("Collecting updates from weather server...\n");
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");
assert (rc == );
char filter1[] = "10001 ";
char filter2[] = "20002 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1));
assert (rc == );
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2));
assert (rc == );
// Process 100 updates
int size;
char buffer [];
for (int update_nbr = ; update_nbr < ; update_nbr++) {
memset(buffer, , *sizeof(char));
size = zmq_recv (subscriber, buffer, , );
if (size == -){
cout<< "receiver error , skip this message"<<endl;
continue;
}
buffer[size] = '\0';
cout << buffer <<endl;
}
zmq_close (subscriber);
zmq_ctx_destroy (context);
return ;
}
NOTE:
在文獻[1]中指出:
- 在ZMQ_SUB套接字上執行相對于zmq_msg_send()和在ZMQ_PUB套接字上執行相對于和zmq_msg_recv()同樣都是錯誤的;
- PUB-SUB具有”slow joiner”症狀。”slow joiner”的症狀是:即使先啟動訂閱者,稍等片刻再啟動釋出者,訂閱者也可能錯過釋出者發送的第一條消息。建立TCP連接配接需要花費時間,具體取決于網絡狀況,以及主機到服務端的路由,是以即使多個訂閱者同時啟動,它們可能也不會收到同樣的消息;
- 訂閱者可以使用zmq_connect()同時連接配接到多個釋出者。不同釋出者推送的消息将交錯到達;
- 如果一個釋出者沒有任何訂閱者,那麼它會簡單地丢棄所有的消息;
- 從ZMQ v3.x開始,在使用連接配接的協定是tcp或者ipc時,過濾發生在釋出方。使用epgm協定,過濾發生在訂閱方。但在ZMQ v2.x版本中,所有過濾都發生在訂閱方。
[1].《ZeroMQ雲時代極速消息通信庫》.電子工業出版社,2015.