天天看點

oslo openstack

英文原文:http://docs.OpenStack.org/developer/oslo.messaging/index.html

https://wiki.openstack.org/wiki/Oslo/Messaging

目錄

1.Transport

2.Executors

3.Target

4.RPC Server

5.RPC Client

6.Notifier

7.Notification Driver

8.Notification Listener

9.Serializer

10.Exceptions

11.Configuration Options

12.Testing Configurations

13.Available Drivers

14.Supported Messaging Drivers

15.AMQP 1.0 Protocol Driver Deployment Guide

16.Pika Driver Deployment Guide

17.ZeroMQ Driver Deployment Guide

18.Guide for Transport Driver Implementors

19.Frequently Asked Questions

20.Contributing

寫在前面的話:

1.關于oslo庫:

主要兩種獨立的API: 1, oslo.messaging.rpc(實作了用戶端-伺服器遠端過程調用; 2,oslo.messaging.notify(實作了事件的通知機制)

另外,oslo.messaging.notify中的通用的transport采用的oslo.messaging.rpc中的。

2.olso的目标:

先說在olso之前的問題:

(1)版本相容性問題

(2)之前openstack的消息機制基于AMQP的,這對不懂AMQP的開發者困難。

(3)基于AMQP的不同實作如rabbitMQ,Qpid的接口問題(我的了解是可能不同的實作的一些小的接口不同,在使用時,會造成問題?)

(4)為了支援非AMQP協定,需要一個抽象,将AMQP和非AMQP綜合起來,形成一個統一的接口,屏蔽這些協定問題。

目标就是解決上面的問題,提供一個消息通信的抽象接口,屏蔽底層的具體通信的實作細節,不管是AMQP還是非AMQP,還是AMQP的不同的實作的庫等等這些問題,具體的olso API語句的具體後端稱為 transport drivers。

3.一些基本概念

(1)Server:提供RPC服務

(2)client : 激活Server 方法的

(3)exchagne: 交換器

(4)topic: RPC 接口辨別;

(5)Namespace :伺服器針對某個topic暴露出一系列的方法,這些方法都處于一個namespace中。

(6)method: 有名字的方法,有一系列的命名參數。

(7)transport : 底層的消息系統,可将RPC請求傳遞到server中,并将傳回資料傳遞給client.

一、Transport

1. oslo_messaging.get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None)

參數意義:

conf (cfg.ConfigOpts) –使用者配置

url (str or TransportURL) – a transport URL

allowed_remote_exmods (list) – a list of modules which a client using this transport will deserialize remote exceptions from(會引起exception的子產品清單)

aliases (dict) – DEPRECATED: A map of transport alias to transport name
           

工廠方法,擷取Transport對象。該方法會根據從使用者配置檔案中擷取transport配置資訊+transport URL(可選)構造Transport 對象。

如果提供了transport URL(該URL優先級最高),構造時會采用該URL。如果沒有提供URL,但是使用者配置檔案中提供了URL,則該URL成為該方法中的url實參。如果兩者都沒有,則會從獨立的配置檔案中擷取該URL值。

一個transport URL的例子:

transport URL可為string類型或是一個TransportURL 對象

2.class oslo_messaging.Transport(driver)

messaging transport類。該類為操作底層messaging transport driver的句柄(和driver的關系就像 檔案句柄和檔案的關系,使用檔案句柄來操作檔案,而将檔案的具體實作封裝起來)。該類隻有一個conf成員變量,該變量類型為cfg.ConfigOpts,用來構造具體的transport對象。

3.class oslo_messaging.TransportURL(conf, transport=None, virtual_host=None, hosts=None, aliases=None, query=None)

一個經過解析的transport URL,具體形式如下:

transport://user:[email protected]:port[,userN:[email protected]:portN]/virtual_host?query


Parameters:
conf (oslo.config.cfg.ConfigOpts) – a ConfigOpts instance
transport (str) – a transport name for example ‘rabbit’
virtual_host (str) – a virtual host path for example ‘/’
hosts (list) – a list of TransportHost objects
aliases (dict) – DEPRECATED: a map of transport alias to transport name
query (dict) – a dictionary of URL query parameters
           

