天天看點

OpenStack-RPC-client的建構

        本文截取建立VM中的select_destinations的RPC call的代碼流程講述OpenStack-RPC-client的建構。下面是VM的建立過程。

OpenStack-RPC-client的建構

        我們這裡主要分析紅圈中的select_destinations的RPC call的代碼流程。這裡我們也簡要描述一下VM建立的代碼流程。

#/nova/compute/api.py
    @hooks.add_hook("create_instance")
    def create(self, context, instance_type,
               image_href, kernel_id=None, ramdisk_id=None,
               min_count=None, max_count=None,
               display_name=None, display_description=None,
               key_name=None, key_data=None, security_group=None,
               availability_zone=None, user_data=None, metadata=None,
               injected_files=None, admin_password=None,
               block_device_mapping=None, access_ip_v4=None,
               access_ip_v6=None, requested_networks=None, config_drive=None,
               auto_disk_config=None, scheduler_hints=None, legacy_bdm=True,
               shutdown_terminate=False, check_server_group_quota=False):
        """Entry point for provisioning one or more instances.

        Runs the create-time checks, then hands every argument over to
        ``_create_instance``, which does the actual work.

        Returns a tuple of (instances, reservation_id)
        """
        self._check_create_policies(context, availability_zone,
                                    requested_networks, block_device_mapping)

        # Extra validation is only needed when explicit networks are
        # requested for more than one instance.
        many_with_networks = requested_networks and max_count > 1
        if many_with_networks:
            self._check_multiple_instances_and_specified_ip(requested_networks)
            if utils.is_neutron():
                self._check_multiple_instances_neutron_ports(
                    requested_networks)

        # Positional order must match the _create_instance signature.
        positional = (
            context, instance_type,
            image_href, kernel_id, ramdisk_id,
            min_count, max_count,
            display_name, display_description,
            key_name, key_data, security_group,
            availability_zone, user_data, metadata,
            injected_files, admin_password,
            access_ip_v4, access_ip_v6,
            requested_networks, config_drive,
            block_device_mapping, auto_disk_config,
        )
        return self._create_instance(
            *positional,
            scheduler_hints=scheduler_hints,
            legacy_bdm=legacy_bdm,
            shutdown_terminate=shutdown_terminate,
            check_server_group_quota=check_server_group_quota)
           

        這裡,nova-api元件收到novaclient的create VM的http請求。然後執行上述的create方法操作。

#/nova/compute/api.py
    def _create_instance(self, context, instance_type,
               image_href, kernel_id, ramdisk_id,
               min_count, max_count,
               display_name, display_description,
               key_name, key_data, security_groups,
               availability_zone, user_data, metadata,
               injected_files, admin_password,
               access_ip_v4, access_ip_v6,
               requested_networks, config_drive,
               block_device_mapping, auto_disk_config,
               reservation_id=None, scheduler_hints=None,
               legacy_bdm=True, shutdown_terminate=False,
               check_server_group_quota=False):
        """Verify all the input parameters regardless of the provisioning
        strategy being performed and schedule the instance(s) for
        creation.

        The method normalizes defaults, resolves the boot image metadata,
        builds the base options and filter properties, provisions the
        instance records and finally hands the build over to
        ``self.compute_task_api.build_instances``.

        :returns: a tuple of (instances, reservation_id)
        :raises exception.PortLimitExceeded: when the computed
            ``max_net_count`` is 0
        """

        # Normalize and setup some parameters
        if reservation_id is None:
            reservation_id = utils.generate_uid('r')
        security_groups = security_groups or ['default']
        min_count = min_count or 1
        max_count = max_count or min_count
        block_device_mapping = block_device_mapping or []
        if not instance_type:
            instance_type = flavors.get_default_flavor()

        # Boot metadata comes from the image when one is given, otherwise
        # from the block device mapping.
        if image_href:
            image_id, boot_meta = self._get_image(context, image_href)
        else:
            image_id = None
            boot_meta = self._get_bdm_image_metadata(
                context, block_device_mapping, legacy_bdm)

        self._check_auto_disk_config(image=boot_meta,
                                     auto_disk_config=auto_disk_config)

        handle_az = self._handle_availability_zone
        availability_zone, forced_host, forced_node = handle_az(context,
                                                            availability_zone)

        # Forcing a host or node is subject to its own policy rule.
        if not self.skip_policy_check and (forced_host or forced_node):
            check_policy(context, 'create:forced_host', {})

        base_options, max_net_count = self._validate_and_build_base_options(
                context,
                instance_type, boot_meta, image_href, image_id, kernel_id,
                ramdisk_id, display_name, display_description,
                key_name, key_data, security_groups, availability_zone,
                forced_host, user_data, metadata, injected_files, access_ip_v4,
                access_ip_v6, requested_networks, config_drive,
                auto_disk_config, reservation_id, max_count)

        # max_net_count is the maximum number of instances requested by the
        # user adjusted for any network quota constraints, including
        # consideration of connections to each requested network
        if max_net_count == 0:
            raise exception.PortLimitExceeded()
        elif max_net_count < max_count:
            LOG.debug("max count reduced from %(max_count)d to "
                      "%(max_net_count)d due to network port quota",
                      {'max_count': max_count,
                       'max_net_count': max_net_count})
            max_count = max_net_count

        block_device_mapping = self._check_and_transform_bdm(context,
            base_options, instance_type, boot_meta, min_count, max_count,
            block_device_mapping, legacy_bdm)

        instance_group = self._get_requested_instance_group(context,
                                   scheduler_hints, check_server_group_quota)

        instances = self._provision_instances(context, instance_type,
                min_count, max_count, base_options, boot_meta, security_groups,
                block_device_mapping, shutdown_terminate,
                instance_group, check_server_group_quota)

        filter_properties = self._build_filter_properties(context,
                scheduler_hints, forced_host,
                forced_node, instance_type,
                base_options.get('pci_requests'))

        # Record a CREATE action for every provisioned instance before the
        # build request is sent.
        for instance in instances:
            self._record_action_start(context, instance,
                                      instance_actions.CREATE)

        # The (possibly transformed) block device mapping is always sent
        # with legacy_bdm=False, whatever the caller passed in.
        self.compute_task_api.build_instances(context,
                instances=instances, image=boot_meta,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=False)

        return (instances, reservation_id)

#/nova/compute/api.py
    @property
    def compute_task_api(self):
        """Return the conductor ComputeTaskAPI, creating it on first use."""
        cached = self._compute_task_api
        if cached is not None:
            return cached
        # TODO(alaski): Remove calls into here from conductor manager so
        # that this isn't necessary. #1180540
        from nova import conductor
        self._compute_task_api = conductor.ComputeTaskAPI()
        return self._compute_task_api

