開篇
在dubbo的使用過程中會在标簽中會配置filter變量,但是filter具體如何生效卻不是特别清楚,這篇文章就是針對Filter的加載過程進行下分析,嘗試描述清楚過程。
在這篇文章中會嘗試解釋ProtocolFilterWrapper被調用過程,協定釋出的時候都會走到ProtocolFilterWrapper,而這個類是Filter的加載入口,其核心方法在buildInvokerChain()當中。
進而在buildInvokerChain()方法中通過ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension()擷取所有的Filter(包括系統和自定義的Filter對象)的加載過程。
Filter調用鍊分析
- Dubbo的調用鍊如下圖,調用順序按照ProtocolListenerWrapper => ProtocolFilterWrapper,包含關系是 ProtocolListenerWrapper包含ProtocolFilterWrapper。
- 執行export()方法是會按照ProtocolListenerWrapper => ProtocolFilterWrapper的順序執行,最終會調用ProtocolFilterWrapper的export()方法,進而進入buildInvokerChain()構造Filter鍊的邏輯。

- org.apache.dubbo.rpc.Protocol檔案内容如下圖,其中按照先後順序是先filter後listener。
檔案:
org.apache.dubbo.rpc.Protocol
内容:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
- Protocol對應ExtensionLoader對象中cachedWrapperClasses包含filter對象和listener對象,且順序先filter後listener。
Filter調用鍊源碼分析
- ProtocolListenerWrapper 包含一個Protocol類型的構造函數,會根據構造函數進行初始化。
- ProtocolFilterWrapper 包含一個Protocol類型的構造函數,會根據構造函數進行初始化。
public class ProtocolListenerWrapper implements Protocol {
private final Protocol protocol;
public ProtocolListenerWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
}
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
}
- 擴充卡Protocol$Adaptive的export()方法會調用ExtensionLoader的getExtension()方法。
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0)
throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException(
"org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException(
"Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
+ url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader
.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
- getExtension()方法内部調用createExtension()建立擴充。
- createExtension()内部主要包括四個核心步驟。
- 步驟一:根據擴充名擷取擴充類,getExtensionClasses().get(name)。
- 步驟二:建立擴充類的執行個體對象,clazz.newInstance()。
- 步驟三:根據set方法注入執行個體依賴的對象,injectExtension(instance)。
- 步驟四:建立包裝類對象并注入擴充類執行個體對象,wrapperClass.getConstructor(type).newInstance(instance)。
- 在Protocol的ExtensionLoader的cachedWrapperClasses包括ProtocolFilterWrapper和ProtocolListenerWrapper。
- 繼續分析ProtocolFilterWrapper類。
public class ExtensionLoader<T> {
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
private T createExtension(String name) {
// 根據擴充名擷取擴充類clazz
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 建立擴充類對象
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 注入instance對象,這裡假設為DubboProtocol對象作為例子
// injectExtension方法内部周遊set方法并從上下文擷取并set到對象當中
injectExtension(instance);
// 擷取包裝類WrapperClasses并挨個進行包裝
// 包裝類以instance的type作為構造函數的參數
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
private T injectExtension(T instance) {
if (objectFactory == null) {
return instance;
}
try {
for (Method method : instance.getClass().getMethods()) {
if (!isSetter(method)) {
continue;
}
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}
try {
// 擷取set方法的屬性property
String property = getSetterProperty(method);
// 這裡的objectFactory為AdaptiveExtensionFactory
// objectFactory.getExtension執行
// SpiExtensionFactory的getExtension()方法
// 傳回loader.getAdaptiveExtension()對象
// Protocol$Adaptive對象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
}
}
} catch (Exception e) {
}
return instance;
}
}
- 補充一句,當injectExtension()的參數instance為RegistryProtocol,RegistryProtocol包含setProtocol()方法,執行反射調用method.invoke(instance, object)注入Protocol$Adaptive對象。比較抽象,需要好好debug了解
ProtocolFilterWrapper分析
- 導出Dubbo協定過程中ProtocolFilterWrapper的Protocol為DubboProtocol對象。
- 執行ProtocolFilterWrapper的export()的參數為buildInvokerChain()包裝後包含過濾器的invoker對象。
- buildInvokerChain()方法包含兩個核心步驟,擷取Filter清單群組裝Filter清單。
- 先關注Filter清單的過程,擷取過程後面繼續分析。
- 假設Filter鍊為"A,B,C",那麼實際串聯後的順序為A => B => C => Invoker。
- 串聯的邏輯代碼在注釋中标注了,邏輯是通過next記錄上一次對象并儲存目前Filter當中。
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
// 儲存引用,後續用于把真正的調用者儲存到過濾器鍊的最後
Invoker<T> last = invoker;
// 擷取系統和自定義的Filter過濾器
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
.getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
// 用過濾器包裝真正的invoker,過濾器的包裝順序是從尾到頭,按照順序逆向包裝。
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
// 每次把last對象設定為next,挂到目前Filter的next當中,實作串聯。
final Invoker<T> next = last;
last = new Invoker<T>() {
// 隻關注重點代碼
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
}
- Dubbo原生的filter定義在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.filter檔案。
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
- Filter的類圖是下圖所示,其中接口Filter包含了SPI注解。
- 以EchoFilter作為例子,我們可以看到Filter的實作都帶有@Activate注解,加載類的時候會把帶有@Activate注解的類進行單獨緩存。
@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
}
- ExtensionLoader的getActivateExtension()内部主要執行擷取系統Filter和使用者自定義Filter兩步驟。
- 擷取系統Filter當中,如果filter标簽不帶有"-default"字段,就會執行擷取系統Filter對象。
- 擷取系統Filter對象是從cachedActivates擷取的,而cachedActivates是在ExtensionLoader加載擴充類的時候緩存帶有@Activate的類,也就是@Activate的Filter會在加載擴充類的時被緩存。
- 系統Filter儲存在exts并按照一定規則進行排序。
- 擷取自定義Filter根據filter标簽定義的Filter,加載過程中不加載帶有排除符号"-"的Filter,在關鍵字"default"之前的Filter會被優先加載。
- 自定義Filter一般不帶@Activate标簽,如果帶@Activate标簽就會進入系統Filter的加載流程。
public class ExtensionLoader<T> {
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<>();
List<String> names = values == null ? new ArrayList<>(0) : Arrays.asList(values);
// 如果Filter中不帶有"-default"字段,就會加載系統擴充Filter對象。
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
// 擷取所有的擴充類
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
continue;
}
if (isMatchGroup(group, activateGroup)
&& !names.contains(name)
&& !names.contains(REMOVE_VALUE_PREFIX + name)
&& isActive(activateValue, url)) {
exts.add(getExtension(name));
}
}
// 按照一定的比較規則進行排序
exts.sort(ActivateComparator.COMPARATOR);
}
// 加載使用者自定義擴充Filter對象
List<T> usrs = new ArrayList<>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
// 帶有排除符号"-"的Filter不加載
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
if (DEFAULT_KEY.equals(name)) {
// 在default之前的Filter會被系統Filter之前被加載
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
usrs.add(getExtension(name));
}
}
}
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
}
- 總體來說,針對Filter加載的過程需要搞清楚兩個問題,一個問題是ProtocolFilterWrapper對象的調用鍊路,另外一個是buildInvokerChain的執行過程,兩個問題在上面已經都提到并進行了一定的解釋。