TransportURL的類方法parse(conf, url=None, aliases=None)可以解析url然後傳回一個TranportURL對象。

Parameters: 
conf (oslo.config.cfg.ConfigOpts) – a ConfigOpts instance
url (str) – The URL to parse
aliases (dict) – A map of transport alias to transport name
Returns:    
A TransportURL
           

假如URL為:

解析步驟如下:

  1. 首先,使用‘,’分割字元串(為了支援多主機)
  2. 所有的主機都應該指定username/password。如果沒有指定,則相應的值會被忽略
如:user:pass@host1:port1,host2:port2
解析成如下樣子:
[
  {"username": "user", "password": "pass", "host": "host1:port1"},
  {"host": "host2:port2"}
]
           

4.class oslo_messaging.TransportHost(hostname=None, port=None, username=None, password=None)

代表解析過的transport URL中的主機元素

5.oslo_messaging.set_transport_defaults(control_exchange)

Parameters: control_exchange (str) – the default exchange under which topics are scoped
           

設定預設的messaging transport配置

二、Executors

Executors控制Server在收到message後如何排程處理,這種排程可以是同步的也可以是異步的。

一個同步的executors會在server線程中處理message,這意味着sever一次隻能處理一個message。在目前消息處理完後,才會接着處理其他的收到的message。例如,對于一個RPCServer,一次隻能激活一次調用。同步的executors保證了消息的處理順序和它們的到達順序相同。

異步的executor會并發的處理收到的message,server不會被目前正在處理的message阻塞,依然能夠響應收到的消息。但是各個消息的處理順序沒有保證。該executor可配置一次可以處理的消息的最大個數。

下面主要說幾種可用的executor:

1.blocking

executor使用caller同步執行調用。該executor為調用者提供了一個接口(方法的執行會在調用者所在的線程中)

2.eventlet

使用綠色線程池(協程)來異步執行調用。

關于綠色線程池:見https://docs.python.org/dev/library/concurrent.futures.html and http://eventlet.net/doc/modules/greenpool.html

3.threading

使用普通的線程池來異步執行調用。

https://docs.python.org/dev/library/concurrent.futures.html

三、Target

指定消息的目的地。Target封裝了一個消息目的地的相關資訊,或是對于server而言,描述了該server所監聽的消息類型。

1.class oslo_messaging.Target(exchange=None, topic=None, namespace=None, version=None, server=None, fanout=None, legacy_namespaces=None)

Parameters:

exchange (str) – A scope for topics. Leave unspecified to default to the control_exchange configuration option.

topic (str) – A name which identifies the set of interfaces exposed by a server. Multiple servers may listen on a topic and messages will be dispatched to one of the servers selected in a best-effort round-robin fashion (unless fanout is True).

namespace (str) – Identifies a particular RPC interface (i.e. set of methods) exposed by a server. The default interface has no namespace identifier and is referred to as the null namespace.

version (str) – RPC interfaces have a major.minor version number associated with them. A minor number increment indicates a backwards compatible change and an incompatible change is indicated by a major number bump. Servers may implement multiple major versions and clients may require indicate that their message requires a particular minimum minor version.

server (str) – RPC Clients can request that a message be directed to a specific server, rather than just one of a pool of servers listening on the topic.

fanout (bool) – Clients may request that a copy of the message be delivered to all servers listening on a topic by setting fanout to True, rather than just one of them.

legacy_namespaces (list of strings) – A server always accepts messages specified via the ‘namespace’ parameter, and may also accept messages defined via this parameter. This option should be used to switch namespaces safely during rolling upgrades.

2.Target Versions

Target 版本号使用Major.Minor的形式。對于特定版本号為X.Y的消息,能出來還消息的Server的版本号為A.B,則A==X,并且B>Y.

四、RPC Server

