天天看點

nanomsg項目實戰

文章目錄

      • nanomsg下載
      • 編譯
      • 使用方式
      • 基本概念
      • Pipeline
      • Request/Reply
      • Pair
      • Pub/Sub
      • Survey
      • Bus
      • 項目中使用
        • 釋出服務
        • 訂閱服務
        • makefile

nanomsg下載

下載位址:https://github.com/nanomsg/nanomsg/releases

編譯

unzip nanomsg-1.1.5.zip
cd nanomsg-1.1.5/
mkdir build
cd build
cmake ..
cmake --build .
ctest .
sudo cmake --build . --target install
sudo ldconfig
           

使用方式

Pipeline (A One-Way Pipe)管道(單向管道)

Request/Reply (I ask, you answer)請求/回複(我問,你答)

Pair (Two Way Radio)配對(雙向無線電)

Pub/Sub (Topics & Broadcast)釋出/訂閱(主題和廣播)

Survey (Everybody Votes)調查(所有人投票)

Bus (Routing) 總線(路由)

使用方式示例:https://nanomsg.org/gettingstarted/pipeline.html

接口文檔:https://nanomsg.org/v1.0.0/nanomsg.7.html

基本概念

關于url

描述:由以下兩部分組成:transport://address

​ - 傳輸指定要使用的底層傳輸協定

​ - 位址部分的含義是特定于底層傳輸協定的

底層傳輸協定:

  • ipc UNIX域套接字

- 在posix相容的系統上,使用UNIX域套接字,IPC位址是檔案引用*.socket檔案

- 在Windows上,IPC使用命名管道。 IPC位址是不區分大小寫的任意字元串,包含除反斜杠以外的任何字元。

位址示例:

​ - 使用相對(ipc://test.ipc) 此時在程式目前路徑下生成socket檔案,即域套接字檔案

​ - 使用絕對(ipc:///tmp/test.ipc) 此時在/tmp目錄下生成socket檔案,即域套接字檔案

  • tcp tcp://interface:port
    • port 端口
    • interface
      • 本地網絡接口的IPv4位址,數字形式(192.168.0.111)
      • 本地網絡接口的IPv6位址,數字形式(::1)
        • 星号(*)表示所有本地網絡接口
    • 位址示例
      • tcp://*:5555
      • tcp://192.168.0.111:5555
  • ws ws://interface:port
    • port 端口
    • interface

Pipeline

Pipeline (A One-Way Pipe)管道(單向管道)

此模式對于解決生産者/消費者問題(包括負載平衡)非常有用。消息從推側流向拉側。如果連接配接了多個對等點,則模式将嘗試公平分布。

nanomsg項目實戰

示例代碼:pipeline.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>

#define NODE0 "node0"
#define NODE1 "node1"

/*
 * Print "<func>: <nanomsg error text>" on stderr and end the process with
 * exit status 1.
 *
 * func: label of the call that failed.
 */
void
fatal(const char *func)
{
        const char *reason = nn_strerror(nn_errno());

        fprintf(stderr, "%s: %s\n", func, reason);
        exit(1);
}

/*
 * Receiving end of the pipeline: binds an NN_PULL socket to url and prints
 * every message that arrives. The receive loop has no exit condition.
 *
 * url: address to bind to; echoed to stdout before the socket is created.
 */
int
node0(const char *url)
{
        int sock;

        printf("url: \"%s\"\n", url);

        sock = nn_socket(AF_SP, NN_PULL);
        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_bind(sock, url) < 0) {
                fatal("nn_bind");
        }

        while (1) {
                char *msg = NULL;
                int len = nn_recv(sock, &msg, NN_MSG, 0);

                if (len < 0) {
                        fatal("nn_recv");
                }
                /* Printed as a C string: relies on the sender including '\0'. */
                printf("NODE0: RECEIVED \"%s\"\n", msg);
                nn_freemsg(msg);
        }
}

/*
 * Sending end of the pipeline: connects an NN_PUSH socket to url and sends
 * msg once, terminating '\0' included.
 *
 * url: address to connect to.
 * msg: NUL-terminated text to send.
 *
 * Returns the result of nn_shutdown() for the endpoint that was connected.
 */
int
node1(const char *url, const char *msg)
{
        int sz_msg = strlen(msg) + 1; // '\0' too
        int sock;
        int eid;

        if ((sock = nn_socket(AF_SP, NN_PUSH)) < 0) {
                fatal("nn_socket");
        }
        // Keep the endpoint id: nn_shutdown() identifies the endpoint by it.
        if ((eid = nn_connect(sock, url)) < 0) {
                fatal("nn_connect");
        }
        printf("NODE1: SENDING \"%s\"\n", msg);
        if (nn_send(sock, msg, sz_msg, 0) < 0) {
                fatal("nn_send");
        }
        sleep(1); // wait for messages to flush before shutting down
        return (nn_shutdown(sock, eid));
}

/*
 * Entry point.
 *   pipeline node0 <URL>        runs the receiver
 *   pipeline node1 <URL> <MSG>  runs the sender
 * Any other invocation prints the usage line and returns 1.
 */
int
main(const int argc, const char **argv)
{
        // node0 reads argv[2], so three arguments must be present.
        if ((argc > 2) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        // node1 reads argv[2] and argv[3], so four arguments must be present.
        if ((argc > 3) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2], argv[3]));

        fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",
                NODE0, NODE1);
        return (1);
}
           

