
Troubleshoot and handle online Dubbo call exceptions

Author: Flash Gene

01

Brief introduction

  • suishen-esb provides an integration between Dubbo and Hystrix;
  • Hystrix runs each command on an internal thread pool;
  • Each remote service (Dubbo interface) gets its own thread pool, which all methods of that interface share;
  • In the internal wrapper, the pool's core and maximum thread counts both default to 30. The waiting queue is a SynchronousQueue, so no task can wait. The rejection policy is AbortPolicy, so an exception is thrown when the pool cannot accept a task.
  • When instantaneous concurrency exceeds the maximum number of threads, the Dubbo call fails with an exception.
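The bullets above describe an ordinary JDK `ThreadPoolExecutor`. The sketch below is a minimal, hypothetical illustration; `SaturationDemo` and `countRejections` are not part of suishen-esb. It builds a pool whose core size equals its maximum size, with a `SynchronousQueue` and `AbortPolicy`. Every worker is kept busy by a blocking task that stands in for a slow remote call, and the code counts how many submissions are rejected. A pool of 3 stands in for the 30 threads used in production.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a pool shaped like the one described above:
// core == max, SynchronousQueue (no waiting room), AbortPolicy (throw on overflow).
class SaturationDemo {

    /** Submits `tasks` blocking tasks at once and returns how many were rejected. */
    static int countRejections(int poolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize,                     // core == max
                1, TimeUnit.MINUTES,
                new SynchronousQueue<>(),               // hand-off only, nothing can wait
                new ThreadPoolExecutor.AbortPolicy());  // throw when no thread is free
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();            // simulate a slow remote call
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();                        // let the blocked tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(3, 5)); // rejected: 2
    }
}
```

With 3 threads and 5 simultaneous tasks, the last 2 submissions are rejected immediately. There is no queue to absorb the burst, which matches the `RejectedExecutionException` in the incident log.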

02

Course of events

  • Users reported errors, so we urgently checked the logs:
org.apache.dubbo.rpc.RpcException: Failed to invoke the method validLoginAuthentication in the service weli.wormhole.rpc.user.center.api.IAuthenticationService. Tried 1 times of the providers [10.65.0.205:11090] (1/4) from the registry node1.zk.all.platform.wtc.hwhosts.com:2181 on the consumer 10.65.0.34 using the dubbo version 2.7.3-SNAPSHOT. Last error is: validLoginAuthentication_1 could not be queued for execution and fallback failed.  
    at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:113)
    at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:248)
    at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:78)
    at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:55)
    at org.apache.dubbo.common.bytecode.proxy14.validLoginAuthentication(proxy14.java)
    at weli.peanut.web.interceptor.VerifyLoginInterceptor.preHandle(VerifyLoginInterceptor.java:134)
    at org.springframework.web.servlet.HandlerExecutionChain.applyPreHandle(HandlerExecutionChain.java:134)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:958)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:98)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1721)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1679)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: validLoginAuthentication_1 could not be queued for execution and fallback failed.  
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:818)
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:793)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1454)
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1379)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.observers.Subscribers$5.onError(Subscribers.java:230)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.Observable.unsafeSubscribe(Observable.java:10158)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.subscribe(Observable.java:10247)
    at rx.Observable.subscribe(Observable.java:10214)
    at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51)
    at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:411)
    at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:377)
    at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:343)
    at suishen.esb.hystrix.dubbo.filter.HystrixFilter.invoke(HystrixFilter.java:46)
    at com.alibaba.dubbo.rpc.Filter.invoke(Filter.java:29)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:92)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:58)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$CallbackRegistrationInvoker.invoke(ProtocolFilterWrapper.java:157)
    at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
    at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:82)
    ... 36 common frames omitted
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1bf63b71 rejected from java.util.concurrent.ThreadPoolExecutor@6fb1f813[Running, pool size = 30, active threads = 30, queued tasks = 0, completed tasks = 0]  
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    ... 91 common frames omitted           

The logs showed that the task was rejected because a thread pool was full. At first we assumed the provider's Dubbo thread pool was exhausted, so we quickly checked the logs of the middle-platform service (the provider):

{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo monitor 10.65.3.23 core threads:400,peak threads:400,max threads:400,active threads:0,current threads:400,queue size:0,total tasks:48363297,completed tasks:48363297","logger_name":"wormholeBusiness","thread_name":"qbScheduler-6","level":"INFO","level_value":20000}
{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo monitor 10.65.3.44 core threads:400,peak threads:400,max threads:400,active threads:2,current threads:400,queue size:0,total tasks:48371189,completed tasks:48371187","logger_name":"wormholeBusiness","thread_name":"qbScheduler-3","level":"INFO","level_value":20000}
  • The middle-platform service was healthy: its Dubbo thread pools had plenty of idle threads (0 and 2 of 400 threads active);
  • Looking back at the caller's exception, we noticed that the caller goes through Hystrix, and that the rejection came from Hystrix's internal thread pool;
  • As an emergency measure, we added nodes and restarted the service, after which it returned to normal.
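The provider's statistics ruled it out, but the saturated pool was on the caller side. A comparable periodic log line for the caller's own pools would have pointed there directly. The sketch below is hypothetical: `PoolMonitor` is not part of the original code, and it only uses standard `ThreadPoolExecutor` getters. Hystrix publishes similar numbers for its own pools through `HystrixThreadPoolMetrics`.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: formats pool statistics with the same fields as the provider's
// "dubbo monitor" log line, and flags a pool that cannot accept any more work.
class PoolMonitor {

    static String describe(String name, ThreadPoolExecutor pool) {
        return String.format(
                "%s core=%d, largest=%d, max=%d, active=%d, current=%d, queue=%d, total=%d, completed=%d",
                name,
                pool.getCorePoolSize(),
                pool.getLargestPoolSize(),
                pool.getMaximumPoolSize(),
                pool.getActiveCount(),          // approximate: threads currently running a task
                pool.getPoolSize(),
                pool.getQueue().size(),
                pool.getTaskCount(),            // approximate total of submitted tasks
                pool.getCompletedTaskCount());
    }

    /** Saturated: every allowed thread is busy and the queue has no free slot. */
    static boolean isSaturated(int activeThreads, int maximumPoolSize, int queueRemainingCapacity) {
        return activeThreads >= maximumPoolSize && queueRemainingCapacity == 0;
    }

    static boolean isSaturated(ThreadPoolExecutor pool) {
        return isSaturated(pool.getActiveCount(), pool.getMaximumPoolSize(),
                pool.getQueue().remainingCapacity());
    }

    public static void main(String[] args) {
        // A pool shaped like the caller's Hystrix pool (the name is illustrative)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(30, 30, 1, TimeUnit.MINUTES, new SynchronousQueue<>());
        System.out.println(describe("hystrix-IAuthenticationService", pool));
        System.out.println("saturated=" + isSaturated(pool));
        pool.shutdown();
    }
}
```

A `SynchronousQueue` always reports a remaining capacity of 0. The saturation check for such a pool therefore reduces to "all threads busy", which is the state recorded in the caller's `RejectedExecutionException`.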

03

Problem analysis

  • The logs show that the exception came from HystrixFilter calling HystrixCommand.execute():
@Activate(group = Constants.CONSUMER)
public class HystrixFilter implements Filter {

    public HystrixFilter() {
        ApplicationContext springContext = ApplicationContextHolder.getContext();
        if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {
            BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();
            if (beanFactory instanceof DefaultListableBeanFactory) {
                BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);
                beanDefinitionBuilder.setDestroyMethodName("preDestroy");
                beanDefinitionBuilder.setScope("singleton");

                ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());

                // Trigger initialization of the singleton
                beanFactory.getBean(HystrixSpringService.class.getSimpleName());
            }
        }
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Circuit breaking with Hystrix is pointless for asynchronous calls, so call the provider directly
        if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))) {
            return invoker.invoke(invocation);
        }

        // Synchronous call: wrap it in a Hystrix command, which runs on the Hystrix thread pool
        DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
        return command.execute();
    }
}
  • For a synchronous Dubbo call, the filter creates a DubboHystrixCommand and hands the remote call to Hystrix, which executes it on its own thread pool.
public class DubboHystrixCommand extends HystrixCommand<Result> {

    private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);
    private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;
    private static final int CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = 30000;
    private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;
    private Invoker invoker;
    private Invocation invocation;

    public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
                .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
                        invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        // Minimum number of requests in the 10-second rolling window before the breaker can trip
                        .withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)
                        // After rejecting requests for the sleep window (30 s by default), the breaker goes half-open and lets trial traffic through
                        .withCircuitBreakerSleepWindowInMilliseconds(getCircuitBreakerSleepWindowInMilliseconds(invoker.getUrl()))
                        // Open the breaker when the error rate reaches 50%
                        .withCircuitBreakerErrorThresholdPercentage(50)
                        // Rely on Dubbo's timeout and disable Hystrix's own execution timeout
                        .withExecutionTimeoutEnabled(false))
                // Size the thread pool from the Dubbo configuration (only coreSize is set here)
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));

        this.invoker = invoker;
        this.invocation = invocation;
    }

    /**
     * Failed-requests-per-second threshold; the circuit breaker starts to act above it.
     *
     * @param url
     * @return
     */
    private static int getFailQpsThreshold(URL url) {
        if (url != null) {
            int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);
            if (logger.isDebugEnabled()) {
                logger.debug("FailQpsThreshold: " + threshold);
            }
            return threshold;
        }

        return DEFAULT_FAIL_QPS_THRESHOLD;
    }

    /**
     * Circuit-breaker sleep window: how long the breaker rejects requests before retrying.
     *
     * @param url
     * @return
     */
    private static int getCircuitBreakerSleepWindowInMilliseconds(URL url) {
        if (url != null) {
            int circuitBreakerSleepWindowInMilliseconds = url.getParameter("CircuitBreakerSleepWindowInMilliseconds", CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS);
            if (logger.isDebugEnabled()) {
                logger.debug("circuitBreakerSleepWindowInMilliseconds: " + circuitBreakerSleepWindowInMilliseconds);
            }
            return circuitBreakerSleepWindowInMilliseconds;
        }

        return CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS;
    }

    /**
     * Thread pool size.
     *
     * @param url
     * @return
     */
    private static int getThreadPoolCoreSize(URL url) {
        if (url != null) {
            int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);
            if (logger.isDebugEnabled()) {
                logger.debug("ThreadPoolCoreSize: " + size);
            }
            return size;
        }

        return DEFAULT_THREADPOOL_CORE_SIZE;
    }

    @Override
    protected Result run() throws Exception {
        Throwable exception = null;

        Result result = null;

        try {
            result = invoker.invoke(invocation);
            exception = result.getException();
        } catch (Exception e) {
            exception = e;
        }

        // Log the exception for the record, then rethrow it so that Hystrix triggers the fallback
        if (exception != null) {
            Logs.error("Dubbo Exception: ", exception);
            throw new Exception(exception);
        }

        return result;
    }

    @Override
    protected Result getFallback() {
        return new RpcResult((Object) null);
    }
}
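The constructor above ties the Hystrix keys to Dubbo names. The sketch below is a hypothetical helper, not part of suishen-esb, that reproduces that arithmetic with plain strings:

- The command key is `<method>_<argument count>`, which is why the log names `validLoginAuthentication_1`.
- No thread-pool key is set, so Hystrix falls back to the group key, which is the interface name. All methods of one interface such as `IAuthenticationService` therefore share one pool.
- With the default `FailQpsThreshold` of 20, the request-volume threshold is 200 requests per 10-second rolling window.

```java
// Hypothetical helper mirroring the key and threshold arithmetic in DubboHystrixCommand.
class HystrixKeys {

    /** Command key: "<method>_<argCount>", as built with String.format("%s_%d", ...). */
    static String commandKey(String methodName, Object[] args) {
        return String.format("%s_%d", methodName, args == null ? 0 : args.length);
    }

    /** No threadPoolKey is set, so Hystrix uses the group key, i.e. the interface name. */
    static String threadPoolKey(String interfaceName) {
        return interfaceName;
    }

    /** Requests needed in the 10 s rolling window before the breaker may trip. */
    static int requestVolumeThreshold(int failQpsThreshold) {
        return failQpsThreshold * 10;
    }

    public static void main(String[] args) {
        // "token" is a hypothetical placeholder for the method's single argument
        System.out.println(commandKey("validLoginAuthentication", new Object[] {"token"})); // validLoginAuthentication_1
        System.out.println(threadPoolKey("weli.wormhole.rpc.user.center.api.IAuthenticationService"));
        System.out.println(requestVolumeThreshold(20)); // 200
    }
}
```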

This finally revealed the root cause:

  • DubboHystrixCommand sets only the pool's coreSize (30 by default). Hystrix ignores maximumSize unless allowMaximumSizeToDivergeFromCoreSize is enabled, so the maximum is also 30. maxQueueSize keeps its default of -1, which gives a SynchronousQueue, so no task can wait. The underlying ThreadPoolExecutor uses AbortPolicy, so a task that finds no free thread is rejected with an exception.
  • As soon as instantaneous concurrency for one service exceeds those 30 threads, the extra Dubbo calls fail immediately, even though the provider itself still has idle threads.
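The root cause depends on how Hystrix turns its thread-pool properties into a JDK pool. The sketch below models that documented behaviour in plain Java. It is not Hystrix source code, and the class name is hypothetical. It shows why setting only `coreSize` produces a fixed-size pool with no waiting room.

```java
// Model (not Hystrix's code) of how Hystrix derives the JDK pool shape from
// HystrixThreadPoolProperties, following its documented semantics.
class EffectivePool {

    /** maximumSize only counts when allowMaximumSizeToDivergeFromCoreSize is true. */
    static int effectiveMaximumSize(int coreSize, int maximumSize, boolean allowDiverge) {
        return allowDiverge ? Math.max(coreSize, maximumSize) : coreSize;
    }

    /** maxQueueSize <= 0 (the default is -1) selects a SynchronousQueue. */
    static String queueType(int maxQueueSize) {
        return maxQueueSize <= 0 ? "SynchronousQueue" : "LinkedBlockingQueue(" + maxQueueSize + ")";
    }

    public static void main(String[] args) {
        // Original DubboHystrixCommand: only coreSize = 30; maximumSize keeps its default (10)
        System.out.println("max threads = " + effectiveMaximumSize(30, 10, false)); // max threads = 30
        System.out.println("queue = " + queueType(-1));                             // queue = SynchronousQueue
    }
}
```

If a real queue were configured, note that Hystrix also rejects tasks once `queueSizeRejectionThreshold` (default 5) is reached, even when `maxQueueSize` is larger.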

04

Resolution

1. Modify HystrixFilter to add a per-method switch (hystrixOpen) that controls whether Hystrix is used. Core interfaces such as login authentication can then skip Hystrix and run synchronously on the caller's thread.

@Activate(group = Constants.CONSUMER)
public class HystrixFilter implements Filter {

    public HystrixFilter() {
        ApplicationContext springContext = ApplicationContextHolder.getContext();
        if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {
            BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();
            if (beanFactory instanceof DefaultListableBeanFactory) {
                BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);
                beanDefinitionBuilder.setDestroyMethodName("preDestroy");
                beanDefinitionBuilder.setScope("singleton");

                ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());

                // Trigger initialization of the singleton
                beanFactory.getBean(HystrixSpringService.class.getSimpleName());
            }
        }
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Call the provider directly for asynchronous calls (circuit breaking with Hystrix is pointless there)
        // or when the method is configured with hystrixOpen=false
        if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))
                || "false".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), "hystrixOpen"))) {
            return invoker.invoke(invocation);
        }

        DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
        return command.execute();
    }
}
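The new condition in `invoke()` reads two per-method URL parameters. The sketch below models that check over a plain map so it can run without Dubbo. It assumes, as Dubbo's `URL.getMethodParameter` does, that a `<method>.<key>` entry takes precedence and the bare `<key>` is the fallback. `HystrixBypass` is a hypothetical name, and the article does not show how `hystrixOpen` is placed on the consumer URL.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the bypass check in HystrixFilter.invoke(); a plain Map stands
// in for the Dubbo URL's parameters.
class HystrixBypass {

    // Assumed lookup order: "<method>.<key>" first, then the bare "<key>".
    static String methodParameter(Map<String, String> params, String method, String key) {
        String value = params.get(method + "." + key);
        return (value == null || value.isEmpty()) ? params.get(key) : value;
    }

    /** True when the filter would call the invoker directly instead of going through Hystrix. */
    static boolean bypassHystrix(Map<String, String> params, String method) {
        return "true".equalsIgnoreCase(methodParameter(params, method, "async"))
                || "false".equalsIgnoreCase(methodParameter(params, method, "hystrixOpen"));
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("validLoginAuthentication.hystrixOpen", "false");       // core interface: skip Hystrix
        System.out.println(bypassHystrix(params, "validLoginAuthentication")); // true
        System.out.println(bypassHystrix(params, "getUserProfile"));           // false (hypothetical method)
    }
}
```

Because `"false".equalsIgnoreCase(null)` is false, a method without the switch keeps going through Hystrix. Only an explicit `hystrixOpen=false` opts out.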

2. Adjust the thread pool parameters in DubboHystrixCommand: raise the maximum number of threads and set the idle-thread keep-alive time. The waiting queue is left unchanged, so it remains a SynchronousQueue.

public class DubboHystrixCommand extends HystrixCommand<Result> {

    private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);
    private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 30;
    private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 50;
    private static final int DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES = 5;

    private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;
    private Invoker invoker;
    private Invocation invocation;

    public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
                .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
                        invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        // Minimum number of requests in the 10-second rolling window before the breaker can trip
                        .withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)
                        // After rejecting requests for 30 s, the breaker goes half-open and lets trial traffic through
                        .withCircuitBreakerSleepWindowInMilliseconds(30000)
                        // Open the breaker when the error rate reaches 50%
                        .withCircuitBreakerErrorThresholdPercentage(50)
                        // Rely on Dubbo's timeout and disable Hystrix's own execution timeout
                        .withExecutionTimeoutEnabled(false))
                // Configure the thread pool from the Dubbo configuration
                .andThreadPoolPropertiesDefaults(getThreadPoolSetter(invoker.getUrl())));

        this.invoker = invoker;
        this.invocation = invocation;
    }

    /**
     * Failed-requests-per-second threshold; the circuit breaker starts to act above it.
     *
     * @param url
     * @return
     */
    private static int getFailQpsThreshold(URL url) {
        if (url != null) {
            int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);
            if (logger.isDebugEnabled()) {
                logger.debug("FailQpsThreshold: " + threshold);
            }
            return threshold;
        }

        return DEFAULT_FAIL_QPS_THRESHOLD;
    }

    private static HystrixThreadPoolProperties.Setter getThreadPoolSetter(URL url) {
        return HystrixThreadPoolProperties.Setter()
                .withCoreSize(getThreadPoolProperties(url, "threadPoolCoreSize", DEFAULT_THREAD_POOL_CORE_SIZE))
                .withMaximumSize(getThreadPoolProperties(url, "threadPoolMaxSize", DEFAULT_THREAD_POOL_MAX_SIZE))
                // Without this flag Hystrix ignores maximumSize and caps the pool at coreSize
                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                // Threads above coreSize are released after staying idle this long
                .withKeepAliveTimeMinutes(getThreadPoolProperties(url, "threadPoolKeepAliveTimeMinutes",
                        DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES));
    }

    /**
     * Read an int thread-pool property from the Dubbo URL, falling back to the default.
     *
     * @param url
     * @return
     */
    private static int getThreadPoolProperties(URL url, String name, int defaultProperties) {
        if (url != null) {
            int size = url.getParameter(name, defaultProperties);
            if (logger.isDebugEnabled()) {
                logger.debug(name + ": " + size);
            }
            return size;
        }
        return defaultProperties;
    }

    @Override
    protected Result run() throws Exception {
        Throwable exception;

        Result result = null;
        try {
            result = invoker.invoke(invocation);
            exception = result.getException();
        } catch (Exception e) {
            exception = e;
        }

        // Rethrow any exception so that Hystrix records the failure
        if (exception != null) {
            Logs.error("dubbo exception: ", exception);
            throw new RuntimeException(exception);
        }

        return result;
    }
}
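Hystrix backs each pool with a JDK `ThreadPoolExecutor`, so the effect of the new defaults can be sketched with the JDK alone. Those defaults are core 30, maximum 50, a 5-minute keep-alive, and still a `SynchronousQueue`. The sketch assumes `allowMaximumSizeToDivergeFromCoreSize` is enabled so that the maximum applies. `PoolGrowth` is a hypothetical name, and blocking tasks stand in for slow remote calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: with a SynchronousQueue, the pool grows from coreSize to
// maximumSize on demand and only then starts rejecting.
class PoolGrowth {

    static final class Outcome {
        final int largestPoolSize;
        final int rejected;

        Outcome(int largestPoolSize, int rejected) {
            this.largestPoolSize = largestPoolSize;
            this.rejected = rejected;
        }
    }

    /** Submits `tasks` blocking tasks at once; reports the peak thread count and rejections. */
    static Outcome burst(int coreSize, int maxSize, int keepAliveMinutes, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, maxSize, keepAliveMinutes, TimeUnit.MINUTES,
                new SynchronousQueue<>(),               // still no waiting room
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();            // simulate a slow remote call
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
            return new Outcome(pool.getLargestPoolSize(), rejected);
        } finally {
            release.countDown();                        // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Outcome o = burst(30, 50, 5, 60);
        System.out.println("peak threads=" + o.largestPoolSize + ", rejected=" + o.rejected); // peak threads=50, rejected=10
    }
}
```

The extra headroom absorbs bursts only up to the maximum. Beyond it, AbortPolicy rejects again, and threads above coreSize are reclaimed after the keep-alive time.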

Author | Zheng Yateng, senior server-side development engineer

Source: WeChat official account of the Micro Carp technical team

Source: https://mp.weixin.qq.com/s/QSTdBLQR9K2zNWTRwTFP5Q
