天天看點

并發程式設計-常見并發工具BlockingQueue的使用及原了解析

作者:馬士兵教育CTO
并發程式設計-常見并發工具BlockingQueue的使用及原了解析
Java中的阻塞隊列是一種特殊類型的隊列,它支援在隊列為空或隊列已滿時自動阻塞等待。它是并發程式設計中常用的線程安全資料結構之一,用于在多線程環境下安全地傳遞資料。

Java提供了java.util.concurrent包中的BlockingQueue接口和幾個實作類來實作阻塞隊列,其中最常用的實作類是:

  • ArrayBlockingQueue:一個基于數組的有界阻塞隊列。
  • LinkedBlockingQueue:一個基于連結清單的可選有界阻塞隊列。
  • PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列。
  • DelayQueue:一個支援延遲擷取元素的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列,用于線程間的直接傳輸。

阻塞隊列的主要方法包括:

  • put(E element):将元素添加到隊列的末尾,如果隊列已滿則阻塞等待。
  • take():移除并傳回隊列頭部的元素,如果隊列為空則阻塞等待。
  • offer(E element):将元素添加到隊列的末尾,如果隊列已滿則傳回false。
  • poll():移除并傳回隊列頭部的元素,如果隊列為空則傳回null。

BlockingQueue的使用

java複制代碼package org.example;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) {
        // 建立一個容量為3的ArrayBlockingQueue
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        // 建立生産者線程
        Thread producerThread = new Thread(() -> {
            try {
                // 生産者往隊列中添加元素
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(1);
                    queue.put("A"+i);
                    System.out.println("Producer added: A"+i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 建立消費者線程
        Thread consumerThread = new Thread(() -> {
            try {
                // 消費者從隊列中取出元素
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(1);
                    String item = queue.take();
                    System.out.println("Consumer removed: " + item);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 啟動生産者和消費者線程
        producerThread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        consumerThread.start();
    }
}
           

如上代碼,定義的阻塞隊列大小是3,我們先啟動生産者線程睡幾秒後再啟動消費者線程我看看到列印結果是生成到A3就停了等到消費者有産品被消費了此時又喚醒了生産者線程繼續生産

并發程式設計-常見并發工具BlockingQueue的使用及原了解析

BlockingQueue源碼分析

初始化源碼

ini複制代碼public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
           

發現用了ReentrantLock和Condition,具體這兩個類是做什麼用的繼續往下看

put源碼分析

csharp複制代碼public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
           

這裡也很簡單其實就是利用ReentrantLock加鎖往阻塞隊列中添加元素,如果此時滿了就調用notFull.await()進行等待,然後将目前線程添加到notFull的 Condition隊列中,沒滿則添加元素到阻塞隊列enqueue

ini複制代碼private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
           

這裡是在添加到隊列後需要調用notEmpty.signal();說明此時隊列有東西,消費者Condition如果有線程此時會被喚醒,然後重新獲得鎖消費元素

take源碼分析

csharp複制代碼public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
           

發現和 put源碼很像也是加鎖移除元素,如果目前阻塞隊列是0則調用notEmpty.await();,将目前線程添加到 notEmpty的Condition隊列中等待被喚起

BlockingQueue原理流程圖

并發程式設計-常見并發工具BlockingQueue的使用及原了解析

繼續閱讀