天天看點

從源碼全面解析LinkedBlockingQueue的來龍去脈

作者:java小悠

一、引言

并發程式設計在網際網路技術使用如此廣泛,幾乎所有的後端技術面試官都要在并發程式設計的使用和原理方面對小夥伴們進行 360° 的刁難。

作為一個在網際網路公司面一次拿一次 Offer 的面霸,打敗了無數競争對手,每次都隻能看到無數落寞的身影失望的離開,略感愧疚(請允許我使用一下誇張的修辭手法)。

于是在一個寂寞難耐的夜晚,暖男我痛定思痛,決定開始寫 《吊打面試官》 系列,希望能幫助各位讀者以後面試勢如破竹,對面試官進行 360° 的反擊,吊打問你的面試官,讓一同面試的同僚瞠目結舌,瘋狂收割大廠 Offer!

雖然現在是網際網路寒冬,但乾坤未定,你我皆是黑馬

二、使用

對于阻塞隊列,想必大家應該都不陌生,我們這裡簡單的介紹一下,對于 Java 裡面的阻塞隊列,其使用了 生産者和消費者 的模型

對于生産者來說,主要有以下幾部分:

add(E)     	// 添加資料到隊列,如果隊列滿了,無法存儲,抛出異常
offer(E)    // 添加資料到隊列,如果隊列滿了,傳回false
offer(E,timeout,unit)   // 添加資料到隊列,如果隊列滿了,阻塞timeout時間,如果阻塞一段時間,依然沒添加進入,傳回false
put(E)      // 添加資料到隊列,如果隊列滿了,挂起線程,等到隊列中有位置,再扔資料進去,死等!
複制代碼           

對于消費者來說,主要有以下幾部分:

remove()    // 從隊列中移除資料,如果隊列為空,抛出異常
poll()      // 從隊列中移除資料,如果隊列為空,傳回null,麼的資料
poll(timeout,unit)   // 從隊列中移除資料,如果隊列為空,挂起線程timeout時間,等生産者扔資料,再擷取
take()     // 從隊列中移除資料,如果隊列為空,線程挂起,一直等到生産者扔資料,再擷取
複制代碼           

我們本篇來講講堵塞隊列中的第二員猛将,LinkedBlockingQueue 的故事

我們先來看其基本使用

public class LinkedBlockingQueueTest {
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue();

        // 生産者扔資料
        queue.add("1");
        queue.offer("2");
        queue.offer("3", 2, TimeUnit.SECONDS);
        queue.put("2");

        // 消費者取資料
        System.out.println(queue.remove());
        System.out.println(queue.poll());
        System.out.println(queue.poll(2, TimeUnit.SECONDS));
        System.out.println(queue.take());
    }
}
複制代碼           

三、源碼

1、初始化

由于我們的 LinkedBlockingQueue 底層是連結清單實作的,是以我們初始化的時候不需要指定其大小

LinkedBlockingQueue queue = new LinkedBlockingQueue();

// 如果我們不指定容量大小的話,這裡的容量預設為Integer.MAX_VALUE
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    // 如果容量傳進來是小于等于0的,直接抛異常
    if (capacity <= 0){
        throw new IllegalArgumentException();
    }
    // 目前的容量指派
    this.capacity = capacity;
    // 這裡其實和我們的AQS有點像
    // 搞一個虛拟的頭結點,減少後面的判空
    last = head = new Node<E>(null);
}
複制代碼           

當然,除了我們初始化的這些成員變量,我們還有一部分:

