// Queries the ZMQ_RCVMORE option on `socket`.
//
// Returns true only when the option query succeeds and the reported value is
// non-zero. A failed query is reported as false rather than as an error.
bool has_more(void* socket)
{
    int more_flag = 0;
    size_t flag_len = sizeof(more_flag);
    const bool query_ok =
        zmq_getsockopt(socket, ZMQ_RCVMORE, &more_flag, &flag_len) == 0;
    return query_ok && more_flag != 0;
}
#include <atomic>
#include <map>
#include <set>
#include <string>
#include <vector>
// Stop flags for the client and server worker threads.
// Each worker loop polls its flag once per iteration; the dialog button
// handlers write them from the UI thread. They are std::atomic so that these
// unsynchronized cross-thread reads and writes are not a data race.
std::atomic<bool> bExitClient{false};
std::atomic<bool> bExitServer{false};
// ZmqStreamServerProc - server thread function.
//
// Binds a ZMQ_STREAM socket (with ZMQ_STREAM_NOTIFY enabled) to
// tcp://127.0.0.1:9070 and loops until bExitServer becomes true:
//   * every readable event is read as two frames: peer identity, then data;
//     an empty data frame toggles the peer in/out of the known-peer set,
//     a non-empty one is traced;
//   * when the poll times out and more than 3 seconds have passed since the
//     last round, "hello" is sent to every known peer.
//
// lpThreadParameter is unused. Always returns 0.
DWORD WINAPI ZmqStreamServerProc(LPVOID lpThreadParameter) {
    (void)lpThreadParameter;
    int rc;
    std::set<std::string> peer_ids;    // identities of peers currently tracked as connected
    DWORD prev_tick = GetTickCount();  // tick count of the last broadcast round

    // Create the context.
    void *ctx = zmq_ctx_new();
    assert(ctx);

    // Create the socket and set its options.
    void *server = zmq_socket(ctx, ZMQ_STREAM);
    assert(server);
    int enabled = 1;
    rc = zmq_setsockopt(server, ZMQ_STREAM_NOTIFY, &enabled, sizeof(enabled));
    assert(rc == 0);

    // Bind the port.
    rc = zmq_bind(server, "tcp://127.0.0.1:9070");
    assert(rc == 0);

    while (!bExitServer) {
        // Poll the socket with a 100 ms timeout.
        zmq_pollitem_t items[] = {
            { server, 0, ZMQ_POLLIN, 0 },
        };
        rc = zmq_poll(items, 1, 100);
        if (rc == 1) {
            if (items[0].revents & ZMQ_POLLIN) {
                // Data is readable. Two cases: a STREAM NOTIFY event, or real
                // data sent by a client.

                // Receive frame 1: the peer identity.
                zmq_msg_t peer_frame;
                rc = zmq_msg_init(&peer_frame);
                assert(rc == 0);
                rc = zmq_msg_recv(&peer_frame, server, 0);
                assert(rc != -1);
                if (rc == -1) {
                    // assert() is a no-op in release builds; do not go on to
                    // interpret a frame that was never received.
                    zmq_msg_close(&peer_frame);
                    continue;
                }
                assert(zmq_msg_size(&peer_frame) > 0);
                assert(has_more(server));
                const std::string str_peer_id(
                    static_cast<const char *>(zmq_msg_data(&peer_frame)),
                    zmq_msg_size(&peer_frame));
                zmq_msg_close(&peer_frame);

                // Receive frame 2: the peer data.
                zmq_msg_t data_frame;
                rc = zmq_msg_init(&data_frame);
                assert(rc == 0);
                rc = zmq_msg_recv(&data_frame, server, 0);
                assert(rc != -1);
                if (rc == -1) {
                    zmq_msg_close(&data_frame);
                    continue;
                }

                const size_t data_size = zmq_msg_size(&data_frame);
                if (data_size == 0) {
                    // Empty data frame: connect/disconnect notification.
                    // insert() reports whether the identity was new, which
                    // avoids a separate find().
                    if (peer_ids.insert(str_peer_id).second) {
                        TRACE("S :a new conn incoming\n");
                    }
                    else {
                        TRACE("S :an exsit conn disconnected\n");
                        peer_ids.erase(str_peer_id);
                    }
                }
                else {
                    // The frame holds raw bytes with no guaranteed NUL
                    // terminator; copy it into a std::string so that "%s"
                    // cannot read past the end of the message buffer.
                    const std::string payload(
                        static_cast<const char *>(zmq_msg_data(&data_frame)),
                        data_size);
                    TRACE("S :recv [%s]\n", payload.c_str());
                }
                zmq_msg_close(&data_frame);
            }
        }
        else if (rc == 0) {
            // 0 means no event was reported, i.e. the socket is idle.
            // Simulate the server sending a request to all clients every 3 seconds.
            if (!peer_ids.empty() && (GetTickCount() - prev_tick) > 3000) {
                for (const std::string &peer_id : peer_ids) {
                    // Frame 1: the identity frame; ZMQ_SNDMORE is required here.
                    rc = zmq_send(server, peer_id.data(), peer_id.size(), ZMQ_SNDMORE);
                    if (rc == -1) {
                        // The identity frame was rejected; sending the data
                        // frame anyway would be taken as a new first frame.
                        TRACE("S :send identity frame failed\n");
                        continue;
                    }
                    // Frame 2: the data frame (6 bytes: "hello" plus its NUL).
                    // This is the final frame, so no ZMQ_SNDMORE.
                    TRACE("S :send:[%s]\n", "hello");
                    rc = zmq_send(server, "hello", 6, 0);
                }
                prev_tick = GetTickCount();
            }
        }
        else {
            TRACE("S :zmq_poll() return error\n");
            assert(false);
        }
    }

    rc = zmq_close(server);
    assert(rc == 0);
    rc = zmq_ctx_term(ctx);
    assert(rc == 0);
    return 0;
}
// ZmqStreamClientProc - client thread function.
//
// Connects a ZMQ_STREAM socket (with ZMQ_STREAM_NOTIFY enabled) to
// tcp://localhost:9070 and loops until bExitClient becomes true. Every
// readable event is read as two frames: peer identity, then data. An empty
// data frame is ignored; a non-empty one is expected to be "hello" and is
// answered with "world!" addressed to the same identity.
//
// lpThreadParameter is unused. Always returns 0.
DWORD WINAPI ZmqStreamClientProc(LPVOID lpThreadParameter) {
    (void)lpThreadParameter;
    int rc;

    // Set up our context and socket.
    void *ctx = zmq_ctx_new();
    assert(ctx);
    int enabled = 1;
    void *client = zmq_socket(ctx, ZMQ_STREAM);
    assert(client);
    rc = zmq_setsockopt(client, ZMQ_STREAM_NOTIFY, &enabled, sizeof(enabled));
    assert(rc == 0);
    rc = zmq_connect(client, "tcp://localhost:9070");
    assert(rc == 0);

    // Query the socket identity. The length argument is in/out, so it must
    // hold the buffer capacity before the call.
    uint8_t id_opt[256];
    size_t id_size_opt = sizeof(id_opt);
    rc = zmq_getsockopt(client, ZMQ_IDENTITY, id_opt, &id_size_opt);
    assert(rc == 0);

    // Client: the first frame is the identity, the second frame is empty.
    // That pair is not read with a blocking receive here; the poll loop below
    // receives it and ignores the empty data frame.

    while (!bExitClient) {
        // Poll the socket with a 100 ms timeout.
        zmq_pollitem_t items[] = {
            { client, 0, ZMQ_POLLIN, 0 },
        };
        rc = zmq_poll(items, 1, 100);
        if (rc == 1) {
            if (items[0].revents & ZMQ_POLLIN) {
                // Data is readable. Two cases: a STREAM NOTIFY event, or real
                // data sent by the peer.

                // Receive frame 1: the peer identity.
                zmq_msg_t peer_frame;
                rc = zmq_msg_init(&peer_frame);
                assert(rc == 0);
                rc = zmq_msg_recv(&peer_frame, client, 0);
                assert(rc != -1);
                if (rc == -1) {
                    // assert() is a no-op in release builds; do not go on to
                    // interpret a frame that was never received.
                    zmq_msg_close(&peer_frame);
                    continue;
                }
                assert(zmq_msg_size(&peer_frame) > 0);
                assert(has_more(client));
                const std::string str_peer_id(
                    static_cast<const char *>(zmq_msg_data(&peer_frame)),
                    zmq_msg_size(&peer_frame));
                zmq_msg_close(&peer_frame);

                // Receive frame 2: the peer data.
                zmq_msg_t data_frame;
                rc = zmq_msg_init(&data_frame);
                assert(rc == 0);
                rc = zmq_msg_recv(&data_frame, client, 0);
                assert(rc != -1);
                if (rc == -1) {
                    zmq_msg_close(&data_frame);
                    continue;
                }

                const size_t data_size = zmq_msg_size(&data_frame);
                if (data_size != 0) {
                    const char *p_data = static_cast<const char *>(zmq_msg_data(&data_frame));
                    // The frame holds raw bytes with no guaranteed NUL
                    // terminator; copy it so that "%s" cannot over-read.
                    const std::string payload(p_data, data_size);
                    TRACE("C :recv [%s]\n", payload.c_str());
                    // Check the length first so memcmp never reads beyond the frame.
                    assert(data_size >= 6 && memcmp(p_data, "hello", 6) == 0);

                    // Reply: identity frame first (ZMQ_SNDMORE), then the data frame.
                    rc = zmq_send(client, str_peer_id.data(), str_peer_id.size(), ZMQ_SNDMORE);
                    assert(rc != -1);
                    if (rc != -1) {
                        TRACE("C :send:[world!]\n");
                        rc = zmq_send(client, "world!", 7, 0);
                        assert(rc != -1);
                    }
                }
                zmq_msg_close(&data_frame);
            }
            if (items[0].revents & ZMQ_POLLERR) {
                TRACE("C :ZMQ_POLLERR");
            }
        }
        else if (rc != 0) {
            // rc == 0 is a plain timeout and needs no action.
            TRACE("C :zmq_poll() return error\n");
        }
    }

    rc = zmq_close(client);
    assert(rc == 0);
    rc = zmq_ctx_term(ctx);
    assert(rc == 0);
    return 0;
}
void CZmqStreamCommDemoDlg::OnBnClickedButton1()
{
DWORD tid;
bExitServer = false;
HANDLE hServerThread = CreateThread(NULL, 0, ZmqStreamServerProc, NULL, 0, &tid);
CloseHandle(hServerThread);
}
void CZmqStreamCommDemoDlg::OnBnClickedButton2()
{
bExitClient = false;
DWORD tid;
HANDLE hThread = CreateThread(NULL, 0, ZmqStreamClientProc, NULL, 0, &tid);
CloseHandle(hThread);
}
// Button 3 handler: raises the client stop flag, which the loop in
// ZmqStreamClientProc checks at the top of each iteration.
void CZmqStreamCommDemoDlg::OnBnClickedButton3()
{
    bExitClient = true;
}
// Button 4 handler: raises the server stop flag, which the loop in
// ZmqStreamServerProc checks at the top of each iteration.
void CZmqStreamCommDemoDlg::OnBnClickedButton4()
{
    bExitServer = true;
}
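// Note (2016-06-08): testing in real code showed that, although received data
// is delimited by zmq_msg_t boundaries, a single received message may be made
// up of several packets from the sender. The receiver must therefore still
// split the data into individual packets strictly according to its own
// application-level protocol format.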