python examples https://github.com/imatix/zguide/tree/master/examples/Python
hwserver.py
Python代碼
- #
- # Hello World server in Python
- # Binds REP socket to tcp://*:5555
- # Expects "Hello" from client, replies with "World"
- import zmq
- import time
- context = zmq.Context()
- socket = context.socket(zmq.REP)
- socket.bind("tcp://*:5555")
- while True:
- # Wait for next request from client
- message = socket.recv()
- print "Received request: ", message
- # Do some 'work'
- time.sleep (1) # Do some 'work'
- # Send reply back to client
- socket.send("World")
#
# Hello World server in Python
# Binds REP socket to tcp://*:5555
# Expects "Hello" from client, replies with "World"
#
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
# Wait for next request from client
message = socket.recv()
print "Received request: ", message
# Do some 'work'
time.sleep (1) # Do some 'work'
# Send reply back to client
socket.send("World")
hwclient.py
- # Hello World client in Python
- # Connects REQ socket to tcp://localhost:5555
- # Sends "Hello" to server, expects "World" back
- # Socket to talk to server
- print "Connecting to hello world server..."
- socket = context.socket(zmq.REQ)
- socket.connect ("tcp://localhost:5555")
- # Do 10 requests, waiting each time for a response
- for request in range (1,10):
- print "Sending request ", request,"..."
- socket.send ("Hello")
- # Get the reply.
- print "Received reply ", request, "[", message, "]"
#
# Hello World client in Python
# Connects REQ socket to tcp://localhost:5555
# Sends "Hello" to server, expects "World" back
#
import zmq
context = zmq.Context()
# Socket to talk to server
print "Connecting to hello world server..."
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:5555")
# Do 10 requests, waiting each time for a response
for request in range (1,10):
print "Sending request ", request,"..."
socket.send ("Hello")
# Get the reply.
message = socket.recv()
print "Received reply ", request, "[", message, "]"
問題3:zeroMQ實作一個消息層?
答:
實作一個ZeroMQ消息層需要三個步驟:
1.選擇傳輸協定
0MQ提供了4種不同的傳輸協定
INPROC an In-Process communication model
IPC an Inter-Process communication model
MULTICAST multicast via PGM, possibly encapsulated in UDP
TCP a network based transport
2.建立基礎
由于在網絡中兩個端點是相對動态的,很難有一個穩定的單一連接配接點。
如果是這種情況,可以使用由0MQ提供的轉發裝置。
轉發裝置可以綁定2個不同端口,并且轉發消息從一個端點到另一個端點。
這樣做的話,在網絡中轉發裝置能夠變成一個穩定的點,其它元件都可以去連接配接。
0MQ提供了3種類型的裝置
QUEUE, a forwarder for the request/response messaging pattern
FORWARDER, a forwarder for the publish/subscribe messaging pattern
STREAMER, a forwarder for the pipelined messaging pattern
3.選擇通訊模式
0MQ支援4種模式
REQUEST/REPLY, bidirectional, load balanced and state based
PUBLISH/SUBSCRIBE, publish to multiple recipients at once
UPSTREAM / DOWNSTREAM, distribute data to nodes arranged in a pipeline
PAIR, communication exclusively between peers
Req/Rep
均衡負載請求:
server 1
- socket.bind("tcp://127.0.0.1:5000")
- msg = socket.recv()
- print "Got", msg
- socket.send(msg)
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://127.0.0.1:5000")
while True:
msg = socket.recv()
print "Got", msg
socket.send(msg)
server 2
- socket.bind("tcp://127.0.0.1:6000")
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://127.0.0.1:6000")
while True:
msg = socket.recv()
print "Got", msg
socket.send(msg)
client
- socket.connect("tcp://127.0.0.1:5000")
- socket.connect("tcp://127.0.0.1:6000")
- for i in range(10):
- msg = "msg %s" % i
- print "Sending", msg
- msg_in = socket.recv()
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://127.0.0.1:5000")
socket.connect("tcp://127.0.0.1:6000")
for i in range(10):
msg = "msg %s" % i
socket.send(msg)
print "Sending", msg
msg_in = socket.recv()
會發現client的請求會被均衡的配置設定給兩個server
Example client output:
Sending msg 0
Sending msg 1
Sending msg 2
Sending msg 3
Sending msg 4
Sending msg 5
Sending msg 6
Sending msg 7
Sending msg 8
Sending msg 9
Example output server 1 at port 5000:
Got msg 0
Got msg 2
Got msg 4
Got msg 6
Got msg 8
Example output server 2 at port 6000:
Got msg 1
Got msg 3
Got msg 5
Got msg 7
Got msg 9
現在,如果我們要加入一個額外的server去管理我們的請求,我們将不得不修改我們的代碼。
這是非常麻煩的,我們需要讓每個client都知道有一個額外的server可以均衡請求。
為了解決這個問題,替代client直接去連接配接多個server的方式,client去連接配接轉發裝置,再由轉發裝置路由全部的消息給server。
Pub/Sub
在pub/sub模式下元件是松耦合的。類似于廣播電台。
一個廣播server為現場足球賽
- from random import choice
- socket = context.socket(zmq.PUB)
- countries = ['netherlands','brazil','germany','portugal']
- events = ['yellow card', 'red card', 'goal', 'corner', 'foul']
- msg = choice( countries ) +" "+ choice( events )
- print "->",msg
- socket.send( msg )<span style="white-space: normal;"> </span>
import zmq
from random import choice
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5000")
countries = ['netherlands','brazil','germany','portugal']
events = ['yellow card', 'red card', 'goal', 'corner', 'foul']
while True:
msg = choice( countries ) +" "+ choice( events )
print "->",msg
socket.send( msg )
輸出
-> portugal corner
-> portugal yellow card
-> portugal goal
-> netherlands yellow card
-> germany yellow card
-> brazil yellow card
-> germany corner
…
一個用戶端去收聽特定的消息
- socket = context.socket(zmq.SUB)
- socket.setsockopt(zmq.SUBSCRIBE, "netherlands")
- socket.setsockopt(zmq.SUBSCRIBE, "germany")
- print socket.recv()
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5000")
socket.setsockopt(zmq.SUBSCRIBE, "netherlands")
socket.setsockopt(zmq.SUBSCRIBE, "germany")
while True:
print socket.recv()
netherlands red card
netherlands goal
germany foul
netherlands yellow card
netherlands corner
Pipelining
并發處理資料,其工作模式
一個工作者得到來自上遊socket的消息,一旦處理完成後發送消息到下遊。
Paired socket
伺服器監聽某個端口,用戶端連接配接到這個端口,消息可以雙向流動。
server
- socket = context.socket(zmq.PAIR)
- socket.bind("tcp://127.0.0.1:5555")
import zmq
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://127.0.0.1:5555")
- socket.connect("tcp://127.0.0.1:5555")
import zmq
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://127.0.0.1:5555")