天天看點

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

文章目錄

      • 一、ArrayBlockingQueue
        • 1、類圖
        • 2、ArrayBlockingQueue 原理
          • (1) offer 操作
          • (2)put 操作
          • (3)poll 操作
          • (4)take 操作
          • (5)peek 操作
          • (6)size 操作
      • 二、PriorityBlockingQueue
        • 1、類圖
        • 2、PriorityBlockingQueue 原理
          • (1) offer 操作
            • 擴容邏輯
            • 建堆算法
          • (2) poll 操作
          • (3) put 操作
          • (4) take 操作
          • (5) size 操作
      • 三、 DelayQueue
        • 1、 類圖
        • 2、DelayQueue 原理
          • (1) offer 操作
          • (2) take 操作
          • (3) poll 操作
          • (4) size 操作

一、ArrayBlockingQueue

1、類圖

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    可以看到,ArrayBlockingQueue 内部有一個數組 items,用來存放隊列元素,putIndex 變量表示入隊元素下标,takeIndex 表示出隊元素下标,count 統計隊列元素個數。這些變量并沒有使用 volatile 修飾,因為通路它們都是在鎖塊裡,加鎖已經保證了記憶體可見性。另外還有個獨占鎖 lock 用來保證入隊、出隊的原子性,還保證了 同時隻有一個線程 可以入隊、出隊操作,另外,notEmpty、notFull 條件變量用來進行入隊、出隊的同步。

    ArrayBlockingQueue 是 有界隊列 ,是以構造方法必須傳入隊列大小參數,源碼:

public ArrayBlockingQueue(int capacity) {
		// (一)
        this(capacity, false);
    }
           

(一):

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
           

2、ArrayBlockingQueue 原理

(1) offer 操作

    向隊列尾部插入一個元素,如果隊列有空閑空間,則插入成功後傳回 true,如果隊列已滿,則丢棄目前元素然後傳回 false,如果 e 元素為 null 則抛出 NullPointerException 異常,該方法是不阻塞的。

源碼:

