天天看點

LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

孤獨,所有人都是孤獨的,沒有人能獨自超脫這一切。

——瑪娅·安傑格

0 前言

LinkedBlockingQueue - 單連結清單實作的阻塞隊列。該隊列按 FIFO(先進先出)排序元素,新元素從隊列尾部插入,從隊首擷取元素.是深入并發程式設計的基礎資料結構.

1 繼承體系

LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • Queue 作為最基礎的接口,定義了隊列的三大類基本操作:
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • BlockingQueue 即在 Queue 的基礎上加上了阻塞的概念,比如
    • 一直阻塞
    • 阻塞一定時間,傳回特殊值
remove 方法,BlockingQueue 類注釋中定義的是抛異常,但 LinkedBlockingQueue 中 remove 方法實際是傳回 false。

2 屬性

2.1 鍊式存儲

  • 節點的資料結構
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • next 為目前元素的下一個,為 null 表示目前節點是最後一個
  • 連結清單容量
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
    預設大小為 Integer.MAX_VALUE !顯然太大了!禁用!
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • 連結清單已有元素大小

    使用了 AtomicInteger,線程安全

    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • 連結清單頭、尾
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

2.2 鎖

  • take 時的鎖
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • take 的條件隊列,condition 可以簡單了解為基于 ASQ 建立的條件隊列
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • put 時的鎖
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • put 的條件隊列
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

鎖有 take 鎖和 put 鎖,是為了保證隊列操作時的線程安全,設計兩種鎖,是為了 take 和 put 兩種操作可以同時進行,而互不影響。

2.3 疊代器

  • 實作了自己的疊代器
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

3 構造方法

3.1 無參

  • 預設為 Integer 的最大值
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

3.2 有參

  • 指定容量大小.連結清單頭尾相等,節點值(item)都是 null
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結
  • 已有集合資料public LinkedBlockingQueue(Collection<? extends E> c) {

    this(Integer.MAX_VALUE);

    final ReentrantLock putLock = this.putLock;

    putLock.lock(); // Never contended, but necessary for visibility

    try {

    int n = 0;

    for (E e : c) {

    // 集合元素不能為 null

    if (e == null)

    throw new NullPointerException();

    // capacity 代表連結清單的大小,在這為 Integer.MAX_VALUE

    // 若集合大小 > IntegerMAX_VALUE,直接抛異常.

    if (n == capacity)

    throw new IllegalStateException("Queue full");

    enqueue(new Node<E>(e));

    ++n;

    }

    count.set(n);

    } finally {

    putLock.unlock();

    }

    }

4 新增

以 offer 方法為例.

将元素E添加到隊列的末尾.

public boolean offer(E e) { 
  if (e == null) throw new NullPointerException(); 
  // 如果“隊列已滿”,則傳回false,表示插入失敗。 
  final AtomicInteger count = this.count;
  if (count.get() == capacity) return false; 
  int c = -1; 
  // 建立“節點e” 
  Node<E> node = new Node(e); 
  final ReentrantLock putLock = this.putLock; 
  // 擷取“插入鎖putLock” 
  putLock.lock(); 
  try { 
    // 再次對“隊列是不是滿”的進行判斷。 
    // 若“隊列未滿”,則插入節點。 
    if (count.get() < capacity) { 
      // 插入節點 enqueue(node);
      // 将“目前節點數量”+1,并傳回“原始的數量” 
      c = count.getAndIncrement(); 
      // 如果在插入元素之後,隊列仍然未滿,則喚醒notFull上的等待線程。 
      if (c + 1 < capacity)  notFull.signal(); 
    } 
  } finally { 
      // 釋放“插入鎖putLock” 
      putLock.unlock();
  } 

  // 如果在插入節點前,隊列為空;則插入節點後,喚醒notEmpty上的等待線程 
  if (c == 0) 
    signalNotEmpty(); 
    return c >= 0;
  }           

複制

enqueue

  • 将node添加到隊列末尾,并設定node為新的尾節點
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

signalNotEmpty

  • 發出等待信号。 僅能從put/offer調用(否則通常不鎖定takeLock),喚醒notEmpty上的等待線程.
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

新增資料成功後,在适當時機,會喚起 put 的等待線程(隊列不滿時),或者 take 的等待線程(隊列不為空時),這樣保證隊列一旦滿足 put 或者 take 條件時,立馬就能喚起阻塞線程,繼續運作,保證了喚起的時機不被浪費。

5 取出

以take()為例.取出并傳回隊列的頭。若隊列為空,則一直等待。

public E take() throws InterruptedException { 
  E x; 
  int c = -1; 
  final AtomicInteger count = this.count; 
  final ReentrantLock takeLock = this.takeLock; 
  // 擷取“取出鎖”,若目前線程是中斷狀态,則抛出InterruptedException異常 
  takeLock.lockInterruptibly(); 
  try { 
    // 若“隊列為空”,則一直等待。 
    while (count.get() == 0) { 
      notEmpty.await(); 
    } 
    // 取出元素 
    x = dequeue(); 
    // 取出元素之後,将“節點數量”-1;并傳回“原始的節點數量”。 
    c = count.getAndDecrement();
    if (c > 1) notEmpty.signal();
  } finally { 
    // 釋放“取出鎖” 
    takeLock.unlock(); 
  } 
  // 如果在“取出元素之前”,隊列是滿的;則在取出元素之後,喚醒notFull上的等待線程。 
  if (c == capacity) 
    signalNotFull(); 
  return x;
}           

複制

dequeue

  • 删除隊列的頭節點,并将表頭指向“原頭節點的下一個節點”。
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

signalNotFull

  • 發出等待的信号。 僅能從take/poll調用.喚醒notFull上的等待線程.
    LinkedBlockingQueue 核心源碼解析0 前言1 繼承體系2 屬性3 構造方法4 新增5 取出6 總結

6 總結

了解好阻塞隊列很重要,經常會被用線上程池的場景中.