天天看點

dubbo服務暴露之遠端暴露前言重要類解釋:代碼調用邏輯圖具體代碼分析

文章目錄

  • 前言
  • 重要類解釋:
  • 代碼調用邏輯圖
  • 具體代碼分析
    • doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs)
    • proxyFactory.getInvoker(T proxy, Class type, URL url)
    • protocol.export(Invoker invoker)
    • RegistryProtocol.export(final Invoker originInvoker)
    • doLocalExport(final Invoker originInvoker)
    • DubboProtocol.export(Invoker invoker)
    • DubboProtocol.openServer(URL url)
    • DubboProtocol.createServer(URL url)
    • Exchangers.bind(URL url, ExchangeHandler handler)
    • getExchanger(URL url)
    • HeaderExchanger.bind(URL url, ExchangeHandler handler)
    • Transporters.bind(URL url, ChannelHandler... handlers)
    • NettyTransporter.bind(URL url, ChannelHandler listener)
    • NettyServer 構造器
    • doOpen()

前言

dubbo服務釋出共6個步驟:

    1)暴露本地服務

    2)暴露遠端服務

    3)啟動netty服務

    4)打開連接配接zk

    5)注冊到zk

    6)監聽zk

上次我們學習了《dubbo服務暴露之本地服務暴露》,今天我們學習下暴露遠端服務的過程

重要類解釋:

重要類解釋:

ProxyFactory:就是為了擷取一個接口的代理類,例如:擷取一個遠端接口的代理,Dubbo 預設的 ProxyFactory 實作類是 JavassistProxyFactory
Wrapper:它包裝了一個類或接口,類似于spring的BeanWrapper,可以通過Wrapper對執行個體對象進行指派、取值以及對方法的調用
Invoker:它是一個可執行對象,能夠根據方法的名稱、參數得到相應的結果
Exporter:維護Invoker的生命周期。
Transporter:網絡傳輸層,用來抽象netty和mina的統一接口
Exchanger:資訊交換層,封裝請求響應模式,同步轉異步

Protocol:它有兩個方法:

    1.export: 暴露遠端服務;将proxyFactory.getInvoker建立的代理類invoker對象,通過協定暴露給外部。

    2.refer:引用遠端服務(用于用戶端)

代碼調用邏輯圖

dubbo服務暴露之遠端暴露前言重要類解釋:代碼調用邏輯圖具體代碼分析

具體代碼分析

doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs)

代碼邏輯:

    1)将一些資訊,比如版本、時間戳、方法名以及各種配置對象的字段資訊放入到 map

    2)建構好 map 後,緊接着是擷取上下文路徑、主機名以及端口号等資訊

    3)最後将 map 和主機名等資料傳給 URL 構造方法建立 URL 對象,通過 URL 可讓 Dubbo 的各種配置在各個子產品之間傳遞

    4)暴露本地服務

    5)暴露遠端服務

    我們在《dubbo服務暴露之本地服務暴露》中講了前4個步驟,今天從第5步暴露遠端服務開始,源碼如下:

注:源碼中隻放了暴露遠端服務相關代碼

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

// -----------------------建構URL的代碼已省略,----------------------------------

	// scope 為none表示不暴露
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
    	// 配置不是remote的情況下做本地暴露(配置remote則隻暴露遠端服務)
        if (!"remote"..equalsIgnoreCase(scope)) {
            exportLocal(url);	//暴露本地服務
        }
        
        // 配置不是local的情況下做遠端暴露(配置local則隻暴露本地服務)
        if (!"local".equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            // 如果有注冊中心,就循環釋出到所有注冊中心裡
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                 	// 加載螢幕連結
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                    	// 将螢幕連結作為參數添加到 url 中
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                 	// 為服務提供類(ref)生成 Invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                 	// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                 	// 導出服務,并生成 Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
            	// 不存在注冊中心,僅導出服務
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}
           

在本方法中暴露遠端服務的過程是:

    1)如果有注冊中心,就循環釋出到所有注冊中心裡

    2)通過

proxyFactory.getInvoker(T proxy, Class<T> type, URL url)

方法建立一個可執行的Invoker對象

    3)調用

protocol.export(Invoker<T>)

方法執行暴露邏輯

proxyFactory.getInvoker(T proxy, Class type, URL url)

ProxyFactory

類的作用在上面已經介紹了,

proxyFactory

對象是

ServiceConfig

類的成員變量:

學習完《dubbo SPI 的實作》這一節後我們知道

proxyFactory

對象實際是

ProxyFactory$Adaptive

類型的對象

ProxyFactory$Adaptive

