基礎知識:
參考部落格:http://blog.csdn.net/happyAnger6/article/details/54777429
- Transport
Transport(傳輸層)主要實作RPC底層的通信(比如socket)以及事件循環,多線程等其他功能.可以通過URL來獲得不同transport的句柄.URL的格式為:
transport://user:[email protected]:port[,hostN:portN]/virtual_host
目前支援的Transport有rabbit,qpid與zmq,分别對應不同的後端消息總線.使用者可以使用oslo.messaging.get_transport函數來獲得transport對象執行個體的句柄.
- Target
Target封裝了指定某一個消息最終目的地的所有資訊
- Endpoint
Endpoint包含一組方法,這組方法是可以被用戶端遠端調用的.
- Server
一個RPC伺服器可以暴露多個endpoint,每個endpoint包含一組方法,這組方法是可以被用戶端通過某種Transport對象遠端調用的.建立Server對象時,需要指定Transport,Target和一組endpoint.
- RPC Client
通過RPC Client,可以遠端調用RPC Sever上的方法.遠端調用時,需要提供一個字典對象來指明調用的上下文,調用方法的名字和傳遞給調用方法的參數(用字典表示). 有cast和call兩種遠端調用方式.通過cast方式遠端調用,請求發送後就直接傳回了;通過call方式調用,需要等響應從伺服器傳回.
RPC-server和PRC-client舉例
rpc_server.py
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
class ServerControlEndpoint(object):
target = messaging.Target(namespace='control',
version='2.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
print "------ServerControlEndpoint. stop --------"
if self.server:
self.server.stop()
return "stop return"
class TestEndpoint(object):
def test(self, ctx, arg):
print "------ TestEndpoint.test --------"
print "arg:", arg
return arg
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test',
server='server1')
endpoints = [
ServerControlEndpoint(None),
TestEndpoint(),
]
access_policy = dispatcher.DefaultRPCAccessPolicy
server = messaging.get_rpc_server(transport, target, endpoints,
executor='threading',
access_policy=access_policy)
server.start()
server.wait()
rpc_client.py
from oslo_config import cfg
import oslo_messaging as messaging
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1')
client = messaging.RPCClient(transport, target)
ret = client.call(ctxt={},
method='test',
arg='myarg1')
client.call({}, 'test', arg='myarg2')
#client.call({}, 'stop')
cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')
運作rpc_server.py
運作rpc_client.py:
去掉rpc_cleint.py裡面的注釋
from oslo_config import cfg
import oslo_messaging as messaging
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1')
client = messaging.RPCClient(transport, target)
ret = client.call(ctxt={},
method='test',
arg='myarg1')
client.call({}, 'test', arg='myarg2')
client.call({}, 'stop')
cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')
運作rpc_client.py
修改rpc_client.py
from oslo_config import cfg
import oslo_messaging as messaging
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1', namespace='control', version='2.0')
client = messaging.RPCClient(transport, target)
client.call({}, 'stop')
cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')
client.call({}, 'test', arg='myarg2')
運作rpc_client.py
修改rpc_client.py
from oslo_config import cfg
import oslo_messaging as messaging
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', namespace='control', version='2.0')
client = messaging.RPCClient(transport, target)
client.call({}, 'stop')
cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')
運作rpc_client.py
小結:
(1)rpc_client想要正确調用rpc_server裡面endpoint的方法,需要正确設定RPCClient的target的topic、namespace、version等資訊。
(2)RPCClient的target裡面的server參數:(字元串類型)用戶端可以指定此參數來要求消息的目的地是某個特定的伺服器,而不是一組同屬某個topic的伺服器中的任意一台.
(3)Target對象的屬性在RPCClient對象構造以後,還可以通過prepare()方法修改.可以修改的屬性包括exchange,topic,namespace,version,server,fanout,timeout,version_cap和retry.
- Notification Listener
Notification Listener和Server類似,一個Notification Listener對象可以暴露多個 endpoint,每個endpoint包含一組方法.但是與Server對象中的endpoint不同的是,這裡的endpoint中的方法對應通知消息的不同優先級
- Notifier
Notifier用來通過某種transport發送通知消息
Notification_listener和Notifier_send舉例
notification_listener.py
from oslo_config import cfg
import oslo_messaging as messaging
def do_something(payload):
print "recieve:", payload
class NotificationEndPoint(object):
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something("In NotificationEndPoint")
do_something(payload)
class ErrorEndpoint(object):
def error(self, ctxt, publisher_id,event_type, payload, metadata):
do_something("In ErrorEndpoint")
do_something(payload)
transport = messaging.get_transport(cfg.CONF)
targets = [
messaging.Target(topic='notifications'),
messaging.Target(topic='notifications_bis')
]
endpoints = [
NotificationEndPoint(),
ErrorEndpoint(),
]
listener = messaging.get_notification_listener(transport,
targets,
endpoints)
listener.start()
listener.wait()
notifier_send.py
from oslo_config import cfg
import oslo_messaging as messaging
transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
driver='messaging',
topics=['notifications', 'notifications_bis'])
notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
event_type='my_type',
payload={'content': 'error occurred'})
運作notification_listener.py
運作notifier_send.py
修改notifier_send.py
from oslo_config import cfg
import oslo_messaging as messaging
transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
driver='messaging',
topics=['notifications'])
notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
event_type='my_type',
payload={'content': 'error occurred'})
運作notifier_send.py
修改notifier_send.py
from oslo_config import cfg
import oslo_messaging as messaging
transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
driver='messaging',
topics=['notifications'])
notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
event_type='my_type',
payload={'content': 'error occurred'})
notifier.error(ctxt={},
event_type='my_type',
payload={'content2': 'error occurred'})
notifier2.warn(ctxt={},
event_type='my_type',
payload={'content': 'warn occurred'})
notifier2.info(ctxt={},
event_type='my_type',
payload={'content': 'info occurred'})
運作notifier_send.py
小結:
(1)發送通知方構造的messaging.Notifier,裡面的topics參數指定了一個清單,表示要發生到遠端多個的topic。
(2)通知消息監聽方的endpoints,如果都實作了某一級别的方法(比如:error),則都是被調用。