天天看點

并發程式設計(十一):ThreadLocal源碼分析一,ThreadLocal概述二,ThreadLocal源碼分析三,ThreadLocal工具類

一,ThreadLocal概述

1,TheadLocal —— 線程隔離對象

    * JDK内部提供了線程隔離對象 ThreadLocal ,注意該類與AQS或者JUC基本沒有關系。線程對 ThreadLocal 操作,實際上對線程屬性 threadLocals 進行操作,該屬性是 ThreadLocal 内部類 ThreadLocalMap 的執行個體。ThreadLocal 底層通過 ThreadLocalMap 進行存儲,具體在 ThreadLocalMap 内通過 Entry 數組進行資料儲存。Entry 在 ThreadLocal 的内部實作,并通過 WeakReference 包裝泛型 ThreadLocal 為弱引用(在每一次GC時,弱引用對象如果沒有強引用對象引用,會被GC回收),Entry 以每一個 ThreadLocal 作為鍵值在 Thread 内進行不同的資料存儲

    * 總而言之,一個 ThreadLocal 對象可以存儲在多個線程的成員變量 threadLocals 中。每一個 ThreadLocal 線上程中的存在形式,通過包裝為 Entry 對象,存儲在 ThreadLocalMap 的成員變量 Entry[] 中。

2,類圖

并發程式設計(十一):ThreadLocal源碼分析一,ThreadLocal概述二,ThreadLocal源碼分析三,ThreadLocal工具類

3,常用API

// 構造
public ThreadLocal();
// 初始化, 初始化對象時, 可以重寫此方法, 完成給每一個線程賦初值
protected T initialValue();
// 存儲資料
public void set(T value);
// 擷取資料
public T get();
// 移除資料
public void remove();
           

4,功能DEMO

    * 從輸出結果可以看出,各個線程儲存的是操作時候的變量值,後續變量變化并沒有産生影響;示範比較簡單,感覺一個Map就能玩,不過在開發中,ThreadLocal 理論上是存在在整個線程周期的,是以不需要頻繁傳值

package com.gupao.concurrent;

/**
 * @author pj_zhang
 * @create 2019-10-21 22:09
 **/
public class ThreadLocalTest {

    private static volatile int count = 0;

    public static void main(String[] args) {
        ThreadLocal threadLocal = new ThreadLocal();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                threadLocal.set(++count);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ": " + threadLocal.get());
            }, "THREAD_" + i).start();
        }
    }

}
           
并發程式設計(十一):ThreadLocal源碼分析一,ThreadLocal概述二,ThreadLocal源碼分析三,ThreadLocal工具類

二,ThreadLocal源碼分析

1,初始化

    * 構造:ThreadLocal()

// 直接初始化,無特殊
public ThreadLocal() {
}
           

    * 初始化值:initialValue()

// 該方法用于重寫,為每一個線程賦初值
protected T initialValue() {
	return null;
}
           

2,set()源碼分析

    * set(T value)

public void set(T value) {
	Thread t = Thread.currentThread();
	// 擷取目前線程的成員成員 threadLocals
	ThreadLocalMap map = getMap(t);
	// 成員變量不為空,說明已經初始化,給該線程的目前 ThreadLocal 指派
	if (map != null)
		map.set(this, value);
	// 成員變量為空,建立并指派
	else
		createMap(t, value);
}
           

    * getMap(Thread t):擷取線程成員變量 threadLocals

ThreadLocalMap getMap(Thread t) {
	// 直接傳回線程成員變量
	return t.threadLocals;
}
           

    * createMap(Thread t, T firstValue):成員變量不存在,建立并指派

void createMap(Thread t, T firstValue) {
	// 直接初始化 ThreadLocalMap
	t.threadLocals = new ThreadLocalMap(this, firstValue);
}
           

    * ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue):初始化加魔數及擴容

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
	// 初始化table,預設數組長度為16
	table = new Entry[INITIAL_CAPACITY];
	// 根據斐波拉契散列擷取下标值
	int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
	// 建構Entry
	table[i] = new Entry(firstKey, firstValue);
	size = 1;
	// 設定擴容因子,擴容因子由兩方面構成
	setThreshold(INITIAL_CAPACITY);
}

// 斐波拉契散列魔數
private final int threadLocalHashCode = nextHashCode();

private static AtomicInteger nextHashCode = new AtomicInteger();

// 魔數
private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
	// 通過 AtomicInteger 類實作原子遞增
	return nextHashCode.getAndAdd(HASH_INCREMENT);
}

