天天看點

nova源碼分析--rpc(3)

用戶端的代碼最終調用檔案./oslo_messaging/transport.py中類Transport的_send()函數發送調用請求,代碼如下:

def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
              retry=None):
        if not target.topic:
            raise exceptions.InvalidTarget('A topic is required to send',
                                           target)
        return self._driver.send(target, ctxt, message,
                                 wait_for_reply=wait_for_reply,
                                 timeout=timeout, retry=retry)
           

變量message中包含了調用的函數名和參數,target定義了接收消息的服務端。在建立服務端監聽的時候,會設定一個監聽函數self._on_incoming(),即服務端在接收到消息的時候,會調用該函數,該函數最終調用檔案./oslo_messaging/rpc/server.py中類RPCServer中的_process_incoming()函數,代碼如下:

def _process_incoming(self, incoming):
        message = incoming[0]
        try:
            message.acknowledge()
        except Exception:
            LOG.exception(_LE("Can not acknowledge message. Skip processing"))
            return

        failure = None
        try:
            res = self.dispatcher.dispatch(message)
        except rpc_dispatcher.ExpectedException as e:
            failure = e.exc_info
            LOG.debug(u'Expected exception during message handling (%s)', e)
        except Exception:
            # current sys.exc_info() content can be overridden
            # by another exception raised by a log handler during
            # LOG.exception(). So keep a copy and delete it later.
            failure = sys.exc_info()
            LOG.exception(_LE('Exception during message handling'))

        try:
            if failure is None:
                message.reply(res)
            else:
                message.reply(failure=failure)
        except Exception:
            LOG.exception(_LE("Can not send reply for message"))
        finally:
                # NOTE(dhellmann): Remove circular object reference
                # between the current stack frame and the traceback in
                # exc_info.
                del failure
           

該函數的主要功能為調用self.dispatcher.dispatch(message)函數處理消息,然後調用message.reply(res)函數傳回處理結果。self.dispatcher.dispatch(message)函數的定義在檔案./oslo_messaging/rpc/dispatcher.py中類RPCDispatcher中,代碼如下:

def dispatch(self, incoming):
        """Dispatch an RPC message to the appropriate endpoint method.

        :param incoming: incoming message
        :type incoming: IncomingMessage
        :raises: NoSuchMethod, UnsupportedVersion
        """
        message = incoming.message
        ctxt = incoming.ctxt

        method = message.get('method')
        args = message.get('args', {})
        namespace = message.get('namespace')
        version = message.get('version', '1.0')

        found_compatible = False
        for endpoint in self.endpoints:
            target = getattr(endpoint, 'target', None)
            if not target:
                target = self._default_target

            if not (self._is_namespace(target, namespace) and
                    self._is_compatible(target, version)):
                continue

            if hasattr(endpoint, method):
                if self.access_policy.is_allowed(endpoint, method):
                    return self._do_dispatch(endpoint, method, ctxt, args)

            found_compatible = True

        if found_compatible:
            raise NoSuchMethod(method)
        else:
            raise UnsupportedVersion(version, method=method)
           

       從消息中提取出函數名、參數、名字空間等等變量,并做一系列的檢測比對工作,然後調用self._do_dispatch(endpoint, method, ctxt, args)函數,該函數的代碼如下:

def _do_dispatch(self, endpoint, method, ctxt, args):
        ctxt = self.serializer.deserialize_context(ctxt)
        new_args = dict()
        for argname, arg in args.items():
            new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
        func = getattr(endpoint, method)
        result = func(ctxt, **new_args)
        return self.serializer.serialize_entity(ctxt, result)
           

該函數根據函數名執行服務端對應的函數,并将執行結果序列化後傳回。

在這裡調用的函數名為:schedule_and_build_instances,在ComputeTaskManager這個endpoint中有這個成員函數,namespace為‘compute_task’。

繼續閱讀