天天看點

java.util.concurrent包之ArrayBlockingQueue 阻塞隊列 學習筆記描述ArrayBlockingQueue 小結ArrayBlockingQueue 示例

描述

ArrayBlockingQueue 是一個有界的阻塞隊列,其内部實作是将對象放到一個 數組裡。有界也就意味着,它不能夠存儲無限多數量的元素。它有一個同一時間能夠存儲元素數量的上限。你 可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(譯者注:因為它是基于數組實 現的,也就具有數組的特性:一旦初始化,大小就無法修改)。

ArrayBlockingQueue 類圖

java.util.concurrent包之ArrayBlockingQueue 阻塞隊列 學習筆記描述ArrayBlockingQueue 小結ArrayBlockingQueue 示例

如上圖 ArrayBlockingQueue 内部有個數組 items 用來存放隊列元素,putindex 下标标示入隊元素下标, takeIndex 是出隊下标,count 統計隊列元素個數,從定義可知道并沒有使用 volatile 修飾,這是因為通路這些變量使用都是在鎖塊内,并不存在可見性問題。另外有個獨占鎖 lock 用來對出入隊操作加鎖,這導緻同時隻有一個線程可以 通路入隊出隊,另外 notEmpty,notFull 條件變量用來進行出入隊的同步。 另外構造函數必須傳入隊列大小參數,是以為有界隊列,預設是 Lock 為非公平鎖。

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
/**
 * capacity 數組長度
 * fair 是否公平鎖 
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
           

注:

所謂公平鎖:就是在并發環境中,每個線程在擷取鎖時會先檢視此鎖維護的等待隊列,如果為空,或者目前線 程線程是等待隊列的第一個,就占有鎖,否則就會加入到等待隊列中,以後會按照 FIFO 的規則從隊列中取到 自己。

非公平鎖:比較粗魯,上來就直接嘗試占有鎖,如果嘗試失敗,就再采用類似公平鎖那種方式

lock.newCondition 的作用可參考:http://www.importnew.com/30150.html

ArrayBlockingQueue 方法

offer 方法

在隊尾插入元素,如果隊列滿則傳回 false,否者入隊傳回 true。

public boolean offer(E e) {
    //e 為 null,則抛出 NullPointerException 異常
    checkNotNull(e);
    //擷取獨占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果隊列滿則傳回 false
        if (count == items.length)
            return false;
        else {
            //否則插入元素
            insert(e);
            return true;
        }
    } finally {
        //釋放鎖
        lock.unlock();
    }
}
private void insert(E x) {
    //元素入隊
    items[putIndex] = x;
    //計算下一個元素應該存放的下标
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}
//循環隊列,計算下标
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}
           

這裡由于在操作共享變量前加了鎖,是以不存在記憶體不可見問題,加過鎖後擷取的共享變量都是從主記憶體擷取的, 而不是在 CPU 緩存或者寄存器裡面的值,釋放鎖後修改的共享變量值會重新整理會主記憶體中。 另外這個隊列是使用循環數組實作,是以計算下一個元素存放下标時候有些特殊。

另外 insert 後調用 notEmpty.signal();是為了激活調用 notEmpty.await()阻塞後放入 notEmpty 條件隊列中的線程。

Put 操作

在隊列尾部添加元素,如果隊列滿則等待隊列有空位置插入後傳回。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //擷取可被中斷鎖
    lock.lockInterruptibly();
    try {
        //如果隊列滿,則把目前線程放入 notFull 管理的條件隊列
        while (count == items.length){
            //調用Condition的await()方法會使線程進入notFull等待隊列,并釋放鎖,線程狀态變為等待狀态。
            //喚醒同步隊列中的後繼節點,該線程中斷,跳入lockInterruptibly
            notFull.await();
        }
        //插入元素
        insert(e);
    } finally {
        lock.unlock();
    }
}
           

需要注意的是如果隊列滿了那麼目前線程會阻塞,知道出隊操作調用了 notFull.signal 方法激活該線程。代碼邏 輯很簡單,但是這裡需要思考一個問題為啥調用 lockInterruptibly 方法而不是 Lock 方法。我的了解是因為調用了條件變量的 await()方法,而 await()方法會在中斷标志設定後抛出 InterruptedException 異常後退出,是以還不如在加 鎖時候先看中斷标志是不是被設定了,如果設定了直接抛出 InterruptedException 異常,就不用再去擷取鎖了。然後 看了其他并發類裡面凡是調用了 await 的方法擷取鎖時候都是使用的 lockInterruptibly 方法而不是 Lock 也驗證了這 個想法。

對lockInterruptibly不了解的孩童,可以閱讀這篇文章:https://blog.csdn.net/qq_31803503/article/details/87888427

對Condition不了解的孩童,可查閱:https://mp.csdn.net/postedit/87876461

Poll 操作

從隊頭擷取并移除元素,隊列為空,則傳回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //目前隊列為空則傳回 null,否者
        return (count == 0) ? null : extract();
    } finally {
        lock.unlock();
    }
}
private E extract() {
    final Object[] items = this.items;
    //擷取元素值
    E x = this.<E>cast(items[takeIndex]);
    //數組中值值為 null;
    items[takeIndex] = null;
    //隊頭指針計算,隊列元素個數減一
    takeIndex = inc(takeIndex);
    --count;
    //發送信号激活 notFull 條件隊列裡面的線程
    notFull.signal();
    return x;
}
           

Take 操作

從隊頭擷取元素,如果隊列為空則阻塞直到隊列有元素。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //隊列為空,則等待,直到隊列有元素
        while (count == 0)
            notEmpty.await();
        return extract();
    } finally {
        lock.unlock();
    }
}
           

需要注意的是如果隊列為空,目前線程會被挂起放到 notEmpty 的條件隊列裡面,直到入隊操作執行調用 notEmpty.signal 後目前線程才會被激活,await 才會傳回。

Peek 操作

傳回隊列頭元素但不移除該元素,隊列為空,傳回 null。

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //隊列為空傳回 null,否者傳回頭元素
        return (count == 0) ? null : itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}
final E itemAt(int i) {
    return this.<E>cast(items[i]);
}
           

Size 操作

擷取隊列元素個數,非常精确因為計算 size 時候加了獨占鎖,其他線程不能入隊或者出隊或者删除元素。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}
           

ArrayBlockingQueue 小結

ArrayBlockingQueue 通過使用全局獨占鎖實作同時隻能有一個線程進行入隊或者出隊操作,這個鎖的粒度比較 大,有點類似在方法上添加 synchronized 的意味。其中 offer,poll 操作通過簡單的加鎖進行入隊出隊操作,而 put,take 則使用了條件變量實作如果隊列滿則等待,如果隊列空則等待,然後分别在出隊和入隊操作中發送信号激活等待線程 實作同步。另外相比 LinkedBlockingQueue,ArrayBlockingQueue 的 size 操作的結果是精确的,因為計算前加了 全局鎖。

ArrayBlockingQueue 示例

需求:在多線程操作下,一個數組中最多隻能存入 3 個元素。多放入不可以存入數組,或等待某線程對數組中某個元素取走才能放入,要求使用 java 的多線程來實作

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 需求:在多線程操作下,一個數組中最多隻能存入 3 個元素。多放入不可以存入數組,或等待某線程對數組中某個元素取走才能放入,要求使用 java 的多線程來實作。
 *
 * @author: JC.Lin
 * @data: 2019/2/22 13:06
 */
