用戶端的代碼最終調用檔案./oslo_messaging/transport.py中類Transport的_send()函數發送調用請求,代碼如下:
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
return self._driver.send(target, ctxt, message,
wait_for_reply=wait_for_reply,
timeout=timeout, retry=retry)
變量message中包含了調用的函數名和參數,target定義了接收消息的服務端。在建立服務端監聽的時候,會設定一個監聽函數self._on_incoming(),即服務端在接收到消息的時候,會調用該函數,該函數最終調用檔案./oslo_messaging/rpc/server.py中類RPCServer中的_process_incoming()函數,代碼如下:
def _process_incoming(self, incoming):
message = incoming[0]
try:
message.acknowledge()
except Exception:
LOG.exception(_LE("Can not acknowledge message. Skip processing"))
return
failure = None
try:
res = self.dispatcher.dispatch(message)
except rpc_dispatcher.ExpectedException as e:
failure = e.exc_info
LOG.debug(u'Expected exception during message handling (%s)', e)
except Exception:
# current sys.exc_info() content can be overridden
# by another exception raised by a log handler during
# LOG.exception(). So keep a copy and delete it later.
failure = sys.exc_info()
LOG.exception(_LE('Exception during message handling'))
try:
if failure is None:
message.reply(res)
else:
message.reply(failure=failure)
except Exception:
LOG.exception(_LE("Can not send reply for message"))
finally:
# NOTE(dhellmann): Remove circular object reference
# between the current stack frame and the traceback in
# exc_info.
del failure
該函數的主要功能為調用self.dispatcher.dispatch(message)函數處理消息,然後調用message.reply(res)函數傳回處理結果。self.dispatcher.dispatch(message)函數的定義在檔案./oslo_messaging/rpc/dispatcher.py中類RPCDispatcher中,代碼如下:
def dispatch(self, incoming):
"""Dispatch an RPC message to the appropriate endpoint method.
:param incoming: incoming message
:type incoming: IncomingMessage
:raises: NoSuchMethod, UnsupportedVersion
"""
message = incoming.message
ctxt = incoming.ctxt
method = message.get('method')
args = message.get('args', {})
namespace = message.get('namespace')
version = message.get('version', '1.0')
found_compatible = False
for endpoint in self.endpoints:
target = getattr(endpoint, 'target', None)
if not target:
target = self._default_target
if not (self._is_namespace(target, namespace) and
self._is_compatible(target, version)):
continue
if hasattr(endpoint, method):
if self.access_policy.is_allowed(endpoint, method):
return self._do_dispatch(endpoint, method, ctxt, args)
found_compatible = True
if found_compatible:
raise NoSuchMethod(method)
else:
raise UnsupportedVersion(version, method=method)
從消息中提取出函數名、參數、名字空間等等變量,并做一系列的檢測比對工作,然後調用self._do_dispatch(endpoint, method, ctxt, args)函數,該函數的代碼如下:
def _do_dispatch(self, endpoint, method, ctxt, args):
ctxt = self.serializer.deserialize_context(ctxt)
new_args = dict()
for argname, arg in args.items():
new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
func = getattr(endpoint, method)
result = func(ctxt, **new_args)
return self.serializer.serialize_entity(ctxt, result)
該函數根據函數名執行服務端對應的函數,并将執行結果序列化後傳回。
在這裡調用的函數名為:schedule_and_build_instances,在ComputeTaskManager這個endpoint中有這個成員函數,namespace為‘compute_task’。