閱讀須知
- dubbo版本:2.6.0
- spring版本:4.3.8
- 文章中使用注釋的方法會做深入分析
正文
我們知道,dubbo的生産者和消費者的關系維護在注冊中心,是以,消費者關聯生産者肯定是需要訂閱注冊中心的相關生産者資訊才能完成,在Dubbo源碼解析之registry注冊中心這篇文章中我們分析了dubbo有關注冊中心的一些操作如注冊、訂閱等,在文章的最後,我們分析了消費者訂閱注冊中心的configuration、routers、providers等資訊的流程,在處理provider資訊的時候,會建立invoker,
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
我們就接着invoker的建立繼續分析,來看consumer如何關聯provider。
DubboProtocol:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
/* 擷取client,建構rpc invoker */
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
DubboProtocol:
private ExchangeClient[] getClients(URL url) {
// 是否共享連接配接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
// 如果未配置,則共享連接配接,否則一個服務一個連接配接
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
/* 擷取共享的client */
clients[i] = getSharedClient(url);
} else {
/* 初始化client */
clients[i] = initClient(url);
}
}
return clients;
}
DubboProtocol:
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
// 緩存不為空并且client未關閉,增加引用計數
client.incrementAndGetCount();
return client;
} else {
// 關閉的client從緩存中移除
referenceClientMap.remove(key);
}
}
synchronized (key.intern()) {
/* 初始化client */
ExchangeClient exchangeClient = initClient(url);
// 包裝client
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
}
DubboProtocol:
private ExchangeClient initClient(URL url) {
// client 類型,預設netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 預設啟動心跳檢測
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO是不允許的,因為它有嚴重的性能問題
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
/* 建立連接配接 */
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
Exchangers:
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
/* 建立連接配接 */
return getExchanger(url).connect(url, handler);
}
HeaderExchanger:
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
/* 建立連接配接,建構HeaderExchangeClient */
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
/* 建立連接配接 */
return getTransporter().connect(url, handler);
}
NettyTransporter:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
/* 建立client */
return new NettyClient(url, listener);
}
NettyClient:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
/* handler包裝過程我們在provider暴露過程中分析過 */
super(url, wrapChannelHandler(url, handler));
}
AbstractClient:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler); // 父類初始化
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
doOpen(); /* 開啟服務 */
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
connect(); /* 建立連接配接 */
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close(); // 異常關閉操作,如關閉channel等
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close(); // 異常關閉操作,如關閉channel等
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
NettyClient:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
開啟服務就是調用netty3的API來建構ClientBootstrap。
AbstractClient:
protected void connect() throws RemotingException {
connectLock.lock();
try {
if (isConnected()) {
return;
}
/* 初始化重連線程 */
initConnectStatusCheckCommand();
doConnect(); /* 連接配接操作 */
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnect_count.set(0);
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
AbstractClient:
private synchronized void initConnectStatusCheckCommand() {
int reconnect = getReconnectParam(getUrl());
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
// 重連任務
Runnable connectStatusCheckCommand = new Runnable() {
public void run() {
try {
if (!isConnected()) {
connect(); // 建立連接配接
} else {
lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
// 等待注冊中心同步provider清單
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
if (!reconnect_error_log_flag.get()) {
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return;
}
}
if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
logger.warn(errorMsg, t);
}
}
}
};
// 固定間隔執行重連任務
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
NettyClient:
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 擷取連接配接位址建立連接配接
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// 關閉舊的channel
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
// 如果client關閉了,則新的channel也要關閉
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
future.cancel();
}
}
}
整個建立連接配接開啟client的過程和我們分析provider暴露過程時開啟NettyServer的流程非常相似,請求的發送和響應的流程我們也分析過,consumer和provider複用相同的方法,到這裡,consumer關聯provider的流程就分析完成了。