天天看點

Dubbo源碼之rpc的調用流程分析

作者:小滿隻想睡覺

前言

本章基于dubbo2.7.6版本,分析rpc調用流程。

筆者将一次rpc同步調用拆分為三個階段:

1)使用者代碼執行rpc方法,consumer發送rpc請求給provider

2)provider處理rpc請求響應consumer

3)consumer收到響應,傳回使用者代碼

在總結部分整理了rpc調用流程。

發送rpc請求

Dubbo源碼之rpc的調用流程分析

代理層

InvokerInvocationHandler#invoke:

1)把rpc方法的關鍵資訊,都包裝為一個RpcInvocation貫穿Invoker#invoke;

這個RpcInvocation就不細看了,目的無非是将遠端調用需要的資訊都封裝為一個pojo,

類似于我們平常寫業務代碼的時候aop用的MethodInvocation。

2)執行代理Invoker,一直通到DubboInvoker;

3)result.recreate:如果發生rpc異常,抛出,否則傳回rpc方法傳回值;

Dubbo源碼之rpc的調用流程分析

Cluster層

ClusterInterceptor

在上一章提到過,Cluster#join傳回的Invoker會被ClusterInterceptor激活擴充點包一層,但是ClusterInterceptor并沒有實作Invoker,是以要用擴充卡模式包一遍。

AbstractCluster内部類InterceptorInvokerNode,負責适配ClusterInterceptor實作到Invoker。

protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
    private AbstractClusterInvoker<T> clusterInvoker;
    private ClusterInterceptor interceptor;
    private AbstractClusterInvoker<T> next;
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
            // 前置攔截
            interceptor.before(next, invocation);
            // 執行
            asyncResult = interceptor.intercept(next, invocation);
        } catch (Exception e) {
            // ...
            throw e;
        } finally {
            // 後置攔截
            interceptor.after(next, invocation);
        }
        return asyncResult.whenCompleteWithContext((r, t) -> {
            // rpc請求完成回調ClusterInterceptor.Listener
            if (interceptor instanceof ClusterInterceptor.Listener) {
                ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                if (t == null) {
                    listener.onMessage(r, clusterInvoker, invocation);
                } else {
                    listener.onError(t, clusterInvoker, invocation);
                }
            }
        });
    }
}
複制代碼           

ConsumerContextClusterInterceptor:負責建立和清理rpc上下文,配合實作隐式傳參特性。

  • before:發起rpc請求前,建立rpc請求上下文,清除rpc響應上下文;
  • after:發起rpc請求完成,還未收到rpc響應,清除rpc請求上下文;
  • onMessage:收到rpc響應,且響應成功,建立rpc響應上下文;
  • onError:rpc響應失敗,什麼都不做;
@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {

    @Override
    public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 建立rpc【請求】上下文
        RpcContext.getContext()
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        // 清除rpc【響應】上下文
        RpcContext.removeServerContext();
    }

    @Override
    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
        // 清除rpc【請求】上下文
        RpcContext.removeContext();
    }

    @Override
    public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 建立rpc【響應】上下文
        RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
    }

    @Override
    public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {

    }
}
複制代碼           

其中rpc請求上下文對應RpcContext.LOCAL,rpc響應上下文對應RpcContext.SERVER_LOCAL。

對于服務調用方,

RpcContext.LOCAL的生命周期,是一次rpc請求,使用者也可以在rpc方法調用前,主動初始化RpcContext.LOCAL實作隐式傳參;

RpcContext.SERVER_LOCAL的生命周期很長,是一次rpc響應到下一次rpc請求之前,也就是說使用者可以在rpc方法調用結束後一直使用本次rpc調用的響應上下文,直至發生下次rpc調用。

Dubbo源碼之rpc的調用流程分析

RpcContext中的InternalThreadLocal是借鑒了Netty的FastThreadLocal( juejin.cn/post/694493… ) ,用數組代替hashmap實作的threadlocal,裡面javadoc都和FastThreadLocal如出一轍。