#/nova/conductor/__init__.py
def ComputeTaskAPI(*args, **kwargs):
    """Build the compute task API selected by configuration.

    The local implementation is used when either the ``use_local`` keyword
    argument (removed from ``kwargs`` before construction) or the
    ``conductor.use_local`` option is true; otherwise the regular
    ``ComputeTaskAPI`` implementation is used. Remaining arguments are
    forwarded to the chosen class.
    """
    forced_local = kwargs.pop('use_local', False)
    run_locally = oslo_config.cfg.CONF.conductor.use_local or forced_local
    api_cls = (conductor_api.LocalComputeTaskAPI if run_locally
               else conductor_api.ComputeTaskAPI)
    return api_cls(*args, **kwargs)
           

        這裡,首先確定self.compute_task_api的對象類型,self.compute_task_api有兩種api對象類型:LocalComputeTaskAPI和ComputeTaskAPI。其中LocalComputeTaskAPI不需要通過RPC的方式訪問LocalComputeTaskAPI類中的方法(直接在本地調用),而ComputeTaskAPI則需要RPC遠端調用。在這裡,我們採用的是ComputeTaskAPI類的方式。

#/nova/conductor/api.py:ComputeTaskAPI
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping, legacy_bdm=True):
        """Forward a build request unchanged to the conductor RPC API.

        Every argument except ``context`` is passed on by keyword to
        ``self.conductor_compute_rpcapi.build_instances``. Nothing is
        returned.
        """
        forwarded = {
            'instances': instances,
            'image': image,
            'filter_properties': filter_properties,
            'admin_password': admin_password,
            'injected_files': injected_files,
            'requested_networks': requested_networks,
            'security_groups': security_groups,
            'block_device_mapping': block_device_mapping,
            'legacy_bdm': legacy_bdm,
        }
        self.conductor_compute_rpcapi.build_instances(context, **forwarded)

#/nova/conductor/rpcapi.py:ComputeTaskAPI
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping, legacy_bdm=True):
        """Cast ``build_instances`` to the conductor over RPC.

        The payload is downgraded step by step to the newest RPC version
        that ``self.client.can_send_version`` accepts (1.10, then 1.9,
        then 1.8, and finally 1.5 when 1.7 cannot be sent). The message is
        sent with ``cast``, so nothing is returned.
        """
        image_p = jsonutils.to_primitive(image)
        version = '1.10'
        if not self.client.can_send_version(version):
            version = '1.9'
            # Below 1.10 the flavor under 'instance_type' is sent as a
            # primitive; a copy of filter_properties is made so the
            # caller's dict is left untouched.
            if 'instance_type' in filter_properties:
                flavor = filter_properties['instance_type']
                flavor_p = objects_base.obj_to_primitive(flavor)
                filter_properties = dict(filter_properties,
                                         instance_type=flavor_p)
        kw = {'instances': instances, 'image': image_p,
               'filter_properties': filter_properties,
               'admin_password': admin_password,
               'injected_files': injected_files,
               'requested_networks': requested_networks,
               'security_groups': security_groups}
        # `version` is either '1.10' (already known to be sendable) or
        # '1.9' here, so this check only matters for the 1.9 fallback.
        if not self.client.can_send_version(version):
            version = '1.8'
            # NOTE(review): as_tuples() is called unconditionally; this
            # presumably assumes requested_networks is never None on this
            # path -- confirm against callers.
            kw['requested_networks'] = kw['requested_networks'].as_tuples()
        # block_device_mapping and legacy_bdm are only added to the payload
        # when 1.7 cannot be sent.
        if not self.client.can_send_version('1.7'):
            version = '1.5'
            bdm_p = objects_base.obj_to_primitive(block_device_mapping)
            kw.update({'block_device_mapping': bdm_p,
                       'legacy_bdm': legacy_bdm})

        cctxt = self.client.prepare(version=version)
        cctxt.cast(context, 'build_instances', **kw)
           

        是以最終是通過RPC的cast遠端調用到ComputeTaskAPI類的build_instances的方法。這就是上圖VM的建立過程的第一個RPC的cast遠端調用,即nova-api到nova-conductor的RPC:cast的build_instances方法調用。這裡的RPC:cast遠端調用不是我們關注的重點,是以我們這裡不關心底層通過RabbitMQ如何通信的,我們執行跳到最終調用的結果,即最終調用到conductor中的manager.py檔案中的ComputeTaskManager類的build_instances方法。