編譯

* - gcc pipeline.c -lnanomsg -o pipeline

* 運作

* - ./pipeline node0 ipc:///tmp/pipeline.ipc & node0=$! && sleep 1

* - ./pipeline node1 ipc:///tmp/pipeline.ipc "Hello, World!"

Request/Reply

Request/Reply (我問, 你答)

nanomsg項目實戰

請求/應答用于同步通信,其中每個問題都用一個答案進行響應,例如遠端過程調用(rpc)。 與Pipeline一樣,它也可以執行負載平衡。 這是套件中唯一可靠的消息傳遞模式,因為如果請求與響應不比對,它将自動重試。

示例demo:reqrep.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>

#define NODE0 "node0"
#define NODE1 "node1"
#define DATE "DATE"

/*
 * Error exit helper: writes the failing call's label and the nanomsg error
 * description to stderr, then exits with status 1.
 */
void
fatal(const char *func)
{
        int err = nn_errno();

        fprintf(stderr, "%s: %s\n", func, nn_strerror(err));
        exit(1);
}

/*
 * Current local time as text, with the newline that asctime() appends
 * removed. The pointer returned is the one asctime() handed back.
 */
char *
date(void)
{
        time_t stamp;
        char *line;

        time(&stamp);
        line = asctime(localtime(&stamp));
        line[strlen(line) - 1] = '\0'; /* overwrite the trailing '\n' */
        return line;
}

/*
 * Reply side: binds an NN_REP socket to url and answers every request whose
 * payload is exactly DATE (with its '\0') by sending the current date text.
 * Other requests are received and freed without a reply. Loops forever.
 */
int
node0(const char *url)
{
        const int sz_date = strlen(DATE) + 1; /* expected request size, '\0' included */
        int sock = nn_socket(AF_SP, NN_REP);

        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_bind(sock, url) < 0) {
                fatal("nn_bind");
        }

        while (1) {
                char *req = NULL;
                int got = nn_recv(sock, &req, NN_MSG, 0);

                if (got < 0) {
                        fatal("nn_recv");
                }
                if (got == sz_date && strcmp(DATE, req) == 0) {
                        char *now;
                        int sz_now;

                        printf("NODE0: RECEIVED DATE REQUEST\n");
                        now = date();
                        sz_now = strlen(now) + 1; /* send the '\0' as well */
                        printf("NODE0: SENDING DATE %s\n", now);
                        if (nn_send(sock, now, sz_now, 0) < 0) {
                                fatal("nn_send");
                        }
                }
                nn_freemsg(req);
        }
}

/*
 * Request side: connects an NN_REQ socket to url, sends the DATE request
 * (with its '\0'), prints the reply and shuts the endpoint down.
 *
 * Returns the result of nn_shutdown() for the connected endpoint.
 */
