天天看點

Hystrix 如何解決 ThreadLocal 資訊丢失

本文分享了ThreadLocal遇到Hystrix時上下文資訊傳遞的方案

本文分享 ThreadLocal 遇到 Hystrix 時上下文資訊傳遞的方案。

一、背景

筆者在業務開發中涉及到使用 ThreadLocal 來存放上下文鍊路中一些關鍵資訊,其中一些業務實作對外部接口依賴,對這些依賴接口使用了Hystrix作熔斷保護,但在使用Hystrix作熔斷保護的方法中發現了擷取 ThreadLocal 資訊與預期不一緻問題,本文旨在探讨如何解決這一問題。

二、ThreadLocal

在Java程式設計語言裡ThreadLocal是用來友善開發人員在同一線程上下文中不同類、不同方法中共享資訊的,ThreadLocal變量不受其他線程的影響,不同線程間互相隔離,也就是線程安全的。在實際的業務鍊路中從入口到具體的業務實作有時候需要共享某些通用資訊,比如使用者唯一辨別、鍊路追蹤唯一辨別等,這些資訊就可以使用ThreadLocal來存儲實作,下面就是一個簡單的同一鍊路中共享traceId的示例代碼。

public class ThreadLocalUtil {
 
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
 
    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }
 
    public static String getTraceId() {
        return TRACE_ID.get();
    }
 
    public static void clearTraceId() {
        TRACE_ID.remove();
    }
}      

三、Hystrix

在分布式環境中,每個系統所依賴的外部服務不可避免的會出現失敗或逾時的情況,Hystrix 通過增加對依賴服務的延時容錯及失敗容錯邏輯,也就是所謂的「熔斷」,以幫助開發人員去靈活控制所依賴的分布式服務。

Hystrix通過隔離服務間的通路點,阻斷服務間的級聯故障,并提供降級選項,這一切都是為了提供系統整體的健壯性,在大規模分布式服務中,系統的健壯性尤其重要。Hystrix詳細的介紹可以看:Hystrix介紹

四、ThreadLocal遇上Hystrix

當業務鍊路中的具體實作有依賴外部服務,且作了相關熔斷保護,那麼本文的兩個主角就這麼遇上了。

根據Hystrix的相關文檔介紹我們了解到,Hystrix提供兩種線程隔離模式:信号量和線程池。

信号量模式下執行業務邏輯時處于同一線程上下文,而線程池模式則使用Hystrix提供的線程池去執行相關業務邏輯。在日常業務開發中更多需要熔斷的是涉及到外部網絡IO調用的(如RPC調用),Hystrix存在的一個目的就是想減少外部依賴的調用對服務容器線程的消耗,信号量模式顯然不太适合,是以我們在絕大部分場景下使用的都是線程池模式,而Hystrix預設情況下啟用的也是線程池模式。

本文想要解決的也正是在這種預設模式下才會有的問題:

1、InheritableThreadLocal

有人可能會想到是不是可以用InheritableThreadLocal去解決?

InheritableThreadLocal可以将目前線程中的線程變量資訊共享到目前線程所建立的「子線程」中,但這邊忽略了一個很重要的資訊,Hystrix中的線程模式底層使用的是自己維護的一個線程池,也就是其中的線程會出現複用的情況,那麼就會出現每個線程所共享的資訊都是之前首次擷取到的「父線程」的共享資訊,這顯然不是我們所期待的,是以InheritableThreadLocal被排除。

那麼想要在Hystrix中解決這個問題怎麼辦?

優秀的Hystrix已經幫大家提供了相關解決方案,而且是插件化,按需定制。Hystrix的插件詳細介紹請看這:Hystrix插件介紹,本文給大家介紹兩種方案。

如何讓ThreadLocal變量資訊在HystrixCommand執行時能在Hystrix線程中正确的傳遞?

2、Concurrency Strategy

使用 HystrixConcurrencyStrategy插件可以來包裝Hystrix線程所執行的方法,具體直接看示例代碼:

public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
 
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        String traceId = ThreadLocalUtil.getTraceId();
        return () -> {
            ThreadLocalUtil.setTraceId(traceId);
            try {
                return callable.call();
            } finally {
                ThreadLocalUtil.clearTraceId();
            }
        };
    }
}
 
 
// 業務代碼中某處合适的地方注冊下目前的政策插件
HystrixPlugins.getInstance().registerConcurrencyStrategy(new MyHystrixConcurrencyStrategy());      

使用這種方式非常簡單,隻要開發人員将自己關注的ThreadLocal值進行「複制」即可,那是不是使用這種方式就行了?

我們留意到這種方式本質是針對HystrixCommand的run()方法(也就是加了@HystrixCommand注解的業務方法)攔截處理,但它可能會逾時或失敗,那麼就會去執行fallback方法,如果在 fallback方法中也想共享相關上下文資訊,這時就無法覆寫到這種場景了。

如果在你的業務中fallback不需要關注上下文資訊這塊的内容,那麼上述這種方案就可以滿足需求了,也很簡單。但如果在fallback方法中也需要上下文資訊,那麼可以使用Hystrix提供的下面這種插件方式。

3、Command Execution Hook

使用HystrixCommandExecutionHook可以實作對Hystrix執行流程的完全控制,你可以覆寫它的一些關鍵節點的回調方法,以實作你的定制需求。想要更多的了解可以看下這:Command Execution Hook介紹 ,下面列舉出HystrixCommandExecutionHook的一些常用的關鍵方法:

Hystrix 如何解決 ThreadLocal 資訊丢失

在了解上述這些關鍵方法後,可以發現實作也很簡單,隻要在onStart()的時候「複制」下關注的上下文資訊,然後在onExecutionStart()和onFallbackStart()兩個方法開始執行前「粘貼」下關注的上下文資訊,最後在作相應的清理行為,就可以滿足需求了,示例代碼如下所示:

public class MyHystrixHook extends HystrixCommandExecutionHook {
     
    private String traceId;
 
    @Override
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        copyTraceId();
    }
 
    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
 
    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
// 下面option1和option2選擇其中一種覆寫就可以了
//------------------------------------option1------------------------------------
    @Override
    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
        ThreadLocalUtil.clearTraceId();
        super.onExecutionSuccess(commandInstance);
    }
 
    @Override
    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
        ThreadLocalUtil.clearTraceId();
        return super.onExecutionError(commandInstance, e);
    }
 
    @Override
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
        ThreadLocalUtil.clearTraceId();
        super.onFallbackSuccess(commandInstance);
    }
     
     @Override
    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        ThreadLocalUtil.clearTraceId();
        return super.onFallbackError(commandInstance, e);
    }
//------------------------------------option1------------------------------------
 
//------------------------------------option2------------------------------------
        @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        ThreadLocalUtil.clearTraceId();
        super.onSuccess(commandInstance);
    }
 
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        ThreadLocalUtil.clearTraceId();
        return super.onError(commandInstance, failureType, e);
    }
//------------------------------------option2------------------------------------
     
     private void copyTraceId() {
        this.traceId = ThreadLocalUtil.getTraceId();
    }
 
    private void pasteTraceId() {
        ThreadLocalUtil.setTraceId(traceId);
    }
 
}
 
// 業務代碼中某處合适的的地方注冊下Hook插件
HystrixPlugins.getInstance().registerCommandExecutionHook(new MyHystrixHook());      

那是不是這樣的實作方式就解決問題了?仔細想下會不會有什麼問題?

我們知道HystrixCommandExecutionHook插件注冊後,所有HystrixCommand在被調用執行的時候都會經過這些覆寫的方法,也就會出現多線程覆寫traceId,那麼對于這個Hook下的traceId随時可能被改變了。假設有這樣場景:

Hystrix 如何解決 ThreadLocal 資訊丢失
  1. 調用者線程1上下文的traceId為"t1",在調用其依賴的Hystrix方法時,traceId被設為"t1"
  2. 同一時刻調用者線程2上下文的traceId為"t2",在調用其依賴的Hystrix方法時,也會觸發更改traceId為"t2"
  3. 在hystrix線程1開始執行具體業務方法時,其想「粘貼」的traceId已經被改成"t2",而不是初始調用者線程1時所設定"t1"