Dubbo源碼之rpc的調用流程分析

ClusterInvoker

AbstractClusterInvoker#invoke:本質上ClusterInvoker是根據一系列條件,從所有服務提供者Invoker中選擇其中一個Invoker執行。

Dubbo源碼之rpc的調用流程分析

1)Directory#list:列出Invocation對應Invokers(路由)

Dubbo源碼之rpc的調用流程分析

2)AbstractClusterInvoker#initLoadBalance:根據url參數找LoadBalance擴充,預設RandomLoadBalance

Dubbo源碼之rpc的調用流程分析

3)AbstractClusterInvoker#doInvoke:子類實作,會有不同的叢集容錯方式

路由

RegistryDirectory#doList:經過RouterChain路由過濾後傳回Invoker集合。

需要注意的是,如果reference指定多個group,則不包含路由邏輯,直接傳回所有Invoker。

Dubbo源碼之rpc的調用流程分析

在服務訂閱之前,RegistryProtocol會用訂閱url構造RouterChain注入RegistryDirectory。

Dubbo源碼之rpc的調用流程分析

RouterChain構造時會根據訂閱url擷取RouterFactory激活擴充點,通過RouterFactory#getRouter建立Router對象。

Dubbo源碼之rpc的調用流程分析

RouterChain#route:循環所有Router,過濾invokers。

注意:invokers可以認為是記憶體系統資料庫(rpc服務級别),隻有注冊中心providers清單變更,這裡才會更新,rpc期間不強依賴注冊中心的遠端系統資料庫。

Dubbo源碼之rpc的調用流程分析

預設情況下會有四個Router:

1)MockInvokersSelector:本地僞裝特性,忽略

2)TagRouter:根據tag過濾Invoker

支援三種模式配置tag路由,優先級從高到低:

a)配置中心路由規則;b)RpcContext指定tag;c)reference指定tag

比如reference指定tag為gray。

ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setTag("gray");
複制代碼           

TagRouter優先會找tag=gray的provider,如果找不到tag=gray的provider,會取tag=null的provider。

Dubbo源碼之rpc的調用流程分析

3)ServiceRouter:基于rpc服務的路由,需要結合配置中心

Dubbo源碼之rpc的調用流程分析

4)AppRouter:基于應用的路由,需要結合配置中心

Dubbo源碼之rpc的調用流程分析

叢集容錯

FailoverClusterInvoker是預設叢集容錯實作。

如果發生故障,重新選擇Invoker進行遠端調用,最多會調用3次(retry=2)。

Dubbo源碼之rpc的調用流程分析

AbstractClusterInvoker#select:在實際選擇invoker前,優先走了一圈粘滞連接配接特性。

Dubbo源碼之rpc的調用流程分析

AbstractClusterInvoker#doSelect:

1)如果invokers隻有一個,直接傳回第一個

2)調用loadbalance執行一次選擇,傳回invoker

2-1)invoker已經被選過或者invoker不可用,再次進行一次選擇

2-2)否則傳回loadbalance的選擇,結束

3)重新選擇invoker,如果仍然為空,兜底傳回上次invoker的下一個invoker

Dubbo源碼之rpc的調用流程分析

AbstractClusterInvoker#reselect邏輯如下:

1)擷取未選擇過的可用invokers,再次執行loadbalance

2)如果第一類invokers為空,把已選過的可用invokers也考慮進去(對于failover來說,就是之前執行失敗的invoker),再次執行loadbalance

Dubbo源碼之rpc的調用流程分析

負載均衡

當經過路由後還存在多個invokers,往往就需要loadbalance通過特定算法找到其中一個invoker。

預設實作是RandomLoadBalance,随機選擇invoker。

需要注意的是,大部分LoadBalance實作都有權重邏輯。

Dubbo源碼之rpc的調用流程分析

AbstractLoadBalance#getWeight:擷取運作時不同provider對應invoker的權重

