天天看点

Dubbo超时

1、概念

 1)服务提供者超时是指远程调用服务的方法执行的超时时间.

 2)服务调用者超时是指服务调用者调用远程方法的执行超时时间.

2、超时设置

  使用dubbo进行远程调用的过程中,需要设置远程调用的超时间.超时时间分别可以在服务的提供者配置中设置,也可以在服务调用者配置中设置,超时时间的单位是毫秒.

  1)全局超时配置

<dubbo:consumer timeout="5000" />   或   
<dubbo:provider timeout="5000" />
           

  2)接口及指定方法超时配置

<dubbo:reference interhljs-string">"com.foo.BarService" timeout="2000">
    <dubbo:method name="sayHello" timeout="3000" />
</dubbo:reference>
或
<dubbo:provider interhljs-string">"com.foo.BarService" timeout="2000">
    <dubbo:method name="sayHello" timeout="3000" />
</dubbo:provider>
           
3、超时覆盖机制

  1、方法级配置别优于接口级别,即小Scope优先

  2、Consumer端配置 优于 Provider配置 优于 全局配置,最后是Dubbo Hard Code的配置值

  dubbo的机制是如果服务的调用者配置了超时时间,会覆盖服务的提供者设置的超时时间.请注意,如果服务的调用者覆盖了服务提供者的远程方法调用超时时间,那么对于服务的提供者就会变得不可控,即服务的调用者控制了服务提供者方法执行的超时时间,这对于一次远程调用是非常不合理的,所以dubbo非常不建议在服务的调用者配置中配置服务的超时时间.

4、超时重试

  dubbo在调用服务不成功时,默认是会重试两次的。这样在服务端的处理时间超过了设定的超时时间时,就会有重复请求,比如在发邮件时,可能就会发出多份重复邮件,执行注册请求时,就会插入多条重复的注册数据,那么怎么解决超时问题呢?如下

  1)对于核心的服务中心,去除dubbo超时重试机制,并重新评估设置超时时间。

  2)业务处理代码必须放在服务端,客户端只做参数验证和服务调用,不涉及业务流程处理

  全局配置实例

<!-- 延迟到Spring初始化完成后,再暴露服务,服务调用超时设置为6秒,超时不重试-->    
<dubbo:provider delay="-1" timeout="6000" retries="0"/>
           
5、Dubbo调用源码分析

Dubbo协议超时实现使用了Future模式,主要涉及类DubboInvoker,ResponseFuture, DefaultFuture。

在DubboInvoker中会判断是同步异步还是不需要返回值,doInvoke()方法如下

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == ) {
        currentClient = clients[];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
           

1.如果不需要返回值,直接使用send方法,发送出去,设置当期和线程绑定RpcContext的future为null

2.如果需要异步通信,使用request方法构建一个ResponseFuture,然后设置到和线程绑定的RpcContext中

3.如果需要同步通信,使用request方法构建一个ResponseFuture,阻塞等待请求完成

ResponseFuture.get()在请求还未处理完或未到超时前一直是wait状态;响应达到后,设置请求状态,并进行notify唤醒。get()方法如下

public Object get(int timeout) throws RemotingException {
  if (timeout <= ) {
      timeout = Constants.DEFAULT_TIMEOUT;
  }
  if (! isDone()) {
      long start = System.currentTimeMillis();
      lock.lock();//加锁
      try {
          while (! isDone()) {
              done.await(timeout, TimeUnit.MILLISECONDS); //等待timeout
              if (isDone() || System.currentTimeMillis() - start > timeout) {
                  break;
              }
          }
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      } finally {
          lock.unlock();
      }
      if (! isDone()) {// 客户端超时仍然没有得到服务端返回,抛出异常
          throw new TimeoutException(sent > , channel, getTimeoutMessage(false));
      }
  }
  return returnFromResponse();
}
           

Client端的处理最终转化成ChannelHandler接口实现上,HeaderExchangeHandler的received()接口如下

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                //服务端处理请求
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
        //这里就是作为消费者的dubbo客户端在接收到响应后,触发通知对应等待线程的起点
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > ) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

public static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
           

received()方法又会调用DefaultFuture的received()方法,如下

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
           

  客户端调用远程服务时,本地会生成一个DefaultFuture,调用DefaultFuture.get()获取远程服务返回的结构,此方法获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,抛出TimeOutException;如果被唤醒,则返回rpc的结果,如果超时,服务端TimeoutFilter会根据服务端timeout检测到操作超时,打出warn日志。

  而Dubbo生成对象ResponseFuture时,在一个全局map里通过put(ID,Future)将该次调用的唯一ID存放起来,然后传递给服务端,再服务端又回传回来,通过该ID,DefaultFuture.FUTURES可以拿到具体的那个DefaultFuture对象,即阻塞请求线程的那个对象。调用它的doReceived方法,就可唤醒阻塞的线程,拿到返回结果

private void doReceived(Response res) {
   lock.lock();
   try {
       response = res;
       if (done != null) {
           done.signal();
       }
   } finally {
       lock.unlock();
   }
   if (callback != null) {
       invokeCallback(callback);
   }
}
           
7、案例1:客户端超时时间>服务端超时时间

服务端配置:

<dubbo:service ref="xxxService" interface="com.xxx.XxxService"  timeout="5000"  />
           

客户端配置:

<dubbo:reference id="xxxService" interhljs-string">"com.xxx.XxxService" timeout="10000"  />
           

客户端调用远程服务时,本地会生成一个DefaultFuture,调用DefaultFuture.get()获取远程服务返回的结构,此方法获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,抛出TimeOutException;如果被唤醒,则返回rpc的结果在服务端timeout时间为5s,如果实际的数据操作耗时7s,服务端TimeoutFilter会根据服务端timeout检测到操作超时,打出warn日志。在第7s,客户端接收到数据包,客户端timeout设置为10s>7s,DefaultFuture被唤醒,仍然可以接收到Rpc返回值。

  如果超出了10s还没有返回值,抛出TimeoutException。此时这个DefaultFuture超时了,有一个线程RemotingInvocationTimeoutScan,清理所有超时的DefaultFuture,创建一个timeoutResponse,DefaultFuture.received这样的response就会抛出TimeoutException。

  综上所诉:当客户端timeout值>服务端timeout值,会出现超时日志,但是仍然可以获取到结果。客户端timeout超时抛出异常时,对应超时的Future会自动清理。

引用:https://blog.csdn.net/peerless_hero/article/details/68922880

   https://blog.csdn.net/qq418517226/article/details/51906357

继续阅读