天天看點

RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

RabbitMQ概述

RabbitMQ可以做什麼?

RabbitMQ是實作AMQP(進階消息隊列協定)的消息中間件的一種,可用于在分布式系統中存儲轉發消息,主要有以下的技術亮點:

  • 可靠性
  • 靈活的路由
  • 叢集部署
  • 高可用的隊列消息
  • 可視化的管理工具

RabbitMQ主要用于系統間的雙向解耦,當生産者(productor)産生大量的資料時,消費者(consumer)無法快速的消費資訊,那麼就需要一個類似于中間件的代理伺服器,用來處理和儲存這些資料,RabbitMQ就扮演了這個角色。

如何使用RabbitMQ

  • Erlang語言包
  • RabbitMQ安裝包

基本概念

1.Broker

用來處理資料的消息隊列伺服器實體

2.虛拟主機(vhost)

由RabbitMQ伺服器建立的虛拟消息主機,擁有自己的權限機制,一個broker裡可以開設多個vhost,用于不同使用者的權限隔離,vhost之間是也完全隔離的。

3.生産者(productor)

産生用于消息通信的資料

4.信道(channel)

消息通道,在AMQP中可以建立多個channel,每個channel代表一個會話任務。

5.交換機(exchange)

(1)接受消息,轉發消息到綁定的隊列,總共有四種類型的交換器:direct,fanout,topic,headers。

  • direct:轉發消息到routing-key指定的隊列
RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用
  • fanout:轉發消息到所有綁定的隊列,類似于一種廣播發送的方式。
  • topic:按照規則轉發消息,這種規則多為模式比對,也顯得更加靈活
RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

(2).交換器在RabbitMQ中是一個存在的實體,不能改變,如有需要隻能删除重建。

(3).topic類型的交換器利用比對規則分析消息的routing-key屬性。

(4).屬性

  • 持久性:聲明時durable屬性為true
  • 自動删除:綁定的queue删除也跟着删除
  • 惰性:不會自動建立

6.隊列(queue)

(1).隊列是RabbitMQ的内部對象,存儲消息

(2).可以動态的增加消費者,隊列将接受到的消息以輪詢(round-robin)的方式均勻的配置設定給多個消費者

(3).隊列的屬性

  • 持久性:如果啟用,隊列将會在server重新開機之前有效
  • 自動删除:消費者停止使用之後就會自動删除
  • 惰性:不會自動建立
  • 排他性:如果啟用,隊列隻能被聲明它的消費者使用。
RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

7.兩個key

  • routing-key:消息不能直接發到queues,需要先發送到exchanges,routing-key指定queues名稱,exchanges通過routing-key來識别與之綁定的queues
channel.queue_publish(exchange=exchange_name,
                     routing-key="rabbitmq",
                     body="openstack")           

複制

  • binding-key:主要是用來表示exchanges和queues之間的關系,為了差別queue_publish的routing-key,就稱作binding-key。
channel.queue_bind(exchange=exchange_name,
                  queue=queue_name,
                  routing-key="rabbitmq")
           

複制

8.綁定(binding)

表示交換機和隊列之間的關系,在進行綁定時,帶有一個額外的參數binding-key,來和routing-key相比對。

9.消費者(consumer)

監聽消息隊列來進行消息資料的讀取

10.高可用性(HA)

(1).在consumer處理完消息後,會發送消息ACK,通知通知RabbitMQ消息已被處理,可以從記憶體删除。如果消費者因當機或連結失敗等原因沒有發送ACK,則RabbitMQ會将消息重新發送給其他監聽在隊列的下一個消費者。

channel.basicConsume(queuename, noAck=false, consumer);           

複制

(2).消息和隊列的持久化

(3).鏡像隊列,實作不同節點之間的中繼資料和消息同步

RabbitMQ在OpenStack中的應用

RPC之neutron專題

基于RabbitMQ的RPC消息通信是neutron中跨子產品進行方法調用的很重要的一種方式,根據上面的描述,要組成一個完整的RPC通信結構,需要資訊的生産者和消費者。

  • client端:用于産生rpc消息。
  • server端:用于監聽消息資料并進行相應的處理。

1.neutron-agent中的RPC

