天天看點

從源碼全面解析 dubbo 消費端服務調用的來龍去脈

作者:程式猿凱撒

一、引言

對于 Java 開發者而言,關于 dubbo ,我們一般當做黑盒來進行使用,不需要去打開這個黑盒。

但随着目前程式員行業的發展,我們有必要打開這個黑盒,去探索其中的奧妙。

本期 dubbo 源碼解析系列文章,将帶你領略 dubbo 源碼的奧秘

本期源碼文章吸收了之前 Spring、Kakfa、JUC源碼文章的教訓,将不再一行一行的帶大家分析源碼,我們将一些不重要的部分當做黑盒處理,以便我們更快、更有效的閱讀源碼。

雖然現在是網際網路寒冬,但乾坤未定,你我皆是黑馬!

廢話不多說,發車!

二、服務調用流程

1、消費端

上一篇文章,講解了我們的消費端如何訂閱我們服務端注冊到 Zookeeper 的服務接口:從源碼全面解析 dubbo 服務訂閱的來龍去脈

既然消費端已經知道了我們的服務資訊,那麼下一步就要開始正式調用了

我們先從消費端聊聊服務調用的流程

1.1 動态代理的回調

我們聊到消費端訂閱服務時,最終建立的代碼如下:

java複制代碼public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
           

相信看過 動态代理 的小夥伴應該知道,當我們調用 代理 的接口時,實際上走的是 InvokerInvocationHandler 該類的 invoke 方法

java複制代碼public Object invoke(Object proxy, Method method, Object[] args){
    // 擷取方法名=getUserById
    String methodName = method.getName();
    // 擷取參數
    Class<?>[] parameterTypes = method.getParameterTypes();
    
    // 組裝成 RpcInvocation 進行調用
    RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
    
    // 執行調用方法
    return InvocationUtil.invoke(invoker, rpcInvocation);
}
           

這裡我們重點介紹下 RpcInvocation 的幾個參數:

  • serviceModel(Consumer):決定了服務的調用方式,包括使用哪種協定、注冊中心擷取服務清單、負載均衡和容錯政策等。
  • method.getName:getUserById
  • invoker.getInterface().getName:com.common.service.IUserService
  • protocolServiceKey:com.common.service.IUserService:dubbo
  • method.getParameterTypes:方法的入參類型(Long)
  • args:方法的入參值(2)

我們繼續往下看 InvocationUtil.invoke 做了什麼

java複制代碼public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
    URL url = invoker.getUrl();
    String serviceKey = url.getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);
    
    return invoker.invoke(rpcInvocation).recreate();
}

// 判斷目前的是應用注冊還是接口注冊
public Result invoke(Invocation invocation) throws RpcException {
    if (currentAvailableInvoker != null) {
        if (step == APPLICATION_FIRST) {
            if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                return invoker.invoke(invocation);
            }
            return decideInvoker().invoke(invocation);
        }
        return currentAvailableInvoker.invoke(invocation);
    }
}
           

我們繼續往下追源碼

1.2 過濾器

java複制代碼// 過濾器責任鍊模式
// 依次周遊,執行順序:
public interface FilterChainBuilder {
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
        asyncResult = filter.invoke(nextNode, invocation);
    }
}
           

這裡會依次周遊所有的 filter:

  • ConsumerContextFilter:将消費者端的資訊(遠端位址、應用名、服務名)傳遞給服務提供者端
  • ConsumerClassLoaderFilter:将消費者端的ClassLoader傳遞給服務提供者端,以便服務提供者端可以在調用時使用相同的ClassLoader加載類。
  • FutureFilter:異步調用
  • MonitorFilter:統計服務調用資訊(調用次數、平均響應時間、失敗次數)
  • RouterSnapshotFilter:動态路由,它可以根據路由規則選擇服務提供者,并緩存路由結果,以提高性能。

具體每個過濾器怎麼實作的,這裡就不展開講了,後面有機會單獨出一章

1.3 路由邏輯

當我們的責任鍊完成之後,下一步會經過我們的 路由 邏輯

java複制代碼public Result invoke(final Invocation invocation) throws RpcException {
    // 
    List<Invoker<T>> invokers = list(invocation);
    InvocationProfilerUtils.releaseDetailProfiler(invocation);

    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    return doInvoke(invocation, invokers, loadbalance);      
}
           

其中 List<Invoker<T>> invokers = list(invocation) 這裡就是我們的路由邏輯:

