作者 | 李新然
來源 | 阿裡技術公衆号
一 背景
JUC 工具包是 JAVA 并發程式設計的利器。
本文講述在沒有 JUC 工具包幫助下,借助原生的 JAVA 同步原語, 如何實作一個公平有界的阻塞隊列。
希望你也能在文後體會到并發程式設計的複雜之處,以及 JUC 工具包的強大。
二 方法
本文使用到的基本工具:
- 同步監聽器 synchronized ,方法基本和代碼塊級别;
- Object 基礎類的 wait, notify, notifyAll;
基于以上基礎工具,實作公平有界的阻塞隊列,此處:
- 将公平的定義限定為 FIFO ,也就是先阻塞等待的請求,先解除等待;
- 并不保證解除等待後執行 Action 的先後順序;
- 確定隊列的大小始終不超過設定的容量;但阻塞等待的請求數不做限制;
三 實作
1 基礎版本
首先,考慮在非并發場景下,借助 ADT 實作一個基礎版本
interface Queue {
boolean offer(Object obj);
Object poll();
}
class FairnessBoundedBlockingQueue implements Queue {
// 目前大小
protected int size;
// 容量
protected final int capacity;
// 頭指針,empty: head.next == tail == null
protected Node head;
// 尾指針
protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果隊列已滿,通過傳回值辨別
public boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}
// 如果隊列為空,head.next == null;傳回空元素
public Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢棄頭結點
--size;
return result;
}
return null;
}
class Node {
Object value;
Node next;
Node(Object obj) {
this.value = obj;
next = null;
}
}
}
以上
- 定義支援隊列的兩個基礎接口, poll 和 offer;
- 隊列的實作,采用經典實作;
- 考慮在隊列空的情況下, poll 傳回為空,非阻塞;
- 隊列在滿的情況下, offer 傳回 false ,入隊不成功,無異常;
需要注意的一點:在出隊時,本文通過遷移頭結點的方式實作,避免修改尾結點。
在下文實作并發版本時,會看到此處的用意。
2 并發版本
如果在并發場景下,上述的實作面臨一些問題,同時未實作給定的一些需求。
通過添加 synchronized ,保證并發條件下的線程安全問題。
注意此處做同步的原因是為了保證類的不變式。
并發問題
在并發場景下,基礎版本的實作面臨的問題包括:原子性,可見性和指令重排的問題。
參考 JMM 的相關描述。
并發問題,最簡單的解決方法是:通過 synchronized 加鎖,一次性解決問題。
// 省略接口定義
class BoundedBlockingQueue implements Queue {
// 目前大小
protected int size;
// 容量
protected final int capacity;
// 頭指針,empty: head.next == tail == null
protected Node head;
// 尾指針
protected Node tail;
public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果隊列已滿,通過傳回值辨別
public synchronized boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}
// 如果隊列為空,head.next == null;傳回空元素
public synchronized Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢棄頭結點
--size;
return result;
}
return null;
}
// 省略 Node 的定義
}
以上,簡單粗暴的加 synchronized 可以解決問題,但會引入新的問題:系統活性問題(此問題下文會解決)。
同時,簡單加 synchronized 同步是無法實作阻塞等待;即
- 如果隊列為空,那麼出隊的動作還是會立即傳回,傳回為空;
- 如果隊列已滿,那麼入隊動作還是會立即傳回,傳回操作不成功;
實作阻塞等待,需要借助 JAVA 中的 PV 原語:wait, notify, notifyAll 。
參考:JDK 中對 wait, notify, notifyAll 的相關描述。
衛式方法
阻塞等待,可以通過簡單的衛式方法來實作,此問題本質上可以抽象為:
- 任何一個方法都需要在滿足一定條件下才可以執行;
- 執行方法前需要首先校驗不變式,然後執行變更;
- 在執行完成後,校驗是否滿足後驗不變式;
WHEN(condition) Object action(Object arg) {
checkPreCondition();
doAction(arg);
checkPostCondition();
}
此種抽象 Ada 在語言層面上實作。在 JAVA 中,借助 wait, notify, notifyAll 可以翻譯為:
// 目前線程
synchronized Object action(Object arg) {
while(!condition) {
wait();
}
// 前置條件,不變式
checkPreCondition();
doAction();
// 後置條件,不變式
checkPostCondition();
}
// 其他線程
synchronized Object notifyAction(Object arg) {
notifyAll();
}
需要注意:
- 通常會采用 notifyAll 發送通知,而非 notify ;因為如果目前線程收到 notify 通知後被中斷,那麼系統将一直等待下去。
- 如果使用了 notifyAll 那麼衛式語句必須放在 while 循環中;因為線程喚醒後,執行條件已經不滿足,雖然目前線程持有互斥鎖。
- 衛式條件的所有變量,有任何變更都需要發送 notifyAll 不然面臨系統活性問題
據此,不難實作簡單的阻塞版本的有界隊列,如下
interface Queue {
boolean offer(Object obj) throws InterruptedException;
Object poll() throws InterruptedException;
}
class FairnessBoundedBlockingQueue implements Queue {
// 目前大小
protected int size;
// 容量
protected final int capacity;
// 頭指針,empty: head.next == tail == null
protected Node head;
// 尾指針
protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果隊列已滿,通過傳回值辨別
public synchronized boolean offer(Object obj) throws InterruptedException {
while (size < capacity) {
wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
notifyAll(); // 可以出隊
return true;
}
// 如果隊列為空,阻塞等待
public synchronized Object poll() throws InterruptedException {
while (head.next == null) {
wait();
}
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢棄頭結點
--size;
notifyAll(); // 可以入隊
return result;
}
// 省略 Node 的定義
}
以上,實作了阻塞等待,但也引入了更大的性能問題
- 入隊和出隊動作阻塞等待同一把鎖,惡性競争;
- 當隊列變更時,所有阻塞線程被喚醒,大量的線程上下文切換,競争同步鎖,最終可能隻有一個線程能執行;
需要注意的點:
- 阻塞等待 wait 會抛出中斷異常。關于異常的問題下文會處理;
- 接口需要支援抛出中斷異常;
- 隊裡變更需要 notifyAll 避免線程中斷或異常,丢失消息;
3 鎖拆分優化
以上第一個問題,可以通過鎖拆分來解決,即:定義兩把鎖,讀鎖和寫鎖;讀寫分離。
// 省略接口定義
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指針,empty: head.next == tail == null
protected Node head;
// 尾指針
protected Node tail;
// guard: canPollCount, head
protected final Object pollLock = new Object();
protected int canPollCount;
// guard: canOfferCount, tail
protected final Object offerLock = new Object();
protected int canOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.head = new Node(null);
this.tail = head;
}
// 如果隊列已滿,通過傳回值辨別
public boolean offer(Object obj) throws InterruptedException {
synchronized(offerLock) {
while(canOfferCount <= 0) {
offerLock.wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
pollLock.notifyAll();
}
return true;
}
// 如果隊列為空,阻塞等待
public Object poll() throws InterruptedException {
Object result = null;
synchronized(pollLock) {
while(canPollCount <= 0) {
pollLock.wait();
}
result = head.next.value;
head.next.value = null;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
offerLock.notifyAll();
}
return result;
}
// 省略 Node 定義
}
- 定義了兩把鎖, pollLock 和 offerLock 拆分出隊和入隊競争;
- 入隊鎖同步的變量為:callOfferCount 和 tail;
- 出隊鎖同步的變量為:canPollCount 和 head;
- 出隊的動作:首先拿到 pollLock 衛式等待後,完成出隊動作;然後拿到 offerLock 發送通知,解除入隊的等待線程。
- 入隊的動作:首先拿到 offerLock 衛式等待後,完成入隊的動作;然後拿到 pollLock 發送通知,解除出隊的等待線程。
以上實作
- 確定通過入隊鎖和出隊鎖,分别保證入隊和出隊的原子性;
- 出隊動作,通過特别的實作,確定出隊隻會變更 head ,避免擷取 offerLock;
- 通過 offerLock.notifyAll 和 pollLock.notifyAll 解決讀寫競争的問題;
但上述實作還有未解決的問題:
當有多個入隊線程等待時,一次出隊的動作會觸發所有入隊線程競争,大量的線程上下文切換,最終隻有一個線程能執行。
即,還有 讀與讀 和 寫與寫 之間的競争問題。
4 狀态追蹤解除競争
此處可以通過狀态追蹤,解除讀與讀之間和寫與寫之間的競争問題
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指針,empty: head.next == tail == null
protected Node head;
// 尾指針
protected Node tail;
// guard: canPollCount, head
protected final Object pollLock = new Object();
protected int canPollCount;
protected int waitPollCount;
// guard: canOfferCount, tail
protected final Object offerLock = new Object();
protected int canOfferCount;
protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.waitPollCount = 0;
this.waitOfferCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果隊列已滿,通過傳回值辨別
public boolean offer(Object obj) throws InterruptedException {
synchronized(offerLock) {
while(canOfferCount <= 0) {
waitOfferCount++;
offerLock.wait();
waitOfferCount--;
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
if (waitPollCount > 0) {
pollLock.notify();
}
}
return true;
}
// 如果隊列為空,阻塞等待
public Object poll() throws InterruptedException {
Object result;
synchronized(pollLock) {
while(canPollCount <= 0) {
waitPollCount++;
pollLock.wait();
waitPollCount--;
}
result = head.next.value;
head.next.value = null;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
if (waitOfferCount > 0) {
offerLock.notify();
}
}
return result;
}
// 省略 Node 的定義
}
- 通過 waitOfferCount 和 waitPollCount 的狀态追蹤解決 讀寫内部的競争問題;
- 當隊列變更時,根據追蹤的狀态,決定是否派發消息,觸發線程阻塞狀态解除;
但,上述的實作在某些場景下會運作失敗,面臨活性問題,考慮
情況一:
- 初始狀态隊列為空 線程 A 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==1;
- 此時線程 A 在執行 wait 時被中斷,抛出異常, waitPollCount==1 并未被重置;
- 阻塞隊列為空,但 waitPollCount==1 類狀态異常;
情況二:
- 初始狀态隊列為空 線程 A B 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==2;
- 線程 C 執行入隊動作,可以立即執行,執行完成後,觸發 pollLock 解除一個線程等待 notify;
- 觸發的線程在 JVM 實作中是随機的,假設線程 A 被解除阻塞;
- 假設線程 A 在阻塞過程中已被中斷,阻塞解除後 JVM 檢查 interrupted 狀态,抛出 InterruptedException 異常;
- 此時隊列中有一個元素,但線程 A 仍阻塞在 pollLock 中,且一直阻塞下去;
以上為解除阻塞消息丢失的例子,問題的根源在與異常處理。
5 解決異常問題
解決線程中斷退出的問題,線程校驗中斷狀态的場景
- JVM 通常隻會在有限的幾個場景檢測線程的中斷狀态, wait, Thread.join, Thread.sleep;
- JVM 在檢測到線程中斷狀态 Thread.interrupted() 後,會清除中斷标志,抛出 InterruptedException;
- 通常為了保證線程對中斷及時響應, run 方法中需要自主檢測中斷标志,中斷線程,特别是對中斷比較敏感需要保持類的不變式的場景;
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指針,empty: head.next == tail == null
protected Node head;
// 尾指針
protected Node tail;
// guard: canPollCount, head, waitPollCount
protected final Object pollLock = new Object();
protected int canPollCount;
protected int waitPollCount;
// guard: canOfferCount, tail, waitOfferCount
protected final Object offerLock = new Object();
protected int canOfferCount;
protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.waitPollCount = 0;
this.waitOfferCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果隊列已滿,通過傳回值辨別
public boolean offer(Object obj) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException(); // 線程已中斷,直接退出即可,防止中斷線程競争鎖
}
synchronized(offerLock) {
while(canOfferCount <= 0) {
waitOfferCount++;
try {
offerLock.wait();
} catch (InterruptedException e) {
// 觸發其他線程
offerLock.notify();
throw e;
} finally {
waitOfferCount--;
}
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
if (waitPollCount > 0) {
pollLock.notify();
}
}
return true;
}
// 如果隊列為空,阻塞等待
public Object poll() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Object result = null;
synchronized(pollLock) {
while(canPollCount <= 0) {
waitPollCount++;
try {
pollLock.wait();
} catch (InterruptedException e) {
pollLock.notify();
throw e;
} finally {
waitPollCount--;
}
}
result = head.next.value;
head.next.value = 0;
// ignore head;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
if (waitOfferCount > 0) {
offerLock.notify();
}
}
return result;
}
// 省略 Node 的定義
}
- 當等待線程中斷退出時,捕獲中斷異常,通過 pollLock.notify 和 offerLock.notify 轉發消息;
- 通過在 finally 中恢複狀态追蹤變量;
通過狀态變量追蹤可以解決讀與讀之間和寫與寫之間的鎖競争問題。
以下考慮如果解決讀與讀之間和寫與寫之間的公平性問題。
6 解決公平性
公平性的問題的解決需要将狀态變量的追蹤轉換為:請求螢幕追蹤。
- 每個請求對應一個螢幕;
- 通過内部維護一個 FIFO 隊列,實作公平性;
- 在隊列狀态變更時,釋放隊列中的螢幕;
以上邏輯可以統一抽象為
boolean needToWait;
synchronized(this) {
needToWait = calculateNeedToWait();
if (needToWait) {
enqueue(monitor); // 請求對應的monitor
}
}
if (needToWait) {
monitor.doWait();
}
需要注意
- monitor.doWait() 需要在 this 的衛式語句之外,因為如果在内部, monitor.doWait 并不會釋放 this鎖;
- calculateNeedToWait() 需要在 this 的守衛之内完成,避免同步問題;
- 需要考慮中斷異常的問題;
基于以上的邏輯抽象,實作公平隊列
// 省略接口定義
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指針,empty: head.next == tail == null
protected Node head;
// 尾指針
protected Node tail;
// guard: canPollCount, head, pollQueue
protected final Object pollLock = new Object();
protected int canPollCount;
// guard: canOfferCount, tail, offerQueue
protected final Object offerLock = new Object();
protected int canOfferCount;
protected final WaitQueue pollQueue = new WaitQueue();
protected final WaitQueue offerQueue = new WaitQueue();
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canOfferCount = capacity;
this.canPollCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果隊列已滿,通過傳回值辨別
public boolean offer(Object obj) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException(); // 線程已中斷,直接退出即可,防止中斷線程競争鎖
}
WaitNode wait = null;
synchronized(offerLock) {
// 在有阻塞請求或者隊列為空時,阻塞等待
if (canOfferCount <= 0 || !offerQueue.isEmpty()) {
wait = new WaitNode();
offerQueue.enq(wait);
} else {
// continue.
}
}
try {
if (wait != null) {
wait.doWait();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
offerQueue.doNotify();
throw e;
}
// 確定此時線程狀态正常,以下不會校驗中斷
synchronized(offerLock) {
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
pollQueue.doNotify();
}
return true;
}
// 如果隊列為空,阻塞等待
public Object poll() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Object result = null;
WaitNode wait = null;
synchronized(pollLock) {
// 在有阻塞請求或者隊列為空時,阻塞等待
if (canPollCount <= 0 || !pollQueue.isEmpty()) {
wait = new WaitNode();
pollQueue.enq(wait);
} else {
// ignore
}
}
try {
if (wait != null) {
wait.doWait();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
// 傳遞消息
pollQueue.doNotify();
throw e;
}
// 以下不會檢測線程中斷狀态
synchronized(pollLock) {
result = head.next.value;
head.next.value = 0;
// ignore head;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
offerQueue.doNotify();
}
return result;
}
class WaitQueue {
WaitNode head;
WaitNode tail;
WaitQueue() {
head = new WaitNode();
tail = head;
}
synchronized void doNotify() {
for(;;) {
WaitNode node = deq();
if (node == null) {
break;
} else if (node.doNotify()) {
// 此處確定NOTIFY成功
break;
} else {
// ignore, and retry.
}
}
}
synchronized boolean isEmpty() {
return head.next == null;
}
synchronized void enq(WaitNode node) {
tail.next = node;
tail = tail.next;
}
synchronized WaitNode deq() {
if (head.next == null) {
return null;
}
WaitNode res = head.next;
head = head.next;
if (head.next == null) {
tail = head; // 為空,遷移tail節點
}
return res;
}
}
class WaitNode {
boolean released;
WaitNode next;
WaitNode() {
released = false;
next = null;
}
synchronized void doWait() throws InterruptedException {
try {
while (!released) {
wait();
}
} catch (InterruptedException e) {
if (!released) {
released = true;
throw e;
} else {
// 如果是NOTIFY之後收到中斷的信号,不能抛出異常;需要做RELAY處理
Thread.currentThread().interrupt();
}
}
}
synchronized boolean doNotify() {
if (!released) {
released = true;
notify();
// 明确釋放了一個線程,傳回true
return true;
} else {
// 沒有釋放新的線程,傳回false
return false;
}
}
}
// 省略 Node 的定義
}
- 核心是替換狀态追蹤變量為同步節點, WaitNode;
- WaitNode 通過簡單的同步隊列組織實作 FIFO 協定,每個線程等待各自的 WaitNode 螢幕;
- WaitNode 内部維持 released 狀态,辨別線程阻塞狀态是否被釋放,主要是為了進行中斷的問題;
- WaitQueue 本身是全同步的,由于已解決了讀寫競争已經讀寫内部競争的問題, WaitQueue 同步并不會造成問題;
- WaitQueue 是無界隊列,是一個潛在的問題;但由于其隻做同步的追蹤,而且追蹤的通常是線程,通常并不是問題;
- 最終的公平有界隊列實作,無論是入隊還是出隊,首先衛式語句判定是否需要入隊等待,如果入隊等待,通過公平性協定等待;
當信号釋放時,借助讀寫鎖同步更新隊列;最後同樣借助讀寫鎖,觸發隊列更新消息;
7 等待時間的問題
并發場景下,等待通常會設定為限時等待 TIMED_WAITING ,避免死鎖或損失系統活性;
實作同步隊列的限時等待,并沒想象的那麼困難
class TimeoutException extends InterruptedException {}
class WaitNode {
boolean released;
WaitNode next;
WaitNode() {
released = false;
next = null;
}
synchronized void doWait(long milliSeconds) throws InterruptedException {
try {
long startTime = System.currentTimeMillis();
long toWait = milliSeconds;
for (;;) {
wait(toWait);
if (released) {
return;
}
long now = System.currentTimeMillis();
toWait = toWait - (now - startTime);
if (toWait <= 0) {
throw new TimeoutException();
}
}
} catch (InterruptedException e) {
if (!released) {
released = true;
throw e;
} else {
// 如果已經釋放信号量,此處不抛出異常;但恢複中斷狀态
Thread.currentThread().interrupt();
}
}
}
synchronized boolean doNotify() {
if (!released) {
released = true;
notify();
return true;
} else {
return false;
}
}
由于所有的等待都阻塞在 WaitNode 螢幕,以上
- 首先定義逾時異常,此處隻是為了友善異常處理,繼承 InterruptedException;
- 此處依賴于 wait(long timeout) 的逾時等待實作,這通常不是問題;
最後,将 WaitNode 逾時等待的邏輯,帶入到 FairnessBoundedBlockingQueue 實作中,即可。
四 總結
本文通過一步步疊代,最終借助 JAVA 同步原語實作初版的公平有界隊列。疊代實作過程中可以看到以下幾點:
- 觀念的轉變,将調用一個類的方法思維轉換為:在滿足一定條件下方法才可以調用,在調用前需要滿足不變式,調用後滿足不變式;由于并發的問題很難測試,通常要采用衛式表達證明并發的正确性;
- 在疊代實作中會看到很多模式,比如,讀寫分離時,其實可以抽象為讀鎖和寫鎖;就得到了一個抽象的 Lock 的定義;比如,讀寫狀态追蹤,可以采用 Exchanger 抽象表達;
- 另外,本文的實作遠非完善,還需要考慮支援 Iterator 周遊、狀态查詢及資料遷移等操作;
最後,相信大家再看 JUC 的工具包實作,定有不一樣的體會。
阿裡雲資源編排ROS使用教程
資源編排(Resource Orchestration)是一種簡單易用的雲計算資源管理和自動化運維服務。使用者通過模闆描述多個雲計算資源的依賴關系、配置等,并自動完成所有資源的建立和配置,以達到自動化部署、運維等目的。編排模闆同時也是一種标準化的資源和應用傳遞方式,并且可以随時編輯修改,使基礎設施即代碼(Infrastructure as Code)成為可能。
點選這裡,檢視詳情!