class Node<E> {
    // 目前的資料
    E item;
    // 指向下一個資料的指針
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

// 目前連結清單中存在的資料數量
private final AtomicInteger count = new AtomicInteger();

// 讀鎖
private final ReentrantLock takeLock = new ReentrantLock();

// 喚醒消費者線程
private final Condition notEmpty = takeLock.newCondition();

// 寫鎖
private final ReentrantLock putLock = new ReentrantLock();

// 喚醒生産者線程
private final Condition notFull = putLock.newCondition();
複制代碼           

這裡可能有的小夥伴有點懵逼,為什麼這哥們(LinkedBlockingQueue)用了兩個鎖呢?為什麼我 ArrayBlockingQueue 隻能用一把鎖?

不要急,我們慢慢的往下看他源碼

2、生産者的源碼

2.1 add()源碼實作

public boolean add(E e) {
    return super.add(e);
}

// 走到這裡會發現,我們的add方法就是調用了offer方法
// offer: 添加資料到隊列,如果隊列滿了,傳回false
// 是以這裡offer滿了,就會抛出異常:"Queue full"
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
複制代碼           

2.2 offer()源碼實作

public boolean offer(E e) {
    // 如果是空值,直接抛出異常
    if (e == null) throw new NullPointerException();
    // 引用,上篇我們分析過
    final AtomicInteger count = this.count;
    // 判斷目前資料量是否和我們總容量一樣
    if (count.get() == capacity){
        return false;
    }
    // 标記位
    int c = -1;
    // 建立節點
    Node<E> node = new Node<E>(e);
    // 引用寫鎖
    final ReentrantLock putLock = this.putLock;
    // 上鎖
    putLock.lock();
    try {
        // 如果目前資料量小于總容量
        // 這裡我們上面也檢查過,相當于DCL的意思
        if (count.get() < capacity) {
            // 插入隊列
            enqueue(node);
            // 得到目前資料量
            // 這裡需要注意:getAndIncrement先傳回資料,再加一
            c = count.getAndIncrement();
            // 如果我們發現目前資料量還小于總容量
            // 也就是我們可以繼續放資料
            if (c + 1 < capacity)
                // 喚醒其他的生産者線程扔資料
                // 當然這裡稍微多說一點,這裡的喚醒指的是将生産者從Condition隊列放到AQS隊列中
                // 具體什麼時候執行還需要看AQS的排程
                notFull.signal();
        }
    } finally {
        // 解鎖
        putLock.unlock();
    }
    // 如果我們目前資料量為0,代表隊列中原來無資料
    // 但上面現在扔進去了一個
    if (c == 0)
        // 需要喚醒所有的消費者消費資料
        signalNotEmpty();
    return c >= 0;
}

private void enqueue(Node<E> node) {
    // 将當面節點挂在last節點後
    // 将last節點指向目前節點
    last = last.next = node;
}


// 這裡我們的Condition聊過
// 必須持有目前鎖資源才可以使用Condition的方法
private void signalNotEmpty() {
    // 拿到讀鎖
    final ReentrantLock takeLock = this.takeLock;
    // 加鎖
    takeLock.lock();
    try {
        // 喚醒消費者線程
        notEmpty.signal();
    } finally {
        // 解鎖
        takeLock.unlock();
    }
}
複制代碼           

2.3 offer(time)源碼實作

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    // 如果是空值,直接抛出異常
    if (e == null) throw new NullPointerException();
    // 轉成統一的機關
    long nanos = unit.toNanos(timeout);
    int c = -1;
    // 寫鎖
    final ReentrantLock putLock = this.putLock;
    // 目前容量
    final AtomicInteger count = this.count;
    // 加鎖
    putLock.lockInterruptibly();
    try {
        // 如果目前資料量小于總容量
        // 這裡我們上面也檢查過,相當于DCL的意思
        while (count.get() == capacity) {
            // 如果我們剩餘時間小于0,直接失敗即可
            if (nanos <= 0)
                return false;
            // 反之生産者線程寫入挂起nanos時間
            nanos = notFull.awaitNanos(nanos);
        }
        // 添加至隊列
        enqueue(new Node<E>(e));
        // 得到目前資料量
        // 這裡需要注意:getAndIncrement先傳回資料,再加一
        c = count.getAndIncrement();
        // 如果我們發現目前資料量還小于總容量
        // 也就是我們可以繼續放資料
        if (c + 1 < capacity)
            // 喚醒其他的生産者線程扔資料
            // 當然這裡稍微多說一點,這裡的喚醒指的是将生産者從Condition隊列放到AQS隊列中
            // 具體什麼時候執行還需要看AQS的排程
            notFull.signal();
    } finally {
        // 解鎖
        putLock.unlock();
    }
    // 如果我們目前資料量為0,代表隊列中原來無資料
    // 但現在扔進去了一個,喚醒消費者線程
    if (c == 0)
        signalNotEmpty();
    return true;
}
複制代碼           

2.4 put()源碼實作

  • 這裡就不寫了,其實和我們的 offer 一樣,大家自己看看就好
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
複制代碼           

3、消費者的源碼