#/nova/conductor/manager.py:ComputeTaskManager
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping=None, legacy_bdm=True):
        """Pick a destination for every instance and start the builds.

        A request spec is built, the scheduler is asked for destinations
        through ``self.scheduler_client.select_destinations`` and, for each
        (instance, host) pair, ``build_and_run_instance`` is invoked on
        ``self.compute_rpcapi``.

        If anything raises while preparing or scheduling, every instance is
        put into the ERROR vm_state via ``_set_vm_state_and_notify`` and the
        method returns without building anything. Instances that can no
        longer be refreshed (deleted in the meantime) are skipped.

        ``block_device_mapping`` and ``legacy_bdm`` are accepted but not
        used in this body; the mappings are looked up again per instance.
        """
        # TODO(ndipanov): Remove block_device_mapping and legacy_bdm in version
        #                 2.0 of the RPC API.
        request_spec = scheduler_utils.build_request_spec(context, image,
                                                          instances)
        # TODO(danms): Remove this in version 2.0 of the RPC API
        if (requested_networks and
                not isinstance(requested_networks,
                               objects.NetworkRequestList)):
            requested_networks = objects.NetworkRequestList(
                objects=[objects.NetworkRequest.from_tuple(t)
                         for t in requested_networks])
        # TODO(melwitt): Remove this in version 2.0 of the RPC API
        flavor = filter_properties.get('instance_type')
        if flavor and not isinstance(flavor, objects.Flavor):
            # Code downstream may expect extra_specs to be populated since it
            # is receiving an object, so lookup the flavor to ensure this.
            flavor = objects.Flavor.get_by_id(context, flavor['id'])
            filter_properties = dict(filter_properties, instance_type=flavor)

        try:
            scheduler_utils.setup_instance_group(context, request_spec,
                                                 filter_properties)
            # check retry policy. Rather ugly use of instances[0]...
            # but if we've exceeded max retries... then we really only
            # have a single instance.
            scheduler_utils.populate_retry(filter_properties,
                instances[0].uuid)
            hosts = self.scheduler_client.select_destinations(context,
                    request_spec, filter_properties)
        except Exception as exc:
            updates = {'vm_state': vm_states.ERROR, 'task_state': None}
            for instance in instances:
                self._set_vm_state_and_notify(
                    context, instance.uuid, 'build_instances', updates,
                    exc, request_spec)
            return

        # The builtin zip pairs instances with hosts identically on
        # Python 2 and Python 3; itertools.izip only exists on Python 2.
        for (instance, host) in zip(instances, hosts):
            try:
                instance.refresh()
            except (exception.InstanceNotFound,
                    exception.InstanceInfoCacheNotFound):
                LOG.debug('Instance deleted during build', instance=instance)
                continue
            # Each instance gets its own copy so host-specific data does
            # not leak between iterations.
            local_filter_props = copy.deepcopy(filter_properties)
            scheduler_utils.populate_filter_properties(local_filter_props,
                host)
            # The block_device_mapping passed from the api doesn't contain
            # instance specific information
            bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
                    context, instance.uuid)

            self.compute_rpcapi.build_and_run_instance(context,
                    instance=instance, host=host['host'], image=image,
                    request_spec=request_spec,
                    filter_properties=local_filter_props,
                    admin_password=admin_password,
                    injected_files=injected_files,
                    requested_networks=requested_networks,
                    security_groups=security_groups,
                    block_device_mapping=bdms, node=host['nodename'],
                    limits=host['limits'])
           

        這裡,hosts = self.scheduler_client.select_destinations(context, request_spec, filter_properties)即為我們本文所需講解的内容,這就是上圖的VM的建立過程的第二個RPC遠端調用(具體代碼内部細節),即nova-conductor到nova-scheduler的RPC:call的select_destinations方法調用。注意self.compute_rpcapi.build_and_run_instance是上圖的VM的建立過程的第三個RPC遠端調用,它是cast而不是call,即nova-conductor到nova-compute的RPC:cast的build_and_run_instance方法調用。下面我們将重點關注nova-conductor到nova-scheduler的RPC:call的select_destinations方法調用。

#/nova/scheduler/client/__init__.py:SchedulerClient
    @utils.retry_select_destinations
    def select_destinations(self, context, request_spec, filter_properties):
        """Delegate destination selection to ``self.queryclient``."""
        query = self.queryclient
        return query.select_destinations(context, request_spec,
                                         filter_properties)
#/nova/scheduler/client/query.py:SchedulerQueryClient
    def __init__(self):
        """Create the scheduler RPC API object used for queries."""
        rpc_api = scheduler_rpcapi.SchedulerAPI()
        self.scheduler_rpcapi = rpc_api

    def select_destinations(self, context, request_spec, filter_properties):
        """Return the destination(s) best suited for this request_spec and
        filter_properties, as reported by the scheduler RPC API.

        The result should be a list of dicts with 'host', 'nodename' and
        'limits' as keys.
        """
        rpc_api = self.scheduler_rpcapi
        destinations = rpc_api.select_destinations(
            context, request_spec, filter_properties)
        return destinations
           
#/nova/scheduler/rpcapi.py:SchedulerAPI
    def __init__(self):
        """Set up the RPC client used to talk to the scheduler.

        The client targets ``CONF.scheduler_topic`` at version 4.0. Its
        version cap is ``CONF.upgrade_levels.scheduler``, translated through
        ``VERSION_ALIASES`` when an alias with that name exists.
        """
        super(SchedulerAPI, self).__init__()
        scheduler_target = messaging.Target(topic=CONF.scheduler_topic,
                                            version='4.0')
        cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.scheduler,
                                       CONF.upgrade_levels.scheduler)
        object_serializer = objects_base.NovaObjectSerializer()
        self.client = rpc.get_client(scheduler_target,
                                     version_cap=cap,
                                     serializer=object_serializer)

    def select_destinations(self, ctxt, request_spec, filter_properties):
        """Synchronously call ``select_destinations`` at RPC version 4.0
        and return the reply.
        """
        call_args = {'request_spec': request_spec,
                     'filter_properties': filter_properties}
        prepared = self.client.prepare(version='4.0')
        return prepared.call(ctxt, 'select_destinations', **call_args)
           

        在/nova/scheduler/client/query.py:SchedulerQueryClient的__init__方法中,建立了SchedulerAPI對象,在該對象建立時(__init__),建立了self.client屬性,那麼我們向下分析這裡self.client屬性如何建立完成的?

#/nova/rpc.py
def get_client(target, version_cap=None, serializer=None):
    """Build an RPCClient for ``target`` on the module-level TRANSPORT.

    The given serializer is wrapped in a ``RequestContextSerializer``
    before being handed to the client. TRANSPORT must already be set.
    """
    assert TRANSPORT is not None
    context_aware = RequestContextSerializer(serializer)
    client_options = {'version_cap': version_cap,
                      'serializer': context_aware}
    return messaging.RPCClient(TRANSPORT, target, **client_options)
           

        注意,這裡的TRANSPORT在nova-scheduler元件服務啟動的時候就已經建立完成了(即執行該檔案中的init方法,具體細節參考OpenStack-RPC-server的建構(一)),是以這裡的TRANSPORT不為None。