int
node1(const char *url)
{
        int sz_date = strlen(DATE) + 1; // '\0' too
        char *buf = NULL;
        int sock;
        int eid;

        if ((sock = nn_socket(AF_SP, NN_REQ)) < 0) {
                fatal("nn_socket");
        }
        // Keep the endpoint id: nn_shutdown() identifies the endpoint by it.
        if ((eid = nn_connect(sock, url)) < 0) {
                fatal("nn_connect");
        }
        printf("NODE1: SENDING DATE REQUEST %s\n", DATE);
        if (nn_send(sock, DATE, sz_date, 0) < 0) {
                fatal("nn_send");
        }
        if (nn_recv(sock, &buf, NN_MSG, 0) < 0) {
                fatal("nn_recv");
        }
        printf("NODE1: RECEIVED DATE %s\n", buf);
        nn_freemsg(buf);
        return (nn_shutdown(sock, eid));
}

/*
 * Entry point.
 *   reqrep node0 <URL>  runs the replier
 *   reqrep node1 <URL>  runs the requester
 * Any other invocation prints the usage line and returns 1.
 */
int
main(const int argc, const char **argv)
{
        // Both modes read argv[2], so three arguments must be present.
        if ((argc > 2) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2]));

        fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", NODE0, NODE1);
        return (1);
}
           

編譯

gcc reqrep.c -lnanomsg -o reqrep
           

運作

./reqrep node0 ipc:///tmp/reqrep.ipc & node0=$! && sleep 1
./reqrep node1 ipc:///tmp/reqrep.ipc
kill $node0
           

Output

NODE1: SENDING DATE REQUEST DATE
NODE0: RECEIVED DATE REQUEST
NODE0: SENDING DATE Sat Sep  7 17:39:01 2013
NODE1: RECEIVED DATE Sat Sep  7 17:39:01 2013
           

Pair

Pair (雙向通信)

nanomsg項目實戰

當存在一對一的對等關系時,使用pair模式。 一次隻能有一個對等體連接配接到另一個對等體,但雙方都可以自由發言。

示例Demo:pair.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nanomsg/nn.h>
#include <nanomsg/pair.h>

#define NODE0 "node0"
#define NODE1 "node1"

/*
 * Writes "<func>: <error text>" to stderr, using nanomsg's description of
 * its current error number, and exits with status 1.
 */
void
fatal(const char *func)
{
        const char *why = nn_strerror(nn_errno());

        fprintf(stderr, "%s: %s\n", func, why);
        exit(1);
}

/*
 * Announce on stdout, then send name (terminating '\0' included) on sock.
 *
 * Returns whatever nn_send() returns.
 */
int
send_name(int sock, const char *name)
{
        int len;

        printf("%s: SENDING \"%s\"\n", name, name);
        len = strlen(name) + 1; /* the '\0' goes over the wire as well */
        return nn_send(sock, name, len, 0);
}

/*
 * Receive one message on sock and, if it carries data, print it prefixed
 * with name.
 *
 * Returns the value of nn_recv(): negative when the receive failed,
 * otherwise the size it reported.
 */
int
recv_name(int sock, const char *name)
{
        char *buf = NULL;
        int result = nn_recv(sock, &buf, NN_MSG, 0);

        if (result >= 0) {
                // An empty message has nothing to print, so it is skipped here.
                if (result > 0) {
                        printf("%s: RECEIVED \"%s\"\n", name, buf);
                }
                // Every successful NN_MSG receive hands over a buffer, even
                // when the message is empty, so release it in both cases.
                nn_freemsg(buf);
        }
        return (result);
}

/*
 * Exchange loop used by both peers: sets NN_RCVTIMEO on sock to 100, then
 * forever tries to receive a name, sleeps one second and sends its own name.
 * The results of the receive and send helpers are not inspected.
 */
int
send_recv(int sock, const char *name)
{
        int rcv_timeout = 100;
        int rc = nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO,
            &rcv_timeout, sizeof (rcv_timeout));

        if (rc < 0) {
                fatal("nn_setsockopt");
        }

        while (1) {
                recv_name(sock, name);
                sleep(1);
                send_name(sock, name);
        }
}

/*
 * Binding peer: opens an NN_PAIR socket, binds it to url and runs the
 * send/receive exchange under the name NODE0.
 */
int
node0(const char *url)
{
        int sock = nn_socket(AF_SP, NN_PAIR);

        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_bind(sock, url) < 0) {
                fatal("nn_bind");
        }
        return send_recv(sock, NODE0);
}

/*
 * Connecting peer: opens an NN_PAIR socket, connects it to url and runs the
 * send/receive exchange under the name NODE1.
 */
