前言
源码地址:dubbo
版本: dubbo-parent pom中的version为
2.7.12-SNAPSHOT
测试代码:dubbo-demo module。在application.properties中,加入了registry url如下
dubbo.registry.address=zookeeper://127.0.0.1:2181
在dubbo中一切都是基于interface而来,对provider,consumer都是。在代码层面上对应AbstractInterfaceConfig,而接口又是以方法粒度扩展对应AbstractMethodConfig。
从代码层次上来看
AbstractServiceConfig:与服务提供方有关
- ServiceConfig -> 对应的spring bean为子类ServiceBean
- ProviderConfig
AbstractReferenceConfig:与服务消费方有关
- ReferenceConfig->对应的spring bean为子类ReferenceBean
- ConsumerConfig
启动入口
DubboBootstrapApplicationListener :继承自 OnceApplicationContextEventListener,该监听器只触发一次容器事件监听的执行避免事件传播,适合用于容器初始化完成和关闭等,定义的order为最低优先级保证当容器完全初始完成后最后执行。
onContextRefreshedEvent((ContextRefreshedEvent) event);
onContextClosedEvent((ContextClosedEvent) event);
当spring容器初始化完成后,该监听器会监听到ContextRefreshedEvent事件并执行DubboBootstrap的start,其中包括初始化、暴露服务、引用服务等所有流程,本次主要分析服务暴露的部分。
initialize:负责与dubbo相关的服务初始化,例如startConfigCenter会使用之前存放了相关配置信息(ApplicationConfig,ConfigCenterConfig等)的ConfigManager,将合适configCenter作为DynamicConfiguration存到environment
服务发布
DubboBootstrapApplicationListener监听到容器启动事件ContextRefreshedEvent
调用DubboBootstrap的start,其中与服务发布有关的为以下几个流程
exportServices
负责发布扫描到的dubbo service提供者。
DubboBootstrap中由configManager负责收集跟dubbo相关的服务,注册,协议等信息,并在此时直接定位要发布服务ServiceConfig;
根据需要异步或同步发布服务(ExecutorRepository也是dubbo扩展加载类的一种,利用executor异步处理发布每个service的任务);
exportService(ServiceConfig sc):执行ServiceConfig的export,并缓存到Map<String, ServiceConfigBase<?>> exportedServices中
ServiceConfig export
发布检验
比如是否设置了export为false,需要delay等
doExport
主要功能为doExportUrls。这里采用zookeeper注册中心为例
// 构造registry相关信息
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
拿到的registry url例如:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&id=org.apache.dubbo.config.RegistryConfig#0&pid=13300®istry=zookeeper×tamp=1627216227129;
依次对配置的protocols解析得到的ProtocolConfig,得到该服务对应在注册中心上(这里为zk)的path,一般为接口全路径名例如org.apache.dubbo.demo.DemoService, 并添加到ServiceRepository缓存中;调用doExportUrlsFor1Protocol处理对应协议的发布。
doExportUrlsFor1Protocol
- 构造url所需要的参数信息:比如side(provider),application,interface,methods及运行时信息等
- 对GenericService的情况进行处理:非generic时,会通过Wrapper找到该接口下对应要暴露的方法等,生成第一个interface->Wrapper0类存在于Wrapper的缓存中
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
....
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
- 解析host和port:用于定位ip和端口被消费方调用
- 构造服务方URL:用于服务方在注册中心的地址,例如,dubbo://172.xx.xx.x:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.xx.xx.x&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=13300&release=&service.name=ServiceBean:/org.apache.dubbo.demo.DemoService&side=provider×tamp=1627219556526
- ConfiguratorFactory:提供SPI接口扩展处理url参数信息的api,Configurator getConfigurator(URL url)
- scope决定发布类型:由目前url中的scope参数判断如何发布,none则不发布,不是remote(null的情况也匹配)则exportLocal(这里将执行),再接着不是local(null的情况也匹配)则进行远程发布(这里也会执行);
对第六步中不同方式发布进行下一步分析
exportLocal
用local URL的方式通过Protocol$Adaptive发布
//ServiceConfig中默认初始化了protocol和proxyFactory自适应代理类
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
//exportLocal中,这里实际执行StubProxyFactoryWrapper和InjvmProtocol的接口方法
Exporter<?> exporter = PROTOCOL.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
其中StubProxyFactoryWrapper是个包装类,里面包裹的真正实现是JavassistProxyFactory(因为ProxyFactory的扩展目录中有符合条件的包装类StubProxyFactoryWrapper对其进行自动封装),因此会真正执行Javasist的export,构造了一个Wrapper类作为实际执行invoke的AbstractProxyInvoker中的delegate。
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
Wrapper中以类加载器AppClassLoader加载并初始化类ClassGenerator,通过动态java代码拼接的方式,为接口暴露的方法进行了处理。
$1,$2等为占位符,表示当前方法中的第几个参数($1:Object o, $2:String n,$3:Class[] p)
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
org.apache.dubbo.demo.provider.DemoServiceImpl w;
try {
w = ((org.apache.dubbo.demo.provider.DemoServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals($2) && $3.length == 1) {
return ($w) w.sayHello((java.lang.String) $4[0]);
}
if ("sayHelloAsync".equals($2) && $3.length == 1) {
return ($w) w.sayHelloAsync((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}
通过默认构造函数实例化对象(Wrapper) wc.getDeclaredConstructor().newInstance(),得到实例化后的wrapper 为Wrapper子类,并缓存到Wrapper的map即WRAPPER_MAP,此时interface及对应的实现类都已经在该map里存在
interface org.apache.dubbo.demo.DemoService->Wrapper0
class org.apache.dubbo.demo.provider.DemoServiceImpl -> Wrapper1
得到由proxyFactory包装好的Invoker后,同样在调用protocol的export也经过了多个包装类的层层调用(ProtocolFilterWrapper->ProtocolListenerWrapper->InjvmProtocol)。
ProtocolFilterWrapper
:实现了项目中配置的Filter扩展接口集成,对非注册url的服务发布进行Invoker转换,扩展为继承Invoker的FilterNode(是一个连接父子FilterNode的过滤链类,可以依次对filter进行执行),再交由ProtocolListenerWrapper继续执行;
ProtocolListenerWrapper
:只对非注册url进行处理,将具体Protocol实现(这里为InjvmProtocol)返回的invoker包装为ListenerExporterWrapper(里面增加了ExporterListener接口扩展,可自定义做监听发布事件的任务)
InjvmProtocol
:是最终负责实现export的逻辑,将invoker封装为InjvmExporter,并加入到AbstractProtocol的exporterMap缓存。 key:org.apache.dubbo.demo.DemoService,value:InjvmExporter
将得到的exporter加入到当前ServiceConfig中的exporters中(ArrayList),表明当前服务已经发布,至此exportLocal已经结束。
remote export
与exportLocal相似,通过合适的ProxyFactory动态选择了某一实现的getInvoker获得invoker并经过包装,由Protocol$Adaptive类选择当前url中protocol对应的协议即RegistryProtocol实现进行发布,将发布成功后的exporter进行缓存到当前集合exporters中
for (URL registryURL : registryURLs) {
//这里实际上将原provider url信息作为新registryURL的一个参数export附加上去,在registryProtocol的export中会取出来再次利用
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
ProxyFactory Invoker
根据registryURLs,对每个非injvm的registryURL进行注册协议信息的构造,根据url中proxy参数信息会由JavasistProxyFactory实现生成invoker。(如果url中的protocol为injvm,不需要注册到远端服务中心上)
可以看到invoker对象的wrapper,复用了之前exportLocal时已产生并存在缓存中的同个接口实现类的Wrapper1。
RegistryProtocol export
流程与上面exportLocal中的部分类似,同样通过Protoco$Adaptive进行export执行,会真正跳转到InterfaceCompatibleRegistryProtocol(该类继承自RegistryProtocol),因为此时url中的
protocol为registry
,url信息表现为registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider…,在dubbo-registry-api中配置为registry=org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol。
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally,
URL providerUrl = getProviderUrl(originInvoker);
getRegistryUrl
:将registryUrl中的参数registry对应的实现设置为url的protocol即zookeeper
getProviderUrl
:拿到之前存在registryURL参数export中的provider url信息,重新构造出provider URL
构造provider URL的OverrideListener
:把该url参数中的category置成了configurator,check设置为false,用于接受订阅的服务方事件变更的通知
doLocalExport(originInvoker, providerUrl)
:由originalInvoker中的url信息从当前缓存bounds中查找或创建一个exporter的包装类ExporterChangeableWrapper。创建时现在InvokerDelegate中url的protocol是dubbo,因此会采用DubboProtocol的export产生一个exporter
DubboProtocol export:在构造DubboExporter的同时,还对url中server的类型选择合适的Transporter实现(NettyTransporter等)创建一个可以通过ip port进行通信的server。
创建出来的DubboExporter将以service:port->exporter形式缓存到当前protocol的exportMap中;openServer会先从当前服务实例缓存(Map<String, ProtocolServer> serverMap,key为ip:port)查找,不存在则开始基于当前扩展配置的server协议创建server;
createServer:
private ProtocolServer createServer(URL url) {
//为url中默认设置额外参数
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 将url中参数server的默认实现设为了Netty,去加载对应Transporter的实现
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
...
ExchangeServer server = Exchangers.bind(url, requestHandler);
// 检验url中client的值(实际为null)是不是有对应加载的Transporter扩展
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
- default URL:对url参数中添加默认信息,并根据参数中server(默认netty)使用对应协议开启服务,默认实现netty
- Exchangers bind:getExchanger(url).bind(url, handler),通过url参数中exchanger(默认header)加载对应的Exchanger SPI实现即HeaderExchanger;然后创建基于netty通信的exchange server,其中包含了传输层tcp协议的处理NettyTransporter,消息解码器DecodeHandler,以及负责通过channel接收数据处理的HeaderExchangeHandler,它会将解析得到的请求交由真正负责处理的业务处理器handler即DubboProtocol中的ExchangeHandlerAdapter处理。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
- wrap exporter:在得到ExchangeServer后,封装为DubboProtocolServer并返回
getRegistry(originInvoker)
:拿到invoker中的url作为registryUrl,通过injectExtension注入的registryFactory(当前RegistryProtocol中有RegistryFactory的setter方法)也是适应代理类,会找到ZookeeperRegistryFactory(继承AbstractRegistryFactory)进行真正的注册逻辑。
@Override
public Registry getRegistry(URL url) {
//去掉export参数(dubbo url信息),覆盖interface为RegistryService
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
//生成的key为 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
```
//省略了检验是否已加载或从缓存中获取到的判断代码
//create registry by spi/ioc
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
createRegistry:ZookeeperRegistryFactory根据url和ZookeeperTransporter(适应代理类)创建一个ZookeeperRegistry,初始化要简历的root node(/dubbo)信息、相应的zkClient并添加zk的state listener,最后经过自动包装成ListenerRegistryWrapper(可添加自定义的RegistryServiceListener实现做register和onUnregister的监听逻辑)
registry.register(registeredProviderUrl)
:交由继承了FailbackRegistry的ZookeeperRegistry进行注册逻辑即在zk上创建对应的zk node。对原url进行category,service等封装,最后在zk上创建父级持久节点**/dubbo/org.apache.dubbo.demo.DemoService/providers**,子临时节点即当前服务实例的url信息节点
/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F172.xx.xx.x%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D12312%26release%3D%26service.name%3DServiceBean%3A%2Forg.apache.dubbo.demo.DemoService%26side%3Dprovider%26timestamp%3D1627702891449
@Override
public void doRegister(URL url) {
try {
// url:dubbo://172.xx.xx.x:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=12312&release=&service.name=ServiceBean:/org.apache.dubbo.demo.DemoService&side=provider×tamp=1627702891449
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)
:ZookeeperRegistry的doSubscribe会被调用,将对/dubbo/org.apache.dubbo.demo.DemoService/configurators目录下添加ChildListener 实现动态配置的感知。
该部分代码针对指定了具体Interface的情况,任意interface在这里不满足因此不展开分析
CountDownLatch latch = new CountDownLatch(1);
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
```
}
notify(url, listener, urls);
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
这里的zkListener里面存在一个之前添加的对provider URL的listener,当以现在的url通知listener时这里就是RegistryProtocol$OverrideListener,如果得到有新的configurator信息下的url会再次通过RegistryProtocol触发reExport发布新的服务方的url
notifyExport(exporter)
:对扩展的RegistryProtocolListener实现进行服务远程注册成功后的监听逻辑调用,可以自定义扩展做事件的通知等
至此,服务DemoService在本地(injvm)及服务注册中心zookeeper已经发布完成。
exported
这里的exportedURLs中只有一个发布到注册中心的exporter,如果url的parameter中register-type是service的情况才会被认为将进行服务发现的处理,那么这个信息什么时候可以被设置到url中呢?可以采用以下方式达成该条件
- ServiceConfig中的doExportUrlsFor1Protocol时,如果配置的protocol为service-discovery-registry会设置registry-type为service。对于该demo项目可以在dubbo-provider.properties中设置
,同时也需要RegistryURL中有参数registry-type为service,配置为dubbo.registry.protocol=service-discovery-registry
,最后exported中判断isServiceDiscovery将为truedubbo.registry.parameters[registry-type]=service
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(REGISTRY_TYPE_KEY, SERVICE_REGISTRY_TYPE);
}
此时registryURL将会是service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&id=org.apache.dubbo.config.RegistryConfig#0&pid=10736®istry=zookeeper®istry-type=service®istry.type=service×tamp=1627742417006形式
- 自定义ConfiguratorFactory让dubbo能加载到,实现对url该信息的特殊设定。比如自定义添加了一个ProviderConfiguratorFactory,因为此时的url protocol默认为dubbo在对应接口的配置文件中key也需要定义为dubbo,ConfiguratorFactory是一个自适应方法会根据url中的protocol去选择对应的实现。
- 在dubbo-config-api目录下,以NacosDubboServiceProviderBootstrap或ZookeeperDubboServiceProviderBootstrap等方式启动服务,通过手动往url中添加该参数信息
因为本次分析采用zookeeper默认注册方式未有自定义registry-type,并不满足是ServiceDiscovery的情况,因此下一步是针对自定义扩展等方式的分析。
service按key,group等进行具体配置中心的发布处理
这里又是一个SPI接口ServiceNameMapping 默认值为config,在不同module下有不同处理默认实现config,对应DynamicConfigurationServiceNameMapping
// dubbo-metadata-api配置实现
config=org.apache.dubbo.metadata.DynamicConfigurationServiceNameMapping
// dubbo-registry-api配置实现
metadata=org.apache.dubbo.registry.client.metadata.MetadataServiceNameMapping
DynamicConfigurationServiceNameMapping会对每个exportedURL进行map处理
map中会根据之前配置到environment中的动态配置类,进行服务到不同配置管理中心上的发布比如Apollo,zk,Nacos等
具体实现如下
在ServiceDiscoveryRegistry中要订阅的服务同时subscribe(URL,NotifyListener),完成了相应监听器的设置
registerServiceInstancesChangedListener:其中调用具体ServiceDiscovery的注册监听逻辑。ZookeeperServiceDiscovery 为ServiceDiscovery扩展接口默认实现(zookeeper),
对于zk的服务变动事件(表现为节点数据及子节点的变化)而言,会通过watcher机制响应事件的处理。其中具体registerServiceWatcher创建对应path上的watcher监听,
之后对每个service进行ServiceInstancesChangedListener的事件发布监听方法处理,包括更新当前服务列表,通过notifyListener通知其他监听者
ZookeeperServiceDiscoveryChangeWatcher的process:
通过ServiceInstancesChangedListener会处理当前接收的事件ServiceInstancesChangedEvent,并继续对该节点进行watch以接收下次事件变更。
发送服务发布事件及监听处理
// dispatch a ServiceConfigExportedEvent since 2.7.4
dispatch(new ServiceConfigExportedEvent(this));
这里使用SPI 接口EventDispatcher(@SPI(DirectEventDispatcher.NAME)),默认采用DirectEventDispatcher(extends AbstractEventDispatcher)实现类发布事件,直接使用简单线程发布一个ServiceConfigExportedEvent(类似也会有ServiceConfigUnexportedEvent)
而dubbo对该类事件也做出了扩展点SPI接口EventListener,可以自定义监听器实现对服务发布事件或取消发布事件做出相应的处理。那么这些监听器是什么时候被加载执行的呢?
前面的DirectEventDispatcher是继承AbstractEventDispatcher,在dispatcher调用构造函数初始化的时候其中除了初始化Executor外,还有loadEventListenerInstances,找到所有配置的listener实现并添加到listenersCache中,内置的EventListener默认实现主要是打印信息的简单逻辑,如下
dubbo-registry目录
registry-logging=org.apache.dubbo.registry.client.event.listener.LoggingEventListener
dubbo-config目录
config-logging=org.apache.dubbo.config.event.listener.LoggingEventListener
publishExportEvent
与上述发布不同的是,这里是用spring的applicationContext将ServiceBeanExportedEvent事件发布到spring容器中,需要在spring容器中有对应的监听器bean处理。而上面的发布是dubbo本身依靠dispatcher绑定EventListener机制,依赖自身的EventListener扩展加载实现并在dispatcher初始化时加载配置的listener,在dispatch的时候根据事件类型找到符合的listener进行onEvent的调用。
结语
此分析使用的版本为2.7.12-SNAPSHOT,距离现在dubbo github上发布的版本也许不是最新,在一些代码上有所差别但不影响主要流程的跟踪。
通过dubbo源码中提供的dubbo-demo模块,用zk为注册中心追溯了dubbo中一个服务发布的具体过程,其中涉及到很多dubbo SPI的扩展类,为开发者提供了极大的可扩展性,以及加入缓存以增加对象复用性。其中在基于DubboProtocol创建server的部分涉及了netty相关通信细节等,都在dubbo中进行了很好的封装对开发者进行了底层接口的隔离。
文章较长,后续有问题会再次进行更改补充~