天天看點

python zeromq

python examples https://github.com/imatix/zguide/tree/master/examples/Python

hwserver.py

Python代碼 ​​

python zeromq
​​
python zeromq
python zeromq

  1. #
  2. # Hello World server in Python
  3. # Binds REP socket to tcp://*:5555
  4. # Expects "Hello" from client, replies with "World"
  5. import zmq
  6. import time
  7. context = zmq.Context()
  8. socket = context.socket(zmq.REP)
  9. socket.bind("tcp://*:5555")
  10. while True:
  11. # Wait for next request from client
  12. message = socket.recv()
  13. print "Received request: ", message
  14. # Do some 'work'
  15. time.sleep (1) # Do some 'work'
  16. # Send reply back to client
  17. socket.send("World")
#
#   Hello World server in Python
#   Binds REP socket to tcp://*:5555
#   Expects "Hello" from client, replies with "World"
#
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print "Received request: ", message

    #  Do some 'work'
    time.sleep (1)        #   Do some 'work'

    #  Send reply back to client
    socket.send("World")      

hwclient.py

python zeromq
python zeromq
python zeromq
  1. # Hello World client in Python
  2. # Connects REQ socket to tcp://localhost:5555
  3. # Sends "Hello" to server, expects "World" back
  4. # Socket to talk to server
  5. print "Connecting to hello world server..."
  6. socket = context.socket(zmq.REQ)
  7. socket.connect ("tcp://localhost:5555")
  8. # Do 10 requests, waiting each time for a response
  9. for request in range (1,10):
  10. print "Sending request ", request,"..."
  11. socket.send ("Hello")
  12. # Get the reply.
  13. print "Received reply ", request, "[", message, "]"
#
#   Hello World client in Python
#   Connects REQ socket to tcp://localhost:5555
#   Sends "Hello" to server, expects "World" back
#
import zmq

context = zmq.Context()

#  Socket to talk to server
print "Connecting to hello world server..."
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:5555")

#  Do 10 requests, waiting each time for a response
for request in range (1,10):
    print "Sending request ", request,"..."
    socket.send ("Hello")

    #  Get the reply.
    message = socket.recv()
    print "Received reply ", request, "[", message, "]"      

問題3:zeroMQ實作一個消息層?

答:

實作一個ZeroMQ消息層需要三個步驟:

1.選擇傳輸協定

0MQ提供了4種不同的傳輸協定

​​INPROC​​ an In-Process communication model

​​IPC​​ an Inter-Process communication model

​​MULTICAST​​ multicast via PGM, possibly encapsulated in UDP

​​TCP​​ a network based transport

2.建立基礎

由于在網絡中兩個端點是相對動态的,很難有一個穩定的單一連接配接點。

如果是這種情況,可以使用由0MQ提供的轉發裝置。

轉發裝置可以綁定2個不同端口,并且轉發消息從一個端點到另一個端點。

這樣做的話,在網絡中轉發裝置能夠變成一個穩定的點,其它元件都可以去連接配接。

0MQ提供了3種類型的裝置

​​QUEUE​​, a forwarder for the request/response messaging pattern

​​FORWARDER​​, a forwarder for the publish/subscribe messaging pattern

​​STREAMER​​, a forwarder for the pipelined messaging pattern

3.選擇通訊模式

0MQ支援4種模式

​​REQUEST/REPLY​​, bidirectional, load balanced and state based

​​PUBLISH/SUBSCRIBE​​, publish to multiple recipients at once

​​UPSTREAM / DOWNSTREAM​​, distribute data to nodes arranged in a pipeline

​​PAIR​​, communication exclusively between peers

Req/Rep

均衡負載請求:

server 1

python zeromq
python zeromq
python zeromq
  1. socket.bind("tcp://127.0.0.1:5000")
  2. msg = socket.recv()
  3. print "Got", msg
  4. socket.send(msg)
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://127.0.0.1:5000")

