天天看点

Dubbo 超时与重试的实现分析转载:https://blog.csdn.net/Revivedsun/article/details/72851417重试的实现超时的判断参考

转载:https://blog.csdn.net/Revivedsun/article/details/72851417

重试的实现

当消费端发起一次调用,如果集群容错模式选择的是FailoverCluster模式(缺省模式),当调用发生失败会自动发起切换,重试其它服务器。

<dubbo:reference>
    <dubbo:method name="findFoo" retries="2" />
</dubbo:reference>
           

FailoverCluster模式的实现是在 

com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker<T>

该类的实现如下,最终的调用是通过doInvoke方法完成。

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + ;
        if (len <= ) {
            len = ;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = ; i < len; i++) {
            //重试时,进行重新选择,避免重试时invoker列表已发生变化.
            //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
            if (i > ) {
                checkWheatherDestoried();
                copyinvokers = list(invocation);
                //重新检查一下
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : , "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }
}
           

首先获取使用者配置的重试次数,如果未配置,则默认为1。

int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + ;
        if (len <= ) {
            len = ;
        }
           

接着通过循环完成重试调用。调用Result result = invoker.invoke(invocation);返回com.alibaba.dubbo.rpc.RpcResult,这个类内部封装了返回结果,抛出的异常,以及附件(attachments)信息。

private Object                   result;

    private Throwable                exception;

    private Map<String, String>      attachments = new HashMap<String, String>();
           

如果进行调用,服务方抛出异常,那么异常会记录在Result的exception成员中,如果调用时消费端检测到超时或网络异常等那么消费端将会抛出异常,并在如下代码中进行重试。

for (int i = ; i < len; i++) {
            ......
            ......
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    ......
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : , "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
           

当产生异常,且异常code为BIZ_EXCEPTION,那么也不会进行重试。

com.alibaba.dubbo.rpc.RpcException定义的异常Code有如下几种类型。

public static final int UNKNOWN_EXCEPTION = ;

    public static final int NETWORK_EXCEPTION = ;

    public static final int TIMEOUT_EXCEPTION = ;

    public static final int BIZ_EXCEPTION = ;

    public static final int FORBIDDEN_EXCEPTION = ;

    public static final int SERIALIZATION_EXCEPTION = ;
           

通过查看RpcException的setCode调用关系可以看到产BIZ_EXCEPTION的情况。

最后重试实现可总结为:通过循环重复调用方法,如果调用得到响应,则正常返回,产生的异常作为结果的成员。如果得不到响应如超时,网络异常,序列化失败等问题则去尝试重试。

超时的判断

在重试的实现中可以看到,进行方法调用最终是调用invoker的doInvoke方法完成最终调用。 

例如: 

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(Invocation)

方法内部发出请求后通过future的get方法获取结果。

return (Result) currentClient.request(inv, timeout).get();     
           

get记录开始时间start,然后不断循环检查是否得到了返回结果,如果在设定的超时时间内没有返回结果,那么跳出循环,并抛出超时异常。如果在指定的超时时间内得到响应则返回结果。

public Object get(int timeout) throws RemotingException {
        if (timeout <= ) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > , channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
           

通过超时判定实现我们发现,当一个任务处理时间很久,而消费端配置的超时时间又很短就会出现消费端产生超时异常,而服务提供方却成功完成了操作的现象。因此对服务提供方的处理时间做出规划,配置合理的超时时间,或通过回调方法返回结果给消费端。这样避免消费端发生超时异常,而服务提供方处理完成的问题。

参考

dubbo 2.8.4 源码

转载:https://blog.csdn.net/angry_tiger/article/details/51073459

Dubbo的超时重试机制为服务容错、服务稳定提供了比较好的框架支持,但是在一些比较特殊的网络环境下(网络传输慢,并发多)可能

由于服务响应慢,Dubbo自身的超时重试机制(服务端的处理时间超过了设定的超时时间时,就会有重复请求)可能会带来一些麻烦。

        常见的应用场景故障:  1、发送邮件(重复) ;2、账户注册(重复).。

        解决方案:

                       1.对于核心的服务中心,去除dubbo超时重试机制,并重新评估设置超时时间。

                            (1)、去掉超时重试机制  

                                  <dubbo:provider delay="-1" timeout="6000"  retries="0"/> 

                            (2)、重新评估设置超时时间

                                  <dubbo:service inter ref="*"  timeout="延长服务时间"/>

                      2.业务处理代码必须放在服务端,客户端只做参数验证和服务调用,不涉及业务流程处理。

继续阅读