天天看點

Dubbo逾時

1、概念

 1)服務提供者逾時是指遠端調用服務的方法執行的逾時時間.

 2)服務調用者逾時是指服務調用者調用遠端方法的執行逾時時間.

2、逾時設定

  使用dubbo進行遠端調用的過程中,需要設定遠端調用的逾時間.逾時時間分别可以在服務的提供者配置中設定,也可以在服務調用者配置中設定,逾時時間的機關是毫秒.

  1)全局逾時配置

<dubbo:consumer timeout="5000" />   或   
<dubbo:provider timeout="5000" />
           

  2)接口及指定方法逾時配置

<dubbo:reference interhljs-string">"com.foo.BarService" timeout="2000">
    <dubbo:method name="sayHello" timeout="3000" />
</dubbo:reference>
或
<dubbo:provider interhljs-string">"com.foo.BarService" timeout="2000">
    <dubbo:method name="sayHello" timeout="3000" />
</dubbo:provider>
           
3、逾時覆寫機制

  1、方法級配置别優于接口級别,即小Scope優先

  2、Consumer端配置 優于 Provider配置 優于 全局配置,最後是Dubbo Hard Code的配置值

  dubbo的機制是如果服務的調用者配置了逾時時間,會覆寫服務的提供者設定的逾時時間.請注意,如果服務的調用者覆寫了服務提供者的遠端方法調用逾時時間,那麼對于服務的提供者就會變得不可控,即服務的調用者控制了服務提供者方法執行的逾時時間,這對于一次遠端調用是非常不合理的,是以dubbo非常不建議在服務的調用者配置中配置服務的逾時時間.

4、逾時重試

  dubbo在調用服務不成功時,預設是會重試兩次的。這樣在服務端的處理時間超過了設定的逾時時間時,就會有重複請求,比如在發郵件時,可能就會發出多份重複郵件,執行注冊請求時,就會插入多條重複的注冊資料,那麼怎麼解決逾時問題呢?如下

  1)對于核心的服務中心,去除dubbo逾時重試機制,并重新評估設定逾時時間。

  2)業務處理代碼必須放在服務端,用戶端隻做參數驗證和服務調用,不涉及業務流程處理

  全局配置執行個體

<!-- 延遲到Spring初始化完成後,再暴露服務,服務調用逾時設定為6秒,逾時不重試-->    
<dubbo:provider delay="-1" timeout="6000" retries="0"/>
           
5、Dubbo調用源碼分析

Dubbo協定逾時實作使用了Future模式,主要涉及類DubboInvoker,ResponseFuture, DefaultFuture。

在DubboInvoker中會判斷是同步異步還是不需要傳回值,doInvoke()方法如下

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == ) {
        currentClient = clients[];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
           

1.如果不需要傳回值,直接使用send方法,發送出去,設定當期和線程綁定RpcContext的future為null

2.如果需要異步通信,使用request方法建構一個ResponseFuture,然後設定到和線程綁定的RpcContext中

3.如果需要同步通信,使用request方法建構一個ResponseFuture,阻塞等待請求完成

ResponseFuture.get()在請求還未處理完或未到逾時前一直是wait狀态;響應達到後,設定請求狀态,并進行notify喚醒。get()方法如下

public Object get(int timeout) throws RemotingException {
  if (timeout <= ) {
      timeout = Constants.DEFAULT_TIMEOUT;
  }
  if (! isDone()) {
      long start = System.currentTimeMillis();
      lock.lock();//加鎖
      try {
          while (! isDone()) {
              done.await(timeout, TimeUnit.MILLISECONDS); //等待timeout
              if (isDone() || System.currentTimeMillis() - start > timeout) {
                  break;
              }
          }
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      } finally {
          lock.unlock();
      }
      if (! isDone()) {// 用戶端逾時仍然沒有得到服務端傳回,抛出異常
          throw new TimeoutException(sent > , channel, getTimeoutMessage(false));
      }
  }
  return returnFromResponse();
}
           

Client端的處理最終轉化成ChannelHandler接口實作上,HeaderExchangeHandler的received()接口如下

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                //服務端處理請求
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
        //這裡就是作為消費者的dubbo用戶端在接收到響應後,觸發通知對應等待線程的起點
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > ) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

public static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
           

received()方法又會調用DefaultFuture的received()方法,如下

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
           

  用戶端調用遠端服務時,本地會生成一個DefaultFuture,調用DefaultFuture.get()擷取遠端服務傳回的結構,此方法擷取鎖,調用await方法,此時目前線程進入等待隊列,此線程會有兩種結果過:要麼逾時,抛出TimeOutException;如果被喚醒,則傳回rpc的結果,如果逾時,服務端TimeoutFilter會根據服務端timeout檢測到操作逾時,打出warn日志。

  而Dubbo生成對象ResponseFuture時,在一個全局map裡通過put(ID,Future)将該次調用的唯一ID存放起來,然後傳遞給服務端,再服務端又回傳回來,通過該ID,DefaultFuture.FUTURES可以拿到具體的那個DefaultFuture對象,即阻塞請求線程的那個對象。調用它的doReceived方法,就可喚醒阻塞的線程,拿到傳回結果

private void doReceived(Response res) {
   lock.lock();
   try {
       response = res;
       if (done != null) {
           done.signal();
       }
   } finally {
       lock.unlock();
   }
   if (callback != null) {
       invokeCallback(callback);
   }
}
           
7、案例1:用戶端逾時時間>服務端逾時時間

服務端配置:

<dubbo:service ref="xxxService" interface="com.xxx.XxxService"  timeout="5000"  />
           

用戶端配置:

<dubbo:reference id="xxxService" interhljs-string">"com.xxx.XxxService" timeout="10000"  />
           

用戶端調用遠端服務時,本地會生成一個DefaultFuture,調用DefaultFuture.get()擷取遠端服務傳回的結構,此方法擷取鎖,調用await方法,此時目前線程進入等待隊列,此線程會有兩種結果過:要麼逾時,抛出TimeOutException;如果被喚醒,則傳回rpc的結果在服務端timeout時間為5s,如果實際的資料操作耗時7s,服務端TimeoutFilter會根據服務端timeout檢測到操作逾時,打出warn日志。在第7s,用戶端接收到資料包,用戶端timeout設定為10s>7s,DefaultFuture被喚醒,仍然可以接收到Rpc傳回值。

  如果超出了10s還沒有傳回值,抛出TimeoutException。此時這個DefaultFuture逾時了,有一個線程RemotingInvocationTimeoutScan,清理所有逾時的DefaultFuture,建立一個timeoutResponse,DefaultFuture.received這樣的response就會抛出TimeoutException。

  綜上所訴:當用戶端timeout值>服務端timeout值,會出現逾時日志,但是仍然可以擷取到結果。用戶端timeout逾時抛出異常時,對應逾時的Future會自動清理。

引用:https://blog.csdn.net/peerless_hero/article/details/68922880

   https://blog.csdn.net/qq418517226/article/details/51906357

繼續閱讀