天天看點

Netty-FastThreadLocal快在哪裡呢?

前言

netty的concurrent包下有一些非常優秀的并發操作類,FastThreadLocal就是其中之一。

簡稱
FastThreadLocalThread ftlt
FastThreadLocal ftl

談談JDK的ThreadLocal

簡介

ThreadLocal 是 Java 裡一種特殊變量,它是一個線程級别變量,每個線程都有一個 ThreadLocal 就是每個線程都擁有了自己獨立的一個變量,競态條件被徹底消除了,在并發模式下是絕對安全的變量。

可以通過 ThreadLocal value = new ThreadLocal(); 來使用。

會自動在每一個線程上建立一個 T 的副本,副本之間彼此獨立,互不影響,可以用 ThreadLocal 存儲一些參數,以便線上程中多個方法中使用,用以代替方法傳參的做法。這是一種空間換時間的思想。

使用

既然jdk已經有ThreadLocal,為何netty還要自己造個FastThreadLocal?FastThreadLocal快在哪裡?

這需要從jdk ThreadLocal的本身說起。如下圖:

Netty-FastThreadLocal快在哪裡呢?

在java線程中,每個線程都有一個ThreadLocalMap執行個體變量(如果不使用ThreadLocal,不會建立這個Map,一個線程第一次通路某個ThreadLocal變量時,才會建立)。

該Map是使用線性探測的方式解決hash沖突的問題,如果沒有找到空閑的slot,就不斷往後嘗試,直到找到一個空閑的位置,插入entry,這種方式在經常遇到hash沖突時,影響效率。

FastThreadLocal(下文簡稱ftl)直接使用數組避免了hash沖突的發生,具體做法是:每一個FastThreadLocal執行個體建立時,配置設定一個下标index;配置設定index使用AtomicInteger實作,每個FastThreadLocal都能擷取到一個不重複的下标。

當調用ftl.get()方法擷取值時,直接從數組擷取傳回,如return array[index],如下圖:

源碼分析

根據上文圖示可知,ftl的實作,涉及到InternalThreadLocalMap、FastThreadLocalThread和FastThreadLocal幾個類,自底向上,我們先從InternalThreadLocalMap開始分析。

InternalThreadLocalMap類的繼承關系圖如下:

Netty-FastThreadLocal快在哪裡呢?

InternalThreadLocalMap介紹

InternalThreadLocalMap是Netty用來代替JDK中的ThreadLocal.ThreadLocalMap類的,InternalThreadLocalMap使用數組來代替Hash表,每個FastThreadLocal被建立時,會擁有一個全局唯一且遞增的索引index,該index就代表FastThreadLocal對應數組的下标,Value會被直接放到該下标處,通路也是一樣,根據index快速定位元素,非常的快速,壓根就不存在哈希沖突,時間複雜度始終是O(1),缺點就是會浪費點記憶體空間,不過在記憶體越來越廉價的今天,這是值得的。先看幾個和FastThreadLocal相關的屬性,後面會用到:

static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();static final AtomicInteger nextIndex = new AtomicInteger();Object[] indexedVariables;      

數組indexedVariables就是用來存儲ftl的value的,使用下标的方式直接通路。nextIndex在ftl執行個體建立時用來給每個ftl執行個體配置設定一個下标,slowThreadLocalMap線上程不是ftlt時使用到。

屬性分析

InternalThreadLocalMap的主要屬性:

// 用于辨別數組的槽位還未使用public static final Object UNSET = new Object();/** * 用于辨別ftl變量是否注冊了cleaner * BitSet簡要原理: * BitSet預設底層資料結構是一個long[]數組,開始時長度為1,即隻有long[0],而一個long有64bit。 * 當BitSet.set(1)的時候,表示将long[0]的第二位設定為true,即0000 0000 ... 0010(64bit),則long[0]==2 * 當BitSet.get(1)的時候,第二位為1,則表示true;如果是0,則表示false * 當BitSet.set(64)的時候,表示設定第65位,此時long[0]已經不夠用了,擴容處long[1]來,進行存儲 * * 存儲類似 {index:boolean} 鍵值對,用于防止一個FastThreadLocal多次啟動清理線程 * 将index位置的bit設為true,表示該InternalThreadLocalMap中對該FastThreadLocal已經啟動了清理線程 */private BitSet cleanerFlags; 
private InternalThreadLocalMap() {        super(newIndexedVariableTable());}
private static Object[] newIndexedVariableTable() {        Object[] array = new Object[32];        Arrays.fill(array, UNSET);        return array;}      

比較簡單,newIndexedVariableTable()方法建立長度為32的數組,然後初始化為UNSET,然後傳給父類。之後ftl的值就儲存到這個數組裡面。

注意,這裡儲存的直接是變量值,不是entry,這是和jdk ThreadLocal不同的。InternalThreadLocalMap就先分析到這,其他方法在後面分析ftl再具體說。

FastThreadLocal介紹

ftlt的實作分析

要發揮ftl的性能優勢,必須和ftlt結合使用,否則就會退化到jdk的ThreadLocal。ftlt比較簡單,關鍵代碼如下:

public class FastThreadLocalThread extends Thread {  // This will be set to true if we have a chance to wrap the Runnable.  private final boolean cleanupFastThreadLocals;
  private InternalThreadLocalMap threadLocalMap;
  public final InternalThreadLocalMap threadLocalMap() {        return threadLocalMap;  }  public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {        this.threadLocalMap = threadLocalMap;  }}      