while True:
    msg = socket.recv()
    print "Got", msg
    socket.send(msg)      

server 2

python zeromq
python zeromq
python zeromq
  1. socket.bind("tcp://127.0.0.1:6000")
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://127.0.0.1:6000")

while True:
    msg = socket.recv()
    print "Got", msg
    socket.send(msg)      

client

python zeromq
python zeromq
python zeromq
  1. socket.connect("tcp://127.0.0.1:5000")
  2. socket.connect("tcp://127.0.0.1:6000")
  3. for i in range(10):
  4. msg = "msg %s" % i
  5. print "Sending", msg
  6. msg_in = socket.recv()
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://127.0.0.1:5000")
socket.connect("tcp://127.0.0.1:6000")

for i in range(10):
    msg = "msg %s" % i
    socket.send(msg)
    print "Sending", msg
    msg_in = socket.recv()      

會發現client的請求會被均衡的配置設定給兩個server

Example client output:

Sending msg 0

Sending msg 1

Sending msg 2

Sending msg 3

Sending msg 4

Sending msg 5

Sending msg 6

Sending msg 7

Sending msg 8

Sending msg 9

Example output server 1 at port 5000:

Got msg 0

Got msg 2

Got msg 4

Got msg 6

Got msg 8

Example output server 2 at port 6000:

Got msg 1

Got msg 3

Got msg 5

Got msg 7

Got msg 9

現在,如果我們要加入一個額外的server去管理我們的請求,我們将不得不修改我們的代碼。

這是非常麻煩的,我們需要讓每個client都知道有一個額外的server可以均衡請求。

為了解決這個問題,替代client直接去連接配接多個server的方式,client去連接配接轉發裝置,再由轉發裝置路由全部的消息給server。

Pub/Sub

在pub/sub模式下元件是松耦合的。類似于廣播電台。

一個廣播server為現場足球賽

python zeromq
python zeromq
python zeromq
  1. from random import choice
  2. socket = context.socket(zmq.PUB)
  3. countries = ['netherlands','brazil','germany','portugal']
  4. events = ['yellow card', 'red card', 'goal', 'corner', 'foul']
  5. msg = choice( countries ) +" "+ choice( events )
  6. print "->",msg
  7. socket.send( msg )<span style="white-space: normal;"> </span>
import zmq
from random import choice
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5000")

countries = ['netherlands','brazil','germany','portugal']
events = ['yellow card', 'red card', 'goal', 'corner', 'foul']

while True:
    msg = choice( countries ) +" "+ choice( events )
    print "->",msg
    socket.send( msg )      

輸出

-> portugal corner

-> portugal yellow card

-> portugal goal

-> netherlands yellow card

-> germany yellow card

-> brazil yellow card

-> germany corner

一個用戶端去收聽特定的消息

python zeromq
python zeromq
python zeromq
  1. socket = context.socket(zmq.SUB)
  2. socket.setsockopt(zmq.SUBSCRIBE, "netherlands")
  3. socket.setsockopt(zmq.SUBSCRIBE, "germany")
  4. print socket.recv()
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5000")
socket.setsockopt(zmq.SUBSCRIBE, "netherlands")
socket.setsockopt(zmq.SUBSCRIBE, "germany")

while True:
    print  socket.recv()      

netherlands red card

netherlands goal

germany foul

netherlands yellow card

netherlands corner

Pipelining

并發處理資料,其工作模式

一個工作者得到來自上遊socket的消息,一旦處理完成後發送消息到下遊。

Paired socket

伺服器監聽某個端口,用戶端連接配接到這個端口,消息可以雙向流動。

server

python zeromq
python zeromq
python zeromq
  1. socket = context.socket(zmq.PAIR)
  2. socket.bind("tcp://127.0.0.1:5555")
import zmq
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://127.0.0.1:5555")      
python zeromq
python zeromq
python zeromq
  1. socket.connect("tcp://127.0.0.1:5555")
import zmq
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://127.0.0.1:5555")