預設每個invoker權重都是100,基于provider的啟動時間和預熱時間可能會減少權重。

Dubbo源碼之rpc的調用流程分析

Protocol層

經過Cluster+Directory篩選,最終剩下一個服務提供者的ProtocolInvoker。

以Dubbo協定為例,最終會進入DubboInvoker

在進入DubboInvoker之前會經過一系列group=consumer的Filter,這裡略過。

在AbstractProtocol#refer引用時,主動在DubboInvoker外部包裝了AsyncToSyncInvoker。

Dubbo源碼之rpc的調用流程分析

AsyncToSyncInvoker#invoke調用協定Invoker,待底層協定Invoker傳回,如果是同步調用,阻塞等待rpc響應。

Dubbo源碼之rpc的調用流程分析

DubboInvoker繼承自AbstractInvoker抽象類。

AbstractInvoker#invoke做一些通用控制,比如填充RpcInvocation、異常封裝到Result等等,底層調用DubboInvoker#doInvoke。

Dubbo源碼之rpc的調用流程分析

DubboProtocol#doInvoke:

1)getCallbackExecutor:決策處理rpc響應的線程池;

2)ExchangeClient#request:将Invocation、逾時時間、處理rpc響應線程池送出給通訊層,得到Future傳回;

Dubbo源碼之rpc的調用流程分析

對于普通同步調用來說,getCallbackExecutor每次會傳回一個新的ThreadlessExecutor。

Dubbo源碼之rpc的調用流程分析

ThreadlessExecutor的javadoc如下:

Dubbo源碼之rpc的調用流程分析

ThreadlessExecutor不管理任何線程,如果任務被送出到這個executor,不會馬上被排程。

直到某個線程調用waitAndDrain方法,該線程立即執行這個任務。

這部分我們到第三階段再分析。

通訊層

一般情況下,rpc架構業務線程和io線程都是分開的。

為了解決這個問題,一般設計方式都遵循如下規則:

1)【業務線程】為request配置設定唯一id

2)【業務線程】将request-id和request-future緩存到全局map

3)【定時線程】對于全局map做定時掃描,如果逾時,用指定線程池執行future的回調

4)【io線程】發送請求到io線程

5)【io線程】io線程收到響應,從全局map根據響應裡的request-id拿到request-future,送出到6線程

6)【rpc響應線程(異步)/業務線程(同步)】執行future的回調

HeaderExchangeChannel#request:這裡做了1234步,第四步就是channel#send。

Dubbo源碼之rpc的調用流程分析

首先Request建立時配置設定了自增id,作為rpc請求id(1)。

public class Request {
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    private final long mId;
    public Request() {
        mId = newId();
    }
    private static long newId() {
        return INVOKE_ID.getAndIncrement();
    }
}
複制代碼           

DefaultFuture#newFuture:建立DefaultFuture執行個體,送出逾時檢測任務(3)。

Dubbo源碼之rpc的調用流程分析

DefaultFuture構造時将request-id和自己放到全局map中(2)。

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : 
        channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    FUTURES.put(id, this); // 核心
    CHANNELS.put(id, channel);
}
複制代碼           

序列化

DubboProtocol#initClient:在建立用戶端連接配接時,Dubbo協定序列化方式為dubbo。

Dubbo源碼之rpc的調用流程分析

最終Request都會走DubboCountCodec進行序列化,序列化細節忽略。

Dubbo源碼之rpc的調用流程分析

同步調用阻塞等待

DubboInvoker将rpc請求送出到通訊層後,直接傳回。

在AsyncToSyncInvoker中由于是同步調用,走AsyncRpcResult#get阻塞等待rpc響應。

Dubbo源碼之rpc的調用流程分析

對于同步調用AsyncRpcResult#get會主動調用ThreadlessExecutor#waitAndDrain。

Dubbo源碼之rpc的調用流程分析

ThreadlessExecutor#waitAndDrain:阻塞等待rpc響應任務投遞,當響應到來,目前線程執行rpc響應任務。