3.1 remove()源碼實作

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
複制代碼           

3.2 poll()源碼實作

public E poll() {
    // 擷取目前連結清單的資料量
    final AtomicInteger count = this.count;
    // 如果資料量為0,說明無資料
    // 消費者無法消費,直接傳回null即可
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    // 拿到讀鎖
    final ReentrantLock takeLock = this.takeLock;
    // 加鎖
    takeLock.lock();
    try {
        // 如果資料量大于0,說明有資料
        // 這裡我們上面也檢查過,相當于DCL的意思
        if (count.get() > 0) {
            // 取數
            x = dequeue();
            // 得到目前資料量
            // 這裡需要注意:getAndIncrement先傳回資料,再減一
            c = count.getAndDecrement();
            // 如果我們的資料量大于1,則喚醒消費者來消費
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        // 解鎖
        takeLock.unlock();
    }
    // 如果資料量等于目前的總容量
    // 說明目前的連結清單已經有空餘了,喚醒生産者生産
    if (c == capacity)
        signalNotFull();
    return x;
}

// 這個取資料和我們的AQS有點像
// 去除目前資料并且将目前節點作為頭結點
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    // 拿到寫鎖
    final ReentrantLock putLock = this.putLock;
    // 上鎖
    putLock.lock();
    try {
        // 喚醒生産者
        notFull.signal();
    } finally {
        // 解鎖
        putLock.unlock();
    }
}
複制代碼           

3.3 poll(time)源碼實作

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    // 統一時間機關
    long nanos = unit.toNanos(timeout);
    // 拿到目前資料量 + 讀鎖
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 加可中斷鎖
    takeLock.lockInterruptibly();
    try {
        // 如果目前的資料量為0
        while (count.get() == 0) {
            // 如果時間沒有剩餘,直接傳回null即可
            if (nanos <= 0)
                return null;
            // 讓消費者線程等待nanos時間
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 取資料
        x = dequeue();
        // 後面都是一樣的
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
複制代碼           

3.4 take()源碼實作

  • 這個大家可以自己看一下補充,也算一個小測試
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
複制代碼           

4、疑惑

看到這裡,我想大家可能有和我一樣的疑惑?

之前我們聊 ArrayBlockingQueue 的時候,他隻用了一把鎖(互斥鎖),但 LinkedBlockingQueue 卻使用了兩把鎖(讀鎖、寫鎖)

這時候你腦子會不會有一種疑問,我 ArrayBlockingQueue 能不能使用兩把鎖(讀鎖、寫鎖)來進行通路

如果你有這種想法,說明你确實思考了,哈哈哈

沒錯,部落客我查閱了相關的資料,ArrayBlockingQueue 确實可以使用兩把鎖進行邏輯的更改

考慮部分小夥伴可能沒有VPN,部落客貼下代碼

整體的邏輯基本上是仿造 LinkedBlockingQueue 的業務邏輯改造的,經測試這種性能要比原始的 ArrayBlockingQueue 要快 20%~30% 左右,感興趣的也可以自己去測試一下。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
 
public class ArrayBlockingQueueUsingTwoLockApproach {
    
     /** The queued items */
    final Object[] items;
 
    /** items index for next take, poll, peek or remove */
    int takeIndex;
 
    /** items index for next put, offer, or add */
    int putIndex;
 
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
 
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
 
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
 
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
 
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
 
    public ArrayBlockingQueueUsingTwoLockApproach(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
    }
    
    public void put(Object e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                notFull.await();
            }
            enqueue(e);
            c = count.getAndIncrement();
            if (c + 1 < items.length)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    
    public Object take() throws InterruptedException {
        Object x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == items.length)
            signalNotFull();
        return x;
    }
    
    
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
 
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(Object x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count.incrementAndGet();
    }
 
    
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private Object dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        Object x = (Object) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count.decrementAndGet();
        return x;
    }
    
    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}
複制代碼           

四、流程圖

其實,我們 LinkedBlockingQueue 整體的代碼邏輯和 ArrayBlockingQueue 類似,隻不過底層資料結構不同罷了

我們這裡簡單的畫一下,有興趣的同學也可以自己畫吆

從源碼全面解析LinkedBlockingQueue的來龍去脈
原文連結:https://juejin.cn/post/7225974794730274873

繼續閱讀