java複制代碼List<Invoker<T>> invokers = list(invocation);

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> routedResult = doList(availableInvokers, invocation);
}

public List<Invoker<T>> doList(BitList<Invoker<T>> invokers, Invocation invocation) {
    // 這裡就是我們的路由政策!!!
    List<Invoker<T>> result = routerChain.route(getConsumerUrl(), invokers, invocation);
    return result == null ? BitList.emptyList() : result;
}
           

這裡的路由政策比較多,我舉兩個比較經典的:

  • simpleRoute(簡單路由政策):預設的路由政策
  • routeAndPrint(自定義路由政策):我們可以自定義其路由邏輯

而對于整體路由的流程:

  • 擷取可用的服務提供者清單
  • 過濾出符合條件的服務提供者
  • 對過濾後的服務提供者清單進行排序
  • 得到符合規定的服務提供者資訊

到這裡,我們路由會把符合要求的 服務端 給篩選出來,接下來就進入我們的負載均衡環節了

1.4 重試次數

這裡我們設定 retries 為 5

java複制代碼@DubboReference(protocol = "dubbo", timeout = 100, retries = 5)
private IUserService iUserService;
           

我們看下源碼裡面有幾次調用:根據源碼來看,我們會有 5+1 次調用

java複制代碼int len = calculateInvokeTimes(methodName);
for (int i = 0; i < len; i++) {}

private int calculateInvokeTimes(String methodName) {
    // 擷取目前的重試次數+1
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    RpcContext rpcContext = RpcContext.getClientAttachment();
    Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
    if (retry instanceof Number) {
        len = ((Number) retry).intValue() + 1;
        rpcContext.removeAttachment(RETRIES_KEY);
    }
    if (len <= 0) {
        len = 1;
    }

    return len;
}
           

我們直接 Debug 一下看看:

從源碼全面解析 dubbo 消費端服務調用的來龍去脈

1.5 負載均衡

這一行 LoadBalance loadbalance = initLoadBalance(invokers, invocation) 得到我們的負載均衡政策,預設情況下如下:

從源碼全面解析 dubbo 消費端服務調用的來龍去脈

我們可以看到,預設情況下是 RandomLoadBalance 随機負載。

我們繼續往下追源碼:

java複制代碼public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) {
    
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // 如果是重新調用的,要去更新下Invoker,防止服務端發生了變化
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // 再次校驗
            checkInvokers(copyInvokers, invocation);
        }
        // 負載均衡邏輯!!!
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getServiceContext().setInvokers((List) invoked);
        boolean success = false;
        try {
            Result result = invokeWithContext(invoker, invocation);
            success = true;
            return result;
        } 
    }
}
           

這裡我簡單将下負載均衡的邏輯:

java複制代碼Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected){
    // 如果隻有一個服務端,那還負載均衡個屁
    // 直接校驗下OK不OK直接傳回就好
    if (invokers.size() == 1) {
        Invoker<T> tInvoker = invokers.get(0);
        checkShouldInvalidateInvoker(tInvoker);
        return tInvoker;
    }
    // 如果多個服務端,需要執行負載均衡算法
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    return invoker;
}
           

Dubbo 裡面的負載均衡算法如下:

從源碼全面解析 dubbo 消費端服務調用的來龍去脈

這裡也就不一介紹了,正常情況下,我們采用的都是 RandomLoadBalance 負載均衡

當然這裡部落客介紹另外一個寫法,也是我們業務中使用的

1.4.1 自定義負載均衡

上面我們看到,通過 LoadBalance loadbalance = initLoadBalance(invokers, invocation) ,我們可以得到一個負載均衡的實作類

在我們的生産場景中,不同的叢集上含有不同的合作方,我們需要根據合作方去分發不同叢集的調用

這個時候,我們可以重寫我們的 LoadBalance ,在裡面重寫我們 doSelect 的邏輯,而這裡的 叢集A 也就是我們的 group

從源碼全面解析 dubbo 消費端服務調用的來龍去脈

1.6 調用服務

當我們完成下面的流程:過濾器 ---> 路由 ---> 重試 ---> 負載均衡,就到了下面這行:

java複制代碼Result result = invokeWithContext(invoker, invocation)
           

我們繼續往下追:

java複制代碼public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 加讀寫鎖
        lock.readLock().lock();
        return invoker.invoke(invocation);
    } finally {
        lock.readLock().unlock();
    }
}
           

我們直接追到 AbstractInvoker 的 invoke 方法