為了解決上面遇到的問題,Hystrix為開發人員提供了通過HystrixRequestContext和HystrixRequestVariableDefault這兩個關鍵類解決。

HystrixRequestContext用于記錄每次Hystrix請求的上下文資訊,其中有兩個關鍵資訊:

static ThreadLocal<HystrixRequestContext> requestVariables: 用于記錄每次HystrixCommand執行時的上下文。

ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state:用于記錄上下文真正的資料。

HystrixRequestVariableDefault的用法有點似于ThreadLocal,提供了get(),set()方法,具體能力的實作借助于HystrixRequestContext。

HystrixCommandExecutionHook插件終極解決方式的實作的示例代碼如下:

public class MyHystrixHook extends HystrixCommandExecutionHook {
     
    private HystrixRequestVariableDefault<String> requestVariable = new HystrixRequestVariableDefault<>();
 
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.initializeContext();
                copyTraceId();
    }
 
    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
 
    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
 
        @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        super.onSuccess(commandInstance);
    }
 
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        return super.onError(commandInstance, failureType, e);
    }
     
     private void copyTraceId() {
        requestVariable.set(ThreadLocalUtil.getTraceId());
    }
 
    private void pasteTraceId() {
        ThreadLocalUtil.setTraceId(requestVariable.get());
    }
}      

在每次Hook執行onStart()方法的時候,需要先執行HystrixRequestContext的初始化操作,然後對關注的上下文資訊進行「複制」,關鍵代碼如下:

public void set(T value) {
    HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
}      

把關注的資訊複制到一個線程相關的ConcurrentHashMap中了,根據前面對HystrixCommandExecutionHook的介紹我們知道,onStart()的時候目前線程為調用者線程;

在真正開始執行HystrixCommand業務方方法的時候,此時需要進行「粘貼」上下文資訊,從requestVariable.get()擷取,get操作關鍵代碼如下:

public T get() {
      if (HystrixRequestContext.getContextForCurrentThread() == null) {
          throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
      }
      ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
     
      // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
      LazyInitializer<?> v = variableMap.get(this);
      if (v != null) {
          return (T) v.get();
      }
 
      // 省略一部分
      ....
}      

從代碼可以看出get與set操作相對應,也是從線程相關的ConcurrentHashMap擷取相應的值,從前序介紹我們也得知目前線程是Hystrix提供的線程池線程,與調用者線程不是同一個線程,那麼這個業務關注的上下文資訊還能正确的傳遞到Hystrix線程中嗎?經過測試它确實「神奇」的正确傳遞了,那到底是怎麼做到的呢?

原來是Hystrix「默默」的幫我們做了,通過調試我們看到如下一段關鍵代碼:

this.actual = action;
    // 調用者線程HystrixRequestContext資訊
    this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
 
    this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {
 
        @Override
        public Void call() throws Exception {
            HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
            try {
                // 幫我們做了一步拷貝操作
                HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
                // 開始真正的執行業務定義的方法,此時上下文資訊已經一緻了
                actual.call();
                return null;
            } finally {
                HystrixRequestContext.setContextOnCurrentThread(existingState);
            }
        }
    });
}      

在執行業務定義的HystrixCommand方法前,Hystrix封裝的對象幫我們把調用者線程的上下文資訊「拷貝」過來了,其實這個處理的思路有點類似于我們前一個插件HystrixConcurrencyStrategy。

五、總結

HystrixConcurrencyStrategy 和HystrixCommandExecutionHook兩者插件方式大家可以根據實際情況去判定,如果确定不需要在fallback中關注上下文傳遞資訊,那用前者就可以了,也很簡便,但如果你想解決的更徹底點,那麼用後一種方式就可以了。

作者:vivo 官網商城開發團隊

分享 vivo 網際網路技術幹貨與沙龍活動,推薦最新行業動态與熱門會議。

繼續閱讀