
Message queues: oslo_messaging

In the previous post we covered RabbitMQ itself.

Today let's look at how RabbitMQ and the oslo_messaging library relate to each other in OpenStack.

One question that has to be understood first: what is the difference between using RabbitMQ directly and using it indirectly through the oslo_messaging library?

oslo_messaging is built on top of RabbitMQ; through a series of internal calls it ultimately exposes a simple interface to the user. You don't have to care about the implementation details: just fill in the configuration file and make a few simple function calls.

And because this library is an OpenStack standard library, some of its function names and default parameters are phrased in terms of OpenStack concepts.

The basic stack is:

openstack -> oslo_messaging -> kombu -> AMQP -> socket

Let's first look at how oslo_messaging wraps things into objects:

Concepts

Transport

Transport (the transport layer) implements the low-level communication for RPC (sockets, for example) as well as the event loop, threading and other plumbing. A handle to a particular transport is obtained through a URL of the form:

transport://user:pass@host1:port[,hostN:portN]/virtual_host

The transports currently supported are rabbit, qpid and zmq, each corresponding to a different backend message bus. A handle to a transport object instance is obtained with the oslo.messaging.get_transport function:

import oslo_messaging
transport = oslo_messaging.get_transport(cfg, url=None, **kwargs)
           

Target

Target encapsulates all the information needed to specify the final destination of a message. Its attributes are listed below:

Parameter = default: Description
exchange = None: (string) the scope the topic belongs to; if not set, the control_exchange option from the configuration file is used
topic = None: (string) a topic identifies a group of interfaces exposed by a server (an interface being a set of remotely callable methods); several servers may expose the same topic, in which case messages are delivered to one of them in round-robin fashion
namespace = None: (string) identifies one particular interface (a set of remotely callable methods) exposed by the server
version = None: (string) the interfaces exposed by a server carry an M.N version number; bumping the minor version (N) means the new interface remains compatible with the old one, bumping the major version (M) means the new and old interfaces are incompatible; an RPC server may implement several different major versions of an interface
server = None: (string) a client can set this to require that the message go to one specific server, rather than to any of the servers sharing the topic
fanout = None: (boolean) when true, the message is sent to every server sharing the topic, rather than to just one of them

In different scenarios the Target is constructed with different arguments: when creating an RPC server, topic and server are required and exchange is optional; when specifying an endpoint, namespace and version are optional; when a client sends a message, topic is required and everything else is optional.
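As a quick illustration of those three cases, here is a minimal sketch (the topic/server/namespace values are arbitrary):

import oslo_messaging

# RPC server side: topic and server are required, exchange is optional.
server_target = oslo_messaging.Target(topic='test', server='server1')

# Endpoint side: namespace and version are optional refinements.
endpoint_target = oslo_messaging.Target(namespace='control', version='2.0')

# Client side: only topic is required; everything else can be overridden
# later through client.prepare().
client_target = oslo_messaging.Target(topic='test')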

Server

An RPC server can expose several endpoints, each containing a group of methods that a client can call remotely through some Transport object. Creating a Server object requires a Transport, a Target and a list of endpoints.

RPC Client

Through an RPC Client you can invoke methods on an RPC Server remotely. A remote call requires a dict supplying the calling context, the name of the method to call, and the arguments passed to that method (also expressed as a dict).

There are two ways to call remotely: cast and call. A cast returns as soon as the request has been sent; a call waits for the response to come back from the server.

Notifier

A Notifier sends notification messages over some transport. Notification messages follow this format:

import six
import uuid
from oslo_utils import timeutils

{'message_id': six.text_type(uuid.uuid4()),  # message id
 'publisher_id': 'compute.host1',            # sender id
 'timestamp': timeutils.utcnow(),            # timestamp
 'priority': 'WARN',                         # notification priority
 'event_type': 'compute.create_instance',    # notification type
 'payload': {'instance_id': 12, ...}}        # notification payload
           

Notifications can be sent at different priority levels, including sample, critical, error, warn, info, debug, audit and so on.

Notification Listener

A Notification Listener is similar to a Server: a Notification Listener object can expose several endpoints, each containing a group of methods. Unlike a Server's endpoints, however, the methods here correspond to the different notification priorities. For example:

import oslo_messaging
 
class ErrorEndpoint:
    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)
        return oslo_messaging.NotificationResult.HANDLED
           

If an endpoint method returns messaging.NotificationResult.HANDLED or None, the notification is considered acknowledged and handled; if it returns messaging.NotificationResult.REQUEUE, the notification is put back on the message queue.
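For instance, a sketch of an endpoint that requeues a notification when processing fails (handle() is a made-up application handler, and requeueing only takes effect if the listener was created with allow_requeue=True):

import oslo_messaging

def handle(payload):
    """Hypothetical application-level processing; may raise on failure."""
    pass

class RetryingErrorEndpoint(object):
    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        try:
            handle(payload)
        except Exception:
            # Put the notification back on the queue for another attempt.
            return oslo_messaging.NotificationResult.REQUEUE
        # Returning HANDLED (or None) acknowledges the message.
        return oslo_messaging.NotificationResult.HANDLED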

Below is an example that uses oslo_messaging to implement a remote procedure call.

from oslo_config import cfg
import oslo_messaging as messaging
 
class ServerControlEndpoint(object):
    target = messaging.Target(namespace='control',
                              version='2.0')
 
    def __init__(self, server):
        self.server = server
 
    def stop(self, ctx):
        if self.server:
            self.server.stop()
 
class TestEndpoint(object):
    def test(self, ctx, arg):
        return arg
 
 
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test',
                          server='server1')
endpoints = [
    ServerControlEndpoint(None),
    TestEndpoint(),
]
 
server = messaging.get_rpc_server(transport, target, endpoints,
                                  executor='blocking')
server.start()
server.wait()
           

This example defines two different endpoints, ServerControlEndpoint and TestEndpoint. Their methods stop and test can both be invoked remotely by a client.

Before creating the rpc server object, the transport and target objects have to be created. The get_transport() function is used here to obtain a handle to the transport object; its parameters are listed below:

Parameter = default: Description
conf: (oslo.config.cfg.ConfigOpts) the oslo.config configuration object
url = None: (string or oslo.messaging.TransportURL) the transport URL; if empty, the value of the transport_url option in conf is used
allowed_remote_exmods = None: (list) a list of Python modules the client may use to deserialize exceptions
aliases = None: (dict) a mapping between transport aliases and transport names

Besides the transport_url option, conf may also contain a control_exchange option, which names the default scope a topic belongs to and defaults to "openstack". The default can be changed with the oslo.messaging.set_transport_defaults() function.
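For example, a minimal sketch of changing that default (the exchange name here is made up):

import oslo_messaging

# Every Target that does not set exchange explicitly will now use
# "my_service" instead of the built-in default "openstack".
oslo_messaging.set_transport_defaults(control_exchange='my_service')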

The Target object built here is used to create the RPC server, so topic and server have to be specified. A user-defined endpoint object may also carry its own target attribute, declaring the particular namespace and version that endpoint supports.

The server object is created with get_rpc_server(), and the server's start method is then called to begin accepting remote calls. The parameters of get_rpc_server() are listed below:

Parameter = default: Description
transport: (Transport) the transport object
target: (Target) the target object, naming the exchange, topic and server to listen on
endpoints: (list) the list of endpoint object instances
executor = 'blocking': (string) how messages are received and dispatched. Two modes are currently supported. blocking: after the user calls start(), the request-processing loop runs inside start(); the user thread blocks, handling one request after the next, and the loop only exits once stop() is called; receiving and dispatching both happen in the thread that called start(). eventlet: a GreenThread coroutine receives the messages and other GreenThreads dispatch them; the user thread that called start() is not blocked.
serializer = None: (Serializer) used to serialize/deserialize messages
#client.py  (client side)
 
from oslo_config import cfg
import oslo_messaging as messaging
 
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test')
client = messaging.RPCClient(transport, target)
ret = client.call(ctxt={},
                  method='test',
                  arg='myarg')
 
cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')
           

When the target object is constructed here, the only required argument is topic. The parameters accepted when creating an RPCClient object are listed below:

Parameter = default: Description
transport: (Transport) the transport object
target: (Target) the client's default target object
timeout = None: (int or float) timeout in seconds when the client invokes call()
version_cap = None: (string) the highest supported version; if it is exceeded, an RPCVersionCapError is raised
serializer = None: (Serializer) used to serialize/deserialize messages
retry = None: (int) connection retry count; None or -1 means retry forever, 0 means no retries, > 0 means that many retries

A remote call needs the calling context, the name of the method to call and the arguments to pass to that method.

After the RPCClient object has been constructed, the Target's attributes can still be changed through the prepare() method. The attributes that can be changed are exchange, topic, namespace, version, server, fanout, timeout, version_cap and retry.

The modified target attributes are only effective in the object returned by prepare().
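A small sketch of that scoping behaviour (the 'ping' method name is just an assumption):

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
client = messaging.RPCClient(transport, messaging.Target(topic='test'))

# prepare() returns a new call context; the original client is untouched.
cctx = client.prepare(version='2.1', timeout=10)
cctx.call({}, 'ping')      # uses version 2.1 and a 10 second timeout
client.call({}, 'ping')    # still uses the client's original target and timeout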

Next, let's look at an example of handling notification messages with oslo_messaging:

#notification_listener.py  (notification handling)
 
from oslo_config import cfg
import oslo_messaging as messaging
 
class NotificationEndPoint(object):
    def warn(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)
 
class ErrorEndpoint(object):
    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)
 
 
transport = messaging.get_transport(cfg.CONF)
targets = [
    messaging.Target(topic='notifications'),
    messaging.Target(topic='notifications_bis')
]
endpoints = [
    NotificationEndPoint(),
    ErrorEndpoint(),
]
listener = messaging.get_notification_listener(transport,
                                               targets,
                                               endpoints)
 
listener.start()
listener.wait()
           

Endpoint objects for notification handling differ from the endpoint objects used for remote procedure calls: the methods they define must correspond one-to-one to the notification priorities. We can also give each endpoint its own target object.

Finally, get_notification_listener() is called to construct the notification listener object. Its parameters are listed below:

Parameter = default: Description
transport: (Transport) the transport object
targets: (list) a list of target objects, naming the exchange and topic that each endpoint in the endpoints list listens on
endpoints: (list) the list of endpoint object instances
executor = 'blocking': (string) how messages are received and dispatched; the same two modes (blocking and eventlet) described for get_rpc_server() above
serializer = None: (Serializer) used to serialize/deserialize messages
allow_requeue = False: (boolean) if true, NotificationResult.REQUEUE is supported

The corresponding code for sending the notification looks like this:

#notifier_send.py
 
from oslo_config import cfg
import oslo_messaging as messaging
 
transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
                              driver='messaging',
                              topic='notifications')
 
notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
                event_type='my_type',
                payload={'content': 'error occurred'})
           
To send a notification, a Notifier object is constructed first. The parameters that may need to be specified are listed below:
           
Parameter = default: Description
transport: (Transport) the transport object
publisher_id = None: (string) the sender id
driver = None: (string) the backend driver, usually "messaging"; if not given, the notification_driver value from the configuration file is used
topic = None: (string) the topic to send on; if not given, the notification_topics value from the configuration file is used
serializer = None: (Serializer) used to serialize/deserialize messages

Initialising a Notifier object is relatively heavyweight, so prepare() can be used to modify an already created Notifier; prepare() returns a new Notifier instance. Its parameters are listed below:

Parameter = default: Description
publisher_id = None: (string) the sender id
retry = None: (int) connection retry count; None or -1 means retry forever, 0 means no retries, > 0 means that many retries

Finally, the Notifier object's various methods (error, critical, warn, etc.) are called to send notifications at the different priorities.

Source code analysis

From the section above we can see that the library ultimately exposes two concepts to the user: 1. rpc, 2. notification.

Let's now walk through the source code behind these two concepts.

rpc

RPC (remote procedure call) splits into a calling side and a called side.

The calling side is the client: rpc_client.

The called side is the server: rpc_server.

In use, the called side runs server.start and waits; the calling side then issues client.cast or client.call to make a non-blocking or blocking remote call.

So what actually happens when an rpc client performs a remote call?

(The code lives in oslo_messaging/rpc/client.py.)

rpc client

First the rpc client is built by instantiating RPCClient:

self.rpc_client = messaging.get_rpc_client(
            messaging.get_transport(),
            version='1.0'
        )
           
def get_rpc_client(transport, retry=None, **kwargs):
    """Return a configured oslo_messaging RPCClient."""
    target = oslo_messaging.Target(**kwargs)
    serializer = oslo_serializer.RequestContextSerializer(
        oslo_serializer.JsonPayloadSerializer())
    return oslo_messaging.RPCClient(transport, target,
                                    serializer=serializer,
                                    retry=retry)
           
class RPCClient(_BaseCallContext):
    _marker = _BaseCallContext._marker

    def __init__(self, transport, target,
                 timeout=None, version_cap=None, serializer=None, retry=None,
                 call_monitor_timeout=None, transport_options=None):
        if serializer is None:
            serializer = msg_serializer.NoOpSerializer()

        if not isinstance(transport, msg_transport.RPCTransport):
            LOG.warning("Using notification transport for RPC. Please use "
                        "get_rpc_transport to obtain an RPC transport "
                        "instance.")

        super(RPCClient, self).__init__(
            transport, target, serializer, timeout, version_cap, retry,
            call_monitor_timeout, transport_options
        )

        self.conf.register_opts(_client_opts)
           

The invocation then goes through call or cast:

@six.add_metaclass(abc.ABCMeta)
class _BaseCallContext(object):

    _marker = object()

    def __init__(self, transport, target, serializer,
                 timeout=None, version_cap=None, retry=None,
                 call_monitor_timeout=None, transport_options=None):
        self.conf = transport.conf

        self.transport = transport
        self.target = target
        self.serializer = serializer
        self.timeout = timeout
        self.call_monitor_timeout = call_monitor_timeout
        self.retry = retry
        self.version_cap = version_cap
        self.transport_options = transport_options

        super(_BaseCallContext, self).__init__()

    def cast(self, ctxt, method, **kwargs):
        """Invoke a method and return immediately. See RPCClient.cast()."""
        msg = self._make_message(ctxt, method, kwargs)
        msg_ctxt = self.serializer.serialize_context(ctxt)

        self._check_version_cap(msg.get('version'))

        try:
            self.transport._send(self.target, msg_ctxt, msg,
                                 retry=self.retry,
                                 transport_options=self.transport_options)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)

    def call(self, ctxt, method, **kwargs):
        """Invoke a method and wait for a reply. See RPCClient.call()."""
        if self.target.fanout:
            raise exceptions.InvalidTarget('A call cannot be used with fanout',
                                           self.target)

        msg = self._make_message(ctxt, method, kwargs)
        msg_ctxt = self.serializer.serialize_context(ctxt)

        timeout = self.timeout
        if self.timeout is None:
            timeout = self.conf.rpc_response_timeout

        cm_timeout = self.call_monitor_timeout

        self._check_version_cap(msg.get('version'))

        try:
            result = \
                self.transport._send(self.target, msg_ctxt, msg,
                                     wait_for_reply=True, timeout=timeout,
                                     call_monitor_timeout=cm_timeout,
                                     retry=self.retry,
                                     transport_options=self.transport_options)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)

        return self.serializer.deserialize_entity(ctxt, result)
           

As you can see, both methods end up executing transport._send,

only with different arguments. The biggest difference is really the wait_for_reply parameter; as the name suggests, wait or no wait, i.e. the blocking/non-blocking behaviour we mentioned.

For _send, the two most important pieces are the transport itself and the target argument, both of which must be passed when the rpc client is initialised.

The transport argument comes from the _get_transport method (in oslo_messaging/transport.py):

def _get_transport(conf, url=None, allowed_remote_exmods=None,
                   transport_cls=RPCTransport):
    allowed_remote_exmods = allowed_remote_exmods or []
    conf.register_opts(_transport_opts)

    if not isinstance(url, TransportURL):
        url = TransportURL.parse(conf, url)

    kwargs = dict(default_exchange=conf.control_exchange,
                  allowed_remote_exmods=allowed_remote_exmods)

    try:
        mgr = driver.DriverManager('oslo.messaging.drivers',
                                   url.transport.split('+')[0],
                                   invoke_on_load=True,
                                   invoke_args=[conf, url],
                                   invoke_kwds=kwargs)
    except RuntimeError as ex:
        raise DriverLoadFailure(url.transport, ex)

    return transport_cls(mgr.driver)
           

The url here comes from the configuration file; let's take RabbitMQ as the example.

The entry point resolves to oslo_messaging._drivers.impl_rabbit:RabbitDriver, so what we finally get is a RabbitDriver instance.

The target is simply instantiated directly. Note its two parameters exchange and topic: they correspond exactly to RabbitMQ's exchange and routing_key concepts.
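To make that correspondence explicit, a small sketch (the values are illustrative):

import oslo_messaging

# exchange -> the AMQP exchange the rabbit driver publishes to
# topic    -> the AMQP routing key (plus ".<server>" when server is set)
target = oslo_messaging.Target(exchange='openstack',
                               topic='test',
                               server='server1')
# With this target, a non-fanout send publishes to exchange "openstack"
# with routing key "test.server1", as we will see in _send below.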

Now let's follow transport._send. As mentioned above, the transport at this point is the RabbitDriver.

RabbitDriver inherits from AMQPDriverBase, which inherits from BaseDriver.

The _send method lives in AMQPDriverBase:

class AMQPDriverBase(base.BaseDriver):
    missing_destination_retry_timeout = 0

    def __init__(self, conf, url, connection_pool,
                 default_exchange=None, allowed_remote_exmods=None):
        super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
                                             allowed_remote_exmods)

        self._default_exchange = default_exchange

        self._connection_pool = connection_pool

        self._reply_q_lock = threading.Lock()
        self._reply_q = None
        self._reply_q_conn = None
        self._waiter = None    
    def _send(self, target, ctxt, message,
              wait_for_reply=None, timeout=None, call_monitor_timeout=None,
              envelope=True, notify=False, retry=None, transport_options=None):

        msg = message

        if wait_for_reply:
            msg_id = uuid.uuid4().hex
            msg.update({'_msg_id': msg_id})
            msg.update({'_reply_q': self._get_reply_q()})
            msg.update({'_timeout': call_monitor_timeout})

        rpc_amqp._add_unique_id(msg)
        unique_id = msg[rpc_amqp.UNIQUE_ID]

        rpc_amqp.pack_context(msg, ctxt)

        if envelope:
            msg = rpc_common.serialize_msg(msg)

        if wait_for_reply:
            self._waiter.listen(msg_id)
            log_msg = "CALL msg_id: %s " % msg_id
        else:
            log_msg = "CAST unique_id: %s " % unique_id

        try:
            with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
                if notify:
                    exchange = self._get_exchange(target)
                    LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
                              " topic '%(topic)s'", {'exchange': exchange,
                                                     'topic': target.topic})
                    conn.notify_send(exchange, target.topic, msg, retry=retry)
                elif target.fanout:
                    log_msg += "FANOUT topic '%(topic)s'" % {
                        'topic': target.topic}
                    LOG.debug(log_msg)
                    conn.fanout_send(target.topic, msg, retry=retry)
                else:
                    topic = target.topic
                    exchange = self._get_exchange(target)
                    if target.server:
                        topic = '%s.%s' % (target.topic, target.server)
                    LOG.debug(log_msg + "exchange '%(exchange)s'"
                              " topic '%(topic)s'", {'exchange': exchange,
                                                     'topic': topic})
                    conn.topic_send(exchange_name=exchange, topic=topic,
                                    msg=msg, timeout=timeout, retry=retry,
                                    transport_options=transport_options)

            if wait_for_reply:
                result = self._waiter.wait(msg_id, timeout,
                                           call_monitor_timeout)
                if isinstance(result, Exception):
                    raise result
                return result
        finally:
            if wait_for_reply:
                self._waiter.unlisten(msg_id)
           

Look at the try/with block near the end of _send. Recall that when cast and call invoke _send they never pass the notify argument, so the first branch (if notify:) can never be taken here.

That leaves the two remaining branches, elif target.fanout and else (fanout here has the same meaning as RabbitMQ's own fanout). In other words, when we build the target, or in client.prepare(), we can set the fanout parameter to decide which branch is taken. (Note that in the third branch, if target.server is specified, the topic becomes target.topic combined with target.server.) So let's look at conn.fanout_send and conn.topic_send. (conn is the product of __enter__/__exit__/__getattr__ plumbing that this post won't go into; it is enough to know that the calls finally land in the Connection class in oslo_messaging/_drivers/impl_rabbit.py.)

conn.fanout_send

class Connection(object):
    def fanout_send(self, topic, msg, retry=None):
        """Send a 'fanout' message."""
        exchange = kombu.entity.Exchange(name='%s_fanout' % topic,
                                         type='fanout',
                                         durable=False,
                                         auto_delete=True)

        self._ensure_publishing(self._publish, exchange, msg, retry=retry)
           

At this point things are basically clear: fanout_send publishes a fanout-type message to the exchange named target.topic + "_fanout", and every queue bound to that exchange receives the message. If the exchange has never been created, it gets declared inside the publish path.
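Expressed in bare kombu, what fanout_send does is roughly the following sketch (the broker URL and the topic 'test' are placeholders, not values taken from oslo_messaging):

import kombu

with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
    # fanout_send('test', msg) boils down to publishing to the auto-delete
    # fanout exchange "test_fanout"; every bound queue gets a copy.
    exchange = kombu.Exchange('test_fanout', type='fanout',
                              durable=False, auto_delete=True)
    producer = conn.Producer()
    producer.publish({'method': 'stop', 'args': {}},
                     exchange=exchange,
                     declare=[exchange])   # declared on first use if missing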

conn.topic_send

class Connection(object):
    def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
                   transport_options=None):
        """Send a 'topic' message."""
        exchange = kombu.entity.Exchange(
            name=exchange_name,
            type='topic',
            durable=self.amqp_durable_queues,
            auto_delete=self.amqp_auto_delete)

        self._ensure_publishing(self._publish, exchange, msg,
                                routing_key=topic, timeout=timeout,
                                retry=retry,
                                transport_options=transport_options)
           

topic_send publishes a topic-type message to the exchange named by the exchange_name parameter, using topic as the routing_key. The difference to note here is that exchange_name is obtained by the caller through the _get_exchange method:

class AMQPDriverBase(base.BaseDriver):
    def _get_exchange(self, target):
        return target.exchange or self._default_exchange
           

If you look closely, _default_exchange already showed up earlier: it is conf.control_exchange,

which defaults to "openstack" (now you get a hint of why oslo_messaging was made for OpenStack 😄). But the target takes precedence: the configuration value is only used when the target does not specify an exchange.

Likewise, if this exchange has never been created, it is declared inside the publish path.

rpc server

First we need to obtain an rpc server instance:

def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None, access_policy=None):
    dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer,
                                              access_policy)
    return RPCServer(transport, target, dispatcher, executor)
           
class RPCServer(msg_server.MessageHandlingServer):
    def __init__(self, transport, target, dispatcher, executor='blocking'):
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        if not isinstance(transport, msg_transport.RPCTransport):
            LOG.warning("Using notification transport for RPC. Please use "
                        "get_rpc_transport to obtain an RPC transport "
                        "instance.")
        self._target = target
           

RPCServer inherits from MessageHandlingServer, which inherits from ServiceBase and _OrderedTaskRunner.

As before, transport and target are mandatory. Once we have the rpc server instance, calling its start method eventually lands in the base class's start:

@six.add_metaclass(abc.ABCMeta)
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
    @ordered(reset_after='stop')
    def start(self, override_pool_size=None):
        if self._started:
            LOG.warning('The server has already been started. Ignoring '
                        'the redundant call to start().')
            return

        self._started = True

        executor_opts = {}

        if self.executor_type in ("threading", "eventlet"):
            executor_opts["max_workers"] = (
                override_pool_size or self.conf.executor_thread_pool_size
            )
        self._work_executor = self._executor_cls(**executor_opts)

        try:
            self.listener = self._create_listener()
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self.listener.start(self._on_incoming)
           

The key part is the end of start(): self._create_listener() is called, followed by self.listener.start(self._on_incoming). Since what we instantiated is an RPCServer, _create_listener resolves to RPCServer's own method:

class RPCServer(msg_server.MessageHandlingServer):
    def _create_listener(self):
        return self.transport._listen(self._target, 1, None)
           
class Transport(object):
    def _listen(self, target, batch_size, batch_timeout):
        if not (target.topic and target.server):
            raise exceptions.InvalidTarget('A server\'s target must have '
                                           'topic and server names specified',
                                           target)
        return self._driver.listen(target, batch_size,
                                   batch_timeout)
           
class AMQPDriverBase(base.BaseDriver):
    def listen(self, target, batch_size, batch_timeout):
        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

        listener = RpcAMQPListener(self, conn)

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

        return base.PollStyleListenerAdapter(listener, batch_size,
                                             batch_timeout)
           

The listen method essentially boils down to three declares, which we'll refer to as:

declare_topic_consumer(1)

declare_topic_consumer(2)

declare_fanout_consumer

declare_topic_consumer(1)

oslo_messaging/_drivers/impl_rabbit.py

class Connection(object):
    def declare_topic_consumer(self, exchange_name, topic, callback=None,
                               queue_name=None):
        """Create a 'topic' consumer."""
        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name or topic,
                            routing_key=topic,
                            type='topic',
                            durable=self.amqp_durable_queues,
                            exchange_auto_delete=self.amqp_auto_delete,
                            queue_auto_delete=self.amqp_auto_delete,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues)

        self.declare_consumer(consumer)
           
def declare_consumer(self, consumer):
        """Create a Consumer using the class that was passed in and
        add it to our list of consumers
        """

        def _connect_error(exc):
            log_info = {'topic': consumer.routing_key, 'err_str': exc}
            LOG.error("Failed to declare consumer for topic '%(topic)s': "
                      "%(err_str)s", log_info)

        def _declare_consumer():
            consumer.declare(self)
            tag = self._active_tags.get(consumer.queue_name)
            if tag is None:
                tag = next(self._tags)
                self._active_tags[consumer.queue_name] = tag
                self._new_tags.add(tag)

            self._consumers[consumer] = tag
            return consumer

        with self._connection_lock:
            return self.ensure(_declare_consumer,
                               error_callback=_connect_error)
           
def declare(self, conn):
        """Re-declare the queue after a rabbit (re)connect."""

        self.queue = kombu.entity.Queue(
            name=self.queue_name,
            channel=conn.channel,
            exchange=self.exchange,
            durable=self.durable,
            auto_delete=self.queue_auto_delete,
            routing_key=self.routing_key,
            queue_arguments=self.queue_arguments)

        try:
            LOG.debug('[%s] Queue.declare: %s',
                      conn.connection_id, self.queue_name)
            self.queue.declare()
           
@python_2_unicode_compatible
class Queue(MaybeChannelBound):
    def declare(self, nowait=False, channel=None):
        """Declare queue and exchange then binds queue to exchange."""
        if not self.no_declare:
            # - declare main binding.
            self._create_exchange(nowait=nowait, channel=channel)
            self._create_queue(nowait=nowait, channel=channel)
            self._create_bindings(nowait=nowait, channel=channel)
        return self.name
           
    def _create_exchange(self, nowait=False, channel=None):
        if self.exchange:
            self.exchange.declare(nowait=nowait, channel=channel)

    def _create_queue(self, nowait=False, channel=None):
        self.queue_declare(nowait=nowait, passive=False, channel=channel)
        if self.exchange and self.exchange.name:
            self.queue_bind(nowait=nowait, channel=channel)

    def _create_bindings(self, nowait=False, channel=None):
        for B in self.bindings:
            channel = channel or self.channel
            B.declare(channel)
            B.bind(self, nowait=nowait, channel=channel)
           

Pretty self-explanatory. In short: the target's exchange (default "openstack") is used as the exchange_name,

the target's topic is used as the default queue_name, then the exchange and the queue are declared and the two are bound together.
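Expressed directly in kombu, declare_topic_consumer(1) amounts roughly to this sketch (broker URL and names are illustrative: "openstack" is the default exchange, "test" the topic):

import kombu

with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
    exchange = kombu.Exchange('openstack', type='topic')
    queue = kombu.Queue(name='test',           # queue_name defaults to the topic
                        exchange=exchange,
                        routing_key='test',    # bound with the topic as routing key
                        channel=conn.channel())
    queue.declare()   # declares the exchange and the queue, then binds them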

declare_topic_consumer(2)

The only difference from declare_topic_consumer(1) is that target.topic combined with target.server is used as the default queue_name.

declare_fanout_consumer

class Connection(object):
    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer."""

        unique = uuid.uuid4().hex
        exchange_name = '%s_fanout' % topic
        queue_name = '%s_fanout_%s' % (topic, unique)

        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name,
                            routing_key=topic,
                            type='fanout',
                            durable=False,
                            exchange_auto_delete=True,
                            queue_auto_delete=False,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues,
                            rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

        self.declare_consumer(consumer)
           

The differences from the two cases above: the queue_name becomes target.topic + "_fanout_" + a uuid,

the exchange_name becomes target.topic + "_fanout", and the exchange type becomes fanout.

A routing_key is specified here as well, but I believe it has no effect (a fanout exchange ignores routing keys).

Summary

Server listening

One:

1. An exchange named by target.exchange or by the config file ("openstack" by default), of type topic.

2. target.topic is used as the queue_name.

3. The queue is bound to the exchange with target.topic as the routing_key.

Two:

1. An exchange named by target.exchange or by the config file ("openstack" by default), of type topic.

2. target.topic combined with target.server is used as the queue_name.

3. The queue is bound to the exchange with target.topic combined with target.server as the routing_key.

Three:

1. An exchange named target.topic + "_fanout", of type fanout.

2. target.topic + "_fanout_" + a unique uuid is used as the queue_name.

3. The queue is bound to the exchange.

Client calls

1. Non-blocking call: client.cast

2. Blocking call: client.call

Use prepare() to change the client's target to make a fanout call or to target a specific server, for example:
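A minimal sketch of that (the method names are illustrative):

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
client = messaging.RPCClient(transport, messaging.Target(topic='test'))

# Fanout cast: every server listening on topic "test" receives it.
client.prepare(fanout=True).cast({}, 'reload_config')

# Cast aimed at one specific server: only the "test.server1" queue gets it.
client.prepare(server='server1').cast({}, 'stop')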

notification

notification, as the name suggests, deals with messages/notifications. The concept splits into

the notifying side: notifier (officially also called a driver), and the listening side: notification_listener.

In use, the listening side runs listener.start; the sending side calls notifier.notify (exposed to users as the different priority methods such as sample, audit, info, ...) and the message is delivered to the listener for processing.

Sender

Taking ceilometer as the example, a Notifier object is instantiated first:

self.notifier = oslo_messaging.Notifier(
            messaging.get_transport(),
            driver=cfg.CONF.publisher_notifier.telemetry_driver,
            publisher_id="ceilometer.polling")
           

After the data has been processed it needs to be sent out (the data processing in between is out of scope here; see my other post analysing the polling source code):

class Notifier(object):
    def sample(self, ctxt, event_type, payload):
        self._notify(ctxt, event_type, payload, 'SAMPLE')
           
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
                retry=None):
        payload = self._serializer.serialize_entity(ctxt, payload)
        ctxt = self._serializer.serialize_context(ctxt)

        msg = dict(message_id=six.text_type(uuid.uuid4()),
                   publisher_id=publisher_id or self.publisher_id,
                   event_type=event_type,
                   priority=priority,
                   payload=payload,
                   timestamp=six.text_type(timeutils.utcnow()))

        def do_notify(ext):
            try:
                ext.obj.notify(ctxt, msg, priority, retry or self.retry)
            except Exception as e:
                _LOG.exception("Problem '%(e)s' attempting to send to "
                               "notification system. Payload=%(payload)s",
                               {'e': e, 'payload': payload})

        if self._driver_mgr.extensions:
            self._driver_mgr.map(do_notify)
           
class ExtensionManager(object):
    def map(self, func, *args, **kwds):
        if not self.extensions:
            # FIXME: Use a more specific exception class here.
            raise NoMatches('No %s extensions found' % self.namespace)
        response = []
        for e in self.extensions:
            self._invoke_one_plugin(response.append, func, e, args, kwds)
        return response
           
Inspecting map() under pdb:
(Pdb) p func
<function do_notify at 0x7f2c3c1df758>
(Pdb) p args
()
(Pdb) p kwds
{}

2)
(Pdb) p self.extensions
[<stevedore.extension.Extension object at 0x7f2c48771450>]
(Pdb) p e
<stevedore.extension.Extension object at 0x7f2c48771450>
(Pdb) p e.__dict__
{'obj': <oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f2c48771210>, 'entry_point': EntryPoint.parse('messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver'), 'name': 'messagingv2', 'plugin': <class 'oslo_messaging.notify.messaging.MessagingV2Driver'>}
           
class ExtensionManager(object):
    def _invoke_one_plugin(self, response_callback, func, e, args, kwds):
        try:
            response_callback(func(e, *args, **kwds))
        except Exception as err:
            if self.propagate_map_exceptions:
                raise
            else:
                LOG.error('error calling %r: %s', e.name, err)
                LOG.exception(err)
           

You can see that the response_callback passed in is an append method, and func here is do_notify, so the call happens, i.e.:

def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
                retry=None):
        payload = self._serializer.serialize_entity(ctxt, payload)
        ctxt = self._serializer.serialize_context(ctxt)

        msg = dict(message_id=six.text_type(uuid.uuid4()),
                   publisher_id=publisher_id or self.publisher_id,
                   event_type=event_type,
                   priority=priority,
                   payload=payload,
                   timestamp=six.text_type(timeutils.utcnow()))

        def do_notify(ext):
            try:
                ext.obj.notify(ctxt, msg, priority, retry or self.retry)
            except Exception as e:
                _LOG.exception("Problem '%(e)s' attempting to send to "
                               "notification system. Payload=%(payload)s",
                               {'e': e, 'payload': payload})

        if self._driver_mgr.extensions:
            self._driver_mgr.map(do_notify)
           

That is, the nested do_notify function defined inside _notify is what actually gets invoked. Under pdb, the extension passed to it looks like this:

(Pdb) p ext
<stevedore.extension.Extension object at 0x7f2c48771450>
(Pdb) p ext.__dict__
{'obj': <oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f2c48771210>, 'entry_point': EntryPoint.parse('messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver'), 'name': 'messagingv2', 'plugin': <class 'oslo_messaging.notify.messaging.MessagingV2Driver'>}

(Pdb) p ext.obj
<oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f2c48771210>
(Pdb) p ext.obj.__dict__
{'topics': ['notifications'], 'version': 2.0, 'transport': <oslo_messaging.transport.Transport object at 0x7f2c485df890>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x16ba490>}
           
class MessagingDriver(notifier.Driver):
    def __init__(self, conf, topics, transport, version=1.0):
        super(MessagingDriver, self).__init__(conf, topics, transport)
        self.version = version

    def notify(self, ctxt, message, priority, retry):
        priority = priority.lower()
        for topic in self.topics:
            target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))
            try:
                self.transport._send_notification(target, ctxt, message,
                                                  version=self.version,
                                                  retry=retry)
            except Exception:
                LOG.exception("Could not send notification to %(topic)s. "
                              "Payload=%(message)s",
                              {'topic': topic, 'message': message})


class MessagingV2Driver(MessagingDriver):

    "Send notifications using the 2.0 message format."

    def __init__(self, conf, **kwargs):
        super(MessagingV2Driver, self).__init__(conf, version=2.0, **kwargs)
           

The call actually lands in MessagingDriver.notify(); the crucial lines are:

    priority = priority.lower()
    for topic in self.topics:
        target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))

So the real routing key / queue name that oslo_messaging builds has the form <topic>.<priority>, for example:

notifications.sample

That is, as soon as oslo_messaging's notifier.sample() is used to send a message, the priority is set to sample; that priority is appended to the topic when the oslo_messaging Target is built, and the corresponding queue notifications.sample is what finally gets used.
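A small sketch of how the two halves have to line up on the listener side (the names are the defaults used above):

import oslo_messaging

# Sender: a Notifier whose topics include 'notifications' plus a sample() call
# -> routing key / queue "notifications.sample".
# Listener: the Target supplies the topic half, the endpoint's method name
# supplies the priority half.
class SampleEndpoint(object):
    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        print(payload)

targets = [oslo_messaging.Target(topic='notifications')]
endpoints = [SampleEndpoint()]
# Passed to get_notification_listener(), this consumes from "notifications.sample".

Back to the send path: Transport._send_notification then hands the message over to the driver: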

class Transport(object):
    def _send_notification(self, target, ctxt, message, version, retry=None):
        if not target.topic:
            raise exceptions.InvalidTarget('A topic is required to send',
                                           target)
        self._driver.send_notification(target, ctxt, message, version,
                                       retry=retry)
           
(Pdb) p self._driver
<oslo_messaging._drivers.impl_rabbit.RabbitDriver object at 0x7f2c4876de90>
(Pdb) p self._driver.__dict__
{'_waiter': None, '_allowed_remote_exmods': [], '_reply_q_lock': <thread.lock object at 0x7f2c482efc30>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x16ba490>, '_default_exchange': 'ceilometer', '_connection_pool': <oslo_messaging._drivers.pool.ConnectionPool object at 0x7f2c4876df90>, '_reply_q': None, 'missing_destination_retry_timeout': 60, 'prefetch_size': 0, '_reply_q_conn': None, '_url': <TransportURL transport='rabbit', hosts=[<TransportHost hostname='rabbitmq.openstack.svc.cluster.local', port=5672, username='rabbitmq', password='vut8mvvS'>]>}
           
class AMQPDriverBase(base.BaseDriver):
    def send_notification(self, target, ctxt, message, version, retry=None):
        return self._send(target, ctxt, message,
                          envelope=(version == 2.0), notify=True, retry=retry)
           
def _send(self, target, ctxt, message,
              wait_for_reply=None, timeout=None, call_monitor_timeout=None,
              envelope=True, notify=False, retry=None, transport_options=None):

        msg = message

        if wait_for_reply:
            msg_id = uuid.uuid4().hex
            msg.update({'_msg_id': msg_id})
            msg.update({'_reply_q': self._get_reply_q()})
            msg.update({'_timeout': call_monitor_timeout})

        rpc_amqp._add_unique_id(msg)
        unique_id = msg[rpc_amqp.UNIQUE_ID]

        rpc_amqp.pack_context(msg, ctxt)

        if envelope:
            msg = rpc_common.serialize_msg(msg)

        if wait_for_reply:
            self._waiter.listen(msg_id)
            log_msg = "CALL msg_id: %s " % msg_id
        else:
            log_msg = "CAST unique_id: %s " % unique_id

        try:
            with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
                if notify:
                    exchange = self._get_exchange(target)
                    LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
                              " topic '%(topic)s'", {'exchange': exchange,
                                                     'topic': target.topic})
                    conn.notify_send(exchange, target.topic, msg, retry=retry)
                elif target.fanout:
                    log_msg += "FANOUT topic '%(topic)s'" % {
                        'topic': target.topic}
                    LOG.debug(log_msg)
                    conn.fanout_send(target.topic, msg, retry=retry)
                else:
                    topic = target.topic
                    exchange = self._get_exchange(target)
                    if target.server:
                        topic = '%s.%s' % (target.topic, target.server)
                    LOG.debug(log_msg + "exchange '%(exchange)s'"
                              " topic '%(topic)s'", {'exchange': exchange,
                                                     'topic': topic})
                    conn.topic_send(exchange_name=exchange, topic=topic,
                                    msg=msg, timeout=timeout, retry=retry,
                                    transport_options=transport_options)

            if wait_for_reply:
                result = self._waiter.wait(msg_id, timeout,
                                           call_monitor_timeout)
                if isinstance(result, Exception):
                    raise result
                return result
        finally:
            if wait_for_reply:
                self._waiter.unlisten(msg_id)
           

From here on it is much the same as the rpc client path; the difference is that notify=True is passed when sending. I won't repeat the rest, since it mirrors what we saw before.

Receiver / listener side

Again taking ceilometer as the example:

urls = cfg.CONF.notification.messaging_urls or [None]
        for url in urls:
            transport = messaging.get_transport(url)
            # NOTE(gordc): ignore batching as we want pull
            # to maintain sequencing as much as possible.
            listener = messaging.get_batch_notification_listener(
                transport, targets, endpoints)
            listener.start()
            self.listeners.append(listener)
           
(Pdb) endpoints
        [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>,
         <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fa1741f6810>,
          <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fa17424d410>,
           <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fa17424d8d0>,
            <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fa1742461d0>,
             <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fa17424dc90>,
              <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fa17467ddd0>]
        (Pdb) cfg.CONF.notification.messaging_urls
        ['rabbit://rabbitmq:vut8mvvS@rabbitmq.openstack.svc.cluster.local:5672/']
           
def get_batch_notification_listener(transport, targets, endpoints,
                                    allow_requeue=False,
                                    batch_size=1, batch_timeout=None):
    return oslo_messaging.get_batch_notification_listener(
        transport, targets, endpoints, executor='threading',
        allow_requeue=allow_requeue,
        batch_size=batch_size, batch_timeout=batch_timeout)
           
def get_batch_notification_listener(transport, targets, endpoints,
                                    executor='blocking', serializer=None,
                                    allow_requeue=False, pool=None,
                                    batch_size=None, batch_timeout=None):
    dispatcher = notify_dispatcher.BatchNotificationDispatcher(
        endpoints, serializer)
    return BatchNotificationServer(
        transport, targets, dispatcher, executor, allow_requeue, pool,
        batch_size, batch_timeout
    )
           

BatchNotificationDispatcher implements the dispatch method, and BatchNotificationServer implements the _process_incoming method.

BatchNotificationDispatcher inherits from NotificationDispatcher, which inherits from DispatcherBase.

BatchNotificationServer inherits from NotificationServerBase, which inherits from MessageHandlingServer, which inherits from ServiceBase and _OrderedTaskRunner.

Because the listener instantiated here is a BatchNotificationServer object, calling listener.start() actually invokes MessageHandlingServer's start method:

@six.add_metaclass(abc.ABCMeta)
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
    @ordered(reset_after='stop')
    def start(self, override_pool_size=None):
        if self._started:
            LOG.warning('The server has already been started. Ignoring '
                        'the redundant call to start().')
            return

        self._started = True

        executor_opts = {}

        if self.executor_type in ("threading", "eventlet"):
            executor_opts["max_workers"] = (
                override_pool_size or self.conf.executor_thread_pool_size
            )
        self._work_executor = self._executor_cls(**executor_opts)

        try:
            self.listener = self._create_listener()
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self.listener.start(self._on_incoming)
           

The key parts are again self._create_listener() followed by self.listener.start(self._on_incoming). The _create_listener() call this time lands in:

class NotificationServerBase(msg_server.MessageHandlingServer):
    def __init__(self, transport, targets, dispatcher, executor='blocking',
                 allow_requeue=True, pool=None, batch_size=1,
                 batch_timeout=None):
        super(NotificationServerBase, self).__init__(transport, dispatcher,
                                                     executor)
        self._allow_requeue = allow_requeue
        self._pool = pool
        self.targets = targets
        self._targets_priorities = set(
            itertools.product(self.targets,
                              self.dispatcher.supported_priorities)
        )

        self._batch_size = batch_size
        self._batch_timeout = batch_timeout

    def _create_listener(self):
        return self.transport._listen_for_notifications(
            self._targets_priorities, self._pool, self._batch_size,
            self._batch_timeout
        )
           
def _listen_for_notifications(self, targets_and_priorities, pool,
                                  batch_size, batch_timeout):
        for target, priority in targets_and_priorities:
            if not target.topic:
                raise exceptions.InvalidTarget('A target must have '
                                               'topic specified',
                                               target)
        return self._driver.listen_for_notifications(
            targets_and_priorities, pool, batch_size, batch_timeout
        )
           
class AMQPDriverBase(base.BaseDriver):
    def listen_for_notifications(self, targets_and_priorities, pool,
                                 batch_size, batch_timeout):
        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

        listener = NotificationAMQPListener(self, conn)
        for target, priority in targets_and_priorities:
            conn.declare_topic_consumer(
                exchange_name=self._get_exchange(target),
                topic='%s.%s' % (target.topic, priority),
                callback=listener, queue_name=pool)
        return base.PollStyleListenerAdapter(listener, batch_size,
                                             batch_timeout)
           

listen_for_notifications creates a connection and then declares a topic.priority queue for every (target, priority) pair. PollStyleListenerAdapter starts a thread that keeps processing whatever is fetched; since that class is what gets returned here, _create_listener simply returns a PollStyleListenerAdapter instance.

Then self.listener.start(self._on_incoming) is called.

PollStyleListenerAdapter inherits from Listener.

class PollStyleListenerAdapter(Listener):
    """A Listener that uses a PollStyleListener for message transfer. A
    dedicated thread is created to do message polling.
    """

    def __init__(self, poll_style_listener, batch_size, batch_timeout):
        super(PollStyleListenerAdapter, self).__init__(
            batch_size, batch_timeout, poll_style_listener.prefetch_size
        )
        self._poll_style_listener = poll_style_listener
        self._listen_thread = threading.Thread(target=self._runner)
        self._listen_thread.daemon = True
        self._started = False

    def start(self, on_incoming_callback):
        super(PollStyleListenerAdapter, self).start(on_incoming_callback)
        self._started = True
        self._listen_thread.start()

    @excutils.forever_retry_uncaught_exceptions
    def _runner(self):
        while self._started:
            incoming = self._poll_style_listener.poll(
                batch_size=self.batch_size, batch_timeout=self.batch_timeout)

            if incoming:
                self.on_incoming_callback(incoming)

        # listener is stopped but we need to process all already consumed
        # messages
        while True:
            incoming = self._poll_style_listener.poll(
                batch_size=self.batch_size, batch_timeout=self.batch_timeout)

            if not incoming:
                return
            self.on_incoming_callback(incoming)

    def stop(self):
        self._started = False
        self._poll_style_listener.stop()
        self._listen_thread.join()
        super(PollStyleListenerAdapter, self).stop()

    def cleanup(self):
        self._poll_style_listener.cleanup()
           

NotificationAMQPListener inherits from AMQPListener, which inherits from PollStyleListener.

As you can see, the adapter's start method just kicks off the _runner thread; self._poll_style_listener here is the NotificationAMQPListener instance, and poll keeps pulling data off the queue.

That is, AMQPListener.poll is what gets called:

class NotificationAMQPListener(AMQPListener):
    message_cls = NotificationAMQPIncomingMessage
    
class AMQPListener(base.PollStyleListener):

    def __init__(self, driver, conn):
        super(AMQPListener, self).__init__(driver.prefetch_size)
        self.driver = driver
        self.conn = conn
        self.msg_id_cache = rpc_amqp._MsgIdCache()
        self.incoming = []
        self._shutdown = threading.Event()
        self._shutoff = threading.Event()
        self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
        self._message_operations_handler = MessageOperationsHandler(
            "AMQPListener")
        self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

    def __call__(self, message):
        ctxt = rpc_amqp.unpack_context(message)
        unique_id = self.msg_id_cache.check_duplicate_message(message)
        if ctxt.msg_id:
            LOG.debug("received message msg_id: %(msg_id)s reply to "
                      "%(queue)s", {'queue': ctxt.reply_q,
                                    'msg_id': ctxt.msg_id})
        else:
            LOG.debug("received message with unique_id: %s", unique_id)

        self.incoming.append(self.message_cls(
            self,
            ctxt.to_dict(),
            message,
            unique_id,
            ctxt.msg_id,
            ctxt.reply_q,
            ctxt.client_timeout,
            self._obsolete_reply_queues,
            self._message_operations_handler))

    @base.batch_poll_helper
    def poll(self, timeout=None):
        stopwatch = timeutils.StopWatch(duration=timeout).start()

        while not self._shutdown.is_set():
            self._message_operations_handler.process()

            if self.incoming:
                return self.incoming.pop(0)

            left = stopwatch.leftover(return_none=True)
            if left is None:
                left = self._current_timeout
            if left <= 0:
                return None

            try:
                self.conn.consume(timeout=min(self._current_timeout, left))
            except rpc_common.Timeout:
                self._current_timeout = max(self._current_timeout * 2,
                                            ACK_REQUEUE_EVERY_SECONDS_MAX)
            else:
                self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

        # NOTE(sileht): listener is stopped, just processes remaining messages
        # and operations
        self._message_operations_handler.process()
        if self.incoming:
            return self.incoming.pop(0)

        self._shutoff.set()
           

Inside __call__, self.message_cls is instantiated, i.e. NotificationAMQPIncomingMessage:

class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
    def acknowledge(self):
        def _do_ack():
            try:
                self.message.acknowledge()
            except Exception as exc:
                # NOTE(kgiusti): this failure is likely due to a loss of the
                # connection to the broker.  Not much we can do in this case,
                # especially considering the Notification has already been
                # dispatched. This *could* result in message duplication
                # (unacked msg is returned to the queue by the broker), but the
                # driver tries to catch that using the msg_id_cache.
                LOG.warning("Failed to acknowledge received message: %s", exc)
        self._message_operations_handler.do(_do_ack)
        self.listener.msg_id_cache.add(self.unique_id)

    def requeue(self):
        # NOTE(sileht): In case of the connection is lost between receiving the
        # message and requeing it, this requeue call fail
        # but because the message is not acknowledged and not added to the
        # msg_id_cache, the message will be reconsumed, the only difference is
        # the message stay at the beginning of the queue instead of moving to
        # the end.
        def _do_requeue():
            try:
                self.message.requeue()
            except Exception as exc:
                LOG.warning("Failed to requeue received message: %s", exc)
        self._message_operations_handler.do(_do_requeue)
           

NotificationAMQPIncomingMessage inherits from AMQPIncomingMessage, which inherits from RpcIncomingMessage, which inherits from IncomingMessage.

(Inside self.conn.consume the consumer callback fires, i.e. the listener's __call__ method shown above; it appends to incoming, and poll then pops items off incoming for processing.) I'm not fully sure about the part in parentheses and haven't worked it out completely; corrections from anyone who knows this better are welcome.

NotificationAMQPIncomingMessage is a message object that carries the acknowledge and requeue methods, used to acknowledge a message or put it back on the queue.

Once messages have been taken off the queue, they are handled by self.on_incoming_callback(incoming), i.e. by MessageHandlingServer._on_incoming:

@six.add_metaclass(abc.ABCMeta)
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
    def _on_incoming(self, incoming):
        """Handles on_incoming event

        :param incoming: incoming request.
        """
        self._work_executor.submit(self._process_incoming, incoming)
           

As you can see, the actual work goes through self._process_incoming, i.e. BatchNotificationServer._process_incoming:

class BatchNotificationServer(NotificationServerBase):

    def _process_incoming(self, incoming):
        try:
            not_processed_messages = self.dispatcher.dispatch(incoming)
        except Exception:
						......
           

This handler calls the dispatcher object to dispatch the message; self.dispatcher here is the dispatcher (a BatchNotificationDispatcher) that was passed in at construction time. What follows is the actual data processing, which is covered in detail in my other post on the notification source code.

References:

https://www.cnblogs.com/gange111/p/9560446.html

https://blog.csdn.net/MrYuanRs/article/details/105955720

https://blog.csdn.net/qingyuanluofeng/article/details/102511492