天天看点

Java并发编程系列---Lock接口和AQS初识一、 java中的锁二、队列同步器(AQS)

一、 java中的锁

1.1 什么是锁

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同 时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在 Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后, 并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized 关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过 synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

使用synchronized关键字将会隐式地获取锁,但是它将锁的获取和释放固化了,也就是先获取再释放。当然,这种方式简化了同步的管理,可是扩展性没有显示的锁获取和释放来的好。例如,针对一个场景,手把手进行锁获取和释放,先获得锁A,然后再获取锁 B,当锁B获得后,释放锁A同时获取锁C,当锁C获得后,再释放B同时获取锁D,以此类推。这种场景下,synchronized关键字就不那么容易实现了,而使用Lock却容易许多。

1.2 Lock的使用

Lock的使用也很简单,Lock的使用的方式如下。

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author : pengweiwei
 * @date : 2020/2/6 8:14 下午
 */
public class LockDemo {

    public static Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        //加锁
        lock.lock();
        try {
            System.out.println("do something...");
        } finally {
            //释放锁
            lock.unlock();
        }
    }


}
           

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。

注意:不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。

1.3 Lock接口提供的synchronized关键字所不具备的主要特性

特性 描述
尝试非阻塞地获取锁 当前线程尝试获取锁,如果这一-时 刻锁没有被其他线程获取到,则成功获取并持有锁
能被中断地获取锁 与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
超时获取锁 在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回

1.4 Lock的API

方法 描述
void lock() 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回
void lockInterruptibly() 可中断地获取锁,和lock(方法的不同之处在于该方法会响应中断,即在锁的获取
boolean tryLock() 中可以中断当前线程尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false
boolean tryLock(longtime, TimeUnit unit) 超时的获取锁,当前线程在以下3种情况下会返回: ①当前线程在超时时间内获得了锁 ②当前线程在超时时间内被中断 ③超时时间结束
void unlock() 释放锁
Condition newCondition() 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁

二、队列同步器(AQS)

2.1 什么是AQS

队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量(state)表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用AQS提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状 态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件 (ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

2.2 AQS的接口与示例

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定 的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法, 而这些模板方法将会调用使用者重写的方法。

重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状 态。

  • getState():获取当前同步状态。
  • setState(int newState):设置当前同步状态。
  • compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证 状态设置的原子性。

下表是AQS可重写的方法:

方法名称 描述
boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
boolean tryRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
boolean tryReleaseShared(int arg) 共享式释放同步状态
boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占

实现自定义同步组件时,将会调用AQS提供的模板方法。如下:

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与acquire(intarg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回
boolean tryAcquireNanos(int arg,long nanos) 在acquirelnterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true
void acquireShared(int arg) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进人同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 与acquireShared(int arg)相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanos) 在acquireSharedInterruptibly(int arg)基础上增加了超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第个节点包含的线程唤醒
boolean releaseShared(int arg) 共享式的释放同步状态
Collection getQueuedThreads() 获取等待在同步队列上的线程集合

2.3 基于AQS实现一个独占锁

package com.example.demo.thread;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author : pengweiwei
 * @date : 2020/2/9 6:51 下午
 */
public class Mutex implements Lock {

    // 静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否处于占用状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 当状态为0的时候获取锁
        public boolean tryAcquire(int acquires) {
            //如果状态从 0 变成1 成功 则表示获取到了锁
            if (compareAndSetState(0, 1)) {
                //设置当前线程拥有独占访问权
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 释放锁,将状态设置为0
        protected boolean tryRelease(int releases) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

           

上述示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为 1),则代表获取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为 0。

测试:

package com.example.demo.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author : pengweiwei
 * @date : 2020/2/9 7:42 下午
 */
public class ThreadDemo3 {

    private Mutex mutex = new Mutex();
    private int i = 0;

    public void add() {
    	//加锁
        mutex.lock(); 
        try {
            i++;
        }finally {
        	//释放锁
            mutex.unlock();
        }

    }

    public int res() {
        return i;
    }

    public static void main(String[] args) throws InterruptedException {
        int threadSize = 1000;
        ThreadDemo3 t = new ThreadDemo3();
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < threadSize; i++) {
            executorService.execute(() -> {
                        t.add();
                    }
            );
            countDownLatch.countDown();
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(t.res());
    }

}

           

如果不加自己实现的锁,就会出现数据不一致的情况。