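Dubbo NettyServer Message Dispatch Strategies

This article analyzes the message dispatch strategies of Dubbo's NettyServer. It walks through how handlers are wrapped and invoked, and then covers every dispatch strategy that NettyServer supports.

The first half explains how handlers are wrapped and how calls flow through them, as shown in the figure below. The focus is the wrapping chain MultiMessageHandler => HeartbeatHandler => AllChannelHandler and the corresponding call path.

The second half analyzes each dispatch strategy that NettyServer supports and how it handles events, which also involves Dubbo's business thread pool.

[Figure: Dubbo NettyServer message dispatch strategies]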

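ChannelHandler wrapping process

  • The earlier article on Dubbo Protocol service export already covered how NettyServer is created. This section looks at how the ChannelHandler is wrapped during that creation.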
public class NettyServer extends AbstractServer implements Server {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private Map<String, Channel> channels; // <ip:port, channel>

    private ServerBootstrap bootstrap;

    private org.jboss.netty.channel.Channel channel;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}           
  • In NettyServer, the call to watch is ChannelHandlers.wrap(), the entry point for ChannelHandler wrapping. The handler passed in here is a DecodeHandler.
public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}           
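  • ChannelHandlers.wrap() delegates to wrapInternal(), which builds the chain MultiMessageHandler => HeartbeatHandler => AllChannelHandler.
  • AllChannelHandler is obtained through a dynamically generated adaptive extension: ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url).
  • The implementation of Dispatcher$Adaptive is shown next. It looks up the Dispatcher through SPI, which by default yields an AllChannelHandler.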
public class Dispatcher$Adaptive implements Dispatcher {
    public ChannelHandler dispatch(ChannelHandler channelHandler, URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }

        URL uRL2 = uRL;
        String string = uRL2.getParameter("dispatcher",
                uRL2.getParameter("dispather",
                    uRL2.getParameter("channel.handler", "all")));

        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append(
                    "Failed to get extension (org.apache.dubbo.remoting.Dispatcher) name from url (")
                                                              .append(uRL2.toString())
                                                              .append(") use keys([dispatcher, dispather, channel.handler])")
                                                              .toString());
        }

        Dispatcher dispatcher = (Dispatcher) ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                                            .getExtension(string);

        return dispatcher.dispatch(channelHandler, uRL);
    }
}


public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}           
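  • Dispatcher$Adaptive first resolves the extension name and then loads the matching Dispatcher.
  • The lookup uRL2.getParameter("dispatcher", uRL2.getParameter("dispather", uRL2.getParameter("channel.handler", "all"))) still accepts the historical misspelling dispather. The default value is all, which yields an AllDispatcher.

  • That completes the analysis of the ChannelHandler wrapping process.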
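
The nested getParameter() calls above are easier to read as a fallback chain. The following is a minimal sketch, not Dubbo code: DispatcherKeySketch and its Map-based getParameter() are hypothetical stand-ins for URL, and the rule that an empty value falls back to the default is an assumption about URL.getParameter().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the parameter lookup in Dispatcher$Adaptive; not Dubbo code.
class DispatcherKeySketch {

    // Assumed to mirror url.getParameter(key, defaultValue): a missing or empty value yields the default.
    static String getParameter(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Keys are tried in the order dispatcher, dispather, channel.handler; "all" is the last resort.
    static String resolveDispatcherName(Map<String, String> params) {
        return getParameter(params, "dispatcher",
                getParameter(params, "dispather",
                        getParameter(params, "channel.handler", "all")));
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(resolveDispatcherName(params)); // all
        params.put("channel.handler", "direct");
        System.out.println(resolveDispatcherName(params)); // direct
        params.put("dispather", "message");
        System.out.println(resolveDispatcherName(params)); // message
        params.put("dispatcher", "execution");
        System.out.println(resolveDispatcherName(params)); // execution
    }
}
```

The correctly spelled key wins whenever it is present, so the misspelled key only matters for configurations that never set dispatcher.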

Handler analysis

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
}           
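  • MultiMessageHandler is a subclass of AbstractChannelHandlerDelegate and overrides received().
  • The other methods, such as connected() and disconnected(), are inherited from AbstractChannelHandlerDelegate.
  • The handler held by MultiMessageHandler is a HeartbeatHandler, so every call is forwarded to the corresponding HeartbeatHandler method.

    The handler field itself is declared in the parent class AbstractChannelHandlerDelegate.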
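
The unwrapping done by received() can be tried in isolation. This sketch uses toy types only: Handler, Batch, and UnbatchingHandler are hypothetical names, and the Channel argument of the real interface is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MultiMessageSketch {

    // Toy handler interface; Dubbo's ChannelHandler also takes a Channel argument.
    interface Handler {
        void received(Object message);
    }

    // Hypothetical batch type standing in for MultiMessage.
    static final class Batch {
        final List<Object> items;

        Batch(Object... items) {
            this.items = Arrays.asList(items);
        }
    }

    // Unwraps a batch and forwards each element; forwards anything else unchanged.
    static final class UnbatchingHandler implements Handler {
        private final Handler next;

        UnbatchingHandler(Handler next) {
            this.next = next;
        }

        @Override
        public void received(Object message) {
            if (message instanceof Batch) {
                for (Object item : ((Batch) message).items) {
                    next.received(item);
                }
            } else {
                next.received(message);
            }
        }
    }

    // Sends one batch and one single message through the wrapper and returns what the inner handler saw.
    static List<Object> demo() {
        List<Object> seen = new ArrayList<>();
        Handler handler = new UnbatchingHandler(seen::add);
        handler.received(new Batch("a", "b"));
        handler.received("c");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, c]
    }
}
```

The inner handler never sees the batch itself, which is why the handlers further down the chain only deal with single messages.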

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

    public static final String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";

    public static final String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    public HeartbeatHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        setReadTimestamp(channel);
        setWriteTimestamp(channel);
        handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        clearReadTimestamp(channel);
        clearWriteTimestamp(channel);
        handler.disconnected(channel);
    }
}           
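  • HeartbeatHandler is a subclass of AbstractChannelHandlerDelegate and overrides connected(), disconnected(), and other methods.
  • The handler held by HeartbeatHandler is an AllChannelHandler, so every call is forwarded to the corresponding AllChannelHandler method.
  • The handler field of HeartbeatHandler is declared in the parent class AbstractChannelHandlerDelegate.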
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
    // the wrapped handler that every call is delegated to
    protected ChannelHandler handler;

    protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
        Assert.notNull(handler, "handler == null");
        this.handler = handler;
    }

    @Override
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        }
        return handler;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        handler.disconnected(channel);
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
        handler.sent(channel, message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        handler.received(channel, message);
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        handler.caught(channel, exception);
    }
}           
  • AbstractChannelHandlerDelegate implements the ChannelHandlerDelegate interface.
  • It holds a ChannelHandler handler field.
  • The handler used by HeartbeatHandler and MultiMessageHandler is this field.
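
To see how the delegates compose, here is a small sketch of the same pattern with toy types. Wrapper, Delegate, and BUSINESS are hypothetical names, and the tags only label the three layers; they are not the real classes.

```java
class DelegateChainSketch {

    interface Handler {
        String received(String message);
    }

    interface Delegate extends Handler {
        Handler getHandler();
    }

    // Base wrapper: forwards every call and unwraps nested delegates in getHandler().
    static class Wrapper implements Delegate {
        protected final Handler handler;
        private final String tag;

        Wrapper(String tag, Handler handler) {
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            this.tag = tag;
            this.handler = handler;
        }

        @Override
        public Handler getHandler() {
            // Recurse until the innermost, non-delegate handler is reached.
            return handler instanceof Delegate ? ((Delegate) handler).getHandler() : handler;
        }

        @Override
        public String received(String message) {
            return tag + " -> " + handler.received(message);
        }
    }

    // Innermost handler, standing in for the business-side handler.
    static final Handler BUSINESS = message -> "business(" + message + ")";

    static Delegate buildChain() {
        return new Wrapper("MultiMessage", new Wrapper("Heartbeat", new Wrapper("All", BUSINESS)));
    }

    public static void main(String[] args) {
        Delegate chain = buildChain();
        System.out.println(chain.received("ping")); // MultiMessage -> Heartbeat -> All -> business(ping)
        System.out.println(chain.getHandler() == BUSINESS); // true
    }
}
```

Calls travel from the outermost wrapper inward, while getHandler() skips all wrappers and returns the innermost handler.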
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() 
                                           + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() +
                 " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information 
            //can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, 
            // and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                                  ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}


public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
    protected static final ExecutorService SHARED_EXECUTOR = 
       Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
                          .getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}           
  • AllChannelHandler extends WrappedChannelHandler and overrides methods such as connected().
  • The executor used by AllChannelHandler is declared in WrappedChannelHandler.
  • It is created by ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url).
  • ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension() returns ThreadPool$Adaptive.
public class ThreadPool$Adaptive implements ThreadPool {
    public Executor getExecutor(URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("threadpool", "fixed");
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().
             append("Failed to get extension (org.apache.dubbo.common.threadpool.ThreadPool) name from url (")
             .append(uRL2.toString()).append(") use keys([threadpool])").toString());
        }
        ThreadPool threadPool = (ThreadPool)ExtensionLoader.getExtensionLoader(ThreadPool.class)
                   .getExtension(string);
        return threadPool.getExecutor(uRL);
    }
}           
  • ThreadPool$Adaptive obtains the ThreadPool through ExtensionLoader.getExtensionLoader(ThreadPool.class).getExtension(name).
  • With the default name fixed, the ThreadPool returned is a FixedThreadPool.
  • The executor in WrappedChannelHandler is therefore the business thread pool, created by FixedThreadPool.

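At this point we know that the default dispatch handler is AllChannelHandler and that its executor is the thread pool created by FixedThreadPool. Next, we look at the dispatch strategies themselves.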
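
What "fixed" means for rejection can be shown with the JDK alone. The sketch below assumes a fixed-size pool with no task queue, built from ThreadPoolExecutor and SynchronousQueue. That configuration is an assumption for illustration and is not taken from the FixedThreadPool source, which is not shown in this article. FixedPoolSketch and newFixedPool() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolSketch {

    // Fixed-size pool without queueing: core size equals maximum size, and a
    // SynchronousQueue only accepts a task when an idle thread is waiting for one.
    static ThreadPoolExecutor newFixedPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
    }

    // Occupies the single thread, then submits again; returns true if the second task is rejected.
    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = newFixedPool(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                try {
                    release.await(); // keep the only worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected()); // second task rejected: true
    }
}
```

Under these assumptions a saturated pool fails fast instead of queueing, which is the situation the RejectedExecutionException branch in AllChannelHandler.received() deals with.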

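Message dispatch strategies

  • The dispatch strategies of Dubbo's NettyServer are shown in the figure below, each with a short description.
    [Figure: Dubbo NettyServer message dispatch strategies]
  • Each strategy is analyzed below from the source code.
The org.apache.dubbo.remoting.Dispatcher file: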

all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher           
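| name | handler | desc |
| --- | --- | --- |
| all | AllChannelHandler | Dispatches all requests and events such as connect to the thread pool specified by the ThreadPool extension point. If that pool is shut down or does not exist, a default cached thread pool is used. |
| connection | ConnectionOrderedChannelHandler | Dispatches all requests and exceptions to the thread pool specified by the ThreadPool extension point. Connect and disconnect events go to a thread pool with a single worker thread and run one by one in the order received. |
| direct | (none; the original handler is returned) | Uses no extra threads. All events and requests are handled directly on the receiving (IO) thread. |
| execution | ExecutionChannelHandler | Dispatches only request messages to the business thread pool. Responses, connect and disconnect events, heartbeats, and other messages run directly on the IO thread. |
| message | MessageOnlyChannelHandler | Dispatches only received messages to the thread pool specified by the ThreadPool extension point. Events are handled directly on the receiving (IO) thread. |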
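
The same table can be restated as a lookup function. This is a sketch derived from which methods each handler overrides in the listings in this article. The enums and targetOf() are hypothetical, and the sent event is left out because none of the listed handlers override it.

```java
class DispatchTableSketch {

    enum Strategy { ALL, DIRECT, MESSAGE, EXECUTION, CONNECTION }

    enum Event { CONNECTED, DISCONNECTED, RECEIVED_REQUEST, RECEIVED_OTHER, CAUGHT }

    enum Target { IO_THREAD, BUSINESS_POOL, CONNECTION_THREAD }

    // Where an event ends up under a given strategy.
    static Target targetOf(Strategy strategy, Event event) {
        boolean received = event == Event.RECEIVED_REQUEST || event == Event.RECEIVED_OTHER;
        switch (strategy) {
            case ALL:
                // connected, disconnected, received and caught all go to the pool
                return Target.BUSINESS_POOL;
            case MESSAGE:
                // only received() is overridden
                return received ? Target.BUSINESS_POOL : Target.IO_THREAD;
            case EXECUTION:
                // received() dispatches only Request instances
                return event == Event.RECEIVED_REQUEST ? Target.BUSINESS_POOL : Target.IO_THREAD;
            case CONNECTION:
                // connect and disconnect use the single-thread pool, the rest use the business pool
                return (event == Event.CONNECTED || event == Event.DISCONNECTED)
                        ? Target.CONNECTION_THREAD : Target.BUSINESS_POOL;
            default:
                // DIRECT: the handler is not wrapped at all
                return Target.IO_THREAD;
        }
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            for (Event event : Event.values()) {
                System.out.println(strategy + " / " + event + " -> " + targetOf(strategy, event));
            }
        }
    }
}
```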

WrappedChannelHandler

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
    protected static final ExecutorService SHARED_EXECUTOR = 
       Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    // the core business thread pool
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
             .getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

    public ExecutorService getExecutorService() {
        return executor == null || executor.isShutdown() ? SHARED_EXECUTOR : executor;
    }
}           
  • WrappedChannelHandler holds the business thread pool, executor.
  • By default this pool is the one created by FixedThreadPool, obtained through the adaptive extension.
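
The fallback rule in getExecutorService() is small enough to try with JDK classes alone. In this sketch, ExecutorFallbackSketch, SHARED, and choose() are hypothetical names; only the null-or-shutdown condition is taken from the listing above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorFallbackSketch {

    // Shared fallback pool, analogous to SHARED_EXECUTOR; daemon threads so it never blocks JVM exit.
    static final ExecutorService SHARED = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "SketchSharedHandler");
        thread.setDaemon(true);
        return thread;
    });

    // Same rule as getExecutorService(): use the configured pool unless it is missing or shut down.
    static ExecutorService choose(ExecutorService configured) {
        return configured == null || configured.isShutdown() ? SHARED : configured;
    }

    public static void main(String[] args) {
        ExecutorService configured = Executors.newFixedThreadPool(2);
        System.out.println(choose(configured) == configured); // true
        configured.shutdown();
        System.out.println(choose(configured) == SHARED); // true
        System.out.println(choose(null) == SHARED); // true
    }
}
```

Because the check runs on every call, events that arrive after the configured pool has been shut down are still handled, on the shared pool.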

AllDispatcher

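  • all (AllDispatcher): every message is dispatched to the business thread pool, including requests, responses, connect events, disconnect events, and heartbeats. The thread model is shown in the figure below:
[Figure: thread model of the all dispatcher]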
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}           
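  • AllDispatcher dispatches all requests and events to the thread pool specified by the ThreadPool extension point. If that pool is shut down or does not exist, a default cached thread pool is used.
  • AllDispatcher returns an AllChannelHandler.
  • AllDispatcher is Dubbo's default dispatch strategy.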
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + 
                    " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + 
                  " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information 
            //can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and 
            //causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                                       ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}           
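  • AllChannelHandler overrides the methods of WrappedChannelHandler.
  • It dispatches all requests and events to the thread pool specified by the ThreadPool extension point, falling back to a default cached thread pool if that pool is shut down or does not exist.
  • It therefore handles every request and event on the business thread pool, from connection events to actual message processing.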
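
The rejection branch in received() is the part most worth isolating: a two-way request that cannot be scheduled gets an immediate error response instead of leaving the consumer to time out. The sketch below reproduces that control flow with toy types. Request, Response, SATURATED, and the status string are hypothetical, and unlike the listing it catches only RejectedExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

class RejectionReplySketch {

    // Toy message types; Dubbo's Request and Response carry more fields.
    static final class Request {
        final long id;
        final boolean twoWay;

        Request(long id, boolean twoWay) {
            this.id = id;
            this.twoWay = twoWay;
        }
    }

    static final class Response {
        final long id;
        final String status;

        Response(long id, String status) {
            this.id = id;
            this.status = status;
        }
    }

    // An executor that rejects everything, standing in for a saturated pool.
    static final Executor SATURATED = task -> {
        throw new RejectedExecutionException("pool is full");
    };

    // Tries to hand the message to the pool. On rejection, a two-way request gets an
    // immediate error response; anything else is reported to the caller as a failure.
    static void received(Executor pool, List<Response> outbound, Object message) {
        try {
            pool.execute(() -> { /* business handling would run here */ });
        } catch (RejectedExecutionException e) {
            if (message instanceof Request && ((Request) message).twoWay) {
                outbound.add(new Response(((Request) message).id, "THREADPOOL_EXHAUSTED"));
                return;
            }
            throw new IllegalStateException("error when process received event", e);
        }
    }

    static List<Response> demo() {
        List<Response> outbound = new ArrayList<>();
        received(SATURATED, outbound, new Request(7L, true));
        return outbound;
    }

    public static void main(String[] args) {
        Response reply = demo().get(0);
        System.out.println(reply.id + " " + reply.status); // 7 THREADPOOL_EXHAUSTED
    }
}
```

A one-way request has nobody waiting for an answer, so the sketch, like the listing, lets the failure propagate as an exception in that case.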

MessageOnlyDispatcher

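  • message (MessageOnlyDispatcher): only request and response messages are dispatched to the business thread pool. Connect and disconnect events, heartbeats, and similar messages run directly on the IO thread. The model is shown in the figure below:
[Figure: thread model of the message dispatcher]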
public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}           
  • MessageOnlyDispatcher returns a MessageOnlyChannelHandler.
public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}           
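  • MessageOnlyChannelHandler dispatches only received messages to the thread pool specified by the ThreadPool extension point. Connection events and the like are handled directly on the receiving (IO) thread.
  • MessageOnlyChannelHandler overrides only received(), which handles every incoming message.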

ExecutionDispatcher

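  • execution (ExecutionDispatcher): only request messages are dispatched to the business thread pool. Responses, connect and disconnect events, heartbeats, and other messages run directly on the IO thread. The model is shown in the figure below:
[Figure: thread model of the execution dispatcher]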
public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}           
  • ExecutionDispatcher returns an ExecutionChannelHandler.
/**
 * Only request message will be dispatched to thread pool. Other messages like response, connect, disconnect,
 * heartbeat will be directly executed by I/O thread.
 */
public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
}           
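  • ExecutionChannelHandler dispatches only request messages to the business thread pool. Responses, connect and disconnect events, heartbeats, and other messages run directly on the IO thread.
  • ExecutionChannelHandler overrides received() and sends only messages of type Request to the pool.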
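
The instanceof check decides which thread does the work, and that is easy to observe by recording thread names. The following sketch uses toy marker types. Request, Response, handledOn(), and the thread name biz-1 are hypothetical, and the blocking get() exists only so the sketch can report the result (the real handler does not wait).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutionRoutingSketch {

    // Toy marker types standing in for Dubbo's Request and Response.
    static final class Request { }

    static final class Response { }

    // Returns the name of the thread that ends up handling the message.
    static String handledOn(ExecutorService pool, Object message) {
        Callable<String> handle = () -> Thread.currentThread().getName();
        try {
            if (message instanceof Request) {
                // Requests hop to the pool; get() is used here only to observe the result.
                return pool.submit(handle).get();
            }
            // Everything else is handled inline on the calling (IO) thread.
            return handle.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-1"));
        try {
            return new String[] { handledOn(pool, new Request()), handledOn(pool, new Response()) };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("request  -> " + threads[0]); // biz-1
        System.out.println("response -> " + threads[1]); // the calling thread
    }
}
```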

ConnectionOrderedDispatcher

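  • connection (ConnectionOrderedDispatcher): on the IO thread, connect and disconnect events are put into a queue and executed one by one in order, while other messages are dispatched to the business thread pool. The model is shown in the figure below:
[Figure: thread model of the connection dispatcher]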
public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}           
  • ConnectionOrderedDispatcher returns a ConnectionOrderedChannelHandler.
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() 
                 + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // fix, reject exception can not be sent to consumer 
            // because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                         ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " 
               + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}           
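  • ConnectionOrderedChannelHandler queues connect and disconnect events from the IO thread and executes them one by one in order. Other messages are dispatched to the business thread pool.
  • It holds two thread pools: connectionExecutor for connection events and the business thread pool. connectionExecutor is separate from NettyServer's worker thread pool.
  • connectionExecutor has a single worker thread, which is what keeps event handling ordered.
  • ConnectionOrderedChannelHandler overrides received(), connected(), and the other event methods.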
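
The ordering guarantee comes from the combination of one worker thread and a FIFO queue. The sketch below demonstrates it with the JDK only. OrderedEventsSketch, demo(), and the event strings are hypothetical, and the warning goes to stderr rather than a logger.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class OrderedEventsSketch {

    // Submits alternating connect/disconnect events to a single-thread pool and
    // returns them in the order in which they were actually executed.
    static List<String> demo(int events, int warningLimit) {
        ThreadPoolExecutor single = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            for (int i = 0; i < events; i++) {
                // Same idea as checkQueueLength(): warn, but still accept the event.
                if (single.getQueue().size() > warningLimit) {
                    System.err.println("queue size exceeds the warning limit: " + warningLimit);
                }
                String event = (i % 2 == 0 ? "connected#" : "disconnected#") + i;
                single.execute(() -> log.add(event));
            }
        } finally {
            single.shutdown(); // queued events still run after shutdown()
        }
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // [connected#0, disconnected#1, connected#2, disconnected#3]
    }
}
```

With more than one worker thread the queue would still be FIFO, but two events could run concurrently and finish out of order, which is why the pool size is pinned to one.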

DirectDispatcher

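  • direct (DirectDispatcher): no message is dispatched to the business thread pool. Everything runs directly on the IO thread. The model is shown in the figure below:
[Figure: thread model of the direct dispatcher]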
public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}           
  • DirectDispatcher creates no thread pool. All requests and events are handled by NettyServer's worker threads.