在dhcp_agent、l3_agent、metadata_agent,metering_agent的main函數中都存在一段建立一個rpc服務端的代碼,下面以dhcp_agent為例。

def main():
    register_options(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-dhcp-agent',
        topic=topics.DHCP_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
    service.launch(cfg.CONF, server).wait()
           

複制

最核心的,也是跟rpc相關的部分包括兩部分,首先是建立rpc服務端。

server = neutron_service.Service.create(
    binary='neutron-dhcp-agent',
    topic=topics.DHCP_AGENT,
    report_interval=cfg.CONF.AGENT.report_interval,
    manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
           

複制

該代碼實際上建立了一個rpc服務端,監聽指定的topic并運作manager上的tasks。

create()方法傳回一個neutron.service.Service對象,neutron.service.Service繼承自neutron.common.rpc.Service類。

首先看neutron.common.rpc.Service類,該類定義了start方法,該方法主要完成兩件事情:一件事情是将manager添加到endpoints中;一件是建立rpc的consumer,分别監聽topic的隊列消息。

而在neutron.service.Service類中,初始化中生成了一個manager執行個體(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport);并為start方法添加了周期性執行report_state方法和periodic_tasks方法。report_state方法沒有具體實作,periodic_tasks方法則調用manager的periodic_tasks方法。

manager執行個體(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的時候首先建立一個rpc的client端,通過代碼

self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)           

複制

該client端實際上定義了report_state方法,可以狀态以rpc消息的方式發送給plugin。

manager在初始化後,還會指定周期性運作_report_state方法,實際上就是調用client端的report_state方法。

至此,對rpc服務端的建立算是完成了,之後執行代碼。

service.launch(server).wait()
           

複制

service.launch(server)方法首先會将server放到協程組中,并調用server的start方法來啟動server。

RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

2.neutron-plugin中的RPC

主要對ML2Plugin進行分析,包括兩個類:RpcCallbacks和AgentNotifierApi。

  • RpcCallbacks:負責當agent往plugin發出rpc請求時候,plugin實作請求的相關動作,除了繼承自父類(dhcp rpc、dvr rpc、sg_db rpc和tunnel rpc)中的方法,還包括get_port_from_device、get_device_details、get_devices_details_list、update_device_down、update_device_up、get_dvr_mac_address_by_host、get_compute_ports_on_host_by_subnet、get_subnet_for_dvr等方法。
  • AgentNotifierApi:負責當plugin往agent發出rpc請求(plugin通知agent)的時候,plugin端的方法。
def start_rpc_listeners(self):
    """RpcCallbacks中實作的方法:Start the RPC loop to let the plugin communicate with agents."""
    self._setup_rpc()
    self.topic = topics.PLUGIN
    self.conn = n_rpc.create_connection(new=True)
    self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
    return self.conn.consume_in_threads()
           

複制

建立一個通知rpc的用戶端,用于向OVS的agent發出通知。所有plugin都需要有這樣一個發出通知消息的用戶端,建立了一個OVS agent的通知rpc用戶端。之後,建立兩個跟service agent相關的consumer,分别監聽topics.PLUGIN

RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

ovs_neutron_agent也會建立RPC的consumer,用來監聽topics.UPDATE、topics.DELETE等操作。

def setup_rpc(self):
        self.agent_id = 'ovs-agent-%s' % self.conf.host
        self.topic = topics.AGENT
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
        self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.endpoints = [self]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.PORT, topics.DELETE],
                     [constants.TUNNEL, topics.UPDATE],
                     [constants.TUNNEL, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE],
                     [topics.DVR, topics.UPDATE],
                     [topics.NETWORK, topics.UPDATE]]
           

複制

3.neutron-server中的RPC

這個rpc服務端主要通過neutron.server中主函數中代碼執行

neutron_rpc = service.serve_rpc()
           

複制

方法的實作代碼(目錄:neutron/neutron/service.py)如下

