RabbitMQ概述
RabbitMQ可以做什麼?
RabbitMQ是實作AMQP(進階消息隊列協定)的消息中間件的一種,可用于在分布式系統中存儲轉發消息,主要有以下的技術亮點:
- 可靠性
- 靈活的路由
- 叢集部署
- 高可用的隊列消息
- 可視化的管理工具
RabbitMQ主要用于系統間的雙向解耦,當生産者(productor)産生大量的資料時,消費者(consumer)無法快速的消費資訊,那麼就需要一個類似于中間件的代理伺服器,用來處理和儲存這些資料,RabbitMQ就扮演了這個角色。
如何使用RabbitMQ
- Erlang語言包
- RabbitMQ安裝包
基本概念
1.Broker
用來處理資料的消息隊列伺服器實體
2.虛拟主機(vhost)
由RabbitMQ伺服器建立的虛拟消息主機,擁有自己的權限機制,一個broker裡可以開設多個vhost,用于不同使用者的權限隔離,vhost之間是也完全隔離的。
3.生産者(productor)
産生用于消息通信的資料
4.信道(channel)
消息通道,在AMQP中可以建立多個channel,每個channel代表一個會話任務。
5.交換機(exchange)
(1)接受消息,轉發消息到綁定的隊列,總共有四種類型的交換器:direct,fanout,topic,headers。
- direct:轉發消息到routing-key指定的隊列

