天天看點

ZeroMQ的訂閱釋出(publish-subscribe)模式

ZeroMQ的訂閱釋出模式是一種單向的資料釋出,當用戶端向服務端訂閱消息之後,服務端便會将産生的消息源源不斷的推送給訂閱者,本文的示例代碼來源于文獻[1]示例代碼的修改。

釋出-訂閱圖示

ZeroMQ的訂閱釋出(publish-subscribe)模式

釋出者使用PUB套接字将消息發送到隊列中,訂閱者使用SUB套接字從隊列中源源不斷的接收消息。新的訂閱者可以随時加入,但之前的消息是無法接收到的;已有的訂閱者可以随時退出;訂閱者還可以添加“過濾器”用來有選擇性的接收消息。

使用方法簡介

首先要建立一個上下文環境,然後使用它建立套接字:

對于服務端來說,使用”ZMQ_PUB”建立socket,并且綁定到一個周知的端口,然後便可以不斷的廣播消息了:

void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
           

如果使用TCP連接配接并且訂閱者是慢速的,那麼消息将在釋出方排隊;可以使用高水位标記(High-Water Marks,HWM)來定義緩沖區的大小,在ZeroMQ v2.x版本中HWM預設是無限制的,而在v3.x中預設情況下它是1000。對于PUB套接字,當到達HWM時,将丢棄資料。設定HWM參數:

zmq_setsockopt(publisher, ZMQ_SNDHWM, &nMaxNum, sizeof(nMaxNum));
           

對于用戶端來說,要使用”ZMQ_SUB”建立socket,并且連結(zmq_connect)到待訂閱的服務端;此外,要想接收到服務推送的消息,還必須使用zmq_setsockopt和ZMQ_SUBSCRIBE來配置該訂閱。zmq_setsockopt的ZMQ_SUBSCRIBE選項可以帶有一個”過濾器“,用以選擇性的接收來自服務端的消息。該”過濾器”為空,則接收全部的消息;”過濾器”還可以有多個,它們之間是”or”的關系,即接收滿足任一條件的消息。當然也可以使用zmq_setsockopt配置選項ZMQ_UNSUBSCRIBE來取消訂閱,示例如下:

void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");

char filter1[] = "10001 "; 
char filter2[] = "20002 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1)); //接收消息的字首為filter1的消息
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2)); //接收消息的字首為filter2的消息
           

接收和發送消息:此處使用的方法是zmq_recv()和zmq_send(),相對于zmq_msg_send()和zmq_msg_recv(),它們會自己調用消息發送和接收的初始化方法等。

int zmq_recv (void *s, void *buf, size_t len, int flags);
int zmq_send (void *s, const void *buf, size_t len, int flags);
           

示例代碼

//服務端:
#include <iostream>
#include <string>
#include <sstream>
#include <cstring>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main ()
{
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == );

    //  Initialize random number generator
    srand(time());
    while () {
        int zipcode, temperature, relhumidity;
        zipcode     = rand() % ;
        temperature = rand() %  - ;
        relhumidity = rand() %  + ;

        ostringstream os;
        os << setw() << setfill('0')<< zipcode <<" "
           << temperature <<" "<< relhumidity << "\n";

        zmq_send(publisher, os.str().c_str(), strlen(os.str().c_str()), );
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return ;
}
           
//用戶端:
#include <iostream>
#include <string>
#include <cstring>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main (int argc, char *argv [])
{
    //  Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == );

    char filter1[] = "10001 ";
    char filter2[] = "20002 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1));
    assert (rc == );
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2));
    assert (rc == );

    //  Process 100 updates
    int size;
    char buffer [];
    for (int update_nbr = ; update_nbr < ; update_nbr++) {

        memset(buffer, , *sizeof(char));
        size = zmq_recv (subscriber, buffer, , );
        if (size == -){
            cout<< "receiver error , skip this message"<<endl;
            continue;
        }
        buffer[size] = '\0';
        cout << buffer <<endl;
    }

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return ;
}
           

NOTE:

在文獻[1]中指出:

  • 在ZMQ_SUB套接字上執行相對于zmq_msg_send()和在ZMQ_PUB套接字上執行相對于和zmq_msg_recv()同樣都是錯誤的;
  • PUB-SUB具有”slow joiner”症狀。”slow joiner”的症狀是:即使先啟動訂閱者,稍等片刻再啟動釋出者,訂閱者也可能錯過釋出者發送的第一條消息。建立TCP連接配接需要花費時間,具體取決于網絡狀況,以及主機到服務端的路由,是以即使多個訂閱者同時啟動,它們可能也不會收到同樣的消息;
  • 訂閱者可以使用zmq_connect()同時連接配接到多個釋出者。不同釋出者推送的消息将交錯到達;
  • 如果一個釋出者沒有任何訂閱者,那麼它會簡單地丢棄所有的消息;
  • 從ZMQ v3.x開始,在使用連接配接的協定是tcp或者ipc時,過濾發生在釋出方。使用epgm協定,過濾發生在訂閱方。但在ZMQ v2.x版本中,所有過濾都發生在訂閱方。

[1].《ZeroMQ雲時代極速消息通信庫》.電子工業出版社,2015.