1 Nova Rpc調用
Openstack中使用RabbitMQ作為消息隊列。本文從Kombu入手,講述openstack中RPC遠端過程調用的機制。
1.1 Kombu介紹
Kombu是什麼?Openstack使用的消息隊列是RabbitMQ,它是基于AMQP協定實作的。而Kombu是AMQP消息架構。Kombu對RabbitMQ提供的API進行了封裝,使得程式更加面向對象化,比如封裝出了Exchange, Queue等這些類,使得對RabbitMQ的操作更加簡單,同時,功能更加強悍。
下面來看一下AMQP協定,如圖1所示。Producer産生消息,将消息賦予路由資訊,發送給exchange。exchange 接收消息,并路由到相應隊列(發送過來的Message帶有路由資訊,exchange提取路由資訊和queues與exchange綁定的路由資訊比對, 比對成功後,将消息發給相應queues)。queue存儲消息,并将消息發送Consumer。Consumer 從queues中提取資訊,并消耗。
圖 1 AMQP協定
下面分别看一下producer和consumer是怎麼實作的。
1.1.1 producer
下圖展示了producer的具體流程:
首先,建立一個連結,連接配接rabbitmq(5行)。并擷取channel(6行)。
第二,建立exchange,名為“media”,類型為“direct”(8行)。
第三,建立producer,指定exchange和routing_key,這樣發送消息的時候就發送到指定exchange上去(9行)。
第四,發送消息(11行)。
圖 2 producer
1.1.2 consumer
圖3展示了consumer的流程:
首先,建立一個連結,連接配接rabbitmq,并擷取channel(9,10行)。
第二,建立exchange,名為’media’,類型為’direct’(12行)。
第三,建立隊列,并與exchange綁定(13行)。
第四,建立consumer,設定消息隊列和回調函數(16行)。
第五,consume像server注冊,表明現在可以接受消息啦(17行)。
第六,drain_events阻塞程序,等待消息,當消息到來時,執行回調函數process_media(20)。
圖 3 consumer
1.2 nova rpc調用
基于1.1節的介紹,我們接下來看一下nova的rpc調用過程。
1.2.1 producer
Nova建立虛機的過程詳見文檔“nova建立虛機流程”,此文檔忽略了虛拟建立過程中的消息傳遞機制RPC,以及排程政策Nova scheduler。“Nova Scheduler”文檔1.6.4小節,在排程完成選擇好主控端之後,就調用compute 的rpcapi,向主控端發送建立虛機的rpc請求。具體過程如下:
1.2.1.1 ComputeAPI
ComputeAPI利用基類nova.openstack.common.rpc.proxy.RpcProxy的cast方法啟動虛機。/opt/stack/nova/nova/compute/rpcapi.py
1.2.1.2 RpcProxy
RpcProxy的cast方法,調用nova.openstack.common.rpc中的cast方法。
/opt/stack/nova/nova/openstack/common/rpc/proxy.py
1.2.1.3 rpc
rpc是個子產品,其中的__init__.py檔案負責子產品的基本動作。其中cast方法調用了_get_impl()傳回的具體實作架構(本例為impl_kombu)對應的cast方法。
/opt/stack/nova/nova/openstack/common/rpc/__init__.py
1.2.1.4 impl_kombu
在impl_kombu的cast方法中,rpc_amqp.get_connection_pool(conf, Connection)的功能是從類“Connection”的對象池中選擇一個Connection,若不存在則建立一個連接配接rabbitmq的連接配接池。rpc_amqp.cast方法執行對應connection中的topic_send方法,發送消息給rabbitmq。
/opt/stack/nova/nova/openstack/common/rpc/impl_kombu.py
/opt/stack/nova/nova/openstack/common/rpc/amqp.py
類“Connection”的初始化函數的主要功能是設定rabbitmq的連接配接參數并連接配接。
/opt/stack/nova/nova/openstack/common/rpc/impl_kombu.py
1.2.2 consumer
1.2.2.1 nova-compute啟動
nova compute在啟動的時候執行了nova-compute腳本,該腳本:
1. 調用nova/service.py檔案中類“Service”的create方法,該方法利用了@classmethod修飾符,傳回一個Service的執行個體,即server。
2. service.serve(server)調用service的start函數,啟動server,對外提供服務。
/opt/stack/nova/bin/nova-compute
1.2.2.2 service
service的代碼路徑為:/opt/stack/nova/nova/service.py。類“Service”各函數說明:
1.2.2.2.1 函數__init__
__init__方法對Service對象進行初始化。包括host,binary,topic,manager等。
1.2.2.2.2 函數create
create方法利用了@classmethod修飾符,傳回一個Service執行個體。其中各參數:
² topic=’compute’,
² manager=’nova.compute.manager.ComputeManager’
該參數是nova/flags.py中定義的compute_manager
1.2.2.2.3 函數start
start方法啟動server。主要步驟如下:
² rpc.create_connection(),建立rabbitmq的連接配接conn,詳見1.2.2.3.1小節
² self.manager.create_rpc_dispatcher(),調用nova.compute.manager.ComputeManager中的create_rpc_dispatcher方法,建立一個RpcDispatcher的執行個體 rpc_dispatcher,詳見1.2.2.4小節
² 調用建立好的連接配接conn的create_consumer函數,topic=‘compute’,proxy=rpc_dispatcher(詳見1.2.2.3.3小節)。
1.2.2.3 Connection
具體代碼路徑為:/opt/stack/nova/nova/openstack/common/rpc/impl_kombu.py
1.2.2.3.1 函數create connection
該函數建立一個rabbitmq的連接配接。
1.2.2.3.2 函數__init__
類“Connection”的__init__函數的主要功能是設定rabbitmq的連接配接參數并連接配接。
1.2.2.3.3 函數create_consumer
類“Connection”中create_consumer這個方法,參數”proxy”是1.2.2.2.3小節生成的RPCDispatcher執行個體rpc_dispatcher。 利用該參數生成ProxyCallback執行個體proxy_cb(詳見1.2.2.5節),作為consumer的callback函數。
/opt/stack/nova/nova/openstack/common/rpc/impl_kombu.py
1.2.2.3.4 函數declare_topic_consumer
該函數建立topic consumer(TopicConsumer)。以下幾張圖為更深層次的調用,在此就不一一贅述。需要說明的一點就是,在類ConsumerBase的consume方法,回調函數_calback實際調用的函數即為ProxyCallback執行個體proxy_cb。
1.2.2.4 RpcDispatcher
novacompute的manager是nova.compute.manager.ComputeManager,該類沒有定義create_rpc_dispatcher方法。但是它的基類nova/manager.py中的類”Manager”定義了該方法。它實際上是傳回rpc/dispatcher.py的類”RpcDispatcher”執行個體,傳入的參數為[self]是由nova.compute.manager.ComputeManager組成的list。該參數作為RPCDispatcher的callbacks。
/opt/stack/nova/nova/manager.py
/opt/stack/nova/nova/openstack/common/rpc/dispatcher.py
dispatch真正調用nova.compute.manager.ComputeManager的對應method方法。
1.2.2.5 ProxyCallback
consumer真正的callback函數,被調用時執行__call__().該函數最後調用_process_data執行1.2.2.4中proxy的dispatch方法。
/opt/stack/nova/nova/openstack/common/rpc/amqp.py