public class BlockingQueueTest {

    public static void main(String[] args) {
        final BlockingQueue queue = new ArrayBlockingQueue(3);
        for (int i = 0; i < 2; i++) {
            new Thread() {
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep((long) (Math.random() * 1000));
                            System.out.println(Thread.currentThread().getName() + "準備放資料!");
                            queue.put(1);
                            System.out.println(Thread.currentThread().getName() + "已經放了資料," + "隊列目前有" + queue.size() + "個資料");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
        new Thread() {
            public void run() {
                while (true) {
                    try {
                        //将此處的睡眠時間分别改為 100 和 1000,觀察運作結果
                        Thread.sleep(100);
                        System.out.println(Thread.currentThread().getName() + "準備取資料!");
                        System.err.println(queue.take());
                        System.out.println(Thread.currentThread().getName() + "已經取走資料," + "隊列目前有" + queue.size() + "個資料");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }


}
           

輸出結果:

Thread-2準備取資料!
1
Thread-0準備放資料!
Thread-0已經放了資料,隊列目前有1個資料
Thread-2已經取走資料,隊列目前有0個資料
Thread-2準備取資料!
Thread-1準備放資料!
Thread-2已經取走資料,隊列目前有0個資料
Thread-1已經放了資料,隊列目前有0個資料
1
Thread-0準備放資料!
Thread-0已經放了資料,隊列目前有1個資料
1
Thread-2準備取資料!
Thread-2已經取走資料,隊列目前有0個資料
Thread-0準備放資料!
Thread-0已經放了資料,隊列目前有1個資料
Thread-2準備取資料!
1
Thread-2已經取走資料,隊列目前有0個資料
           

繼續閱讀