写在最前,本人也只是个大三的学生,如果你发现任何我写的不对的,请在评论中指出。
默认版本 JDK8
CAS
锁是分两种的,一种是悲观锁,另一种是乐观锁。
悲观锁
就是常说的锁,像 Lock、synchronized 这种。悲观锁总是认为要访问的共享资源是要发生冲突的,所以每次都会在访问的时候加上锁,以此保证每次访问共享资源的时候只有一个线程。
乐观锁
是纯粹的乐天派,它是认为要访问的共享资源是不会发生冲突的,无需加锁也无需等待,自然能够避免死锁。而一旦多个线程发生冲突,乐观锁通常是使用一种称为CAS的技术来保证线程执行的安全性。
CAS概念理解
CAS全拼是比较并交换(Compare And Swap)。在CAS中,有这样三个值:
- V:要更新的变量 var
- E:期望的值 expected
- N:新值 new
整体过程如下: 首先判断 V 是否等于 E,如果等于,将 V 的值设置为 N;如果不等,说明其他线程更新了该值,则当前线程放弃更新,什么都不做。所以这里的 E 本质是一个旧值。
可以举例来说明:
- 假设一个线程想要将共享变量 i=5 更新为6
- 情况1是我们首先去判断i此时是等于5的,说明没有被其他线程所更改,那么就会把值设置为6,此次 CAS 成功,i的值等于6
- 情况2是发生i的值不是期望的5,而是其他什么值,那么就会判定此次的 CAS 失败,然后什么也不做,i 的值保持原状。
你可能会疑惑,那假如你判断 i 等于5之后,正要修改,然后被其他线程更新了值怎么办?
我刚开始也有这种疑惑,实际上 CAS 操作是原子性的,它毕竟是 native 方法,是 JVM 底层或 C++ 实现的,它是一种系统原语,是一条 CPU 原子指令,从 CPU 层面保证了它的原子性。
CAS实现技术——Unsafe
Unsafe 是 sun.misc 包下的,
它里面一些 native 是关于 CAS 的。Unsafe 中对 CAS 的实现是 C++ 写的,它的具体实现和操作系统、CPU 都有关系。
用到 CAS 技术的可以参照一下各种原子类,比如:
AtomicInteger、AtomicBoolean、AtomicLong等等......
这里就用 AtomicInteger 类的 getAndAdd( int delta ) 来说明:
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
// 可以看到 AtomicInteger 类实际上也是调用 Unsafe 类里的 getAndAddInt 方法来实现功能的
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
// 获取旧值的操作
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
我们来一步步解析这段源码。首先,对象o是this,也就是一个AtomicInteger对象。然后offset是一个常量VALUE ➡
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value")。
因为CAS是无锁的,它允许操作失败,所以一般都是搭配while循环来进行的,这里采用的是do while循环,是保证循环体内的代码至少执行一次的。它这里声明了一个变量v, 如果你执行失败那就返回原值v, 如果执行成功那么就是 v + delta。
这里的 weakCompareAndSetInt 方法我并不是太清晰,所以采用百度上的说法:
简单来说,weakCompareAndSet操作仅保留了volatile自身变量的特性,而除去了happens-before规则带来的内存语义。也就是说,weakCompareAndSet无法保证处理操作目标的volatile变量外的其他变量的执行顺序( 编译器和处理器为了优化程序性能而对指令序列进行重新排序 ),同时也无法保证这些变量的可见性。这在一定程度上可以提高性能。
CAS不得不注意的问题
- ABA的问题: CAS是无法注意到: **假设一个共享变量从A到B又从B到A的变化的,**此时可以采用的操作就是加上版本号或者时间戳(AtomicStampedReference类可以解决)。
- 循环时间长: 因为CAS基本多与自旋配合,那么长时间不成功,会导致CPU资源浪费。推荐JVM支持处理器提供的pause指令。
- 只能保证一个共享变量是原子操作的(AtomicReference类可以解决)。
AQS
AQS是AbstractQueuedSynchronizer的简称,翻译过来就是
抽象队列同步器
,它也可以拆分来理解:
- 抽象: 提供了一种定义的方式,但是不包括具体的实现。
- 队列:使用了基于队列的方式去实现。
- 同步:实现了同步的功能。
AQS是一系列并发流程控制类的基石,比如:
CountDownLatch / CyclicBarrier / Semaphore / ReentrantLock ReentrantReadWriteLock 等等。。。
同步器设计思路
在具体分析之前,需要了解两种同步的方式,
独占模式和共享模式:
- 独占模式: 指的是当前资源仅供一个线程访问(ReentrantLock )
- 共享模式: 指的是同时可以被多个线程所获取, 具体的资源的个数可以通过参数指定(CountDownLatch 、CyclicBarrier )。
以下是AQS的设计方案:
- 设置一个拥有可见性的变量
,使用这个变量表示被获取的资源的数量int state = 0
- 线程在获取资源之前需要检查
的状态,如果为0,就设置为1,表示获取资源成功,如果不为0,则表示当前资源已经被占用,此时线程要阻塞并等待其他线程释放资源。state
- 为了能使得资源释放后找到那些被阻塞的线程,需要把这些线程放到一个FIFO的队列内。
- 当占用资源的线程释放资源后,就可以从队列里取出一个正在阻塞的线程并唤醒执行。
AQS的数据成员:
在AQS中被定义的成员变量:
-
作用就是第一条。state
state是被volatile修饰的变量, 保证了自身的可见性。 并且对它的操作都是
原子性的,state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
其中 compareAndSetState() 是借用了Unsafe的compareAndSwapInt() 方法实现的。
-
和head
,定义了上面第四条中的FIFO队列,head和tail分别指向队列的头部和尾部,并且这是一个双向队列。tail
- 定义
,队列中的每个元素都是Node
节点(相当重要)Node
static final class Node{
//标记一个结点(对应的线程)在共享模式下等待
static final Node SHARED = new Node();
// 标记一个结点(对应的线程)在独占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus的值,表示该结点(对应的线程)已被取消
static final int CANCELLED = 1;
//waitStatus的值,表示后继结点(对应的线程)需要被唤醒
static final int SIGNAL = -1;
//waitStatus的值,表示该结点(对应的线程)在等待某一条件
static final int CONDITION = -2;
/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
static final int PROPAGATE = -3;
// 等待状态,取值范围,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的线程
Node nextWaiter; // 等待队列里下一个等待条件的结点
//成员方法忽略,可以参考具体的源码
}
AQS主要方法
AQS的设计时基于模板方法模式的,它有一些方法必须要子类去实现的,主要有:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但是没有剩余资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false。
这些方法都是
protected
方法,但是它们并没有在AQS具体实现,而是直接抛出异常(这里不适用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要关注自己关心的抽象方法即可)。
AQS获取资源
获取资源的入口是
acquire(int arg)方法。
arg是要获取资源的个数,在独占模式下始终为1,可以浏览一下这个函数的代码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先调用tryAcquire(arg)尝试去获取资源。前面提到了这个方法是在子类具体实现的。
如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE)方法把这个线程插入到等待队列中。其中传入的参数代表要插入的Node是独占式的。这个方法的具体实现:
// 为当前线程和给定模式创建和划分节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 找到尾节点
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 将当前节点的前置节点置为尾节点
node.prev = pred;
// 使用CAS尝试 如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}
// final 不变性
// 自旋CAS将节点插入队列,必要时初始化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上面的两个函数好理解,就是在队列的尾部插入新的Node节点,但是需要注意的是会出现多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,在这里是通过CAS自旋的方式保证了操作的线程安全性。
现在可以去看看
acquireQueued
方法,该方法就是把处于等待队列的结点从头到尾一个一个获取:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个结点,就可以尝试去获取资源了
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点。
// 所以head所指的结点,就是当前获取到资源的那个结点或null。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS释放资源
释放资源相对于获取资源来说,会简单许多。在AQS中自会有一小段实现。源码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 如果状态是负数,尝试把它设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 得到头结点的后继结点head.next
Node s = node.next;
// 如果这个后继结点为空或者状态大于0
// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 等待队列中所有还有用的结点,都向前移动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为空,
if (s != null)
LockSupport.unpark(s.thread);
}