天天看點

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

    本節将主要學習Dubbo是如何使用Netty來實作網絡通訊的。

    從官網我們得知,Dubbo協定是使用單一長連接配接來進行網絡傳輸,也就是說服務調用方持久與服務提供者建立一條連接配接,所有的服務調用調用資訊通過。

    一條TCP連接配接進行傳輸,在網絡層至少要考慮如下問題:

    1、服務端,用戶端網絡通訊模型(線程模型)

    2、傳輸(編碼解碼、序列化)。

    3、服務端轉發政策等。

    Dubbo服務端的網絡啟動流程,在上篇中已給出序列圖,本節還是以該圖為切入點,引入本文的兩個主人公:NettyServer、NettyClient。

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

    dubbo使用SPI機制,根據配置,可以支援如下架構實作網絡通訊模型,例如netty3,netty4、mina、grizzly,本文重點分析基于Netty4的實作,包路徑:dubbo-remoting-netty4。

    從上面的流程圖,NettyTransport的職責就是調用new NettyServer的構造方法,進而建構NettyServer對象,在深入NettyServer對象構造過程之前,先來看一下NettyServer的類繼承層次:

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

   NettyServer構造函數:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {  // @1
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));    // @2
}
           

   代碼@1:URL url:服務提供者URL;ChannelHandler網絡事件處理器,

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

也就是當相應網絡事件觸發時,執行的事件處理器。

  • void connected(Channel channel) throws RemotingException

    連接配接事件,當收到用戶端的連接配接事件時,執行該方法處理相關業務操作。

  • void disconnected(Channel channel) throws RemotingException:連接配接斷開事件
  • void sent(Channel channel, Object message) throws RemotingException

    當可寫事件觸發時,服務端向用戶端傳回響應資料,就是通過該方法發送的。

  • void received(Channel channel, Object message) throws RemotingException

    當讀事件觸發時執行該方法,服務端在收到用戶端的請求資料是,調用該方法執行解包等操作。

  • void caught(Channel channel, Throwable exception) throws RemotingException

    發生異常時,調用該方法。

   代碼@2:調用ChannelHandlers.wrap對原生Handler進行包裝,然後調用其父類的構造方法,首先,設定Dubbo服務端線程池中線程的名稱,可以通過參數threadname來指定線程池中線程的字首,預設為:DubboServerHandler + dubbo服務端IP與接口号。我比較好奇的是這裡為什麼需要對ChannelHandler進行包裝呢?是增加了些什麼邏輯呢?帶着者問題,引出本節重點探讨的内容:事件派發機制。

    事件派發機制指的是網絡事件(連接配接、讀、寫)等事件觸發後,這些事件如何執行,是由IO線程還是派發到線程池中執行。Dubbo定義了如下5種事件派發機制:

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

   本文将詳細分析各種事件的派發實作原理。

   ChannelHandlers#wrapInternal

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
}
           

   這裡是典型的裝飾模式,MultiMessageHandler,多消息處理Handler,HeartbeatHandler,心跳Handler,其主要功能是處理心跳傳回與心跳請求,直接在IO線程中執行,每次收到資訊,更新通道的讀事件戳,每次發送資料時,記錄通道的寫事件戳。這裡的核心關鍵是利用SPI自适配,傳回合适的事件派發機制。Dispatcher的類層次結構如圖所示:

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

   1、源碼分析AllDispatcher實作原理

   線程派發機制:所有的消息都派發到線程池,包括請求、響應、連接配接事件、斷開事件、心跳等。

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
           

   從中可以看出,事件派發類繼承圖分兩個次元,Dispatcher(事件派發器)、與之對應的ChannelHandler,例如AllChannelHandler。

   1.1 WrappedChannelHandler

   接下來分析事件派發機制,重點關注ChannelHandler類的實作體系。

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

   縱觀Dubbo ChannelHanler體系的設計,是經典的類裝飾器模式,上述派發器主要解決的問題,是相關網絡事件(連接配接、讀(請求)、寫(響應)、心跳請求、心跳響應)是在IO線程、還是在額外定義的線程池,故WrappedChannelHandler的主要職責是定義線程池相關的邏輯,具體是在IO線程上執行,還是在定義的線程池中執行,則由子類具體去定制,WrappedChannelHandler預設實作ChannelHandler的所有方法,各個方法的實作直接調用被裝飾Handler的方法,見下圖:

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

   接下來先重點關注一下WrappedChannelHandler的成員變量和構造方法的實作。

protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
           
  • ExecutorService SHARED_EXECUTOR:共享線程池,預設線程池,如果

    ExecutorService executor為空,則使用SHARED_EXECUTOR

  • ExecutorService executor 定義的線程池
  • ChannelHandler handler:被裝飾的ChannelHandler
  • URL url 服務提供者URL

       接下來關注一下其構造函數:

public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);    // @1

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);  // @2
    }
           

   代碼@1:建構線程池,這裡基于SPI機制,使用者可選擇cached、eager、fixed、limited,将在本節下面詳細介紹,這裡隻需要知道是建構了一個線程池。

   代碼@2:将服務端都與線程池緩存起來,在服務端,線程池的緩存級别是 服務提供者協定(端口):線程池。

   1.2 AllChannelHandler

事件派發機制:所有網絡事件線上程池中執行,其實作機制肯定是重寫ChannelHandler的所有網絡事件方法,将調用其修飾的ChannelHanlder線上程池中執行。由于AllChannelHandler是第一個事件派發機制,故對其實作做一個較長的描述。

   1.2.1 AllChannelHandler#connected

public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
           

   連接配接事件,其主要實作是,首先先擷取執行線程池,其擷取邏輯是如果executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).

getAdaptiveExtension().getExecutor(url);擷取不到線程池,則使用共享線程池。可以看出,連接配接事件的業務調用時異步執行,基于線程池。

注:調用時機,服務端收到用戶端連接配接後,該方法會被調用。

   2.2.2 AllChannelHandler#disconnected

public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
           

   其基本實作與connected相同,就是将具體的disconnected 事件所對應的業務擴充方法線上程池中執行。

   注:調用時機,服務端收到用戶端斷開連接配接後,該方法會被調用。

   2.2.3 AllChannelHandler#received

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
           

   調用時機:當服務端收到用戶端發送的請求後,經過IO線程(Netty)會首先從二進制流中解碼出一個個的請求,參數Object message,就是調用請求,然後在送出給線程池執行,執行完後,當業務處理完畢後,組裝結果後,必然會在該線程中調用通道(Channel#write,flush)方法,向通道寫入響應結果。

注:all事件派發機制,ChannelHandler#recive是線上程池中執行。

   2.2.4 AllChannelHandler#caught

public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
           

   當發生異常時,ChannelHandler#caught也線上程池中執行。

   令人頗感意外的是,AllChannelHandler并未重寫WrappedChannelHandler的sent方法,也就是說ChannelHandler#sent事件回調方法,是在IO線程中執行。

WrappedChannelHandler#sent

public void sent(Channel channel, Object message) throws RemotingException {
        handler.sent(channel, message);
}
           

   這個和官方文檔還是有一定出入的。

源碼分析Dubbo網絡通訊篇之NettyServer網絡事件派發機制(Dispatch)

   1.3 ExecutionChannelHandler

   對應事件派件器:ExecutionDispatcher,其配置值:execution,從其源碼的實作來看,與AllDispatcher實作基本類似,唯一的差別是,如果executor線程池為空時,并不會使用共享線程池,目前我還想不出什麼情況下,線程池會初始化失敗。

   1.4 DirectDispatcher

直接派發,也就是所有的事件全部在IO線程中執行,故其實作非常簡單:

public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}
           

   1.5 MessageOnlyDispatcher、MessageOnlyChannelHandler

   事件派發器:隻有請求事件線上程池中執行,其他響應事件、心跳,連接配接,斷開連接配接等事件在IO線程上執行,故其隻需要重寫recive方法即可:

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
           

   1.6 ConnectionOrderedDispatcher ConnectionOrderedChannelHandler

   事件派發器:連接配接、斷開連接配接事件排隊執行,并可通過connect.queue.capacity屬性設定隊列長度,請求事件、異常事件線上程池中執行。

   1.6.1 構造方法

public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }
           

   重點關注一下connectionExecutor ,用來執行連接配接、斷開事件的線程池,線程池中隻有一個線程,并且隊列可以選擇時有界隊列,通過connect.queue.capacity屬性配置,超過的事件,則拒絕執行。

   1.6.2 ConnectionOrderedChannelHandler#connected

public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
           

   檢查隊列長度,如果超過警告值,則輸出警告資訊,然後送出連接配接線程池中執行,disconnected事件類似。其他received、caught事件,則與AllDispatcher相同,就不在重複。

   總結:本文主要是分析闡述了Dubbo Dispatch機制,但與官方文檔存在出入,先歸納如下:

Dispatch

   所有的sent事件方法、心跳請求全部在IO線程上執行。

   1、all : 除sent事件回調方法、心跳外,全部線上程池上執行。

   2、execution : 與all類似,唯一區就是all線上程池未指定時,可以使用共享線程池,這個差别等同于沒有。

   3、 message : 隻有請求事件線上程池中執行,其他在IO線程上執行。

   4、connection : 請求事件線上程池中執行,連接配接、斷開連接配接事件排隊執行(含一個線程的線程池)

   5、direct : 所有事件都在IO線程中執行。

歡迎加筆者微信号(dingwpmz),加群探讨,筆者優質專欄目錄:

1、源碼分析RocketMQ專欄(40篇+)

2、源碼分析Sentinel專欄(12篇+)

3、源碼分析Dubbo專欄(28篇+)

4、源碼分析Mybatis專欄

5、源碼分析Netty專欄(18篇+)

6、源碼分析JUC專欄

7、源碼分析Elasticjob專欄

8、Elasticsearch專欄(20篇+)

9、源碼分析MyCat專欄

繼續閱讀