初步認識JUC
Java.util.concurrent 是在并發程式設計中比較常用的工具類,裡面包含很多用來在并發場景中使用的元件。比如線程池、阻塞隊列、計時器、同步器、并發集合等等。并發包的作者是大名鼎鼎的 Doug Lea。
lock
在 Lock 接口出現之前,Java 中的應用程式對于多線程的并發安全處理隻能基于synchronized 關鍵字來解決。但是 synchronized 在有些場景中會存在一些短闆,也就是它并不适合于所有的并發場景。但是在 Java5 以後,Lock 的出現可以解決synchronized 在某些場景中的短闆,它比 synchronized 更加靈活。
Lock 本質上是一個接口,它定義了釋放鎖和獲得鎖的抽象方法,定義成接口就意味着它定義了鎖的一個标準規範,也同時意味着鎖的不同實作。實作 Lock 接口的類有很多,以下為幾個常見的鎖實作
Lock接口
void lock() // 如果鎖可用就獲得鎖,如果鎖不可用就阻塞直到鎖釋放
void lockInterruptibly() // 和lock()方法相似, 但阻塞的線程可中斷,抛 出java.lang.InterruptedException 異常
boolean tryLock() // 非阻塞擷取鎖;嘗試擷取鎖,如果成功傳回 true
boolean tryLock(longtimeout, TimeUnit timeUnit)//帶有逾時時間的擷取鎖方法
void unlock() // 釋放鎖
類圖