RPC伺服器暴露出一些endpoint,每個endpoint包涵一系列的可被遠端用戶端通過transport調用的方法。

建立RPC伺服器,你需要提供transport,target和endpoints清單。其中transport可通過get_transport方法擷取,如下:

transport = messaging.get_transport(conf)
根據使用者messaging配置,裝載合适額transport driver.
           

target用來表述RPC 伺服器監聽的topic,server名稱和server監聽的exchange。

可能有多個server同時監聽相同的topic(和exchange)。參見RPC Client,了解RPC 請求在多個server中的配置設定。

每個endpoint對象有target屬性,該target可能有namespace和version域,預設情況下,namespace為null,version=1.0。接收到的方法調用将會配置設定給第一個比對到的endpoint。

在RPC方法調用時,第一個參數總是請求的上下文,由RPC用戶端提供。其他的參數是被調用方法的參數,由用戶端提供。Endpoint方法可能有傳回值,該種情況下,RPC伺服器通過transport傳回傳回值給該RPC用戶端。

executor控制RPC server接受到message後的處理排程方式。預設情況下,使用最簡單的executor,即blocking exector.

Note: If the “eventlet” executor is used, the threading and time library need to be monkeypatched.(不知道怎麼翻譯)

RPC回複操作是best-effor方式,即盡力傳遞。當reply被messaging transport接收到的時候,server就認為reply已經成功發送出去了。Server不保證RPC用戶端一定會處理reply.如果reply 發送失敗,則将此記錄在日志中,server繼續處理接受到的消息(即,server不解決發送失敗問題,出現問題,管理者檢視日志,解決問題)

RPC調用傳入的方法參數和方法傳回的傳回值都是python primitive類型。但是,message中的資料編碼方式可能不是primitvie形式,例如,message中的payload可能是一個ASCII編碼的String的JSON資料,而本來應該是一個字典類型。serializer對象用來将message轉化成primitive類型,或是将primitive類型轉換成标準的message payload形式。

RPC 伺服器主要方法:start,stop,wait方法,用法啟動、停止服務,或是在server停止後,等待所有進行中的請求完成。

1.一個簡單的PRC Server的例子

from oslo_config import cfg
import oslo_messaging
import time

class ServerControlEndpoint(object):

    target = oslo_messaging.Target(namespace='control',
                                   version='2.0')

    def __init__(self, server):
        self.server = server

    def stop(self, ctx):
        if self.server:
            self.server.stop()

class TestEndpoint(object):

    def test(self, ctx, arg):
        return arg

transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
    ServerControlEndpoint(None),
    TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
                                       executor='blocking')
try:
    server.start()
    while True:
        time.sleep()
except KeyboardInterrupt:
    print("Stopping server")

server.stop()
server.wait()
           

方法說明:

oslo_messaging.get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None, access_policy=None)
Construct an RPC server.

Parameters: 
transport (Transport) – the messaging transport
target (Target) – the exchange, topic and server to listen on
endpoints (list) – a list of endpoint objects
executor (str) – name of message executor - available values are ‘eventlet’, ‘blocking’ and ‘threading’
serializer (Serializer) – an optional entity serializer
access_policy (RPCAccessPolicyBase) – an optional access policy. Defaults to LegacyRPCAccessPolicy
           

2.關于access_policy(RPCAccessPolicyBase)

RPCAccessPolicyBase access_policy:決定endpont的那個方法會通過RPC調用。有如下幾種:

(1)class oslo_messaging.LegacyRPCAccessPolicy : 允許RPC調用endpoint的所有的可調用的方法,包括priviate方法(使用__字首的方法)

(2)class oslo_messaging.DefaultRPCAccessPolicy:預設的access policy,可防止調用private方法。

(3)class oslo_messaging.ExplicitRPCAccessPolicy:Policy which requires decorated endpoint methods to allow dispatch(不知道什麼意思)

3.RPCDispatcher

class oslo_messaging.RPCDispatcher(endpoints, serializer, access_policy=None) :