int
node1(const char *url)
{
        int sock = nn_socket(AF_SP, NN_PAIR);

        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_connect(sock, url) < 0) {
                fatal("nn_connect");
        }
        return send_recv(sock, NODE1);
}

/*
 * Entry point.
 *   pair node0 <URL>  runs the binding peer
 *   pair node1 <URL>  runs the connecting peer
 * Any other invocation prints the usage line and returns 1.
 */
int
main(const int argc, const char **argv)
{
        // Both modes read argv[2], so three arguments must be present.
        if ((argc > 2) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));

        if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2]));

        fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", NODE0, NODE1);
        return 1;
}
           

編譯

gcc pair.c -lnanomsg -o pair
           

運作

./pair node0 ipc:///tmp/pair.ipc & node0=$!
./pair node1 ipc:///tmp/pair.ipc & node1=$!
sleep 3
kill $node0 $node1
           

Output

node0: SENDING "node0"
node1: SENDING "node1"
node1: RECEIVED "node0"
node0: SENDING "node0"
node0: RECEIVED "node1"
           

Pub/Sub

Pub/Sub(釋出/訂閱)

nanomsg項目實戰

此模式用于允許單個廣播機構向多個訂閱者釋出消息,訂閱者可以選擇限制它們接收的消息。

pubsub.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

#define SERVER "server"
#define CLIENT "client"

/*
 * Print "<func>: <nanomsg error text>" on stderr and end the process with
 * exit status 1.
 *
 * func: label of the call that failed.
 */
void
fatal(const char *func)
{
        fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
        // Callers do not handle a return from here: they would go on to use
        // a failed socket or an unset receive buffer.
        exit(1);
}

/*
 * Builds the text form of the current local time and strips the final
 * newline. Returns the buffer obtained from asctime().
 */
char *
date(void)
{
        time_t t = time(NULL);
        char *out = asctime(localtime(&t));
        size_t n = strlen(out);

        out[n - 1] = '\0'; /* asctime() ends its text with '\n' */
        return out;
}

/*
 * Publisher: binds an NN_PUB socket to url and sends the current date text
 * (with its '\0') once per second. The loop has no exit condition.
 */
int
server(const char *url)
{
        int sock = nn_socket(AF_SP, NN_PUB);

        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_bind(sock, url) < 0) {
                fatal("nn_bind");
        }

        while (1) {
                char *stamp = date();
                int len = strlen(stamp) + 1; /* '\0' is sent too */

                printf("SERVER: PUBLISHING DATE %s\n", stamp);
                if (nn_send(sock, stamp, len, 0) < 0) {
                        fatal("nn_send");
                }
                sleep(1);
        }
}

/*
 * Subscriber: opens an NN_SUB socket, subscribes with an empty prefix,
 * connects to url and prints every message received, tagged with name.
 * The receive loop has no exit condition.
 *
 * url:  address to connect to.
 * name: label printed with each received message.
 */
int
client(const char *url, const char *name)
{
        int sock;

        if ((sock = nn_socket(AF_SP, NN_SUB)) < 0) {
                fatal("nn_socket");
        }

        // subscribe to everything ("" means all topics)
        if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
                fatal("nn_setsockopt");
        }
        if (nn_connect(sock, url) < 0) {
                fatal("nn_connect");
        }
        for (;;) {
                char *buf = NULL;
                int bytes = nn_recv(sock, &buf, NN_MSG, 0);
                if (bytes < 0) {
                        fatal("nn_recv");
                }
                printf("CLIENT (%s): RECEIVED %s\n", name, buf);
                nn_freemsg(buf);
        }
}

/*
 * Entry point.
 *   pubsub server <URL>         runs the publisher
 *   pubsub client <URL> <NAME>  runs a subscriber
 * Any other invocation prints the usage line and returns 1.
 */
int
main(const int argc, const char **argv)
{
        // server reads argv[2], so three arguments must be present.
        if ((argc >= 3) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));

        // client reads argv[2] and argv[3], so four arguments must be present.
        if ((argc >= 4) && (strcmp(CLIENT, argv[1]) == 0))
                return (client (argv[2], argv[3]));

        fprintf(stderr, "Usage: pubsub %s|%s <URL> <ARG> ...\n",
            SERVER, CLIENT);
        return 1;
}
           