Dubbo源碼之rpc的調用流程分析

之是以這個Executor的行為不同,是因為重寫了execute方法。

當沒有調用waitAndDrain執行任務前,任務會投遞到queue中。

至于rpc響應,放到第三階段繼續看。

Dubbo源碼之rpc的調用流程分析

處理rpc請求

反序列化

同樣,反序列化也是走DubboCountCodec,差別在于Request的data部分不是一個普通的RpcInvocation。

rpc請求體會被反序列化為一個DecodeableRpcInvocation,繼承自RpcInvocation。

Dubbo源碼之rpc的調用流程分析

DecodeableRpcInvocation是個特殊的Invocation,特點在于能夠自己根據輸入流反序列化。

Dubbo源碼之rpc的調用流程分析

通訊層

AllChannelHandler#received:收到反序列化之後的Request,根據url找業務線程池執行rpc請求。

Dubbo源碼之rpc的調用流程分析

在暴露階段,provider對于每個端口開啟一個業務線程池,處理業務。

Dubbo源碼之rpc的調用流程分析

預設采用固定大小200+無隊列+日志列印拒絕政策的線程池。

Dubbo源碼之rpc的調用流程分析

HeaderExchangeHandler#handleRequest:執行ChannelEventRunnable

1)送出到ExchangeHandler實作,執行Invocation調用

2)注冊future回調,當業務處理完畢,再将rpc響應送出到io線程,響應用戶端

Dubbo源碼之rpc的調用流程分析

接下來的流程,就是根據Request找到Invoker,經過層層包裝,調用到rpc服務實作。

Dubbo源碼之rpc的調用流程分析

Protocol層

DubboProtocol#requestHandler:

1)getInvoker:根據Invocation找Invoker

2)執行Invoker

Dubbo源碼之rpc的調用流程分析

找Invoker

DubboProtocol#getInvoker:根據Invocation資訊拼接serviceKey,定位到之前暴露的DubboExporter,傳回DubboExporter持有的Invoker。

Dubbo源碼之rpc的調用流程分析

Filter

接下來經過一系列group=provider的Filter。

介紹一個比較關鍵的ContextFilter。

在進入下遊invoker前,将Invocation中的關鍵資訊注入RpcContext請求rpc上下文中。

Dubbo源碼之rpc的調用流程分析

在rpc服務方法執行完畢後,将RpcContext響應rpc上下文中關鍵資訊,注入rpc響應。

Dubbo源碼之rpc的調用流程分析

代理層

接下來會來到Rpc服務代理層,暴露階段會通過ProxyFactory建立rpc服務實作代理,預設javassist實作。

Dubbo源碼之rpc的調用流程分析

AbstractProxyInvoker#invoke對于目标rpc方法執行包了一層,異步适配、異常處理等等。

Dubbo源碼之rpc的調用流程分析

對于javassist來說執行動态生成的Wrapper#invokeMethod,動态生成的Wrapper在上一章提到過,不再贅述。

注意:這裡proxy實際上是target,即rpc服務實作類執行個體。

Dubbo源碼之rpc的調用流程分析

HeaderExchangeHandler#handleRequest:再複述一次

當rpc服務方法執行完畢,future完成後會将response送出到io線程,進入三階段。

Dubbo源碼之rpc的調用流程分析

處理rpc響應

反序列化

rpc響應模型Response:

1)mId:對應request-id

2)mResult:DecodeableRpcResult執行個體

Dubbo源碼之rpc的調用流程分析

Response.mResult是DecodeableRpcResult執行個體。

DecodeableRpcResult實作和DecodeableRpcInvocation類似,自己支援反序列化,繼承了AppResponse。

Dubbo源碼之rpc的調用流程分析

通訊層

AllChannelHandler#received:

目前是io線程(netty),根據入參message(Response)擷取rpc響應處理線程池,送出ChannelEventRunnable任務到對應線程池。