了解RPC消息的message dispatcher。RPCDispatcher 檢視message的namespace,version , method value,并在server的endpoints清單中找到和該message比對的endpoint。

4. MessageHandlingServer

class oslo_messaging.MessageHandlingServer(transport, dispatcher, executor=’blocking’) : 處理message的伺服器。

方法:

reset():Reset service.

start(*args, **kwargs):該方法調用後,server開始poll,從transport中接收message,然後轉發給dispatcher.該message處理過程一直進行,直到stop方法被調用。executor決定server的IO處理政策。可能會是用一個新程序、新協程來做poll操作,或是直接簡單的在一個循環中注冊一個回調。同樣,executor也決定配置設定message的方式,是在一個新線程中dispatch或是..... 

stop(*args, **kwargs):當調用stop之後,新的message不會被處理。但是,server可能還在處理一些之前沒有處理完的message,并且底層driver資源也還一直沒有釋放。

wait(*args, **kwargs):在stop調用之後,可能還有message正在被處理,使用wait方法來阻塞目前程序,直到所有的message都處理完成。之後,底層的driver資源會釋放。
           

5.oslo_messaging.expected_exceptions(*exceptions)

用來修飾endpoint的方法,用來說明該方法可能抛出某種異常,而該異常屬于正常情況。

Decorator for RPC endpoint methods that raise expected exceptions.

Decorator for RPC endpoint methods that raise expected exceptions.

Marking an endpoint method with this decorator allows the declaration of expected exceptions that the RPC server should not consider fatal, and not log as if they were generated in a real error scenario.

Note that this will cause listed exceptions to be wrapped in an ExpectedException, which is used internally by the RPC sever. The RPC client will see the original exception type.
           

6.oslo_messaging.expose(func)

用來修飾endpont的方法,表明RPC用戶端能夠調用該方法。如果dispatcher的access_policy設定為 ExplicitRPCAccessPolicy,則endpoint的方法必須顯示的聲明可被通路,例如:

# foo() cannot be invoked by an RPC client
def foo(self):
    pass

# bar() can be invoked by an RPC client
@rpc.expose
def bar(self):
    pass
           

7. exception oslo_messaging.ExpectedException

封裝了RPC endpoint抛出的expected exception。

五、RPC Client

1.class oslo_messaging.RPCClient

oslo_messaging.RPCClient(transport, target, timeout=None, version_cap=None, serializer=None, retry=None)

該類用來調用遠端RPC伺服器上的方法。該類用來發送方法調用請求,并接收方法的傳回值。

RPC client支援兩種模式:RPC call (有傳回值)和RPC cast(無傳回值)

RPC client使用target來聲明RPC請求如何傳輸到RPC server.如果設定了target的topic(或是exchange),則任何監聽了該topic(或是exchange的)的server都可提供服務。如果多個server同時監聽了該topic(或是exchange),則根據best-effort round-robin算法選擇一個server來提供服務。同時,client可以設定target的server屬性,指定特定的server來提供服務。當在cast模式時,RPC請求會被廣播到所有監聽給topic或是exchange的server(當target的fanout屬性值為true時).

在client構造時,可設定預設的target。當在某些方法調用時,可能需要修改某些屬性,可使用prepare方法來修改target的屬性。

在方法調用時,需要傳入的參數包括:請求上下文字典,方法名字,參數字典。

2.一個簡單的使用client調用RPC的例子:

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target)
client.call(ctxt, 'test', arg=arg)
           

一般情況下,會将RPC client功能的代碼包裝起來會更好。如下面的例子:

class TestClient(object):

    def __init__(self, transport):
        target = messaging.Target(topic='test', version='2.0')
        self._client = messaging.RPCClient(transport, target)

    def test(self, ctxt, arg):
        return self._client.call(ctxt, 'test', arg=arg)
           

使用prepare 修改target的例子:

def test(self, ctxt, arg):
    cctxt = self._client.prepare(version='2.5')
    return cctxt.call(ctxt, 'test', arg=arg)
           

3.與server的連接配接說明