Compilation

gcc pubsub.c -lnanomsg -o pubsub
           

Execution

./pubsub server ipc:///tmp/pubsub.ipc & server=$! && sleep 1
./pubsub client ipc:///tmp/pubsub.ipc client0 & client0=$!
./pubsub client ipc:///tmp/pubsub.ipc client1 & client1=$!
           

Output

SERVER: PUBLISHING DATE Sat Sep  7 17:40:11 2013
SERVER: PUBLISHING DATE Sat Sep  7 17:40:12 2013
SERVER: PUBLISHING DATE Sat Sep  7 17:40:13 2013
CLIENT (client0): RECEIVED Sat Sep  7 17:40:13 2013
CLIENT (client1): RECEIVED Sat Sep  7 17:40:13 2013
           

Survey

Survey (每個人投票)

nanomsg項目實戰

投票模式用于發送一個定時的調查,在調查過期之前,将逐個傳回響應。 此模式對于服務發現和投票算法非常有用。

survey.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

#define SERVER "server"
#define CLIENT "client"
#define DATE   "DATE"

/*
 * Fatal-error helper: reports the failing call and nanomsg's error text on
 * stderr, then exits with status 1.
 */
void
fatal(const char *func)
{
        int code = nn_errno();
        const char *text = nn_strerror(code);

        fprintf(stderr, "%s: %s\n", func, text);
        exit(1);
}

/*
 * Local time of the call, formatted by asctime() with its closing newline
 * cut off. The result points at asctime()'s own buffer.
 */
char *
date(void)
{
        time_t when;
        struct tm *parts;
        char *str;

        when = time(NULL);
        parts = localtime(&when);
        str = asctime(parts);
        str[strlen(str) - 1] = '\0'; /* cut the '\n' */
        return str;
}

/*
 * Surveyor: binds an NN_SURVEYOR socket to url and repeats forever:
 * send the DATE survey, print responses until nn_recv() fails with
 * ETIMEDOUT, then wait one second before the next round.
 */
int
server(const char *url)
{
        int sock = nn_socket(AF_SP, NN_SURVEYOR);

        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_bind(sock, url) < 0) {
                fatal("nn_bind");
        }

        while (1) {
                int sent;

                printf("SERVER: SENDING DATE SURVEY REQUEST\n");
                sent = nn_send(sock, DATE, strlen(DATE) + 1, 0);
                if (sent < 0) {
                        fatal("nn_send");
                }

                /* Drain responses for this round. */
                while (1) {
                        char *reply = NULL;
                        int got = nn_recv(sock, &reply, NN_MSG, 0);

                        if (got < 0) {
                                /* A timeout ends the round; anything else is fatal. */
                                if (nn_errno() == ETIMEDOUT) {
                                        break;
                                }
                                fatal("nn_recv");
                        }
                        printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n",
                            reply);
                        nn_freemsg(reply);
                }

                printf("SERVER: SURVEY COMPLETE\n");
                sleep(1); /* next survey starts a second later */
        }
}

/*
 * Respondent: connects an NN_RESPONDENT socket to url and, for every survey
 * it receives, prints it and answers with the current date text (with its
 * '\0'). Failed receives are skipped silently. Loops forever.
 *
 * name: label used in the printed lines.
 */
int
client(const char *url, const char *name)
{
        int sock = nn_socket(AF_SP, NN_RESPONDENT);

        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_connect (sock, url) < 0) {
                fatal("nn_connect");
        }

        while (1) {
                char *survey = NULL;
                char *answer;
                int sz_answer;

                if (nn_recv(sock, &survey, NN_MSG, 0) < 0) {
                        continue; /* nothing received this time round */
                }
                printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n",
                    name, survey);
                nn_freemsg(survey);

                answer = date();
                sz_answer = strlen(answer) + 1; /* '\0' is sent too */
                printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n",
                   name);
                if (nn_send(sock, answer, sz_answer, 0) < 0) {
                        fatal("nn_send");
                }
        }
}

/*
 * Entry point.
 *   survey server <URL>         runs the surveyor
 *   survey client <URL> <NAME>  runs a respondent
 * Any other invocation prints the usage line and returns 1.
 */
