服务导出要做的几件事情
1.确定服务的参数
2.确定服务支持的协议
3.构造服务最终的URL
4.将服务URL注册到注册中心去
5.根据服务支持的不同协议,启动不同的Server,用来接收和处理请求
6.因为Dubbo支持动态配置服务参数,所以服务导出时还需要绑定一个监听器Listener来监听服务的参数是否有修改,如果发现有修改,则需要重新进行导出
主要就是两个
1.构造URL
2.注册URL到注册中心
服务概念的演化:
1.DemoService接口表示一个服务,此时的服务表示服务定义
2.DemoServiceImpl表示DemoService服务的具体实现,此时的服务表示服务的具体实现
3.DemoService+group+version表示一个服务,此时的服务增加了分组和版本概念
4.http://192.168.1.112:80/com.zyz.DemoService表示一个服务,此时的服务增加了机器IP和Port,表示远程机器可以访问这个URL来使用com.luban.DemoService这个服务
5.http:192.168.1.112:80/com.zyz.DemoService?timeout=3000&version=1.0.1&application=dubbo-demo-provider-application表示一个服务,此时的服务是拥有参数的,比如超时时间、版本号、所属应用
可以看到,最后就是要讲我们的服务变成这一条URL,然后注册到注册中心里去。
而难点就是将Dubbo里的参数都放进URL,不过一想,Dubbo里面不是可以都使用注解来配置,应该很容易就可以得到这些参数吧,其实不是,Dubbo不仅仅支持注解配置,还会有dubbo.properties文件、配置中心等等,所以这里面有一些逻辑的关系。所以要获得真正能够注册进注册中心的这条URL,也不是很简单滴。
源码分析
服务导出的入口,是在ServiceBean里面的export方法
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware
看这个ServiceBean实现了这么多接口
而它的onApplicationEvent就是实现了ApplicationListener接口,重写的方法,就是在Spring启动的时候,会发布一个事件,而Dubbo里的这个ServiceBean就可以接收到这个事件,从而调用这个方法进行服务的注册。
public void onApplicationEvent(ContextRefreshedEvent event) {
// 当前服务没有被导出并且没有卸载,才导出服务
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
// 服务导出(服务注册)
export();
}
}
public synchronized void export() {
//检查并更新配置
checkAndUpdateSubConfigs();
// 检查服务是否需要导出
if (!shouldExport()) {
return;
}
// 检查是否需要延迟发布
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 导出服务
doExport();
}
}
为什么这个方法,第一步是这个check啥啥啥呢。其实这个是更新配置。我在上面也说过,Dubbo里面,不只有一处能够对服务进行配置,而这个配置最终也要放进ServiceBean里去,而多处都能够配置这个参数,肯定有一个优先级的关系,那么这里的checkAndUpdateSubConfigs()方法,就是根据优先级关系,更新配置,因为上一篇博文也说了Spring整合Dubbo,生成一个ServiceBean,是根据配置文件赋值到ServiceBean里的,那么可能在其他地方有优先级更高的配置,所以用这个方法来更新配置。
public void checkAndUpdateSubConfigs() {
// Use default configs defined explicitly on global configs
// ServiceConfig中的某些属性如果是空的,那么就从ProviderConfig、ModuleConfig、ApplicationConfig中获取
// 补全ServiceConfig中的属性
completeCompoundConfigs();
// Config Center should always being started first.
// 从配置中心获取配置,包括应用配置和全局配置
// 把获取到的配置放入到Environment中的externalConfigurationMap和appExternalConfigurationMap中
// 并刷新所有的XxConfig的属性(除开ServiceConfig),刷新的意思就是将配置中心的配置覆盖调用XxConfig中的属性
startConfigCenter();
checkDefault();
checkProtocol();
checkApplication();
// if protocol is not injvm checkRegistry
// 如果protocol不是只有injvm协议,表示服务调用不是只在本机jvm里面调用,那就需要用到注册中心
if (!isOnlyInJvm()) {
checkRegistry();
}
// 刷新ServiceConfig
this.refresh();
// 如果配了metadataReportConfig,那么就刷新配置
checkMetadataReport();
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:service inter\" /> interface not allow null!");
}
// 当前服务对应的实现类是一个GenericService,表示没有特定的接口
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
// 加载接口
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 刷新MethodConfig,并判断MethodConfig中对应的方法在接口中是否存在
checkInterfaceAndMethods(interfaceClass, methods);
// 实现类是不是该接口类型
checkRef();
generic = Boolean.FALSE.toString();
}
// local和stub一样,不建议使用了
if (local != null) {
// 如果本地存根为true,则存根类为interfaceName + "Local"
if (Boolean.TRUE.toString().equals(local)) {
local = interfaceName + "Local";
}
// 加载本地存根类
Class<?> localClass;
try {
localClass = ClassUtils.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
// 本地存根
if (stub != null) {
// 如果本地存根为true,则存根类为interfaceName + "Stub"
if (Boolean.TRUE.toString().equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
// 检查local和stub
checkStubAndLocal(interfaceClass);
// 检查mock
checkMock(interfaceClass);
}
看completeCompoundConfigs方法代码
private void completeCompoundConfigs() {
// 如果配置了provider,那么则从provider中获取信息赋值其他属性,在这些属性为空的情况下
if (provider != null) {
if (application == null) {
setApplication(provider.getApplication());
}
if (module == null) {
setModule(provider.getModule());
}
if (registries == null) {
setRegistries(provider.getRegistries());
}
if (monitor == null) {
setMonitor(provider.getMonitor());
}
if (protocols == null) {
setProtocols(provider.getProtocols());
}
if (configCenter == null) {
setConfigCenter(provider.getConfigCenter());
}
}
// 如果配置了module,那么则从module中获取信息赋值其他属性,在这些属性为空的情况下
if (module != null) {
if (registries == null) {
setRegistries(module.getRegistries());
}
if (monitor == null) {
setMonitor(module.getMonitor());
}
}
// 如果配置了application,那么则从application中获取信息赋值其他属性,在这些属性为空的情况下
if (application != null) {
if (registries == null) {
setRegistries(application.getRegistries());
}
if (monitor == null) {
setMonitor(application.getMonitor());
}
}
}
它里面可以看到,我们现在代码往下走的这个对象,是ServiceBean,它继承的对象是ServiceConfig,而在这个对象里有很多很多的属性,因为最后要注册到注册中心里,是通过这个对象得到URL的,所以Dubbo会把很多我们配的参数都加到这个对象的属性里去,所以在这个时候就会有其他的Config,比如说ApplicationConfig,如果在ServiceConfig对象里的这个属性为空,这个ApplicationConfig就会对其赋值。而后面的provider、module、application,这是一个层级的关系,可以通过这三个来进行配置。就是一个补全配置。
再看startConfigCenter();
void startConfigCenter() {
//如果配置中心属性为空
if (configCenter == null) {
//将configCenter属性连上生成的configCenter的bean对象
ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
}
// 如果配置了ConfigCenter
if (this.configCenter != null) {
// 从其他位置获取配置中心的相关属性信息,比如配置中心地址
// TODO there may have duplicate refresh
this.configCenter.refresh();
// 属性更新后,从远程配置中心获取数据(应用配置,全局配置)
prepareEnvironment();
}
// 从配置中心取到配置数据后,刷新所有的XxConfig中的属性,除开ServiceConfig
ConfigManager.getInstance().refreshAll();
}
ConfigCenter也是一个配置Bean,不过这里忘记上面整合spring的时候,在哪里解析ConfigCenterBean了。
重点是这个refresh方法
public void refresh() {
try {
//得到一个混合的配置
CompositeConfiguration compositeConfiguration = Environment.getInstance().getConfiguration(getPrefix(), getId());
// 表示XxConfig对象本身- AbstractConfig
Configuration config = new ConfigConfigurationAdapter(this); // ServiceConfig
//如果优先级为true
if (Environment.getInstance().isConfigCenterFirst()) {
// The sequence would be: SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
//加到第四个位置
compositeConfiguration.addConfiguration(4, config);
} else {
//加到第二个位置
// The sequence would be: SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration
compositeConfiguration.addConfiguration(2, config);
}
// loop methods, get override value and set the new value back to method
Method[] methods = getClass().getMethods(); //ServiceBean
for (Method method : methods) {
// 是不是setXX()方法
if (MethodUtils.isSetter(method)) {
// 获取xx配置项的value
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
// isTypeMatch() is called to avoid duplicate and incorrect update, for example, we have two 'setGeneric' methods in ReferenceConfig.
if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) {
method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value));
}
// 是不是setParameters()方法
} else if (isParametersSetter(method)) {
// 获取parameter配置项的value
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
if (StringUtils.isNotEmpty(value)) {
Map<String, String> map = invokeGetParameters(getClass(), this);
map = map == null ? new HashMap<>() : map;
map.putAll(convert(StringUtils.parseParameters(value), ""));
invokeSetParameters(getClass(), this, map);
}
}
}
} catch (Exception e) {
logger.error("Failed to override ", e);
}
}
刷新XxConfig
一个XxConfig对象的属性可能是有值的,也可能是没有值的,这时需要从其他位置获取属性值,来进行属性的覆盖
覆盖的优先级,从大到小为系统变量->配置中心应用配置->配置中心全局配置->注解或xml中定义->dubbo.properties文件
以ServiceConfig为例,ServiceConfig中包括很多属性,比如timeout
但是在定义一个Service时,如果在注解上没有配置timeout,那么就会其他地方获取timeout的配置
比如可以从系统变量->配置中心应用配置->配置中心全局配置->注解或xml中定义->dubbo.properties文件
refresh是刷新,将当前ServiceConfig上的set方法所对应的属性更新为优先级最高的值
看上面,调用这个方法返回了CompositeConfiguration这个对象。
public CompositeConfiguration getConfiguration(String prefix, String id) {
CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
// Config center has the highest priority
//混合添加,是linkedList,是有顺序的,有优先级的
//都生成了几个Configuration对象,然后加入linkedList里去
// JVM环境变量
compositeConfiguration.addConfiguration(this.getSystemConfig(prefix, id));
// 操作系统环境变量
compositeConfiguration.addConfiguration(this.getEnvironmentConfig(prefix, id));
// 配置中心APP配置
compositeConfiguration.addConfiguration(this.getAppExternalConfig(prefix, id));
// 配置中心Global配置
compositeConfiguration.addConfiguration(this.getExternalConfig(prefix, id));
// dubbo.properties中的配置
compositeConfiguration.addConfiguration(this.getPropertiesConfig(prefix, id));
return compositeConfiguration;
}
可以看到,它其实是将这几个Config加入到一个linkedList里去,这是一个链表,是有顺序的,所以也代表了他们的优先级。然后都加入到这个compositeConfiguration对象里。
然后new了一个ConfigConfigurationAdapter点进去
public ConfigConfigurationAdapter(AbstractConfig config) {
this.metaData = config.getMetaData();
}
可以看到它的构造方法调用了getMetaData方法,是通过这个方法,得到一个map,得到它这个类的每个切入点。
然后接着看,可以看到,它里面会有一个判断,判断是不是按原来的优先级,如果是就加到链表的第四个位置,不是就加到第二个。
再然后下面就又是属性的注入了,将优先级最高的属性,注入进这个set方法里的属性。点到最里面去会看到是调用了这个方法
public Object getInternalProperty(String key) {
Configuration firstMatchingConfiguration = null;
//按照优先级一个个遍历,看配置里有没有这个key,如果有就直接赋值,break
for (Configuration config : configList) {
try {
if (config.containsKey(key)) {
firstMatchingConfiguration = config;
break;
}
} catch (Exception e) {
logger.error("Error when trying to get value for key " + key + " from " + config + ", will continue to try the next one.");
}
}
if (firstMatchingConfiguration != null) {
return firstMatchingConfiguration.getProperty(key);
} else {
return null;
}
}
可以看到,是根据这个linkedList,如果这个list里面的第一个有这个key,那么就会直接赋值,然后break。所以就是按优先级一个个来的。
属性更新后,再从远程配置中心获取数据(应用配置,全局配置)
prepareEnvironment();方法。用这个方法,去真正的连配置中心的属性。
private void prepareEnvironment() {
if (configCenter.isValid()) {
if (!configCenter.checkOrUpdateInited()) {
return;
}
// 动态配置中心,管理台上的配置中心
DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
// 如果是zookeeper,获取的就是/dubbo/config/dubbo/dubbo.properties节点中的内容
String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup());
String appGroup = application != null ? application.getName() : null;
String appConfigContent = null;
if (StringUtils.isNotEmpty(appGroup)) {
// 获取的就是/dubbo/config/dubbo-demo-consumer-application/dubbo.properties节点中的内容
appConfigContent = dynamicConfiguration.getProperties
(StringUtils.isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(),
appGroup
);
}
try {
Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority());
Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent));
Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent));
} catch (IOException e) {
throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
}
}
}
private DynamicConfiguration getDynamicConfiguration(URL url) {
DynamicConfigurationFactory factory = ExtensionLoader
.getExtensionLoader(DynamicConfigurationFactory.class)
.getExtension(url.getProtocol());
DynamicConfiguration configuration = factory.getDynamicConfiguration(url);
Environment.getInstance().setDynamicConfiguration(configuration);
return configuration;
}
这个方法,就是用了Dubbo的SPI的机制,得到了一个叫DynamicConfigurationFactory的对象,这个看名字就是知道是一个工厂类,然后通过这个得到DynamicConfiguration
zookeeper://xxxxxxx:2181/ConfigCenterConfig?check=true&config-file=dubbo.properties&group=dubbo&highest-priority=false&namespace=dubbo&timeout=3000
这是我打断点,通过在dubbo-provider.properties下的配置,让我们上一章写过的和Spring整合的时候,就会解析这个配置文件,生成一个带有配置文件里属性的一个Bean,也是一个Config,然后在上面也讲了,这个方法传进来的是configCenter这个对象的URL,configCenter不知道大家还有没有印象,在上面咋们check的第一步,就是将其他模块的刷新到ServiceBean里,就是那个provider配置的setConfigCenter,刷新到了ServiceBean里,而provider也是在上一文,Spring整合的时候解析配置文件,传到属性里的,configCenter这里才会有值。
那么Dubbo通过这个ConfigCenter,调用了它的SPI机制,看看得到了啥呢。
这个是通过url的Protocol来获得扩展的类,而打断点可以看到这个protocol在这里,因为是我自己配的,所以就是zookeeper。于是会获得ZookeeperDynamicConfigurationFactory,也就会得到ZookeeperDynamicConfiguration动态配置
深点进这个getProperties可以看到最后是调用了zkCli,肯定是从zookeeper那边获取节点下的value了,那么它这个方法,肯定还通过configCenter里的属性来得到一个节点的key,这里就不看了。
configContent就是获取了此节点下的内容。是全局配置 externalConfigurationMap
appConfigContent,就是另一个节点下的内容。是应用配置 appExternalConfigurationMap
这两个我暂时只能这么理解吧,因为我对Dubbo的应用了解并不多。然后将他们都分别放进两个map里
然后它会把配置中心里的所有配置,刷新到除了ServiceConfig的其他Config上
public void refreshAll() {
// refresh all configs here,
getApplication().ifPresent(ApplicationConfig::refresh);
getMonitor().ifPresent(MonitorConfig::refresh);
getModule().ifPresent(ModuleConfig::refresh);
getProtocols().values().forEach(ProtocolConfig::refresh);
getRegistries().values().forEach(RegistryConfig::refresh);
getProviders().values().forEach(ProviderConfig::refresh);
getConsumers().values().forEach(ConsumerConfig::refresh);
}
同样也是调用上面的refresh方法。
再回到上面的checkAndUpdateSubConfigs方法里,后面的都是一些检查然后更新配置的方法。
这就是服务导出的第一步。检查然后刷新配置。
服务导出的真正逻辑
private void doExportUrls() {
// registryURL 表示一个注册中心
//得到url,注册服务也是一个服务,所以也会有对应的url,通过调用该url完成服务注册
List<URL> registryURLs = loadRegistries(true);
//遍历每一个协议
//一个协议一个服务
for (ProtocolConfig protocolConfig : protocols) {
// pathKey = group/contextpath/path:version
// 例子:myGroup/user/org.apache.dubbo.demo.DemoService:1.0.1
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// ProviderModel中存在服务提供者访问路径,实现类,接口,以及接口中的各个方法对应的ProviderMethodModel
// ProviderMethodModel表示某一个方法,方法名,所属的服务的,
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
// ApplicationModel表示应用中有哪些服务提供者和引用了哪些服务
ApplicationModel.initProviderModel(pathKey, providerModel);
// 重点
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
因为要真正完成服务导出,也就是服务注册到注册中心里这个业务。并不是直接把个啥啥啥东西create个节点进zookeeper就行了,而是等之后服务要调用这个放在zookeeper的节点,所以这个节点,必须包含很多很多东西,比如说这个服务的ip端口,服务的全限定类名,还有各个参数。
所以首先要做的就是构造URL,再然后是通过netty啊,啥的连接注册中心create这个value为URL的节点。
protected List<URL> loadRegistries(boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
if (CollectionUtils.isNotEmpty(registries)) {
//遍历注册中心
for (RegistryConfig config : registries) {
String address = config.getAddress();
// 如果注册中心没有配地址,则地址为0.0.0.0
if (StringUtils.isEmpty(address)) {
address = ANYHOST_VALUE;
}
// 如果注册中心的地址不是"N/A"
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// 把application中的参数放入map中,注意,map中的key是没有prefix的
appendParameters(map, application);
// 把config中的参数放入map中,注意,map中的key是没有prefix的
// config是RegistryConfig,表示注册中心
appendParameters(map, config);
// 此处path值固定为RegistryService.class.getName(),因为现在是在加载注册中心
map.put(PATH_KEY, RegistryService.class.getName());
// 把dubbo的版本信息和pid放入map中
appendRuntimeParameters(map);
// 如果map中如果没有protocol,那么默认为dubbo
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
// 构造注册中心url,地址+参数
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())
.setProtocol(REGISTRY_PROTOCOL)
.build();
// 到此为止,url的内容大概为:
// registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=269936®istry=zookeeper×tamp=1584886077813
// 该url表示:使用registry协议调用org.apache.dubbo.registry.RegistryService服务
// 参数为application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=269936®istry=zookeeper×tamp=1584886077813
// 这里是服务提供者和服务消费者区别的逻辑
// 如果是服务提供者,获取register的值,如果为false,表示该服务不注册到注册中心
// 如果是服务消费者,获取subscribe的值,如果为false,表示该引入的服务不订阅注册中心中的数据
if ((provider && url.getParameter(REGISTER_KEY, true))
|| (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
这个方法,它会先取配置中心配好的registries,一般就配了一个zookeeper,然后还有ip,端口啥的。
下面的就是服务的一些参数,都加到这个map里,然后构造一个注册中心的urls,然后遍历改变了一下这个url,应该就如注释写的。
然后就会遍历,配置的协议。拿到一个pathkey,服务的分组和版本等等参数。下面的代码看看注释。
然后最重要的就是下面这个doExportUrlsFor1Protocol(protocolConfig, registryURLs);看这个方法传进去的参数,协议,和注册中心的url。很明显就是要把这两个整合成一个url。
这个方法代码,看起来很多,其实就是加一些参数什么的,构造成一个urls的list,因为一个协议,可能一个Dubbo协议,配置了多个注册中心,所以会构成多个url。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// protocolConfig表示某个协议,registryURLs表示所有的注册中心
// 如果配置的某个协议,没有配置name,那么默认为dubbo
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 这个map表示服务url的参数
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
appendRuntimeParameters(map);
// 监控中心参数
appendParameters(map, metrics);
// 应用相关参数
appendParameters(map, application);
// 模块相关参数
appendParameters(map, module);
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
// 提供者相关参数
appendParameters(map, provider);
// 协议相关参数
appendParameters(map, protocolConfig);
// 服务本身相关参数
appendParameters(map, this);
// 服务中某些方法参数
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
// 某个方法的配置参数,注意有prefix
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
// 如果某个方法配置存在xx.retry=false,则改成xx.retry=0
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if (Boolean.FALSE.toString().equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
// 遍历当前方法配置中的参数配置
for (ArgumentConfig argument : arguments) {
// 如果配置了type,则遍历当前接口的所有方法,然后找到方法名和当前方法名相等的方法,可能存在多个
// 如果配置了index,则看index对应位置的参数类型是否等于type,如果相等,则向map中存入argument对象中的参数
// 如果没有配置index,那么则遍历方法所有的参数类型,等于type则向map中存入argument对象中的参数
// 如果没有配置type,但配置了index,则把对应位置的argument放入map
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 通过接口对应的Wrapper,拿到接口中所有的方法名字
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// Token是为了防止服务被消费者直接调用(伪造http请求)
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// export service
// 通过该host和port访问该服务
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 服务url
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// url:http://192.168.40.17:80/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=285072&release=&side=provider×tamp=1585206500409
// 可以通过ConfiguratorFactory,对服务url再次进行配置
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY); // scope可能为null,remote, local,none
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果scope为none,则不会进行任何的服务导出,既不会远程,也不会本地
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 如果scope不是remote,则会进行本地导出,会把当前url的protocol改为injvm,然后进行导出
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 如果scope不是local,则会进行远程导出
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 如果有注册中心,则将服务注册到注册中心
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
// 如果是injvm,则不需要进行注册中心注册
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 该服务是否是动态,对应zookeeper上表示是否是临时节点,对应dubbo中的功能就是静态服务
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 拿到监控中心地址
URL monitorUrl = loadMonitor(registryURL);
// 当前服务连接哪个监控中心
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
// 服务的register参数,如果为true,则表示要注册到注册中心
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
// 服务使用的动态代理机制,如果为空则使用javassit
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 生成一个当前服务接口的代理对象
// 使用代理生成一个Invoker,Invoker表示服务提供者的代理,可以使用Invoker的invoke方法执行服务
// 对应的url为 registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=http%3A%2F%2F192.168.40.17%3A80%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.40.17%26bind.port%3D80%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D19472%26release%3D%26side%3Dprovider%26timestamp%3D1585207994860&pid=19472®istry=zookeeper×tamp=1585207994828
// 这个Invoker中包括了服务的实现者、服务接口类、服务的注册地址(针对当前服务的,参数export指定了当前服务)
// 此invoker表示一个可执行的服务,调用invoker的invoke()方法即可执行服务,同时此invoker也可用来导出
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// invoker.invoke(Invocation)
// DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用特定的协议来对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个Exporter
// 1. 先使用RegistryProtocol进行服务注册
// 2. 注册完了之后,使用DubboProtocol进行导出
// 到此为止做了哪些事情? ServiceBean.export()-->刷新ServiceBean的参数-->得到注册中心URL和协议URL-->遍历每个协议URL-->组成服务URL-->生成可执行服务Invoker-->导出服务
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 没有配置注册中心时,也会导出服务
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
// 根据服务url,讲服务的元信息存入元数据中心
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
可以看一下注释,这个参数都整到一起后,就生成一个代理对象,当然中间还有很多判断,这个没有深入了解过。
而这个代理对象,其实看这个PROXY_FACTORY,点进去看其实可以知道也是用了Dubbo的SPI机制,得到一个JavassistProxyFactory对象,javassist就是用来做代理的。然后会new一个包装着invoker的wrapperInvoker对象,最后到后面会调用这个export方法,它其实里面有一些SPI的机制,比如说会先调用它的wrapper对象啥的,这里不是特别理解,就先不探究。总之是会先调用RegistryProtocol的export方法,然后再调用Dubbo的export方法。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 导出服务
// registry:// ---> RegistryProtocol
// zookeeper:// ---> ZookeeperRegistry
// dubbo:// ---> DubboProtocol
// registry://xxx?xx=xx®istry=zookeeper ---> zookeeper://xxx?xx=xx 表示注册中心
URL registryUrl = getRegistryUrl(originInvoker); // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider-application&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.40.17%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-provider-application%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.40.17%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26logger%3Dlog4j%26methods%3DsayHello%26pid%3D27656%26release%3D2.7.0%26side%3Dprovider%26timeout%3D3000%26timestamp%3D1590735956489&logger=log4j&pid=27656&release=2.7.0×tamp=1590735956479
// 得到服务提供者url,表示服务提供者
URL providerUrl = getProviderUrl(originInvoker); // dubbo://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=27656&release=2.7.0&side=provider&timeout=3000×tamp=1590735956489
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
// overrideSubscribeUrl是老版本的动态配置监听url,表示了需要监听的服务以及监听的类型(configurators, 这是老版本上的动态配置)
// 在服务提供者url的基础上,生成一个overrideSubscribeUrl,协议为provider://,增加参数category=configurators&check=false
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 一个overrideSubscribeUrl对应一个OverrideListener,用来监听变化事件,监听到overrideSubscribeUrl的变化后,
// OverrideListener就会根据变化进行相应处理,具体处理逻辑看OverrideListener的实现
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 在这个方法里会利用providerConfigurationListener和serviceConfigurationListener去重写providerUrl
// providerConfigurationListener表示应用级别的动态配置监听器,providerConfigurationListener是RegistyProtocol的一个属性
// serviceConfigurationListener表示服务级别的动态配置监听器,serviceConfigurationListener是在每暴露一个服务时就会生成一个
// 这两个监听器都是新版本中的监听器
// 新版本监听的zk路径是:
// 服务: /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点的内容
// 应用: /dubbo/config/dubbo/dubbo-demo-provider-application.configurators节点的内容
// 注意,要喝配置中心的路径区分开来,配置中心的路径是:
// 应用:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties节点的内容
// 全局:/dubbo/config/dubbo/dubbo.properties节点的内容
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export invoker
// 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 得到注册中心-ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 将当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
//是否需要注册到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册服务,把简化后的服务提供者url注册到registryUrl中去
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// 针对老版本的动态配置,需要把overrideSubscribeListener绑定到overrideSubscribeUrl上去进行监听
// 兼容老版本的配置修改,利用overrideSubscribeListener去监听旧版本的动态配置变化
// 监听overrideSubscribeUrl provider://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=416332&release=&side=provider×tamp=1585318241955
// 那么新版本的providerConfigurationListener和serviceConfigurationListener是在什么时候进行订阅的呢?在这两个类构造的时候
// Deprecated! Subscribe to override rules in 2.6.x or before.
// 老版本监听的zk路径是:/dubbo/org.apache.dubbo.demo.DemoService/configurators/override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_config=true&dynamic=false&enabled=true&timeout=6000
// 监听的是路径的内容,不是节点的内容
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
doLocalExport方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// protocol属性的值是哪来的,是在SPI中注入进来的,是一个代理类
// 这里实际利用的就是DubboProtocol或HttpProtocol去export NettyServer
// 为什么需要ExporterChangeableWrapper?方便注销已经被导出的服务
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
// 构造一个Exporter进行服务导出
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
// 服务的stub方法
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 开启NettyServer
openServer(url); //请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务
// 特殊的一些序列化机制,比如kryo提供了注册机制来注册类,提高序列化和反序列化的速度
optimizeSerialization(url);
return exporter;
}