如果與messaging 服務的連接配接失效,則client會阻塞(知道連接配接建立)。如果與messaging 服務之間的連接配接無法建立,則client會重試。預設情況下,client會不停的去嘗試建立連接配接。可以通過設定retry參數來控制該過程。

client = messaging.RPCClient(transport, target, retry=None)
client.call(ctxt, 'sync')
try:
    client.prepare(retry=).cast(ctxt, 'ping')
except messaging.MessageDeliveryFailure:
    LOG.error("Failed to send ping message")
           

4.方法說明

(1)call(ctxt, method, **kwargs) : 調用method,并等待傳回值。

5.exception oslo_messaging.RemoteError(exc_type=None, value=None, traceback=None)

表明遠端endpoint抛出異常,包含:string(表示原始異常類型),原始異常值,traceback資訊。

Parameters: 
ctxt (dict) – a request context dict
method (str) – the method name
kwargs (dict) – a dict of method arguments
Raises: 
MessagingTimeout, RemoteError, MessageDeliveryFailure
           

(2)can_send_version(version=):

(3)cast(ctxt, method, **kwargs):

Parameters: 
ctxt (dict) – a request context dict
method (str) – the method name
kwargs (dict) – a dict of method arguments
Raises: 
MessageDeliveryFailure if the messaging transport fails to accept the request.
           

(4)prepare(exchange=, topic=, namespace=, version=, server=, fanout=, timeout=, version_cap=, retry=):

Parameters: 
exchange (str) – see Target.exchange
topic (str) – see Target.topic
namespace (str) – see Target.namespace
version (str) – requirement the server must support, see Target.version
server (str) – send to a specific server, see Target.server
fanout (bool) – send to all servers on topic, see Target.fanout
timeout (int or float) – an optional default timeout (in seconds) for call()s
version_cap (str) – raise a RPCVersionCapError version exceeds this cap
retry (int) – an optional connection retries configuration: None or - means to retry forever.  means no retry is attempted. N means attempt at most N retries.
           

六、Notifier

1.class oslo_messaging.Notifier(transport, publisher_id=None, driver=None, serializer=None, retry=None, topics=None)

通過messaging transport或是其他方式,發送notification messages.

Notification消息格式:

{'message_id': six.text_type(uuid.uuid4()),
 'publisher_id': 'compute.host1',
 'timestamp': timeutils.utcnow(),
 'priority': 'WARN',
 'event_type': 'compute.create_instance',
 'payload': {'instance_id': , ... }}
           

Notifier對象可通過如下方式擷取:

notifier = messaging.Notifier(get_notification_transport(CONF),
‘compute’)
           

notification通過drivers(具體根據driver config選項确定,主題根據topics config 選項确定)。

另外,Nofifier對象還可以根據特定的driver、topic建立,方式如下所示:

transport = notifier.get_notification_transport(CONF)
notifier = notifier.Notifier(transport,
                             'compute.host',
                             driver='messaging',
                             topics=['notifications'])
           

一般來說,Nofifier對象執行個體化花銷很大(主要開銷在裝載driver上),是以,為了減少開銷,可以通過如下方式來重用:

notifier = notifier.prepare(publisher_id='compute')
notifier.info(ctxt, event_type, payload)
           

2.主要方法說明

(1)audit(ctxt, event_type, payload):發送audit notification。

Parameters: 
ctxt (dict) – a request context dict
event_type (str) – describes the event, for example ‘compute.create_instance’
payload (dict) – the notification payload
Raises: 
MessageDeliveryFailure
           

(2)critical(ctxt, event_type, payload):同audit,隻是level 不同。

(3)debug(ctxt, event_type, payload):同audit,隻是level 不同。

(4)error(ctxt, event_type, payload):同audit,隻是level 不同。

(5)info(ctxt, event_type, payload):同audit,隻是level 不同。

(6)is_enabled():檢查notifier是否可以将notification發送出去。如果notifier的dirver設定成 only to noop,則傳回false。其他情況都傳回true.