int
main(const int argc, const char **argv)
{
        // server reads argv[2], so three arguments must be present.
        if ((argc >= 3) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));

        // client reads argv[2] and argv[3], so four arguments must be present.
        if ((argc >= 4) && (strcmp(CLIENT, argv[1]) == 0))
                return (client(argv[2], argv[3]));

        fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...\n",
            SERVER, CLIENT);
        return 1;
}
           

Compilation

gcc survey.c -lnanomsg -o survey
           

Execution

./survey server ipc:///tmp/survey.ipc & server=$!
./survey client ipc:///tmp/survey.ipc client0 & client0=$!
./survey client ipc:///tmp/survey.ipc client1 & client1=$!
sleep 3
kill $server $client0 $client1
           

Output

SERVER: SENDING DATE SURVEY REQUEST
SERVER: SURVEY COMPLETE
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): SENDING DATE SURVEY RESPONSE
CLIENT (client0): SENDING DATE SURVEY RESPONSE
           

Bus

Bus (路由)

nanomsg項目實戰

總線協定對于路由應用程式或建構完全互連的網狀網絡非常有用。 在這種模式中,消息被發送到每個直接連接配接的對等體。

bus.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nanomsg/nn.h>
#include <nanomsg/bus.h>

/*
 * Reports "<func>: <nanomsg error text>" on stderr and terminates with
 * exit status 1.
 */
void
fatal(const char *func)
{
        const char *detail = nn_strerror(nn_errno());

        fprintf(stderr, "%s: %s\n", func, detail);
        exit(1);
}

/*
 * One bus node. argv[1] is the node's name, argv[2] the URL it binds to,
 * and every further argument a peer URL it connects to. After connecting
 * it sends its name (with '\0') once and then prints whatever arrives.
 * The receive loop never ends, so the final return is not reached.
 */
int
node(const int argc, const char **argv)
{
        const char *self = argv[1];
        int sock = nn_socket (AF_SP, NN_BUS);

        if (sock < 0) {
                fatal("nn_socket");
        }
        if (nn_bind(sock, argv[2]) < 0) {
                fatal("nn_bind");
        }

        sleep(1); /* give the peers time to bind */
        /* The loop bound already covers the case of no peer URLs. */
        for (int i = 3; i < argc; i++) {
                if (nn_connect(sock, argv[i]) < 0) {
                        fatal("nn_connect");
                }
        }
        sleep(1); /* give the connections time to come up */

        int rcv_timeout = 100;
        if (nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &rcv_timeout,
            sizeof (rcv_timeout)) < 0) {
                fatal("nn_setsockopt");
        }

        /* Send our own name onto the bus, '\0' included. */
        int name_len = strlen(self) + 1;
        printf("%s: SENDING '%s' ONTO BUS\n", self, self);
        if (nn_send(sock, self, name_len, 0) < 0) {
                fatal("nn_send");
        }

        /* Print everything that arrives; failed receives are ignored. */
        while (1) {
                char *msg = NULL;
                int got = nn_recv(sock, &msg, NN_MSG, 0);

                if (got < 0) {
                        continue;
                }
                printf("%s: RECEIVED '%s' FROM BUS\n", self, msg);
                nn_freemsg(msg);
        }
        return (nn_shutdown(sock, 0));
}

/*
 * Entry point: needs at least a node name and a bind URL; otherwise prints
 * the usage line and returns 1.
 */
int
main(int argc, const char **argv)
{
        if (argc < 3) {
                fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n");
                return 1;
        }
        return node(argc, argv);
}
           

Compilation

gcc bus.c -lnanomsg -o bus
           

Execution

./bus node0 ipc:///tmp/node0.ipc ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc & node0=$!
./bus node1 ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node1=$!
./bus node2 ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node2=$!
./bus node3 ipc:///tmp/node3.ipc ipc:///tmp/node0.ipc & node3=$!
sleep 5
kill $node0 $node1 $node2 $node3
           

Output