public boolean offer(E e) {
   		// e 為 null ,則抛出 NullPointerException 異常
        checkNotNull(e);

		// 擷取獨占鎖
        final ReentrantLock lock = this.lock;
        
        lock.lock();
        try {
        	// 如果隊列滿則傳回 false
            if (count == items.length)
                return false;
            else {
            	// (一)否則 插入元素
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
           

(一):

private void enqueue(E x) {       
        final Object[] items = this.items;
        items[putIndex] = x;

		// 計算下标
        if (++putIndex == items.length)
            putIndex = 0;

		// 遞增元素個數計數器
        count++;

		// 激活 notEmpty 條件隊列中因為調用 take ,但隊空 而被阻塞的線程
        notEmpty.signal();
    }
           
(2)put 操作

    向隊列尾部插入一個元素,如果隊列有空閑空間,則插入成功後傳回 true,如果隊列已滿,則阻塞目前線程 直到隊列有空閑并插入成功後傳回 true。如果阻塞時被其他線程設定了中斷标志,則 被阻塞的線程會抛出 InterruptedException 異常而傳回。另外,如果 e 為 null 則抛出NullPointerException 異常。

源碼:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    
    // 擷取鎖,可被中斷
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    
    	// 如果隊列滿,則把目前線程放入 notFull 管理的條件隊列,while 循環避免虛假喚醒
        while (count == items.length)
            notFull.await();

		// 插入元素
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
           
(3)poll 操作

    從隊列頭部擷取并移除一個元素,如果隊列為空,則傳回 null,該方法是不阻塞的。

源碼:

public E poll() {
  		// 擷取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	// 目前隊列為空則傳回 null,    (一)否則調用 dequeue
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
           

(一):

private E dequeue() {
       
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
	
		// 擷取元素值
        E x = (E) items[takeIndex];
        
		// 設定隊頭元素為 null
        items[takeIndex] = null;
        
        if (++takeIndex == items.length)
            takeIndex = 0;
            
        // 隊列元素個數減 1    
        count--;
        if (itrs != null)
            itrs.elementDequeued();
            
		// 激活 notFull 條件隊列中的一個線程
        notFull.signal();
        return x;
    }
           
(4)take 操作

    擷取目前隊列頭部元素并從隊列中移除它。如果隊列為空,則阻塞目前線程,直到隊列不為空,然後傳回元素;如果在阻塞時 被其他線程設定了中斷标志,則被阻塞線程會抛出 InterruptedException 異常而傳回。

源碼:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
            
            	// (一)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
           

    可以看到,take 操作的代碼比較簡單,比 poll 相比隻是 (一)處不同,在這裡,如果隊列為空 ,則把目前線程挂起後放入 notEmpty 的條件隊列,等其他線程調用

notEmpty.signal()

方法後在傳回。

(5)peek 操作

    擷取隊列頭部元素 但是不從隊列中移除它,如果隊列為空 則 傳回 null,該方法是不阻塞的。

源碼:

public E peek() {
 		// 擷取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {   // (一)
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
           

(一):

/**
     * Returns item at index i.
     */
    @SuppressWarnings("unchecked")
    final E itemAt(int i) {
        return (E) items[i];
    }
           
(6)size 操作

    計算目前隊列元素個數。

源碼:

public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
           

    可以看到,size() 方法比較簡單,擷取鎖後直接傳回 count,并在傳回前釋放鎖。❓為什麼這裡擷取 count 值需要加鎖呢 ❓ 因為擷取鎖的語義之一是,擷取鎖後的變量都要從主記憶體中擷取,這樣保證了記憶體的可見性。

🎭總結:

    ✨ ArrayBlockingQueue 通過使用全局獨占鎖實作了同時隻能有一個線程進行入隊 或者 出隊操作,這個鎖的粒度比較大,有點類似在方法上添加 synchronized 的意思。其中 offer 和 poll 操作通過簡單的加鎖進行入隊、出隊操作,而 put、take 則使用條件變量,如果隊列滿則等待,如果隊列空也等待,然後分别在出隊和入隊操作中發送信号激活等待線程 實作同步。另外,相比 LinkedBlockingQueue ,ArrayBlockingQueue 的 size 操作的結果是精确的,因為計算前加了全局鎖。✨

二、PriorityBlockingQueue

1、類圖

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    可以看到,PriorityBlockingQueue 内部有一個數組 queue,用來存放隊列元素,size 用來存放隊列元素個數 ,allocationSpinLock 是個自旋鎖,使用 CAS 操作來保證同時隻有一個線程可以擴容隊列,狀态為 0 【目前沒有進行擴容】或 1【目前真正擴容】。

    PriorityBlockingQueue是一個 帶優先級的無界阻塞隊列 ,每次出隊都傳回優先級最高 或者 最低的元素,是以有一個比較器 comparator 用來比較元素大小,預設使用對象的 compareTo 方法提供比較規則,如果使用者想要自定義比較規則可以自定義 comparators 。

    notEmpty 條件變量用來實作 take 方法阻塞模式,沒有 notFull 是因為這裡的 put 是非阻塞的,這是無界隊列。

    PriorityBlockingQueue 内部是使用平衡二叉樹實作的,是以直接周遊隊列元素不保證有序。

構造方法源碼:

public PriorityQueue() {
 	 // (一) (二)
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
           

(一):

public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        // Note: This restriction of at least one is not actually needed,
        // but continues for 1.5 compatibility
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
    }
           

(二):

    可以看到,預設隊列容量是 11,預設比較器是 null,也就是使用元素的 compareTo 方法進行比較來确定元素的優先級,這意味着元素必須實作了 Comparable 接口。

2、PriorityBlockingQueue 原理

(1) offer 操作

     在隊列中插入一個元素,由于是無界隊列,是以一直傳回 true。

源碼:

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();

		// 擷取獨占鎖
        final ReentrantLock lock = this.lock;
        
        lock.lock();
        int n, cap;
        Object[] array;

		// 如果目前元素 >= 隊列容量,則擴容
        while ((n = size) >= (cap = (array = queue).length))
        	// (一) 擴容
            tryGrow(array, cap);
            
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
            	
            	// (二) 建堆
                siftUpComparable(n, e, array);
            else
            	// 自定義比較器
                siftUpUsingComparator(n, e, array, cmp);

			// 隊列元素個數加 1 
            size = n + 1;

			// 激活 notEmpty 的條件隊列中的一個阻塞線程
            notEmpty.signal();
        } finally {

			// 釋放獨占鎖
            lock.unlock();
        }
        return true;
    }

           

擴容邏輯

(一):

private void tryGrow(Object[] array, int oldCap) {
  		// 釋放鎖
        lock.unlock(); // must release and then re-acquire main lock
        
        Object[] newArray = null;
        
        // CAS 成功則擴容
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }

		// 第一個線程 CAS 成功後,第二個線程會進入這段代碼
        if (newArray == null) // back off if another thread is allocating
        
			// 第二個線程讓出 CPU,盡量讓第一個線程擷取鎖,但是這得不到保證
            Thread.yield();

		// 擷取鎖
        lock.lock();
   
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
           

    可以看到,擴容前先釋放鎖,❓ 為什麼要擴容前先釋放了鎖,然後使用 CAS 控制 隻有一個線程可以擴容成功 ❓其實這裡不先釋放掉鎖,也是可行的,也就是在整個擴容期間一直持有鎖,但是擴容是需要花時間的,如果擴容時還占用鎖,那麼其他線程這時是不能進行入隊 和 出隊操作的,這大大降低了并發性,是以為了提高性能,使用 CAS 控制 隻有一個線程進行擴容,并在擴容前釋放鎖,讓其他線程可以入隊和出隊。

    具體的擴容方法 是,如果 oldCap < 64,執行 oldCap + 2;否則擴容 50%,并且最大為 MAX_ARRAY_SIZE:

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    spinlock 鎖使用 CAS 控制,隻有一個線程可以進行擴容,CAS 失敗的線程會調用

Thread.yield()

方法讓出 CPU,目的是盡可能讓擴容線程擴容後 優先調用 lock.lock() 重新擷取鎖,(擴容邏輯裡,為了提高性能,先釋放了鎖),但是這得不到保證。有可能 yield 的線程 A 在 擴容線程 B 擴容完成 前已經退出了CPU 的讓出,然後線程 A 緊接着擷取了鎖,而線程 A 的 newArray 仍是 null,是以線程 A 的 tryGrow 就執行完畢了,傳回 offer 代碼中的 (一)處,繼續 while 循環。如果目前數組還沒擴容完,目前線程 C 又會調用 tryGrow 方法,然後釋放鎖,這就又給了線程 B 擷取鎖的機會,如果這時候線程 B 還沒擴容完,CPU 給到線程 C ,線程 C 會調用 Thread.yield 方法交出 CPU。是以 ,當擴容線程進行擴容時,其他線程原地自旋,通過 offer 代碼中的 (一) 檢查目前擴容是否完畢,擴容完畢後才退出 (一)的循環。

建堆算法

(二):

private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
           

    假設隊列初始化容量為 2,建立的優先級隊列的泛型參數為 Integer。size 為優先級隊列中實際有的元素個數;

/**
     * The number of elements in the priority queue.
     */
    private transient int size;
           
Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    先調用隊列的 offer(2) ,希望向隊列插入元素 2,

(n = size) >= (cap = (array = queue).length)

不成立,就走到siftUpComparable 方法中的代碼

siftUpComparable(n, e, array);

,由于 k = n =0,是以執行

array[k] = key;

,然後 offer 中會執行

size = n + 1;

,隊列個數加 1:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    接下來調用 offer(4) ,希望向隊列插入元素 4,

(n = size) >= (cap = (array = queue).length)

不成立,就走到siftUpComparable 方法中的代碼

siftUpComparable(n, e, array);

,由于 k = 1,是以進入 while 循環,由于 parent = 0, e= 2,key = 4,預設元素比較器使用元素的 compareTo 方法,可知 key > e,是以執行 break 退出循環,然後執行

array[k] = key

; 然後 offer 中會執行

size = n + 1;

,隊列個數加 1:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    接下來調用 offer(6) ,希望向隊列插入元素 6,

(n = size) >= (cap = (array = queue).length)

成立,調用 tryGrow 進行擴容,由于 就走到siftUpComparable 方法中的代碼

siftUpComparable(n, e, array);

,由于 k = 1,是以進入 while 循環,由于

oldCap < 64

,是以執行 newCap = oldCap + (oldCap+2) = 6,然後建立新數組并複制,之後調用 siftUpComparable 方法,由于 k = 2 > 0, 進入 while 循環,由于 parent = 0, e = 2,key = 6, key > e,是以執行 break 退出 while 循環,并把怨怒是 6 放入數組下标為 2 的地方,最後将 size 值 +1 ,變為 3 :

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    接下來,調用 offer(1),

(n = size) >= (cap = (array = queue).length)

不成立,執行 siftUpComparable 方法,由于 k=3,是以進入 while 循環,由于 parent = 1,e = 4,key = 1, key < e,是以把 4 賦給 array[3] ,然後把 1 賦給 k。 再次循環,parent 變成 0,e 變為 2 , key < e,是以把 2 賦給 array[1],然後 把 0 賦給 k,退出 while 循環,最後把 1 賦給 array[0]:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    這時二叉樹堆的樹形圖:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    可見,堆的根元素是 1 ,也就是 這是一個最小堆,那麼 當調用這個優先級隊列的 poll 方法時,會依次傳回堆裡面最小的值。

(2) poll 操作

    擷取隊列内部堆樹的根節點元素,如果隊列為空,則傳回 null。

源碼:

public E poll() {
        final ReentrantLock lock = this.lock;

		// 擷取獨占鎖
        lock.lock();
        
        try {
        	// (一)
            return dequeue();
            
        } finally {
        
        	// 釋放獨占鎖
            lock.unlock();
        }
    }
           

     可以看到,在出隊時要先加鎖,這意味着, 目前線程在進行出隊操作時,其他線程不能再進行入隊和出隊操作 ,但是擴容是不影響的。

(一):

private E dequeue() {
        int n = size - 1;
        
        // 隊列為空則傳回 null
        if (n < 0)
            return null;
            
        else {
            Object[] array = queue;
            
            // 擷取隊頭元素
            E result = (E) array[0];

			// 擷取隊尾元素,并指派 null
            E x = (E) array[n];
            array[n] = null;
            
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
            	// (一)把變量 x 插入到數組下标為 0 的位置,之後重新調整堆 為 最大 或 最小 堆
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
           

(一)調整成堆的邏輯:

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
           

    繼續 offer 圖解,隊列元素的序列為 1、2、6、4。

    第一次調用 poll() 方法時,size = 4 ,n = 3,result = 1, x = 4,然後 把 array[3] 置為 null :

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

     然後執行 siftDownComparable ,傳入的參數 k = 0, x= 4 .n = 3, 首先 half = 1, key = 4,k < half ,進入 while 循環, child 取值為 1 ,c 取值為 2 , right 取值為 2,把 2 賦給 array[0],1 賦給 k,這時 k 不小于 half ,跳出 while 循環,然後把 4 賦給 array[1] :

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

     接下來調用 poll() ,這時 size = 3,n = 2 ,result = 2,x = 6,把隊尾元素置為 null :

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    然後執行 siftDownComparable 方法,調整後 :

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    接下來調用 poll() 方法,size = 2,n = 1, result = 4 ,x = 6:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    然後執行 siftDownComparable 方法,調整後 :

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    調用 poll ,依次出隊的順序是 1、2、4、6。

    下面說說 siftDownComparable 調整堆的算法。思路是,由于隊列數組第 0 個元素為樹根,是以出隊時要移除它 (這個例子是小頂堆)。 這時 數組就不是最小的堆了,需要調整,從被移除的樹根的左右子樹中 找一個最小的值來當樹根,左右子樹 又會去找 自己左右子樹裡的最小值,這是一個遞歸過程,直到樹葉節點結束遞歸。

    假設目前隊列是:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    其對應的二叉樹為:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    這時如果調用了 poll(),那麼 result = 2, x = 11,并且隊列末尾的元素被設定為 null ,然後對于剩下的元素,調整堆的步驟如下:

    樹根的 左節點 leftChildVal = 4,rightChildVal = 6,由于 4 < 6,是以 c = 4,(c 取小的那個) 然後由于 11 > 4,也就是 key > 4,是以使用元素 4 覆寫樹根節點的值,堆變為:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    然後 樹根的左子樹 的 左右孩子節點中 leftChildValue = 8,rightChildVal = 10, 由于 8 < 10,是以 c = 8,由于 11 > 8,也就是 key > c,是以元素 8 作為樹根左子樹的根節點:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    這時判斷是否 k < half,結果為 false,退出循環,然後把 x = 11 的元素設定到數組下标為 3 的地方:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    這樣就調整堆完畢, siftDownCompable 傳回的 result 為 2 ,是以 poll 方法也傳回了。

(3) put 操作

    内部是 offer 操作,源碼:

public void put(E e) {
        offer(e); // never need to block
    }
           
(4) take 操作

    擷取隊列内部堆樹的根節點元素,如果隊列為空,則阻塞。源碼:

public E take() throws InterruptedException {

	// 擷取鎖
    final ReentrantLock lock = this.lock;

	// 可被中斷
    lock.lockInterruptibly();
    
    E result;
    try {
    	// 如果為空則阻塞,把目前線程放入 notEmpty 的條件隊列,while 避免虛假喚醒
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
           
(5) size 操作

    擷取隊列元素個數,在傳回 size 前加了鎖,以保證在調用 size() 方法時不會有其他線程進行入隊 和 出隊操作。雖然 size 沒有用 volatile 修飾,但是加鎖保證了記憶體可見性。

public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return size;
        } finally {
            lock.unlock();
        }
    }
           

例👀:

import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueTest {

    static class Task implements Comparable<Task>{
        private int priority = 0;
        private String taskName;

        public int getPriority() {
            return priority;
        }

        public void setPriority(int priority) {
            this.priority = priority;
        }

        public String getTaskName() {
            return taskName;
        }

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }

        /**
         * 自定義元素優先級比較規則
         */
        @Override
        public int compareTo(Task o) {
            if(this.priority >= o.getPriority()){
                return 1;
            }else {
            return -1;
        }
    }

    public void doSomeThing(){
        System.out.println(taskName + ":" + priority);
        }
    }
    public static void main(String[] args) {
        // 建立任務,并添加到隊列
        PriorityBlockingQueue<Task> priorityBlockingQueue=new PriorityBlockingQueue<>();
        Random random = new Random();
        for(int i=0;i<10;i++){
            Task task = new Task();
            // 使用随機數生成器生成 10 個随機的有優先級的任務
            task.setPriority(random.nextInt(10));
            task.setTaskName("taskName"+i);
            priorityBlockingQueue.offer(task);
        }
        // 取出任務執行
        while (!priorityBlockingQueue.isEmpty()){
            Task task=priorityBlockingQueue.poll();
            if(null != task){
                task.doSomeThing();
            }
        }

    }
}
           

運作結果:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    可以看到,任務執行的先後順序和它們被放入隊列的先後順序沒有關系,而是和它們的優先級有關系。

🎭總結:

     ✨ PriorityBlockingQueue 隊列在内部使用二叉樹堆維護元素優先級,使用數組作為元素存儲的資料結構,這個數組是可擴容的,當目前元素個數 >= 最大容量時 ,會通過 CAS 算法擴容,出隊的是堆樹的根節點,使用元素的 compareTo 方法提供預設的元素優先級比較規則,使用者可以自定義比較規則。

     PriorityBlockingQueue 類似于 ArrayBlockingQueue,在内部使用一個獨占鎖來控制同時隻有一個線程可以進行入隊 和 出隊 操作。PriorityBlockingQueue 隻使用了 notEmpty 條件變量 而沒有使用 notFull,因為 PriorityBlockingQueue 是無界隊列,執行 put 操作時 永遠不會處于 await 狀态,是以也就不需要被喚醒。而 take 方法是阻塞方法,并且是可被中斷的。當需要存放有優先級的元素時,可以考慮 PriorityBlockingQueue 。✨

三、 DelayQueue

1、 類圖

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

    可以看到,✨ DelayQueue 内部使用 PriorityQueue 存放資料,使用 ReentrantLock 實作線程同步。隊列中的元素要實作 Delay 接口,由于每個元素都有一個過期時間,是以要實作 獲知 目前元素還剩下多少時間就過期 的 接口,由于内部使用優先級隊列來實作,是以需要實作元素互相比較的接口。✨

源碼:

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}
           

     available 和 lock 鎖是對應的,其目的是為了實作線程同步。

源碼:

     leader 變量的使用基于 Leader-Follower 模式的變體,用于減少不必要的線程等待。當一個線程調用隊列的 take 方法,變為 leader 線程 後,它會調用條件變量 available.awaitNanos (delay) 進行無限等待,而 其他線程(follow 線程)則會調用 available.await() 進行無限等待。leader 線程延遲時間過期後,會退出 take 方法,并通過調用 available.signal() 方法喚醒一個 follow 線程,被喚醒的 follow 線程被選舉為新的 leader 線程。

2、DelayQueue 原理

(1) offer 操作

     插入元素到隊列,如果插入元素為 null 則抛出 NullPointerException 異常 ,否則由于是 無界隊列 ,是以一直傳回 true。插入元素要實作 Delayed 接口:

public boolean offer(E e) {
 		
 		// 擷取獨占鎖
        final ReentrantLock lock = this.lock;
        
        lock.lock();
        try {
            q.offer(e);
            
            // 如果目前元素 e 等于隊首元素,也就是 最先過期的
            if (q.peek() == e) {
            
            	// 重置 leader 線程為 null 
                leader = null;

				// 激活 available 變量條件隊列中的一個線程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
           
(2) take 操作

     擷取并移除隊列裡延遲時間過期的元素,如果隊列裡沒有過期元素,則等待。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    
        for (;;) {
            E first = q.peek();

			// 如果隊列為空
            if (first == null)
            	// 把目前線程放入 available 的條件隊列裡阻塞等待
                available.await();
                
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting

				// 說明其他線程也在執行 take 方法
                if (leader != null)
                    available.await();
                    
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
           

     可以看到,假設 線程 A 第一次調用 take 方法時 隊列為空,就會把目前線程放入 available 的條件隊列裡阻塞等待。

     當有另一個線程 B 執行了 offer 方法,并添加元素到隊列時,假設這時沒有其他線程執行入隊操作,則 線程 B 添加的元素就是隊首元素,在 offer 方法中,

q.peek() == e

,會重置 leader 為 null,并且激活 available 條件隊列裡的線程,就是說,線程 A 會被激活,然後線程 A 就會繼續執行 for 循環,重新擷取隊頭元素,這時的 first 就是線程 B 新增的元素,這時 first 不為 null,則調用 first.getDelay(NANOSECONDS) 方法檢視該元素還剩餘多少時間就要過期,如果 delay <= 0 ,則說明已經過期,那麼直接出隊傳回。否則,看 leader 是否為 null,不為 null 說明其他線程也在執行 take 方法(因為 調用 take 方法,就會把目前線程設定為 leader ) ,則把目前線程放入條件隊列;如果這時 leader 為 null,則 設定目前線程 A 為 leader 線程,然後等待 delay 時間,這期間該線程會釋放鎖,是以其他線程可以 offer 添加元素,也可以 take 阻塞自己,剩餘過期時間到後,線程 A 再重新競争得到鎖,然後重置 leader 線程為 null,繼續循環,這時發現 隊頭元素以及過期了,就會傳回隊頭元素。 在傳回前執行 finally 塊裡的代碼,如果

leader == null && q.peek() != null

,就說明 目前線程從隊列移除過期元素後,又有其他線程執行了入隊操作,那麼這時候 調用條件變量的 signal 方法,激活條件隊列的等待線程。 (妙!!! 👍 對于可能出現的情況的考慮可以說是面面俱到了👍 )

(3) poll 操作

    擷取并移除隊頭過期元素,如果沒有過期元素則傳回 null 。

源碼:

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
           
(4) size 操作

    計算隊列元素個數,包含過期的和沒過期的。

public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }
           

例👀:

import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
   建立一個延遲隊列,使用随機數生成 10 個延遲任務,最後依次擷取延遲任務,并列印
 */
public class DelayQueueTest {
    static class DelayedEle implements Delayed {

        // 延遲時間,表示 目前任務需要延遲多少 ms 時間 過期
        private final long delayTime;

        // 到期時間
        private final long expire;

        // 任務名稱
        private String taskName;

        public DelayedEle(long delay,String taskName){
            delayTime = delay;
            this.taskName = taskName;
            expire = System.currentTimeMillis() + delay;
        }

        /**
         * 剩餘時間 = 到期時間 - 目前時間
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }

        /**
         優先級隊列裡的優先級規則
         */
        @Override
        public int compareTo(Delayed o) {
            return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString() {
            return "DelayedEle{" +
                    "delayTime=" + delayTime +
                    ", expire=" + expire +
                    ", taskName='" + taskName + '\'' +
                    '}';
        }

    }
    public static void main(String[] args) {
        // 建立 delay 隊列
        DelayQueue<DelayedEle> delayQueue = new DelayQueue<>();

        // 建立延遲任務
        Random random = new Random();
        for(int i = 0;i < 10;++i){
            DelayedEle element = new DelayedEle(random.nextInt(500),"task:" + i);
            delayQueue.offer(element);
        }

        // 依次取出任務并列印
        DelayedEle ele = null;
        try{

            for( ; ;){
                // 擷取過期任務并列印
                while ((ele = delayQueue.take()) != null){
                    System.out.println(ele.toString());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

運作結果:

Java 并發包中并發隊列原理剖析(二)ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue

🎭總結:

    DelayQueue 内部使用 PriorityQueue 存放資料,使用 ReentrantLock 實作線程同步。隊列中的元素要實作 Delayed 接口,其中一個方法 是擷取目前元素到過期時間剩餘時間的 getDelay,在出隊時判斷元素是否過期了, Delayed 實作了 Comparable 接口,是以這是個有優先級的隊列。

繼續閱讀