(7)prepare(publisher_id=, retry=):傳回特定的Notifer執行個體。傳回一個新的publisher_id為參數指定的Notifier執行個體。該方法允許發送多個publisher_id的notification,而無notification driver加載開銷。

(8)sample(ctxt, event_type, payload):同audit,隻是level 不同。

(9)warn(ctxt, event_type, payload):同audit,隻是level 不同。

(10)warning(ctxt, event_type, payload):同audit,隻是level 不同。

3.LoggingNotificationHandler

class oslo_messaging.LoggingNotificationHandler(url, publisher_id=None, driver=None, topic=None, serializer=None)

當應用使用logging 子產品記錄日志時,實際上它是通過發送一條notification來完成的,該notification的level和日志記錄的level是一樣的。

4.可用的Notifier Drivers

(1)log:通過python logging機制釋出notification

(2)messaging:使用1.0 message格式發送notification。使用配置的messaging tranport來發送notification。(這是當consumer不支援2.0 message格式時,才使用的driver)

(3)messagingV2:使用2.0 message格式發送notification。

(4)Noop:

(5)routing:

(6)test:将notification發送到記憶體中,供測試使用。

七、Notification Driver

notification driver通過messaging将notifications發送給notification listener. driver會阻塞notifier線程,知道notification已經傳送到messaging transport。notification driver不保證notification一定會被notification listener處理。

notification messages發送時最多發送一次,保證notification不會重複。

當給driver發送notification時,如果還沒有和messaging service建立有效的連接配接,則該發送過程會阻塞,知道連接配接建立(和RPC client 發送message一樣)。如果連接配接過程建立失敗,則driver會重試。預設情況下,driver會一直嘗試,知道建立連接配接。可以設定retry産生來改變該行為。

1.MessagingDriver

class oslo_messaging.notify.messaging.MessagingDriver(conf, topics, transport, version=1.0)

使用1.0 message格式發送notification(見Notification一節)

2.MessagingV2Driver

class oslo_messaging.notify.messaging.MessagingV2Driver(conf, **kwargs)

使用2.0 message格式發送notification(見Notification一節)

3.Driver

class oslo_messaging.notify.notifier.Driver(conf, topics, transport)

Notification driver基類。

主要方法:

(1)notify(ctxt, msg, priority, retry):

Parameters: 
ctxt – current request context
msg (str) – message to be sent
priority (str) – priority of the message
retry (int) – connection retries configuration (used by the messaging driver): None or - means to retry forever.  means no retry is attempted. N means attempt at most N retries.
           

八、Notification Listener

notification listener 用來處理通過messaging driver方式notification。notification listener通過target 訂閱topic(和exchange,可選).notification 消息通過notifier client 發送,最後由notification listener接收。如果多個listeners 訂閱了相同的target,則隻有一個listener會收到該notification。多個listeners的選擇通過 best-effort round-robin選擇。

notification的傳送模式可通過指定pool的方式來改變。對于訂閱了相同的target(或是exchange)的listener的集合,具有同一個pool屬性的 listener可成為一個子集。每個子集會收到一個notification的拷貝,該拷貝會被該子集中的某個listener處理。是以,總體來說,會發送出去多個notification的拷貝。例如,一個發送到預設的listener中(這些listener沒有pool name),然後,每個子集都會收到一個。

需要主要的是,不是每種transport driver都實作了listener pool機制。如果driver沒有實作pool name,則在調用 get_notification_listener()方法時,會抛出NotImplementedError 異常。

notification listener可以暴露出多個endpoints,每個endpoint包含多個方法。每個方法的名字和notification優先級對應。當收到notification時,會将notification交給對應的具有相同優先級的方法。例如,info notification會配置設定到info()方法。

notification endpoint可定義NotificationFilter,不符合NotificationFilter規則的notification可忽略。

endpoint方法的參數包括:用戶端請求的上下文、publisher_id、event_type、payload和中繼資料。中繼資料是一個map,包涵了message_id和time stamp。

