天天看點

實用技巧:Hystrix傳播ThreadLocal對象(兩種方案)

原文: http://www.itmuch.com/spring-cloud-sum/hystrix-threadlocal/ 采用 CC BY 3.0 CN 許可協定。可自由轉載、引用,但需署名作者且注明文章原文位址。

目前,Spring Cloud已在南京公司推廣開來,不僅如此,深圳那邊近期也要基于Spring Cloud新開微服務了。

于是,上司要求我出一套基于Spring Cloud的快速開發腳手架(近期開源)。在編寫腳手架的過程中,也順帶總結一下以前在項目中遇到的問題:

使用Hystrix時,如何傳播ThreadLocal對象?

我們知道,Hystrix有隔離政策:THREAD以及SEMAPHORE。

如果你不知道Hystrix的隔離政策,可以閱讀我的書籍《Spring Cloud與Docker微服務架構實戰》,或者參考文檔: https://github.com/Netflix/Hystrix/wiki/Configuration#executionisolationstrategy

引子

當隔離政策為

THREAD

時,是沒辦法拿到

ThreadLocal

中的值的。

舉個例子,使用Feign調用某個遠端API,這個遠端API需要傳遞一個Header,這個Header是動态的,跟你的HttpRequest相關,我們選擇編寫一個攔截器來實作Header的傳遞(當然也可以在Feign Client接口的方法上加

RequestHeader

)。

示例代碼:

public class KeycloakRequestInterceptor implements RequestInterceptor {

    private static final String AUTHORIZATION_HEADER = "Authorization";

    @Override
    public void apply(RequestTemplate template) {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        Principal principal = attributes.getRequest().getUserPrincipal();

        if (principal != null && principal instanceof KeycloakPrincipal) {
            KeycloakSecurityContext keycloakSecurityContext = ((KeycloakPrincipal) principal)
                    .getKeycloakSecurityContext();

            if (keycloakSecurityContext instanceof RefreshableKeycloakSecurityContext) {
                RefreshableKeycloakSecurityContext.class.cast(keycloakSecurityContext)
                        .refreshExpiredToken(true);
                template.header(AUTHORIZATION_HEADER, "Bearer " + keycloakSecurityContext.getTokenString());
            }

        }
        // 否則啥都不幹
    }
}
           

你可能不知道Keycloak是什麼,不過沒有關系,相信這段代碼并不難閱讀,該攔截器做了幾件事:

  • 使用

    RequestContextHolder.getRequestAttributes()

    靜态方法獲得Request。
  • 從Request獲得目前使用者的身份,然後使用Keycloak的API拿到Token,并扔到Header裡。
  • 這樣,Feign使用這個攔截器時,就會用你這個Header去請求了。
注:Keycloak是一個非常容易上手,并且功能強大的單點認證平台。

現實很骨感

以上代碼可完美運作——但僅限于Feign不開啟Hystrix支援時。

注:Spring Cloud Dalston以及更高版可使用

feign.hystrix.enabled=true

為Feign開啟Hystrix支援。

當Feign開啟Hystrix支援時,

ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
           

null

原因在于,Hystrix的預設隔離政策是

THREAD

。而

RequestContextHolder

源碼中,使用了兩個血淋淋的

ThreadLocal

解決方案一:調整隔離政策

将隔離政策設為SEMAPHORE即可:

hystrix.command.default.execution.isolation.strategy: SEMAPHORE
           

這樣配置後,Feign可以正常工作。

但該方案不是特别好。原因是Hystrix官方強烈建議使用THREAD作為隔離政策! 參考文檔:

Thread or Semaphore
The default, and the recommended setting, is to run

HystrixCommand

s using thread isolation (

THREAD

) and

HystrixObservableCommand

s using semaphore isolation (

SEMAPHORE

).

Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.

Generally the only time you should use semaphore isolation for

HystrixCommand

s is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.

于是,那麼有沒有更好的方案呢?

解決方案二:自定義并發政策

既然Hystrix不太建議使用SEMAPHORE作為隔離政策,那麼是否有其他方案呢?答案是自定義并發政策,目前,Spring Cloud Sleuth以及Spring Security都通過該方式傳遞

ThreadLocal

對象。

下面我們來編寫自定義的并發政策。

編寫自定義并發政策

編寫自定義并發政策比較簡單,隻需編寫一個類,讓其繼承

HystrixConcurrencyStrategy

,并重寫

wrapCallable

方法即可。

代碼示例:

@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private static final Log log = LogFactory.getLog(RequestHystrixConcurrencyStrategy.class);

    public RequestHystrixConcurrencyStrategy() {
        HystrixPlugins.reset();
        HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return target.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}
           

如代碼所示,我們編寫了一個

RequestHystrixConcurrencyStrategy

,在其中:

  • wrapCallable

    方法拿到

    RequestContextHolder.getRequestAttributes()

    ,也就是我們想傳播的對象;
  • WrappedCallable

    類中,我們将要傳播的對象作為成員變量,并在其中的call方法中,為靜态方法設值。
  • 這樣,在Hystrix包裹的方法中,就可以使用

    RequestContextHolder.getRequestAttributes()

    擷取到相關屬性——也就是說,可以拿到

    RequestContextHolder

    中的

    ThreadLocal

    屬性。

經過測試,代碼能正常工作。

新的問題

至此,我們已經實作了

ThreadLocal

屬性的傳遞,然而Hystrix隻允許有一個并發政策!這意味着——如果不做任何處理,Sleuth、Spring Security将無法正常拿到上下文!(上文說過,目前Sleuth、Spring Security都是通過自定義并發政策的方式來傳遞ThreadLocal對象的。)

如何解決這個問題呢?

我們知道,Spring Cloud中,Spring Cloud Security與Spring Cloud Sleuth是可以共存的!我們不妨參考下Sleuth以及Spring Security的實作:

  • Sleuth:

    org.springframework.cloud.sleuth.instrument.hystrix.SleuthHystrixConcurrencyStrategy

  • Spring Security:

    org.springframework.cloud.netflix.hystrix.security.SecurityContextConcurrencyStrategy

閱讀完後,你将恍然大悟——于是,我們可以模仿它們的寫法,改寫上文編寫的并發政策:

public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private static final Log log = LogFactory.getLog(RequestAttributeHystrixConcurrencyStrategy.class);

    private HystrixConcurrencyStrategy delegate;

    public RequestAttributeHystrixConcurrencyStrategy() {
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
                // Welcome to singleton hell...
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
                    .getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                    propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                    .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        }
        catch (Exception e) {
            log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
        }
    }

    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
            HystrixMetricsPublisher metricsPublisher,
            HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.debug("Current Hystrix plugins configuration is ["
                    + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
                    + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
        }
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixProperty<Integer> corePoolSize,
            HystrixProperty<Integer> maximumPoolSize,
            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties threadPoolProperties) {
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.delegate.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(
            HystrixRequestVariableLifecycle<T> rv) {
        return this.delegate.getRequestVariable(rv);
    }

    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return target.call();
            }
            finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

           

簡單講解下:

  • 将現有的并發政策作為新并發政策的成員變量
  • 在新并發政策中,傳回現有并發政策的線程池、Queue。

Pull Request

筆者已将該實作方式Pull Request:

https://github.com/spring-cloud/spring-cloud-netflix/pull/2509

,希望官方能夠接納,也希望在不久的将來,能夠更舒服、更爽地使用Spring Cloud。

PS. Pull Request的代碼跟部落格中的代碼略有差別,有少量簡單的優化,主要是增加了一個開關。

靈感來自