
neutron-server startup flow (Part 1)

The startup of neutron-server covers the creation of RPC servers, RPC clients, and the WSGI server, so neutron-server does more than play the pure API role that the api services of other components do. This article analyzes the code flow along two lines: the RPC-related setup and the WSGI server creation.

Looking at setup.cfg, we find the entry point of neutron-server:

neutron-server = neutron.cmd.eventlet.server:main
#/neutron/cmd/eventlet/server/__init__.py
def main():
    server.main()

#/neutron/server/__init__.py
def main():
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    try:
        pool = eventlet.GreenPool()

        neutron_api = service.serve_wsgi(service.NeutronApiService)
        api_thread = pool.spawn(neutron_api.wait)

        try:
            neutron_rpc = service.serve_rpc()
        except NotImplementedError:
            LOG.info(_LI("RPC was already started in parent process by "
                         "plugin."))
        else:
            rpc_thread = pool.spawn(neutron_rpc.wait)

            # api and rpc should die together.  When one dies, kill the other.
            rpc_thread.link(lambda gt: api_thread.kill())
            api_thread.link(lambda gt: rpc_thread.kill())

        pool.waitall()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)
           
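The link()/kill() pairing above deserves a closer look. Below is a minimal, self-contained sketch (plain eventlet, not Neutron code) of the same "die together" pattern: two green threads are linked so that when either one exits, the other is killed.

import eventlet
eventlet.monkey_patch()

def serve(name, seconds):
    # Stand-in for the api/rpc service loops.
    eventlet.sleep(seconds)
    print('%s finished' % name)

pool = eventlet.GreenPool()
api_thread = pool.spawn(serve, 'api', 1)
rpc_thread = pool.spawn(serve, 'rpc', 60)

# When one dies, kill the other -- mirrors the two link() calls in main().
rpc_thread.link(lambda gt: api_thread.kill())
api_thread.link(lambda gt: rpc_thread.kill())

pool.waitall()   # returns after ~1s: 'api' finishes and its link kills 'rpc'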

1. Creating the WSGI server

neutron_api = service.serve_wsgi(service.NeutronApiService)
#/neutron/service.py
def serve_wsgi(cls):

    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))

    return service

#/neutron/service.py:NeutronApiService
class NeutronApiService(WsgiService):
    """Class for neutron-api service."""

    @classmethod
    def create(cls, app_name='neutron'):

        # Setup logging early, supplying both the CLI options and the
        # configuration mapping from the config file
        # We only update the conf dict for the verbose and debug
        # flags. Everything else must be set up in the conf file...
        # Log the options used when starting if we're in debug mode...

        config.setup_logging()
        # Dump the initial option values
        cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
        service = cls(app_name)
        return service
           

serve_wsgi instantiates the NeutronApiService class passed in (via its create() classmethod) and then calls the object's start() method, which creates the WSGI server.

#/neutron/service.py:WsgiService
    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

#/neutron/service.py
def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=cfg.CONF.api_workers)
    # Dump all option values here after all options are parsed
    cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server
           

Here,

app = config.load_paste_app(app_name)

loads the composite named neutron from the api-paste.ini file. Next, a /neutron/wsgi.py:Server object is created, and its start() method is called to build the WSGI server.
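config.load_paste_app is a thin wrapper over the paste.deploy library. A rough equivalent of what it does is shown below (a sketch; the ini path is an assumption for illustration, since Neutron actually resolves it from its config search path):

from paste import deploy

# Load the [composite:neutron] section of api-paste.ini and assemble the
# WSGI application it describes.
app = deploy.loadapp('config:/etc/neutron/api-paste.ini', name='neutron')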

#/neutron/wsgi.py:Server
    def start(self, application, port, host='0.0.0.0', workers=0):
        """Run a WSGI server with the given application."""
        self._host = host
        self._port = port
        backlog = CONF.backlog

        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)

        self._launch(application, workers)

    def _launch(self, application, workers=0):
        service = WorkerService(self, application)
        if workers < 1:
            # The API service should run in the current process.
            self._server = service
            service.start()
            systemd.notify_once()
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            if CONF.database.connection:
                api.get_engine().pool.dispose()
            # The API service runs in a number of child processes.
            # Minimize the cost of checking for child exit by extending the
            # wait interval past the default of 0.01s.
            self._server = common_service.ProcessLauncher(wait_interval=1.0)
            self._server.launch_service(service, workers=workers)
           

Here, the host argument of start() is the bind_host parameter from neutron.conf, usually set to the management network IP; port is the bind_port parameter from neutron.conf, which for neutron-server is usually 9696. start() first calls self._get_socket to create a socket listening on port 9696 of the local host. Below is the neutron-server process in my OpenStack environment.

[root@jun2 neutron]# netstat -tnulp | grep 9696
tcp        0      0 192.168.118.1:9696      0.0.0.0:*               LISTEN      35298/python2

Then _launch creates the WSGI server processes, i.e. the child processes that handle HTTP requests sent by neutronclient. The number of WSGI server processes is set by the api_workers parameter in neutron.conf, typically the number of CPUs on the host.

For a detailed analysis of how the WSGI server processes are created, see the article 《keystone WSGI流程》. Next we look at the callable resources that are ultimately loaded according to api-paste.ini, whose content is shown below.

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = request_id catch_errors extensions neutronapiapp_v2_0
keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[filter:request_id]
paste.filter_factory = oslo.middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo.middleware:CatchErrors.factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

The neutron composite has two branches: one serves neutronversions, the other dispatches to the neutronapi_v2_0 composite to invoke the corresponding resources. HTTP requests sent to neutron normally target resources under /v2.0, such as create_port, update_port, and so on. Looking at the neutronapi_v2_0 composite, both the noauth and the keystone pipelines ultimately end at the neutronapiapp_v2_0 app, which loads the resources; its factory is shown below, after a brief look at pipeline_factory.
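First, the pipeline assembly: the use = call:neutron.auth:pipeline_factory line means Paste delegates building the pipeline to Neutron itself, which picks the noauth or keystone variant according to the auth_strategy option. A sketch of what such a factory looks like, based on neutron/auth.py (treat it as illustrative rather than authoritative):

#/neutron/auth.py (sketch)
from oslo_config import cfg

def pipeline_factory(loader, global_conf, **local_conf):
    # Pick the pipeline named by auth_strategy ('keystone' or 'noauth').
    pipeline = local_conf[cfg.CONF.auth_strategy].split()
    # The last element is the app; the rest are filters wrapped around it.
    app = loader.get_app(pipeline[-1])
    filters = [loader.get_filter(n) for n in pipeline[:-1]]
    for f in reversed(filters):
        app = f(app)
    return app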

#/neutron/api/v2/router.py:APIRouter
class APIRouter(wsgi.Router):

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        plugin = manager.NeutronManager.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            allow_pagination = cfg.CONF.allow_pagination
            allow_sorting = cfg.CONF.allow_sorting
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=allow_pagination,
                allow_sorting=allow_sorting)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)
           

1.1 Creating the plugins (core plugin and service plugins)

#/neutron/api/v2/router.py:APIRouter
plugin = manager.NeutronManager.get_plugin()
           
#/neutron/manager.py:NeutronManager
    @classmethod
    def get_plugin(cls):
        # Return a weakref to minimize gc-preventing references.
        return weakref.proxy(cls.get_instance().plugin)

#/neutron/manager.py:NeutronManager
    @classmethod
    def get_instance(cls):
        # double checked locking
        if not cls.has_instance():
            cls._create_instance()
        return cls._instance

#/neutron/manager.py:NeutronManager
    @classmethod
    @utils.synchronized("manager")
    def _create_instance(cls):
        if not cls.has_instance():
            cls._instance = cls()

#/neutron/manager.py:NeutronManager
class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    The caller should make sure that NeutronManager is a singleton.
    """
    _instance = None

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = {}

        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info(_LI("Loading core plugin: %s"), plugin_provider)
        self.plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                                plugin_provider)
        msg = validate_post_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()
           

get_plugin returns a weak reference to NeutronManager's plugin attribute (the core plugin). So what object is the core plugin? Both the core plugin and the service plugins are created in NeutronManager.__init__, according to the core_plugin and service_plugins parameters in neutron.conf respectively.
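A quick plain-Python illustration (not Neutron code) of why a weakref.proxy is returned: the proxy forwards attribute access to the plugin but does not keep the plugin alive, so it avoids gc-preventing references.

import weakref

class Plugin(object):
    def name(self):
        return 'ml2'

plugin = Plugin()
proxy = weakref.proxy(plugin)
print(proxy.name())   # 'ml2' -- attribute access is forwarded

del plugin            # drop the only strong reference
try:
    proxy.name()
except ReferenceError:
    print('referent was garbage collected')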

1.1.1 Creating the core plugin

In my OpenStack environment the core_plugin parameter in neutron.conf is:

core_plugin = neutron.plugins.ml2.plugin.Ml2Plugin

The instantiation is performed in the _get_plugin_instance function.

#/neutron/manager.py:NeutronManager
    def _get_plugin_instance(self, namespace, plugin_provider):
        try:
            # Try to resolve plugin by name
            mgr = driver.DriverManager(namespace, plugin_provider)
            plugin_class = mgr.driver
        except RuntimeError as e1:
            # fallback to class name
            try:
                plugin_class = importutils.import_class(plugin_provider)
            except ImportError as e2:
                LOG.exception(_LE("Error loading plugin by name, %s"), e1)
                LOG.exception(_LE("Error loading plugin by class, %s"), e2)
                raise ImportError(_("Plugin not found."))
        return plugin_class()
           

The core plugin is created with the stevedore module; core_plugin here becomes a /neutron/plugins/ml2/plugin.py:Ml2Plugin object.
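Below is a minimal sketch of the stevedore lookup performed above. The 'neutron.core_plugins' entry-point namespace is declared in Neutron's setup.cfg and maps short aliases such as 'ml2' to plugin classes. Note that since my core_plugin is configured as a full class path rather than an alias, the name lookup raises RuntimeError and _get_plugin_instance falls back to importutils.import_class.

from stevedore import driver

# Resolve a plugin alias to its class through the entry-point namespace.
mgr = driver.DriverManager(namespace='neutron.core_plugins', name='ml2')
plugin_class = mgr.driver   # the Ml2Plugin class itself
plugin = plugin_class()     # what _get_plugin_instance finally returns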

#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    def __init__(self):
        # First load drivers, then initialize DB, then initialize drivers
        self.type_manager = managers.TypeManager()
        self.extension_manager = managers.ExtensionManager()
        self.mechanism_manager = managers.MechanismManager()
        super(Ml2Plugin, self).__init__()
        self.type_manager.initialize()
        self.extension_manager.initialize()
        self.mechanism_manager.initialize()

        self._setup_rpc()

        # REVISIT(rkukura): Use stevedore for these?
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver
        )

        self.start_periodic_dhcp_agent_status_check()
        LOG.info(_LI("Modular L2 Plugin initialization complete"))
           

Since type_manager, extension_manager and mechanism_manager are created in similar ways, we analyze the type_manager code flow in detail and only briefly describe the others.

#/neutron/plugins/ml2/managers.py:TypeManager
class TypeManager(stevedore.named.NamedExtensionManager):
    """Manage network segment types using drivers."""

    def __init__(self):
        # Mapping from type name to DriverManager
        self.drivers = {}

        LOG.info(_LI("Configured type driver names: %s"),
                 cfg.CONF.ml2.type_drivers)
        super(TypeManager, self).__init__('neutron.ml2.type_drivers',
                                          cfg.CONF.ml2.type_drivers,
                                          invoke_on_load=True)
        LOG.info(_LI("Loaded type driver names: %s"), self.names())
        self._register_types()
        self._check_tenant_network_types(cfg.CONF.ml2.tenant_network_types)
           

Here, the drivers attribute of the TypeManager object holds the type driver objects, constructed according to the type_drivers parameter in /etc/neutron/plugins/ml2/ml2_conf.ini. In my OpenStack environment the type_drivers parameter is:

type_drivers = vlan,flat

The two driver objects are created in TypeManager's parent class, and each created object is wrapped in a /stevedore/extension.py:Extension.
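A sketch of what that parent-class call does: stevedore loads each named driver from the 'neutron.ml2.type_drivers' entry-point namespace and, because invoke_on_load=True, instantiates it, wrapping each instance in an Extension object (with ext.name and ext.obj attributes).

from stevedore import named

mgr = named.NamedExtensionManager(namespace='neutron.ml2.type_drivers',
                                  names=['vlan', 'flat'],
                                  invoke_on_load=True)
for ext in mgr:
    # ext is a stevedore.extension.Extension; ext.obj is the driver instance
    print(ext.name, ext.obj)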

#/neutron/plugins/ml2/managers.py:TypeManager
    def _register_types(self):
        for ext in self:
            network_type = ext.obj.get_type()
            if network_type in self.drivers:
                LOG.error(_LE("Type driver '%(new_driver)s' ignored because"
                              " type driver '%(old_driver)s' is already"
                              " registered for type '%(type)s'"),
                          {'new_driver': ext.name,
                           'old_driver': self.drivers[network_type].name,
                           'type': network_type})
            else:
                self.drivers[network_type] = ext
        LOG.info(_LI("Registered types: %s"), self.drivers.keys())
           

這裡,将建立完成的typedriver對象register到self.drivers字典中,如下。

self.drivers: {'flat': <stevedore.extension.Extension object at 0x428cad0>, 'vlan': <stevedore.extension.Extension object at 0x4299150>}

To call into the actual flat or vlan type driver, you go through the obj attribute of the /stevedore/extension.py:Extension wrapper, which holds the real type driver instance:

'flat': <neutron.plugins.ml2.drivers.type_flat.FlatTypeDriver object at 0x46cfb50>
'vlan': <neutron.plugins.ml2.drivers.type_vlan.VlanTypeDriver object at 0x46a9bd0>

After the type drivers are registered, the code checks whether every tenant network type has a registered type driver, and raises an exception otherwise. The tenant network types come from the tenant_network_types parameter in /etc/neutron/plugins/ml2/ml2_conf.ini; mine is configured as follows.

tenant_network_types = vlan

#/neutron/plugins/ml2/managers.py:TypeManager
    def _check_tenant_network_types(self, types):
        self.tenant_network_types = []
        for network_type in types:
            if network_type in self.drivers:
                self.tenant_network_types.append(network_type)
            else:
                LOG.error(_LE("No type driver for tenant network_type: %s. "
                              "Service terminated!"), network_type)
                raise SystemExit(1)
        LOG.info(_LI("Tenant network_types: %s"), self.tenant_network_types)
           

So when _check_tenant_network_types runs, the tenant network type passes validation.

The mechanism_manager is created in a similar way; it manages the mechanism drivers, which are constructed from the mechanism_drivers parameter in /etc/neutron/plugins/ml2/ml2_conf.ini. Mine is configured as follows.

mechanism_drivers = linuxbridge

The resulting mechanism driver object is:

<neutron.plugins.ml2.drivers.mech_linuxbridge.LinuxbridgeMechanismDriver object at 0x3f42b90>

The extension_manager is created in the same fashion.

After type_manager, extension_manager and mechanism_manager are created, initialize() is called on each to initialize the drivers it loaded. For example, TypeManager.initialize:

#/neutron/plugins/ml2/managers.py:TypeManager
    def initialize(self):
        for network_type, driver in self.drivers.iteritems():
            LOG.info(_LI("Initializing driver for type '%s'"), network_type)
            driver.obj.initialize()
           

As shown above, this ends up calling initialize() on every driver managed by type_manager.

Next we analyze the code flow that creates the RPC clients, i.e. the following call.

self._setup_rpc()
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    def _setup_rpc(self):
        self.notifier = rpc.AgentNotifierApi(topics.AGENT)
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
        )
           

Here topics.AGENT = 'q-agent-notifier'.

#/neutron/plugins/ml2/rpc.py:AgentNotifierApi
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                       sg_rpc.SecurityGroupAgentRpcApiMixin,
                       type_tunnel.TunnelAgentRpcApiMixin):
    """Agent side of the openvswitch rpc API.

    API version history:
        1.0 - Initial version.
        1.1 - Added get_active_networks_info, create_dhcp_port,
              update_dhcp_port, and removed get_dhcp_port methods.

    """

    def __init__(self, topic):
        self.topic = topic
        self.topic_network_delete = topics.get_topic_name(topic,
                                                          topics.NETWORK,
                                                          topics.DELETE)
        self.topic_port_update = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.UPDATE)
        self.topic_port_delete = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.DELETE)

        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/common/topics.py
def get_topic_name(prefix, table, operation, host=None):
    """Create a topic name.

    The topic name needs to be synced between the agent and the
    plugin. The plugin will send a fanout message to all of the
    listening agents so that the agents in turn can perform their
    updates accordingly.

    :param prefix: Common prefix for the plugin/agent message queues.
    :param table: The table in question (NETWORK, SUBNET, PORT).
    :param operation: The operation that invokes notification (CREATE,
                      DELETE, UPDATE)
    :param host: Add host to the topic
    :returns: The topic name.
    """
    if host:
        return '%s-%s-%s.%s' % (prefix, table, operation, host)
    return '%s-%s-%s' % (prefix, table, operation)
           

The /neutron/common/topics.py file defines the following constants.

#/neutron/common/topics.py
NETWORK = 'network'
SUBNET = 'subnet'
PORT = 'port'
SECURITY_GROUP = 'security_group'
L2POPULATION = 'l2population'
DVR = 'dvr'

CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'

AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
L3PLUGIN = 'q-l3-plugin'
DHCP = 'q-dhcp-notifer'
FIREWALL_PLUGIN = 'q-firewall-plugin'
METERING_PLUGIN = 'q-metering-plugin'
LOADBALANCER_PLUGIN = 'n-lbaas-plugin'

L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent'
LOADBALANCER_AGENT = 'n-lbaas_agent'
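Using these constants, the notification topics built in AgentNotifierApi.__init__ come out as follows (the host-scoped variant is shown as well, with my node name jun2 as an example):

from neutron.common import topics

print(topics.get_topic_name(topics.AGENT, topics.NETWORK, topics.DELETE))
# -> q-agent-notifier-network-delete
print(topics.get_topic_name(topics.AGENT, topics.PORT, topics.UPDATE))
# -> q-agent-notifier-port-update
print(topics.get_topic_name(topics.AGENT, topics.L2POPULATION,
                            topics.UPDATE, host='jun2'))
# -> q-agent-notifier-l2population-update.jun2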

So the AgentNotifierApi object establishes, over AMQP, a binding to the RPC server with topic 'q-agent-notifier'; that is, AgentNotifierApi can remotely invoke functions on that RPC server. In practice, however, it never calls the plain 'q-agent-notifier' topic. Instead it uses prepare() to build a new Target with a modified topic and calls RPC servers on three derived topics (constructed by get_topic_name):

'q-agent-notifier-network-delete'
'q-agent-notifier-port-update'
'q-agent-notifier-port-delete'
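How the client targets those topics: each notification method prepares a per-call context with the modified topic and fanout enabled, then casts. A sketch of one such method, based on /neutron/plugins/ml2/rpc.py (illustrative, trimmed):

#/neutron/plugins/ml2/rpc.py:AgentNotifierApi (sketch)
    def network_delete(self, context, network_id):
        # prepare() clones the client's Target with a new topic;
        # fanout=True delivers the cast to every agent consuming it.
        cctxt = self.client.prepare(topic=self.topic_network_delete,
                                    fanout=True)
        cctxt.cast(context, 'network_delete', network_id=network_id)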

So which service builds the RPC servers for these three topics? The docstring of AgentNotifierApi suggests neutron-openvswitch-agent, as shown below.

#/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py:OVSNeutronAgent
    def setup_rpc(self):
        self.agent_id = 'ovs-agent-%s' % cfg.CONF.host
        self.topic = topics.AGENT
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
        self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.endpoints = [self]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.PORT, topics.DELETE],
                     [topics.NETWORK, topics.DELETE],
                     [constants.TUNNEL, topics.UPDATE],
                     [constants.TUNNEL, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE],
                     [topics.DVR, topics.UPDATE]]
        if self.l2_pop:
            consumers.append([topics.L2POPULATION,
                              topics.UPDATE, cfg.CONF.host])
        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                     self.topic,
                                                     consumers,
                                                     start_listening=False)

#/neutron/agent/rpc.py
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
    """Create agent RPC consumers.

    :param endpoints: The list of endpoints to process the incoming messages.
    :param prefix: Common prefix for the plugin/agent message queues.
    :param topic_details: A list of topics. Each topic has a name, an
                          operation, and an optional host param keying the
                          subscription to topic.host for plugin calls.
    :param start_listening: if True, it starts the processing loop

    :returns: A common Connection.
    """

    connection = n_rpc.create_connection(new=True)
    for details in topic_details:
        topic, operation, node_name = itertools.islice(
            itertools.chain(details, [None]), 3)

        topic_name = topics.get_topic_name(prefix, topic, operation)
        connection.create_consumer(topic_name, endpoints, fanout=True)
        if node_name:
            node_topic_name = '%s.%s' % (topic_name, node_name)
            connection.create_consumer(node_topic_name,
                                       endpoints,
                                       fanout=False)
    if start_listening:
        connection.consume_in_threads()
    return connection
           

From the code above, the RPC servers created by neutron-openvswitch-agent cover at most eight topics:

'q-agent-notifier-port-update'
'q-agent-notifier-port-delete'
'q-agent-notifier-network-delete'
'q-agent-notifier-tunnel-update'
'q-agent-notifier-tunnel-delete'
'q-agent-notifier-security_group-update'
'q-agent-notifier-dvr-update'
'q-agent-notifier-l2population-update.jun2'

These eight topics include all the topics required by the RPC client built in AgentNotifierApi.

My environment, however, uses the linuxbridge mechanism driver, and when the neutron-linuxbridge-agent service starts and creates its RPC servers, there is no Target with topic 'q-agent-notifier-port-delete'; so with the linuxbridge mechanism driver the AgentNotifierApi object cannot invoke port_delete remotely. See below.

#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
    def setup_rpc(self, physical_interfaces):
        if physical_interfaces:
            mac = utils.get_interface_mac(physical_interfaces[0])
        else:
            devices = ip_lib.IPWrapper().get_devices(True)
            if devices:
                mac = utils.get_interface_mac(devices[0].name)
            else:
                LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                              "Agent terminated!"))
                exit(1)
        self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
        LOG.info(_LI("RPC agent_id: %s"), self.agent_id)

        self.topic = topics.AGENT
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        # RPC network init
        # Handle updates from service
        self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
                                                  self.sg_agent)]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.NETWORK, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE]]
        if cfg.CONF.VXLAN.l2_population:
            consumers.append([topics.L2POPULATION,
                              topics.UPDATE, cfg.CONF.host])
        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                     self.topic,
                                                     consumers)
        report_interval = cfg.CONF.AGENT.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)
           

Next we analyze the creation of the other, DHCP-related RPC client.

        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
        )

The self.agent_notifiers attribute is inherited from the parent class /neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin.

#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }
           

The DhcpAgentNotifyAPI object is created as follows.

#/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py:DhcpAgentNotifyAPI
class DhcpAgentNotifyAPI(object):
    """API for plugin to notify DHCP agent.

    This class implements the client side of an rpc interface.  The server side
    is neutron.agent.dhcp_agent.DhcpAgent.  For more information about changing
    rpc interfaces, please see doc/source/devref/rpc_api.rst.
    """
    # It seems dhcp agent does not support bulk operation
    VALID_RESOURCES = ['network', 'subnet', 'port']
    VALID_METHOD_NAMES = ['network.create.end',
                          'network.update.end',
                          'network.delete.end',
                          'subnet.create.end',
                          'subnet.update.end',
                          'subnet.delete.end',
                          'port.create.end',
                          'port.update.end',
                          'port.delete.end']

    def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
        self._plugin = plugin
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
           

The topic of this RPC client is 'dhcp_agent', and the corresponding RPC server is created when the neutron-dhcp-agent service starts.
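Before looking at the server side, here is a hedged sketch (simplified from DhcpAgentNotifyAPI, not necessarily the exact upstream code) of how this client reaches one specific DHCP agent: the topic is narrowed to 'dhcp_agent.<host>' so that only that agent's host-scoped consumer receives the cast.

#/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py:DhcpAgentNotifyAPI (sketch)
    def _cast_message(self, context, method, payload, host):
        # method is one of VALID_METHOD_NAMES, e.g. 'network_create_end'
        cctxt = self.client.prepare(topic='%s.%s' % (topics.DHCP_AGENT, host))
        cctxt.cast(context, method, payload=payload)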

#neutron/agent/dhcp_agent.py
def main():
    register_options()
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-dhcp-agent',
        topic=topics.DHCP_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
    service.launch(server).wait()

#neutron/service.py:Service
    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

#neutron/common/rpc.py:Service
    def start(self):
        super(Service, self).start()

        self.conn = create_connection(new=True)
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()
           
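The Connection/create_consumer/consume_in_threads machinery above is a thin wrapper around oslo.messaging. A minimal standalone equivalent of "start an RPC server on topic dhcp_agent with the manager as the endpoint" looks roughly like this (a sketch: the Manager class and the jun2 server name are illustrative assumptions, and a transport_url must be configured):

import oslo_messaging
from oslo_config import cfg

class Manager(object):
    # Public endpoint methods become remotely callable by name.
    def network_create_end(self, context, payload):
        return 'handled %s' % payload

transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='dhcp_agent', server='jun2')
server = oslo_messaging.get_rpc_server(transport, target, [Manager()],
                                       executor='eventlet')
server.start()   # consumes 'dhcp_agent' and the host-scoped 'dhcp_agent.jun2'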

To summarize, here are the RPC-client/RPC-server pairs created for the core plugin above.

| RPC-client class | RPC-client service | RPC-server endpoints | topic | RPC-server service |
|---|---|---|---|---|
| AgentNotifierApi | neutron-server (core plugin) | OVSNeutronAgent | q-agent-notifier-xxx-yyy (topics.AGENT-xxx-yyy) | neutron-openvswitch-agent |
| AgentNotifierApi | neutron-server (core plugin) | LinuxBridgeRpcCallbacks | q-agent-notifier-xxx-yyy (topics.AGENT-xxx-yyy) | neutron-linuxbridge-agent |
| DhcpAgentNotifyAPI | neutron-server (core plugin) | DhcpAgentWithStateReport (inherits from DhcpAgent) | dhcp_agent (topics.DHCP_AGENT) | neutron-dhcp-agent |

Here xxx stands for the resource (port, network, ...) and yyy for the operation (update, delete, ...).

This completes the analysis of _setup_rpc.

What remains of Ml2Plugin.__init__ is the following code.

#neutron/plugins/ml2/plugin.py:Ml2Plugin.__init__
        # REVISIT(rkukura): Use stevedore for these?
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver
        )

        self.start_periodic_dhcp_agent_status_check()
        LOG.info(_LI("Modular L2 Plugin initialization complete"))
           

network_scheduler_driver is a neutron.conf parameter, with the default value:

network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler

So self.network_scheduler is a /neutron/scheduler/dhcp_agent_scheduler.py:ChanceScheduler object.

start_periodic_dhcp_agent_status_check starts a periodic status check of the DHCP agents.

This completes the core plugin code flow. Next, the service plugins.

1.1.2 Creating the service plugins

#/neutron/manager.py:NeutronManager.__init__
        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()
           

service plugin首先将core plugin載入到self.service_plugins中,然後執行self._load_service_plugins函數将自身的service plugin載入到self.service_plugins中。

#/neutron/manager.py:NeutronManager
    def _load_service_plugins(self):
        """Loads service plugins.

        Starts from the core plugin and checks if it supports
        advanced services then loads classes provided in configuration.
        """
        # load services from the core plugin first
        self._load_services_from_core_plugin()

        plugin_providers = cfg.CONF.service_plugins
        LOG.debug("Loading service plugins: %s", plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue

            LOG.info(_LI("Loading Plugin: %s"), provider)
            plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                    provider)

            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            if plugin_inst.get_plugin_type() in self.service_plugins:
                raise ValueError(_("Multiple plugins for service "
                                   "%s were configured") %
                                 plugin_inst.get_plugin_type())

            self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

            # search for possible agent notifiers declared in service plugin
            # (needed by agent management extension)
            if (hasattr(self.plugin, 'agent_notifiers') and
                    hasattr(plugin_inst, 'agent_notifiers')):
                self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)

            LOG.debug("Successfully loaded %(type)s plugin. "
                      "Description: %(desc)s",
                      {"type": plugin_inst.get_plugin_type(),
                       "desc": plugin_inst.get_plugin_description()})
           

First, service plugins are loaded from the core plugin.

#/neutron/manager.py:NeutronManager
    def _load_services_from_core_plugin(self):
        """Puts core plugin in service_plugins for supported services."""
        LOG.debug("Loading services supported by the core plugin")

        # supported service types are derived from supported extensions
        for ext_alias in getattr(self.plugin,
                                 "supported_extension_aliases", []):
            if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
                service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
                self.service_plugins[service_type] = self.plugin
                LOG.info(_LI("Service %s is supported by the core plugin"),
                         service_type)
           

The content of EXT_TO_SERVICE_MAPPING is as follows.

#/neutron/plugins/common/constants.py
# service type constants:
CORE = "CORE"
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
LOADBALANCERV2 = "LOADBALANCERV2"
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"


#maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
    'dummy': DUMMY,
    'lbaas': LOADBALANCER,
    'lbaasv2': LOADBALANCERV2,
    'fwaas': FIREWALL,
    'vpnaas': VPN,
    'metering': METERING,
    'router': L3_ROUTER_NAT
}
           

That is, the aliases returned by the core plugin's (the Ml2Plugin object's) supported_extension_aliases are matched against EXT_TO_SERVICE_MAPPING; for every match, the core plugin itself is loaded as the service plugin of that type.

#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    # List of supported extensions
    _supported_extension_aliases = ["provider", "external-net", "binding",
                                    "quotas", "security-group", "agent",
                                    "dhcp_agent_scheduler",
                                    "multi-provider", "allowed-address-pairs",
                                    "extra_dhcp_opt", "subnet_allocation",
                                    "net-mtu", "vlan-transparent"]

    @property
    def supported_extension_aliases(self):
        if not hasattr(self, '_aliases'):
            aliases = self._supported_extension_aliases[:]
            aliases += self.extension_manager.extension_aliases()
            sg_rpc.disable_security_group_extension_by_config(aliases)
            vlantransparent.disable_extension_by_config(aliases)
            self._aliases = aliases
        return self._aliases
           

supported_extension_aliases函數将_supported_extension_aliases清單的alias賦給self._aliases變量。不過_supported_extension_aliases清單有13個alias,在supported_extension_aliases函數會判斷是否移除’security-group’和’vlan-transparent’這兩個alias。拿security-group舉例說明。

#/neutron/agent/securitygroups_rpc.py
#This is backward compatibility check for Havana
def _is_valid_driver_combination():
    return ((cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver and
              cfg.CONF.SECURITYGROUP.firewall_driver !=
             'neutron.agent.firewall.NoopFirewallDriver')) or
            (not cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver ==
             'neutron.agent.firewall.NoopFirewallDriver' or
              cfg.CONF.SECURITYGROUP.firewall_driver is None)
             ))

def is_firewall_enabled():
    if not _is_valid_driver_combination():
        LOG.warn(_LW("Driver configuration doesn't match with "
                     "enable_security_group"))

    return cfg.CONF.SECURITYGROUP.enable_security_group

def _disable_extension(extension, aliases):
    if extension in aliases:
        aliases.remove(extension)

def disable_security_group_extension_by_config(aliases):
    if not is_firewall_enabled():
        LOG.info(_LI('Disabled security-group extension.'))
        _disable_extension('security-group', aliases)
        LOG.info(_LI('Disabled allowed-address-pairs extension.'))
        _disable_extension('allowed-address-pairs', aliases)
           

Because the enable_security_group parameter in /etc/neutron/plugins/ml2/ml2_conf.ini is set as follows:

enable_security_group = True

is_firewall_enabled returns True, and disable_security_group_extension_by_config therefore does not remove the 'security-group' alias. Note that _is_valid_driver_combination checks that the security-group configuration parameters are consistent; the valid combinations are:

| enable_security_group | firewall_driver |
|---|---|
| True | firewall_driver != 'neutron.agent.firewall.NoopFirewallDriver' |
| False | firewall_driver == 'neutron.agent.firewall.NoopFirewallDriver' or firewall_driver is None |

Removing the 'vlan-transparent' alias is simpler.

#/neutron/extensions/vlantransparent.py
def disable_extension_by_config(aliases):
    if not cfg.CONF.vlan_transparent:
        if 'vlan-transparent' in aliases:
            aliases.remove('vlan-transparent')
        LOG.info(_LI('Disabled vlantransparent extension.'))
           

vlan_transparent is a neutron.conf parameter:

vlan_transparent = False

So the 'vlan-transparent' alias is removed.

In the end supported_extension_aliases returns 12 aliases:

['provider', 'external-net', 'binding', 'quotas', 'security-group', 'agent', 'dhcp_agent_scheduler', 'multi-provider', 'allowed-address-pairs', 'extra_dhcp_opt', 'subnet_allocation', 'net-mtu']

None of these matches an alias in EXT_TO_SERVICE_MAPPING, so after _load_services_from_core_plugin runs, service_plugins still contains only the core plugin.

Back in /neutron/manager.py:NeutronManager._load_service_plugins: after _load_services_from_core_plugin completes, the service plugins named by the service_plugins parameter in neutron.conf are loaded and instantiated.

service_plugins = neutron.services.l3_router.l3_router_plugin.L3RouterPlugin

So the final value of self.service_plugins is:

{'L3_ROUTER_NAT': <neutron.services.l3_router.l3_router_plugin.L3RouterPlugin object at 0x42038d0>, 'CORE': <neutron.plugins.ml2.plugin.Ml2Plugin object at 0x360d910>}

The key L3_ROUTER_NAT is obtained from get_plugin_type.

#/neutron/plugins/common/constants.py
L3_ROUTER_NAT = "L3_ROUTER_NAT"

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
    def get_plugin_type(self):
        return constants.L3_ROUTER_NAT
           

Next we analyze the creation of the L3RouterPlugin object among the service plugins.

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
                     extraroute_db.ExtraRoute_db_mixin,
                     l3_hamode_db.L3_HA_NAT_db_mixin,
                     l3_gwmode_db.L3_NAT_db_mixin,
                     l3_dvrscheduler_db.L3_DVRsch_db_mixin,
                     l3_hascheduler_db.L3_HA_scheduler_db_mixin):

    """Implementation of the Neutron L3 Router Service Plugin.

    This class implements a L3 service plugin that provides
    router and floatingip resources and manages associated
    request/response.
    All DB related work is implemented in classes
    l3_db.L3_NAT_db_mixin, l3_hamode_db.L3_HA_NAT_db_mixin,
    l3_dvr_db.L3_NAT_with_dvr_db_mixin, and extraroute_db.ExtraRoute_db_mixin.
    """
    supported_extension_aliases = ["dvr", "router", "ext-gw-mode",
                                   "extraroute", "l3_agent_scheduler",
                                   "l3-ha"]

    def __init__(self):
        self.setup_rpc()
        self.router_scheduler = importutils.import_object(
            cfg.CONF.router_scheduler_driver)
        self.start_periodic_l3_agent_status_check()
        super(L3RouterPlugin, self).__init__()
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()
           

First, the RPC client is created.

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
    def setup_rpc(self):
        # RPC support
        self.topic = topics.L3PLUGIN
        self.conn = n_rpc.create_connection(new=True)
        self.agent_notifiers.update(
            {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
        self.endpoints = [l3_rpc.L3RpcCallback()]
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        self.conn.consume_in_threads()
           

Here,

#/neutron/common/topics.py
L3PLUGIN = 'q-l3-plugin'
           

The self.agent_notifiers attribute is again inherited from the parent class /neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin.

#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }
           

The value for AGENT_TYPE_L3 in self.agent_notifiers is an L3AgentNotifyAPI object, shown below.

#/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py:L3AgentNotifyAPI
class L3AgentNotifyAPI(object):
    """API for plugin to notify L3 agent."""

    def __init__(self, topic=topics.L3_AGENT):
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
           

The L3AgentNotifyAPI object creates an RPC client with topic L3_AGENT = 'l3_agent'. The corresponding RPC server is created when the neutron-l3-agent service starts.

#/neutron/agent/l3_agent.py
def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
    register_opts(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
    service.launch(server).wait()

#neutron/service.py:Service
    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

#neutron/common/rpc.py:Service
    def start(self):
        super(Service, self).start()

        self.conn = create_connection(new=True)
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()
           

setup_rpc also creates an RPC server with topic L3PLUGIN = 'q-l3-plugin'.

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.setup_rpc
        self.endpoints = [l3_rpc.L3RpcCallback()]
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        self.conn.consume_in_threads()
           

Its endpoints are /neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback objects. On the agent side, /neutron/agent/l3/agent.py:L3PluginApi creates the matching RPC client with topic L3PLUGIN = 'q-l3-plugin' (when the neutron-l3-agent service starts) to invoke the methods of /neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback.

#/neutron/agent/l3/agent.py:L3PluginApi
class L3PluginApi(object):
    """Agent side of the l3 agent RPC API.

    API version history:
        1.0 - Initial version.
        1.1 - Floating IP operational status updates
        1.2 - DVR support: new L3 plugin methods added.
              - get_ports_by_subnet
              - get_agent_gateway_port
              Needed by the agent when operating in DVR/DVR_SNAT mode
        1.3 - Get the list of activated services
        1.4 - Added L3 HA update_router_state. This method was reworked in
              to update_ha_routers_states
        1.5 - Added update_ha_routers_states

    """

    def __init__(self, topic, host):
        self.host = host
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/agent/l3/agent.py:L3NATAgent
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                 ha.AgentMixin,
                 dvr.AgentMixin,
                 manager.Manager):
    """Manager for L3NatAgent

        API version history:
        1.0 initial Version
        1.1 changed the type of the routers parameter
            to the routers_updated method.
            It was previously a list of routers in dict format.
            It is now a list of router IDs only.
            Per rpc versioning rules,  it is backwards compatible.
        1.2 - DVR support: new L3 agent methods added.
              - add_arp_entry
              - del_arp_entry
              Needed by the L3 service when dealing with DVR
    """
    target = oslo_messaging.Target(version='1.2')

    def __init__(self, host, conf=None):
        if conf:
            self.conf = conf
        else:
            self.conf = cfg.CONF
        self.router_info = {}

        self._check_config_params()

        self.process_monitor = external_process.ProcessMonitor(
            config=self.conf,
            resource_type='router')

              ... ... ...

        self.context = n_context.get_admin_context_without_session()
        self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
        self.fullsync = True
        ... ... ...

        self._queue = queue.RouterProcessingQueue()
        super(L3NATAgent, self).__init__(conf=self.conf)

        self.target_ex_net_id = None
        self.use_ipv6 = ipv6_utils.is_enabled()

        if self.conf.enable_metadata_proxy:
            self.metadata_driver = metadata_driver.MetadataDriver(self)

#/neutron/agent/l3/agent.py:L3NATAgentWithStateReport
class L3NATAgentWithStateReport(L3NATAgent):

    def __init__(self, host, conf=None):
        self.use_call = True
        super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        self.agent_state = {
            'binary': 'neutron-l3-agent',
            'host': host,
            'topic': topics.L3_AGENT,
            'configurations': {
                'agent_mode': self.conf.agent_mode,
                'use_namespaces': self.conf.use_namespaces,
                'router_id': self.conf.router_id,
                'handle_internal_only_routers':
                self.conf.handle_internal_only_routers,
                'external_network_bridge': self.conf.external_network_bridge,
                'gateway_external_network_id':
                self.conf.gateway_external_network_id,
                'interface_driver': self.conf.interface_driver},
            'start_flag': True,
            'agent_type': l3_constants.AGENT_TYPE_L3}
        report_interval = self.conf.AGENT.report_interval
        if report_interval:
            self.heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            self.heartbeat.start(interval=report_interval)
           

When the neutron-l3-agent service starts, it creates an L3NATAgentWithStateReport object, which initializes its parent class L3NATAgent; L3NATAgent constructs an L3PluginApi object (self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)), and L3PluginApi in turn creates the RPC client with topic L3PLUGIN = 'q-l3-plugin'.
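As an example of this client in use, here is a sketch (based on L3PluginApi in /neutron/agent/l3/agent.py) of the blocking call the agent issues to fetch router data; it lands in L3RpcCallback.sync_routers on the neutron-server side:

#/neutron/agent/l3/agent.py:L3PluginApi (sketch)
    def get_routers(self, context, router_ids=None):
        # call() blocks until the 'q-l3-plugin' server returns the sync data.
        cctxt = self.client.prepare()
        return cctxt.call(context, 'sync_routers', host=self.host,
                          router_ids=router_ids)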

Here is a summary of the RPC setup done by the service plugin.

| RPC-client class | RPC-client service | RPC-server endpoints | topic | RPC-server service |
|---|---|---|---|---|
| L3AgentNotifyAPI | neutron-server (service plugin) | L3NATAgentWithStateReport (inherits from L3NATAgent) | l3_agent (topics.L3_AGENT) | neutron-l3-agent |
| L3PluginApi | neutron-l3-agent | L3RpcCallback | q-l3-plugin (topics.L3PLUGIN) | neutron-server |

This completes the analysis of setup_rpc in L3RouterPlugin.__init__; the RPC pieces it creates are summarized in the table above. One important topic remains in L3RouterPlugin.__init__: the Neutron Callback System. __init__ merely registers (subscribes) some callback functions; registry.notify is invoked at the appropriate places later to execute them. See below.

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.__init__
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()
           

這裡,我将利用l3_db.subscribe()舉例說明。

#/ neutron/db/l3_db.py
def subscribe():
    registry.subscribe(
        _prevent_l3_port_delete_callback, resources.PORT, events.BEFORE_DELETE)
    registry.subscribe(
        _notify_routers_callback, resources.PORT, events.AFTER_DELETE)

# NOTE(armax): multiple l3 service plugins (potentially out of tree) inherit
# from l3_db and may need the callbacks to be processed. Having an implicit
# subscription (through the module import) preserves the existing behavior,
# and at the same time it avoids fixing it manually in each and every l3 plugin
# out there. That said, The subscription is also made explicit in the
# reference l3 plugin. The subscription operation is idempotent so there is no
# harm in registering the same callback multiple times.
subscribe()
           

First, how does subscribe work?

#/neutron/callbacks/registry.py
def _get_callback_manager():
    global CALLBACK_MANAGER
    if CALLBACK_MANAGER is None:
        CALLBACK_MANAGER = manager.CallbacksManager()
    return CALLBACK_MANAGER

#/neutron/callbacks/registry.py
def subscribe(callback, resource, event):
    _get_callback_manager().subscribe(callback, resource, event)

#/neutron/callbacks/manager.py:CallbacksManager
    def subscribe(self, callback, resource, event):
        """Subscribe callback for a resource event.

        The same callback may register for more than one event.

        :param callback: the callback. It must raise or return a boolean.
        :param resource: the resource. It must be a valid resource.
        :param event: the event. It must be a valid event.
        """
        LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s",
                  {'callback': callback, 'resource': resource, 'event': event})
        if resource not in resources.VALID:
            raise exceptions.Invalid(element='resource', value=resource)
        if event not in events.VALID:
            raise exceptions.Invalid(element='event', value=event)

        callback_id = _get_id(callback)
        self._callbacks[resource][event][callback_id] = weakref.proxy(callback)
        # We keep a copy of callbacks to speed the unsubscribe operation.
        if callback_id not in self._index:
            self._index[callback_id] = collections.defaultdict(set)
        self._index[callback_id][resource].add(event)
           

subscribe identifies a callback by the triple (resource, event, callback_id) and stores the subscribed callback in the self._callbacks dict. The valid resources and events are defined in the following files.

#/neutron/callbacks/resources.py
PORT = 'port'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
ROUTER_INTERFACE = 'router_interface'
SECURITY_GROUP = 'security_group'
SECURITY_GROUP_RULE = 'security_group_rule'

VALID = (
    PORT,
    ROUTER,
    ROUTER_GATEWAY,
    ROUTER_INTERFACE,
    SECURITY_GROUP,
    SECURITY_GROUP_RULE,
)

#/neutron/callbacks/events.py
BEFORE_CREATE = 'before_create'
BEFORE_READ = 'before_read'
BEFORE_UPDATE = 'before_update'
BEFORE_DELETE = 'before_delete'

AFTER_CREATE = 'after_create'
AFTER_READ = 'after_read'
AFTER_UPDATE = 'after_update'
AFTER_DELETE = 'after_delete'

ABORT_CREATE = 'abort_create'
ABORT_READ = 'abort_read'
ABORT_UPDATE = 'abort_update'
ABORT_DELETE = 'abort_delete'

ABORT = 'abort_'
BEFORE = 'before_'

VALID = (
    BEFORE_CREATE,
    BEFORE_READ,
    BEFORE_UPDATE,
    BEFORE_DELETE,
    AFTER_CREATE,
    AFTER_READ,
    AFTER_UPDATE,
    AFTER_DELETE,
    ABORT_CREATE,
    ABORT_READ,
    ABORT_UPDATE,
    ABORT_DELETE,
)
           

When the subscribed callbacks need to be invoked, registry.notify is called, as follows.

#/neutron/callbacks/registry.py
def notify(resource, event, trigger, **kwargs):
    _get_callback_manager().notify(resource, event, trigger, **kwargs)

#/neutron/callbacks/manager.py:CallbacksManager
    def notify(self, resource, event, trigger, **kwargs):
        """Notify all subscribed callback(s).

        Dispatch the resource's event to the subscribed callbacks.

        :param resource: the resource.
        :param event: the event.
        :param trigger: the trigger. A reference to the sender of the event.
        """
        errors = self._notify_loop(resource, event, trigger, **kwargs)
        if errors and event.startswith(events.BEFORE):
            abort_event = event.replace(
                events.BEFORE, events.ABORT)
            self._notify_loop(resource, abort_event, trigger)
            raise exceptions.CallbackFailure(errors=errors)

#/neutron/callbacks/manager.py:CallbacksManager
    def _notify_loop(self, resource, event, trigger, **kwargs):
        """The notification loop."""
        LOG.info(_LI("Notify callbacks for %(resource)s, %(event)s"),
                 {'resource': resource, 'event': event})

        errors = []
        # TODO(armax): consider using a GreenPile
        for callback_id, callback in self._callbacks[resource][event].items():
            try:
                LOG.info(_LI("Calling callback %s"), callback_id)
                callback(resource, event, trigger, **kwargs)
            except Exception as e:
                LOG.exception(_LE("Error during notification for "
                                  "%(callback)s %(resource)s, %(event)s"),
                              {'callback': callback_id,
                               'resource': resource,
                               'event': event})
                errors.append(exceptions.NotificationError(callback_id, e))
        return errors
           

Finally, _notify_loop in /neutron/callbacks/manager.py:CallbacksManager executes the callbacks.

Below is the example from the official documentation.

from neutron.callbacks import events
from neutron.callbacks import resources
from neutron.callbacks import registry

def callback1(resource, event, trigger, **kwargs):
    print 'Callback1 called by trigger: ', trigger
    print 'kwargs: ', kwargs

def callback2(resource, event, trigger, **kwargs):
    print 'Callback2 called by trigger: ', trigger
    print 'kwargs: ', kwargs

# B and C express interest with I
registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE)
registry.subscribe(callback2, resources.ROUTER, events.BEFORE_CREATE)
print 'Subscribed'

# A notifies
def do_notify():
    kwargs = {'foo': 'bar'}
    registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs)

print 'Notifying...'
do_notify()

Running this code produces:

Subscribed
Notifying...
Callback1 called by trigger:  <function do_notify at 0x1e86578>
kwargs:  {'foo': 'bar'}
Callback2 called by trigger:  <function do_notify at 0x1e86578>
kwargs:  {'foo': 'bar'}

Because callback1 and callback2 were both subscribed with the same resource (resources.ROUTER) and the same event (events.BEFORE_CREATE), the for loop in _notify_loop ends up executing both of them.

This completes the analysis of how the core plugin and service plugins are created. To summarize:

Creating the core plugin involves creating the type drivers (e.g. vlan, flat) managed by type_manager, the mechanism drivers (e.g. linuxbridge, openvswitch) managed by mechanism_manager, and the extension drivers managed by extension_manager (my OpenStack environment uses no extension drivers). It also creates the RPC clients.

Creating the service plugins involves creating both RPC clients and RPC servers, as well as registering some callback functions with the Neutron Callback System.

Finally, a word on the relationship between core resources (core API) and extension resources (extension API).

Much like nova's management of its API resources, neutron divides the API resources derived from its network abstractions into core resources (core API) and extension resources (extension API). The core API covers only the four L2 resources network/subnet/port/subnetpool (subnetpool was added in Kilo); abstractions at every other layer fall under the extension API.

The extension API extends the existing resources in two ways: by adding attributes to network/subnet/port/subnetpool (e.g. the port binding extension), or by adding entirely new resources (e.g. VPNaaS). The structure is shown in the figure below.

(figure: structure of core and extension resources; image not reproduced)

Since neutron-server involves a lot of code flow, the extension resources and the RPC creation will be analyzed in the next article.

PS: since 《nova boot代碼流程分析(三):nova與neutron的互動》 is not yet finished, the remaining content on the nova-neutron interaction will be covered after the neutron-server analysis is complete.
