volatile : 可見性:緩存一緻性協定 MSI 禁止指令重排:JMM,記憶體屏障
synchronized 和 ReentrantLock 的差別
synchronized 系統自動上鎖,
ReentrantLock 需要手工上鎖,ReentrantLock 可以出現各種 condition (狀況)。
例如 tryLock 嘗試加鎖,lockInterruptibly()上鎖的時候異常了、getQueueLength()擷取隊列長度、hasQueuedThreads() 是否有線程在隊列裡,某一個線程是否在隊列裡 等等
實作:
ReentrantLock CAS的實作。 synchronized 四種鎖的實作,
無鎖狀态、偏向鎖狀态、輕量級鎖狀态、重量級鎖狀态 ( 見 多線程與高并發程式設計一 )
AQS AbstractQueuedSynchronizer 類,volatile state 狀态可重入鎖
AQS 底層就是CAS+ volatile
volatile state 用來記錄鎖的狀态,為0則沒有上鎖,否則上鎖了。
ReentrantLock.lock() 會調用 acquire(1);
AbstractQueuedSynchronizer.acquire(1) 方法裡面的if 會調用 addWaiter(Node.EXCLUSIVE)
static final Node EXCLUSIVE = null; 就是說 Node.EXCLUSIVE 這個值永遠是空的
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
首先,tail 為空, 傳進來的mode為空,是以pred為空,直接進入 enq(node) 方法
Node node = new Node(Thread.currentThread(), mode); node 資訊裡面的線程為目前線程,mode 為空!!!
enq:
for循環 t = tail , t 為空, compareAndSetHead(new Node()) , 方法如下
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
這裡調用的是unsafe的compare方法,這個是系統底層的,不做深入研究,大家了解下下面的概念原理就好
compareAndSwapObject方法中的第一個參數和第二個參數,用于确定待操作對象在記憶體中的具體位置的,然後取出值和第三個參數進行比較,如果相等,則将記憶體中的值更新為第四個參數的值,同時傳回true,表明原子更新操作完畢。反之則不更新記憶體中的值,同時傳回false,表明原子操作失敗
上面的方法基本上就是比較 head == null ,由于剛開始都是null的,很明顯為true,将 new Node 指派給 head。這時候,這個node.thread 已經是目前線程了。tail = head
接着進行for循環,這時候tail已經不為空了,進入else,繼續将 tail 設定為目前node ,next 指向目前node。
這裡添加第一個完成。
第二個線程來的時候,addWaiter 方法, pred != null 為true,進行 compareAndSetTail 将尾部設為第二個線程,next 指向第二個線程。
基本原理如下圖

圖可能有點錯誤,走下代碼邏輯吧
線程 A , B ,C
線程A
private Node addWaiter(Node mode) {
//Node node = new Node(A, null);
Node node = new Node(Thread.currentThread(), mode);
//Node pred = null 因為剛進來 tail = null;
Node pred = tail;
// A 不執行這個判斷
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
A進入enq node = A
private Node enq(final Node node) {
for (;;) {
//第一次 tail = null , 第二次 tail = 空 node
Node t = tail;
if (t == null) {
// 第一次進入這裡
//head = 空node
if (compareAndSetHead(new Node()))
//tail = 空node
tail = head;
} else {
//第二次進入 //A的prev = 空 node
node.prev = t;
// 将 tail 設定為 A(node)
if (compareAndSetTail(t, node)) {
//空node 的next 設定為 Anode
t.next = node;
//傳回一個空node
return t;
}
}
}
到這裡 tail = A;不是null,head 為 空node,t.next = Anode; A.prev=空node
線程B
private Node addWaiter(Node mode) {
//Node node = new Node(B, null);
Node node = new Node(Thread.currentThread(), mode);
//Node pred = A;
Node pred = tail;
// tail = A ,走這裡
if (pred != null) {
// B.prev = A
node.prev = pred;
// tail = B
if (compareAndSetTail(pred, node)) {
// A.next = B
pred.next = node;
//傳回 B
return node;
}
}
enq(node);
return node;
}
到這裡 tail = B;head 為 空node,A.next = Bnode; B.prev=A;
線程C
private Node addWaiter(Node mode) {
//Node node = new Node(C, null);
Node node = new Node(Thread.currentThread(), mode);
//Node pred = B;
Node pred = tail;
// tail = B ,走這裡
if (pred != null) {
// C.prev =B
node.prev = pred;
// tail = C
if (compareAndSetTail(pred, node)) {
// B.next = C
pred.next = node;
//傳回 C
return node;
}
}
enq(node);
return node;
}
最終 tail = C;head 為 空node,A.next = Bnode; B.prev=A;B.next = Cnode; C.prev=B;
最後發現 整個 head 一直都隻是一個聲明的 空 node
ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //嘗試釋放
Node h = head; //成功釋放,取出 head 裡面标志的線程
if (h != null && h.waitStatus != 0) // 有等待的線程并且等待狀态不為0
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 擷取到目前state的值 - 1
if (Thread.currentThread() != getExclusiveOwnerThread()) //判斷釋放鎖的線程是否是目前持有鎖的線程
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 如果state - 1 的值為0,則需要釋放,如果不為0,說明還有該線程還有重入鎖
free = true;
setExclusiveOwnerThread(null); //修改鎖的擁有者為null
}
setState(c); //修改state的值
return free;
}
/***
* 喚醒
* @param node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒排隊中的下一個(node.next)線程
}
unlock 方法源碼基本如上。
先嘗試釋放目前鎖,判斷有沒有重入鎖,沒有則進行釋放。釋放之後,喚醒下一個排隊中的線程。
那麼 AQS中的tail 是什麼時候釋放的呢,就是連結清單中的元素是什麼時候被銷毀的呢
在 加鎖的時候 addWaiter 外層有個 acquireQueued 方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 休眠
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //釋放 自己的node
}
}
VarHandle jdk1.9之後支援
例如: Object o = new Object(); 在記憶體中聲明了一塊記憶體,o 指向這個記憶體。然後有一個 VarHandle 也可以指向這個 o。
相當于記憶體是一個保險櫃。x 是這個保險櫃的所有者。 x 告訴你這個保險櫃的具體位址、櫃号、密碼。然後你通過這些資訊也可以找到這個保險櫃。
以及,進行一些原子性操作。即打開保險櫃之後,你可以修改裡面内容,并且是安全的。