天天看點

Java篇 - 鎖機制與應用場景全集3 (阻塞隊列, 死鎖, CountdownLatch, CyclicBarrier)

今天更新java中的鎖機制:第三章(大綱9-12)。

終于要把java鎖這塊收尾了,不容易。下個博文的主題是:Java并發容器類源碼分析和性能對比,這章我本來想放在鎖機制裡的,但是感覺并發容器可以單獨抽出一篇,主要分析源碼及性能對比。

大綱:

  • 1. 并發的特性
  • 2. 鎖的分類
  • 3. synchronized
  • 4. volatile
  • 5. Lock
  • 6. ThreadLocal
  • 7. Atmoic
  • 8. Semaphore
  • 9. 阻塞隊列
  • 10. 死鎖
  • 11. CountdownLatch
  • 12.CyclicBarrier

9. 阻塞隊列

java中提供了很多非阻塞隊列,比如PriorityQueue、LinkedList(LinkedList是雙向連結清單,它實作了Dequeue接口)。

使用非阻塞隊列有一個問題,就是它不會對目前線程造成阻塞。但是我們前面講的生産者-消費者模型,如果使用非阻塞隊列,就需要額外的實作同步政策和線程間喚醒政策。如果使用阻塞隊列,它會對目前線程産生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素後,被阻塞的線程會自動被喚醒(不需要我們編寫代碼去喚醒),這樣提供了極大的友善性。

  • 9.1 幾種主要的阻塞隊列

從JDK1.5之後,java就提供了幾種阻塞隊列,位于java.util.concurrent包。

ArrayBlockingQueue:基于數組實作的一個阻塞隊列,在建立ArrayBlockingQueue對象時必須制定容量大小。并且可以指定公平性與非公平性,預設情況下為非公平的,即不保證等待時間最長的隊列最優先能夠通路隊列。

LinkedBlockingQueue:基于連結清單實作的一個阻塞隊列,在建立LinkedBlockingQueue對象時如果不指定容量大小,則預設大小為Integer.MAX_VALUE。

PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信号标志),前面2種都是有界隊列。

DelayQueue:基于PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素隻有當其指定的延遲時間到了,才能夠從隊列中擷取到該元素。DelayQueue也是一個無界隊列,是以往隊列中插入資料的操作(生産者)永遠不會被阻塞,而隻有擷取資料的操作(消費者)才會被阻塞。

  • 9.2 非阻塞隊列和阻塞隊列方法對比

非阻塞隊列中的幾個主要方法:

add(E e): 将元素e插入到隊列末尾,如果插入成功,則傳回true,如果插入失敗(即隊列已滿),則會抛出異常;

remove():移除隊首元素,若移除成功,則傳回true,如果移除失敗(隊列為空),則會抛出異常;

offer(E e):将元素e插入到隊列末尾,如果插入成功,則傳回true,如果插入失敗(即隊列已滿),則傳回false;

poll():移除并擷取隊首元素,若成功,則傳回隊首元素,并移除隊首,否則傳回null;

peek():擷取隊首元素,若成功,則傳回隊首元素,否則傳回null;
           

對于非阻塞隊列,一般情況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。因為使用offer、poll和peek三個方法可以通過傳回值判斷操作成功與否,而使用add和remove方法卻不能達到這樣的效果,而且需要自己處理異常。注意,非阻塞隊列中的方法都沒有進行同步措施。

阻塞隊列中的幾個主要方法:

put(E e): 用來向隊尾存入元素,如果隊列滿,則等待;

take(): 用來從隊首取元素,如果隊列為空,則等待;

offer(E e): 用來向隊尾存入元素,如果隊列滿傳回false,否則傳回true;

offer(E e,long timeout, TimeUnit unit): 用來向隊尾存入元素,如果隊列滿,則等待一定的時間,當時間期限達到時,如果還沒有插入成功,則傳回false,否則傳回true;

poll(): 用來從隊首取元素,如果隊列空則傳回null,否則傳回取得的元素,并移除隊首;

poll(long timeout, TimeUnit unit): 用來從隊首取元素,如果隊列空,則等待一定的時間,當時間期限達到時,如果取到,則傳回null,否則傳回取得的元素,并移除隊首;

