天天看点

Java基础-抽象队列同步器:AbstractQueuedSynchronizer(1)简介自定义同步器ReentrantLockCountDownLatchSemaphore

简介

AQS定义了一套多线程访问共享资源的同步器框架。

许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列。

多线程争用资源被阻塞时会进入此队列。

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

自定义同步器

不同的自定义同步器争用共享资源的方式也不同。

自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可;

至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

自定义同步器实现时主要实现以下几种方法:

isHeldExclusively:该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire:独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease:独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

ReentrantLock

state初始化为0,表示未锁定状态。

线程lock()时,会调用tryAcquire()独占该锁并将state+1。

其他线程再tryAcquire()时就会失败。

直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。

当然,释放锁之前,线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。

要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

CountDownLatch

任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。

这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。

等到所有子线程都执行完后(即state=0),会unpark()主调用线程。

然后主调用线程就会从await()函数返回,继续后余动作。

Semaphore

是用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。

Semaphore通过使用计数器来控制对共享资源的访问。

如果计数器大于0,则允许访问。 如果为0,则拒绝访问。

计数器所计数的是允许访问共享资源的许可。 因此,要访问资源,必须从信号量中授予线程许可。

信号量控制登录用户,示例代码:

// 登录用户信号量
static final int MAX_AVAILABLE = 10;
static final Semaphore LOGIN_SEMAPHORE = new Semaphore(MAX_AVAILABLE, false);

// 线程池初始化
static ExecutorService executorService = Executors.newFixedThreadPool(10);

public static void main(String[] args) {
    // 连续登录十个用户
    for(int i = 0; i < 10; i++) {
        final int num = i;
        executorService.execute(() -> {
            if(LOGIN_SEMAPHORE.tryAcquire()) {
                System.out.println("用户:" + num + " 登录成功");
            } else {
                System.out.println("用户:" + num + " 登录失败");
            }
        });
    }
    executorService.shutdown();
    
    // 第十一个用户登录
    if(LOGIN_SEMAPHORE.tryAcquire()) {
        System.out.println("登录成功");
    } else {
        System.out.println("登录用户已满,登录失败");
    }
    
    // 有用户退出
    LOGIN_SEMAPHORE.release();
    
    // 第十二个用户登录
    if(LOGIN_SEMAPHORE.tryAcquire()) {
        System.out.println("登录成功");
    } else {
        System.out.println("登录用户已满,登录失败");
    }
}