def serve_rpc():
    plugin = manager.NeutronManager.get_plugin()
    service_plugins = (
        manager.NeutronManager.get_service_plugins().values())
    if cfg.CONF.rpc_workers < 1:
        cfg.CONF.set_override('rpc_workers', 1)
    if not plugin.rpc_workers_supported():
        LOG.debug("Active plugin doesn't implement start_rpc_listeners")
        if 0 < cfg.CONF.rpc_workers:
            LOG.error(_LE("'rpc_workers = %d' ignored because "
                          "start_rpc_listeners is not implemented."),
                      cfg.CONF.rpc_workers)
        raise NotImplementedError()
    try:
        rpc = RpcWorker(service_plugins)
        LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
        session.dispose()
        launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
        launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
        if (cfg.CONF.rpc_state_report_workers > 0 and
            plugin.rpc_state_report_workers_supported()):
            rpc_state_rep = RpcReportsWorker([plugin])
            LOG.debug('using launcher for state reports rpc, workers=%s',
                      cfg.CONF.rpc_state_report_workers)
            launcher.launch_service(
                rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
        return launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log for '
                              'details.'))
           

複制

其中,RpcWorker(plugin)主要通過調用plugin的方法來建立rpc服務端,最重要的工作是調用plugin的start_rpc_listeners來監聽消息隊列:

self._servers = self._plugin.start_rpc_listeners()
           

複制

該方法在大多數plugin中并未被實作,目前ml2支援該方法。

在neutron.plugin.ml2.plugin.ML2Plugin類中,該方法建立了一個topic為topics.PLUGIN的消費rpc。

def start_rpc_listeners(self):
        self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
                          agents_db.AgentExtRpcCallback()]
        self.topic = topics.PLUGIN
        self.conn = n_rpc.create_connection(new=True)
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        return self.conn.consume_in_threads()
           

複制

RPC之nova專題

在Openstack中,每一個Nova服務初始化時會建立兩個隊列,一個名為“NODE-TYPE.NODE-ID”,另一個名為“NODE-TYPE”,NODE-TYPE是指服務的類型,NODE-ID指節點名稱。

RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

1.nova中實作exchange的種類

  • direct:初始化中,各個子產品對每一條系統消息自動生成多個隊列放入RabbitMQ伺服器中,隊列中綁定的binding-key要與routing-key比對
  • topic:各個子產品也會自動生成兩個隊列放入RabbitMQ伺服器中。

2.nova中調用RPC的方式

  • RPC.CALL:用于請求和響應方式
  • RPC.CAST:隻是提供單向請求

3.nova中子產品的邏輯功能

  • Invoker:向消息隊列中發送系統請求資訊,如Nova-API和Nova-Scheduler,通過RPC.CALL和RPC.CAST兩個程序發送系統請求消息。
  • Worker:從消息隊列中擷取Invoker子產品發送的系統請求消息以及向Invoker子產品回複系統響應消息,如Nova-Compute、Nova-Volume和Nova-Network,對RPC.CALL做出響應。

4.nova中的exchange domain

  • direct exchange domain: Topic消息生産者(Nova-API或者Nova-Scheduler)與Topic交換器生成邏輯連接配接,通過PRC.CALL或者RPC.CAST程序将系統請求消息發往Topic交換器。交換器根據不同的routing-key将系統請求消息轉發到不同的類型的消息隊列。Topic消息消費者探測到新消息已進入響應隊列,立即從隊列中接收消息并調用執行系統消息所請求的應用程式。
    • 點到點消息隊列:Topic消息消費者應用程式接收RPC.CALL的遠端調用請求,并在執行相關計算任務之後将結果以系統響應消息的方式通過Direct交換器回報給Direct消息消費者。
    • 共享消息隊列:Topic消息消費者應用程式隻是接收RPC.CAST的遠端調用請求來執行相關的計算任務,并沒有響應消息回報。
  • topic exchange domain: Direct交換域并不是獨立運作,而是受限于Topic交換域中RPC.CALL的遠端調用流程與結果,每一個RPC.CALL激活一次Direct消息交換的運作。
RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

以nova啟動虛拟機的過程為例,詳細介紹RPC通信過程。

RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

RPC.CAST缺少了系統消息響應流程。一個Topic消息生産者發送系統請求消息到Topic交換器,Topic交換器根據消息的Routing Key将消息轉發至共享消息隊列,與共享消息隊列相連的所有Topic消費者接收該系統請求消息,并把它傳遞給響應的Worker進行處理,其調用流程如圖所示:

RabbitMQ 概念和應用詳解RabbitMQ在OpenStack中的應用

source: //chyufly.github.io/blog/2016/04/13/rabbitmq-introduction