天天看點

《Java源碼分析》:BlockingQueue之LinkedBlockingQueue《Java源碼分析》:LinkedBlockingQueue

《Java源碼分析》:LinkedBlockingQueue

LinkedBlockingQueue是大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。

1、LinkedBlockingQueue的繼承體系結構

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable 
           

LinkedBlockingQueue的繼承體系結構如下,和ArrayBlockingQueue一模一樣。

2、LinkedBlockingQueue的相關屬性介紹

1、連結清單節點

static class Node<E> {
        E item;
        Node<E> next;

        Node(E x) { item = x; }
    }
           

2、private final int capacity;容量邊界,如果沒有指定,則為Integer.MAX_VALUE

3、private final AtomicInteger count = new AtomicInteger(); 目前隊列中存儲元素的數量

4、transient Node head;隊列的頭結點,且始終head.item=null,即此頭結點不存儲任何元素。

5、private transient Node last;隊列的尾結點,且始終last.next=null.

6、private final ReentrantLock takeLock = new ReentrantLock();take操作時所需要加的鎖

7、當take操作時如果隊列中存儲的元素為空,則調用此Condition的await方法。

private final Condition notEmpty = takeLock.newCondition();
           

8、put操作時所需要加的鎖和Condition

private final ReentrantLock putLock = new ReentrantLock();

    private final Condition notFull = putLock.newCondition();
           

從上面可以看到,LinkedBlockingQueue阻塞隊列的實作采用了兩把鎖,一個是take鎖,一個是put鎖,這就說明,put和take操作是可以同時進行的。

3、LinkedBlockingQueue的構造方法

創造一個容量為Integer.MAX_VALUE的隊列

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
           

創造一個指定大小(必須大于零)的隊列,如果參數小于等于零,則抛異常。

public LinkedBlockingQueue(int capacity) {
        if (capacity <= ) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//頭結點和尾結點中存儲的元素為null
    }
           

4、LinkedBlockingQueue的put方法介紹

函數功能:插入一個元素到隊列的末尾,如果隊列已滿則等待直到有空間進行存儲

源碼如下:(有詳細的注釋)

public void put(E e) throws InterruptedException {
        //檢查是否為null,如果為null,則抛異常
        if (e == null) throw new NullPointerException();
        //注意:用此局部變量持有一個負數來訓示CAS操作是否操作成功。 c = count.getAndIncrement();//利用原子性加一
        int c = -;
        Node<E> node = new Node<E>(e);//根據目前的值構造一個節點
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//加鎖
        try {

            //如果已到達最大容量,則等待直到有空間來進行存儲才會被喚醒
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);//将節點加入到連結清單的末尾
            c = count.getAndIncrement();//利用原子性加一
            //如果目前存儲的元素個數小于容量,則喚醒正在等待的生産者的線程。
            if (c +  < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();//釋放鎖
        }
        //隊列不為空,剛剛添加了一個元素,是以需要喚醒一個消費者線程
        if (c == )
            signalNotEmpty();
    }

    /*
     * 将節點加入到連結清單的末尾
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    //喚醒一個消費者線程
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
           

put方法實作的思想也比較簡單,叙述如下:

1、首先檢查元素是否為null,如果是,則抛異常,如果不是,則繼續進行

2、加鎖

3、檢查隊列是否已滿,如果已滿,則調用put所對應的Condition的await方法進行等待,直到有signal來将其喚醒。如果沒有滿,則進行 4.

4、調用enqueue方法将此節點加入到隊列中,并将計數器進行加一操作。

本來到這裡就差不多結束了,但是還需要處理兩種喚醒操作。

5、如果此時隊列中存儲的元素小于最大容量,則喚醒其他正在等待生産的線程。

6、由于剛剛生産了元素,是以最後還需要喚醒消費者線程。

7、釋放鎖

put(e)方法的思想相當簡單哈,下面來看take方法

LinkedBlockingQueue的take方法介紹

函數功能:擷取隊列中隊首的元素。

源碼如下:(有詳細的注釋)

public E take() throws InterruptedException {
        E x;
        int c = -;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//加take鎖
        try {
            //如果目前隊列為空,則一直等待直到隊列非空
            while (count.get() == ) {
                notEmpty.await();
            }
            x = dequeue();//将頭結點取出來
            c = count.getAndDecrement();//利用原子性進行進行減一操作
            //如果此時的容量是大于1的,則說明還有其它元素可以被消費線程擷取,是以喚醒其他消費者線程
            if (c > )
                notEmpty.signal();
        } finally {
            takeLock.unlock();//釋放鎖
        }
        //如果c的容量是等于capacity,又被消費了一個,是以可以通知生産者線程來進行生産
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        //頭結點指向下一個節點,并将頭結點中的元素置為null。即頭結點的item始終為null。
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }


    /**
     * 喚醒一個生産者線程
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
           

此方法實作的思想也特别簡單,叙述如下:

1、加鎖

2、判斷隊列中是否有元素,如果沒有,則阻塞等待直至隊列非空被喚醒。如果有,則進行 3

3、取出隊列中頭結點的下一個節點(頭結點為傀儡節點)。并将計數器進行減一操作。

4、如果此時的容量是大于1的,則說明還有其它元素可以被消費線程擷取,是以喚醒一個正在等待的消費者線程

5、如果此時的容量小于隊列的最大容量,則喚醒正在等待的生産者線程進行生産。

6、釋放鎖

以上就是關于LinkedBlockingQueue的相關分析,比較簡單哈。

小結

關于LinkedBlockingQueue我們隻需要記住以下幾點

1、是基于連結清單來實作的

2、使用了兩個鎖,take、put操作各一把鎖。以及借用了兩個Condition來進行阻塞和喚醒操作。