- fanout:轉發消息到所有綁定的隊列,類似于一種廣播發送的方式。
- topic:按照規則轉發消息,這種規則多為模式比對,也顯得更加靈活
(2).交換器在RabbitMQ中是一個存在的實體,不能改變,如有需要隻能删除重建。
(3).topic類型的交換器利用比對規則分析消息的routing-key屬性。
(4).屬性
- 持久性:聲明時durable屬性為true
- 自動删除:綁定的queue删除也跟着删除
- 惰性:不會自動建立
6.隊列(queue)
(1).隊列是RabbitMQ的内部對象,存儲消息
(2).可以動态的增加消費者,隊列将接受到的消息以輪詢(round-robin)的方式均勻的配置設定給多個消費者
(3).隊列的屬性
- 持久性:如果啟用,隊列将會在server重新開機之前有效
- 自動删除:消費者停止使用之後就會自動删除
- 惰性:不會自動建立
- 排他性:如果啟用,隊列隻能被聲明它的消費者使用。
7.兩個key
- routing-key:消息不能直接發到queues,需要先發送到exchanges,routing-key指定queues名稱,exchanges通過routing-key來識别與之綁定的queues
channel.queue_publish(exchange=exchange_name,
routing-key="rabbitmq",
body="openstack")
複制
- binding-key:主要是用來表示exchanges和queues之間的關系,為了差別queue_publish的routing-key,就稱作binding-key。
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing-key="rabbitmq")
複制
8.綁定(binding)
表示交換機和隊列之間的關系,在進行綁定時,帶有一個額外的參數binding-key,來和routing-key相比對。
9.消費者(consumer)
監聽消息隊列來進行消息資料的讀取
10.高可用性(HA)
(1).在consumer處理完消息後,會發送消息ACK,通知通知RabbitMQ消息已被處理,可以從記憶體删除。如果消費者因當機或連結失敗等原因沒有發送ACK,則RabbitMQ會将消息重新發送給其他監聽在隊列的下一個消費者。
channel.basicConsume(queuename, noAck=false, consumer);
複制
(2).消息和隊列的持久化
(3).鏡像隊列,實作不同節點之間的中繼資料和消息同步
RabbitMQ在OpenStack中的應用
RPC之neutron專題
基于RabbitMQ的RPC消息通信是neutron中跨子產品進行方法調用的很重要的一種方式,根據上面的描述,要組成一個完整的RPC通信結構,需要資訊的生産者和消費者。
- client端:用于産生rpc消息。
- server端:用于監聽消息資料并進行相應的處理。
1.neutron-agent中的RPC
在dhcp_agent、l3_agent、metadata_agent,metering_agent的main函數中都存在一段建立一個rpc服務端的代碼,下面以dhcp_agent為例。
def main():
register_options(cfg.CONF)
common_config.init(sys.argv[1:])
config.setup_logging()
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
service.launch(cfg.CONF, server).wait()
複制
最核心的,也是跟rpc相關的部分包括兩部分,首先是建立rpc服務端。
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
複制
該代碼實際上建立了一個rpc服務端,監聽指定的topic并運作manager上的tasks。
create()方法傳回一個neutron.service.Service對象,neutron.service.Service繼承自neutron.common.rpc.Service類。
首先看neutron.common.rpc.Service類,該類定義了start方法,該方法主要完成兩件事情:一件事情是将manager添加到endpoints中;一件是建立rpc的consumer,分别監聽topic的隊列消息。
而在neutron.service.Service類中,初始化中生成了一個manager執行個體(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport);并為start方法添加了周期性執行report_state方法和periodic_tasks方法。report_state方法沒有具體實作,periodic_tasks方法則調用manager的periodic_tasks方法。
manager執行個體(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的時候首先建立一個rpc的client端,通過代碼
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
複制
該client端實際上定義了report_state方法,可以狀态以rpc消息的方式發送給plugin。
manager在初始化後,還會指定周期性運作_report_state方法,實際上就是調用client端的report_state方法。
至此,對rpc服務端的建立算是完成了,之後執行代碼。
service.launch(server).wait()
複制
service.launch(server)方法首先會将server放到協程組中,并調用server的start方法來啟動server。
2.neutron-plugin中的RPC
主要對ML2Plugin進行分析,包括兩個類:RpcCallbacks和AgentNotifierApi。
- RpcCallbacks:負責當agent往plugin發出rpc請求時候,plugin實作請求的相關動作,除了繼承自父類(dhcp rpc、dvr rpc、sg_db rpc和tunnel rpc)中的方法,還包括get_port_from_device、get_device_details、get_devices_details_list、update_device_down、update_device_up、get_dvr_mac_address_by_host、get_compute_ports_on_host_by_subnet、get_subnet_for_dvr等方法。
- AgentNotifierApi:負責當plugin往agent發出rpc請求(plugin通知agent)的時候,plugin端的方法。
def start_rpc_listeners(self):
"""RpcCallbacks中實作的方法:Start the RPC loop to let the plugin communicate with agents."""
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
return self.conn.consume_in_threads()
複制
建立一個通知rpc的用戶端,用于向OVS的agent發出通知。所有plugin都需要有這樣一個發出通知消息的用戶端,建立了一個OVS agent的通知rpc用戶端。之後,建立兩個跟service agent相關的consumer,分别監聽topics.PLUGIN
ovs_neutron_agent也會建立RPC的consumer,用來監聽topics.UPDATE、topics.DELETE等操作。
def setup_rpc(self):
self.agent_id = 'ovs-agent-%s' % self.conf.host
self.topic = topics.AGENT
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE],
[topics.NETWORK, topics.UPDATE]]
複制
3.neutron-server中的RPC
這個rpc服務端主要通過neutron.server中主函數中代碼執行
neutron_rpc = service.serve_rpc()
複制
方法的實作代碼(目錄:neutron/neutron/service.py)如下
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
manager.NeutronManager.get_service_plugins().values())
if cfg.CONF.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1)
if not plugin.rpc_workers_supported():
LOG.debug("Active plugin doesn't implement start_rpc_listeners")
if 0 < cfg.CONF.rpc_workers:
LOG.error(_LE("'rpc_workers = %d' ignored because "
"start_rpc_listeners is not implemented."),
cfg.CONF.rpc_workers)
raise NotImplementedError()
try:
rpc = RpcWorker(service_plugins)
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_state_rep = RpcReportsWorker([plugin])
LOG.debug('using launcher for state reports rpc, workers=%s',
cfg.CONF.rpc_state_report_workers)
launcher.launch_service(
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
'details.'))
複制
其中,RpcWorker(plugin)主要通過調用plugin的方法來建立rpc服務端,最重要的工作是調用plugin的start_rpc_listeners來監聽消息隊列:
self._servers = self._plugin.start_rpc_listeners()
複制
該方法在大多數plugin中并未被實作,目前ml2支援該方法。
在neutron.plugin.ml2.plugin.ML2Plugin類中,該方法建立了一個topic為topics.PLUGIN的消費rpc。
def start_rpc_listeners(self):
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
agents_db.AgentExtRpcCallback()]
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()
複制
RPC之nova專題
在Openstack中,每一個Nova服務初始化時會建立兩個隊列,一個名為“NODE-TYPE.NODE-ID”,另一個名為“NODE-TYPE”,NODE-TYPE是指服務的類型,NODE-ID指節點名稱。
1.nova中實作exchange的種類
- direct:初始化中,各個子產品對每一條系統消息自動生成多個隊列放入RabbitMQ伺服器中,隊列中綁定的binding-key要與routing-key比對
- topic:各個子產品也會自動生成兩個隊列放入RabbitMQ伺服器中。
2.nova中調用RPC的方式
- RPC.CALL:用于請求和響應方式
- RPC.CAST:隻是提供單向請求
3.nova中子產品的邏輯功能
- Invoker:向消息隊列中發送系統請求資訊,如Nova-API和Nova-Scheduler,通過RPC.CALL和RPC.CAST兩個程序發送系統請求消息。
- Worker:從消息隊列中擷取Invoker子產品發送的系統請求消息以及向Invoker子產品回複系統響應消息,如Nova-Compute、Nova-Volume和Nova-Network,對RPC.CALL做出響應。
4.nova中的exchange domain
- direct exchange domain: Topic消息生産者(Nova-API或者Nova-Scheduler)與Topic交換器生成邏輯連接配接,通過PRC.CALL或者RPC.CAST程序将系統請求消息發往Topic交換器。交換器根據不同的routing-key将系統請求消息轉發到不同的類型的消息隊列。Topic消息消費者探測到新消息已進入響應隊列,立即從隊列中接收消息并調用執行系統消息所請求的應用程式。
- 點到點消息隊列:Topic消息消費者應用程式接收RPC.CALL的遠端調用請求,并在執行相關計算任務之後将結果以系統響應消息的方式通過Direct交換器回報給Direct消息消費者。
- 共享消息隊列:Topic消息消費者應用程式隻是接收RPC.CAST的遠端調用請求來執行相關的計算任務,并沒有響應消息回報。
- topic exchange domain: Direct交換域并不是獨立運作,而是受限于Topic交換域中RPC.CALL的遠端調用流程與結果,每一個RPC.CALL激活一次Direct消息交換的運作。
以nova啟動虛拟機的過程為例,詳細介紹RPC通信過程。
RPC.CAST缺少了系統消息響應流程。一個Topic消息生産者發送系統請求消息到Topic交換器,Topic交換器根據消息的Routing Key将消息轉發至共享消息隊列,與共享消息隊列相連的所有Topic消費者接收該系統請求消息,并把它傳遞給響應的Worker進行處理,其調用流程如圖所示:
source: //chyufly.github.io/blog/2016/04/13/rabbitmq-introduction