ftlt的訣竅就在threadLocalMap屬性,它繼承java Thread,然後聚合了自己的InternalThreadLocalMap。後面通路ftl變量,對于ftlt線程,都直接從InternalThreadLocalMap擷取變量值。

ftl的屬性和執行個體化

private final int index;
public FastThreadLocal() {    index = InternalThreadLocalMap.nextVariableIndex();}      

非常簡單,就是給屬性index指派,指派的靜态方法在InternalThreadLocalMap:

public static int nextVariableIndex() {        int index = nextIndex.getAndIncrement();        if (index < 0) {            nextIndex.decrementAndGet();            throw new IllegalStateException(“too many thread-local indexed variables”);        }        return index;  }      

可見,每個ftl執行個體以步長為1的遞增序列,擷取index值,這保證了InternalThreadLocalMap中數組的長度不會突增。

ftl的 get()方法實作分析

public final V get() {    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 1    Object v = threadLocalMap.indexedVariable(index); // 2    if (v != InternalThreadLocalMap.UNSET) {        return (V) v;    }
    V value = initialize(threadLocalMap); // 3    registerCleaner(threadLocalMap);  // 4    return value;}      

「1. 先來看看InternalThreadLocalMap.get()方法如何擷取threadLocalMap:」

public static InternalThreadLocalMap get() {        Thread thread = Thread.currentThread();        if (thread instanceof FastThreadLocalThread) {            return fastGet((FastThreadLocalThread) thread);        } else {            return slowGet();        }    }
  private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();        if (threadLocalMap == null) {            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());        }        return threadLocalMap;    }      

因為結合FastThreadLocalThread使用才能發揮FastThreadLocal的性能優勢,是以主要看fastGet方法。該方法直接從ftlt線程擷取threadLocalMap,還沒有則建立一個InternalThreadLocalMap執行個體并設定進去,然後傳回

private static InternalThreadLocalMap slowGet() {        InternalThreadLocalMap ret = slowThreadLocalMap.get();        if (ret == null) {            ret = new InternalThreadLocalMap();            slowThreadLocalMap.set(ret);        }        return ret;    }      

slowGet方法是通過slowThreadLocalMap(ThreadLocal對象包裝的)去擷取InternalThreadLocalMap,相當于使用原生的 ThreadLocal了

「2. threadLocalMap.indexedVariable(index)就簡單了,直接從數組擷取值,然後傳回:」

public Object indexedVariable(int index) {        Object[] lookup = indexedVariables;        return index < lookup.length? lookup[index] : UNSET;    }      

「3. 如果擷取到的值不是UNSET,那麼是個有效的值,直接傳回。如果是UNSET,則初始化。」

private V initialize(InternalThreadLocalMap threadLocalMap) {        V v = null;        try {            v = initialValue();        } catch (Exception e) {            PlatformDependent.throwException(e);        }        // 擷取ftl的初始值,然後儲存到ftl裡的數組,如果數組長度不夠則擴充數組長度,然後儲存,不展開。        threadLocalMap.setIndexedVariable(index, v);         //addToVariablesToRemove(threadLocalMap, this)的實作,是将ftl執行個體儲存在threadLocalMap内部數組第0個元素的Set集合中。        addToVariablesToRemove(threadLocalMap, this);         return v;    }      
Netty-FastThreadLocal快在哪裡呢?

「4. registerCleaner(threadLocalMap)的實作」

private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {        Thread current = Thread.currentThread();        if (FastThreadLocalThread.willCleanupFastThreadLocals(current) || threadLocalMap.isCleanerFlagSet(index)) {            return;        }
        threadLocalMap.setCleanerFlag(index);
        // TODO: We need to find a better way to handle this.        /*        // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released        // and FastThreadLocal.onRemoval(...) will be called.        ObjectCleaner.register(current, new Runnable() {            @Override            public void run() {                remove(threadLocalMap);
                // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once                // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.            }        });        */      

ftl的資源回收機制

在netty中對于ftl提供了三種回收機制:

自動: 使用ftlt執行一個被FastThreadLocalRunnable wrap的Runnable任務,在任務執行完畢後會自動進行ftl的清理。

手動: ftl和InternalThreadLocalMap都提供了remove方法,在合适的時候使用者可以(有的時候也是必須,例如普通線程的線程池使用ftl)手動進行調用,進行顯示删除。

自動: 為目前線程的每一個ftl注冊一個Cleaner,當線程對象不強可達的時候,該Cleaner線程會将目前線程的目前ftl進行回收。(netty推薦如果可以用其他兩種方式,就不要再用這種方式,因為需要另起線程,耗費資源,而且多線程就會造成一些資源競争,在netty-4.1.34版本中,已經注釋掉了調用ObjectCleaner的代碼。)

ftl在netty中的使用

ftl在netty中最重要的使用,就是配置設定ByteBuf。基本做法是:每個線程都配置設定一塊記憶體(PoolArena),當需要配置設定ByteBuf時,線程先從自己持有的PoolArena配置設定,如果自己無法配置設定,再采用全局配置設定。

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    @Override        protected synchronized PoolThreadCache initialValue() {            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
            Thread current = Thread.currentThread();            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {              // PoolThreadCache即為各個線程持有的記憶體塊的封裝                return new PoolThreadCache(                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);            }            // No caching so just use 0 as sizes.            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);        }    }