天天看點

Dubbo源碼分析(十)同步調用與異步調用

一、同步調用

預設情況下,我們通過Dubbo調用一個服務,需得等服務端執行完全部邏輯,方法才得以傳回。這個就是同步調用。

但大家是否考慮過另外一個問題,Dubbo底層網絡通信采用Netty,而Netty是異步的;那麼它是怎麼将請求轉換成同步的呢?

首先我們來看請求方,在

DubboInvoker

類中,它有三種不同的調用方式。

protected Result doInvoke(final Invocation invocation) throws Throwable {
    
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);
        
        //忽略傳回值
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        //異步調用
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        //同步調用
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    }
}
           

可以看到,上面的代碼有三個分支,分别是:忽略傳回值調用、異步調用和同步調用。我們重點先看

return (Result) currentClient.request(inv, timeout).get();

關于上面這句代碼,它包含兩個動作:先調用

currentClient.request

方法,通過Netty發送請求資料;然後調用其傳回值的

get

方法,來擷取傳回值。

1、發送請求

這一步主要是将請求方法封裝成Request對象,通過Netty将資料發送到服務端,然後傳回一個

DefaultFuture

對象。

public ResponseFuture request(Object request, int timeout) throws RemotingException {

    //如果用戶端已斷開連接配接
    if (closed) {
        throw new RemotingException(".......");
    }
    //封裝請求資訊
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    
    //建構DefaultFuture對象
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        //通過Netty發送網絡資料
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
           

如上代碼,邏輯很清晰。關于看它的傳回值是一個

DefaultFuture

對象,我們再看它的構造方法。

public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : 
                channel.getUrl().getPositiveParameter("timeout", 1000);
    //目前Future和請求資訊的映射
    FUTURES.put(id, this);
    //目前Channel和請求資訊的映射
    CHANNELS.put(id, channel);
}
           

在這裡,我們必須先對Future有所了解。Future模式是多線程開發中非常常見的一種設計模式,在這裡我們傳回這個對象後,調用其get方法來獲得傳回值。

2、擷取傳回值

我們接着看get方法。

public Object get(int timeout) throws RemotingException {
    //設定預設逾時時間
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    //判斷 如果操作未完成
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            //通過加鎖、等待
            while (!isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    //傳回資料
    return returnFromResponse();
}

//擷取傳回值response
private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    if (res.getStatus() == 30 || res.getStatus() == 31) {
        throw new TimeoutException(res.getStatus() == 31, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}
           

如上代碼,我們重點來看

get

方法。我們總結下它的運作流程:

  • 判斷逾時時間,小于0則設定預設值
  • 判斷操作是否已完成,即response是否為空;如果已完成,擷取傳回值,并傳回
  • 如果操作未完成,加鎖、等待;獲得通知後,再次判斷操作是否完成。若完成,擷取傳回值,并傳回。

那麼我們就會想到兩個問題,response在哪裡被指派、await在哪裡被通知。

在Netty讀取到網絡資料後,其中會調用到

HeaderExchangeHandler

中的方法,我們來看一眼就明白了。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    
    //處理傳回資訊
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}
           

上面說的很清楚,如果response 不為空,并且不是心跳資料,就調用

DefaultFuture.received

,在這個方法裡面,主要就是根據傳回資訊的ID找到對應的Future,然後通知。

public static void received(Channel channel, Response response)     
    try {
        //根據傳回資訊中的ID找到對應的Future
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            //通知方法
            future.doReceived(response);
        } else {
            logger.warn("......");
        }
    } finally {
        //處理完成,删除Future
        CHANNELS.remove(response.getId());
    }
}
           

future.doReceived(response);

就很簡單了,它就回答了我們上面的那兩個小問題。指派response和await通知。

private void doReceived(Response res) {
    lock.lock();
    try {
        //指派response
        response = res;
        if (done != null) {
            //通知方法
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
           

通過以上方式,Dubbo就完成了同步調用。我們再總結下它的整體流程:

  • 将請求封裝為Request對象,并建構DefaultFuture對象,請求ID和Future對應。
  • 通過Netty發送Request對象,并傳回DefaultFuture對象。
  • 調用

    DefaultFuture.get()

    等待資料回傳完成。
  • 服務端處理完成,Netty處理器接收到傳回資料,通知到DefaultFuture對象。
  • get方法傳回,擷取到傳回值。

二、異步調用

如果想使用異步調用的方式,我們就得配置一下。在消費者端配置檔案中

<dubbo:reference id="infoUserService" 
        interface="com.viewscenes.netsupervisor.service.InfoUserService" 
    async="true"/>
           

然後我們再看它的實作方法

if (isAsync) {
    ResponseFuture future = currentClient.request(inv, timeout);
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
} 
           

可以看到,它同樣是通過

currentClient.request

傳回的Future對象,但并未調用其get方法;而是将Future對象封裝成FutureAdapter,然後設定到

RpcContext.getContext()

RpcContext是Dubbo中的一個上下文資訊,它是一個 ThreadLocal 的臨時狀态記錄器。我們重點看它的

setFuture

方法。

public class RpcContext {
    
    private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
    
    private Future<?> future;
    
    public void setFuture(Future<?> future) {
        this.future = future;
    }
}
           
userService.sayHello("Jack");

Future<Object> future = RpcContext.getContext().getFuture();

System.out.println("服務傳回消息:"+future.get());
           
// 此調用會立即傳回null
fooService.findFoo(fooId);
// 拿到調用的Future引用,當結果傳回後,會被通知和設定到此Future
Future<Foo> fooFuture = RpcContext.getContext().getFuture(); 
 
// 此調用會立即傳回null
barService.findBar(barId);
// 拿到調用的Future引用,當結果傳回後,會被通知和設定到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture(); 
 
// 此時findFoo和findBar的請求同時在執行,用戶端不需要啟動多線程來支援并行,而是借助NIO的非阻塞完成
 
// 如果foo已傳回,直接拿到傳回值,否則線程wait住,等待foo傳回後,線程會被notify喚醒
Foo foo = fooFuture.get(); 
// 同理等待bar傳回
Bar bar = barFuture.get(); 
 
// 如果foo需要5秒傳回,bar需要6秒傳回,實際隻需等6秒,即可擷取到foo和bar,進行接下來的處理。
           

7人點贊

Dubbo源碼解析