1、概念
1)服務提供者逾時是指遠端調用服務的方法執行的逾時時間.
2)服務調用者逾時是指服務調用者調用遠端方法的執行逾時時間.
2、逾時設定
使用dubbo進行遠端調用的過程中,需要設定遠端調用的逾時間.逾時時間分别可以在服務的提供者配置中設定,也可以在服務調用者配置中設定,逾時時間的機關是毫秒.
1)全局逾時配置
<dubbo:consumer timeout="5000" /> 或
<dubbo:provider timeout="5000" />
2)接口及指定方法逾時配置
<dubbo:reference interhljs-string">"com.foo.BarService" timeout="2000">
<dubbo:method name="sayHello" timeout="3000" />
</dubbo:reference>
或
<dubbo:provider interhljs-string">"com.foo.BarService" timeout="2000">
<dubbo:method name="sayHello" timeout="3000" />
</dubbo:provider>
3、逾時覆寫機制
1、方法級配置别優于接口級别,即小Scope優先
2、Consumer端配置 優于 Provider配置 優于 全局配置,最後是Dubbo Hard Code的配置值
dubbo的機制是如果服務的調用者配置了逾時時間,會覆寫服務的提供者設定的逾時時間.請注意,如果服務的調用者覆寫了服務提供者的遠端方法調用逾時時間,那麼對于服務的提供者就會變得不可控,即服務的調用者控制了服務提供者方法執行的逾時時間,這對于一次遠端調用是非常不合理的,是以dubbo非常不建議在服務的調用者配置中配置服務的逾時時間.
4、逾時重試
dubbo在調用服務不成功時,預設是會重試兩次的。這樣在服務端的處理時間超過了設定的逾時時間時,就會有重複請求,比如在發郵件時,可能就會發出多份重複郵件,執行注冊請求時,就會插入多條重複的注冊資料,那麼怎麼解決逾時問題呢?如下
1)對于核心的服務中心,去除dubbo逾時重試機制,并重新評估設定逾時時間。
2)業務處理代碼必須放在服務端,用戶端隻做參數驗證和服務調用,不涉及業務流程處理
全局配置執行個體
<!-- 延遲到Spring初始化完成後,再暴露服務,服務調用逾時設定為6秒,逾時不重試-->
<dubbo:provider delay="-1" timeout="6000" retries="0"/>
5、Dubbo調用源碼分析
Dubbo協定逾時實作使用了Future模式,主要涉及類DubboInvoker,ResponseFuture, DefaultFuture。
在DubboInvoker中會判斷是同步異步還是不需要傳回值,doInvoke()方法如下
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == ) {
currentClient = clients[];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
1.如果不需要傳回值,直接使用send方法,發送出去,設定當期和線程綁定RpcContext的future為null
2.如果需要異步通信,使用request方法建構一個ResponseFuture,然後設定到和線程綁定的RpcContext中
3.如果需要同步通信,使用request方法建構一個ResponseFuture,阻塞等待請求完成
ResponseFuture.get()在請求還未處理完或未到逾時前一直是wait狀态;響應達到後,設定請求狀态,并進行notify喚醒。get()方法如下
public Object get(int timeout) throws RemotingException {
if (timeout <= ) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
lock.lock();//加鎖
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS); //等待timeout
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {// 用戶端逾時仍然沒有得到服務端傳回,抛出異常
throw new TimeoutException(sent > , channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
Client端的處理最終轉化成ChannelHandler接口實作上,HeaderExchangeHandler的received()接口如下
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
//服務端處理請求
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//這裡就是作為消費者的dubbo用戶端在接收到響應後,觸發通知對應等待線程的起點
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > ) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
public static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
received()方法又會調用DefaultFuture的received()方法,如下
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
用戶端調用遠端服務時,本地會生成一個DefaultFuture,調用DefaultFuture.get()擷取遠端服務傳回的結構,此方法擷取鎖,調用await方法,此時目前線程進入等待隊列,此線程會有兩種結果過:要麼逾時,抛出TimeOutException;如果被喚醒,則傳回rpc的結果,如果逾時,服務端TimeoutFilter會根據服務端timeout檢測到操作逾時,打出warn日志。
而Dubbo生成對象ResponseFuture時,在一個全局map裡通過put(ID,Future)将該次調用的唯一ID存放起來,然後傳遞給服務端,再服務端又回傳回來,通過該ID,DefaultFuture.FUTURES可以拿到具體的那個DefaultFuture對象,即阻塞請求線程的那個對象。調用它的doReceived方法,就可喚醒阻塞的線程,拿到傳回結果
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
7、案例1:用戶端逾時時間>服務端逾時時間
服務端配置:
<dubbo:service ref="xxxService" interface="com.xxx.XxxService" timeout="5000" />
用戶端配置:
<dubbo:reference id="xxxService" interhljs-string">"com.xxx.XxxService" timeout="10000" />
用戶端調用遠端服務時,本地會生成一個DefaultFuture,調用DefaultFuture.get()擷取遠端服務傳回的結構,此方法擷取鎖,調用await方法,此時目前線程進入等待隊列,此線程會有兩種結果過:要麼逾時,抛出TimeOutException;如果被喚醒,則傳回rpc的結果在服務端timeout時間為5s,如果實際的資料操作耗時7s,服務端TimeoutFilter會根據服務端timeout檢測到操作逾時,打出warn日志。在第7s,用戶端接收到資料包,用戶端timeout設定為10s>7s,DefaultFuture被喚醒,仍然可以接收到Rpc傳回值。
如果超出了10s還沒有傳回值,抛出TimeoutException。此時這個DefaultFuture逾時了,有一個線程RemotingInvocationTimeoutScan,清理所有逾時的DefaultFuture,建立一個timeoutResponse,DefaultFuture.received這樣的response就會抛出TimeoutException。
綜上所訴:當用戶端timeout值>服務端timeout值,會出現逾時日志,但是仍然可以擷取到結果。用戶端timeout逾時抛出異常時,對應逾時的Future會自動清理。
引用:https://blog.csdn.net/peerless_hero/article/details/68922880
https://blog.csdn.net/qq418517226/article/details/51906357