代碼如下:

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements ProxyFactory {
	public Object getProxy(Invoker arg0) throws RpcException {
		if (arg0 == null)
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
		if (arg0.getUrl() == null)
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
		URL url = arg0.getUrl();
		String extName = url.getParameter("proxy", "javassist");
		if (extName == null)
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
					+ url.toString() + ") use keys([proxy])");
		
		ProxyFactory extension = (ProxyFactory) ExtensionLoader
				.getExtensionLoader(ProxyFactory.class).getExtension(extName);
		return extension.getProxy(arg0);
	}

	public Invoker getInvoker(Object arg0, Class arg1, URL arg2) throws RpcException {
		if (arg2 == null)
			throw new IllegalArgumentException("url == null");
		URL url = arg2;
		String extName = url.getParameter("proxy", "javassist");
		if (extName == null)
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
					+ url.toString() + ") use keys([proxy])");
		
		ProxyFactory extension = (ProxyFactory) ExtensionLoader
				.getExtensionLoader(ProxyFactory.class).getExtension(extName);
		return extension.getInvoker(arg0, arg1, arg2);
	}
}
           

通過

getInvoker(Object arg0, Class arg1, URL arg2)

的源碼可以看到,最終傳回了

extension.getInvoker(arg0, arg1, arg2)

對象,而

extension

ProxyFactory

通過dubbo spi加載出來的對象,本例中spi加載的對象執行個體為

JavassistProxyFactory

源碼如下:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    	// 為目标類建立 Wrapper
    		// 在本例中為proxy.getClass().getName()==com.alibaba.dubbo.demo.provider.DemoServiceImpl
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 建立匿名 Invoker 類對象,并實作 doInvoke 方法。
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
            	// 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
           

最終傳回的Invoker對象就是DemoServiceImpl的可執行對象(關于Invoker類的介紹,請看上面)

protocol.export(Invoker invoker)

Protocol類的作用在上面已經介紹了,

protocol

對象也是

ServiceConfig

類的成員變量:

學習完《dubbo SPI 的實作》這一節後我們知道

protocol

對象實際是

Protocol$Adaptive

類型的對象

Protocol$Adaptive

代碼如下:

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements Protocol {
	public void destroy() {
		throw new UnsupportedOperationException(
				"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
	}

	public int getDefaultPort() {
		throw new UnsupportedOperationException(
				"method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
	}

	public Invoker refer(Class arg0, URL arg1) throws RpcException {
		if (arg1 == null)
			throw new IllegalArgumentException("url == null");
		URL url = arg1;
		String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
		if (extName == null)
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
		Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
		return extension.refer(arg0, arg1);
	}

	public Exporter export(Invoker arg0) throws RpcException {
		if (arg0 == null)
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
		if (arg0.getUrl() == null)
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
		URL url = arg0.getUrl();
		String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
		if (extName == null)
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
		Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
		return extension.export(arg0);
	}
}
           

通過

export(Invoker arg0)

的源碼可以看到,最終傳回了

extension.export(arg0)

對象,而

extension

Protocol

接口通過dubbo spi加載出來的對象,本例中spi對象執行個體為

RegistryProxyFactory

,不過由于

ExtensionLoader.createExtension(String name)

方法中對

RegistryProxyFactory

進行了AOP的包裝,是以:

    1)

extension.export(arg0)

調用的是

ProtocolFilterWrapper

export(invoker)

方法

    2)

ProtocolFilterWrapper

export

方法又調用了

ProtocolListenerWrapper

export(invoker)

方法

    3)

ProtocolListenerWrapper

export

方法,調用了

RegistryProtocol

對象的

export

方法

注:ProtocolFilterWrapper和ProtocolListenerWrapper的調用邏輯很簡單,就不做展示了

RegistryProtocol.export(final Invoker originInvoker)

邏輯分析:

    1)調用 doLocalExport 導出服務

    2)向注冊中心注冊服務

    3)向注冊中心進行訂閱 override 資料

    4)建立并傳回 DestroyableExporter

源碼如下:

注:隻有第一步與暴露遠端服務有關,故其他代碼省略

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //netty服務暴露
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    // --------------------------連接配接zk的代碼已省略-------------------------
}
           

服務暴露的邏輯都在

doLocalExport

方法中了

doLocalExport(final Invoker originInvoker)

代碼邏輯:

    1)通過

originInvoker

對象的URL擷取緩存key

    2)通過緩存key,從緩存對象

bounds

中取出緩存對象

    3)如果緩存不存在就調用

protocol

export

