前言
本章基于dubbo2.7.6版本,分析rpc調用流程。
筆者将一次rpc同步調用拆分為三個階段:
1)使用者代碼執行rpc方法,consumer發送rpc請求給provider
2)provider處理rpc請求響應consumer
3)consumer收到響應,傳回使用者代碼
在總結部分整理了rpc調用流程。
發送rpc請求
代理層
InvokerInvocationHandler#invoke:
1)把rpc方法的關鍵資訊,都包裝為一個RpcInvocation貫穿Invoker#invoke;
這個RpcInvocation就不細看了,目的無非是将遠端調用需要的資訊都封裝為一個pojo,
類似于我們平常寫業務代碼的時候aop用的MethodInvocation。
2)執行代理Invoker,一直通到DubboInvoker;
3)result.recreate:如果發生rpc異常,抛出,否則傳回rpc方法傳回值;
Cluster層
ClusterInterceptor
在上一章提到過,Cluster#join傳回的Invoker會被ClusterInterceptor激活擴充點包一層,但是ClusterInterceptor并沒有實作Invoker,是以要用擴充卡模式包一遍。
AbstractCluster内部類InterceptorInvokerNode,負責适配ClusterInterceptor實作到Invoker。
protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
private AbstractClusterInvoker<T> clusterInvoker;
private ClusterInterceptor interceptor;
private AbstractClusterInvoker<T> next;
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 前置攔截
interceptor.before(next, invocation);
// 執行
asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
// ...
throw e;
} finally {
// 後置攔截
interceptor.after(next, invocation);
}
return asyncResult.whenCompleteWithContext((r, t) -> {
// rpc請求完成回調ClusterInterceptor.Listener
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
if (t == null) {
listener.onMessage(r, clusterInvoker, invocation);
} else {
listener.onError(t, clusterInvoker, invocation);
}
}
});
}
}
複制代碼
ConsumerContextClusterInterceptor:負責建立和清理rpc上下文,配合實作隐式傳參特性。
- before:發起rpc請求前,建立rpc請求上下文,清除rpc響應上下文;
- after:發起rpc請求完成,還未收到rpc響應,清除rpc請求上下文;
- onMessage:收到rpc響應,且響應成功,建立rpc響應上下文;
- onError:rpc響應失敗,什麼都不做;
@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
@Override
public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
// 建立rpc【請求】上下文
RpcContext.getContext()
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
// 清除rpc【響應】上下文
RpcContext.removeServerContext();
}
@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
// 清除rpc【請求】上下文
RpcContext.removeContext();
}
@Override
public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
// 建立rpc【響應】上下文
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
}
@Override
public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {
}
}
複制代碼
其中rpc請求上下文對應RpcContext.LOCAL,rpc響應上下文對應RpcContext.SERVER_LOCAL。
對于服務調用方,
RpcContext.LOCAL的生命周期,是一次rpc請求,使用者也可以在rpc方法調用前,主動初始化RpcContext.LOCAL實作隐式傳參;
RpcContext.SERVER_LOCAL的生命周期很長,是一次rpc響應到下一次rpc請求之前,也就是說使用者可以在rpc方法調用結束後一直使用本次rpc調用的響應上下文,直至發生下次rpc調用。
RpcContext中的InternalThreadLocal是借鑒了Netty的FastThreadLocal( juejin.cn/post/694493… ) ,用數組代替hashmap實作的threadlocal,裡面javadoc都和FastThreadLocal如出一轍。
ClusterInvoker
AbstractClusterInvoker#invoke:本質上ClusterInvoker是根據一系列條件,從所有服務提供者Invoker中選擇其中一個Invoker執行。
1)Directory#list:列出Invocation對應Invokers(路由)
2)AbstractClusterInvoker#initLoadBalance:根據url參數找LoadBalance擴充,預設RandomLoadBalance
3)AbstractClusterInvoker#doInvoke:子類實作,會有不同的叢集容錯方式
路由
RegistryDirectory#doList:經過RouterChain路由過濾後傳回Invoker集合。
需要注意的是,如果reference指定多個group,則不包含路由邏輯,直接傳回所有Invoker。
在服務訂閱之前,RegistryProtocol會用訂閱url構造RouterChain注入RegistryDirectory。
RouterChain構造時會根據訂閱url擷取RouterFactory激活擴充點,通過RouterFactory#getRouter建立Router對象。
RouterChain#route:循環所有Router,過濾invokers。
注意:invokers可以認為是記憶體系統資料庫(rpc服務級别),隻有注冊中心providers清單變更,這裡才會更新,rpc期間不強依賴注冊中心的遠端系統資料庫。
預設情況下會有四個Router:
1)MockInvokersSelector:本地僞裝特性,忽略
2)TagRouter:根據tag過濾Invoker
支援三種模式配置tag路由,優先級從高到低:
a)配置中心路由規則;b)RpcContext指定tag;c)reference指定tag
比如reference指定tag為gray。
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setTag("gray");
複制代碼
TagRouter優先會找tag=gray的provider,如果找不到tag=gray的provider,會取tag=null的provider。
3)ServiceRouter:基于rpc服務的路由,需要結合配置中心
4)AppRouter:基于應用的路由,需要結合配置中心
叢集容錯
FailoverClusterInvoker是預設叢集容錯實作。
如果發生故障,重新選擇Invoker進行遠端調用,最多會調用3次(retry=2)。
AbstractClusterInvoker#select:在實際選擇invoker前,優先走了一圈粘滞連接配接特性。
AbstractClusterInvoker#doSelect:
1)如果invokers隻有一個,直接傳回第一個
2)調用loadbalance執行一次選擇,傳回invoker
2-1)invoker已經被選過或者invoker不可用,再次進行一次選擇
2-2)否則傳回loadbalance的選擇,結束
3)重新選擇invoker,如果仍然為空,兜底傳回上次invoker的下一個invoker
AbstractClusterInvoker#reselect邏輯如下:
1)擷取未選擇過的可用invokers,再次執行loadbalance
2)如果第一類invokers為空,把已選過的可用invokers也考慮進去(對于failover來說,就是之前執行失敗的invoker),再次執行loadbalance
負載均衡
當經過路由後還存在多個invokers,往往就需要loadbalance通過特定算法找到其中一個invoker。
預設實作是RandomLoadBalance,随機選擇invoker。
需要注意的是,大部分LoadBalance實作都有權重邏輯。
AbstractLoadBalance#getWeight:擷取運作時不同provider對應invoker的權重
預設每個invoker權重都是100,基于provider的啟動時間和預熱時間可能會減少權重。
Protocol層
經過Cluster+Directory篩選,最終剩下一個服務提供者的ProtocolInvoker。
以Dubbo協定為例,最終會進入DubboInvoker
在進入DubboInvoker之前會經過一系列group=consumer的Filter,這裡略過。
在AbstractProtocol#refer引用時,主動在DubboInvoker外部包裝了AsyncToSyncInvoker。
AsyncToSyncInvoker#invoke調用協定Invoker,待底層協定Invoker傳回,如果是同步調用,阻塞等待rpc響應。
DubboInvoker繼承自AbstractInvoker抽象類。
AbstractInvoker#invoke做一些通用控制,比如填充RpcInvocation、異常封裝到Result等等,底層調用DubboInvoker#doInvoke。
DubboProtocol#doInvoke:
1)getCallbackExecutor:決策處理rpc響應的線程池;
2)ExchangeClient#request:将Invocation、逾時時間、處理rpc響應線程池送出給通訊層,得到Future傳回;
對于普通同步調用來說,getCallbackExecutor每次會傳回一個新的ThreadlessExecutor。
ThreadlessExecutor的javadoc如下:
ThreadlessExecutor不管理任何線程,如果任務被送出到這個executor,不會馬上被排程。
直到某個線程調用waitAndDrain方法,該線程立即執行這個任務。
這部分我們到第三階段再分析。
通訊層
一般情況下,rpc架構業務線程和io線程都是分開的。
為了解決這個問題,一般設計方式都遵循如下規則:
1)【業務線程】為request配置設定唯一id
2)【業務線程】将request-id和request-future緩存到全局map
3)【定時線程】對于全局map做定時掃描,如果逾時,用指定線程池執行future的回調
4)【io線程】發送請求到io線程
5)【io線程】io線程收到響應,從全局map根據響應裡的request-id拿到request-future,送出到6線程
6)【rpc響應線程(異步)/業務線程(同步)】執行future的回調
HeaderExchangeChannel#request:這裡做了1234步,第四步就是channel#send。
首先Request建立時配置設定了自增id,作為rpc請求id(1)。
public class Request {
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private final long mId;
public Request() {
mId = newId();
}
private static long newId() {
return INVOKE_ID.getAndIncrement();
}
}
複制代碼
DefaultFuture#newFuture:建立DefaultFuture執行個體,送出逾時檢測任務(3)。
DefaultFuture構造時将request-id和自己放到全局map中(2)。
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout :
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
FUTURES.put(id, this); // 核心
CHANNELS.put(id, channel);
}
複制代碼
序列化
DubboProtocol#initClient:在建立用戶端連接配接時,Dubbo協定序列化方式為dubbo。
最終Request都會走DubboCountCodec進行序列化,序列化細節忽略。
同步調用阻塞等待
DubboInvoker将rpc請求送出到通訊層後,直接傳回。
在AsyncToSyncInvoker中由于是同步調用,走AsyncRpcResult#get阻塞等待rpc響應。
對于同步調用AsyncRpcResult#get會主動調用ThreadlessExecutor#waitAndDrain。
ThreadlessExecutor#waitAndDrain:阻塞等待rpc響應任務投遞,當響應到來,目前線程執行rpc響應任務。
之是以這個Executor的行為不同,是因為重寫了execute方法。
當沒有調用waitAndDrain執行任務前,任務會投遞到queue中。
至于rpc響應,放到第三階段繼續看。
處理rpc請求
反序列化
同樣,反序列化也是走DubboCountCodec,差別在于Request的data部分不是一個普通的RpcInvocation。
rpc請求體會被反序列化為一個DecodeableRpcInvocation,繼承自RpcInvocation。
DecodeableRpcInvocation是個特殊的Invocation,特點在于能夠自己根據輸入流反序列化。
通訊層
AllChannelHandler#received:收到反序列化之後的Request,根據url找業務線程池執行rpc請求。
在暴露階段,provider對于每個端口開啟一個業務線程池,處理業務。
預設采用固定大小200+無隊列+日志列印拒絕政策的線程池。
HeaderExchangeHandler#handleRequest:執行ChannelEventRunnable
1)送出到ExchangeHandler實作,執行Invocation調用
2)注冊future回調,當業務處理完畢,再将rpc響應送出到io線程,響應用戶端
接下來的流程,就是根據Request找到Invoker,經過層層包裝,調用到rpc服務實作。
Protocol層
DubboProtocol#requestHandler:
1)getInvoker:根據Invocation找Invoker
2)執行Invoker
找Invoker
DubboProtocol#getInvoker:根據Invocation資訊拼接serviceKey,定位到之前暴露的DubboExporter,傳回DubboExporter持有的Invoker。
Filter
接下來經過一系列group=provider的Filter。
介紹一個比較關鍵的ContextFilter。
在進入下遊invoker前,将Invocation中的關鍵資訊注入RpcContext請求rpc上下文中。
在rpc服務方法執行完畢後,将RpcContext響應rpc上下文中關鍵資訊,注入rpc響應。
代理層
接下來會來到Rpc服務代理層,暴露階段會通過ProxyFactory建立rpc服務實作代理,預設javassist實作。
AbstractProxyInvoker#invoke對于目标rpc方法執行包了一層,異步适配、異常處理等等。
對于javassist來說執行動态生成的Wrapper#invokeMethod,動态生成的Wrapper在上一章提到過,不再贅述。
注意:這裡proxy實際上是target,即rpc服務實作類執行個體。
HeaderExchangeHandler#handleRequest:再複述一次
當rpc服務方法執行完畢,future完成後會将response送出到io線程,進入三階段。
處理rpc響應
反序列化
rpc響應模型Response:
1)mId:對應request-id
2)mResult:DecodeableRpcResult執行個體
Response.mResult是DecodeableRpcResult執行個體。
DecodeableRpcResult實作和DecodeableRpcInvocation類似,自己支援反序列化,繼承了AppResponse。
通訊層
AllChannelHandler#received:
目前是io線程(netty),根據入參message(Response)擷取rpc響應處理線程池,送出ChannelEventRunnable任務到對應線程池。
WrappedChannelHandler#getPreferredExecutorService:
根據請求id找到挂起的DefaultFuture,再根據DefaultFuture找到綁定的線程池。
對于同步調用,這裡就是ThreadlessExecutor。
io線程将ChannelEventRunnable,送出到ThreadlessExecutor之後,就能喚醒使用者線程,實作同步調用 。
Protocol層
HeaderExchangeHandler繼續處理,調用DefaultFuture#received。
DefaultFuture#received:從全局futures中移除挂起請求
1)取消逾時檢測
2)完成future
DefaultFuture#doReceived:future以正常或異常完成。
AsyncToSyncInvoker調用Result#get從阻塞中傳回。
代理層
最後回到rpc服務代理。
InvokerInvocationHandler#invoke:AsyncRpcResult#recreate擷取結果。
AsyncRpcResult#recreate:對于同步調用,走底層AppResponse的recreate。
AppResponse#recreate:調用正常,傳回反序列化後的result。
總結
rpc流程
一次同步rpc調用流程大緻如下:
階段一(consumer):
1)代理層:使用者代碼進入rpc服務代理,收集rpc調用必要參數,封裝RpcInvocation
2)Cluster層:執行Interceptor,經過路由、容錯、負載均衡最終標明一個provider
3)Protocol層:
- 執行Filter
- 建構Request并緩存到記憶體map,背景線程掃描map校驗Request是否逾時
- 送出Request到io線程,阻塞等待Response
4)通訊層:Request序列化,發送Request給對端
階段二(provider):
1)通訊層:Request反序列化,得到RpcInvocation,送出到業務線程池并注冊future回調,回調時将rpc傳回值封裝為Response并序列化送出到io線程
2)Protocol層:通過RpcInvocation找到之前暴露的Exporter,找到Exporter對應的Invoker,執行Filter
3)代理層:調用rpc方法實作
階段三(consumer):
1)通訊層:将Response反序列化,送出任務到業務線程池
2)Protocol層:根據請求id反向找到挂起的future,取消逾時檢測,設定future執行結果
3)代理層:解析future傳回結果,傳回使用者代碼
一些細節
實作同步rpc調用
rpc調用業務線程和io線程一般是獨立的,是以有如下設計:
1)【業務線程】為request配置設定唯一id
2)【業務線程】将request-id和request-future緩存到全局map
3)【定時線程】對于全局map做定時掃描,如果逾時,用指定線程池執行future的回調
4)【io線程】發送請求到io線程
5)【io線程】io線程收到響應,從全局map根據響應裡的request-id拿到request-future,送出到6線程
6)【rpc響應線程(異步)/業務線程(同步)】執行future的回調
一般架構底層通訊都會設計成異步,然後同步去适配異步。
不例外,dubbo設計了ThreadlessExecutor。
本質上ThreadlessExecutor是一個阻塞隊列适配了ExecutorService。
業務線程會阻塞等待隊列非空(queue.take),io線程收到response投遞請求到阻塞隊列,喚醒業務線程,進而實作同步調用。
負載均衡算法
之前對預設負載均衡算法有一點誤解,認為是純粹的随機算法,實際上包含權重邏輯。
而權重邏輯中會包含warmup減權,預設warmup時長是10分鐘,權重是100。