第五章-Java中的锁
前言:最近有点懒。
第五章是比较难的一章了,在前面的铺垫之下开始讲实现方式了。Java中锁的实现方法,当然抛开由JVM自带的synchronized关键字,本章节主要还是讲一些由API实现的锁,这样一来,Java多线程就有许多可玩性了。
5.1 Lock接口
特点
它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。
- 可操作性 = 可中断 + 超时锁的处理 + 另外一些高级特性(也是第5章的内容)
书中 5-1中Lock的使用方式
Lock lock = new ReentrantLock();
lock.locak();
try{
}finally{
lock.unlock();
}
- 需要注意的一点是上锁需要在try外,不然会导致锁无顾释放(
之前会执行return
)。当然,放外面可能会引起锁无法释放如finally
public static void main(String[] args) {
Lock lock = new ReentrantLock();
lock.lock();
if(1 == 1)return;
try{
}finally {
System.out.println("解锁");
lock.unlock();
}
}
-
还需要注意的一点是一旦上锁之后 return 的代码需要放在try里。
你可能会问到底是放里还是放外好,个人觉得放外好,放里出现问题不好找。而且锁住总比释放安全吧。
Lock接口提供的Synchronized关键字不具备的主要特性
- 1 尝试非阻塞式的获取锁
Synchronized当线程获取锁时时,其它的线程将阻塞等待直到获取到锁为止。但是Lock接口则是在尝试获取锁失败之后将会有返回值,成功为true失败为false,这样我们就可以对返回结果进行处理,可以去完成其他的事情。
- void lock() 正常获取锁
- boolean tryLock() 非阻塞式获取锁,区别是否有返回值
- 2 能被中断地获取锁
使用Synchronized获取到的锁无法被中断,Lock可以,并且被中断之后会释放锁。
- void lockInterruptibly() throws InterruptedException() 提供可中断获取锁
- 3 超时获取锁
在尝试获取锁超过设定的时间之后会返回,而Synchronized 没有提供
- boolean tryLock(long time,TimeUnit) throws InterruptedException 提供超时获取锁(可中断)
5.2 队列同步器(AbstractQueuedSynchronizer)
是用来构建锁或其他同步器的框架,主要需要掌握的是构建的基本方式和内置的FIFO队列的工作原理。
-
1构建
我们在前面的章节中直到了Synchronized的工作原理是在同步代码块进入之前加入一个monitor标记,标记的值在进入时加1,在出同步代码块时减1。在队列同步器中提供了类似的原理。
- getState() 获取标记
- setState(int newState) 设置标记
- compareAndSetState(int expect,int update) 更新标记
而言,我们只需要Synchronized
和getState()
法方就可以了,但是同步器还提供了compareAndSetState()
方法,这样我们就有更多的方式去实现我们的需求。比如后面的示例代码要设计一个同一时刻只能由两个线程获取的锁setState()
上述的接口是操作同步状态的方法。由于同步器当设计是基于模版方法模式的,所以我们需要继承同步器,并重写指定的方法,之后在调用同步器的对外接口时,接口会再去调用我们重写的方法。也就是我们规定锁的行为是和调用分开的。
- 1 我们可以重写的方法,也就是编写锁规则的地方。这就代表这同步器会根据我们写的规则去判断锁的获取成功失败。
- protected boolean tryAcquire(int arg)
独占式获取同步状态,独占式也就是说一次只能有一个线程持有锁,所以返回状态也只有true和false
- protected boolean tryRelease(int arg)
独占式释放同步状态
- protected int tryAcquireShared(int arg)
共享式获取同步状态,与独占式不同的式,在这里只要返回值大于0时就代表获取锁成功,这就代表者同一时刻可以多个线程获取到锁。
- protected boolean tryReleaseShared(int arg)
共享式释放同步状态,注意这里是返回boolean类型
- protected boolean isHeldExclusively()
在独占模式下使用,判断是否被当前线程所独占。此方法仅在conditionobject方法内部 调用,因此如果不使用条件,则不需要定义此方法。
- protected boolean tryAcquire(int arg)
- 2 ok,重写完我们自定义的锁规则之后我们如何去调用呢。这里同步器会提供的模版方法,我们通过调用模版方法,模版方法会去调用我们的重写的方法。如果我们直接调用模版方法,且没有重写锁规则时会怎么样?
将会直接抛出异常。那为什么要设计成这样子呢,这说明在模版方法里同步器还会为我们做一些操作,如上面内容所说的同步器会内置一个FIFO队列来完成资源获取线程的排队工作。protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
- 独占式
- void acquire(int arg)
独占式获取同步状态,获取成功返回,失败则加入同步队列中等待(这个后面说),上面说的,这个方法会调用重写的tryAcquire(int arg)方法。
- void acquireInterruptibly(int arg)
独占式可中断,也就是添加了处理中断的方法
- boolean tryAcquireNanos(int arg,long namos)
独占式可限时,同样的添加了超时逻辑
- void acquire(int arg)
- 下面共享式同样道理
- void acquireShared(int arg)
- void acquireSharedInterruptibly(int arg)
- boolean tryAcquireSharedNanos(int arg,long namos)
- 同步器的操作
- boolean release(int arg) //独占式释放同步状态之后释放状态之后会唤醒头节点
- boolean releaseShared(int arg)
- Collection<Thread>getQueuedThreads()//获取等待在同步队列上的线程集合
- 独占式
现在有两个例子,一个是使用同步器构造独占式锁,一个是构造同步式锁(并且一次只能有两个线程获取到锁)
独占式锁
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
//继承锁接口
public class Mutex implements Lock {
private final Sync sync = new Sync();
public Mutex() throws InterruptedException {
}
//继承同步器接口
private static final class Sync extends AbstractQueuedSynchronizer {
//实现锁规则方法
@Override
public boolean tryAcquire(int reduceCount){
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
public boolean tryRelease(int returnCount){
if(getState() == 0)throw new IllegalArgumentException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively(){
return getState() == 1;
}
}
//模版方法 =》调用锁规则
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public Condition newCondition() {
return null;
}
}
共享锁(一次两个线程获取)
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
public TwinsLock() throws InterruptedException {
}
private static final class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) throws InterruptedException {
if(count <= 0){
throw new InterruptedException("count must large than zero");
}
setState(count);
}
@Override
public int tryAcquireShared(int reduceCount){
for(;;){
int current = (int) getState();
int newCount = current - reduceCount;
if(newCount < 0 || compareAndSetState(current,newCount)){
return newCount;
}
}
}
@Override
public boolean tryReleaseShared(int returnCount){
for(;;){
int current = (int) getState();
int newCount = current + returnCount;
if(compareAndSetState(current,newCount)){
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
队列同步器的实现与分析
上面讲的模版方法会控制线程到一个内置的FIFO队列,并且控制线程的同步。
- FIFO队列是一个双向队列,把尝试获取同步状态的线程构造成一个节点,加入到获取到同步状态的下一个节点中,加入节点时使用CAS方式确保构造队列的安全。当首节点(第一次获取到同步状态的节点)释放同步状态时,首节点将会唤醒后继节点,并且使其尝试获取同步状态。当后继节点获取到同步状态时候会将自己设置为首节点。循环此操作。
独占式同步状态获取与释放
获取
模版方法为acquire(int arg)
我们先看看节点状态
- CANCELLED: Thread已取消
- SIGNAL: Thread正等待被unpark(获取锁未成功进入队列等待的线程, 会被标记成SIGNAL, 线程也会通过LockSupport.park被挂起)
- CONDITION: Thread正在等待Condition, 在condition queue中
- PROPAGATE: 只可能头节点被设置该状态, 在releaseShared时需要被传播给后续节点.
public final void acquire(int arg) {
//尝试获取我们自己定义的锁规则 如果失败之后加入队列
//EXCLUSIVE 表示独占节点 同一时刻只能有一个线程获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//中断当前节点
}
在尝试获取同步状态失败之后,首先会执行
addWaiter(Node.EXCLUSIVE)
去构造一个节点
// 将当前线程包装成结点并添加到同步队列尾部
private Node addWaiter(Node mode) {
// 指定持有锁的模式
Node node = new Node(Thread.currentThread(), mode);
// 获取同步队列尾结点引用
Node pred = tail;
// 如果尾结点不为空, 表明同步队列已存在结点
if (pred != null) {
// 1.指向当前尾结点
node.prev = pred;
// 2.设置当前结点为尾结点
if (compareAndSetTail(pred, node)) {
// 3.将旧的尾结点的后继指向新的尾结点
pred.next = node;
return node;
}
}
// 否则表明同步队列还没有进行初始化
enq(node);
return node;
}
// 结点入队操作
private Node enq(final Node node) {
//死循环直到添加到队列
for (;;) {
// 获取同步队列尾结点引用
Node t = tail;
// 如果尾结点为空说明同步队列还没有初始化
if (t == null) {
// 初始化同步队列
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 1.指向当前尾结点
node.prev = t;
// 2.设置当前结点为尾结点
if (compareAndSetTail(t, node)) {
// 3.将旧的尾结点的后继指向新的尾结点
t.next = node;
return t;
}
}
}
}
我们在看看构建节点方法
acquireQueued
是怎么实现的
//传入的是一个节点和同步值,当然由上可知 节点是通过addWaiter(Node mode)来创建
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node的上一个节点
final Node p = node.predecessor();
//如果p是首节点,就尝试获取同步状态
if (p == head && tryAcquire(arg)) {
//获取同步状态成功之后将自己设置成首节点
setHead(node);
//断开首节点
p.next = null; // help GC
//中断标志设为false
failed = false;
return interrupted;
}
//判断前一个节点状态是否为SIGNAL,如果是代表没有机会获取同步状态了(FIFO规则),return true
//并调用parkAndCheckInterrupt()中断该线程,当然了,这个过程不影响线程中断,在方法里会判断是否被中断,
//如果被中断将会设置中断状态为true。这样就会执行selfInterrupt()中断当前线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//中断状态为true
interrupted = true;
}
} finally {
// 在最后确保如果获取失败就取消获取
if (failed)
cancelAcquire(node);
}
}
整个for循环就只有一个出口,那就是等线程成功的获取到同步状态 / 锁之后才能出去,在没有获取到同步状态 / 锁之前就一直是挂在for循环的parkAndCheckInterrupt()方法里。线程被唤醒后也是从这个地方继续执行for循环。
释放
释放就很简单了,只要头部节点唤醒自己的后继节点并且修改同步状态即可
public final boolean release(int arg) {
// 释放状态
if (tryRelease(arg)) {
// 获取head结点
Node h = head;
// 如果head结点不为空并且等待状态不等于0就去唤醒后继结点
if (h != null && h.waitStatus != 0) {
// 唤醒后继结点
unparkSuccessor(h);
}
return true;
}
return false;
}
// 唤醒后继结点
private void unparkSuccessor(Node node) {
// 获取给定结点的等待状态
int ws = node.waitStatus;
// 将等待状态更新为0
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 获取给定结点的后继结点
Node s = node.next;
// 后继结点为空或者等待状态为取消状态
if (s == null || s.waitStatus > 0) {
s = null;
// 从后向前遍历队列找到第一个不是取消状态的结点
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
// 唤醒给定结点后面首个不是取消状态的结点
if (s != null) {
LockSupport.unpark(s.thread);
}
}
共享式同步状态获取与释放
与独占式获取的主要区别式在同一时刻是否可以允许多个线程同时获得同步状态
下面看看同步器式的共享式模版方法
//可以看出共享式获取同步状态只要式返回大于0就式获取同步状态成功
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
看看
doAcquireShared
private void doAcquireShared(int arg) {
//判断类型
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//返回前继节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//与独占的唯一区别就式获取锁式只要返回大于0就可以退出队列了
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放
释放和独占式的唯一区别式需要保证修改状态的安全,采用的式CAS
共享式同步状态获取与释放
独占式超时获取同步状态
模版方法为
doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
规定请求锁的请求时间是synchronized关键字所不具备的。
超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”。
代码如下
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//获取同步状态失败之后会进入自旋获取锁,并且判断超时时间
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//计算过期时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//之前代码与独占锁获取同样
//计算离过期时间还有多久
nanosTimeout = deadline - System.nanoTime();
//如果过期就直接返回
if (nanosTimeout <= 0L)
return false;
//第一步还是判断线程是否应该被挂起
if (shouldParkAfterFailedAcquire(p, node) &&
//可以看到这里如果当距离超时时间1000时,将会无条件的自旋
nanosTimeout > spinForTimeoutThreshold)
//挂起线程还剩超时时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//中间这段时
ReentrantLock
和
ReentrantReadWriteLock
源码分析,后面在写
Condition接口
任意一个Java对象都有一组监视器方法
wait()
,
wait(long timeout)
,
notify()
,
notifyAll()
,这些方法与synchronizde关键字配合可以实现等待/通知模式。API实现方式是使用Condition配合Lock实现。