天天看點

OpenStack oslo.messaging使用

基礎知識:

    參考部落格:http://blog.csdn.net/happyAnger6/article/details/54777429

  •   Transport

  Transport(傳輸層)主要實作RPC底層的通信(比如socket)以及事件循環,多線程等其他功能.可以通過URL來獲得不同transport的句柄.URL的格式為:

transport://user:[email protected]:port[,hostN:portN]/virtual_host

  目前支援的Transport有rabbit,qpid與zmq,分别對應不同的後端消息總線.使用者可以使用oslo.messaging.get_transport函數來獲得transport對象執行個體的句柄.

  • Target

Target封裝了指定某一個消息最終目的地的所有資訊

  • Endpoint

    Endpoint包含一組方法,這組方法是可以被用戶端遠端調用的.

  • Server

    一個RPC伺服器可以暴露多個endpoint,每個endpoint包含一組方法,這組方法是可以被用戶端通過某種Transport對象遠端調用的.建立Server對象時,需要指定Transport,Target和一組endpoint.

  • RPC Client

    通過RPC Client,可以遠端調用RPC Sever上的方法.遠端調用時,需要提供一個字典對象來指明調用的上下文,調用方法的名字和傳遞給調用方法的參數(用字典表示).     有cast和call兩種遠端調用方式.通過cast方式遠端調用,請求發送後就直接傳回了;通過call方式調用,需要等響應從伺服器傳回.

RPC-server和PRC-client舉例

rpc_server.py

from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher

class ServerControlEndpoint(object):
    target = messaging.Target(namespace='control',
                              version='2.0')

    def __init__(self, server):
        self.server = server

    def stop(self, ctx):
        print "------ServerControlEndpoint. stop --------"
        if self.server:
            self.server.stop()
        return "stop return"

class TestEndpoint(object):
    def test(self, ctx, arg):
        print "------ TestEndpoint.test --------"
        print "arg:", arg
        return arg


transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test',
                          server='server1')
endpoints = [
    ServerControlEndpoint(None),
    TestEndpoint(),
]

access_policy = dispatcher.DefaultRPCAccessPolicy

server = messaging.get_rpc_server(transport, target, endpoints,
                                  executor='threading',
                                  access_policy=access_policy)
server.start()
server.wait()           
           

rpc_client.py

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1')
client = messaging.RPCClient(transport, target)
ret = client.call(ctxt={},
                  method='test',
                  arg='myarg1')
client.call({}, 'test', arg='myarg2')
#client.call({}, 'stop')
cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')
           

運作rpc_server.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

運作rpc_client.py:

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

去掉rpc_cleint.py裡面的注釋

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1')
client = messaging.RPCClient(transport, target)
ret = client.call(ctxt={},
                  method='test',
                  arg='myarg1')
client.call({}, 'test', arg='myarg2')
client.call({}, 'stop')
cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')
           

運作rpc_client.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

修改rpc_client.py

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1', namespace='control', version='2.0')
client = messaging.RPCClient(transport, target)

client.call({}, 'stop')

cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')

client.call({}, 'test', arg='myarg2')
           

運作rpc_client.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

修改rpc_client.py

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', namespace='control', version='2.0')
client = messaging.RPCClient(transport, target)

client.call({}, 'stop')

cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
cctx.call({}, 'stop')
           

 運作rpc_client.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

小結:

    (1)rpc_client想要正确調用rpc_server裡面endpoint的方法,需要正确設定RPCClient的target的topic、namespace、version等資訊。

    (2)RPCClient的target裡面的server參數:(字元串類型)用戶端可以指定此參數來要求消息的目的地是某個特定的伺服器,而不是一組同屬某個topic的伺服器中的任意一台.

    (3)Target對象的屬性在RPCClient對象構造以後,還可以通過prepare()方法修改.可以修改的屬性包括exchange,topic,namespace,version,server,fanout,timeout,version_cap和retry.

  • Notification Listener

    Notification Listener和Server類似,一個Notification Listener對象可以暴露多個 endpoint,每個endpoint包含一組方法.但是與Server對象中的endpoint不同的是,這裡的endpoint中的方法對應通知消息的不同優先級

  • Notifier

    Notifier用來通過某種transport發送通知消息

Notification_listener和Notifier_send舉例

notification_listener.py

from oslo_config import cfg
import oslo_messaging as messaging

def do_something(payload):
    print "recieve:", payload

class NotificationEndPoint(object):
    def warn(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)

    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something("In NotificationEndPoint")
        do_something(payload)

class ErrorEndpoint(object):
    def error(self, ctxt, publisher_id,event_type, payload, metadata):
        do_something("In ErrorEndpoint")
        do_something(payload)


transport = messaging.get_transport(cfg.CONF)
targets = [
    messaging.Target(topic='notifications'),
    messaging.Target(topic='notifications_bis')
]
endpoints = [
    NotificationEndPoint(),
    ErrorEndpoint(),
]
listener = messaging.get_notification_listener(transport,
                                               targets,
                                               endpoints)

listener.start()
listener.wait()
           

notifier_send.py

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
                              driver='messaging',
                              topics=['notifications', 'notifications_bis'])

notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
                event_type='my_type',
                payload={'content': 'error occurred'})
           

運作notification_listener.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

運作notifier_send.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

修改notifier_send.py

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
                              driver='messaging',
                              topics=['notifications'])

notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
                event_type='my_type',
                payload={'content': 'error occurred'})
           

運作notifier_send.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

修改notifier_send.py

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
                              driver='messaging',
                              topics=['notifications'])

notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
                event_type='my_type',
                payload={'content': 'error occurred'})
notifier.error(ctxt={},
               event_type='my_type',
               payload={'content2': 'error occurred'})

notifier2.warn(ctxt={},
               event_type='my_type',
               payload={'content': 'warn occurred'})

notifier2.info(ctxt={},
               event_type='my_type',
               payload={'content': 'info occurred'})
           

運作notifier_send.py

OpenStack oslo.messaging使用
OpenStack oslo.messaging使用
OpenStack oslo.messaging使用

  小結:

    (1)發送通知方構造的messaging.Notifier,裡面的topics參數指定了一個清單,表示要發生到遠端多個的topic。

    (2)通知消息監聽方的endpoints,如果都實作了某一級别的方法(比如:error),則都是被調用。

繼續閱讀