// 擴容因子,設定為2/3并取整
private void setThreshold(int len) {
	threshold = len * 2 / 3;
}
           

    * set(ThreadLocal<?> key, Object value):成員變量已經初始化,直接重指派

private void set(ThreadLocal<?> key, Object value) {
	// 擷取到存儲容器:Entry{]
	Entry[] tab = table;
	// 擷取容器容量,并通過斐波拉契散列擷取線程應該所在的索引下标
	// key.threadLocalHashCode:ThreadLocal成員變量,通過Atomic原子類實作遞增(CAS)
	int len = tab.length;
	int i = key.threadLocalHashCode & (len-1);

	// 擷取數組中該下标的值,判斷是否已經存在元素
	// nextIndex:擷取下一個下标,通過三元表達式實作首尾相連
	
	// 如果Entry為空,說明目前 TheadLocal 執行個體不存在,同時也不可能存在在其他索引位置,分析如下:
	// 首先GC回收失效隻對ThreadLocal進行清理,不會對Entry進行清理,是以Entry隻會為空,不會為null
	// 此時如果目前 ThreadLocal 在此Entry有效時插入,則會順序尋找下一個槽位。
	// 此時需要做的就是從其他槽位找到目前ThreadLocal,并把目前 ThreadLocal 拉回到正确槽位
	
	// 其次,如果目前線程在插入 ThreadLocal 時所在槽位已經被占用,此時該 ThreadLocal 占用其他槽位,
	// 之後如果占用該槽位的 ThreadLocal 被程式清理,注意程式清理會直接将Entry置null
	// 清理完成後,程式會重新向下整理一次元素的下标位置,盡量将元素放在hash算出的hash位置
	for (Entry e = tab[i];
		 e != null;
		 e = tab[i = nextIndex(i, len)]) {
		ThreadLocal<?> k = e.get();
		// 存在元素,且是目前 ThreadLocal 元素,則直接覆寫
		if (k == key) {
			e.value = value;
			return;
		}

		// 存在元素,但是元素key為null
		// 因為Entry的類定義為 static class Entry extends WeakReference<ThreadLocal<?>>
		// 即Entry繼承自包裝了ThreadLocal對象的弱引用
		// 弱引用對象在GC時如果沒有強引用對象指向,則會直接被回收,是以ThreadLocal可能為null
		// 但是在回收時候,Entry對象不會被回收,是以會出現下列情況
		if (k == null) {
			// 清理并存儲目前節點
			replaceStaleEntry(key, value, i);
			return;
		}
	}
	
	// 如果目前下标未被占用,則直接初始化為目前 ThreadLocal
	tab[i] = new Entry(key, value);
	int sz = ++size;
	// 清理節點,并且判斷擴容因子
	if (!cleanSomeSlots(i, sz) && sz >= threshold)
		// 重新進行hash配置設定,是否進行擴容内部存在二次判斷s
		rehash();
}
           

    * replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot):

// 該方法實作兩個功能
// 移除:移除已經失效的節點
// 建構:建構目前線程的新節點
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
	Entry[] tab = table;
	int len = tab.length;
	Entry e;

	// 每一次運作都會處理一次失效資料,減少資料擴容和資料沖突
	// 循環到上一個非null的Entry對象,并保留失效的索引下标到slotToExpunge,以備後續清理
	int slotToExpunge = staleSlot;
	for (int i = prevIndex(staleSlot, len);
		 (e = tab[i]) != null;
		 i = prevIndex(i, len))
		if (e.get() == null)
			slotToExpunge = i;

	// 從目前節點向下找,判斷每一個槽位,是否存在目前 ThreadLocal
	// 因為目前槽位已經被占用,是以如果目前 ThreadLocal 在占用之後曾通路一次,則會重新配置設定槽位
	// 此時需要周遊尋找,判斷目前 ThreadLocal 是否已經初始化為 Entry
	for (int i = nextIndex(staleSlot, len);
		 (e = tab[i]) != null;
		 i = nextIndex(i, len)) {
		ThreadLocal<?> k = e.get();

		// key相同,說明目前 ThreadLocal 已經初始化為Entry,并占用了其他Hash槽位
		if (k == key) {
			// 修改線程值
			e.value = value;
			// 将原槽位已經廢棄的slot遷移到該槽位
			// 将目前 ThreadLocal 遷移到正确槽位
			tab[i] = tab[staleSlot];
			tab[staleSlot] = e;

			// 如果失效的下标相等,因為目前槽位已經與下一個槽位互轉
			// 是以修改失效槽位索引
			if (slotToExpunge == staleSlot)
				slotToExpunge = i;
			// expungeStaleEntry: 消除目前槽位資訊,此處為徹底消除,
			// cleanSomeSlots: 清除部分失效節點,此處沒有全周遊清除
			cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
			return;
		}

		// 槽位失效,并且相等,轉換為下一個槽位索引
		if (k == null && slotToExpunge == staleSlot)
			slotToExpunge = i;
	}

	// 如果沒有找到key,說明目前 ThreadLocal 并沒有初始化,直接初始化
	tab[staleSlot].value = null;
	tab[staleSlot] = new Entry(key, value);

	// 存在失效節點,清理掉!
	if (slotToExpunge != staleSlot)
		cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
           

    * expungeStaleEntry(int staleSlot):移除失效節點,并傳回下一個有效節點索引,該方法為底層方法