#/oslo_messaging/rpc/client.py:RPCClient
class RPCClient(object):

    """A class for invoking methods on remote servers.

    The RPCClient class is responsible for sending method invocations to remote
    servers via a messaging transport.

    A default target is supplied to the RPCClient constructor, but target
    attributes can be overridden for individual method invocations using the
    prepare() method.

    A method invocation consists of a request context dictionary, a method name
    and a dictionary of arguments. A cast() invocation just sends the request
    and returns immediately. A call() invocation waits for the server to send
    a return value.

    This class is intended to be used by wrapping it in another class which
    provides methods that perform the remote invocation using call() or
    cast()::

        class TestClient(object):

            def __init__(self, transport):
                target = messaging.Target(topic='testtopic', version='2.0')
                self._client = messaging.RPCClient(transport, target)

            def test(self, ctxt, arg):
                return self._client.call(ctxt, 'test', arg=arg)

    An example of using the prepare() method to override some attributes of the
    default target::

        def test(self, ctxt, arg):
            cctxt = self._client.prepare(version='2.5')
            return cctxt.call(ctxt, 'test', arg=arg)

    RPCClient has a number of other properties - for example, timeout and
    version_cap - which may make sense to override for some method invocations,
    so they too can be passed to prepare()::

        def test(self, ctxt, arg):
            cctxt = self._client.prepare(timeout=10)
            return cctxt.call(ctxt, 'test', arg=arg)

    However, this class can be used directly without wrapping it in another
    class. For example::

        transport = messaging.get_transport(cfg.CONF)
        target = messaging.Target(topic='testtopic', version='2.0')
        client = messaging.RPCClient(transport, target)
        client.call(ctxt, 'test', arg=arg)

    but this is probably only useful in limited circumstances as a wrapper
    class will usually help to make the code much more obvious.

    By default, cast() and call() will block until the message is successfully
    sent. However, the retry parameter can be used to have message sending
    fail with a MessageDeliveryFailure after the given number of retries. For
    example::

        client = messaging.RPCClient(transport, target, retry=None)
        client.call(ctxt, 'sync')
        try:
            client.prepare(retry=0).cast(ctxt, 'ping')
        except messaging.MessageDeliveryFailure:
            LOG.error("Failed to send ping message")
    """

    def __init__(self, transport, target,
                 timeout=None, version_cap=None, serializer=None, retry=None):
        """Construct an RPC client.

        :param transport: a messaging transport handle
        :type transport: Transport
        :param target: the default target for invocations
        :type target: Target
        :param timeout: an optional default timeout (in seconds) for call()s
        :type timeout: int or float
        :param version_cap: raise an RPCVersionCapError if the requested
                            version exceeds this cap
        :type version_cap: str
        :param serializer: an optional entity serializer
        :type serializer: Serializer
        :param retry: an optional default connection retries configuration
                      None or -1 means to retry forever
                      0 means no retry
                      N means N retries
        :type retry: int
        """
        # The client shares the transport's configuration object and
        # registers its own options on it.
        self.conf = transport.conf
        self.conf.register_opts(_client_opts)

        self.transport = transport
        self.target = target
        self.timeout = timeout
        self.retry = retry
        self.version_cap = version_cap
        # Fall back to a no-op serializer when none is supplied.
        self.serializer = serializer or msg_serializer.NoOpSerializer()

        super(RPCClient, self).__init__()

    # Sentinel shared with _CallContext; used as the default value of
    # prepare() arguments.
    _marker = _CallContext._marker
           

        這樣,RPCClient對象即為self.client屬性的值,根據建構RPCClient對象的注釋可知,RPCClient對象可以通過執行cast和call方法來執行遠端調用,且可以通過prepare方法修改預設target的一些屬性。

#/nova/scheduler/rpcapi.py:SchedulerAPI
    def select_destinations(self, ctxt, request_spec, filter_properties):
        """Synchronously call ``select_destinations`` at RPC version 4.0
        and return the reply.
        """
        call_args = {'request_spec': request_spec,
                     'filter_properties': filter_properties}
        prepared = self.client.prepare(version='4.0')
        return prepared.call(ctxt, 'select_destinations', **call_args)
           

        首先,我們看看RPCClient對象中的prepare方法做了什麼操作?

#/oslo_messaging/rpc/client.py:RPCClient
    def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
                version=_marker, server=_marker, fanout=_marker,
                timeout=_marker, version_cap=_marker, retry=_marker):
        """Build an invocation context with per-call overrides.

        Arguments that are left at their default keep the value configured
        on this client. Typical use::

            def test(self, ctxt, arg):
                cctxt = self.prepare(version='2.5')
                return cctxt.call(ctxt, 'test', arg=arg)

        :param exchange: see Target.exchange
        :type exchange: str
        :param topic: see Target.topic
        :type topic: str
        :param namespace: see Target.namespace
        :type namespace: str
        :param version: requirement the server must support, see Target.version
        :type version: str
        :param server: send to a specific server, see Target.server
        :type server: str
        :param fanout: send to all servers on topic, see Target.fanout
        :type fanout: bool
        :param timeout: an optional default timeout (in seconds) for call()s
        :type timeout: int or float
        :param version_cap: raise an RPCVersionCapError if the requested
                            version exceeds this cap
        :type version_cap: str
        :param retry: an optional connection retries configuration
                      None or -1 means to retry forever
                      0 means no retry
                      N means N retries
        :type retry: int
        """
        return _CallContext._prepare(
            self,
            exchange=exchange, topic=topic, namespace=namespace,
            version=version, server=server, fanout=fanout,
            timeout=timeout, version_cap=version_cap, retry=retry)
#/oslo_messaging/rpc/client.py:_CallContext
    @classmethod
    def _prepare(cls, base,
                 exchange=_marker, topic=_marker, namespace=_marker,
                 version=_marker, server=_marker, fanout=_marker,
                 timeout=_marker, version_cap=_marker, retry=_marker):
        """Prepare a method invocation context. See RPCClient.prepare()."""
        unset = cls._marker

        # Only explicitly supplied target attributes override the base
        # target; everything still equal to the sentinel is dropped.
        candidates = (
            ('exchange', exchange),
            ('topic', topic),
            ('namespace', namespace),
            ('version', version),
            ('server', server),
            ('fanout', fanout),
        )
        overrides = {key: value for key, value in candidates
                     if value is not unset}
        target = base.target(**overrides)

        # The remaining settings fall back to the base client's values.
        effective_timeout = base.timeout if timeout is unset else timeout
        effective_retry = base.retry if retry is unset else retry
        effective_cap = (base.version_cap if version_cap is unset
                         else version_cap)

        return _CallContext(base.transport, target,
                            base.serializer,
                            effective_timeout, effective_cap,
                            effective_retry)
           

        這裡是prepare一個可調用上下文的方法,即基于RPCClient對象提供的參數,修改一些屬性,建構_CallContext對象,且RPCClient對象的call和cast方法都是通過_CallContext對象的call和cast方法執行的。

#/oslo_messaging/rpc/client.py:_CallContext
    def call(self, ctxt, method, **kwargs):
        """Invoke a method and wait for a reply. See RPCClient.call().

        Raises InvalidTarget for a fanout target and ClientSendError when
        the transport driver fails; otherwise returns the deserialized
        reply.
        """
        if self.target.fanout:
            raise exceptions.InvalidTarget('A call cannot be used with fanout',
                                           self.target)

        payload = self._make_message(ctxt, method, kwargs)
        serialized_ctxt = self.serializer.serialize_context(ctxt)

        # Without a per-context timeout the configured default applies.
        reply_timeout = self.timeout
        if reply_timeout is None:
            reply_timeout = self.conf.rpc_response_timeout

        if self.version_cap:
            self._check_version_cap(payload.get('version'))

        try:
            reply = self.transport._send(
                self.target, serialized_ctxt, payload,
                wait_for_reply=True,
                timeout=reply_timeout,
                retry=self.retry)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)
        return self.serializer.deserialize_entity(ctxt, reply)
           

        在發送message到RabbitMQ-server之前,首先将message進行預處理和組裝,有利于message的識别和處理。其中self._make_message方法将method存放到傳回的msg字典中(self._make_message方法組建的msg字典包括的鍵值有:namespace,method,version以及args(如果有對method傳遞參數的話))。然後執行self.transport._send方法的調用。