方法導出服務,得到緩存對象

    4)将緩存對象存入緩存并傳回

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
	// 擷取invoker在bounds中緩存的key
    String key = getCacheKey(originInvoker);
    // 通路緩存
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            // 雙重檢查鎖
            if (exporter == null) {
            	// 建立 Invoker 為委托類對象
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
             	// 調用 protocol 的 export 方法導出服務
                exporter = new ExporterChangeableWrapper<T>(
                		(Exporter<T>) protocol.export(invokerDelegete), 
                		originInvoker);
             	// 入寫緩存
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
           

第一次通路時,緩存中肯定沒有對象,是以一定會調用

protocol.export(invokerDelegete)

,而

protocol

對象是在

RegistryProtocol

類被dubbo spi加載的時候,就通過調用IoC原理的

ExtensionLoader.injectExtension(T instance)

方法注入的

Protocol$Adaptive

對象的執行個體。是以這裡調用的是

Protocol$Adaptive

類的export()方法,

Protocol$Adaptive

類源碼在上面。

Protocol$Adaptive.export(Invoker arg0)

方法執行了下面的代碼:

Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(arg0);
           

其中

extName

的值為

“dubbo”

,是以SPI加載的是

DubboProtocol

類,但是根據dubbo SPI的AOP,dubbo在DubboProtocol的基礎上包裝了

ProtocolFilterWrapper

類和

ProtocolListenerWrapper

類,是以:

    1)

extension.export(arg0)

調用的是

ProtocolFilterWrapper

export(invoker)

方法

    2)

ProtocolFilterWrapper

export

方法又調用了

ProtocolListenerWrapper

export(invoker)

方法

    3)

ProtocolListenerWrapper

export

方法,調用了

DubboProtocol

對象的

export

方法

注:ProtocolFilterWrapper和ProtocolListenerWrapper的調用邏輯很簡單,就不做展示了,我們直接檢視

DubboProtocol

export

方法

DubboProtocol.export(Invoker invoker)

代碼邏輯:

    1)建立DubboExporter維護invoker的生命周期

    2)維護本地存根代碼

    3)啟動伺服器

    4)優化序列化

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // 擷取服務辨別,由服務組名,服務名,服務版本号以及端口組成。比如:
        // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
        String key = serviceKey(url);
        // 建立 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 将 <key, exporter> 鍵值對放入緩存中
        exporterMap.put(key, exporter);

        // 本地存根相關代碼
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        // 啟動伺服器
        openServer(url);
        // 優化序列化
        optimizeSerialization(url);
        return exporter;
    }
           

我們重點看下

DubboExporter

的建立以及

openServer

方法,其他邏輯看不懂也不影響我們服務釋出得到邏輯。其中

DubboExporter

就是用來維護我們的

invoker

對象生命周期的,代碼很簡單。

我們直接分析

openServer

方法

DubboProtocol.openServer(URL url)

    1)擷取伺服器執行個體的 key,格式為host:port

    2)通過key,從緩存中擷取對象

    3)如果沒有擷取到,就調用

createServer

建立一個

    4)如果擷取到了就調用

reset

重置服務

private void openServer(URL url) {
	// 擷取 host:port,并将其作為伺服器執行個體的 key,用于辨別目前的伺服器執行個體,如本例中key = 192.168.25.1:20880
    String key = url.getAddress();
    boolean isServer = url.getParameter("isserver", true);
    if (isServer) {
    	// 通路緩存
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
        	// 建立伺服器執行個體
            serverMap.put(key, createServer(url));
        } else {
        	// 伺服器已建立,則根據 url 中的配置重置伺服器
            server.reset(url);
        }
    }
}
           

第一次調用時,緩存中沒有

ExchangeServer

對象,是以會調用

createServer

方法建立

DubboProtocol.createServer(URL url)

/**
 * 1)組裝URL
 * 2)從url中擷取參數server的值,并檢測是否存在參數 server 所代表的 Transporter 拓展,不存在則抛出異常
 * 3)建立伺服器執行個體ExchangeServer
 * 4)從url中擷取參數 client 的值,并檢測是否支援參數 client 所表示的 Transporter 拓展,不存在也是抛出異常
 */
