前言
netty的concurrent包下有一些非常優秀的并發操作類,FastThreadLocal就是其中之一。
類 | 簡稱 |
FastThreadLocalThread | ftlt |
FastThreadLocal | ftl |
談談JDK的ThreadLocal
簡介
ThreadLocal 是 Java 裡一種特殊變量,它是一個線程級别變量,每個線程都有一個 ThreadLocal 就是每個線程都擁有了自己獨立的一個變量,競态條件被徹底消除了,在并發模式下是絕對安全的變量。
可以通過 ThreadLocal value = new ThreadLocal(); 來使用。
會自動在每一個線程上建立一個 T 的副本,副本之間彼此獨立,互不影響,可以用 ThreadLocal 存儲一些參數,以便線上程中多個方法中使用,用以代替方法傳參的做法。這是一種空間換時間的思想。
使用
既然jdk已經有ThreadLocal,為何netty還要自己造個FastThreadLocal?FastThreadLocal快在哪裡?
這需要從jdk ThreadLocal的本身說起。如下圖:

在java線程中,每個線程都有一個ThreadLocalMap執行個體變量(如果不使用ThreadLocal,不會建立這個Map,一個線程第一次通路某個ThreadLocal變量時,才會建立)。
該Map是使用線性探測的方式解決hash沖突的問題,如果沒有找到空閑的slot,就不斷往後嘗試,直到找到一個空閑的位置,插入entry,這種方式在經常遇到hash沖突時,影響效率。
FastThreadLocal(下文簡稱ftl)直接使用數組避免了hash沖突的發生,具體做法是:每一個FastThreadLocal執行個體建立時,配置設定一個下标index;配置設定index使用AtomicInteger實作,每個FastThreadLocal都能擷取到一個不重複的下标。
當調用ftl.get()方法擷取值時,直接從數組擷取傳回,如return array[index],如下圖:
源碼分析
根據上文圖示可知,ftl的實作,涉及到InternalThreadLocalMap、FastThreadLocalThread和FastThreadLocal幾個類,自底向上,我們先從InternalThreadLocalMap開始分析。
InternalThreadLocalMap類的繼承關系圖如下:
InternalThreadLocalMap介紹
InternalThreadLocalMap是Netty用來代替JDK中的ThreadLocal.ThreadLocalMap類的,InternalThreadLocalMap使用數組來代替Hash表,每個FastThreadLocal被建立時,會擁有一個全局唯一且遞增的索引index,該index就代表FastThreadLocal對應數組的下标,Value會被直接放到該下标處,通路也是一樣,根據index快速定位元素,非常的快速,壓根就不存在哈希沖突,時間複雜度始終是O(1),缺點就是會浪費點記憶體空間,不過在記憶體越來越廉價的今天,這是值得的。先看幾個和FastThreadLocal相關的屬性,後面會用到:
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();static final AtomicInteger nextIndex = new AtomicInteger();Object[] indexedVariables;
數組indexedVariables就是用來存儲ftl的value的,使用下标的方式直接通路。nextIndex在ftl執行個體建立時用來給每個ftl執行個體配置設定一個下标,slowThreadLocalMap線上程不是ftlt時使用到。
屬性分析
InternalThreadLocalMap的主要屬性:
// 用于辨別數組的槽位還未使用public static final Object UNSET = new Object();/** * 用于辨別ftl變量是否注冊了cleaner * BitSet簡要原理: * BitSet預設底層資料結構是一個long[]數組,開始時長度為1,即隻有long[0],而一個long有64bit。 * 當BitSet.set(1)的時候,表示将long[0]的第二位設定為true,即0000 0000 ... 0010(64bit),則long[0]==2 * 當BitSet.get(1)的時候,第二位為1,則表示true;如果是0,則表示false * 當BitSet.set(64)的時候,表示設定第65位,此時long[0]已經不夠用了,擴容處long[1]來,進行存儲 * * 存儲類似 {index:boolean} 鍵值對,用于防止一個FastThreadLocal多次啟動清理線程 * 将index位置的bit設為true,表示該InternalThreadLocalMap中對該FastThreadLocal已經啟動了清理線程 */private BitSet cleanerFlags;
private InternalThreadLocalMap() { super(newIndexedVariableTable());}
private static Object[] newIndexedVariableTable() { Object[] array = new Object[32]; Arrays.fill(array, UNSET); return array;}
比較簡單,newIndexedVariableTable()方法建立長度為32的數組,然後初始化為UNSET,然後傳給父類。之後ftl的值就儲存到這個數組裡面。
❝
注意,這裡儲存的直接是變量值,不是entry,這是和jdk ThreadLocal不同的。InternalThreadLocalMap就先分析到這,其他方法在後面分析ftl再具體說。
❞
FastThreadLocal介紹
ftlt的實作分析
要發揮ftl的性能優勢,必須和ftlt結合使用,否則就會退化到jdk的ThreadLocal。ftlt比較簡單,關鍵代碼如下:
public class FastThreadLocalThread extends Thread { // This will be set to true if we have a chance to wrap the Runnable. private final boolean cleanupFastThreadLocals;
private InternalThreadLocalMap threadLocalMap;
public final InternalThreadLocalMap threadLocalMap() { return threadLocalMap; } public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { this.threadLocalMap = threadLocalMap; }}
ftlt的訣竅就在threadLocalMap屬性,它繼承java Thread,然後聚合了自己的InternalThreadLocalMap。後面通路ftl變量,對于ftlt線程,都直接從InternalThreadLocalMap擷取變量值。
ftl的屬性和執行個體化
private final int index;
public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex();}
非常簡單,就是給屬性index指派,指派的靜态方法在InternalThreadLocalMap:
public static int nextVariableIndex() { int index = nextIndex.getAndIncrement(); if (index < 0) { nextIndex.decrementAndGet(); throw new IllegalStateException(“too many thread-local indexed variables”); } return index; }
可見,每個ftl執行個體以步長為1的遞增序列,擷取index值,這保證了InternalThreadLocalMap中數組的長度不會突增。
ftl的 get()方法實作分析
public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 1 Object v = threadLocalMap.indexedVariable(index); // 2 if (v != InternalThreadLocalMap.UNSET) { return (V) v; }
V value = initialize(threadLocalMap); // 3 registerCleaner(threadLocalMap); // 4 return value;}
「1. 先來看看InternalThreadLocalMap.get()方法如何擷取threadLocalMap:」
public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return fastGet((FastThreadLocalThread) thread); } else { return slowGet(); } }
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); } return threadLocalMap; }
因為結合FastThreadLocalThread使用才能發揮FastThreadLocal的性能優勢,是以主要看fastGet方法。該方法直接從ftlt線程擷取threadLocalMap,還沒有則建立一個InternalThreadLocalMap執行個體并設定進去,然後傳回
private static InternalThreadLocalMap slowGet() { InternalThreadLocalMap ret = slowThreadLocalMap.get(); if (ret == null) { ret = new InternalThreadLocalMap(); slowThreadLocalMap.set(ret); } return ret; }
slowGet方法是通過slowThreadLocalMap(ThreadLocal對象包裝的)去擷取InternalThreadLocalMap,相當于使用原生的 ThreadLocal了
「2. threadLocalMap.indexedVariable(index)就簡單了,直接從數組擷取值,然後傳回:」
public Object indexedVariable(int index) { Object[] lookup = indexedVariables; return index < lookup.length? lookup[index] : UNSET; }
「3. 如果擷取到的值不是UNSET,那麼是個有效的值,直接傳回。如果是UNSET,則初始化。」
private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null; try { v = initialValue(); } catch (Exception e) { PlatformDependent.throwException(e); } // 擷取ftl的初始值,然後儲存到ftl裡的數組,如果數組長度不夠則擴充數組長度,然後儲存,不展開。 threadLocalMap.setIndexedVariable(index, v); //addToVariablesToRemove(threadLocalMap, this)的實作,是将ftl執行個體儲存在threadLocalMap内部數組第0個元素的Set集合中。 addToVariablesToRemove(threadLocalMap, this); return v; }
「4. registerCleaner(threadLocalMap)的實作」
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) { Thread current = Thread.currentThread(); if (FastThreadLocalThread.willCleanupFastThreadLocals(current) || threadLocalMap.isCleanerFlagSet(index)) { return; }
threadLocalMap.setCleanerFlag(index);
// TODO: We need to find a better way to handle this. /* // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released // and FastThreadLocal.onRemoval(...) will be called. ObjectCleaner.register(current, new Runnable() { @Override public void run() { remove(threadLocalMap);
// It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once // the Thread is collected by GC. In this case the ThreadLocal will be gone away already. } }); */
ftl的資源回收機制
在netty中對于ftl提供了三種回收機制:
自動: 使用ftlt執行一個被FastThreadLocalRunnable wrap的Runnable任務,在任務執行完畢後會自動進行ftl的清理。
手動: ftl和InternalThreadLocalMap都提供了remove方法,在合适的時候使用者可以(有的時候也是必須,例如普通線程的線程池使用ftl)手動進行調用,進行顯示删除。
自動: 為目前線程的每一個ftl注冊一個Cleaner,當線程對象不強可達的時候,該Cleaner線程會将目前線程的目前ftl進行回收。(netty推薦如果可以用其他兩種方式,就不要再用這種方式,因為需要另起線程,耗費資源,而且多線程就會造成一些資源競争,在netty-4.1.34版本中,已經注釋掉了調用ObjectCleaner的代碼。)
ftl在netty中的使用
ftl在netty中最重要的使用,就是配置設定ByteBuf。基本做法是:每個線程都配置設定一塊記憶體(PoolArena),當需要配置設定ByteBuf時,線程先從自己持有的PoolArena配置設定,如果自己無法配置設定,再采用全局配置設定。
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { // PoolThreadCache即為各個線程持有的記憶體塊的封裝 return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } // No caching so just use 0 as sizes. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); } }