#/oslo_messaging/transport.py:Transport
    def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
              retry=None):
        """Hand the message to the driver; the target must carry a topic.

        Raises InvalidTarget when ``target.topic`` is empty, otherwise
        returns whatever the driver's ``send`` returns.
        """
        if target.topic:
            return self._driver.send(target, ctxt, message,
                                     wait_for_reply=wait_for_reply,
                                     timeout=timeout, retry=retry)
        raise exceptions.InvalidTarget('A topic is required to send',
                                       target)

#/oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase
    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
             retry=None):
        """Public wrapper that forwards its arguments to ``_send``."""
        positional = (target, ctxt, message, wait_for_reply, timeout)
        return self._send(*positional, retry=retry)
    def _send(self, target, ctxt, message,
              wait_for_reply=None, timeout=None,
              envelope=True, notify=False, retry=None):
        """Publish a message and optionally wait for its reply.

        When ``wait_for_reply`` is true the message is tagged with a fresh
        ``_msg_id`` and the reply queue name, the id is registered with the
        reply waiter before publishing, and the reply is returned (or
        raised, if it is an Exception instance). Otherwise nothing is
        returned.

        The message is published with ``notify_send`` when ``notify`` is
        true, with ``fanout_send`` for fanout targets, and with
        ``topic_send`` otherwise.
        """

        # FIXME(markmc): remove this temporary hack
        class Context(object):
            def __init__(self, d):
                self.d = d

            def to_dict(self):
                return self.d

        context = Context(ctxt)
        msg = message

        if wait_for_reply:
            msg_id = uuid.uuid4().hex
            msg.update({'_msg_id': msg_id})
            LOG.debug('MSG_ID is %s', msg_id)
            # _get_reply_q() also creates self._waiter on first use.
            msg.update({'_reply_q': self._get_reply_q()})

        rpc_amqp._add_unique_id(msg)
        rpc_amqp.pack_context(msg, context)

        if envelope:
            msg = rpc_common.serialize_msg(msg)

        # Register the id before publishing so it is already known to the
        # waiter when the reply comes in.
        if wait_for_reply:
            self._waiter.listen(msg_id)

        try:
            with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
                if notify:
                    conn.notify_send(self._get_exchange(target),
                                     target.topic, msg, retry=retry)
                elif target.fanout:
                    conn.fanout_send(target.topic, msg, retry=retry)
                else:
                    topic = target.topic
                    # A specific server is addressed as "<topic>.<server>".
                    if target.server:
                        topic = '%s.%s' % (target.topic, target.server)
                    conn.topic_send(exchange_name=self._get_exchange(target),
                                    topic=topic, msg=msg, timeout=timeout,
                                    retry=retry)

            if wait_for_reply:
                result = self._waiter.wait(msg_id, timeout)
                if isinstance(result, Exception):
                    raise result
                return result
        finally:
            # Always drop the registration, whether the send or the wait
            # succeeded or raised.
            if wait_for_reply:
                self._waiter.unlisten(msg_id)
           

        這裡的_send方法是我們要分析的重點,是以我們在這裡将詳細分析_send方法都做了些什麼?

        在調用call方法時,設定wait_for_reply = True,是以這裡會向msg字典中添加_msg_id和_reply_q兩個鍵值對,_msg_id鍵所對應的值為uuid.uuid4()生成的十六進制字符串,那麼_reply_q是什麼呢?我們需要分析self._get_reply_q()的代碼流程。

#/oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase
    def _get_reply_q(self):
        """Return the reply queue name, creating it on first use.

        On the first call, and while holding ``self._reply_q_lock``, a
        unique ``reply_<hex>`` name is generated, a listening connection is
        obtained and a ReplyWaiter is created for it. Later calls return
        the stored name.
        """
        with self._reply_q_lock:
            if self._reply_q is not None:
                return self._reply_q

            reply_q = 'reply_' + uuid.uuid4().hex

            conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

            self._waiter = ReplyWaiter(reply_q, conn,
                                       self._allowed_remote_exmods)

            # The name is stored only after the waiter exists.
            self._reply_q = reply_q
            self._reply_q_conn = conn

        return self._reply_q
           

        對于conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)的代碼流程在《OpenStack-RPC-server的建構(三)》文章中有詳解,我們就不在這裡分析了,該代碼主要是建立了到RabbitMQ-server的連接(connection和channel)。注意,在調用nova-scheduler元件且設定wait_for_reply = True的多個call方法時,這裡隻會建立一個self._reply_q,一個到RabbitMQ-server的連接和一個self._waiter,即所有的調用nova-scheduler元件且設定wait_for_reply = True的call方法都共享這些屬性。下面講述self._waiter對象的建立。

#/oslo_messaging/_drivers/amqpdriver.py:ReplyWaiter
class ReplyWaiter(object):
    """Consumes the reply queue on a background thread."""

    def __init__(self, reply_q, conn, allowed_remote_exmods):
        """Declare the reply consumer and start the polling thread.

        :param reply_q: name of the reply queue to consume from
        :param conn: connection used to declare the consumer and to poll
        :param allowed_remote_exmods: stored as-is on the instance
        """
        self.conn = conn
        self.allowed_remote_exmods = allowed_remote_exmods
        self.msg_id_cache = rpc_amqp._MsgIdCache()
        self.waiters = ReplyWaiters()

        # NOTE(review): self is passed as the second argument, presumably
        # as the callback for incoming replies -- confirm against
        # declare_direct_consumer.
        self.conn.declare_direct_consumer(reply_q, self)

        # The daemon thread runs self.poll; the event is the flag poll
        # checks to know when to stop.
        self._thread_exit_event = threading.Event()
        self._thread = threading.Thread(target=self.poll)
        self._thread.daemon = True
        self._thread.start()