node0: SENDING 'node0' ONTO BUS
node1: SENDING 'node1' ONTO BUS
node2: SENDING 'node2' ONTO BUS
node3: SENDING 'node3' ONTO BUS
node0: RECEIVED 'node1' FROM BUS
node0: RECEIVED 'node2' FROM BUS
node0: RECEIVED 'node3' FROM BUS
node1: RECEIVED 'node0' FROM BUS
node1: RECEIVED 'node2' FROM BUS
node1: RECEIVED 'node3' FROM BUS
node2: RECEIVED 'node0' FROM BUS
node2: RECEIVED 'node1' FROM BUS
node2: RECEIVED 'node3' FROM BUS
node3: RECEIVED 'node0' FROM BUS
node3: RECEIVED 'node1' FROM BUS
node3: RECEIVED 'node2' FROM BUS
           

項目中使用

釋出服務

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

/* Write the one-line command synopsis for program `name` to stderr. */
void usage(const char *name)
{
    FILE *out = stderr;

    fprintf(out, "%s [ bind url]\n", name);
}

/*
 * Publisher service: binds an NN_PUB socket to the URL given as the only
 * argument and publishes a fixed alarm record once per second.
 *
 * Exits with -1 on a wrong argument count or when the socket cannot be
 * created or bound. The publish loop itself never ends.
 */
int main(int argc, char **argv)
{
    /* Record published on every tick; it is sent without a trailing '\0'. */
    static const char alarm_msg[] =
        "DEV|Alarm|IP:192.168.128.12|Port:3721|Chan:1|$DefinitionAbnormal:On|LT:1637908405|$1637908405#1$1";

    if(argc != 2) {
        usage(argv[0]);
        exit(-1);
    }

    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_PUB);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %s\n", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind failed: %s\n", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        time_t rawtime;
        struct tm * timeinfo;

        time (&rawtime);
        timeinfo = localtime (&rawtime);
        char *text = asctime (timeinfo);
        int textLen = strlen(text);
        text[textLen - 1] = ' ';   /* replace asctime()'s '\n' with a space */

        printf ("SERVER: PUBLISHING DATE %s\n", text);
        if (nn_send(sock, alarm_msg, sizeof(alarm_msg) - 1, 0) < 0) {
            /* Report the failure but keep publishing on the next tick. */
            fprintf(stderr, "nn_send failed: %s\n", nn_strerror(errno));
        }
        sleep(1);
    }

    return 0;
}

//run:  ./pubserver ipc:///tmp/test.ipc
           

訂閱服務

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

/*
 * Subscriber service: connects an NN_SUB socket to the given URL,
 * subscribes to the "DEV|Alarm" prefix and prints every message received.
 *
 * argv[1]: label printed with each message.
 * argv[2]: URL to connect to.
 *
 * Exits with -1 on bad arguments or setup failure; returns 1 if receiving
 * fails later on.
 */
int main(int argc, char **argv)
{
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME BIND_URL\n", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket (AF_SP, NN_SUB);
    if(sock < 0) {
        fprintf(stderr, "fail to create socket: %s\n", nn_strerror(errno));
        exit(-1);
    }
    if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "DEV|Alarm", strlen("DEV|Alarm")) < 0) {
        fprintf(stderr, "fail to set socket opts: %s\n", nn_strerror(errno));
        exit(-1);
    }

    /* Keep the endpoint id: nn_shutdown() identifies the endpoint by it. */
    int eid = nn_connect(sock, url);
    if (eid < 0) {
        fprintf(stderr, "fail to connect to %s : %s\n", url, nn_strerror(errno));
        exit(-1);
    }

    printf("connect url:%s success\n",url);
    while ( 1 ) {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        if (bytes < 0) {
            /* buf is not valid after a failed receive: stop here. */
            fprintf(stderr, "fail to receive: %s\n", nn_strerror(errno));
            break;
        }
        /* The payload need not end in '\0', so bound the print by its length. */
        printf ("CLIENT (%s): RECEIVED %.*s\n", name, bytes, buf);
        nn_freemsg (buf);
    }

    /* Only reached when receiving failed. */
    nn_shutdown(sock, eid);

    return 1;
}

//run: ./pubclient deroy ipc:///tmp/test.ipc
           

makefile

# Default goal: build the publisher and the subscriber.
all:pubserver pubclient
pubserver:pubserver.c
	gcc -o pubserver pubserver.c -lnanomsg
pubclient:pubclient.c
	gcc -o pubclient pubclient.c -lnanomsg
# all and clean name actions, not files, so a file called "all" or "clean"
# must not stop them from running.
.PHONY:all clean
clean:
	rm -f pubserver pubclient
           

繼續閱讀