endpoint方法可顯示的傳回一個oslo_messaging.NotificationResult.HANDLED對象,用來聲明一條消息或是oslo_messaging.NotificationResult.REQUEUE 将重新入隊了。需要注意的是并不是所有的transport driver都支援requeueing.為了能夠使用該特性,應用應該使用斷言測試該特性是否可用(在調用get_notification_listener()方法時,傳入參數allow_requeue=True)。如果不支援requeueing,則會抛出NotImplementedError 異常。

每個notification listener都和一個executor綁定,來控制收到的notification如何配置設定。預設情況下,使用的是blocking executor(具體特性參加executor一節)

notification listener的方法start,stop,wait方法和RPC Server的方法作用大緻相同。

1.notification listener建立

建立notification listener執行個體時,需要指定transport,target清單,endpoint清單。

(1)transport的擷取:

(2)一個簡單的notification listener例子如下:

from oslo_config import cfg
import oslo_messaging


class NotificationEndpoint(object):
    filter_rule = oslo_messaging.NotificationFilter(
        publisher_id='^compute.*')

    def warn(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)


class ErrorEndpoint(object):
    filter_rule = oslo_messaging.NotificationFilter(
        event_type='^instance\..*\.start$',
        context={'ctxt_key': 'regexp'})

    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)

transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [
    oslo_messaging.Target(topic='notifications'),
    oslo_messaging.Target(topic='notifications_bis')
]
endpoints = [
    NotificationEndpoint(),
    ErrorEndpoint(),
]
pool = "listener-workers"
server = oslo_messaging.get_notification_listener(transport, targets,
                                                  endpoints, pool=pool)
server.start()
server.wait()
           

2.相關方法

(1)oslo_messaging.get_notification_listener(transport, targets, endpoints, executor=’blocking’, serializer=None, allow_requeue=False, pool=None):構造一個notification listener.

Parameters: 
transport (Transport) – the messaging transport
targets (list of Target) – the exchanges and topics to listen on
endpoints (list) – a list of endpoint objects
executor (str) – name of message executor - available values are ‘eventlet’, ‘blocking’ and ‘threading’
serializer (Serializer) – an optional entity serializer
allow_requeue (bool) – whether NotificationResult.REQUEUE support is needed
pool (str) – the pool name
Raises: 
NotImplementedError
           

(2) oslo_messaging.get_batch_notification_listener(transport, targets, endpoints, executor=’blocking’, serializer=None, allow_requeue=False, pool=None, batch_size=None, batch_timeout=None):構造一個batch notification listener.

Parameters: 
transport (Transport) – the messaging transport
targets (list of Target) – the exchanges and topics to listen on
endpoints (list) – a list of endpoint objects
executor (str) – name of message executor - available values are ‘eventlet’, ‘blocking’ and ‘threading’
serializer (Serializer) – an optional entity serializer
allow_requeue (bool) – whether NotificationResult.REQUEUE support is needed
pool (str) – the pool name
batch_size (int) – number of messages to wait before calling endpoints callacks
batch_timeout (int) – number of seconds to wait before calling endpoints callacks
Raises: 
NotImplementedError
           

九、Serializer

在python 對象和message(notification)資料做序列化或是反序列化的基類。

1.class oslo_messaging.Serializer

主要方法:

(1)deserialize_context(ctxt) :對字典變成 request contenxt.

(2)deserialize_entity(ctxt, entity) :對entity做反序列化,其中ctxt是已經deserialize過的,entity是要處理的。

(3)serialize_context(ctxt) :将Request context變成字典類型

(4)serialize_entity(ctxt, entity) :對entity做序列化,其中ctxt是已經deserialize過的,entity是要處理的。

(5)

2.class oslo_messaging.NoOpSerializer

空類,什麼都不做。