private ExchangeServer createServer(URL url) {
    url = url.addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString());
    // 添加心跳檢測配置到 url 中
    url = url.addParameterIfAbsent("heartbeat", String.valueOf(60 * 1000));
    // 擷取 server 參數,預設為 netty
    String str = url.getParameter("server", "netty");
    // 通過 SPI 檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則抛出異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    // 添加編碼解碼器參數
    url = url.addParameter("codec", "dubbo");
    ExchangeServer server;
    try {
    	// 建立 ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    // 擷取 client 參數,可指定 netty,mina
    str = url.getParameter("client");
    if (str != null && str.length() > 0) {
    	// 擷取所有的 Transporter 實作類名稱集合,比如 supportedTypes = [netty, mina]
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 檢測目前 Dubbo 所支援的 Transporter 實作類名稱清單中,是否包含 client 所表示的 Transporter,若不包含,則抛出異常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}
           

Exchangers.bind(URL url, ExchangeHandler handler)

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent("codec", "exchange");
    // 擷取 Exchanger,預設為 HeaderExchanger。
    // 緊接着調用 HeaderExchanger 的 bind 方法建立 ExchangeServer 執行個體
    return getExchanger(url).bind(url, handler);
}
           

bind方法先執行了getExchanger(url)擷取了Exchanger的執行個體對象

getExchanger(URL url)

public static Exchanger getExchanger(URL url) {
 	String type = url.getParameter("exchanger", "header");
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);;
}
           

上面的方法最終傳回的是

HeaderExchanger

類的執行個體對象,是以

getExchanger(url).bind(url, handler)

實際上執行的是

HeaderExchanger.bind(URL url, ExchangeHandler handler)

HeaderExchanger.bind(URL url, ExchangeHandler handler)

建立

HeaderExchangeServer

執行個體,該方法包含了多個邏輯,分别如下:

    1)

new HeaderExchangeHandler(handler)

    2)

new DecodeHandler(new HeaderExchangeHandler(handler))

    3)

Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(
    	Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    );
}
           

Transporters.bind(URL url, ChannelHandler… handlers)

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
        	// 如果 handlers 元素數量大于1,則建立 ChannelHandler 分發器
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 擷取自适應 Transporter 執行個體,并調用執行個體方法
        return getTransporter().bind(url, handler);
    }
           

上面的方法先調用

getTransporter()

擷取

Transporter

的自适應擴充類,代碼如下:

public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
           

最終傳回的對象是自适應擴充類

Transporter$Adaptive

的執行個體對象,然後調用對象的

bind(URL arg0, ChannelHandler arg1)

方法,代碼如下:

public Server bind(URL arg0, ChannelHandler arg1) throws RemotingException {
		if (arg0 == null)
			throw new IllegalArgumentException("url == null");
		URL url = arg0;
		String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
		if (extName == null)
			throw new IllegalStateException(
					"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString()
							+ ") use keys([server, transporter])");
		Transporter extension = (Transporter) ExtensionLoader
				.getExtensionLoader(Transporter.class).getExtension(extName);
		return extension.bind(arg0, arg1);
	}
           

通過這個

bind

方法可以知道,最終調用了DUBBO SPI加載的

Transporter

的實作類

NettyTransporter

bind(URL url, ChannelHandler listener)

方法

NettyTransporter.bind(URL url, ChannelHandler listener)

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}
           

NettyServer 構造器

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // 調用父類構造方法
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, "DubboServerHandler")));
    }
}

/**
 * 該類是NettyServer 的父類,由于NettyServer構造器調用了父類構造器,故而展示下父類構造方法
 */
public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        // 調用父類構造方法,這裡就不用跟進去了,沒什麼複雜邏輯
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        // 擷取 ip 和端口
        String bindIp = getUrl().getParameter("bind.ip", getUrl().getHost());
        int bindPort = getUrl().getParameter("bind.port", getUrl().getPort());
        if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) {
            // 設定 ip 為 0.0.0.0
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // 擷取最大可接受連接配接數
        this.accepts = url.getParameter("accepts", 0);
        this.idleTimeout = url.getParameter("idle.timeout", 600 * 1000);
        try {
            // 調用模闆方法 doOpen 啟動伺服器
            doOpen();
        } catch (Throwable t) {
            throw new RemotingException("Failed to bind ");
        }

        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
    
    protected abstract void doOpen() throws Throwable;

    protected abstract void doClose() throws Throwable;
}
           

上面的代碼大多都是從url中擷取參數值的代碼,我們直接看

doOpen()

方法

doOpen()

這個方法就是調用netty服務的方法了,裡面的代碼大多數也都是netty的代碼,代碼邏輯為:

    1)設定new NioServerSocketChannelFactory() boss worker線程池

    2)設定編解碼 decoder、encoder、handler3個管道

    3)bootstrap.bind()

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
     // 建立 boss 和 worker 線程池
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
     // 建立 ServerBootstrap
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        bootstrap.setOption("child.tcpNoDelay", true);
     // 設定 PipelineFactory
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // 綁定到指定的 ip 和端口上
        channel = bootstrap.bind(getBindAddress());
    }
           

至此,我們dubbo服務暴露之遠端暴露的邏輯就捋完了。