文章目錄
- 一、前言
- 二、Consumer 整合
-
- 2.1 ReferenceAnnotationBeanPostProcessor 類層次結構
- 2.2 Consumer整合流程
- 2.3 源碼分析
-
- 2.3.1 依賴查找
- 2.3.2 依賴注入
- 三、Consumer 側服務引入
-
- 3.1 代理對象生成
一、前言
Apache Dubbo是一款高性能、輕量級的開源Java RPC架構,實際應用中,比較傳統的使用方式是通過xml檔案配置Dubbo Provider和Consumer,完成與Spring的整合。自Spring Boot面世後,注解驅動開發的方式已成為Spring的主流方式。對于Dubbo,Apache官方也提供了dubbo-spring-boot-starter,來降低Dubbo與Spring整合成本,做到通過注解方式極速接入。
Dubbo項目為了與Spring整合,提供了三個核心注解:使用 @Service【導出服務】,使用@Reference【引入服務】,使用 @EnableDubbo 一鍵完成 Dubbo 所需運作環境的自動配置。在這一篇,主要探究Dubbo的Consumer是如何做到與Spring無縫整合的。
二、Consumer 整合
Consumer的整合比較簡單,看過源碼就會發現,完全是通過
ReferenceAnnotationBeanPostProcessor
這個 BeanPostProcessor 來完成整合工作的。
2.1 ReferenceAnnotationBeanPostProcessor 類層次結構