// staleSlot:參數表示目前要清理的失效節點
private int expungeStaleEntry(int staleSlot) {
	Entry[] tab = table;
	int len = tab.length;

	// 注意此處清理節點是對Entry直接置空
	tab[staleSlot].value = null;
	tab[staleSlot] = null;
	size--;

	// 從目前節點向下reHash,直到遇到空,
	// Entry元素插入,如果遇到Hash沖突,會順序向下尋找節點,此處向下reHash,目的是為了把順序向下存儲的節點拉回到合适位置
	Entry e;
	int i;
	for (i = nextIndex(staleSlot, len);
		 (e = tab[i]) != null;
		 i = nextIndex(i, len)) {
		ThreadLocal<?> k = e.get();
		// 向下尋找的節點為失效節點,直接移除
		if (k == null) {
			e.value = null;
			tab[i] = null;
			size--;
		} else {
			// 節點未失效,重新計算節點hash值,并判斷下标位置是否與目前位置一緻
			// 位置不一緻時,将元素拉到對應的合适位置
			// 此處也就是為什麼當位置元素為空時,直接插入,不需要再循環判斷是否已經存在
			int h = k.threadLocalHashCode & (len - 1);
			if (h != i) {
				tab[i] = null;
				while (tab[h] != null)
					h = nextIndex(h, len);
				tab[h] = e;
			}
		}
	}
	// 此處傳回的i表示向下遇到的第一個null節點的下表
	return i;
}
           

    * cleanSomeSlots(int i, int n):從指定下标索引開始向下位移一位找失效節點并移除,該方法為底層方法

// i表示開始清理的索引下标,len表示目前ThreadLocal容器長度
private boolean cleanSomeSlots(int i, int n) {
	boolean removed = false;
	Entry[] tab = table;
	int len = tab.length;
	do {
		// 周遊向下擷取每一個節點,
		i = nextIndex(i, len);
		Entry e = tab[i];
		// 如果節點已經失效,對n重新賦初值
		if (e != null && e.get() == null) {
			n = len;
			removed = true;
			// 移除目前節點,傳回的i為目前節點的下一個null節點索引
			i = expungeStaleEntry(i);
		}
	// 節點全部有效的情況下,最多跑 n = 2 ^ m 的 m 次
	} while ( (n >>>= 1) != 0);
	return removed;
}
           

    * rehash():擴容

private void rehash() {
	// 清理所有失效資料
	expungeStaleEntries();

	// 判斷是否需要擴容
	// threshold = len * 2 / 3
	// threshold - threshold / 4 = size * 1 / 2
	// 是以一半擴容,擴容一倍,用較低的門檻值來保證内部判斷邏輯更快,減少遲滞
	if (size >= threshold - threshold / 4)
		resize();
}
           

    * expungeStaleEntries()

private void expungeStaleEntries() {
	Entry[] tab = table;
	int len = tab.length;
	// 周遊每一個節點判斷是否失效,并移除
	for (int j = 0; j < len; j++) {
		Entry e = tab[j];
		if (e != null && e.get() == null)
			expungeStaleEntry(j);
	}
}
           

    * resize():擴容,代碼中并沒有看到移除後減小容量的部分,應該是不支援

private void resize() {
	Entry[] oldTab = table;
	int oldLen = oldTab.length;
	// 兩倍擴容
	int newLen = oldLen * 2;
	Entry[] newTab = new Entry[newLen];
	int count = 0;

	// 将舊容器中的元素全部拿出來,重新根據新容器長度計算下标位置,并建構
	// 如果hash沖突,同樣向下尋找節點
	for (int j = 0; j < oldLen; ++j) {
		Entry e = oldTab[j];
		if (e != null) {
			ThreadLocal<?> k = e.get();
			if (k == null) {
				e.value = null;
			} else {
				int h = k.threadLocalHashCode & (newLen - 1);
				while (newTab[h] != null)
					h = nextIndex(h, newLen);
				newTab[h] = e;
				count++;
			}
		}
	}

	// 重新指派全局參數
	setThreshold(newLen);
	size = count;
	table = newTab;
}
           

