先思考一個問題,服務引用的時候需要做什麼呢?
整理一下:
- 建立注冊中心連結,連結注冊中心
- 訂閱服務
- 訂閱通知更新服務資訊
- 建立代理類對象
Dubbo 服務引用的時機有兩個,第一個是在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務,第二個是在 ReferenceBean 對應的服務被注入到其他類中時引用。這兩個引用服務的時機差別在于,第一個是餓漢式的,第二個是懶漢式的。預設情況下,Dubbo 使用懶漢式引用服務。如果需要使用餓漢式,可通過配置 <dubbo:reference> 的 init 屬性開啟。
我們還是從DubboBootStrap的start方法進入看
private void referServices() {
if (cache == null) {
cache = ReferenceConfigCache.getCache();
}
configManager.getReferences().forEach(rc -> {
// TODO, compatible with ReferenceConfig.refer()
ReferenceConfig referenceConfig = (ReferenceConfig) rc;
referenceConfig.setBootstrap(this);
if (rc.shouldInit()) {
if (referAsync) {
CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
executorRepository.getServiceExporterExecutor(),
() -> cache.get(rc)
);
asyncReferringFutures.add(future);
} else {
cache.get(rc);
}
}
});
}
然後是cache.get(rc);
public <T> T get(ReferenceConfigBase<T> referenceConfig) {
String key = generator.generateKey(referenceConfig);
Class<?> type = referenceConfig.getInterfaceClass();
proxies.computeIfAbsent(type, _t -> new ConcurrentHashMap<>());
ConcurrentMap<String, Object> proxiesOfType = proxies.get(type);
proxiesOfType.computeIfAbsent(key, _k -> {
Object proxy = referenceConfig.get();
referredReferences.put(key, referenceConfig);
return proxy;
});
return (T) proxiesOfType.get(key);
}
然後是referenceConfig.get();方法
public synchronized T get() {
// 是否已被銷毀
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 檢測 ref 是否為空,為空則通過 init 方法建立
if (ref == null) {
// init 方法主要用于處理配置,以及調用 createProxy 生成代理類
init();
}
return ref;
}
然後是init方法,該方法比較長,這裡就不貼全了,主要就是配置檢查、構造參數map、生成代理對象。其中構造map的代碼和之前分析服務導出時挺相似的,可以對照着看:
public synchronized void init() {
... 省略代碼 配置檢查、構造參數map
// 建立代理類
ref = createProxy(map);
... 省略代碼
}
重點在建立代理類,跟進去:
private T createProxy(Map<String, String> map) {
// 本地引用
if (shouldJvmRefer(map)) {
// 生成本地引用 URL,協定為 injvm
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
// 調用 refer 方法建構 InjvmInvoker 執行個體
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
// 遠端引用
} else {
urls.clear();
// url 不為空,表明使用者可能想進行點對點調用
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 當需要配置多個 url 時,可用分号進行分割,這裡會進行切分
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
// 設定接口全限定名為 url 路徑
url = url.setPath(interfaceName);
}
// 檢測 url 協定是否為 registry,若是,表明使用者想使用指定的注冊中心
if (UrlUtils.isRegistry(url)) {
// 将 map 轉換為查詢字元串,并作為 refer 參數的值添加到 url 中
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服務提供者的一些配置(這些配置來源于使用者配置的 url 屬性),
// 比如線程池相關配置。并保留服務提供者的部配置設定置,比如版本,group,時間戳等
// 最後将合并後的配置設定為 url 查詢字元串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
// 加載注冊中心 url
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 參數到 url 中,并将 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置注冊中心,抛出異常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
// RegistryProtocol中定義了一個服務目錄RegistryDirectory
// 發起服務目錄RegistryDirectory的notify通知invoker重新整理
// 重新整理invoker時會服務目錄持有的DubboProtocol.refer實作服務引入
// DubboProtocol refer作用,建構DubboInvoker,建構該對象時需要建立一個ExchangeClient,通過exchange層下調Transporter 建立netty連結
// 單個注冊中心或服務提供者(服務直連,下同)
if (urls.size() == 1) {
// 調用 RegistryProtocol 的 refer 建構 Invoker 執行個體
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
// 多個注冊中心或多個服務提供者,或者兩者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 擷取所有的 Invoker
for (URL url : urls) {
// 通過 refprotocol 調用 refer 建構 Invoker,refprotocol 會在運作時
// 根據 url 協定頭加載指定的 Protocol 執行個體,并調用執行個體的 refer 方法
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
// 如果注冊中心連結不為空,則将使用 AvailableCluster
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// 建立 StaticDirectory 執行個體,并由 Cluster 對多個 Invoker 進行合并
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// 生成代理類
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
上面代碼很多,不過邏輯比較清晰。首先根據配置檢查是否為本地調用,若是,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 執行個體。若不是,則讀取直連配置項,或注冊中心 url,并将讀取到的 url 存儲到 urls 中。然後根據 urls 元素數量進行後續操作。若 urls 元素數量為1,則直接通過 Protocol 自适應拓展類建構 Invoker 執行個體接口。若 urls 元素數量大于1,即存在多個注冊中心或服務直連 url,此時先根據 url 建構 Invoker。然後再通過 Cluster 合并多個 Invoker,最後調用 ProxyFactory 生成代理類。
這裡我們主要分析兩部分内容:
- invoker的生成
- 建立代理類并生成執行個體
Invoker建立
Invoker 是 Dubbo 的核心模型,代表一個可執行體。在服務提供方,Invoker 用于調用服務提供類。在服務消費方,Invoker 用于執行遠端調用。Invoker 是由 Protocol 實作類建構而來。
我們先看REF_PROTOCOL.refer方法,一樣的,這裡的protocol是自适應擴充,此時的實作是RegistryProtocol:
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 參數值,并将其設定為協定頭
url = getRegistryUrl(url);
// 擷取注冊中心執行個體
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 将 url 查詢字元串轉為 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// 擷取 group 配置
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 通過 SPI 加載 MergeableCluster 執行個體,并調用 doRefer 繼續執行服務引用邏輯
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
}
}
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
// 調用 doRefer 繼續執行服務引用邏輯
return doRefer(cluster, registry, type, url);
}
然後是doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 建立 RegistryDirectory 執行個體
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設定注冊中心和協定
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 生成服務消費者連結
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注冊服務消費者,在 consumers 目錄下新節點
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 訂閱 providers、configurators、routers 等節點資料
directory.subscribe(toSubscribeUrl(subscribeUrl));
// 一個注冊中心可能有多個服務提供者,是以這裡需要将多個服務提供者合并為一個
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
doRefer 方法建立一個 RegistryDirectory 執行個體,然後生成服務者消費者連結,并向注冊中心進行注冊。注冊完畢後,緊接着訂閱 providers、configurators、routers 等節點下的資料。完成訂閱後,RegistryDirectory 會收到這幾個節點下的子節點資訊。由于一個服務可能部署在多台伺服器上,這樣就會在 providers 産生多個節點,這個時候就需要 Cluster 将多個服務節點合并為一個,并生成一個 Invoker。
訂閱服務後,有服務内容會觸發服務通知,最終會調用到Protocol的refer方法,不同的協定對應的實作類不同,以DubboProtocol為例:
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 異步轉同步的Invoker
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
這段代碼是在父類AbstractProtocol裡,是由前面的服務目錄RegistryDirectory訂閱時觸發notify通知消息過來的,可以了解成當連接配接注冊中心後會将訂閱的服務拉到本地并緩存起來。具體服務目錄的通知邏輯後面單獨分析。這裡主要分析一下DubboProtocol在refer時做了什麼。
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
// 建立 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
實際上就是傳回一個invoker,注意getClients這個方法
private ExchangeClient[] getClients(URL url) {
// whether to share connection
// 是否共享連接配接
boolean useShareConnect = false;
// 擷取連接配接數,預設為0,表示未配置
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
// 如果未配置 connections,則共享連接配接
if (connections == 0) {
useShareConnect = true;
/*
* The xml configuration should have a higher priority than properties.
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
// 擷取共享用戶端
clients[i] = shareClients.get(i);
} else {
// 初始化新的用戶端
clients[i] = initClient(url);
}
}
return clients;
}
這裡到交換層初始化用戶端
private ExchangeClient initClient(URL url) {
// 擷取用戶端類型,預設為 netty
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
// 添加編解碼和心跳包參數到 url 中
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
// 檢測用戶端類型是否存在,不存在則抛出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
// 擷取 lazy 配置,并根據配置值決定建立的用戶端類型
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
// 建立懶加載 ExchangeClient 執行個體
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 建立 普通的 ExchangeClient 執行個體
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
看一下Exchangers.connect(url, requestHandler);方法:
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 擷取 Exchanger 執行個體,預設為 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
然後是
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 這裡包含了多個調用,分别如下:
// 1. 建立 HeaderExchangeHandler 對象
// 2. 建立 DecodeHandler 對象
// 3. 通過 Transporters 建構 Client 執行個體
// 4. 建立 HeaderExchangeClient 對象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
和之前bind差不多。再看Transporters.connect
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 數量大于1,則建立一個 ChannelHandler 分發器
handler = new ChannelHandlerDispatcher(handlers);
}
// 擷取 Transporter 自适應拓展類,并調用 connect 方法生成 Client 執行個體
return getTransporter().connect(url, handler);
}
然後是NettyTransporter的connect
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
initExecutor(url);
try {
// 建立netty用戶端
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
// 建立連接配接
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
}
看一下doOpen方法:
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
标準的netty用戶端初始化
簡單總結一下:
invoker建立過程:
- 建立zookeeper連接配接,發起訂閱
- 訂閱觸發服務目錄RegistryDirectory的通知方法
- 通知發現有服務變化後會建立對應的連結,并将建立的invoker重新整理到服務目錄中。