從 ReferenceAnnotationBeanPostProcessor 的類層次結構圖,可以看到其實作了
MergedBeanDefinitionPostProcessor
和
InstantiationAwareBeanPostProcessor
這兩個BPP接口。這兩個BPP接口為Consumer側整合,提供了關鍵的回調方法。
-
MergedBeanDefinitionPostProcessor
作用:在bean執行個體化完成後調用,可用來修改merged BeanDefinition的一些properties 或緩存一些meta資訊以便後續的回調使用
public interface MergedBeanDefinitionPostProcessor extends BeanPostProcessor {
// merged BeanDefinition 回調
/**
* Post-process the given merged bean definition for the specified bean.
* @param beanDefinition the merged bean definition for the bean
* @param beanType the actual type of the managed bean instance
* @param beanName the name of the bean
*/
void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName);
}
-
InstantiationAwareBeanPostProcessor
作用:對象執行個體化前、後的生命周期回調;對象執行個體化之後,設定PropertyValue的生命周期回調
public interface InstantiationAwareBeanPostProcessor extends BeanPostProcessor {
// 執行個體化階段的前置處理回調
Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException;
// 執行個體化階段的後置處理回調
boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException;
// 依賴注入
/**
* Post-process the given property values before the factory applies them
* to the given bean. Allows for checking whether all dependencies have been
* satisfied, for example based on a "Required" annotation on bean property setters.
* <p>Also allows for replacing the property values to apply, typically through
* creating a new MutablePropertyValues instance based on the original PropertyValues,
* adding or removing specific values.
* @param pvs the property values that the factory is about to apply (never {@code null})
* @param pds the relevant property descriptors for the target bean (with ignored
* dependency types - which the factory handles specifically - already filtered out)
* @param bean the bean instance created, but whose properties have not yet been set
* @param beanName the name of the bean
* @return the actual property values to apply to the given bean (can be the passed-in
* PropertyValues instance), or {@code null} to skip property population
* @throws org.springframework.beans.BeansException in case of errors
* @see org.springframework.beans.MutablePropertyValues
*/
PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException;
}
2.2 Consumer整合流程
整合流程簡要示意圖如下,可分為三步來看,稍微概括下:
- 1、應用通過
注解或Spring Boot的@EnableDubbo
檔案擴充機制,引入了spring.factories
這一配置類;DubboAutoConfiguration
- 2、
進一步向Spring容器導入DubboAutoConfiguration
這個BPP元件;ReferenceAnnotationBeanPostProcessor
- 3、當Ioc容器啟動時(AbstractApplicationContext.refresh()),假設ControllerA 依賴了 Dubbo服務 ServerB,則在 ControllerA 進行執行個體化之後、初始化之前,
這個BPP分别執行ReferenceAnnotationBeanPostProcessor
、postProcessMergedBeanDefinition()
方法,完成ControllerA對ServerB的依賴查找和依賴注入。postProcessPropertyValues()
Spring Boot整合Dubbo Consumer一、前言二、Consumer 整合三、Consumer 側服務引入
注: ReferenceAnnotationBeanPostProcessor有三個比較重要的屬性
1、Class<? extends Annotation>[] annotationTypes:其實就是@Reference注解類型,是由構造函數傳入。Dubbo為了保持相容,傳入了2個:Reference.class, com.alibaba.dubbo.config.annotation.Reference.class;依賴查找的時候就是根據這裡的注解類型來過濾。
2、ConcurrentMap<String, AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata> injectionMetadataCache:用來緩存 @Reference 依賴查找的InjectionMetadata結果對象;
3、ConcurrentMap<String, Object> injectedObjectsCache:緩存已完成依賴注入的對象。
2.3 源碼分析
對照上圖,下面再結合源碼來進行分析佐證。
2.3.1 依賴查找
依賴查找的過程,即是篩選出目前執行個體對象中,
@Reference
注解的字段和方法并緩存起來,後面進行依賴注入時會用到。
-
1、AbstractAutowireCapableBeanFactory#applyMergedBeanDefinitionPostProcessors
在對象執行個體化出來之後,Spring執行對merged BeanDefinition的生命周期回調;從源碼可以看到隻有 MergedBeanDefinitionPostProcessor 類型的BPP才有處理資格
protected void applyMergedBeanDefinitionPostProcessors(RootBeanDefinition mbd, Class<?> beanType, String beanName) {
for (BeanPostProcessor bp : getBeanPostProcessors()) {
if (bp instanceof MergedBeanDefinitionPostProcessor) {
MergedBeanDefinitionPostProcessor bdp = (MergedBeanDefinitionPostProcessor) bp;
// 生命周期回調
bdp.postProcessMergedBeanDefinition(mbd, beanType, beanName);
}
}
}
-
2、ReferenceAnnotationBeanPostProcessor的postProcessMergedBeanDefinition()方法
該方法的實作在抽象父類 AbstractAnnotationBeanPostProcessor中
public abstract class AbstractAnnotationBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter implements MergedBeanDefinitionPostProcessor, PriorityOrdered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, DisposableBean {
private static final int CACHE_SIZE = Integer.getInteger("", 32).intValue();
private final Log logger = LogFactory.getLog(this.getClass());
private final Class<? extends Annotation>[] annotationTypes;
private final ConcurrentMap<String, AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata> injectionMetadataCache;
private final ConcurrentMap<String, Object> injectedObjectsCache;
private ConfigurableListableBeanFactory beanFactory;
private Environment environment;
private ClassLoader classLoader;
private int order;
// 構造函數傳入的annotationTypes 就是 @Reference
public AbstractAnnotationBeanPostProcessor(Class... annotationTypes) {
this.injectionMetadataCache = new ConcurrentHashMap(CACHE_SIZE);
this.injectedObjectsCache = new ConcurrentHashMap(CACHE_SIZE);
this.order = 2147483644;
Assert.notEmpty(annotationTypes, "The argument of annotations' types must not empty");
this.annotationTypes = annotationTypes;
}
@Override
public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
if (beanType != null) {
// 查找需要依賴注入的元素(屬性、方法)
InjectionMetadata metadata = this.findInjectionMetadata(beanName, beanType, (PropertyValues)null);
metadata.checkConfigMembers(beanDefinition);
}
}
// 具體依賴查找過程
private InjectionMetadata findInjectionMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
String cacheKey = StringUtils.hasLength(beanName) ? beanName : clazz.getName();
// 先從緩存找:第一次在injectionMetadataCache中肯定找不到
AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata metadata = (AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata)this.injectionMetadataCache.get(cacheKey);
if (InjectionMetadata.needsRefresh(metadata, clazz)) {
ConcurrentMap var6 = this.injectionMetadataCache;
synchronized(this.injectionMetadataCache) {
// 雙重檢查
metadata = (AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata)this.injectionMetadataCache.get(cacheKey);
if (InjectionMetadata.needsRefresh(metadata, clazz)) {
if (metadata != null) {
metadata.clear(pvs);
}
try {
// 緩存沒有,則建構InjectionMetadata
metadata = this.buildAnnotatedMetadata(clazz);
// 放入緩存
this.injectionMetadataCache.put(cacheKey, metadata);
} catch (NoClassDefFoundError var9) {
throw new IllegalStateException("Failed to introspect object class [" + clazz.getName() + "] for annotation metadata: could not find class that it depends on", var9);
}
}
}
}
return metadata;
}
}
-
2、建構InjectionMetadata
實際建構的是Dubbo提供的InjectionMetadata子類
;查找 @Reference 注解的字段或方法是通過反射周遊所有 @Reference 字段 或 setter方法來實作的。AnnotatedInjectionMetadata
private AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata buildAnnotatedMetadata(final Class<?> beanClass) {
// 查找 @Reference 注解的屬性
Collection<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> fieldElements = findFieldAnnotationMetadata(beanClass);
Collection<AbstractAnnotationBeanPostProcessor.AnnotatedMethodElement> methodElements = findAnnotatedMethodMetadata(beanClass);
return new AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata(beanClass, fieldElements, methodElements);
}
private List<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> findFieldAnnotationMetadata(final Class<?> beanClass) {
final List<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> elements = new LinkedList<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement>();
// 反射周遊每一個字段,檢視是否有 @Reference 注解
ReflectionUtils.doWithFields(beanClass, new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
for (Class<? extends Annotation> annotationType : getAnnotationTypes()) {
// 解析得到 @Reference 注解的各屬性
AnnotationAttributes attributes = getAnnotationAttributes(field, annotationType, getEnvironment(), true, true);
if (attributes != null) {
if (Modifier.isStatic(field.getModifiers())) {
if (logger.isWarnEnabled()) {
logger.warn("@" + annotationType.getName() + " is not supported on static fields: " + field);
}
return;
}
// 統一封裝為 AnnotatedFieldElement;AnnotatedFieldElement 是目前類的一個内部類,本身繼承了Spring的InjectionMetadata
elements.add(new AnnotatedFieldElement(field, attributes));
}
}
}
});
return elements;
}
2.3.2 依賴注入
依賴查找完成後,已經将 @Reference 注解的屬性或方法,統一封裝成
InjectionMetadata
對象,緩存在AbstractAnnotationBeanPostProcessor的
injectionMetadataCache
中;緊接着就開始依賴注入,依賴注入過程中,則利用到依賴查找過程中建構的緩存。
-
依賴注入
依賴注入的過程,就是對查找到依賴對象,完成其生命周期後,通過反射指派到目前執行個體對象中。
// com.alibaba.spring.beans.factory.annotation.AbstractAnnotationBeanPostProcessor#postProcessPropertyValues
@Override
public PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
// 第二次調用findInjectionMetadata方法,由于已經有緩存了,直接從緩存就能拿到
InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
try {
// 依賴注入;這個metadata對象是 AbstractAnnotationBeanPostProcessor的内部類AnnotatedFieldElement
metadata.inject(bean, beanName, pvs);
} catch (BeanCreationException ex) {
throw ex;
} catch (Throwable ex) {
throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
+ " dependencies is failed", ex);
}
return pvs;
}
對于屬性注入,同Spring的@Resource等常用注解注入方式類似(可對照Spring提供的CommonAnnotationBeanPostProcessor看下)。不過這裡的屬性注入執行邏輯是由Dubbo提供,源碼在com.alibaba.spring.beans.factory.annotation.AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement
public class AnnotatedFieldElement extends InjectionMetadata.InjectedElement {
private final Field field;
private final AnnotationAttributes attributes;
private volatile Object bean;
protected AnnotatedFieldElement(Field field, AnnotationAttributes attributes) {
super(field, null);
this.field = field;
this.attributes = attributes;
}
@Override
protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
Class<?> injectedType = field.getType();
// 【核心】這裡生成的是Dubbo Provider接口的代理對象
Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);
ReflectionUtils.makeAccessible(field);
// 反射,直接屬性注入
field.set(bean, injectedObject);
}
}
至此,就完成了 @Reference 注解的依賴查找和依賴注入過程,總體流程和Spring原生的@Resouce、@Autowired 依賴查找注入過程幾乎完全一緻。
三、Consumer 側服務引入
3.1 代理對象生成
前一節,已經将Dubbo Consumer側對Provider的依賴注入流程梳理清楚了。但是,還剩一個問題沒交代:既然是Dubbo Consumer,那Dubbo服務引入是哪個環節做到的呢?帶着這個疑問,回到上文提到的AnnotatedFieldElement.inject()方法
public class AnnotatedFieldElement extends InjectionMetadata.InjectedElement {
@Override
protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
Class<?> injectedType = field.getType();
// 【核心】這裡生成的是Dubbo Provider接口的代理對象
Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);
ReflectionUtils.makeAccessible(field);
// 反射,直接屬性注入
field.set(bean, injectedObject);
}
}
由于Consumer側是沒有Provider的實作類的,顯然依賴注入的肯定是個代理對象。那這個代理對象是如何創造創造出來的?進com.alibaba.spring.beans.factory.annotation.AbstractAnnotationBeanPostProcessor#getInjectedObject方法看看。
// com.alibaba.spring.beans.factory.annotation.AbstractAnnotationBeanPostProcessor#getInjectedObject
protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
String cacheKey = buildInjectedObjectCacheKey(attributes, bean, beanName, injectedType, injectedElement);
// 先讀緩存:第一次請求肯定沒有
Object injectedObject = injectedObjectsCache.get(cacheKey);
if (injectedObject == null) {
// 生成代理對象
injectedObject = doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
// Customized inject-object if necessary
// 寫入緩沖
injectedObjectsCache.putIfAbsent(cacheKey, injectedObject);
}
return injectedObject;
}
接着進 org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor#doGetInjectedBean 方法。
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
/**
* The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
*/
String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
// eg: @Reference(timeout=2000,url=dubbo://127.0.0.1:12345,version=1.0.0) org.apache.dubbo.spring.boot.demo.consumer.DemoService
// 可以看出:@Reference注解即使是對同一個Dubbo服務的引用,隻要注解的屬性稍有不同,比如逾時時間不同,那緩存生成的key就會不同
/**
* The name of bean that is declared by {@link Reference @Reference} annotation injection
*/
String referenceBeanName = getReferenceBeanName(attributes, injectedType);
// 這一行的主要作用就是 new了一個ReferenceBean對象出來,并将 @Reference 的屬性指派進去
ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes);
// 将referenceBean 注冊為單例bean,交給Spring容器進行管理
registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType);
cacheInjectedReferenceBean(referenceBean, injectedElement);
// 真正開始建立代理對象
return getOrCreateProxy(referencedBeanName, referenceBean, localServiceBean, injectedType);
}
private Object getOrCreateProxy(String referencedBeanName, ReferenceBean referenceBean, boolean localServiceBean,
Class<?> serviceInterfaceType) {
if (localServiceBean) { // If the local @Service Bean exists, build a proxy of Service
return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
newReferencedBeanInvocationHandler(referencedBeanName));
} else {
exportServiceBeanIfNecessary(referencedBeanName); // If the referenced ServiceBean exits, export it immediately
// 調用 ReferenceBean的get方法,觸發ReferenceBean的初始化
return referenceBean.get();
}
}
通過上面的分析可知,代理對象的生成肯定是圍繞 ReferenceBean 來展開的。不過ReferenceBean的源碼,并沒有太多邏輯。核心邏輯還在在其父類ReferenceConfig中
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
public static final Logger logger = LoggerFactory.getLogger(ReferenceConfig.class);
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
/**
* The interface proxy reference
*/
private transient volatile T ref;
/**
* The invoker of the reference service
*/
private transient volatile Invoker<?> invoker;
/**
* The flag whether the ReferenceConfig has been initialized
*/
private transient volatile boolean initialized;
/**
* whether this ReferenceConfig has been destroyed
*/
private transient volatile boolean destroyed;
private final ServiceRepository repository;
private DubboBootstrap bootstrap;
// 擷取代理對象的入口方法
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// 初始化
init();
}
return ref;
}
public synchronized void init() {
if (initialized) {
// 避免重複初始化
return;
}
if (bootstrap == null) {
// 【核心1】: 如果 DubboBootstrap 為null,代表DubboBootstrap還沒初始化過,則觸發一次初始化
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
// 省略。。。。
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
// 省略部分URL參數拼接代碼。。。。
// 【核心2】: 建立代理對象
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
// 标記已完成
initialized = true;
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
// 省略。。。
if (urls.size() == 1) {
//【核心3】這一步會 new一個 DubboInvoker;Dubbo内部會通過netty建立與Provider的連接配接,DubboInvoker持有該連接配接
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
// 省略。。。
// 【核心4】采用Dubbo提供的位元組碼技術生成代理對象,細節見JavassistProxyFactory、org.apache.dubbo.common.bytecode.Proxy
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
}
與Provider側建立連接配接過程,見org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer,由于與本文主流程不太相關故不再展開。
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
到這裡,Spring對Dubbo Consumer側的整合流程基本梳理完成了,抛去Dubbo服務引用的細節,流程還是很簡單的,僅依靠了
ReferenceAnnotationBeanPostProcessor
這個BPP提供的2個回調方法。如果熟悉Spring
BeanPostProcessor
擴充機制的話,相信看起來應該很輕松。下一篇,會接着總結下Dubbo Provider與Spring的整合流程。
全文完~