3,get()源碼分析

    * get()

public T get() {
	Thread t = Thread.currentThread();
	// 擷取線程屬性 threadLocals
	ThreadLocalMap map = getMap(t);
	if (map != null) {
		// 從 ThreadLocalMap 内部 Entry[] 中,根據目前請求的Threadlocal執行個體,擷取對應Entry
		ThreadLocalMap.Entry e = map.getEntry(this);
		if (e != null) {
			// Entry 不為空,直接擷取對應值
			@SuppressWarnings("unchecked")
			T result = (T)e.value;
			return result;
		}
	}
	// map為空或者entry為空,建構并填充初值
	return setInitialValue();
}
           

    * getEntry(ThreadLocal<?> key):從 ThreadLocalMap 中擷取對應 Entry

private Entry getEntry(ThreadLocal<?> key) {
	// 擷取索引位置
	int i = key.threadLocalHashCode & (table.length - 1);
	// Entry存在且key為目前 ThreadLocal,則比對成功
	Entry e = table[i];
	if (e != null && e.get() == key)
		return e;
	else
		// key值不為目前 ThreadLocal,則說明已經被占用,向後繼續尋找
		return getEntryAfterMiss(key, i, e);
}
           

    * getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e)

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
	Entry[] tab = table;
	int len = tab.length;

	// e不為空,說明槽點被占用,繼續向後尋找
	// 找到下一個為空,說明目前段不存在,則不存在
	while (e != null) {
		ThreadLocal<?> k = e.get();
		if (k == key)
			return e;
		// 失效,直接移除,則後續重新指派後e為null
		if (k == null)
			expungeStaleEntry(i);
		// 未失效,說明存在,繼續下一個索引尋找
		else
			i = nextIndex(i, len);
		e = tab[i];
	}
	// e為空,說明線程對應key确實不存在,直接傳回null
	return null;
}
           

    * setInitialValue():擷取 Entry 失敗,ThreadLocalMap 或者 Entry 為空,賦初值

private T setInitialValue() {
	// 擷取初值
	T value = initialValue();
	Thread t = Thread.currentThread();
	ThreadLocalMap map = getMap(t);
	// map存在,直接添加,map不存在,建構map後添加
	if (map != null)
		map.set(this, value);
	else
		createMap(t, value);
	// 傳回結果值
	return value;
}
           

4,remove()源碼分析

    * remove()

public void remove() {
	// 擷取線程對應的 ThreadLocalMap
	ThreadLocalMap m = getMap(Thread.currentThread());
	if (m != null)
		// map存在,直接移除目前 ThreadLocal
		m.remove(this);
}
           

    * remove(ThreadLocal<?> key)

private void remove(ThreadLocal<?> key) {
	Entry[] tab = table;
	int len = tab.length;
	// 擷取下标位置
	int i = key.threadLocalHashCode & (len-1);
	// 從下标位置開始,向下尋找,擷取指定的 ThreadLocal後直接移除
	for (Entry e = tab[i];
		 e != null;
		 e = tab[i = nextIndex(i, len)]) {
		if (e.get() == key) {
			e.clear();
			expungeStaleEntry(i);
			return;
		}
	}
}
           

三,ThreadLocal工具類

package com.gupao.concurrent;

import java.util.*;

public final class ThreadLocalUtil {
    
    private static final ThreadLocal<Map<String, Object>> threadLocal = new ThreadLocal() {
        protected Map<String, Object> initialValue() {
            return new HashMap(16);
        }
    };

    public static Map<String, Object> getThreadLocal() {
        return threadLocal.get();
    }

    public static <T> T get(String key) {
        Map map = (Map) threadLocal.get();
        return (T) map.get(key);
    }

    public static <T> T get(String key, T defaultValue) {
        Map map = (Map) threadLocal.get();
        return (T) map.get(key) == null ? defaultValue : (T) map.get(key);
    }

    public static void set(String key, Object value) {
        Map map = (Map) threadLocal.get();
        map.put(key, value);
    }

    public static void set(Map<String, Object> keyValueMap) {
        Map map = (Map) threadLocal.get();
        map.putAll(keyValueMap);
    }

    public static void remove() {
        threadLocal.remove();
    }

    public static <T> T remove(String key) {
        Map map = (Map) threadLocal.get();
        return (T) map.remove(key);
    }

}
           

繼續閱讀