關于ZeroMQ(又稱ØMQ)有很多傳神的解說,我這裡給各位看官上一段比較凡爾賽的描述。
Underneath the brown paper wrapping of ZeroMQ’s socket API lies the world of messaging patterns.
"在ZeroMQ發黃的封皮下面,流淌的是全世界的消息通信模式",一翻譯就沒有文藝範兒了,總之口氣很大(敢自稱Zero就已經展現了作者的雄心),聲稱ZeroMQ涵蓋了所有的通信模式,包括程序内/程序間通訊、TCP、廣播等等模式。
閑言少叙,書歸正傳。網上講go語言ZeroMQ程式設計的例子比較少,今天列一些常見的通信模式,給初學者一個指引,希望大家少走彎路。
REQ-REP
即request-response模式,典型的client-server架構,client發送request給server,server傳回response給client,一個server可以同時跟多個client建立連接配接。

在ZeroMQ中凡是涉及到“1對多”的模式,都是“1”的那一方調用Bind()函數在某個端口上開始監聽,“多”的那一方調用Connect()函數請求跟“1”方建立連接配接。在普通的socket程式設計中,必須先啟動Server,再啟動Client,否則Client的Connect()函數會失敗進而退出程式。然而ZeroMQ的Connect()函數實際上是異步的,如果如果Server端還沒有啟起來它不會以失敗告終,而是立即傳回,你可以緊接着調用Send()函數。Send()又支援阻塞模式和非阻塞模式,阻塞模式會一直等待對方就緒(比如至少要等連接配接建立好吧)再發送資料,非阻塞模式會先把消息存到本的緩沖隊列裡然後立即傳回。Server端由于維護了多條連接配接,它會為每個連接配接都單獨建立一個緩沖隊列。
在req-rep模式中,每台機器send(發送資料)和revc(接收資料)必須交替調用,否則會報錯。
import (
"fmt"
"strconv"
"time"
zmq "github.com/pebbe/zmq4"
)
func startServer(port int) {
//REP 表示server端
socket, _ := zmq.NewSocket(zmq.REP)
//Bind 綁定端口,并指定傳輸層協定
socket.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))
fmt.Printf("bind to port %d\n", port)
defer socket.Close()
for {
//Recv和Send必須交替進行
resp, _ := socket.Recv(0) //0表示阻塞模式
socket.Send("Hello "+resp, 0) //同步發送
}
}
func startClient(port int, msg string) {
//REQ 表示client端
socket, _ := zmq.NewSocket(zmq.REQ)
//Connect 請求建立連接配接,并指定傳輸層協定
socket.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
fmt.Println("connect to server")
defer socket.Close()
for i := 0; i < 10; i++ {
//Send和Recv必須交替進行
socket.Send(msg, zmq.DONTWAIT) //非阻塞模式,異步發送(隻是将資料寫入本地buffer,并沒有真正發送到網絡上)
resp, _ := socket.Recv(0)
fmt.Printf("receive [%s]\n", resp)
time.Sleep(5 * time.Second)
}
}
DEALER-ROUTER
DEALER-ROUTER跟REQ-REP模式比較接近,都是client-server構架,但是DEALER-ROUTER有它的2個特别之處:
- send()和revc()不需要交替調用
- ROUTER端的消息需要分成多幀發送,至少2幀,因為第一幀需要發送對端的位址。消息正文如果需要分成多幀發送,那麼除後一幀外其他幀在send時都需要設定SNDMORE辨別。對于DEALER發過來的一條消息,ROUTER需要調用2次Recv(),第一次Recv()讀出來的是對端的位址
import (
"fmt"
"strconv"
"time"
zmq "github.com/pebbe/zmq4"
)
func router(port int) {
//ROUTER 表示server端
socket, _ := zmq.NewSocket(zmq.ROUTER)
//Bind 綁定端口,并指定傳輸層協定
socket.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))
fmt.Printf("bind to port %d\n", port)
defer socket.Close()
for {
//Send和Recv沒必要交替進行
addr, _ := socket.RecvBytes(0) //接收到的第一幀表示對方的位址UUID
resp, _ := socket.Recv(0)
socket.SendBytes(addr, zmq.SNDMORE) //第一幀需要指明對方的位址,SNDMORE表示消息還沒發完
socket.Send("Hello", zmq.SNDMORE) //如果不用SNDMORE表示這已經是最後一幀了,下一次Send就是下一段消息的第一幀了,需要指明對方的位址
socket.Send(resp, 0)
}
}
func dealer(port int, msg string) {
//DEALER 表示client端
socket, _ := zmq.NewSocket(zmq.DEALER)
//Connect 請求建立連接配接,并指定傳輸層協定
socket.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
fmt.Println("connect to server")
defer socket.Close()
for i := 0; i < 10; i++ {
//Send和Recv沒必要交替進行
socket.Send(msg, 0) //非阻塞模式,異步發送(隻是将資料寫入本地buffer,并沒有真正發送到網絡上)
resp1, _ := socket.Recv(0)
resp2, _ := socket.Recv(0)
fmt.Printf("receive [%s %s]\n", resp1, resp2)
time.Sleep(5 * time.Second)
}
}
SUB-PUB
PUB代表釋出者publisher,SUB代表訂閱者subscriber,釋出者發送一條消息,所有的訂閱者都會收到,是典型的廣播模式。
訂閱者隻接收特定主題的消息,釋出者在每條消息前面加一個特定的字首表示主題。訂閱者通過調用SetSubscribe(prefix string)函數過濾出特定字首的消息,如果給SetSubscribe()傳的是空字元串則訂閱者不過濾任何消息,全部接收。
顯然釋出者隻能調用Send,訂閱者隻能調用Recv。
import (
"fmt"
"strconv"
"time"
zmq "github.com/pebbe/zmq4"
)
func publish(port int, prefix string) {
ctx, _ := zmq.NewContext()
defer ctx.Term()
//PUB 表示publisher角色
publisher, _ := ctx.NewSocket(zmq.PUB)
defer publisher.Close()
//Bind 綁定端口,并指定傳輸層協定
publisher.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))
//publisher會把消息發送給所有subscriber,subscriber可以動态加入
for i := 0; i < 5; i++ {
//publisher隻能調用send方法
publisher.Send(prefix+"Hello my followers", 0)
publisher.Send(prefix+"How are you", 0)
fmt.Printf("loop %d send over\n", i+1)
time.Sleep(10 * time.Second)
}
publisher.Send(prefix+"END", 0)
}
func subscribe(port int, prefix string) {
//SUB 表示subscriber角色
subscriber, _ := zmq.NewSocket(zmq.SUB)
defer subscriber.Close()
//Bind 綁定端口,并指定傳輸層協定
subscriber.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
subscriber.SetSubscribe(prefix) //隻接收字首為prefix的消息
fmt.Printf("listen to port %d\n", port)
for {
//接收廣播
if resp, err := subscriber.Recv(0); err == nil {
resp = resp[len(prefix):] //去掉字首
fmt.Printf("receive [%s]\n", resp)
if resp == "END" {
break
}
} else {
fmt.Println(err)
break
}
}
}
PUSH-PULL
跟PUB-SUB模式一樣,PUSH-PULL也隻能實作消息的單向傳輸,但是pusher會采用輪詢法選擇一個worker把消息發送給它,其他worker都收不到這條消息,是以PUSH-PULL模式常用來做任務的分發。
import (
"fmt"
"strconv"
"time"
zmq "github.com/pebbe/zmq4"
)
func push(port int) {
ctx, _ := zmq.NewContext()
defer ctx.Term()
//PUSH 表示pusher角色
pusher, _ := ctx.NewSocket(zmq.PUSH)
defer pusher.Close()
//Bind 綁定端口,并指定傳輸層協定
pusher.SetSndhwm(110)
pusher.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))
//pusher把消息送給一個puller(采用公平輪轉的方式選擇一個puller),puller可以動态加入
for i := 0; i < 5; i++ {
pusher.Send("Hello my followers", 0)
pusher.Send("How are you", 0)
fmt.Printf("loop %d send over\n", i+1)
time.Sleep(5 * time.Second)
}
pusher.Send("END", 0)
}
func pull(port int) {
//PULL 表示puller角色
puller, _ := zmq.NewSocket(zmq.PULL)
defer puller.Close()
//Bind 綁定端口,并指定傳輸層協定
puller.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
fmt.Printf("listen to port %d\n", port)
for {
//接收廣播
if resp, err := puller.Recv(0); err == nil {
fmt.Printf("receive [%s]\n", resp)
if resp == "END" {
break
}
} else {
fmt.Println(err)
break
}
}
}
聊天室後端架構
最後來一個綜合練習,實作一個聊天室的後端。采用hub-client架構,client代表聊天室中發言的使用者,client通過DEALER-ROUTER模式把發言内容發給hub(hub不需要專門響應該client,是以不能采用REQ-REP模式,REQ-REP要求send和recv必須交替進行),hub通過PUB-SUB模式把消息廣播給所有client。
import (
"bufio"
"encoding/base64"
"fmt"
"os"
"strconv"
"strings"
zmq "github.com/pebbe/zmq4"
)
func hub(subPort, pubPort int) {
//接收所有client的消息
socket, _ := zmq.NewSocket(zmq.ROUTER)
socket.Bind("tcp://127.0.0.1:" + strconv.Itoa(subPort))
fmt.Printf("bind to port %d\n", subPort)
defer socket.Close()
//把消息廣播給所有client
ctx, _ := zmq.NewContext()
defer ctx.Term()
publisher, _ := ctx.NewSocket(zmq.PUB)
defer publisher.Close()
publisher.Bind("tcp://127.0.0.1:" + strconv.Itoa(pubPort))
for {
//把接收到的client的消息再廣播給所有client
if addr, err := socket.RecvBytes(0); err == nil { //第一幀讀出對端的位址
client := base64.StdEncoding.EncodeToString(addr) //用對端位址來辨別消息是誰發出來的
if resp, err := socket.Recv(0); err == nil {
if _, err := publisher.Send(client+"say: "+resp, 0); err != nil { //在消息前加上發送者的辨別
fmt.Println(err)
break
}
} else {
fmt.Println(err)
break
}
} else {
fmt.Println(err)
break
}
}
}
func client(pubPort, subPort int) {
//把消息廣播給hub
socket, _ := zmq.NewSocket(zmq.DEALER)
socket.Connect("tcp://127.0.0.1:" + strconv.Itoa(pubPort))
fmt.Println("connect to server")
defer socket.Close()
//訂閱hub的消息
subscriber, _ := zmq.NewSocket(zmq.SUB)
defer subscriber.Close()
subscriber.Connect("tcp://127.0.0.1:" + strconv.Itoa(subPort))
subscriber.SetSubscribe("")
go func() {
for {
//把接收到的client的消息再廣播給所有client
if resp, err := subscriber.Recv(0); err == nil {
fmt.Println(resp)
} else {
fmt.Println(err)
break
}
}
}()
fmt.Println("please type message")
reader := bufio.NewReader(os.Stdin)
for {
text, _ := reader.ReadString('\n')
text = strings.Replace(text, "\n", "", -1)
socket.Send(text, 0)
}
}