本文繼續上文Dubbo服務提供者啟動流程,在上篇文章中詳細梳理了基于dubbo spring檔案的配置方式,Dubbo是如何加載配置檔案,服務提供者dubbo:service标簽服務暴露全流程,本節重點關注RegistryProtocol#export中調用doLocalExport方法,其實主要是根據各自協定,服務提供者建立網絡伺服器,在特定端口建立監聽,監聽來自消息消費端服務的請求。
RegistryProtocol#doLocalExport:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // @1
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // @2
bounds.put(key, exporter);
}
}
}
return exporter;
}
代碼@1:如果服務提供者以dubbo協定暴露服務,getProviderUrl(originInvoker)傳回的URL将以dubbo://開頭。
代碼@2:根據Dubbo内置的SPI機制,将調用DubboProtocol#export方法。
1、源碼分析DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl(); // @1
// export service.
String key = serviceKey(url); // @2
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); //@3 start
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
} // @3 end
openServer(url); // @4
optimizeSerialization(url); // @5
return exporter;
}
代碼@1:擷取服務提供者URL,以協定名稱,這裡是dubbo://開頭。
代碼@2:從服務提供者URL中擷取服務名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。
代碼@3:是否将轉發事件導出成stub。
代碼@4:根據url打開服務,下面将詳細分析其實作。
代碼@5:根據url優化器序列化方式。
2、源碼分析DubboProtocol#openServer
private void openServer(URL url) {
// find server.
String key = url.getAddress(); // @1
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key); // @2
if (server == null) {
serverMap.put(key, createServer(url)); //@3
} else {
// server supports reset, use together with override
server.reset(url); //@4
}
}
}
代碼@1:根據url擷取網絡位址:ip:port,例如:192.168.56.1:20880,服務提供者IP與暴露服務端口号。
代碼@2:根據key從伺服器緩存中擷取,如果存在,則執行代碼@4,如果不存在,則執行代碼@3.
代碼@3:根據URL建立一伺服器,Dubbo服務提供者伺服器實作類為ExchangeServer。
代碼@4:如果伺服器已經存在,用目前URL重置伺服器,這個不難了解,因為一個Dubbo服務中,會存在多個dubbo:service标簽,這些标簽都會在服務台提供者的同一個IP位址、端口号上暴露服務。
2.1 源碼分析DubboProtocol#createServer
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @1
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @2
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @3
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @4
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @5
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // @6
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY); //@7
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
代碼@1:為服務提供者url增加channel.readonly.sent屬性,預設為true,表示在發送請求時,是否等待将位元組寫入socket後再傳回,預設為true。
代碼@2:為服務提供者url增加heartbeat屬性,表示心跳間隔時間,預設為60*1000,表示60s。
代碼@3:為服務提供者url增加server屬性,可選值為netty,mina等等,預設為netty。
代碼@4:根據SPI機制,判斷server屬性是否支援。
代碼@5:為服務提供者url增加codec屬性,預設值為dubbo,協定編碼方式。
代碼@6:根據服務提供者URI,服務提供者指令請求處理器requestHandler建構ExchangeServer執行個體。requestHandler的實作具體在以後詳細分析Dubbo服務調用時再詳細分析。
代碼@7:驗證用戶端類型是否可用。
2.1.1 源碼分析Exchangers.bind 根據URL、ExchangeHandler建構伺服器
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
上述代碼不難看出,首先根據url擷取Exchanger執行個體,然後調用bind方法建構ExchangeServer,Exchanger接口如下

- ExchangeServer bind(URL url, ExchangeHandler handler) : 服務提供者調用。
- ExchangeClient connect(URL url, ExchangeHandler handler):服務消費者調用。
dubbo提供的實作類為:HeaderExchanger,其bind方法如下:
HeaderExchanger#bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
從這裡可以看出,端口的綁定由Transporters的bind方法實作。
2.1.2 源碼分析Transporters.bind方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
從這裡得知,Dubbo網絡傳輸的接口有Transporter接口實作,其繼承類圖所示:
本文以netty版本來檢視一下Transporter實作。
NettyTransporter源碼如下:
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
NettyServer建立網絡連接配接的實作方法為:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @1
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler); // @2
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
熟悉本方法需要具備Netty的知識,有關源碼:
閱讀Netty系列文章,這裡不對每一行代碼進行解讀,對于與網絡相關的參數,将在後續文章中詳細講解,本方法@1、@2引起了我的注意,首先建立NettyServer必須傳入一個服務提供者URL,但從DubboProtocol#createServer中可以看出,Server是基于網絡套接字(ip:port)緩存的,一個JVM應用中,必然會存在多個dubbo:server标簽,就會有多個URL,這裡為什麼可以這樣做呢?從DubboProtocol#createServer中可以看出,在解析第二個dubbo:service标簽時并不會調用createServer,而是會調用Server#reset方法,是不是這個方法有什麼魔法,在reset方法時能将URL也注冊到Server上,那接下來分析NettyServer#reset方法是如何實作的。
2.2 源碼分析DdubboProtocol#reset
reset方法最終将用Server的reset方法,同樣還是以netty版本的NettyServer為例,檢視reset方法的實作原理。NettyServer#reset--->父類(AbstractServer)
AbstractServer#reset
public void reset(URL url) {
if (url == null) {
return;
}
try { // @1 start
if (url.hasParameter(Constants.ACCEPTS_KEY)) {
int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
if (a > 0) {
this.accepts = a;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
if (t > 0) {
this.idleTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(Constants.THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {
if (threads < core) {
threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {
threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {
threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {
threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
} // @1 end
super.setUrl(getUrl().addParameters(url.getParameters())); // @2
}
代碼@1:首先是調整線程池的相關線程數量,這個好了解。、
代碼@2:然後設定調用setUrl覆寫原先NettyServer的private volatile URL url的屬性,那為什麼不會影響原先注冊的dubbo:server呢?
原來NettyHandler上加了注解:@Sharable,由該注解去實作線程安全。
Dubbo服務提供者啟動流程将分析到這裡了,本文并未對網絡細節進行詳細分析,旨在梳理出啟動流程,有關Dubbo服務網絡實作原理将在後續章節中詳細分析,敬請期待。
原文釋出時間為:2019-02-15
本文作者:丁威,《RocketMQ技術内幕》作者。
本文來自
中間件興趣圈,了解相關資訊可以關注
。