十、Exceptions

  1. oslo_messaging.ClientSendError(target, ex) :當發送message失敗時抛出。
  2. oslo_messaging.DriverLoadFailure(driver, ex):當transport driver加載失敗時抛出。
  3. oslo_messaging.ExecutorLoadFailure(executor, ex):executor加載失敗時抛出。
  4. oslo_messaging.InvalidTransportURL(url, msg) :tranport URL 無效
  5. oslo_messaging.MessagingException:exception記錄
  6. oslo_messaging.MessagingTimeout
  7. oslo_messaging.MessagingServerError :MessageHandlingServer 異常基類。
  8. oslo_messaging.NoSuchMethod(method):
  9. oslo_messaging.RPCDispatcherError:RPC dispatcher 異常基類。
  10. oslo_messaging.RPCVersionCapError(version, version_cap)
  11. oslo_messaging.ServerListenError(target, ex)
  12. oslo_messaging.UnsupportedVersion(version, method=None)

十一、Configuration Options

預設的參數:

1. rpc_conn_pool_size :integer,default=30

2.

matchmaker_redis相關的參數:

1. host :Type=string,Host to locate redis.

2. port: port number

3. password :Password for Redis server (optional).

oslo_messaging_amqp 相關的參數:

1. container_name: Type=string,Name for the AMQP container. must be globally unique. Defaults to a generated UUID

2. idle_timeout :Type= integer,Timeout for inactive connections (in seconds)

oslo_messaging_kafka 相關的參數:

1. kafka_default_host :Type= string,Default Kafka broker Host

2. kafka_default_port: Type=port number

oslo_messaging_notifications 相關的參數:

1. driver :Type=multi-valued

2. transport_url: Type=string

3. topics : Type= list

4. retry :Type=integer

oslo_messaging_rabbit 相關的參數:

1. amqp_durable_queues

2. amqp_auto_delete

oslo_messaging_zmq 相關的參數:

1. rpc_zmq_bind_address:

2. rpc_zmq_matchmaker:

參數相關的API:

1. oslo_messaging.opts.list_opts() :Return a list of oslo.config options available in the library.

十二、Testing Configurations

1.class oslo_messaging.conffixture.ConfFixture(conf)

oslo.messaging注冊了一系列的配置選擇。使用者應用使用API來查詢、修改具體的參數配置。

用法示例:

self.messaging_conf = self.useFixture(messaging.ConfFixture(cfg.CONF))
self.messaging_conf.transport_driver = 'fake'
           

Parameters: conf (oslo.config.cfg.ConfigOpts) – a ConfigOpts instance

十三、Available Drivers

transport的driver有如下幾種:

  1. amqp :https://docs.openstack.org/developer/oslo.messaging/AMQP1.0.html
  2. fake :測試使用,該driver将消息發送到記憶體中。
  3. kafka :experimental
  4. kombu :RabbitMQ Driver。openstack預設的。
  5. pika :successor to the existing rabbit/kombu driver。 https://docs.openstack.org/developer/oslo.messaging/pika_driver.html
  6. rabbit :RabbitMQ Driver
  7. zmq :通過ZeroMQ 實作了RPC和Notifer API.詳見:https://docs.openstack.org/developer/oslo.messaging/zmq_driver.html

十四、Available message driver

講述driver的要求,不符合的就從olso.message庫中移除。

十五、AMQP 1.0 Protocol Driver Deployment Guide

原文位址 :https://docs.openstack.org/developer/oslo.messaging/AMQP1.0.html

翻譯位址:

十六、Pika Driver Deployment Guide

原文位址 :https://docs.openstack.org/developer/oslo.messaging/pika_driver.html

翻譯位址:

十七、ZeroMQ Driver Deployment Guide

原文位址 :https://docs.openstack.org/developer/oslo.messaging/zmq_driver.html

翻譯位址:

十八、Guide for Transport Driver Implementors

原文位址 :https://docs.openstack.org/developer/oslo.messaging/driver-dev-guide.html

翻譯位址: http://blog.csdn.net/youyou1543724847/article/details/71173439

十九、Frequently Asked Questions

原文位址 :https://docs.openstack.org/developer/oslo.messaging/FAQ.html

翻譯位址:

二十、Contributing

省略

繼續閱讀