java複制代碼public Result invoke(Invocation inv) throws RpcException {
    RpcInvocation invocation = (RpcInvocation) inv;

    // 配置RPCinvocation
    prepareInvocation(invocation);

    // 調用RPC同時同步傳回結果
    AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);

    // 等待傳回結果
    waitForResultIfSync(asyncResult, invocation);

    return asyncResult;
}
           

我們可以看到,對于調用服務來說,一共分為一下三步:

  • 配置 RPCinvocation
  • 調用 RPC 同步傳回結果
  • 等待傳回結果

1.6.1 配置 RPCinvocation

這裡主要将 Invocation 轉變成 RPCInvocation

  • 設定 RpcInvocation 的 Invoker 屬性,指明該調用是由哪個 Invoker 發起的
  • 目前線程的一些狀态資訊
  • 同步調用、異步調用
  • 異步調用生成一個唯一的調用 ID
  • 選擇序列化的類型
java複制代碼private void prepareInvocation(RpcInvocation inv) {
    // 設定 RpcInvocation 的 Invoker 屬性,指明該調用是由哪個 Invoker 發起的
    inv.setInvoker(this);
    
	// 目前線程的一些狀态資訊
    addInvocationAttachments(inv);

    // 同步調用、異步調用
    inv.setInvokeMode(RpcUtils.getInvokeMode(url, inv));

    // 異步調用生成一個唯一的調用 ID
    RpcUtils.attachInvocationIdIfAsync(getUrl(), inv);

    // 選擇序列化的類型
    Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DefaultSerializationSelector.getDefaultRemotingSerialization()));
    if (serializationId != null) {
        inv.put(SERIALIZATION_ID_KEY, serializationId);
    }
}
           

1.6.2 調用 RPC 同步傳回結果

java複制代碼private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
    asyncResult = (AsyncRpcResult) doInvoke(invocation);
}

protected Result doInvoke(final Invocation invocation){
    // 擷取逾時時間
    int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT);
   
    // 設定逾時時間
    invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
    
    // 從dubbo線程池中拿出一個線程
    ExecutorService executor = getCallbackExecutor(getUrl(), inv);
    // request:進行調用
	CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
    FutureContext.getContext().setCompatibleFuture(appResponseFuture);
    AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
    result.setExecutor(executor);
    return result;
}
           

這裡的 currentClient.request 進行請求的發送:

java複制代碼public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor){
    return client.request(request, timeout, executor);
}

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor){
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    channel.send(req);
    return future;
}
           

這裡的 channel.send(req) 是 dubbo 自己包裝的 channel,我們去看看其實作

當然,我們這裡如果看過部落客 Netty 源碼文章的話,實際可以猜到,肯定是封裝了 Netty 的 channel

java複制代碼public void send(Object message, boolean sent) throws RemotingException {
        // 校驗目前的Channel是否關閉
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // channel 寫入并重新整理
            // channel:io.netty.channel.Channel
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // 等待逾時的時間
                // 超過時間會報錯
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            // 這裡如果報錯了,就會走重試的邏輯
            Throwable cause = future.cause();
    }
}
           

1.6.3 等待傳回結果

java複制代碼waitForResultIfSync(asyncResult, invocation);

private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
    // 判斷目前的調用是不是同步調用
    // 異步調用直接傳回即可
    if (InvokeMode.SYNC != invocation.getInvokeMode()) {
        return;
    }
    
    // 擷取逾時時間 
    Object timeoutKey = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
    long timeout = RpcUtils.convertToNumber(timeoutKey, Integer.MAX_VALUE);

    // 等待timeout時間
    // 擷取失敗-直接抛出異常
    asyncResult.get(timeout, TimeUnit.MILLISECONDS);
}

public Result get(long timeout, TimeUnit unit){
    // 擷取響應傳回的資料-等待timeout時間
    return responseFuture.get(timeout, unit);
}
           

如果沒有異常,如下圖所示:

從源碼全面解析 dubbo 消費端服務調用的來龍去脈

到這裡我們的消費端調用服務的整個流程源碼剖析就完畢了~

三、流程

高清圖檔可私聊部落客

從源碼全面解析 dubbo 消費端服務調用的來龍去脈

四、總結

魯迅先生曾說:獨行難,衆行易,和志同道合的人一起進步。彼此毫無保留的分享經驗,才是對抗網際網路寒冬的最佳選擇。

其實很多時候,并不是我們不夠努力,很可能就是自己努力的方向不對,如果有一個人能稍微指點你一下,你真的可能會少走幾年彎路。

繼續閱讀