peek(): 用來從隊首取元素,如果隊列為空,則傳回null,否則傳回取得的元素;
           
  • 9.3 阻塞隊列的實作原理

以ArrayBlockingQueue為例:

/** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
           

上面是ArrayBlockingQueue的構造器,可以看到使用了重入鎖ReentrantLock以及Condition實作同步政策和線程間喚醒政策。

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
   }

   public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

   public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
           

add(E e)方法:内部調用了offer方法,如果隊列已滿則抛出異常。

offer(E e)方法: 插入隊尾的時候加鎖,如果隊列已滿傳回false,否則插入并傳回true。

offer(E e, long timeout, TimeUnit unit): 插入隊尾,先加鎖lockInterruptibly,如果在等待鎖的過程中,可以調用interrupt()中斷其等待。如果隊列已滿,則notFull.awaitNanos(nanos),如果在這期間隊列有位置,則插入并傳回true,否則傳回false。

put(E e): 插入隊尾的時候加鎖,如果隊列已滿,則等待notFull.await(),直到隊列有空餘位置。

poll(): 從隊首取元素時加鎖,如果隊列為空,傳回null,否則傳回并移除隊首元素。

poll(long timeout, TimeUnit unit) : 從隊首取元素時加鎖,如果隊列為空,則等待notEmpty.awaitNanos(nanos),如果這期間隊列有元素則傳回并移除隊首元素,否則傳回null。

take(): 從隊首取元素時加鎖,如果隊列為空,則等待notEmpty.await(),直到隊列中有元素,移除隊首并傳回。

peek(): 從隊尾取元素時加鎖,如果隊列為空傳回null,否則傳回隊首元素。

  • 9.4 阻塞隊列使用例子
private static final int QUEUE_SIZE = 12;

    private static final class Consumer extends Thread {

        private final BlockingQueue<Integer> queue;

        Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    queue.take();
                    System.out.println("從隊列取走一個元素,隊列剩餘" + queue.size() + "個元素");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static final class Producer extends Thread {

        private final BlockingQueue<Integer> queue;

        Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    queue.put(1);
                    System.out.println("向隊列取中插入一個元素,隊列剩餘空間:" + (QUEUE_SIZE - queue.size()));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        producer.start();
        consumer.start();
    }
           

執行輸出:

向隊列取中插入一個元素,隊列剩餘空間:12

從隊列取走一個元素,隊列剩餘0個元素

向隊列取中插入一個元素,隊列剩餘空間:11

從隊列取走一個元素,隊列剩餘0個元素

向隊列取中插入一個元素,隊列剩餘空間:11

從隊列取走一個元素,隊列剩餘0個元素

向隊列取中插入一個元素,隊列剩餘空間:11

從隊列取走一個元素,隊列剩餘0個元素

從隊列取走一個元素,隊列剩餘0個元素

向隊列取中插入一個元素,隊列剩餘空間:12

向隊列取中插入一個元素,隊列剩餘空間:12

從隊列取走一個元素,隊列剩餘0個元素

向隊列取中插入一個元素,隊列剩餘空間:11

從隊列取走一個元素,隊列剩餘0個元素

向隊列取中插入一個元素,隊列剩餘空間:11

從隊列取走一個元素,隊列剩餘0個元素

可以看到,生産者消費者模型能夠有效的運作,而編碼中不需要考慮同步和線程間喚醒問題。

阻塞隊列的使用場景常常是在生産者和消費者模型中,關于什麼是生産者和消費者模型,可以看這篇文章:https://blog.csdn.net/snow_5288/article/details/72794306

10. 死鎖

前面我們也提到過死鎖,在進行并發程式設計時,如果使用不當,很容易造成死鎖,死鎖便會導緻這些線程處于等待狀态,無法繼續執行。當線程進入對象的synchronized代碼塊時,便占有了資源,直到它退出該代碼塊或者調用wait方法,才釋放資源,在此期間,其他線程将不能進入該代碼塊。當線程互相持有對方所需要的資源時,會互相等待對方釋放資源,如果線程都不主動釋放所占有的資源,将産生死鎖。當然重入的那種情況除外,在鎖的第一篇中有講解。

  • 10.1 死鎖産生的條件

1. 互斥條件:指程序對所配置設定到的資源進行排它性使用,即在一段時間内某資源隻由一個程序占用。如果此時還有其它程序請求資源,則請求者隻能等待,直至占有資源的程序用畢釋放。

2. 請求和保持條件:一個程序因請求被占用資源而發生阻塞時,對已獲得的資源保持不放。 

3. 不剝奪條件:任何一個資源在沒被該程序釋放之前,任何其他程序都無法對他剝奪占用。

4.循環等待條件:當發生死鎖時,所等待的程序必定會形成一個環路(類似于死循環),造成永久阻塞。

  • 10.2 死鎖的例子

面試時,經常會有面試官問,手寫一個死鎖代碼。其實不用怕,了解後,死鎖也就不難了。

private static class SimpleDeadLock {

        private static class A extends Thread {

            private final Object lock1;
            private final Object lock2;

            A(Object lock1, Object lock2) {
                this.lock1 = lock1;
                this.lock2 = lock2;
            }

            @Override
            public void run() {
                // A先拿到了lock1鎖
                synchronized (lock1) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 休眠1秒後,嘗試拿lock2鎖,但是lock2鎖被B持有無法釋放,而B在等待A釋放lock1鎖才能繼續走
                    synchronized (lock2) {
                        System.out.println("Hello A");
                    }
                }
            }
        }

        private static class B extends Thread {

            private final Object lock1;
            private final Object lock2;

            B(Object lock1, Object lock2) {
                this.lock1 = lock1;
                this.lock2 = lock2;
            }

            @Override
            public void run() {
                // B先拿到了lock2鎖
                synchronized (lock2) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 休眠1秒後,嘗試拿lock1鎖
                    synchronized (lock1) {
                        System.out.println("Hello B");
                    }
                }
            }
        }

        static void test() {
            final Object lock1 = new Object();
            final Object lock2 = new Object();
            A a = new A(lock1, lock2);
            a.start();
            B b = new B(lock1, lock2);
            b.start();
        }
    }
           
  • 10.3 如何避免死鎖

1. 避免一個線程同時擷取多個鎖

2. 避免一個線程在鎖内同時占用多個資源,盡量保證每個鎖隻占用一個資源

3. 嘗試使用定時鎖,使用lock.tryLock來代替使用内置鎖。

11. CountdownLatch

CountDownLatch是在java1.5被引入的,跟它一起被引入的并發工具類還有CyclicBarrier、Semaphore(上一章有講)、ConcurrentHashMap和BlockingQueue,它們都存在于java.util.concurrent包下。CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作後再執行。例如,應用程式的主線程希望在負責啟動架構服務的線程已經啟動所有的架構服務之後再執行。

CountDownLatch是通過一個計數器來實作的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢複執行任務。

  • 11.1 先看一個例子
private static class CountdownLatchTest {

        private final static int THREAD_NUM = 10;

        private static class CountdownLatchTask implements Runnable {

            private final CountDownLatch countDownLatch;
            private final String threadName;

            CountdownLatchTask(CountDownLatch countDownLatch, String threadName) {
                this.countDownLatch = countDownLatch;
                this.threadName = threadName;
            }

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadName + " 執行完畢");
                countDownLatch.countDown();
            }
        }

        static void test() {
            CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
            ExecutorService exec = Executors.newCachedThreadPool();
            for (int i = 0; i < THREAD_NUM; i++) {
                exec.execute(new CountdownLatchTask(countDownLatch, "thread - " + i));
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("大家都執行完成了,做總結性工作");
            exec.shutdown();
        }
    }
           

執行輸出:

thread - 4 執行完畢

thread - 0 執行完畢

thread - 3 執行完畢

thread - 1 執行完畢

thread - 2 執行完畢

thread - 5 執行完畢

thread - 6 執行完畢

thread - 7 執行完畢

thread - 9 執行完畢

thread - 8 執行完畢

大家都執行完成了,做總結性工作

  • 11.2 CountdownLatch源碼分析
public class CountDownLatch {
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}
           

當調用了countDownLatch.await()方法後,目前線程就會進入了一個死循環當中,在這個死循環裡面,會不斷的進行判斷,通過調用tryAcquireShared方法,不斷判斷我們上面說的那個計數器,看看它的值是否為0了(為0的時候,其實就是我們調用了足夠多次數的countDownLatch.countDown()方法的時候),如果是為0的話,tryAcquireShared就會傳回1,然後跳出了循環,也就不再"阻塞"目前線程了。需要注意的是,說是在不停的循環,其實也并非在不停的執行for循環裡面的内容,因為在後面調用parkAndCheckInterrupt()方法時,在這個方法裡面是會調用 LockSupport.park(this) 來禁用目前線程。

  • 11.3 CountdownLatch使用場景

1. 實作最大的并行性:有時我們想同時啟動多個線程,實作最大程度的并行性。例如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,并讓所有線程都在這個鎖上等待,那麼我們可以很輕松地完成測試。我們隻需調用 一次countDown()方法就可以讓所有的等待線程同時恢複執行。

2. 開始執行前等待n個線程完成各自任務:例如應用程式啟動類要確定在處理使用者請求前,所有N個外部系統已經啟動和運作了。

3. 死鎖檢測:一個非常友善的使用場景是,你可以使用n個線程通路共享資源,在每次測試階段的線程數目是不同的,并嘗試産生死鎖。

12.CyclicBarrier

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後目前線程被阻塞。

  • 12.1 CyclicBarrier例子
private final static int THREAD_NUM = 10;

    private static class CountdownLatchTask implements Runnable {

        private final CyclicBarrier lock;
        private final String threadName;

        CountdownLatchTask(CyclicBarrier lock, String threadName) {
            this.lock = lock;
            this.threadName = threadName;
        }

        @Override
        public void run() {
            System.out.println(threadName + " 準備完成");
            try {
                lock.await();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadName + " 執行完成");
        }
    }

    public static void main(String[] args) {
        CyclicBarrier lock = new CyclicBarrier(THREAD_NUM, () -> {
            System.out.println("大家都準備完成了");
        });
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < THREAD_NUM; i++) {
            exec.submit(new CountdownLatchTask(lock, "Thread-" + i));
        }
        exec.shutdown();
    }
           

執行輸出:

Thread-0 準備完成

Thread-3 準備完成

Thread-2 準備完成

Thread-1 準備完成

Thread-5 準備完成

Thread-4 準備完成

Thread-6 準備完成

Thread-7 準備完成

Thread-8 準備完成

Thread-9 準備完成

大家都準備完成了

Thread-9 執行完成

Thread-0 執行完成

Thread-2 執行完成

Thread-5 執行完成

Thread-8 執行完成

Thread-1 執行完成

Thread-3 執行完成

Thread-7 執行完成

Thread-4 執行完成

Thread-6 執行完成

 先讓所有線程準備,互相等待,調用cyclicBarrier.await()實作,直到到達某個公共屏障點,然後再一起執行。

  • 12.2 CyclicBarrier使用場景

CyclicBarrier可以用于多線程計算資料,最後合并計算結果的應用場景。比如我們用一個Excel儲存了使用者所有銀行流水,每個Sheet儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多線程處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

  • 12.3 CyclicBarrier與CountdownLatch

看了各種資料和書,大家一緻的意見都是CountDownLatch是計數器,隻能使用一次,而CyclicBarrier的計數器提供reset功能,可以多次使用。但是我不那麼認為它們之間的差別僅僅就是這麼簡單的一點。我們來從jdk作者設計的目的來看,javadoc是這麼描述它們的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

從javadoc的描述可以得出:

  • CountDownLatch:一個或者多個線程,等待其他多個線程完成某件事情之後才能執行;
  • CyclicBarrier:多個線程互相等待,直到到達同一個同步點,再繼續一起執行。

對于CountDownLatch來說,重點是"一個或者多個線程等待",而其他的N個線程在完成"某件事情"之後,可以終止,也可以等待。而對于CyclicBarrier,重點是多個線程,在任意一個線程沒有完成,所有的線程都必須等待。CountDownLatch是計數器,線程完成一個記錄一個,隻不過計數不是遞增而是遞減,而CyclicBarrier更像是一個閥門,需要所有線程都到達,閥門才能打開,然後繼續執行。