天天看點

【死磕 Java 集合】— LinkedBlockingQueue源碼分析【死磕 Java 集合】— LinkedBlockingQueue源碼分析

【死磕 Java 集合】— LinkedBlockingQueue源碼分析

問題

(1)LinkedBlockingQueue的實作方式?

(2)LinkedBlockingQueue是有界的還是無界的隊列?

(3)LinkedBlockingQueue相比ArrayBlockingQueue有什麼改進?

簡介

LinkedBlockingQueue是java并發包下一個以單連結清單實作的阻塞隊列,它是線程安全的,至于它是不是有界的,請看下面的分析。

源碼分析

主要屬性

// 容量
private final int capacity;

// 元素數量
private final AtomicInteger count = new AtomicInteger();

// 連結清單頭
transient Node<E> head;

// 連結清單尾
private transient Node<E> last;

// take鎖
private final ReentrantLock takeLock = new ReentrantLock();

// notEmpty條件
// 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
private final Condition notEmpty = takeLock.newCondition();

// 放鎖
private final ReentrantLock putLock = new ReentrantLock();

// notFull條件
// 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
private final Condition notFull = putLock.newCondition();
           

(1)capacity,有容量,可以了解為LinkedBlockingQueue是有界隊列

(2)head, last,連結清單頭、連結清單尾指針

(3)takeLock,notEmpty,take鎖及其對應的條件

(4)putLock, notFull,put鎖及其對應的條件

(5)入隊、出隊使用兩個不同的鎖控制,鎖分離,提高效率

内部類

static class Node<E> {
    E item;

    Node<E> next;

    Node(E x) { item = x; }
}
           

典型的單連結清單結構。

主要構造方法

public LinkedBlockingQueue() {
    // 如果沒傳容量,就使用最大int值初始化其容量
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化head和last指針為空值節點
    last = head = new Node<E>(null);
}
           

入隊

入隊同樣有四個方法,我們這裡隻分析最重要的一個,put(E e)方法:

public void put(E e) throws InterruptedException {
    // 不允許null元素
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 建立一個節點
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 使用put鎖加鎖
    putLock.lockInterruptibly();
    try {
        // 如果隊列滿了,就阻塞在notFull條件上
        // 等待被其它線程喚醒
        while (count.get() == capacity) {
            notFull.await();
        }
        // 隊列不滿了,就入隊
        enqueue(node);
        // 隊列長度加1
        c = count.getAndIncrement();
        // 如果現隊列長度如果小于容量
        // 就再喚醒一個阻塞在notFull條件上的線程
        // 這裡為啥要喚醒一下呢?
        // 因為可能有很多線程阻塞在notFull這個條件上的
        // 而取元素時隻有取之前隊列是滿的才會喚醒notFull
        // 為什麼隊列滿的才喚醒notFull呢?
        // 因為喚醒是需要加putLock的,這是為了減少鎖的次數
        // 是以,這裡索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程
        // 說白了,這也是鎖分離帶來的代價
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 釋放鎖
        putLock.unlock();
    }
    // 如果原隊列長度為0,現在加了一個元素後立即喚醒notEmpty條件
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    // 直接加到last後面
    last = last.next = node;
}    

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 加take鎖
    takeLock.lock();
    try {
        // 喚醒notEmpty條件
        notEmpty.signal();
    } finally {
        // 解鎖
        takeLock.unlock();
    }
}
           

(1)使用putLock加鎖;

(2)如果隊列滿了就阻塞在notFull條件上;

(3)否則就入隊;

(4)如果入隊後元素數量小于容量,喚醒其它阻塞在notFull條件上的線程;

(5)釋放鎖;

(6)如果放元素之前隊列長度為0,就喚醒notEmpty條件;

出隊

出隊同樣也有四個方法,我們這裡隻分析最重要的那一個,take()方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 使用takeLock加鎖
    takeLock.lockInterruptibly();
    try {
        // 如果隊列無元素,則阻塞在notEmpty條件上
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 否則,出隊
        x = dequeue();
        // 擷取出隊前隊列的長度
        c = count.getAndDecrement();
        // 如果取之前隊列長度大于1,則喚醒notEmpty
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 釋放鎖
        takeLock.unlock();
    }
    // 如果取之前隊列長度等于容量
    // 則喚醒notFull
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    // head節點本身是不存儲任何元素的
    // 這裡把head删除,并把head下一個節點作為新的值
    // 并把其值置空,傳回原來的值
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 喚醒notFull
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
           

(1)使用takeLock加鎖;

(2)如果隊列空了就阻塞在notEmpty條件上;

(3)否則就出隊;

(4)如果出隊前元素數量大于1,喚醒其它阻塞在notEmpty條件上的線程;

(5)釋放鎖;

(6)如果取元素之前隊列長度等于容量,就喚醒notFull條件;

總結

(1)LinkedBlockingQueue采用單連結清單的形式實作;

(2)LinkedBlockingQueue采用兩把鎖的鎖分離技術實作入隊出隊互不阻塞;

(3)LinkedBlockingQueue是有界隊列,不傳入容量時預設為最大int值;

彩蛋

(1)LinkedBlockingQueue與ArrayBlockingQueue對比?

a)後者入隊出隊采用一把鎖,導緻入隊出隊互相阻塞,效率低下;

b)前才入隊出隊采用兩把鎖,入隊出隊互不幹擾,效率較高;

c)二者都是有界隊列,如果長度相等且出隊速度跟不上入隊速度,都會導緻大量線程阻塞;

d)前者如果初始化不傳入初始容量,則使用最大int值,如果出隊速度跟不上入隊速度,會導緻隊列特别長,占用大量記憶體;

贊(3)  打賞

【公告】版權聲明

版權歸原創作者所有,任何形式的轉載請聯系部落客:daming_90:Java 技術驿站 » 【死磕 Java 集合】— LinkedBlockingQueue源碼分析

【死磕 Java 集合】— LinkedBlockingQueue源碼分析【死磕 Java 集合】— LinkedBlockingQueue源碼分析

chenssy

不想當廚師的程式員不是好的架構師....

上一篇

【死磕 Java 集合】— PriorityQueue源碼分析下一篇

【死磕 Java 集合】— SynchronousQueue源碼分析

  • 暫無文章

評論 1

【死磕 Java 集合】— LinkedBlockingQueue源碼分析【死磕 Java 集合】— LinkedBlockingQueue源碼分析

送出評論

  • 昵稱 (必填)
  • 郵箱 (必填)
  • 網址
  1. #1
    【死磕 Java 集合】— LinkedBlockingQueue源碼分析【死磕 Java 集合】— LinkedBlockingQueue源碼分析

    public LinkedBlockingQueue(Collection c) {

    //設定容量和建立首尾節點

    this(Integer.MAX_VALUE);

    final ReentrantLock putLock = this.putLock;

    //從未競争,但對于可見度而言卻是必需的,這裡為什麼是必需的????

    putLock.lock(); // Never contended, but necessary for visibility

    try {

    int n = 0;

    for (E e : c) {

    if (e == null)

    throw new NullPointerException();

    if (n == capacity)

    throw new IllegalStateException(“Queue full”);

    enqueue(new Node(e));

    ++n;

    }

    count.set(n);

    } finally {

    putLock.unlock();

    }

    }