1. 概述
Dubbo 服務引用,和 Dubbo 服務暴露一樣,也有兩種方式:
本地引用,JVM 本地調用。
遠端暴露,網絡遠端通信。
2. 本地引用
2.1 createProxy
本地引用服務的順序圖如下:
/**
* 建立 Service 代理對象
*
* @param map 集合
* @return 代理對象
*/
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 是否本地引用
final boolean isJvmRefer;
// injvm 屬性為空,不通過該屬性判斷
if (isInjvm() == null) {
// 直連服務提供者,參見文檔《直連提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
// 通過 `tmpUrl` 判斷,是否需要本地引用
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
// 預設不是
} else {
isJvmRefer = false;
}
// 通過 injvm 屬性。
} else {
isJvmRefer = isInjvm();
}
// 本地引用
if (isJvmRefer) {
// 建立本地服務引用 URL 對象。
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 引用服務,傳回 Invoker 對象
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
// 正常流程,一般為遠端引用
} else {
// ... 省略本文暫時不分享的服務遠端引用
}
// 啟動時檢查
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 建立 Service 代理對象
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
2.2 isInjvmRefer
/**
* 是否本地引用
*
* @param url URL
* @return 是否
*/
public boolean isInjvmRefer(URL url) {
final boolean isJvmRefer;
String scope = url.getParameter(Constants.SCOPE_KEY);
// Since injvm protocol is configured explicitly, we don't need to set any extra flag, use normal refer process.
// 當 `protocol = injvm` 時,本身已經是 jvm 協定了,走正常流程就是了。
if (Constants.LOCAL_PROTOCOL.toString().equals(url.getProtocol())) {
isJvmRefer = false;
// 當 `scope = local` 或者 `injvm = true` 時,本地引用
} else if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter("injvm", false))) {
// if it's declared as local reference
// 'scope=local' is equivalent to 'injvm=true', injvm will be deprecated in the future release
isJvmRefer = true;
// 當 `scope = remote` 時,遠端引用
} else if (Constants.SCOPE_REMOTE.equals(scope)) {
// it's declared as remote reference
isJvmRefer = false;
// 當 `generic = true` 時,即使用泛化調用,遠端引用。
} else if (url.getParameter(Constants.GENERIC_KEY, false)) {
// generic invocation is not local reference
isJvmRefer = false;
// 當本地已經有該 Exporter 時,本地引用
} else if (getExporter(exporterMap, url) != null) {
// by default, go through local reference if there's the service exposed locally
isJvmRefer = true;
// 預設,遠端引用
} else {
isJvmRefer = false;
}
return isJvmRefer;
}
2.3 Protocol
涉及的 Protocol 類圖如下:
2.3.1 ProtocolFilterWrapper
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 注冊中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 引用服務,傳回 Invoker 對象
// 給改 Invoker 對象,包裝成帶有 Filter 過濾鍊的 Invoker 對象.建立帶有 Filter 過濾鍊的 Invoker 對象。
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
2.3.2 ProtocolListenerWrapper
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 注冊中心協定
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 引用服務
Invoker<T> invoker = protocol.refer(type, url);
// 獲得 InvokerListener 數組
List<InvokerListener> listeners = Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, Constants.INVOKER_LISTENER_KEY));
// 建立 ListenerInvokerWrapper 對象
return new ListenerInvokerWrapper<T>(invoker, listeners);
}
2.3.3 InjvmProtocol
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
建立 InjvmInvoker 對象。注意,傳入的 exporterMap 參數,包含所有的 InjvmExporter 對象。
2.4 Invoker
2.4.1 AbstractInvoker
實作 Invoker 接口,抽象 Invoker 類,主要提供了 Invoker 的通用屬性和 #invoke(Invocation) 方法的通用實作。
本文主要涉及到它的通用屬性,代碼如下:
/**
* 接口類型
*/
private final Class<T> type;
/**
* 服務 URL
*/
private final URL url;
/**
* 公用的隐式傳參。在 {@link #invoke(Invocation)} 方法中使用。
*/
private final Map<String, String> attachment;
/**
* 是否可用
*/
private volatile boolean available = true;
/**
* 是否銷毀
*/
private AtomicBoolean destroyed = new AtomicBoolean(false);
2.4.1.1 InjvmInvoker
實作 AbstractInvoker 抽象類,Injvm Invoker 實作類。
/**
* 服務鍵
*/
private final String key;
/**
* Exporter 集合
*
* key: 服務鍵
*
* 該值實際就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
*/
private final Map<String, Exporter<?>> exporterMap;
InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
super(type, url);
this.key = key;
this.exporterMap = exporterMap;
}
2.4.1.2 isAvailable
isAvailable() 方法,是否可用
@Override
public boolean isAvailable() {
// 判斷是否有 Exporter 對象
InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
if (exporter == null) {
return false;
} else {
return super.isAvailable();
}
}
開啟 啟動時檢查 時,調用該方法,判斷該 Invoker 對象,是否有對應的 Exporter 。若不存在,說明依賴服務不存在,檢查不通過
2.4.2 ListenerInvokerWrapper
實作 Invoker 接口,具有監聽器功能的 Invoker 包裝器
public class ListenerInvokerWrapper<T> implements Invoker<T> {
private static final Logger logger = LoggerFactory.getLogger(ListenerInvokerWrapper.class);
/**
* 真實的 Invoker 對象
*/
private final Invoker<T> invoker;
/**
* Invoker 監聽器數組
*/
private final List<InvokerListener> listeners;
public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
if (invoker == null) {
throw new IllegalArgumentException("invoker == null");
}
this.invoker = invoker;
this.listeners = listeners;
// 執行監聽器
if (listeners != null && !listeners.isEmpty()) {
for (InvokerListener listener : listeners) {
if (listener != null) {
try {
listener.referred(invoker);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
}
}
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public String toString() {
return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
}
public void destroy() {
try {
invoker.destroy();
} finally {
// 執行監聽器
if (listeners != null && !listeners.isEmpty()) {
for (InvokerListener listener : listeners) {
if (listener != null) {
try {
listener.destroyed(invoker);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
}
}
}
}
2.5 InvokerListener
com.alibaba.dubbo.rpc.InvokerListener ,Invoker 監聽器。
@SPI
public interface InvokerListener {
/**
* The invoker referred
*
* 當服務引用完成
*
* @param invoker
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Protocol#refer(Class, URL)
*/
void referred(Invoker<?> invoker) throws RpcException;
/**
* The invoker destroyed.
*
* 當服務銷毀引用完成
*
* @param invoker
* @see com.alibaba.dubbo.rpc.Invoker#destroy()
*/
void destroyed(Invoker<?> invoker);
}
2.5.1 InvokerListenerAdapter
com.alibaba.dubbo.rpc.listener.InvokerListenerAdapter ,實作 InvokerListener 接口,InvokerListener 擴充卡抽象類。代碼如下:
public abstract class InvokerListenerAdapter implements InvokerListener {
public void referred(Invoker<?> invoker) throws RpcException { }
public void destroyed(Invoker<?> invoker) { }
}
2.5.2 DeprecatedInvokerListener
現 InvokerListenerAdapter 抽象類 ,引用廢棄的服務時,列印錯誤日志提醒。代碼如下:
@Activate(Constants.DEPRECATED_KEY)
public class DeprecatedInvokerListener extends InvokerListenerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(DeprecatedInvokerListener.class);
public void referred(Invoker<?> invoker) throws RpcException {
if (invoker.getUrl().getParameter(Constants.DEPRECATED_KEY, false)) {
LOGGER.error("The service " + invoker.getInterface().getName() + " is DEPRECATED! Declare from " + invoker.getUrl());
}
}
}
3. 服務引用
在 Dubbo 中提供多種協定( Protocol ) 的實作,大體流程一緻,本文以 Dubbo Protocol 為例子,這也是 Dubbo 的預設協定。
遠端暴露服務的順序圖如下:
3.1 createProxy(map)
涉及遠端引用服務的代碼如下:
* 建立 Service 代理對象
*
* @param map 集合
* @return 代理對象
*/
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 是否本地引用
final boolean isJvmRefer;
// injvm 屬性為空,不通過該屬性判斷
if (isInjvm() == null) {
// 直連服務提供者,參見文檔《直連提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
// 通過 `tmpUrl` 判斷,是否需要本地引用
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
// 預設不是
} else {
isJvmRefer = false;
}
// 通過 injvm 屬性。
} else {
isJvmRefer = isInjvm();
}
// 本地引用
if (isJvmRefer) {
// 【省略代碼】本地引用
} else {
// 正常流程,一般為遠端引用
// 定義直連位址,可以是服務提供者的位址,也可以是注冊中心的位址
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 拆分位址成數組,使用 ";" 分隔。
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
// 循環數組,添加到 `url` 中。
if (us != null && us.length > 0) {
for (String u : us) {
// 建立 URL 對象
URL url = URL.valueOf(u);
// 設定預設路徑
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
// 注冊中心的位址,帶上服務引用的配置參數
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
// 服務提供者的位址
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
// 注冊中心
} else { // assemble URL from register center's configuration
// 加載注冊中心 URL 數組
List<URL> us = loadRegistries(false);
// 循環數組,添加到 `url` 中。
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加載監控中心 URL
URL monitorUrl = loadMonitor(u);
// 服務引用配置對象 `map`,帶上監控中心的 URL
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 注冊中心的位址,帶上服務引用的配置參數
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // 注冊中心,帶上服務引用的配置參數
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 單 `urls` 時,引用服務,傳回 Invoker 對象
if (urls.size() == 1) {
// 引用服務
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// 循環 `urls` ,引用服務,傳回 Invoker 對象
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 引用服務
invokers.add(refprotocol.refer(interfaceClass, url));
// 使用最後一個注冊中心的 URL
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 有注冊中心
if (registryURL != null) { // registry url is available
// 對有注冊中心的 Cluster 隻用 AvailableCluster
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
// 無注冊中心,全部都是服務直連
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// 啟動時檢查
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 建立 Service 代理對象
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
3.2 Protocol
本文涉及的 Protocol 類圖如下:
3.2.1 ProtocolFilterWrapper
本文涉及的 #refer(type, url) 方法,代碼如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 注冊中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 引用服務,傳回 Invoker 對象
// 給改 Invoker 對象,包裝成帶有 Filter 過濾鍊的 Invoker 對象
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
3.2.2 RegistryProtocol
本文涉及的 #refer(type, url) 方法,代碼如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 獲得真實的注冊中心的 URL
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 獲得注冊中心
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 獲得服務引用配置參數集合
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
// 分組聚合,參見文檔 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 執行服務引用
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 執行服務引用
return doRefer(cluster, registry, type, url);
}
3.2.3 doRefer
#doRefer(cluster, registry, type, url) 方法,執行服務引用的邏輯。代碼如下:
/**
* 執行服務引用,傳回 Invoker 對象
*
* @param cluster Cluster 對象
* @param registry 注冊中心對象
* @param type 服務接口類型
* @param url 注冊中心 URL
* @param <T> 泛型
* @return Invoker 對象
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 建立 RegistryDirectory 對象,并設定注冊中心
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 建立訂閱 URL
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 服務引用配置集合
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 向注冊中心注冊自己(服務消費者)
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false))); // 不檢查的原因是,不需要檢查。
}
// 向注冊中心訂閱服務提供者 + 路由規則 + 配置規則
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 建立 Invoker 對象
Invoker invoker = cluster.join(directory);
// 向本地系統資料庫,注冊消費者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
3.3 DubboProtocol
3.3.1 refer
本文涉及的 #refer(type, url) 方法,代碼如下:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// 初始化序列化優化器
optimizeSerialization(url);
// 獲得遠端通信用戶端數組
// 建立 DubboInvoker 對象
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
// 添加到 `invokers`
invokers.add(invoker);
return invoker;
}
3.3.2 getClients
getClients(url) 方法,獲得連接配接服務提供者的遠端通信用戶端數組。代碼如下:
/**
* 獲得連接配接服務提供者的遠端通信用戶端數組
*
* @param url 服務提供者 URL
* @return 遠端通信用戶端
*/
private ExchangeClient[] getClients(URL url) {
// 是否共享連接配接
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) { // 未配置時,預設共享
service_share_connect = true;
connections = 1;
}
// 建立連接配接服務提供者的 ExchangeClient 對象數組
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) { // 共享
clients[i] = getSharedClient(url);
} else { // 不共享
clients[i] = initClient(url);
}
}
return clients;
}
3.3.3 getSharedClient
getClients(url) 方法,獲得連接配接服務提供者的遠端通信用戶端數組。代碼如下:
* Get shared connection
*
* 獲得 ExchangeClient 對象。若集合中已經存在,則直接使用,無需建立。否則,建立 ExchangeClient 對象。
*/
private ExchangeClient getSharedClient(URL url) {
// 從集合中,查找 ReferenceCountExchangeClient 對象
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
// 若未關閉,增加指向該 Client 的數量,并傳回它
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
// 若已關閉,移除
} else {
referenceClientMap.remove(key);
}
}
// 同步,建立 ExchangeClient 對象。
synchronized (key.intern()) {
// 建立 ExchangeClient 對象
ExchangeClient exchangeClient = initClient(url);
// 将 `exchangeClient` 包裝,建立 ReferenceCountExchangeClient 對象
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
// 添加到集合
referenceClientMap.put(key, client);
// 添加到 `ghostClientMap`
ghostClientMap.remove(key);
return client;
}
}
3.3.4 initClient
#initClient(url) 方法,建立 ExchangeClient 對象,”連接配接”伺服器。
/**
* Create new connection
*
* 建立 ExchangeClient 對象,"連接配接"伺服器
*/
private ExchangeClient initClient(URL url) {
// 校驗 Client 的 Dubbo SPI 拓展是否存在
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
// 設定編解碼器為 Dubbo ,即 DubboCountCodec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 預設開啟 heartbeat
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 連接配接伺服器,建立用戶端
ExchangeClient client;
try {
// 懶連接配接,建立 LazyConnectExchangeClient 對象
// connection should be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
// 直接連接配接,建立 HeaderExchangeClient 對象
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
3.4 Invoker
本文涉及的 Invoker 類圖如下:
3.4.1 DubboInvoker
實作 AbstractExporter 抽象類,Dubbo Invoker 實作類。代碼如下:
/**
* 遠端通信用戶端數組
*/
private final ExchangeClient[] clients;
/**
* 使用的 {@link #clients} 的位置
*/
private final AtomicPositiveInteger index = new AtomicPositiveInteger();
/**
* 版本
*/
private final String version;
/**
* 銷毀鎖
*
* 在 {@link #destroy()} 中使用
*/
private final ReentrantLock destroyLock = new ReentrantLock();
/**
* Invoker 集合,從 {@link DubboProtocol#invokers} 擷取
*/
private final Set<Invoker<?>> invokers;
public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) {
this(serviceType, url, clients, null);
}
public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
this.clients = clients;
// get version.
this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
this.invokers = invokers;
}
3.5 Client
3.5.1 ReferenceCountExchangeClient
com.alibaba.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient ,實作 ExchangeClient 接口,支援指向計數的資訊交換用戶端實作類。
/**
* URL
*/
private final URL url;
/**
* 指向數量
*/
private final AtomicInteger refenceCount = new AtomicInteger(0);
/**
* 幽靈用戶端集合
*/
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap;
/**
* 用戶端
*/
private ExchangeClient client;
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
// 指向加一
refenceCount.incrementAndGet();
this.url = client.getUrl();
if (ghostClientMap == null) {
throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
}
this.ghostClientMap = ghostClientMap;
}
3.5.2 LazyConnectExchangeClient
com.alibaba.dubbo.rpc.protocol.dubbo.LazyConnectExchangeClient ,實作 ExchangeClient 接口,支援懶連接配接伺服器的資訊交換用戶端實作類。
/**
* URL
*/
private final URL url;
/**
* 通道處理器
*/
private final ExchangeHandler requestHandler;
/**
* 連接配接鎖
*/
private final Lock connectLock = new ReentrantLock();
/**
* lazy connect 如果沒有初始化時的連接配接狀态
*/
// lazy connect, initial state for connection
private final boolean initialState;
/**
* 通信用戶端
*/
private volatile ExchangeClient client;
/**
* 請求時,是否檢查告警
*/
protected final boolean requestWithWarning;
/**
* 警告計數器。每超過一定次數,列印告警日志。參見 {@link #warning(Object)}
*/
private AtomicLong warningcount = new AtomicLong(0);
public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) {
// lazy connect, need set send.reconnect = true, to avoid channel bad status.
this.url = url.addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString());
this.requestHandler = requestHandler;
this.initialState = url.getParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE);
this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false);
}