一,ThreadLocal概述
1,TheadLocal —— 線程隔離對象
* JDK内部提供了線程隔離對象 ThreadLocal ,注意該類與AQS或者JUC基本沒有關系。線程對 ThreadLocal 操作,實際上對線程屬性 threadLocals 進行操作,該屬性是 ThreadLocal 内部類 ThreadLocalMap 的執行個體。ThreadLocal 底層通過 ThreadLocalMap 進行存儲,具體在 ThreadLocalMap 内通過 Entry 數組進行資料儲存。Entry 在 ThreadLocal 的内部實作,并通過 WeakReference 包裝泛型 ThreadLocal 為弱引用(在每一次GC時,弱引用對象如果沒有強引用對象引用,會被GC回收),Entry 以每一個 ThreadLocal 作為鍵值在 Thread 内進行不同的資料存儲
* 總而言之,一個 ThreadLocal 對象可以存儲在多個線程的成員變量 threadLocals 中。每一個 ThreadLocal 線上程中的存在形式,通過包裝為 Entry 對象,存儲在 ThreadLocalMap 的成員變量 Entry[] 中。
2,類圖
3,常用API
// 構造
public ThreadLocal();
// 初始化, 初始化對象時, 可以重寫此方法, 完成給每一個線程賦初值
protected T initialValue();
// 存儲資料
public void set(T value);
// 擷取資料
public T get();
// 移除資料
public void remove();
4,功能DEMO
* 從輸出結果可以看出,各個線程儲存的是操作時候的變量值,後續變量變化并沒有産生影響;示範比較簡單,感覺一個Map就能玩,不過在開發中,ThreadLocal 理論上是存在在整個線程周期的,是以不需要頻繁傳值
package com.gupao.concurrent;
/**
* @author pj_zhang
* @create 2019-10-21 22:09
**/
public class ThreadLocalTest {
private static volatile int count = 0;
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
threadLocal.set(++count);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": " + threadLocal.get());
}, "THREAD_" + i).start();
}
}
}
二,ThreadLocal源碼分析
1,初始化
* 構造:ThreadLocal()
// 直接初始化,無特殊
public ThreadLocal() {
}
* 初始化值:initialValue()
// 該方法用于重寫,為每一個線程賦初值
protected T initialValue() {
return null;
}
2,set()源碼分析
* set(T value)
public void set(T value) {
Thread t = Thread.currentThread();
// 擷取目前線程的成員成員 threadLocals
ThreadLocalMap map = getMap(t);
// 成員變量不為空,說明已經初始化,給該線程的目前 ThreadLocal 指派
if (map != null)
map.set(this, value);
// 成員變量為空,建立并指派
else
createMap(t, value);
}
* getMap(Thread t):擷取線程成員變量 threadLocals
ThreadLocalMap getMap(Thread t) {
// 直接傳回線程成員變量
return t.threadLocals;
}
* createMap(Thread t, T firstValue):成員變量不存在,建立并指派
void createMap(Thread t, T firstValue) {
// 直接初始化 ThreadLocalMap
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
* ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue):初始化加魔數及擴容
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化table,預設數組長度為16
table = new Entry[INITIAL_CAPACITY];
// 根據斐波拉契散列擷取下标值
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 建構Entry
table[i] = new Entry(firstKey, firstValue);
size = 1;
// 設定擴容因子,擴容因子由兩方面構成
setThreshold(INITIAL_CAPACITY);
}
// 斐波拉契散列魔數
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
// 魔數
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
// 通過 AtomicInteger 類實作原子遞增
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
// 擴容因子,設定為2/3并取整
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
* set(ThreadLocal<?> key, Object value):成員變量已經初始化,直接重指派
private void set(ThreadLocal<?> key, Object value) {
// 擷取到存儲容器:Entry{]
Entry[] tab = table;
// 擷取容器容量,并通過斐波拉契散列擷取線程應該所在的索引下标
// key.threadLocalHashCode:ThreadLocal成員變量,通過Atomic原子類實作遞增(CAS)
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
// 擷取數組中該下标的值,判斷是否已經存在元素
// nextIndex:擷取下一個下标,通過三元表達式實作首尾相連
// 如果Entry為空,說明目前 TheadLocal 執行個體不存在,同時也不可能存在在其他索引位置,分析如下:
// 首先GC回收失效隻對ThreadLocal進行清理,不會對Entry進行清理,是以Entry隻會為空,不會為null
// 此時如果目前 ThreadLocal 在此Entry有效時插入,則會順序尋找下一個槽位。
// 此時需要做的就是從其他槽位找到目前ThreadLocal,并把目前 ThreadLocal 拉回到正确槽位
// 其次,如果目前線程在插入 ThreadLocal 時所在槽位已經被占用,此時該 ThreadLocal 占用其他槽位,
// 之後如果占用該槽位的 ThreadLocal 被程式清理,注意程式清理會直接将Entry置null
// 清理完成後,程式會重新向下整理一次元素的下标位置,盡量将元素放在hash算出的hash位置
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 存在元素,且是目前 ThreadLocal 元素,則直接覆寫
if (k == key) {
e.value = value;
return;
}
// 存在元素,但是元素key為null
// 因為Entry的類定義為 static class Entry extends WeakReference<ThreadLocal<?>>
// 即Entry繼承自包裝了ThreadLocal對象的弱引用
// 弱引用對象在GC時如果沒有強引用對象指向,則會直接被回收,是以ThreadLocal可能為null
// 但是在回收時候,Entry對象不會被回收,是以會出現下列情況
if (k == null) {
// 清理并存儲目前節點
replaceStaleEntry(key, value, i);
return;
}
}
// 如果目前下标未被占用,則直接初始化為目前 ThreadLocal
tab[i] = new Entry(key, value);
int sz = ++size;
// 清理節點,并且判斷擴容因子
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 重新進行hash配置設定,是否進行擴容内部存在二次判斷s
rehash();
}
* replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot):
// 該方法實作兩個功能
// 移除:移除已經失效的節點
// 建構:建構目前線程的新節點
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// 每一次運作都會處理一次失效資料,減少資料擴容和資料沖突
// 循環到上一個非null的Entry對象,并保留失效的索引下标到slotToExpunge,以備後續清理
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// 從目前節點向下找,判斷每一個槽位,是否存在目前 ThreadLocal
// 因為目前槽位已經被占用,是以如果目前 ThreadLocal 在占用之後曾通路一次,則會重新配置設定槽位
// 此時需要周遊尋找,判斷目前 ThreadLocal 是否已經初始化為 Entry
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// key相同,說明目前 ThreadLocal 已經初始化為Entry,并占用了其他Hash槽位
if (k == key) {
// 修改線程值
e.value = value;
// 将原槽位已經廢棄的slot遷移到該槽位
// 将目前 ThreadLocal 遷移到正确槽位
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 如果失效的下标相等,因為目前槽位已經與下一個槽位互轉
// 是以修改失效槽位索引
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// expungeStaleEntry: 消除目前槽位資訊,此處為徹底消除,
// cleanSomeSlots: 清除部分失效節點,此處沒有全周遊清除
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 槽位失效,并且相等,轉換為下一個槽位索引
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 如果沒有找到key,說明目前 ThreadLocal 并沒有初始化,直接初始化
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// 存在失效節點,清理掉!
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
* expungeStaleEntry(int staleSlot):移除失效節點,并傳回下一個有效節點索引,該方法為底層方法
// staleSlot:參數表示目前要清理的失效節點
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 注意此處清理節點是對Entry直接置空
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// 從目前節點向下reHash,直到遇到空,
// Entry元素插入,如果遇到Hash沖突,會順序向下尋找節點,此處向下reHash,目的是為了把順序向下存儲的節點拉回到合适位置
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 向下尋找的節點為失效節點,直接移除
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// 節點未失效,重新計算節點hash值,并判斷下标位置是否與目前位置一緻
// 位置不一緻時,将元素拉到對應的合适位置
// 此處也就是為什麼當位置元素為空時,直接插入,不需要再循環判斷是否已經存在
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
// 此處傳回的i表示向下遇到的第一個null節點的下表
return i;
}
* cleanSomeSlots(int i, int n):從指定下标索引開始向下位移一位找失效節點并移除,該方法為底層方法
// i表示開始清理的索引下标,len表示目前ThreadLocal容器長度
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// 周遊向下擷取每一個節點,
i = nextIndex(i, len);
Entry e = tab[i];
// 如果節點已經失效,對n重新賦初值
if (e != null && e.get() == null) {
n = len;
removed = true;
// 移除目前節點,傳回的i為目前節點的下一個null節點索引
i = expungeStaleEntry(i);
}
// 節點全部有效的情況下,最多跑 n = 2 ^ m 的 m 次
} while ( (n >>>= 1) != 0);
return removed;
}
* rehash():擴容
private void rehash() {
// 清理所有失效資料
expungeStaleEntries();
// 判斷是否需要擴容
// threshold = len * 2 / 3
// threshold - threshold / 4 = size * 1 / 2
// 是以一半擴容,擴容一倍,用較低的門檻值來保證内部判斷邏輯更快,減少遲滞
if (size >= threshold - threshold / 4)
resize();
}
* expungeStaleEntries()
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
// 周遊每一個節點判斷是否失效,并移除
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
* resize():擴容,代碼中并沒有看到移除後減小容量的部分,應該是不支援
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 兩倍擴容
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
// 将舊容器中的元素全部拿出來,重新根據新容器長度計算下标位置,并建構
// 如果hash沖突,同樣向下尋找節點
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 重新指派全局參數
setThreshold(newLen);
size = count;
table = newTab;
}
3,get()源碼分析
* get()
public T get() {
Thread t = Thread.currentThread();
// 擷取線程屬性 threadLocals
ThreadLocalMap map = getMap(t);
if (map != null) {
// 從 ThreadLocalMap 内部 Entry[] 中,根據目前請求的Threadlocal執行個體,擷取對應Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
// Entry 不為空,直接擷取對應值
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// map為空或者entry為空,建構并填充初值
return setInitialValue();
}
* getEntry(ThreadLocal<?> key):從 ThreadLocalMap 中擷取對應 Entry
private Entry getEntry(ThreadLocal<?> key) {
// 擷取索引位置
int i = key.threadLocalHashCode & (table.length - 1);
// Entry存在且key為目前 ThreadLocal,則比對成功
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
// key值不為目前 ThreadLocal,則說明已經被占用,向後繼續尋找
return getEntryAfterMiss(key, i, e);
}
* getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e)
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// e不為空,說明槽點被占用,繼續向後尋找
// 找到下一個為空,說明目前段不存在,則不存在
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
// 失效,直接移除,則後續重新指派後e為null
if (k == null)
expungeStaleEntry(i);
// 未失效,說明存在,繼續下一個索引尋找
else
i = nextIndex(i, len);
e = tab[i];
}
// e為空,說明線程對應key确實不存在,直接傳回null
return null;
}
* setInitialValue():擷取 Entry 失敗,ThreadLocalMap 或者 Entry 為空,賦初值
private T setInitialValue() {
// 擷取初值
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// map存在,直接添加,map不存在,建構map後添加
if (map != null)
map.set(this, value);
else
createMap(t, value);
// 傳回結果值
return value;
}
4,remove()源碼分析
* remove()
public void remove() {
// 擷取線程對應的 ThreadLocalMap
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
// map存在,直接移除目前 ThreadLocal
m.remove(this);
}
* remove(ThreadLocal<?> key)
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
// 擷取下标位置
int i = key.threadLocalHashCode & (len-1);
// 從下标位置開始,向下尋找,擷取指定的 ThreadLocal後直接移除
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
三,ThreadLocal工具類
package com.gupao.concurrent;
import java.util.*;
public final class ThreadLocalUtil {
private static final ThreadLocal<Map<String, Object>> threadLocal = new ThreadLocal() {
protected Map<String, Object> initialValue() {
return new HashMap(16);
}
};
public static Map<String, Object> getThreadLocal() {
return threadLocal.get();
}
public static <T> T get(String key) {
Map map = (Map) threadLocal.get();
return (T) map.get(key);
}
public static <T> T get(String key, T defaultValue) {
Map map = (Map) threadLocal.get();
return (T) map.get(key) == null ? defaultValue : (T) map.get(key);
}
public static void set(String key, Object value) {
Map map = (Map) threadLocal.get();
map.put(key, value);
}
public static void set(Map<String, Object> keyValueMap) {
Map map = (Map) threadLocal.get();
map.putAll(keyValueMap);
}
public static void remove() {
threadLocal.remove();
}
public static <T> T remove(String key) {
Map map = (Map) threadLocal.get();
return (T) map.remove(key);
}
}