#/oslo_messaging/_drivers/amqpdriver.py:ReplyWaiters
class ReplyWaiters(object):
    """Registry of per-message reply queues, keyed by message id."""

    # Unique sentinel object.
    # NOTE(review): presumably pushed onto a queue to wake a blocked waiter;
    # its use is not visible in this excerpt -- confirm.
    WAKE_UP = object()

    def __init__(self):
        """Start with no queues and a warning threshold of 10."""
        self._wrn_threshold = 10
        self._queues = dict()
           

        首先建立了一個ReplyWaiters對象,即self.waiters。

        self.conn.declare_direct_consumer(reply_q,self)代碼建立了一個direct方式的consumer。詳細細節參考《OpenStack-RPC-server的建構(四)》,該代碼主要是建立了amqp層的exchange和queue對象,且綁定amqp層的exchange和queue對象,其中這裡所對應的amqp層的callback為ReplyWaiter對象。topic為reply_q(即reply_xxxx)。

        最後執行self._thread = threading.Thread(target=self.poll)代碼,建立一個Thread,當self._thread.start()執行時,則會開啟該Thread進入self.poll代碼去執行相關操作。

#/oslo_messaging/_drivers/amqpdriver.py:ReplyWaiter
    def poll(self):
        """Consume messages one at a time until the exit event is set.

        Any exception raised by ``self.conn.consume`` is logged with its
        traceback and the loop carries on with the next iteration.
        """
        while True:
            if self._thread_exit_event.is_set():
                break
            try:
                self.conn.consume(limit=1)
            except Exception:
                LOG.exception("Failed to process incoming message, "
                              "retrying...")
           

        參考《OpenStack-RPC-server的建構(五)》可知,隻要self._thread_exit_event沒有被設定,則consume方法将會一直循環執行,等待reply message的回應。回應流程是這樣的:producer發送message到consumer去遠端調用endpoints上的方法,在執行調用方法之前,consumer會發回一個ack到這個producer建立的direct consumer的queue中,那麼你或許會問,那邊的consumer怎麼會知道這邊producer建立的direct consumer呢?那是因為producer傳輸給consumer的msg中帶有producer的direct consumer的相關資訊,是以那邊的consumer會從收到的msg中提取producer的direct consumer的資訊,然後将ack資訊回應給producer的direct consumer進行處理。當然,這裡所謂的處理,就是将收到的ack資訊存到與msg_id相對應的queue中。

        這樣ReplyWaiter對象(即self._waiter)建立完成,_get_reply_q将self._reply_q傳回。下面分析self._waiter.listen(msg_id)代碼流程。

#/oslo_messaging/_drivers/amqpdriver.py:ReplyWaiter
    def listen(self, msg_id):
        """Register *msg_id* with ``self.waiters``.

        :param msg_id: identifier passed to ``self.waiters.add``.
        """
        registry = self.waiters
        registry.add(msg_id)

#/oslo_messaging/_drivers/amqpdriver.py:ReplyWaiters
    def add(self, msg_id):
        """Create a fresh queue for *msg_id* and watch for queue leaks.

        The new queue replaces any queue already stored under the same id.
        When the number of stored queues exceeds the current warning
        threshold, a warning is logged and the threshold is doubled, so the
        next warning only appears once the count has doubled again.

        :param msg_id: key under which the new queue is stored.
        """
        self._queues[msg_id] = moves.queue.Queue()
        if len(self._queues) > self._wrn_threshold:
            # ``warn`` is a deprecated alias of ``warning`` on loggers.
            LOG.warning('Number of call queues is greater than warning '
                        'threshold: %d. There could be a leak. Increasing'
                        ' threshold to: %d', self._wrn_threshold,
                        self._wrn_threshold * 2)
            self._wrn_threshold *= 2
           

        從上述代碼可知,為每個設定wait_for_reply為True的call方法建立一個唯一(根據msg_id唯一識别)的queue用于接受message。

        繼續回到/oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase的_send方法,分析後面的代碼。此時with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn代碼會再get一個到RabbitMQ-server的連接;如果沒有get到可用的連接,則會新建立一個連接。根據前面OpenStack-RPC-server的建構的代碼流程分析,我們知道,當這個with中的代碼執行完成後,會將get或create的連接釋放回連接池中,以備後續調用call或cast方法的代碼使用這些連接。

        對于with中的代碼,根據我們傳遞下來的參數可知,這裡調用的是conn.topic_send方法,且target.server為None,所以這裡會把message傳遞到exchange上與topic(scheduler)相綁定的queue中去。下面我們具體分析conn.topic_send方法都做了什麼。

#/oslo_messaging/_drivers/impl_rabbit.py:Connection
    def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
        """Send a 'topic' message.

        Thin wrapper that calls ``publisher_send`` with ``TopicPublisher``
        as the publisher class.

        :param exchange_name: forwarded as the ``exchange_name`` keyword.
        :param topic: topic for the publisher.
        :param msg: message to send.
        :param timeout: forwarded to ``publisher_send``.
        :param retry: forwarded to ``publisher_send``.
        """
        publisher_options = {'exchange_name': exchange_name}
        self.publisher_send(TopicPublisher, topic, msg, timeout,
                            retry=retry, **publisher_options)

    def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
                       error_callback=default_marker, **kwargs):
        """Send to a publisher based on the publisher class.

        A new publisher is created for every attempt and asked to send
        *msg*; the attempt is run through ``self.ensure`` while holding
        ``self._connection_lock``.

        :param cls: publisher class, instantiated as
            ``cls(self.driver_conf, self.channel, topic=topic, **kwargs)``.
        :param topic: topic given to the publisher; also used by the default
            error callback when logging.
        :param msg: message handed to the publisher's ``send``.
        :param timeout: forwarded to the publisher's ``send``.
        :param retry: forwarded to ``self.ensure``.
        :param error_callback: forwarded to ``self.ensure``.  When left at
            the default marker, a callback that reports the exception via
            ``self._log_publisher_send_error`` is used instead.
        :param kwargs: extra keyword arguments for the publisher class.
        """
        if error_callback is self.default_marker:
            def _log_failure(exc):
                self._log_publisher_send_error(topic, exc)

            error_callback = _log_failure

        def _build_and_send():
            sender = cls(self.driver_conf, self.channel, topic=topic,
                         **kwargs)
            sender.send(msg, timeout)

        with self._connection_lock:
            self.ensure(_build_and_send, retry=retry,
                        error_callback=error_callback)
           

        這裡self.ensure會最終執行_publish方法,該方法主要執行兩部分:

        1. _publish首先建立一個publisher

        2. 發送message到相應的queue中。

        首先我們講解第一部分:建立publisher。

