概述
最近做了個服務化項目,基于dubbo項目的,是以抽空把原本想看dubbo源碼的願望給實作了,等把dubbo的事情了解了再繼續去完善flume部分的源碼閱讀。
這一章主要是講dubbo服務釋出的過程,其實dubbo服務的釋出我認為其實有三個階段:階段一是指解析xml配置檔案;階段二是spring如何生成bean對象;階段三是實作bean的服務釋出。其中階段二由于涉及到spring本身的bean初始化,這塊暫時沒有能力去講解清楚。是以努力将其他兩個階段講解清楚。
參考了很多其他人的文章,先把概念性的東西最先寫出來,結合着後面的具體過程分析會比較好了解,這裡共享了《
益文的圈》的圖檔,特意備注出來。
基礎概念普及
Invoker
public interface Invoker<T> extends Node {
Class<T> getInterface();
Result invoke(Invocation invocation) throws RpcException;
}
可執行對象的抽象,能夠根據方法的名稱、參數得到相應的執行結果。
Invoker可分為三類:
- AbstractProxyInvoker:本地執行類的Invoker,實際通過Java反射的方式執行原始對象的方法。
- AbstractInvoker:遠端通信類的Invoker,實際通過通信協定發起遠端調用請求并接收響應。
- AbstractClusterInvoker:多個遠端通信類的Invoker聚合成的叢集版Invoker,加入了叢集容錯和負載均衡政策。 invoker
dubbo - 服務釋出
Invocation:包含了需要執行的方法和參數等重要資訊,他有兩個實作類RpcInvocation和MockInvocation。
ProxyFactory
public interface ProxyFactory {
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
服務接口代理抽象,用于生成一個接口的代理類。
- getInvoker方法:針對Server端,将服務對象(如DemoServiceImpl)包裝成一個Invoker對象。
- getProxy方法:針對Client端,建立接口(如DemoService)的代理對象。
Exporter
public interface Exporter<T> {
Invoker<T> getInvoker();
void unexport();
}
維護Invoker的生命周期,内部包含Invoker或者ExporterMap。
Protocol
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
協定抽象接口。封裝RPC調用。
- exporter方法:暴露遠端服務(用于服務端),就是将Invoker對象通過協定暴露給外部。
- refer方法:引用遠端服務(用于用戶端),通過Clazz、url等資訊建立遠端的動态代理Invoker。
關系圖
- ServiceConfig包含ProxyFactory和Protocol,通過SPI的方式注入生成;
- ProxyFactory負責建立Invoker;
- Protocol負責通過Invoker生成Exporter,将服務啟動并暴露;
dubbo的注冊中心位址
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
application=demo-provider&dubbo=2.0.0&pid=1908®istry=zookeeper×tamp=1527741226496
dubbo的服務釋出位址
dubbo://172.17.32.44:20880/com.alibaba.dubbo.demo.DemoService?
anyhost=true&application=demo-provider&bind.ip=172.17.32.44&bind.port=20880&dubbo=2.0.0&generic=false&
interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1908&side=provider×tamp=1527741226515
XML配置解析
dubbo的demo中的基本XML配置如下,基本上我們可以看到核心的幾個配置包括dubbo:application、dubbo:registry、dubbo:protocol、dubbo:service,我們的解析過程就是對這個XML檔案進行了解析。
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="demo-provider"/>
<!-- use multicast registry center to export service -->
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181"/>
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="dubbo" port="20881"/>
<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- declare the service interface to be exported -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" delay="-1"/>
</beans>
XML配置解析的過程主要是解析XML檔案并生成BeanDefinition,這個BeanDefinition就是用于spring初始化bean執行個體的對象。DubboNamespaceHandler主要用于注冊各個模型的解析類,其中的application、module、provider、consumer、service等都是dubbo的schema關鍵字。
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
DubboBeanDefinitionParser類作為解析XML的核心,通過parse方法傳回BeanDefinition對象。這裡Element和beanClass是一一對應的,解析dubbo:application标簽對應的beanClass為ApplicationConfig.class,解析dubbo:service标簽對應的beanClass為ServiceBean.class。下面代碼裡面标準了step_x用來解析每一步具體執行的内容。當然這裡需要考慮每個類的繼承關系,因為子類會繼承父類的的方法。
private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
//step_1 建立RootBeanDefinition對象并設定對應的beanClass。
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
//step_2 生成id的邏輯集中在這塊
String id = element.getAttribute("id");
if ((id == null || id.length() == 0) && required) {
String generatedBeanName = element.getAttribute("name");
if (generatedBeanName == null || generatedBeanName.length() == 0) {
if (ProtocolConfig.class.equals(beanClass)) {
generatedBeanName = "dubbo";
} else {
generatedBeanName = element.getAttribute("interface");
}
}
if (generatedBeanName == null || generatedBeanName.length() == 0) {
generatedBeanName = beanClass.getName();
}
id = generatedBeanName;
int counter = 2;
while (parserContext.getRegistry().containsBeanDefinition(id)) {
id = generatedBeanName + (counter++);
}
}
if (id != null && id.length() > 0) {
if (parserContext.getRegistry().containsBeanDefinition(id)) {
throw new IllegalStateException("Duplicate spring bean id " + id);
}
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
beanDefinition.getPropertyValues().addPropertyValue("id", id);
}
//step_3 針對不同的beanClass做特殊字段的解析
if (ProtocolConfig.class.equals(beanClass)) {
for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
if (property != null) {
Object value = property.getValue();
if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));
}
}
}
} else if (ServiceBean.class.equals(beanClass)) {
String className = element.getAttribute("class");
if (className != null && className.length() > 0) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
} else if (ProviderConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
} else if (ConsumerConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
}
//step_4 針對beanClass通過反射擷取所有的set方法擷取所有需要解析的屬性放置在Set<String> props當中。
//beanDefinition.getPropertyValues().addPropertyValue将所有參數儲存到beanDefinition當中。
Set<String> props = new HashSet<String>();
ManagedMap parameters = null;
for (Method setter : beanClass.getMethods()) {
String name = setter.getName();
if (name.length() > 3 && name.startsWith("set")
&& Modifier.isPublic(setter.getModifiers())
&& setter.getParameterTypes().length == 1) {
Class<?> type = setter.getParameterTypes()[0];
String property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), "-");
props.add(property);
Method getter = null;
try {
getter = beanClass.getMethod("get" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e) {
try {
getter = beanClass.getMethod("is" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e2) {
}
}
if (getter == null
|| !Modifier.isPublic(getter.getModifiers())
|| !type.equals(getter.getReturnType())) {
continue;
}
if ("parameters".equals(property)) {
parameters = parseParameters(element.getChildNodes(), beanDefinition);
} else if ("methods".equals(property)) {
parseMethods(id, element.getChildNodes(), beanDefinition, parserContext);
} else if ("arguments".equals(property)) {
parseArguments(id, element.getChildNodes(), beanDefinition, parserContext);
} else {
String value = element.getAttribute(property);
if (value != null) {
value = value.trim();
if (value.length() > 0) {
if ("registry".equals(property) && RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(value)) {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(RegistryConfig.NO_AVAILABLE);
beanDefinition.getPropertyValues().addPropertyValue(property, registryConfig);
} else if ("registry".equals(property) && value.indexOf(',') != -1) {
parseMultiRef("registries", value, beanDefinition, parserContext);
} else if ("provider".equals(property) && value.indexOf(',') != -1) {
parseMultiRef("providers", value, beanDefinition, parserContext);
} else if ("protocol".equals(property) && value.indexOf(',') != -1) {
parseMultiRef("protocols", value, beanDefinition, parserContext);
} else {
Object reference;
if (isPrimitive(type)) {
if ("async".equals(property) && "false".equals(value)
|| "timeout".equals(property) && "0".equals(value)
|| "delay".equals(property) && "0".equals(value)
|| "version".equals(property) && "0.0.0".equals(value)
|| "stat".equals(property) && "-1".equals(value)
|| "reliable".equals(property) && "false".equals(value)) {
// backward compatibility for the default value in old version's xsd
value = null;
}
reference = value;
} else if ("protocol".equals(property)
&& ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(value)
&& (!parserContext.getRegistry().containsBeanDefinition(value)
|| !ProtocolConfig.class.getName().equals(parserContext.getRegistry().getBeanDefinition(value).getBeanClassName()))) {
if ("dubbo:provider".equals(element.getTagName())) {
logger.warn("Recommended replace <dubbo:provider protocol=\"" + value + "\" ... /> to <dubbo:protocol name=\"" + value + "\" ... />");
}
// backward compatibility
ProtocolConfig protocol = new ProtocolConfig();
protocol.setName(value);
reference = protocol;
} else if ("onreturn".equals(property)) {
int index = value.lastIndexOf(".");
String returnRef = value.substring(0, index);
String returnMethod = value.substring(index + 1);
reference = new RuntimeBeanReference(returnRef);
beanDefinition.getPropertyValues().addPropertyValue("onreturnMethod", returnMethod);
} else if ("onthrow".equals(property)) {
int index = value.lastIndexOf(".");
String throwRef = value.substring(0, index);
String throwMethod = value.substring(index + 1);
reference = new RuntimeBeanReference(throwRef);
beanDefinition.getPropertyValues().addPropertyValue("onthrowMethod", throwMethod);
} else {
if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
if (!refBean.isSingleton()) {
throw new IllegalStateException("The exported service ref " + value + " must be singleton! Please set the " + value + " bean scope to singleton, eg: <bean id=\"" + value + "\" scope=\"singleton\" ...>");
}
}
reference = new RuntimeBeanReference(value);
}
beanDefinition.getPropertyValues().addPropertyValue(property, reference);
}
}
}
}
}
}
NamedNodeMap attributes = element.getAttributes();
int len = attributes.getLength();
for (int i = 0; i < len; i++) {
Node node = attributes.item(i);
String name = node.getLocalName();
if (!props.contains(name)) {
if (parameters == null) {
parameters = new ManagedMap();
}
String value = node.getNodeValue();
parameters.put(name, new TypedStringValue(value, String.class));
}
}
if (parameters != null) {
beanDefinition.getPropertyValues().addPropertyValue("parameters", parameters);
}
return beanDefinition;
}
bean初始化過程
整個bean的執行個體化是由spring内部實作的(暫時未來得及分析其中過程),但是執行個體化後收尾的初始化工作其實是在dubbo當中完成的,我們就暫時忽略其中的spring初始化bean的過程直接分析後面的的過程。service的釋出過程集中在ServiceBean類當中。這裡我們關注onApplicationEvent()和afterPropertiesSet()方法。這裡先補充下bean的初始化過程。
1、啟動容器
2、調用BeanFactoryPostProcessor的postProcessBeanFactory()方法進行工廠後處理
3、getBean()調用某一個Bean
4、調用InstantiationAwareBeanPostProcessor 的postProcessBeforeInstantiation()方法。
5、執行個體化Bean(構造函數、工廠方法)
6、調用InstantiationAwareBeanPostProcessor 的postProcessAfterInstantiation()方法。
7、調用InstantiationAwareBeanPostProcessor 的postProcessPropertyValues()方法。
8、設定Bean的屬性值
9、調用BeanNameAware的setBeanName()方法
10、調用BeanFactoryAware的setBeanFactory()方法
11、調用ApplicationContextAware的setApplicationContext()方法
12、調用BeanPostProcessor的postProcessBeforeInitialization()方法
13、調用InitializingBean的afterPropertiesSet()方法
14、調用init-method初始化方法
15、調用BeanPostProcessor的postProcessAfterInitialization()方法
16、如果singleton則緩存bean,如果prototype則傳回bean。
17、如果singleton則在容器銷毀的時候調用DisposableBean的destroy()方法
18、如果singleton則在容器銷毀的時候調用destory-method方法
跟進ServiceBean.java類,主要是關注onApplicationEvent和afterPropertiesSet和setApplicationContext方法,其中setApplicationContext方法要早于afterPropertiesSet方法執行。
setApplicationContext負責把spring的上下文注入到目前bean當中,afterPropertiesSet負責初始化該bean,onApplicationEvent負責把bean釋出成service。
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean,
DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
private final transient Service service;
private transient ApplicationContext applicationContext;
private transient String beanName;
public ServiceBean() {
super();
this.service = null;
}
public void setApplicationContext(ApplicationContext applicationContext) {
}
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
//todo 這裡的核心初始化bean後的動作,會進入ServiceConfig當中去執行
export();
}
}
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay == -1);
}
@SuppressWarnings({"unchecked", "deprecation"})
public void afterPropertiesSet() throws Exception {
}
public void destroy() throws Exception {
unexport();
}
}
ServiceBean的初始化過程關鍵在于afterPropertiesSet,内部主要擷取provider、application、module、registries、monitor、protocols、path等對象并注入到bean當中,整個過去方法其實就是從spring的上下文中擷取對象,BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false)方法就是用來實作這個功能的。
代碼末尾的export其實本身是沒有執行的,真正執行export是在onApplicationEvent當中實作的,是以我們後面需要跟蹤核心的export過程。
public void afterPropertiesSet() throws Exception {
//step_1 擷取provider設定到bean當中
if (getProvider() == null) {
//todo 從上下文擷取是否有對應的對象
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
if (providerConfigMap != null && providerConfigMap.size() > 0) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
&& providerConfigMap.size() > 1) { // backward compatibility
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() != null && config.isDefault().booleanValue()) {
providerConfigs.add(config);
}
}
if (providerConfigs.size() > 0) {
setProviders(providerConfigs);
}
} else {
ProviderConfig providerConfig = null;
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (providerConfig != null) {
throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
}
providerConfig = config;
}
}
if (providerConfig != null) {
setProvider(providerConfig);
}
}
}
}
//step_2 擷取application設定到bean當中
if (getApplication() == null
&& (getProvider() == null || getProvider().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
//step_3 擷取module設定到bean當中
if (getModule() == null
&& (getProvider() == null || getProvider().getModule() == null)) {
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
//step_4 擷取registries設定到bean當中
if ((getRegistries() == null || getRegistries().size() == 0)
&& (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().size() == 0)
&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (registryConfigMap != null && registryConfigMap.size() > 0) {
List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
for (RegistryConfig config : registryConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
registryConfigs.add(config);
}
}
if (registryConfigs != null && registryConfigs.size() > 0) {
super.setRegistries(registryConfigs);
}
}
}
//step_5 擷取monitor設定到bean當中
if (getMonitor() == null
&& (getProvider() == null || getProvider().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
//step_6 擷取protocols設定到bean當中
if ((getProtocols() == null || getProtocols().size() == 0)
&& (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().size() == 0)) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
for (ProtocolConfig config : protocolConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
protocolConfigs.add(config);
}
}
if (protocolConfigs != null && protocolConfigs.size() > 0) {
super.setProtocols(protocolConfigs);
}
}
}
//step_7 擷取path設定到bean當中
if (getPath() == null || getPath().length() == 0) {
if (beanName != null && beanName.length() > 0
&& getInterface() != null && getInterface().length() > 0
&& beanName.startsWith(getInterface())) {
setPath(beanName);
}
}
//todo 如果不是延遲暴露就會立即export,否則應該會等到onApplicationEvent中去執行暴露
if (!isDelay()) {
export();
}
}
bean暴露過程
整個服務的暴露過程如下圖,共享自《
》,這裡的指假設我們沒有指定本地釋出或遠端釋出,那麼就會執行兩個釋出,暫時隻關注遠端釋出。
服務釋出過程
首先我們需要了解下ServiceBean的類繼承關系,ServiceBean的onApplicationEvent方法中export()因為我們剛剛說的export方法其實就是在ServiceConfig類當中的。後面我們會就ServiceConfig類的export()方法實作跟蹤。
ServiceBean的類圖
執行ServiceConfig類的doExport方法的核心邏輯,doExport -> doExportUrls -> doExportUrlsFor1Protocol,這裡需要關注registryURLs的生成方法loadRegistries,也就是說我們的注冊中心的url位址是如何生成的。
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
注冊中心URL生成過程主要集中在loadRegistries方法中,整個思路就是周遊所有dubbo:registry标簽生成的registries清單,針對每個registry結合application+registry自身的config最後生成注冊中心的URL。
protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && registries.size() > 0) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address != null && address.length() > 0
&& !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
核心重點來了,doExportUrlsFor1Protocol部分主要是負責導出local和remote的dubbo服務,暫時關注導出remote服務,整體步驟是:
- 1、 proxyFactory生成invoker,這裡factory為JavassistProxyFactory。(dubug跟進下)
-
2、由protocol導出為Exporter,這裡protocol為RegistryProtocol。(dubug跟進下)
由于整個暴露過程還是比較重要的,是以繼續深入進行分析。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略一些不重要的代碼
String scope = url.getParameter(Constants.SCOPE_KEY);
//todo scope值為null,是以會釋出到local
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//釋出local的export
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope值為null,是以會釋出到remote
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
// todo 核心重點
//step_1 proxyFactory生成invoker
//step_2 生成wrapperInvoker
//step_3 由protocol導出為Exporter
//step_4 加入到exporters當中
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
首先proxyFactory在這裡其實由dubbo的擴充機制生成的,内部實作是通過javassit生成的源碼。proxyFactory負責生産invoker,Protocol負責到處Export對象。
//step_1 proxyFactory生成invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//step_2 生成wrapperInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//step_3 由protocol導出為Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
proxyFactory.getInvoker()的調用鍊路如下,proxyFactory其實代表的是ProxyFactory$Adaptive對象。
Invoker<?> invoker = proxyFactory.getInvoker()
等同于ProxyFactory$Adaptive.getInvoker(ref, interfaceClass, dubboUrl)
内部執行 extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
傳回StubProxyFactoryWrapper
extension.getInvoker(arg0, arg1, arg2)
等同于StubProxyFactoryWrapper.getInvoker(arg0, arg1, arg2)
内部執行javassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
傳回通過new AbstractProxyInvoker()建立的對象
ProxyFactory$Adaptive的源碼如下,服務暴露需要關注getInvoker()方法。
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
JavassistProxyFactory的代碼中我們需要關注getInvoker方法,内部其實就是通過wrapper.invokeMethod()方法實作調用。
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
protocol.export()調用鍊如下,
protocol.export() 等同 Protocol$Adaptive.export()
執行 extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
傳回 ProtocolListenerWrapper
執行 extension.export(arg0) 等同 ProtocolListenerWrapper.export(Invoker<T> invoker)
執行 ProtocolFilterWrapper.export(Invoker<T> invoker)
執行RegistryProtocol.export()
(1)執行doLocalExport()将服務在本地開啟端口監聽之類并傳回exporter
執行Protocol$Adaptive.export()
ProtocolFilterWrapper.export() -> ProtocolListenerWrapper.export() -> DubboProtocol.export() -> openServer開啟服務監聽
(2)執行register()方法将服務釋出将服務注冊到zookeeper上
Protocol$Adaptive的源碼如下,服務暴露需要關注export()方法。
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
DubboProtocol的核心export部分代碼如下,核心事情包括openServer開啟監聽;建立exporter對象并傳回。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//todo 核心在于打開socket連接配接
openServer(url);
optimizeSerialization(url);
return exporter;
}
整體流程
服務暴露整體流程
參考知識點
spring bean初始化過程
bean生成過程
bean初始化順序