Dubbo源碼之rpc的調用流程分析

WrappedChannelHandler#getPreferredExecutorService:

根據請求id找到挂起的DefaultFuture,再根據DefaultFuture找到綁定的線程池。

對于同步調用,這裡就是ThreadlessExecutor。

Dubbo源碼之rpc的調用流程分析

io線程将ChannelEventRunnable,送出到ThreadlessExecutor之後,就能喚醒使用者線程,實作同步調用 。

Dubbo源碼之rpc的調用流程分析

Protocol層

HeaderExchangeHandler繼續處理,調用DefaultFuture#received。

Dubbo源碼之rpc的調用流程分析

DefaultFuture#received:從全局futures中移除挂起請求

1)取消逾時檢測

2)完成future

Dubbo源碼之rpc的調用流程分析

DefaultFuture#doReceived:future以正常或異常完成。

Dubbo源碼之rpc的調用流程分析

AsyncToSyncInvoker調用Result#get從阻塞中傳回。

Dubbo源碼之rpc的調用流程分析

代理層

最後回到rpc服務代理。

InvokerInvocationHandler#invoke:AsyncRpcResult#recreate擷取結果。

Dubbo源碼之rpc的調用流程分析

AsyncRpcResult#recreate:對于同步調用,走底層AppResponse的recreate。

Dubbo源碼之rpc的調用流程分析

AppResponse#recreate:調用正常,傳回反序列化後的result。

Dubbo源碼之rpc的調用流程分析

總結

rpc流程

一次同步rpc調用流程大緻如下:

Dubbo源碼之rpc的調用流程分析

階段一(consumer):

1)代理層:使用者代碼進入rpc服務代理,收集rpc調用必要參數,封裝RpcInvocation

2)Cluster層:執行Interceptor,經過路由、容錯、負載均衡最終標明一個provider

3)Protocol層:

  • 執行Filter
  • 建構Request并緩存到記憶體map,背景線程掃描map校驗Request是否逾時
  • 送出Request到io線程,阻塞等待Response

4)通訊層:Request序列化,發送Request給對端

階段二(provider):

1)通訊層:Request反序列化,得到RpcInvocation,送出到業務線程池并注冊future回調,回調時将rpc傳回值封裝為Response并序列化送出到io線程

2)Protocol層:通過RpcInvocation找到之前暴露的Exporter,找到Exporter對應的Invoker,執行Filter

3)代理層:調用rpc方法實作

階段三(consumer):

1)通訊層:将Response反序列化,送出任務到業務線程池

2)Protocol層:根據請求id反向找到挂起的future,取消逾時檢測,設定future執行結果

3)代理層:解析future傳回結果,傳回使用者代碼

一些細節

實作同步rpc調用

rpc調用業務線程和io線程一般是獨立的,是以有如下設計:

1)【業務線程】為request配置設定唯一id

2)【業務線程】将request-id和request-future緩存到全局map

3)【定時線程】對于全局map做定時掃描,如果逾時,用指定線程池執行future的回調

4)【io線程】發送請求到io線程

5)【io線程】io線程收到響應,從全局map根據響應裡的request-id拿到request-future,送出到6線程

6)【rpc響應線程(異步)/業務線程(同步)】執行future的回調

一般架構底層通訊都會設計成異步,然後同步去适配異步。

不例外,dubbo設計了ThreadlessExecutor。

本質上ThreadlessExecutor是一個阻塞隊列适配了ExecutorService。

Dubbo源碼之rpc的調用流程分析

業務線程會阻塞等待隊列非空(queue.take),io線程收到response投遞請求到阻塞隊列,喚醒業務線程,進而實作同步調用。

負載均衡算法

之前對預設負載均衡算法有一點誤解,認為是純粹的随機算法,實際上包含權重邏輯。

而權重邏輯中會包含warmup減權,預設warmup時長是10分鐘,權重是100。

Dubbo源碼之rpc的調用流程分析

繼續閱讀