#/oslo_messaging/_drivers/impl_rabbit.py:TopicPublisher
class TopicPublisher(Publisher):
    """Publisher class for 'topic'."""

    def __init__(self, conf, channel, exchange_name, topic, **kwargs):
        """Init a 'topic' publisher.

        Defaults for ``durable`` and ``auto_delete`` come from *conf*, and
        ``exclusive`` defaults to False.  Kombu options may be passed as
        keyword args to override these defaults.

        :param conf: configuration providing ``amqp_durable_queues`` and
            ``amqp_auto_delete``.
        :param channel: channel passed to the base ``Publisher``.
        :param exchange_name: name of the exchange to publish to.
        :param topic: used as the routing key of the base ``Publisher``.
        """
        exchange_options = dict(durable=conf.amqp_durable_queues,
                                auto_delete=conf.amqp_auto_delete,
                                exclusive=False)
        # Caller-supplied options take precedence over the defaults.
        exchange_options.update(kwargs)

        super(TopicPublisher, self).__init__(
            channel, exchange_name, topic, type='topic', **exchange_options)

#/oslo_messaging/_drivers/impl_rabbit.py:Publisher
class Publisher(object):
    """Base Publisher class."""

    def __init__(self, channel, exchange_name, routing_key, **kwargs):
        """Store the exchange name, routing key and options, then connect.

        :param channel: channel the producer is created on.
        :param exchange_name: name of the kombu exchange to create.
        :param routing_key: default routing key of the producer.
        :param kwargs: extra options passed to the kombu ``Exchange``.
        """
        self.kwargs = kwargs
        self.routing_key = routing_key
        self.exchange_name = exchange_name
        self.reconnect(channel)

    def reconnect(self, channel):
        """Re-establish the Producer after a rabbit reconnection.

        Builds a new kombu ``Exchange`` from the stored name and options and
        a new kombu ``Producer`` bound to *channel*, that exchange and the
        stored routing key.
        """
        exchange = kombu.entity.Exchange(name=self.exchange_name,
                                         **self.kwargs)
        self.exchange = exchange
        self.producer = kombu.messaging.Producer(
            exchange=exchange,
            channel=channel,
            routing_key=self.routing_key)
           

        這裡我們重點關注父類Publisher建立kombu層的exchange和producer對象,其中exchange對象的建立,我們無需多解釋,具體參考OpenStack-RPC-server的建構的代碼流程,注意這裡建立的kombu層的exchange的name為nova,這裡routing_key為topic的值(scheduler),下面分析建立producer對象的代碼流程。

