文章目錄
- 前言
- 重要類解釋:
- 代碼調用邏輯圖
- 具體代碼分析
-
- doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs)
- proxyFactory.getInvoker(T proxy, Class type, URL url)
- protocol.export(Invoker invoker)
- RegistryProtocol.export(final Invoker originInvoker)
- doLocalExport(final Invoker originInvoker)
- DubboProtocol.export(Invoker invoker)
- DubboProtocol.openServer(URL url)
- DubboProtocol.createServer(URL url)
- Exchangers.bind(URL url, ExchangeHandler handler)
- getExchanger(URL url)
- HeaderExchanger.bind(URL url, ExchangeHandler handler)
- Transporters.bind(URL url, ChannelHandler... handlers)
- NettyTransporter.bind(URL url, ChannelHandler listener)
- NettyServer 構造器
- doOpen()
前言
dubbo服務釋出共6個步驟:
1)暴露本地服務
2)暴露遠端服務
3)啟動netty服務
4)打開連接配接zk
5)注冊到zk
6)監聽zk
上次我們學習了《dubbo服務暴露之本地服務暴露》,今天我們學習下暴露遠端服務的過程
重要類解釋:
重要類解釋:
ProxyFactory:就是為了擷取一個接口的代理類,例如:擷取一個遠端接口的代理,Dubbo 預設的 ProxyFactory 實作類是 JavassistProxyFactory
Wrapper:它包裝了一個類或接口,類似于spring的BeanWrapper,可以通過Wrapper對執行個體對象進行指派、取值以及對方法的調用
Invoker:它是一個可執行對象,能夠根據方法的名稱、參數得到相應的結果
Exporter:維護Invoker的生命周期。
Transporter:網絡傳輸層,用來抽象netty和mina的統一接口
Exchanger:資訊交換層,封裝請求響應模式,同步轉異步
Protocol:它有兩個方法:
1.export: 暴露遠端服務;将proxyFactory.getInvoker建立的代理類invoker對象,通過協定暴露給外部。
2.refer:引用遠端服務(用于用戶端)
代碼調用邏輯圖
具體代碼分析
doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs)
代碼邏輯:
1)将一些資訊,比如版本、時間戳、方法名以及各種配置對象的字段資訊放入到 map
2)建構好 map 後,緊接着是擷取上下文路徑、主機名以及端口号等資訊
3)最後将 map 和主機名等資料傳給 URL 構造方法建立 URL 對象,通過 URL 可讓 Dubbo 的各種配置在各個子產品之間傳遞
4)暴露本地服務
5)暴露遠端服務
我們在《dubbo服務暴露之本地服務暴露》中講了前4個步驟,今天從第5步暴露遠端服務開始,源碼如下:
注:源碼中隻放了暴露遠端服務相關代碼
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// -----------------------建構URL的代碼已省略,----------------------------------
// scope 為none表示不暴露
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// 配置不是remote的情況下做本地暴露(配置remote則隻暴露遠端服務)
if (!"remote"..equalsIgnoreCase(scope)) {
exportLocal(url); //暴露本地服務
}
// 配置不是local的情況下做遠端暴露(配置local則隻暴露本地服務)
if (!"local".equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// 如果有注冊中心,就循環釋出到所有注冊中心裡
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 加載螢幕連結
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 将螢幕連結作為參數添加到 url 中
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 為服務提供類(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 導出服務,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 不存在注冊中心,僅導出服務
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
在本方法中暴露遠端服務的過程是:
1)如果有注冊中心,就循環釋出到所有注冊中心裡
2)通過
proxyFactory.getInvoker(T proxy, Class<T> type, URL url)
方法建立一個可執行的Invoker對象
3)調用
protocol.export(Invoker<T>)
方法執行暴露邏輯
proxyFactory.getInvoker(T proxy, Class type, URL url)
ProxyFactory
類的作用在上面已經介紹了,
proxyFactory
對象是
ServiceConfig
類的成員變量:
學習完《dubbo SPI 的實作》這一節後我們知道
proxyFactory
對象實際是
ProxyFactory$Adaptive
類型的對象
ProxyFactory$Adaptive
代碼如下:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements ProxyFactory {
public Object getProxy(Invoker arg0) throws RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
+ url.toString() + ") use keys([proxy])");
ProxyFactory extension = (ProxyFactory) ExtensionLoader
.getExtensionLoader(ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public Invoker getInvoker(Object arg0, Class arg1, URL arg2) throws RpcException {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
+ url.toString() + ") use keys([proxy])");
ProxyFactory extension = (ProxyFactory) ExtensionLoader
.getExtensionLoader(ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
通過
getInvoker(Object arg0, Class arg1, URL arg2)
的源碼可以看到,最終傳回了
extension.getInvoker(arg0, arg1, arg2)
對象,而
extension
是
ProxyFactory
通過dubbo spi加載出來的對象,本例中spi加載的對象執行個體為
JavassistProxyFactory
源碼如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 為目标類建立 Wrapper
// 在本例中為proxy.getClass().getName()==com.alibaba.dubbo.demo.provider.DemoServiceImpl
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 建立匿名 Invoker 類對象,并實作 doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
最終傳回的Invoker對象就是DemoServiceImpl的可執行對象(關于Invoker類的介紹,請看上面)
protocol.export(Invoker invoker)
Protocol類的作用在上面已經介紹了,
protocol
對象也是
ServiceConfig
類的成員變量:
學習完《dubbo SPI 的實作》這一節後我們知道
protocol
對象實際是
Protocol$Adaptive
類型的對象
Protocol$Adaptive
代碼如下:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements Protocol {
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public Invoker refer(Class arg0, URL arg1) throws RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public Exporter export(Invoker arg0) throws RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
通過
export(Invoker arg0)
的源碼可以看到,最終傳回了
extension.export(arg0)
對象,而
extension
是
Protocol
接口通過dubbo spi加載出來的對象,本例中spi對象執行個體為
RegistryProxyFactory
,不過由于
ExtensionLoader.createExtension(String name)
方法中對
RegistryProxyFactory
進行了AOP的包裝,是以:
1)
extension.export(arg0)
調用的是
ProtocolFilterWrapper
的
export(invoker)
方法
2)
ProtocolFilterWrapper
的
export
方法又調用了
ProtocolListenerWrapper
的
export(invoker)
方法
3)
ProtocolListenerWrapper
的
export
方法,調用了
RegistryProtocol
對象的
export
方法
注:ProtocolFilterWrapper和ProtocolListenerWrapper的調用邏輯很簡單,就不做展示了
RegistryProtocol.export(final Invoker originInvoker)
邏輯分析:
1)調用 doLocalExport 導出服務
2)向注冊中心注冊服務
3)向注冊中心進行訂閱 override 資料
4)建立并傳回 DestroyableExporter
源碼如下:
注:隻有第一步與暴露遠端服務有關,故其他代碼省略
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//netty服務暴露
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// --------------------------連接配接zk的代碼已省略-------------------------
}
服務暴露的邏輯都在
doLocalExport
方法中了
doLocalExport(final Invoker originInvoker)
代碼邏輯:
1)通過
originInvoker
對象的URL擷取緩存key
2)通過緩存key,從緩存對象
bounds
中取出緩存對象
3)如果緩存不存在就調用
protocol
的
export
方法導出服務,得到緩存對象
4)将緩存對象存入緩存并傳回
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// 擷取invoker在bounds中緩存的key
String key = getCacheKey(originInvoker);
// 通路緩存
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
// 雙重檢查鎖
if (exporter == null) {
// 建立 Invoker 為委托類對象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 調用 protocol 的 export 方法導出服務
exporter = new ExporterChangeableWrapper<T>(
(Exporter<T>) protocol.export(invokerDelegete),
originInvoker);
// 入寫緩存
bounds.put(key, exporter);
}
}
}
return exporter;
}
第一次通路時,緩存中肯定沒有對象,是以一定會調用
protocol.export(invokerDelegete)
,而
protocol
對象是在
RegistryProtocol
類被dubbo spi加載的時候,就通過調用IoC原理的
ExtensionLoader.injectExtension(T instance)
方法注入的
Protocol$Adaptive
對象的執行個體。是以這裡調用的是
Protocol$Adaptive
類的export()方法,
Protocol$Adaptive
類源碼在上面。
Protocol$Adaptive.export(Invoker arg0)
方法執行了下面的代碼:
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(arg0);
其中
extName
的值為
“dubbo”
,是以SPI加載的是
DubboProtocol
類,但是根據dubbo SPI的AOP,dubbo在DubboProtocol的基礎上包裝了
ProtocolFilterWrapper
類和
ProtocolListenerWrapper
類,是以:
1)
extension.export(arg0)
調用的是
ProtocolFilterWrapper
的
export(invoker)
方法
2)
ProtocolFilterWrapper
的
export
方法又調用了
ProtocolListenerWrapper
的
export(invoker)
方法
3)
ProtocolListenerWrapper
的
export
方法,調用了
DubboProtocol
對象的
export
方法
注:ProtocolFilterWrapper和ProtocolListenerWrapper的調用邏輯很簡單,就不做展示了,我們直接檢視
DubboProtocol
的
export
方法
DubboProtocol.export(Invoker invoker)
代碼邏輯:
1)建立DubboExporter維護invoker的生命周期
2)維護本地存根代碼
3)啟動伺服器
4)優化序列化
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 擷取服務辨別,由服務組名,服務名,服務版本号以及端口組成。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 建立 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 将 <key, exporter> 鍵值對放入緩存中
exporterMap.put(key, exporter);
// 本地存根相關代碼
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 啟動伺服器
openServer(url);
// 優化序列化
optimizeSerialization(url);
return exporter;
}
我們重點看下
DubboExporter
的建立以及
openServer
方法,其他邏輯看不懂也不影響我們服務釋出得到邏輯。其中
DubboExporter
就是用來維護我們的
invoker
對象生命周期的,代碼很簡單。
我們直接分析
openServer
方法
DubboProtocol.openServer(URL url)
1)擷取伺服器執行個體的 key,格式為host:port
2)通過key,從緩存中擷取對象
3)如果沒有擷取到,就調用
createServer
建立一個
4)如果擷取到了就調用
reset
重置服務
private void openServer(URL url) {
// 擷取 host:port,并将其作為伺服器執行個體的 key,用于辨別目前的伺服器執行個體,如本例中key = 192.168.25.1:20880
String key = url.getAddress();
boolean isServer = url.getParameter("isserver", true);
if (isServer) {
// 通路緩存
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 建立伺服器執行個體
serverMap.put(key, createServer(url));
} else {
// 伺服器已建立,則根據 url 中的配置重置伺服器
server.reset(url);
}
}
}
第一次調用時,緩存中沒有
ExchangeServer
對象,是以會調用
createServer
方法建立
DubboProtocol.createServer(URL url)
/**
* 1)組裝URL
* 2)從url中擷取參數server的值,并檢測是否存在參數 server 所代表的 Transporter 拓展,不存在則抛出異常
* 3)建立伺服器執行個體ExchangeServer
* 4)從url中擷取參數 client 的值,并檢測是否支援參數 client 所表示的 Transporter 拓展,不存在也是抛出異常
*/
private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString());
// 添加心跳檢測配置到 url 中
url = url.addParameterIfAbsent("heartbeat", String.valueOf(60 * 1000));
// 擷取 server 參數,預設為 netty
String str = url.getParameter("server", "netty");
// 通過 SPI 檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則抛出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 添加編碼解碼器參數
url = url.addParameter("codec", "dubbo");
ExchangeServer server;
try {
// 建立 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 擷取 client 參數,可指定 netty,mina
str = url.getParameter("client");
if (str != null && str.length() > 0) {
// 擷取所有的 Transporter 實作類名稱集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 檢測目前 Dubbo 所支援的 Transporter 實作類名稱清單中,是否包含 client 所表示的 Transporter,若不包含,則抛出異常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Exchangers.bind(URL url, ExchangeHandler handler)
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent("codec", "exchange");
// 擷取 Exchanger,預設為 HeaderExchanger。
// 緊接着調用 HeaderExchanger 的 bind 方法建立 ExchangeServer 執行個體
return getExchanger(url).bind(url, handler);
}
bind方法先執行了getExchanger(url)擷取了Exchanger的執行個體對象
getExchanger(URL url)
public static Exchanger getExchanger(URL url) {
String type = url.getParameter("exchanger", "header");
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);;
}
上面的方法最終傳回的是
HeaderExchanger
類的執行個體對象,是以
getExchanger(url).bind(url, handler)
實際上執行的是
HeaderExchanger.bind(URL url, ExchangeHandler handler)
HeaderExchanger.bind(URL url, ExchangeHandler handler)
建立
HeaderExchangeServer
執行個體,該方法包含了多個邏輯,分别如下:
1)
new HeaderExchangeHandler(handler)
2)
new DecodeHandler(new HeaderExchangeHandler(handler))
3)
Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(
Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
);
}
Transporters.bind(URL url, ChannelHandler… handlers)
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素數量大于1,則建立 ChannelHandler 分發器
handler = new ChannelHandlerDispatcher(handlers);
}
// 擷取自适應 Transporter 執行個體,并調用執行個體方法
return getTransporter().bind(url, handler);
}
上面的方法先調用
getTransporter()
擷取
Transporter
的自适應擴充類,代碼如下:
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
最終傳回的對象是自适應擴充類
Transporter$Adaptive
的執行個體對象,然後調用對象的
bind(URL arg0, ChannelHandler arg1)
方法,代碼如下:
public Server bind(URL arg0, ChannelHandler arg1) throws RemotingException {
if (arg0 == null)
throw new IllegalArgumentException("url == null");
URL url = arg0;
String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
if (extName == null)
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString()
+ ") use keys([server, transporter])");
Transporter extension = (Transporter) ExtensionLoader
.getExtensionLoader(Transporter.class).getExtension(extName);
return extension.bind(arg0, arg1);
}
通過這個
bind
方法可以知道,最終調用了DUBBO SPI加載的
Transporter
的實作類
NettyTransporter
的
bind(URL url, ChannelHandler listener)
方法
NettyTransporter.bind(URL url, ChannelHandler listener)
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
NettyServer 構造器
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 調用父類構造方法
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, "DubboServerHandler")));
}
}
/**
* 該類是NettyServer 的父類,由于NettyServer構造器調用了父類構造器,故而展示下父類構造方法
*/
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 調用父類構造方法,這裡就不用跟進去了,沒什麼複雜邏輯
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 擷取 ip 和端口
String bindIp = getUrl().getParameter("bind.ip", getUrl().getHost());
int bindPort = getUrl().getParameter("bind.port", getUrl().getPort());
if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 設定 ip 為 0.0.0.0
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 擷取最大可接受連接配接數
this.accepts = url.getParameter("accepts", 0);
this.idleTimeout = url.getParameter("idle.timeout", 600 * 1000);
try {
// 調用模闆方法 doOpen 啟動伺服器
doOpen();
} catch (Throwable t) {
throw new RemotingException("Failed to bind ");
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
}
上面的代碼大多都是從url中擷取參數值的代碼,我們直接看
doOpen()
方法
doOpen()
這個方法就是調用netty服務的方法了,裡面的代碼大多數也都是netty的代碼,代碼邏輯為:
1)設定new NioServerSocketChannelFactory() boss worker線程池
2)設定編解碼 decoder、encoder、handler3個管道
3)bootstrap.bind()
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 建立 boss 和 worker 線程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// 建立 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
// 設定 PipelineFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// 綁定到指定的 ip 和端口上
channel = bootstrap.bind(getBindAddress());
}
至此,我們dubbo服務暴露之遠端暴露的邏輯就捋完了。