ReentrantLock
表示重入鎖,它是唯一一個實作了 Lock 接口的類。重入鎖指的是線程在獲得鎖之後,再次擷取該鎖不需要阻塞,而是直接關聯一次計數器增加重入次數,也就是如果目前線程 t1 通過調用 lock 方法擷取了鎖之後,再次調用 lock,是不會再阻塞去擷取鎖的,直接增加重試次數就行了。synchronized 和 ReentrantLock 都是可重入鎖。
重入鎖的設計目的
比如調用 demo 方法獲得了目前的對象鎖,然後在這個方法中再去調用demo2,demo2 中的存在同一個執行個體鎖,這個時候目前線程會因為無法獲得demo2 的對象鎖而阻塞,就會産生死鎖。重入鎖的設計目的是避免線程的死鎖。
public class ReentrantDemo{
public synchronized void demo(){// main獲得對象鎖
System.out.println("begin:demo");
demo2();
}
public void demo2(){
System.out.println("begin:demo1");
// 在次獲得對象鎖
synchronized (this){
}
}
public static void main(String[] args) {
ReentrantDemo rd=new ReentrantDemo();
new Thread(rd::demo).start();
}
}
ReentrantReadWriteLock(讀寫鎖)
重入讀寫鎖,它實作了 ReadWriteLock 接口,在這個類中維護了兩個鎖,一個是 ReadLock,一個是 WriteLock,他們都分别實作了 Lock接口。讀寫鎖是一種适合讀多寫少的場景下解決線程安全問題的工具,基本原則是: 讀和讀不互斥、讀和寫互斥、寫和寫互斥。也就是說涉及到影響資料變化的操作都會存在互斥。
讀寫鎖的設計目的
我們以前了解的鎖,基本都是排他鎖,也就是這些鎖在同一時刻隻允許一個線程進行通路,而讀寫鎖在同一時刻可以允許多個線程通路,但是在寫線程通路時,所有的讀線程和其他寫線程都會被阻塞。讀寫鎖維護了一對鎖,一個讀鎖、一個寫鎖;一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多于寫的。在讀多于寫的情況下,讀寫鎖能夠提供比排它鎖更好的并發性和吞吐量.
public class RWLock {
static ReentrantReadWriteLock wrl=new ReentrantReadWriteLock();
static Map<String,Object> cacheMap=new HashMap<>();
static Lock read=wrl.readLock();
static Lock write=wrl.writeLock();
//線程B/C/D
public static final Object get(String key){
System.out.println("begin read data:"+key);
read.lock(); //獲得讀鎖-> 阻塞
try {
return cacheMap.get(key);
}finally {
read.unlock();
}
}
//線程A
public static final Object put(String key,Object val){
write.lock();//獲得了寫鎖
try{
return cacheMap.put(key,val);
}finally {
write.unlock();
}
}
public static void main(String[] args) {
wrl.readLock();//B線程 ->阻塞
wrl.writeLock(); //A線程
//讀->讀是可以共享
//讀->寫 互斥
//寫->寫 互斥
//讀多寫少的場景
}
}
在這個案例中,通過 hashmap 來模拟了一個記憶體緩存,然後使用讀寫所來保證這個記憶體緩存的線程安全性。當執行讀操作的時候,需要擷取讀鎖,在并發通路的時候,讀鎖不會被阻塞,因為讀操作不會影響執行結果。在執行寫操作是,線程必須要擷取寫鎖,當已經有線程持有寫鎖的情況下,目前線程會被阻塞,隻有當寫鎖釋放以後,其他讀寫操作才能繼續執行。使用讀寫鎖提升讀操作的并發性,也保證每次寫操作對所有的讀寫操作的可見性
- 讀鎖與讀鎖可以共享
- 讀鎖與寫鎖不可以共享(排他)
- 寫鎖與寫鎖不可以共享(排他)
StampedLock
stampedLock 是 JDK8 引入的新的鎖機制,可以簡單認為是讀寫鎖的一個改進版本,讀寫鎖雖然通過分離讀和寫的功能使得讀和讀之間可以完全并發,但是讀和寫是有沖突的,如果大量的讀線程存在,可能會引起寫線程的饑餓。stampedLock 是一種樂觀的讀政策,使得樂觀鎖完全不會阻塞寫線程
ReentrantLock 的實作原理
AQS
在 Lock 中,用到了一個同步隊列 AQS,全稱 AbstractQueuedSynchronizer,它是一個同步工具也是 Lock 用來實作線程同步的核心元件獨占鎖,每次隻能有一個線程持有鎖,比如前面給大家示範的 ReentrantLock 就是以獨占方式實作的互斥鎖共 享 鎖 , 允 許 多 個線程同時擷取鎖,并發通路共享資源,比如ReentrantReadWriteLock
AQS 隊列内部維護的是一個 FIFO 的雙向連結清單,這種結構的特點是每個資料結構都有兩個指針,分别指向直接的後繼節點和直接前驅節點。是以雙向連結清單可以從任意一個節點開始很友善的通路前驅和後繼。每個 Node 其實是由線程封裝,當線程争搶鎖失敗後會封裝成 Node 加入到 ASQ 隊列中去;當擷取鎖的線程釋放鎖以後,會從隊列中喚醒一個阻塞的節點(線程)
Node 的組成
釋放鎖以及添加線程對于隊列的變化
裡會涉及到兩個變化
- 新的線程封裝成 Node 節點追加到同步隊列中,設定 prev 節點以及修改目前節點的前置節點的 next 節點指向自己
- 通過 CAS 講 tail 重新指向新的尾部節點
head 節點表示擷取鎖成功的節點,當頭結點在釋放同步狀态時,會喚醒後繼節點,如果後繼節點獲得鎖成功,會把自己設定為頭結點,節點的變化過程如下
涉及到兩個變化
- 修改head節點指向下一個獲得鎖的節點
- 新的獲得鎖的節點,将prev的指針指向null
設定 head 節點不需要用 CAS,原因是設定 head 節點是由獲得鎖的線程來完成的,而同步鎖隻能由一個線程獲得,是以不需要 CAS 保證,隻需要把 head 節點設定為原首節點的後繼節點,并且斷開原 head 節點的 next 引用即可
ReentrantLock 的源碼分析
ReentrantLock 預設是非公平鎖,因為無論在什麼場景非公平鎖的效率都是會大于公平鎖的。
時序圖
public void lock(){
sync.lock();
}
sync是一個靜态内部類,它繼承了AQS這個抽象類,前面說過AQS是一個同步工具,主要用來實作同步控制。我們在利用這個工具的時候,會繼承它來實作同步控制功能。 通過進一步分析,發現Sync這個類有兩個具體的實作,分别是 NofairSync(非公平鎖), FailSync(公平鎖).
公平鎖 表示所有線程嚴格按照FIFO來擷取鎖
非公平鎖 表示可以存在搶占鎖的功能,也就是說不管目前隊列上是否存在其他線程等待,新線程都有機會搶占鎖
NonfairSync.lock
- 非公平鎖和公平鎖最大的差別在于,在非公平鎖中我搶占鎖的邏輯是,不管有沒有線程排隊,我先上來 cas 去搶占一下
- CAS 成功,就表示成功獲得了鎖
- CAS 失敗,調用 acquire(1)走鎖競争邏輯
非公平的AQS隊列頭結點的下一個節點不一定會獲得鎖,就是因為在非公平鎖的釋放後,如果節點在cas的時候。另外一個線程正好進來cas鎖,如果這個節點沒他快,那麼鎖就會被這個插隊的獲得
final void lock() {
if (compareAndSetState(0, 1)) //通過cas操作來修改state狀态,表示争搶鎖的操作
setExclusiveOwnerThread(Thread.currentThread());//設定目前獲得鎖狀态的線程
else
acquire(1);//嘗試去擷取鎖
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通過 cas 樂觀鎖的方式來做比較并替換,這段代碼的意思是,如果目前記憶體中的state 的值和預期值 expect 相等,則替換為 update。更新成功傳回 true,否則傳回 false.
這個操作是原子的,不會出現線程安全問題,這裡面涉及到Unsafe這個類的操作,以及涉及到 state 這個屬性的意義。state 是 AQS 中的一個屬性,它在不同的實作中所表達的含義不一樣,對于重入鎖的實作來說,表示一個同步狀态。它有兩個含義的表示
- 當 state=0 時,表示無鎖狀态
- 當 state>0 時,表示已經有線程獲得了鎖,也就是 state=1,但是因為ReentrantLock 允許重入,是以同一個線程多次獲得同步鎖的時候,state 會遞增,比如重入 5 次,那麼 state=5。而在釋放鎖的時候,同樣需要釋放 5 次直到 state=0其他線程才有資格獲得鎖
Unsafe 類
Unsafe 類是在 sun.misc 包下,不屬于 Java 标準。但是很多 Java 的基礎類庫,包括一些被廣泛使用的高性能開發庫都是基于 Unsafe 類開發的,比如 Netty、Hadoop、Kafka 等;Unsafe 可認為是 Java 中留下的後門,提供了一些低層次操作,如直接記憶體通路、線程的挂起和恢複、CAS、線程同步、記憶體屏障而 CAS 就是 Unsafe 類中提供的一個原子操作,第一個參數為需要改變的對象,第二個為偏移量(即之前求出來的 headOffset 的值),第三個參數為期待的值,第四個為更新後的值整個方法的作用是如果目前時刻的值等于預期值 var4 相等,則更新為新的期望值 var5,如果更新成功,則傳回 true,否則傳回 false;
acquire(1)方法
- 通過 tryAcquire 嘗試擷取獨占鎖,如果成功傳回 true,失敗傳回 false
- 如果 tryAcquire 失敗,則會通過 addWaiter 方法将目前線程封裝成 Node 添加到 AQS 隊列尾部
- acquireQueued,将 Node 作為參數,通過自旋去嘗試擷取鎖。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync.tryAcquire
這個方法的作用是嘗試擷取鎖,如果成功傳回 true,不成功傳回 false它是重寫 AQS 類中的 tryAcquire 方法,并且大家仔細看一下 AQS 中 tryAcquire方法的定義,并沒有實作,而是抛出異常。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //擷取目前執行的線程
int c = getState(); //獲得 state 的值
if (c == 0) { //表示無鎖狀态
if (compareAndSetState(0, acquires)) { //cas 替換 state 的值,cas 成功表示擷取鎖成功
setExclusiveOwnerThread(current); //儲存目前獲得鎖的線程,下次再來的時候不要再嘗試競争鎖
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //如果同一個線程來獲得鎖,直接增加重入次數
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter
當 tryAcquire 方法擷取鎖失敗以後,則會先調用 addWaiter 将目前線程封裝成Node.
入參 mode 表示目前節點的狀态,傳遞的參數是 Node.EXCLUSIVE,表示獨占狀态。意味着重入鎖用到了 AQS 的獨占鎖功能
- 将目前線程封裝成 Node
- 目前連結清單中的 tail 節點是否為空,如果不為空,則通過 cas 操作把目前線程的node 添加到 AQS 隊列
- 如果為空或者 cas 失敗,調用 enq 将節點添加到 AQS 隊列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; //tail 是 AQS 中表示同比隊列隊尾的屬性,預設是 null
if (pred != null) { //tail 不為空的情況下,說明隊列中存在節點
node.prev = pred; //把目前線程的 Node 的 prev 指向 tail
if (compareAndSetTail(pred, node)) { //通過 cas 把 node加入到 AQS 隊列,也就是設定為 tail
pred.next = node; ;//設定成功以後,把原 tail 節點的 next指向目前 node
return node;
}
}
enq(node); ;//tail=null,把 node 添加到同步隊列
return node;
}
enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
圖解分析
假設 3 個線程來争搶鎖,那麼截止到 enq 方法運作結束之後,或者調用 addwaiter方法結束後,AQS 中的連結清單結構圖
acquireQueued
通過 addWaiter 方法把線程添加到連結清單後,會接着把 Node 作為參數傳遞給acquireQueued 方法,去競争鎖
- 擷取目前節點的 prev 節點
- 如果 prev 節點為 head 節點,那麼它就有資格去争搶鎖,調用 tryAcquire 搶占鎖
- 搶占鎖成功以後,把獲得鎖的節點設定為 head,并且移除原來的初始化 head節點
- 如果獲得鎖失敗,則根據 waitStatus 決定是否需要挂起線程
- 最後,通過 cancelAcquire 取消獲得鎖的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); ;//擷取目前節點的 prev 節點
if (p == head && tryAcquire(arg)) { //如果是 head 節點,說明有資格去争搶鎖
setHead(node); //擷取鎖成功,也就是ThreadA 已經釋放了鎖,然後設定 head 為 ThreadB 獲得執行權限
p.next = null; // help GC //把原 head 節點從連結清單中移除
failed = false;
return interrupted;
}
//ThreadA 可能還沒釋放鎖,使得 ThreadB 在執行 tryAcquire 時會傳回 false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//并且傳回目前線程在等待過程中有沒有中斷過。
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire 争搶鎖失敗調用的方法
如果 ThreadA 的鎖還沒有釋放的情況下,ThreadB 和 ThreadC 來争搶鎖肯定是會失敗,那麼失敗以後會調用shouldParkAfterFailedAcquire 方法
Node 有 5 中狀态,分别是:CANCELLED(1),SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、預設狀态(0)
CANCELLED: 在同步隊列中等待的線程等待逾時或被中斷,需要從同步隊列中取消該 Node 的結點, 其結點的 waitStatus 為 CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化
SIGNAL: 隻要前置節點釋放鎖,就會通知辨別為 SIGNAL 狀态的後續節點的線程
CONDITION: 和 Condition 有關系,後續會講解
PROPAGATE:共享模式下,PROPAGATE 狀态的線程處于可運作狀态
0:初始狀态
- 如果 ThreadA 的 pred 節點狀态為 SIGNAL,那就表示可以放心挂起目前線程
- 通過循環掃描連結清單把 CANCELLED 狀态的節點移除
-
修改 pred 節點的狀态為 SIGNAL,傳回 false.
傳回 false 時,也就是不需要挂起,傳回 true,則需要調用 parkAndCheckInterrupt挂起目前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前置節點的waitStatus
if (ws == Node.SIGNAL)//如果前置節點為 SIGNAL,意味着隻需要等待其他前置節點的線程被釋放
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; //傳回 true,意味着可以直接放心的挂起了
if (ws > 0) { //ws 大于 0,意味着 prev 節點取消了排隊,直接移除這個節點就行
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev; //相當于: pred=pred.prev;node.prev=pred;
} while (pred.waitStatus > 0); //這裡采用循環,從雙向清單中移除 CANCELLED 的節點
pred.next = node;
} else { //利用 cas 設定 prev 節點的狀态為 SIGNAL(-1)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt 中斷
如果shouldParkAfterFailedAcquire傳回了true,則會執行: parkAndCheckInterrupt()方法,它是通過LockSupport.park(this)将目前線程挂起到WATING狀态,它需要等待一個中斷、unpark方法來喚醒它,通過這樣一種FIFO的機制的等待,來實作了Lock的操作。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport是Java6引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了 Unsafe 類裡的函數,歸結到 Unsafe 裡,隻有兩個函數–底層的就不分析了
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
鎖的釋放流程
unlock
public final boolean release(int arg) {
if (tryRelease(arg)) { //釋放鎖成功
Node h = head; //得到 aqs 中 head 節點
if (h != null && h.waitStatus != 0) //如果 head 節點不為空并且狀态!=0.調用 unparkSuccessor(h)喚醒後續節點
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
這個方法可以認為是一個設定鎖狀态的操作,通過将 state 狀态減掉傳入的參數值(參數是 1),如果結果狀态為 0,就将排它鎖的 Owner 設定為 null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀态會增加 1(當然可以自己修改這個值),在解鎖的時候減掉 1,同一個鎖,在可以重入後,可能會被疊加為 2、3、4 這些值,隻有 unlock()的次數與 lock()的次數對應才會将 Owner 線程設定為空,而且也隻有這種情況下才會傳回 true
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; //獲得 head 節點的狀态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); ;// 設定 head 節點狀态為 0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; //得到 head 節點的下一個節點
if (s == null || s.waitStatus > 0) {
//如果下一個節點為 null 或者 status>0 表示 cancelled 狀态.
//通過從尾部節點開始掃描,找到距離 head 最近的一個waitStatus<=0 的節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //next 節點不為空,直接喚醒這個線程即可
LockSupport.unpark(s.thread);
}
為什麼在釋放鎖的時候是從 最後節點 tail 進行掃描
- 将新的節點的 prev 指向 tail
- 通過 cas 将 tail 設定為新的節點,因為 cas 是原子操作是以能夠保證線程安全性
- t.next=node;設定原 tail 的 next 節點指向新的節點
注意看else寫的邏輯
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在 cas 操作之後,t.next=node 操作之前。存在其他線程調用 unlock 方法從 head開始往後周遊,由于 t.next=node 還沒執行意味着連結清單的關系還沒有建立完整。就會導緻周遊到 t 節點的時候被中斷。是以從後往前周遊,一定不會存在這個問題。
圖解分析
通過鎖的釋放,原本的結構就發生了一些變化。head 節點的 waitStatus 變成了 0,ThreadB 被喚醒
然後通過unlock後 原來被挂起的線程是在 acquireQueued 方法中自選,是以被喚醒以後繼續從這個方法開始執行在acquireQueued方法中
- 設定新 head 節點的 prev=null
- 設定原 head 節點的 next 節點為 null
公平鎖和非公平鎖的差別
鎖的公平性是相對于擷取鎖的順序而言的,如果是一個公平鎖,那麼鎖的擷取順序就應該符合請求的絕對時間順序,也就是 FIFO。 在上面分析的例子來說,隻要CAS 設定同步狀态成功,則表示目前線程擷取了鎖,而公平鎖則不一樣,差異點有兩個
lock
final void lock() {
acquire(1);
}
非公平鎖在擷取鎖的時候,會先通過 CAS 進行搶占,而公平鎖則不會
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
tryAcquire
公平鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
不同的地方在于判斷條件多了hasQueuedPredecessors()方法,也就是加入了同步隊列中目前節點是否有前驅節點的判斷,如果該方法傳回 true,則表示有線程比目前線程更早地請求擷取鎖,是以需要等待前驅線程擷取并釋放鎖之後才能繼續擷取鎖。