volatile : 可见性:缓存一致性协议 MSI 禁止指令重排:JMM,内存屏障
synchronized 和 ReentrantLock 的区别
synchronized 系统自动上锁,
ReentrantLock 需要手工上锁,ReentrantLock 可以出现各种 condition (状况)。
例如 tryLock 尝试加锁,lockInterruptibly()上锁的时候异常了、getQueueLength()获取队列长度、hasQueuedThreads() 是否有线程在队列里,某一个线程是否在队列里 等等
实现:
ReentrantLock CAS的实现。 synchronized 四种锁的实现,
无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态 ( 见 多线程与高并发编程一 )
AQS AbstractQueuedSynchronizer 类,volatile state 状态可重入锁
AQS 底层就是CAS+ volatile
volatile state 用来记录锁的状态,为0则没有上锁,否则上锁了。
ReentrantLock.lock() 会调用 acquire(1);
AbstractQueuedSynchronizer.acquire(1) 方法里面的if 会调用 addWaiter(Node.EXCLUSIVE)
static final Node EXCLUSIVE = null; 就是说 Node.EXCLUSIVE 这个值永远是空的
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
首先,tail 为空, 传进来的mode为空,所以pred为空,直接进入 enq(node) 方法
Node node = new Node(Thread.currentThread(), mode); node 信息里面的线程为当前线程,mode 为空!!!
enq:
for循环 t = tail , t 为空, compareAndSetHead(new Node()) , 方法如下
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
这里调用的是unsafe的compare方法,这个是系统底层的,不做深入研究,大家理解下下面的概念原理就好
compareAndSwapObject方法中的第一个参数和第二个参数,用于确定待操作对象在内存中的具体位置的,然后取出值和第三个参数进行比较,如果相等,则将内存中的值更新为第四个参数的值,同时返回true,表明原子更新操作完毕。反之则不更新内存中的值,同时返回false,表明原子操作失败
上面的方法基本上就是比较 head == null ,由于刚开始都是null的,很明显为true,将 new Node 赋值给 head。这时候,这个node.thread 已经是当前线程了。tail = head
接着进行for循环,这时候tail已经不为空了,进入else,继续将 tail 设置为当前node ,next 指向当前node。
这里添加第一个完成。
第二个线程来的时候,addWaiter 方法, pred != null 为true,进行 compareAndSetTail 将尾部设为第二个线程,next 指向第二个线程。
基本原理如下图

图可能有点错误,走下代码逻辑吧
线程 A , B ,C
线程A
private Node addWaiter(Node mode) {
//Node node = new Node(A, null);
Node node = new Node(Thread.currentThread(), mode);
//Node pred = null 因为刚进来 tail = null;
Node pred = tail;
// A 不执行这个判断
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
A进入enq node = A
private Node enq(final Node node) {
for (;;) {
//第一次 tail = null , 第二次 tail = 空 node
Node t = tail;
if (t == null) {
// 第一次进入这里
//head = 空node
if (compareAndSetHead(new Node()))
//tail = 空node
tail = head;
} else {
//第二次进入 //A的prev = 空 node
node.prev = t;
// 将 tail 设置为 A(node)
if (compareAndSetTail(t, node)) {
//空node 的next 设置为 Anode
t.next = node;
//返回一个空node
return t;
}
}
}
到这里 tail = A;不是null,head 为 空node,t.next = Anode; A.prev=空node
线程B
private Node addWaiter(Node mode) {
//Node node = new Node(B, null);
Node node = new Node(Thread.currentThread(), mode);
//Node pred = A;
Node pred = tail;
// tail = A ,走这里
if (pred != null) {
// B.prev = A
node.prev = pred;
// tail = B
if (compareAndSetTail(pred, node)) {
// A.next = B
pred.next = node;
//返回 B
return node;
}
}
enq(node);
return node;
}
到这里 tail = B;head 为 空node,A.next = Bnode; B.prev=A;
线程C
private Node addWaiter(Node mode) {
//Node node = new Node(C, null);
Node node = new Node(Thread.currentThread(), mode);
//Node pred = B;
Node pred = tail;
// tail = B ,走这里
if (pred != null) {
// C.prev =B
node.prev = pred;
// tail = C
if (compareAndSetTail(pred, node)) {
// B.next = C
pred.next = node;
//返回 C
return node;
}
}
enq(node);
return node;
}
最终 tail = C;head 为 空node,A.next = Bnode; B.prev=A;B.next = Cnode; C.prev=B;
最后发现 整个 head 一直都只是一个声明的 空 node
ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //尝试释放
Node h = head; //成功释放,取出 head 里面标志的线程
if (h != null && h.waitStatus != 0) // 有等待的线程并且等待状态不为0
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 获取到当前state的值 - 1
if (Thread.currentThread() != getExclusiveOwnerThread()) //判断释放锁的线程是否是当前持有锁的线程
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 如果state - 1 的值为0,则需要释放,如果不为0,说明还有该线程还有重入锁
free = true;
setExclusiveOwnerThread(null); //修改锁的拥有者为null
}
setState(c); //修改state的值
return free;
}
/***
* 唤醒
* @param node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒排队中的下一个(node.next)线程
}
unlock 方法源码基本如上。
先尝试释放当前锁,判断有没有重入锁,没有则进行释放。释放之后,唤醒下一个排队中的线程。
那么 AQS中的tail 是什么时候释放的呢,就是链表中的元素是什么时候被销毁的呢
在 加锁的时候 addWaiter 外层有个 acquireQueued 方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 休眠
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //释放 自己的node
}
}
VarHandle jdk1.9之后支持
例如: Object o = new Object(); 在内存中声明了一块内存,o 指向这个内存。然后有一个 VarHandle 也可以指向这个 o。
相当于内存是一个保险柜。x 是这个保险柜的所有者。 x 告诉你这个保险柜的具体地址、柜号、密码。然后你通过这些信息也可以找到这个保险柜。
以及,进行一些原子性操作。即打开保险柜之后,你可以修改里面内容,并且是安全的。