一、引言
海外商城從印度做起,慢慢的會有一些其他國家的訴求,這個時候需要我們針對目前的商城做一個改造,可以支撐多個國家的商城,這裡會涉及多個問題,多語言,多國家,多時區,本地化等等。在多國家的情況下如何把識别出來的國家資訊傳遞下去,一層一層直到代碼執行的最後一步。甚至還有一些多線程的場景需要處理。
二、背景技術
2.1 ThreadLocal
ThreadLocal是最容易想到了,入口識别到國家資訊後,丢進ThreadLocal,這樣後續代碼、redis、DB等做國家區分的時候都能使用到。
這裡先簡單介紹一下ThreadLocal:
/**
* Sets the current thread's copy of this thread-local variable
* to the specified value. Most subclasses will have no need to
* override this method, relying solely on the {@link #initialValue}
* method to set the values of thread-locals.
*
* @param value the value to be stored in the current thread's copy of
* this thread-local.
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
/**
* Returns the value in the current thread's copy of this
* thread-local variable. If the variable has no value for the
* current thread, it is first initialized to the value returned
* by an invocation of the {@link #initialValue} method.
*
* @return the current thread's value of this thread-local
*/
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
/**
* Get the entry associated with key. This method
* itself handles only the fast path: a direct hit of existing
* key. It otherwise relays to getEntryAfterMiss. This is
* designed to maximize performance for direct hits, in part
* by making this method readily inlinable.
*
* @param key the thread local object
* @return the entry associated with key, or null if no such
*/
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
- 每一個Thread線程都有屬于自己的threadLocals(ThreadLocalMap),裡面有一個弱引用的Entry(ThreadLocal,Object)。
- get方法首先通過Thread.currentThread得到目前線程,然後拿到線程的threadLocals(ThreadLocalMap),再從Entry中取得目前線程存儲的value。
- set值的時候更改目前線程的threadLocals(ThreadLocalMap)中Entry對應的value值。
實際使用中除了同步方法之外,還有起異步線程處理的場景,這個時候就需要把ThreadLocal的内容從父線程傳遞給子線程,這個怎麼辦呢?
不急,Java 還有InheritableThreadLocal來幫我們解決這個問題。
2.2 InheritableThreadLoca
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* Computes the child's initial value for this inheritable thread-local
* variable as a function of the parent's value at the time the child
* thread is created. This method is called from within the parent
* thread before the child is started.
* <p>
* This method merely returns its input argument, and should be overridden
* if a different behavior is desired.
*
* @param parentValue the parent thread's value
* @return the child thread's initial value
*/
protected T childValue(T parentValue) {
return parentValue;
}
/**
* Get the map associated with a ThreadLocal.
*
* @param t the current thread
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
/**
* Create the map associated with a ThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the table.
*/
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
- java.lang.Thread#init(java.lang.ThreadGroup, java.lang.Runnable, java.lang.String, long, java.security.AccessControlContext, boolean)
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
- InheritableThreadLocal操作的是inheritableThreadLocals這個變量,而不是ThreadLocal操作的threadLocals變量。
- 建立新線程的時候會檢查父線程中parent.inheritableThreadLocals變量是否為null,如果不為null則複制一份parent.inheritableThreadLocals的資料到子線程的this.inheritableThreadLocals中去。
- 因為複寫了getMap(Thread)和CreateMap()方法直接操作inheritableThreadLocals,這樣就實作了在子線程中擷取父線程ThreadLocal值。
現在在使用多線程的時候,都是通過線程池來做的,這個時候用InheritableThreadLocal可以嗎?會有什麼問題嗎?先看下下面的代碼的執行情況:
- test
static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
inheritableThreadLocal.set("i am a inherit parent");
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(inheritableThreadLocal.get());
}
});
TimeUnit.SECONDS.sleep(1);
inheritableThreadLocal.set("i am a new inherit parent");// 設定新的值
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(inheritableThreadLocal.get());
}
});
}
i am a inherit parent
i am a inherit parent
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
inheritableThreadLocal.set("i am a inherit parent");
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(inheritableThreadLocal.get());
inheritableThreadLocal.set("i am a old inherit parent");// 子線程中設定新的值
}
});
TimeUnit.SECONDS.sleep(1);
inheritableThreadLocal.set("i am a new inherit parent");// 主線程設定新的值
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(inheritableThreadLocal.get());
}
});
}
i am a inherit parent
i am a old inherit parent
這裡看第一個執行結果,發現主線程第二次設定的值,沒有改掉,還是第一次設定的值“i am a inherit parent”,這是什麼原因呢?
再看第二個例子的執行結果,發現在第一個任務中設定的“i am a old inherit parent"的值,在第二個任務中列印出來了。這又是什麼原因呢?
回過頭來看看上面的源碼,線上程池的情況下,第一次建立線程的時候會從父線程中copy inheritableThreadLocals中的資料,是以第一個任務成功拿到了父線程設定的”i am a inherit parent“,第二個任務執行的時候複用了第一個任務的線程,并不會觸發複制父線程中的inheritableThreadLocals操作,是以即使在主線程中設定了新的值,也會不生效。同時get()方法是直接操作inheritableThreadLocals這個變量的,是以就直接拿到了第一個任務設定的值。
那遇到線程池應該怎麼辦呢?
2.3 TransmittableThreadLocal
TransmittableThreadLocal(TTL)這個時候就派上用場了。這是阿裡開源的一個元件,我們來看看它怎麼解決線程池的問題,先來一段代碼,在上面的基礎上修改一下,使用TransmittableThreadLocal。
static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();// 使用TransmittableThreadLocal
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService = TtlExecutors.getTtlExecutorService(executorService); // 用TtlExecutors裝飾線程池
transmittableThreadLocal.set("i am a transmittable parent");
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(transmittableThreadLocal.get());
transmittableThreadLocal.set("i am a old transmittable parent");// 子線程設定新的值
}
});
System.out.println(transmittableThreadLocal.get());
TimeUnit.SECONDS.sleep(1);
transmittableThreadLocal.set("i am a new transmittable parent");// 主線程設定新的值
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(transmittableThreadLocal.get());
}
});
}
i am a transmittable parent
i am a transmittable parent
i am a new transmittable parent
執行代碼後發現,使用TransmittableThreadLocalTtlExecutors.getTtlExecutorService(executorService)裝飾線程池之後,在每次調用任務的時,都會将目前的主線程的TransmittableThreadLocal資料copy到子線程裡面,執行完成後,再清除掉。同時子線程裡面的修改回到主線程時其實并沒有生效。這樣可以保證每次任務執行的時候都是互不幹涉的。這是怎麼做到的呢?來看源碼。
- TtlExecutors和TransmittableThreadLocal源碼
private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
com.alibaba.ttl.TtlRunnable#run
/**
* wrap method {@link Runnable#run()}.
*/
@Override
public void run() {
Object captured = capturedRef.get();// 擷取線程的ThreadLocalMap
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
Object backup = replay(captured);// 暫存目前子線程的ThreadLocalMap到backup
try {
runnable.run();
} finally {
restore(backup);// 恢複線程執行時被改版的Threadlocal對應的值
}
}
com.alibaba.ttl.TransmittableThreadLocal.Transmitter#replay
/**
* Replay the captured {@link TransmittableThreadLocal} values from {@link #capture()},
* and return the backup {@link TransmittableThreadLocal} values in current thread before replay.
*
* @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()}
* @return the backup {@link TransmittableThreadLocal} values before replay
* @see #capture()
* @since 2.3.0
*/
public static Object replay(Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL value only in captured
// avoid extra TTL value in captured, when run task.
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set value to captured TTL
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : capturedMap.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
// call beforeExecute callback
doExecuteCallback(true);
return backup;
}
com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore
/**
* Restore the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}.
*
* @param backup the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}
* @since 2.3.0
*/
public static void restore(Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// clear the TTL value only in backup
// avoid the extra value of backup after restore
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL value
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : backupMap.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
}
可以看下整個過程的完整時序圖:
OK,既然問題都解決了,來看看實際使用吧,有兩種使用,先看第一種,涉及HTTP請求、Dubbo請求和 job,采用的是資料級别的隔離。
三、 TTL 在海外商城的實際應用
3.1 不分庫,分資料行 + SpringMVC
使用者 HTTP 請求,首先我們要從url或者cookie中解析出國家編号,然後在TransmittableThreadLocal中存放國家資訊,在 MyBatis 的攔截器中讀取國家資料,進行sql改造,最終操作指定的國家資料,多線程場景下用TtlExecutors包裝原有自定義線程池,保障在使用線程池的時候能夠正确将國家資訊傳遞下去。
- HTTP 請求
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
return TtlThreadPoolExecutors.getAsyncExecutor();
}
public class TtlThreadPoolExecutors {
private static final String COMMON_BUSINESS = "COMMON_EXECUTOR";
public static final int QUEUE_CAPACITY = 20000;
public static ExecutorService getExecutorService() {
return TtlExecutorServiceMananger.getExecutorService(COMMON_BUSINESS);
}
public static ExecutorService getExecutorService(String threadGroupName) {
return TtlExecutorServiceMananger.getExecutorService(threadGroupName);
}
public static ThreadPoolTaskExecutor getAsyncExecutor() {
// 用TtlExecutors裝飾Executor,結合TransmittableThreadLocal解決異步線程threadlocal傳遞問題
return getTtlThreadPoolTaskExecutor(initTaskExecutor());
}
private static ThreadPoolTaskExecutor initTaskExecutor () {
return initTaskExecutor(TtlThreadPoolFactory.DEFAULT_CORE_SIZE, TtlThreadPoolFactory.DEFAULT_POOL_SIZE, QUEUE_CAPACITY);
}
private static ThreadPoolTaskExecutor initTaskExecutor (int coreSize, int poolSize, int executorQueueCapacity) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(coreSize);
taskExecutor.setMaxPoolSize(poolSize);
taskExecutor.setQueueCapacity(executorQueueCapacity);
taskExecutor.setKeepAliveSeconds(120);
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.setThreadNamePrefix("TaskExecutor-ttl");
taskExecutor.initialize();
return taskExecutor;
}
private static ThreadPoolTaskExecutor getTtlThreadPoolTaskExecutor(ThreadPoolTaskExecutor executor) {
if (null == executor || executor instanceof ThreadPoolTaskExecutorWrapper) {
return executor;
}
return new ThreadPoolTaskExecutorWrapper(executor);
}
}
/**
* @ClassName : LocaleContextHolder
* @Description : 本地化資訊上下文holder
*/
public class LocalizationContextHolder {
private static TransmittableThreadLocal<LocalizationContext> localizationContextHolder = new TransmittableThreadLocal<>();
private static LocalizationInfo defaultLocalizationInfo = new LocalizationInfo();
private LocalizationContextHolder(){}
public static LocalizationContext getLocalizationContext() {
return localizationContextHolder.get();
}
public static void resetLocalizationContext () {
localizationContextHolder.remove();
}
public static void setLocalizationContext (LocalizationContext localizationContext) {
if(localizationContext == null) {
resetLocalizationContext();
} else {
localizationContextHolder.set(localizationContext);
}
}
public static void setLocalizationInfo (LocalizationInfo localizationInfo) {
LocalizationContext localizationContext = getLocalizationContext();
String brand = (localizationContext instanceof BrandLocalizationContext ?
((BrandLocalizationContext) localizationContext).getBrand() : null);
if(StringUtils.isNotEmpty(brand)) {
localizationContext = new SimpleBrandLocalizationContext(localizationInfo, brand);
} else if(localizationInfo != null) {
localizationContext = new SimpleLocalizationContext(localizationInfo);
} else {
localizationContext = null;
}
setLocalizationContext(localizationContext);
}
public static void setDefaultLocalizationInfo(@Nullable LocalizationInfo localizationInfo) {
LocalizationContextHolder.defaultLocalizationInfo = localizationInfo;
}
public static LocalizationInfo getLocalizationInfo () {
LocalizationContext localizationContext = getLocalizationContext();
if(localizationContext != null) {
LocalizationInfo localizationInfo = localizationContext.getLocalizationInfo();
if(localizationInfo != null) {
return localizationInfo;
}
}
return defaultLocalizationInfo;
}
public static String getCountry(){
return getLocalizationInfo().getCountry();
}
public static String getTimezone(){
return getLocalizationInfo().getTimezone();
}
public static String getBrand(){
return getBrand(getLocalizationContext());
}
public static String getBrand(LocalizationContext localizationContext) {
if(localizationContext == null) {
return null;
}
if(localizationContext instanceof BrandLocalizationContext) {
return ((BrandLocalizationContext) localizationContext).getBrand();
}
throw new LocaleException("unsupported localizationContext type");
}
}
@Override
public LocaleContext resolveLocaleContext(final HttpServletRequest request) {
parseLocaleCookieIfNecessary(request);
LocaleContext localeContext = new TimeZoneAwareLocaleContext() {
@Override
public Locale getLocale() {
return (Locale) request.getAttribute(LOCALE_REQUEST_ATTRIBUTE_NAME);
}
@Override
public TimeZone getTimeZone() {
return (TimeZone) request.getAttribute(TIME_ZONE_REQUEST_ATTRIBUTE_NAME);
}
};
// 設定線程中的國家标志
setLocalizationInfo(request, localeContext.getLocale());
return localeContext;
}
private void setLocalizationInfo(HttpServletRequest request, Locale locale) {
String country = locale!=null?locale.getCountry():null;
String language = locale!=null?(locale.getLanguage() + "_" + locale.getVariant()):null;
LocaleRequestMessage localeRequestMessage = localeRequestParser.parse(request);
final String countryStr = country;
final String languageStr = language;
final String brandStr = localeRequestMessage.getBrand();
LocalizationContextHolder.setLocalizationContext(new BrandLocalizationContext() {
@Override
public String getBrand() {
return brandStr;
}
@Override
public LocalizationInfo getLocalizationInfo() {
return LocalizationInfoAssembler.assemble(countryStr, languageStr);
}
});
}
對于 Dubbo 接口和無法判斷國家資訊的 HTTP 接口,在入參部分增加國家資訊參數,通過攔截器或者手動set國家資訊到TransmittableThreadLocal。
對于定時任務 job,因為所有國家都需要執行,是以會把所有國家進行周遊執行,這也可以通過簡單的注解來解決。
這個版本的改造,點檢測試也基本通過了,自動化腳本驗證也是沒問題的,不過因為業務發展問題最終沒上線。
3.2 分庫 + SpringBoot
後續在建設新的國家商城的時候,分庫分表方案調整為每個國家獨立資料庫,同時整體開發架構更新到SpringBoot,我們把這套方案做了更新,總體思路是一樣的,隻是在實作細節上略有不同。
SpringBoot 裡面的異步一般通過@Async這個注解來實作,通過自定義線程池來包裝,使用時在 HTTP 請求判斷locale資訊的寫入國家資訊,後續完成切DB的操作。
對于 Dubbo 接口和無法判斷國家資訊的 HTTP 接口,在入參部分增加國家資訊參數,通過攔截器或者手動set國家資訊到TransmittableThreadLocal。
對于定時任務job,因為所有國家都需要執行,是以會把所有國家進行周遊執行,這也可以通過簡單的注解和AOP來解決。
四、總結
本文從業務拓展的角度闡述了在複雜業務場景下如何通過ThreadLocal,過渡到InheritableThreadLocal,再通過TransmittableThreadLocal解決實際業務問題。因為海外的業務在不斷的探索中前進,技術也在不斷的探索中演進,面對這種複雜多變的情況,我們的應對政策是先做國際化,再做本地化,more global才能more local,多國家的隔離隻是國際化最基本的起點,未來還有很多業務和技術等着我們去挑戰。
作者:vivo 官網商城開發團隊