#/kombu/messaging.py:Producer
class Producer(object):
    """Message Producer.

    :param channel: Connection or channel.
    :keyword exchange: Optional default exchange.
    :keyword routing_key: Optional default routing key.
    :keyword serializer: Default serializer. Default is `"json"`.
    :keyword compression: Default compression method. Default is no
        compression.
    :keyword auto_declare: Automatically declare the default exchange
      at instantiation. Default is :const:`True`.
    :keyword on_return: Callback to call for undeliverable messages,
        when the `mandatory` or `immediate` arguments to
        :meth:`publish` is used. This callback needs the following
        signature: `(exception, exchange, routing_key, message)`.
        Note that the producer needs to drain events to use this feature.

    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method.  Disabled by default.
    compression = None

    #: By default the exchange is declared at instantiation.
    #: If you want to declare manually then you can set this
    #: to :const:`False`.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None

    def __init__(self, channel, exchange=None, routing_key=None,
                 serializer=None, auto_declare=None, compression=None,
                 on_return=None):
        """Store the defaults and revive the producer when a channel is given.

        Falsy values for *routing_key*, *serializer*, *compression* and
        *on_return* fall back to the class-level defaults.  *auto_declare*
        overrides the class default only when it is not None.
        """
        self._channel = channel
        self._channel_promise = None

        # Without an explicit exchange, use the exchange with the empty name.
        self.exchange = Exchange('') if exchange is None else exchange

        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return

        if auto_declare is not None:
            self.auto_declare = auto_declare

        if channel:
            self.revive(channel)
           

        上述便是建立producer對象的代碼,很簡單,沒什麼可解釋的。

        下面解釋_publish方法中的第二部分:send方法的調用(publisher.send(msg,timeout))

#/oslo_messaging/_drivers/impl_rabbit.py:Publisher
    def send(self, msg, timeout=None):
        """Send a message.

        :param msg: message passed to ``self.producer.publish``.
        :param timeout: when truthy, a ``ttl`` header of ``timeout * 1000``
            is attached; otherwise the message is published with no
            headers argument.
        """
        publish_kwargs = {}
        if timeout:
            # AMQP TTL is in milliseconds when set in the header.
            publish_kwargs['headers'] = {'ttl': (timeout * 1000)}
        self.producer.publish(msg, **publish_kwargs)
           

        這裡的timeout采用的預設值60s(具體回到call方法中檢視),是以執行self.producer.publish(msg,headers={'ttl': (timeout * 1000)})代碼。

#/kombu/messaging.py:Producer
    def publish(self, body, routing_key=None, delivery_mode=None,
                mandatory=False, immediate=False, priority=0,
                content_type=None, content_encoding=None, serializer=None,
                headers=None, compression=None, exchange=None, retry=False,
                retry_policy=None, declare=None, **properties):
        """Publish message to the specified exchange.

        :param body: Message body.
        :keyword routing_key: Message routing key.
        :keyword delivery_mode: See :attr:`delivery_mode`.
        :keyword mandatory: Currently not supported.
        :keyword immediate: Currently not supported.
        :keyword priority: Message priority. A number between 0 and 9.
        :keyword content_type: Content type. Default is auto-detect.
        :keyword content_encoding: Content encoding. Default is auto-detect.
        :keyword serializer: Serializer to use. Default is auto-detect.
        :keyword compression: Compression method to use.  Default is none.
        :keyword headers: Mapping of arbitrary headers to pass along
          with the message body.
        :keyword exchange: Override the exchange.  Note that this exchange
          must have been declared.
        :keyword declare: Optional list of required entities that must
            have been declared before publishing the message.  The entities
            will be declared using :func:`~kombu.common.maybe_declare`.
            Defaults to no entities.
        :keyword retry: Retry publishing, or declaring entities if the
            connection is lost.
        :keyword retry_policy: Retry configuration, this is the keywords
            supported by :meth:`~kombu.Connection.ensure`.
        :keyword properties: Any further keyword arguments are used as
            additional message properties, see AMQP spec.

        :returns: whatever the underlying ``_publish`` call returns.

        """
        headers = {} if headers is None else headers
        retry_policy = {} if retry_policy is None else retry_policy
        # None sentinel instead of a shared mutable ``[]`` default.
        declare = [] if declare is None else declare
        routing_key = self.routing_key if routing_key is None else routing_key
        compression = self.compression if compression is None else compression
        exchange = exchange or self.exchange

        if isinstance(exchange, Exchange):
            delivery_mode = delivery_mode or exchange.delivery_mode
            # Only the exchange name is passed down to the channel.
            exchange = exchange.name
        else:
            delivery_mode = delivery_mode or self.exchange.delivery_mode
        # NOTE(review): ``long`` is a Python 2 builtin; unless the module
        # aliases it, this line fails on Python 3 -- confirm.
        if not isinstance(delivery_mode, (int, long)):
            # Non-integer modes are looked up by key in DELIVERY_MODES.
            delivery_mode = DELIVERY_MODES[delivery_mode]
        properties['delivery_mode'] = delivery_mode
        body, content_type, content_encoding = self._prepare(
            body, serializer, content_type, content_encoding,
            compression, headers)

        publish = self._publish
        if retry:
            publish = self.connection.ensure(self, publish, **retry_policy)
        return publish(body, priority, content_type,
                       content_encoding, headers, properties,
                       routing_key, mandatory, immediate, exchange, declare)

    def _publish(self, body, priority, content_type, content_encoding,
                 headers, properties, routing_key, mandatory,
                 immediate, exchange, declare):
        """Build the channel-level message, declare entities and publish.

        :param body: already prepared message body.
        :param priority: message priority passed to ``prepare_message``.
        :param content_type: content type passed to ``prepare_message``.
        :param content_encoding: encoding passed to ``prepare_message``.
        :param headers: headers passed to ``prepare_message``.
        :param properties: properties passed to ``prepare_message``.
        :param routing_key: routing key for ``basic_publish``.
        :param mandatory: forwarded to ``basic_publish``.
        :param immediate: forwarded to ``basic_publish``.
        :param exchange: exchange value forwarded to ``basic_publish``.
        :param declare: entities to pass to ``self.maybe_declare`` before
            publishing; may be empty or None.
        :returns: the result of ``channel.basic_publish``.
        """
        channel = self.channel
        # In this walkthrough the channel is /kombu/transport/pyamqp.py:Channel
        message = channel.prepare_message(
            body, priority, content_type,
            content_encoding, headers, properties,
        )
        if declare:
            maybe_declare = self.maybe_declare
            # Plain loop: the calls are made for their side effects only.
            for entity in declare:
                maybe_declare(entity)
        return channel.basic_publish(
            message,
            exchange=exchange, routing_key=routing_key,
            mandatory=mandatory, immediate=immediate,
        )
           

這裡publish函數主要完成三個任務:

1. 根據delivery_mode的值設定message是否持久化。(如果沒有指定delivery_mode的值,則系統預設從exchange中取值,且采用PERSISTENT_DELIVERY_MODE:持久化,具體檢視/kombu/entity.py:Exchange)

2. _prepare方法将message的body序列化為字元串作為适合amqp發送的消息體。

3. 將message publish到consumer的queue中(執行_publish方法)。

下面,我們分析_publish方法的代碼流程。

首先,_publish方法組裝适合amqp傳輸的message。

#/kombu/transport/pyamqp.py:Channel
class Channel(amqp.Channel, base.StdChannel):
    """Channel combining ``amqp.Channel`` with ``base.StdChannel``."""

    # Message class used by message_to_python to wrap raw messages.
    Message = Message

    def prepare_message(self, body, priority=None,
                        content_type=None, content_encoding=None,
                        headers=None, properties=None):
        """Encapsulate data into a AMQP message.

        :param body: message body.
        :param priority: message priority.
        :param content_type: content type of the body.
        :param content_encoding: content encoding of the body.
        :param headers: passed to the message as ``application_headers``.
        :param properties: optional mapping of extra message properties,
            expanded into keyword arguments; None means no extras.
        :returns: an ``amqp.Message`` instance.
        """
        # ``properties`` defaults to None and ``**None`` raises TypeError,
        # so expand an empty mapping when nothing was supplied.
        return amqp.Message(body, priority=priority,
                            content_type=content_type,
                            content_encoding=content_encoding,
                            application_headers=headers,
                            **(properties or {}))

    def message_to_python(self, raw_message):
        """Convert encoded message body back to a Python value."""
        return self.Message(self, raw_message)
           

        然後執行basic_publish方法發送message到對應的queue中。

#/amqp/channel.py:Channel
    def _basic_publish(self, msg, exchange='', routing_key='',
                       mandatory=False, immediate=False):
        """Publish a message

        This method publishes a message to a specific exchange. The
        message will be routed to queues as defined by the exchange
        configuration and distributed to any active consumers when the
        transaction, if any, is committed.

        PARAMETERS:
            exchange: shortstr

                Name of the exchange to publish to.  An empty name means
                the default exchange, which the server MUST accept.  If a
                named exchange does not exist, the server will raise a
                channel exception.  The exchange MAY refuse basic content,
                in which case it MUST raise a channel exception with reply
                code 540 (not implemented).

            routing_key: shortstr

                Message routing key, used for routing messages depending
                on the exchange configuration.

            mandatory: boolean

                Tells the server how to react if the message cannot be
                routed to a queue.  If True, the server returns the
                unroutable message with a Return method; if False, the
                server silently drops it.  The server SHOULD implement
                this flag.

            immediate: boolean

                Tells the server how to react if the message cannot be
                routed to a queue consumer immediately.  If set, the
                server returns the undeliverable message with a Return
                method; if zero, the server queues the message with no
                guarantee that it will ever be consumed.  The server
                SHOULD implement this flag.

        """
        writer = AMQPWriter()
        # Leading short is always written as 0.
        writer.write_short(0)
        for text in (exchange, routing_key):
            writer.write_shortstr(text)
        for flag in (mandatory, immediate):
            writer.write_bit(flag)

        # NOTE(review): (60, 40) is presumably the AMQP class/method id pair
        # for basic.publish -- confirm against the protocol reference.
        self._send_method((60, 40), writer, msg)
    basic_publish = _basic_publish
           

        注意,kombu層雖然調用的basic_publish方法,但在amqp層basic_publish= _basic_publish,是以kombu層最終還是調用的_basic_publish方法。

        至此,我們OpenStack-RPC-client的建構分析完成(call的調用方式),主要内容我們利用網上的一張圖來表示:

OpenStack-RPC-client的建構

        注意:對于cast的調用方式,沒有上圖中的下半部分,即它沒有設定wait_for_reply= True。

繼續閱讀