一、同步調用
預設情況下,我們通過Dubbo調用一個服務,需得等服務端執行完全部邏輯,方法才得以傳回。這個就是同步調用。
但大家是否考慮過另外一個問題,Dubbo底層網絡通信采用Netty,而Netty是異步的;那麼它是怎麼将請求轉換成同步的呢?
首先我們來看請求方,在
DubboInvoker
類中,它有三種不同的調用方式。 protected Result doInvoke(final Invocation invocation) throws Throwable {
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);
//忽略傳回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
//異步調用
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
//同步調用
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
}
可以看到,上面的代碼有三個分支,分别是:忽略傳回值調用、異步調用和同步調用。我們重點先看
return (Result) currentClient.request(inv, timeout).get();
關于上面這句代碼,它包含兩個動作:先調用
currentClient.request
方法,通過Netty發送請求資料;然後調用其傳回值的
get
方法,來擷取傳回值。
1、發送請求
這一步主要是将請求方法封裝成Request對象,通過Netty将資料發送到服務端,然後傳回一個
DefaultFuture
對象。
public ResponseFuture request(Object request, int timeout) throws RemotingException {
//如果用戶端已斷開連接配接
if (closed) {
throw new RemotingException(".......");
}
//封裝請求資訊
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
//建構DefaultFuture對象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
//通過Netty發送網絡資料
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
如上代碼,邏輯很清晰。關于看它的傳回值是一個
DefaultFuture
對象,我們再看它的構造方法。
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout :
channel.getUrl().getPositiveParameter("timeout", 1000);
//目前Future和請求資訊的映射
FUTURES.put(id, this);
//目前Channel和請求資訊的映射
CHANNELS.put(id, channel);
}
在這裡,我們必須先對Future有所了解。Future模式是多線程開發中非常常見的一種設計模式,在這裡我們傳回這個對象後,調用其get方法來獲得傳回值。
2、擷取傳回值
我們接着看get方法。
public Object get(int timeout) throws RemotingException {
//設定預設逾時時間
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
//判斷 如果操作未完成
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
//通過加鎖、等待
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
//傳回資料
return returnFromResponse();
}
//擷取傳回值response
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
return res.getResult();
}
if (res.getStatus() == 30 || res.getStatus() == 31) {
throw new TimeoutException(res.getStatus() == 31, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
如上代碼,我們重點來看
get
方法。我們總結下它的運作流程:
- 判斷逾時時間,小于0則設定預設值
- 判斷操作是否已完成,即response是否為空;如果已完成,擷取傳回值,并傳回
- 如果操作未完成,加鎖、等待;獲得通知後,再次判斷操作是否完成。若完成,擷取傳回值,并傳回。
那麼我們就會想到兩個問題,response在哪裡被指派、await在哪裡被通知。
在Netty讀取到網絡資料後,其中會調用到
HeaderExchangeHandler
中的方法,我們來看一眼就明白了。
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
//處理傳回資訊
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
}
上面說的很清楚,如果response 不為空,并且不是心跳資料,就調用
DefaultFuture.received
,在這個方法裡面,主要就是根據傳回資訊的ID找到對應的Future,然後通知。
public static void received(Channel channel, Response response)
try {
//根據傳回資訊中的ID找到對應的Future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
//通知方法
future.doReceived(response);
} else {
logger.warn("......");
}
} finally {
//處理完成,删除Future
CHANNELS.remove(response.getId());
}
}
future.doReceived(response);
就很簡單了,它就回答了我們上面的那兩個小問題。指派response和await通知。
private void doReceived(Response res) {
lock.lock();
try {
//指派response
response = res;
if (done != null) {
//通知方法
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
通過以上方式,Dubbo就完成了同步調用。我們再總結下它的整體流程:
- 将請求封裝為Request對象,并建構DefaultFuture對象,請求ID和Future對應。
- 通過Netty發送Request對象,并傳回DefaultFuture對象。
- 調用
等待資料回傳完成。DefaultFuture.get()
- 服務端處理完成,Netty處理器接收到傳回資料,通知到DefaultFuture對象。
- get方法傳回,擷取到傳回值。
二、異步調用
如果想使用異步調用的方式,我們就得配置一下。在消費者端配置檔案中
<dubbo:reference id="infoUserService"
interface="com.viewscenes.netsupervisor.service.InfoUserService"
async="true"/>
然後我們再看它的實作方法
if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
}
可以看到,它同樣是通過
currentClient.request
傳回的Future對象,但并未調用其get方法;而是将Future對象封裝成FutureAdapter,然後設定到
RpcContext.getContext()
RpcContext是Dubbo中的一個上下文資訊,它是一個 ThreadLocal 的臨時狀态記錄器。我們重點看它的
setFuture
方法。
public class RpcContext {
private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
private Future<?> future;
public void setFuture(Future<?> future) {
this.future = future;
}
}
userService.sayHello("Jack");
Future<Object> future = RpcContext.getContext().getFuture();
System.out.println("服務傳回消息:"+future.get());
// 此調用會立即傳回null
fooService.findFoo(fooId);
// 拿到調用的Future引用,當結果傳回後,會被通知和設定到此Future
Future<Foo> fooFuture = RpcContext.getContext().getFuture();
// 此調用會立即傳回null
barService.findBar(barId);
// 拿到調用的Future引用,當結果傳回後,會被通知和設定到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
// 此時findFoo和findBar的請求同時在執行,用戶端不需要啟動多線程來支援并行,而是借助NIO的非阻塞完成
// 如果foo已傳回,直接拿到傳回值,否則線程wait住,等待foo傳回後,線程會被notify喚醒
Foo foo = fooFuture.get();
// 同理等待bar傳回
Bar bar = barFuture.get();
// 如果foo需要5秒傳回,bar需要6秒傳回,實際隻需等6秒,即可擷取到foo和bar,進行接下來的處理。
7人點贊
Dubbo源碼解析