天天看點

并發程式設計實踐之公平有界阻塞隊列實作

并發程式設計實踐之公平有界阻塞隊列實作

作者 | 李新然

來源 | 阿裡技術公衆号

一 背景

JUC 工具包是 JAVA 并發程式設計的利器。

本文講述在沒有 JUC 工具包幫助下,借助原生的 JAVA 同步原語, 如何實作一個公平有界的阻塞隊列。

希望你也能在文後體會到并發程式設計的複雜之處,以及 JUC 工具包的強大。

二 方法

本文使用到的基本工具:

  1. 同步監聽器 synchronized ,方法基本和代碼塊級别;
  2. Object 基礎類的 wait, notify, notifyAll;

基于以上基礎工具,實作公平有界的阻塞隊列,此處:

  1. 将公平的定義限定為 FIFO ,也就是先阻塞等待的請求,先解除等待;
  2. 并不保證解除等待後執行 Action 的先後順序;
  3. 確定隊列的大小始終不超過設定的容量;但阻塞等待的請求數不做限制;

三 實作

1 基礎版本

首先,考慮在非并發場景下,借助 ADT 實作一個基礎版本

interface Queue {

    boolean offer(Object obj);

    Object poll();

}
class FairnessBoundedBlockingQueue implements Queue {
    // 目前大小
    protected int size;

    // 容量
    protected final int capacity;

    // 頭指針,empty: head.next == tail == null
    protected Node head;

    // 尾指針
    protected Node tail;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // 如果隊列已滿,通過傳回值辨別
    public boolean offer(Object obj) {
        if (size < capacity) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            ++size;
            return true;
        }
        return false;
    }

    // 如果隊列為空,head.next == null;傳回空元素
    public Object poll() {
        if (head.next != null) {
            Object result = head.next.value;
            head.next.value = null;
            head = head.next; // 丢棄頭結點
            --size;
            return result;
        }
        return null;
    }

    class Node {
        Object value;
        Node next;
        Node(Object obj) {
            this.value = obj;
            next = null;
        }
    }
}           

以上

  1. 定義支援隊列的兩個基礎接口, poll 和 offer;
  2. 隊列的實作,采用經典實作;
  3. 考慮在隊列空的情況下, poll 傳回為空,非阻塞;
  4. 隊列在滿的情況下, offer 傳回 false ,入隊不成功,無異常;

需要注意的一點:在出隊時,本文通過遷移頭結點的方式實作,避免修改尾結點。

在下文實作并發版本時,會看到此處的用意。

2 并發版本

如果在并發場景下,上述的實作面臨一些問題,同時未實作給定的一些需求。

通過添加 synchronized ,保證并發條件下的線程安全問題。

注意此處做同步的原因是為了保證類的不變式。

并發問題

在并發場景下,基礎版本的實作面臨的問題包括:原子性,可見性和指令重排的問題。

參考 JMM 的相關描述。

并發問題,最簡單的解決方法是:通過 synchronized 加鎖,一次性解決問題。

// 省略接口定義
class BoundedBlockingQueue implements Queue {
    // 目前大小
    protected int size;

    // 容量
    protected final int capacity;

    // 頭指針,empty: head.next == tail == null
    protected Node head;

    // 尾指針
    protected Node tail;

    public BoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // 如果隊列已滿,通過傳回值辨別
    public synchronized boolean offer(Object obj) {
        if (size < capacity) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            ++size;
            return true;
        }
        return false;
    }

    // 如果隊列為空,head.next == null;傳回空元素
    public synchronized Object poll() {
        if (head.next != null) {
            Object result = head.next.value;
            head.next.value = null;
            head = head.next; // 丢棄頭結點
            --size;
            return result;
        }
        return null;
    }
    // 省略 Node 的定義
}           

以上,簡單粗暴的加 synchronized 可以解決問題,但會引入新的問題:系統活性問題(此問題下文會解決)。

同時,簡單加 synchronized 同步是無法實作阻塞等待;即

  1. 如果隊列為空,那麼出隊的動作還是會立即傳回,傳回為空;
  2. 如果隊列已滿,那麼入隊動作還是會立即傳回,傳回操作不成功;

實作阻塞等待,需要借助 JAVA 中的 PV 原語:wait, notify, notifyAll 。

參考:JDK 中對 wait, notify, notifyAll 的相關描述。

衛式方法

阻塞等待,可以通過簡單的衛式方法來實作,此問題本質上可以抽象為:

  1. 任何一個方法都需要在滿足一定條件下才可以執行;
  2. 執行方法前需要首先校驗不變式,然後執行變更;
  3. 在執行完成後,校驗是否滿足後驗不變式;
WHEN(condition) Object action(Object arg) {
    checkPreCondition();
    doAction(arg);
    checkPostCondition();
}           

此種抽象 Ada 在語言層面上實作。在 JAVA 中,借助 wait, notify, notifyAll 可以翻譯為:

// 目前線程
synchronized Object action(Object arg) {
    while(!condition) {
        wait();
    }
    // 前置條件,不變式
    checkPreCondition();
    doAction();
    // 後置條件,不變式
    checkPostCondition();
}

// 其他線程
synchronized Object notifyAction(Object arg) {
    notifyAll();
}           

需要注意:

  1. 通常會采用 notifyAll 發送通知,而非 notify ;因為如果目前線程收到 notify 通知後被中斷,那麼系統将一直等待下去。
  2. 如果使用了 notifyAll 那麼衛式語句必須放在 while 循環中;因為線程喚醒後,執行條件已經不滿足,雖然目前線程持有互斥鎖。
  3. 衛式條件的所有變量,有任何變更都需要發送 notifyAll 不然面臨系統活性問題

據此,不難實作簡單的阻塞版本的有界隊列,如下

interface Queue {

    boolean offer(Object obj) throws InterruptedException;

    Object poll() throws InterruptedException;

}
class FairnessBoundedBlockingQueue implements Queue {
    // 目前大小
    protected int size;

    // 容量
    protected final int capacity;

    // 頭指針,empty: head.next == tail == null
    protected Node head;

    // 尾指針
    protected Node tail;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // 如果隊列已滿,通過傳回值辨別
    public synchronized boolean offer(Object obj) throws InterruptedException {
        while (size < capacity) {
            wait();
        }
        Node node = new Node(obj);
        tail.next = node;
        tail = node;
        ++size;
        notifyAll(); // 可以出隊
        return true;
    }

    // 如果隊列為空,阻塞等待
    public synchronized Object poll() throws InterruptedException {
        while (head.next == null) {
            wait();
        }
        Object result = head.next.value;
        head.next.value = null;
        head = head.next; // 丢棄頭結點
        --size;
        notifyAll(); // 可以入隊
        return result;
    }
    // 省略 Node 的定義
}           

以上,實作了阻塞等待,但也引入了更大的性能問題

  1. 入隊和出隊動作阻塞等待同一把鎖,惡性競争;
  2. 當隊列變更時,所有阻塞線程被喚醒,大量的線程上下文切換,競争同步鎖,最終可能隻有一個線程能執行;

需要注意的點:

  1. 阻塞等待 wait 會抛出中斷異常。關于異常的問題下文會處理;
  2. 接口需要支援抛出中斷異常;
  3. 隊裡變更需要 notifyAll 避免線程中斷或異常,丢失消息;

3 鎖拆分優化

以上第一個問題,可以通過鎖拆分來解決,即:定義兩把鎖,讀鎖和寫鎖;讀寫分離。

// 省略接口定義
class FairnessBoundedBlockingQueue implements Queue {
    // 容量
    protected final int capacity;

    // 頭指針,empty: head.next == tail == null
    protected Node head;

    // 尾指針
    protected Node tail;

    // guard: canPollCount, head
    protected final Object pollLock = new Object();
    protected int canPollCount;

    // guard: canOfferCount, tail
    protected final Object offerLock = new Object();
    protected int canOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.head = new Node(null);
        this.tail = head;
    }

    // 如果隊列已滿,通過傳回值辨別
    public boolean offer(Object obj) throws InterruptedException {
        synchronized(offerLock) {
            while(canOfferCount <= 0) {
                offerLock.wait();
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized(pollLock) {
            ++canPollCount;
            pollLock.notifyAll();
        }
        return true;
    }

    // 如果隊列為空,阻塞等待
    public Object poll() throws InterruptedException {
        Object result = null;
        synchronized(pollLock) {
            while(canPollCount <= 0) {
                pollLock.wait();
            }

            result = head.next.value;
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized(offerLock) {
            canOfferCount++;
            offerLock.notifyAll();
        }
        return result;
    }
    // 省略 Node 定義
}           
  1. 定義了兩把鎖, pollLock 和 offerLock 拆分出隊和入隊競争;
  2. 入隊鎖同步的變量為:callOfferCount 和 tail;
  3. 出隊鎖同步的變量為:canPollCount 和 head;
  4. 出隊的動作:首先拿到 pollLock 衛式等待後,完成出隊動作;然後拿到 offerLock 發送通知,解除入隊的等待線程。
  5. 入隊的動作:首先拿到 offerLock 衛式等待後,完成入隊的動作;然後拿到 pollLock 發送通知,解除出隊的等待線程。

以上實作

  1. 確定通過入隊鎖和出隊鎖,分别保證入隊和出隊的原子性;
  2. 出隊動作,通過特别的實作,確定出隊隻會變更 head ,避免擷取 offerLock;
  3. 通過 offerLock.notifyAll 和 pollLock.notifyAll 解決讀寫競争的問題;

但上述實作還有未解決的問題:

當有多個入隊線程等待時,一次出隊的動作會觸發所有入隊線程競争,大量的線程上下文切換,最終隻有一個線程能執行。

即,還有 讀與讀 和 寫與寫 之間的競争問題。

4 狀态追蹤解除競争

此處可以通過狀态追蹤,解除讀與讀之間和寫與寫之間的競争問題

class FairnessBoundedBlockingQueue implements Queue {
    // 容量
    protected final int capacity;

    // 頭指針,empty: head.next == tail == null
    protected Node head;

    // 尾指針
    protected Node tail;

    // guard: canPollCount, head
    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;

    // guard: canOfferCount, tail
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // 如果隊列已滿,通過傳回值辨別
    public boolean offer(Object obj) throws InterruptedException {
        synchronized(offerLock) {
            while(canOfferCount <= 0) {
                waitOfferCount++;
                offerLock.wait();
                waitOfferCount--;
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized(pollLock) {
            ++canPollCount;
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    // 如果隊列為空,阻塞等待
    public Object poll() throws InterruptedException {
        Object result;
        synchronized(pollLock) {
            while(canPollCount <= 0) {
                waitPollCount++;
                pollLock.wait();
                waitPollCount--;
            }

            result = head.next.value;
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized(offerLock) {
            canOfferCount++;
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }
    // 省略 Node 的定義
}           
  1. 通過 waitOfferCount 和 waitPollCount 的狀态追蹤解決 讀寫内部的競争問題;
  2. 當隊列變更時,根據追蹤的狀态,決定是否派發消息,觸發線程阻塞狀态解除;

但,上述的實作在某些場景下會運作失敗,面臨活性問題,考慮

情況一:

  1. 初始狀态隊列為空 線程 A 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==1;
  2. 此時線程 A 在執行 wait 時被中斷,抛出異常, waitPollCount==1 并未被重置;
  3. 阻塞隊列為空,但 waitPollCount==1 類狀态異常;

情況二:

  1. 初始狀态隊列為空 線程 A B 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==2;
  2. 線程 C 執行入隊動作,可以立即執行,執行完成後,觸發 pollLock 解除一個線程等待 notify;
  3. 觸發的線程在 JVM 實作中是随機的,假設線程 A 被解除阻塞;
  4. 假設線程 A 在阻塞過程中已被中斷,阻塞解除後 JVM 檢查 interrupted 狀态,抛出 InterruptedException 異常;
  5. 此時隊列中有一個元素,但線程 A 仍阻塞在 pollLock 中,且一直阻塞下去;

以上為解除阻塞消息丢失的例子,問題的根源在與異常處理。

5 解決異常問題

解決線程中斷退出的問題,線程校驗中斷狀态的場景

  1. JVM 通常隻會在有限的幾個場景檢測線程的中斷狀态, wait, Thread.join, Thread.sleep;
  2. JVM 在檢測到線程中斷狀态 Thread.interrupted() 後,會清除中斷标志,抛出 InterruptedException;
  3. 通常為了保證線程對中斷及時響應, run 方法中需要自主檢測中斷标志,中斷線程,特别是對中斷比較敏感需要保持類的不變式的場景;
class FairnessBoundedBlockingQueue implements Queue {
    // 容量
    protected final int capacity;

    // 頭指針,empty: head.next == tail == null
    protected Node head;

    // 尾指針
    protected Node tail;

    // guard: canPollCount, head, waitPollCount
    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;

    // guard: canOfferCount, tail, waitOfferCount
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // 如果隊列已滿,通過傳回值辨別
    public boolean offer(Object obj) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException(); // 線程已中斷,直接退出即可,防止中斷線程競争鎖
        }
        synchronized(offerLock) {
            while(canOfferCount <= 0) {
                waitOfferCount++;
                try {
                    offerLock.wait();
                } catch (InterruptedException e) {
                    // 觸發其他線程
                    offerLock.notify();
                    throw e;

                } finally {
                    waitOfferCount--;
                }
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized(pollLock) {
            ++canPollCount;
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    // 如果隊列為空,阻塞等待
    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object result = null;
        synchronized(pollLock) {
            while(canPollCount <= 0) {
                waitPollCount++;
                try {
                    pollLock.wait();
                } catch (InterruptedException e) {
                    pollLock.notify();
                    throw e;
                } finally {
                    waitPollCount--;
                }
            }

            result = head.next.value;
            head.next.value = 0;
            // ignore head;
            head = head.next;
            canPollCount--;
        }
        synchronized(offerLock) {
            canOfferCount++;
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }
    // 省略 Node 的定義
}           
  1. 當等待線程中斷退出時,捕獲中斷異常,通過 pollLock.notify 和 offerLock.notify 轉發消息;
  2. 通過在 finally 中恢複狀态追蹤變量;

通過狀态變量追蹤可以解決讀與讀之間和寫與寫之間的鎖競争問題。

以下考慮如果解決讀與讀之間和寫與寫之間的公平性問題。

6 解決公平性

公平性的問題的解決需要将狀态變量的追蹤轉換為:請求螢幕追蹤。

  1. 每個請求對應一個螢幕;
  2. 通過内部維護一個 FIFO 隊列,實作公平性;
  3. 在隊列狀态變更時,釋放隊列中的螢幕;

以上邏輯可以統一抽象為

boolean needToWait;
synchronized(this) {
    needToWait = calculateNeedToWait();
    if (needToWait) {
        enqueue(monitor); // 請求對應的monitor
    }
}
if (needToWait) {
    monitor.doWait();
}           

需要注意

  1. monitor.doWait() 需要在 this 的衛式語句之外,因為如果在内部, monitor.doWait 并不會釋放 this鎖;
  2. calculateNeedToWait() 需要在 this 的守衛之内完成,避免同步問題;
  3. 需要考慮中斷異常的問題;

基于以上的邏輯抽象,實作公平隊列

// 省略接口定義
class FairnessBoundedBlockingQueue implements Queue {
    // 容量
    protected final int capacity;

    // 頭指針,empty: head.next == tail == null
    protected Node head;

    // 尾指針
    protected Node tail;

    // guard: canPollCount, head, pollQueue
    protected final Object pollLock = new Object();
    protected int canPollCount;

    // guard: canOfferCount, tail, offerQueue
    protected final Object offerLock = new Object();
    protected int canOfferCount;

    protected final WaitQueue pollQueue = new WaitQueue();
    protected final WaitQueue offerQueue = new WaitQueue();

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canOfferCount = capacity;
        this.canPollCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // 如果隊列已滿,通過傳回值辨別
    public boolean offer(Object obj) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException(); // 線程已中斷,直接退出即可,防止中斷線程競争鎖
        }
        WaitNode wait = null;
        synchronized(offerLock) {
            // 在有阻塞請求或者隊列為空時,阻塞等待
            if (canOfferCount <= 0 || !offerQueue.isEmpty()) {
                wait = new WaitNode();
                offerQueue.enq(wait);
            } else {
                // continue.
            }
        }

        try {
            if (wait != null) {
                wait.doWait();
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        } catch (InterruptedException e) {
            offerQueue.doNotify();
            throw e;
        }

        // 確定此時線程狀态正常,以下不會校驗中斷
        synchronized(offerLock) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized(pollLock) {
            ++canPollCount;
            pollQueue.doNotify();
        }
        return true;
    }

    // 如果隊列為空,阻塞等待
    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object result = null;
        WaitNode wait = null;
        synchronized(pollLock) {
            // 在有阻塞請求或者隊列為空時,阻塞等待
            if (canPollCount <= 0 || !pollQueue.isEmpty()) {
                wait = new WaitNode();
                pollQueue.enq(wait);
            } else {
                // ignore
            }
        }

        try {
            if (wait != null) {
                wait.doWait();
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        } catch (InterruptedException e) {
            // 傳遞消息
            pollQueue.doNotify();
            throw e;
        }

        // 以下不會檢測線程中斷狀态
        synchronized(pollLock) {
            result = head.next.value;
            head.next.value = 0;
            // ignore head;
            head = head.next;
            canPollCount--;
        }

        synchronized(offerLock) {
            canOfferCount++;
            offerQueue.doNotify();
        }
        return result;
    }

    class WaitQueue {

        WaitNode head;
        WaitNode tail;

        WaitQueue() {
            head = new WaitNode();
            tail = head;
        }

        synchronized void doNotify() {
            for(;;) {
                WaitNode node = deq();
                if (node == null) {
                    break;
                } else if (node.doNotify()) {
                    // 此處確定NOTIFY成功
                    break;
                } else {
                    // ignore, and retry.
                }
            }
        }

        synchronized boolean isEmpty() {
            return head.next == null;
        }

        synchronized void enq(WaitNode node) {
            tail.next = node;
            tail = tail.next;
        }

        synchronized WaitNode deq() {
            if (head.next == null) {
                return null;
            }
            WaitNode res = head.next;
            head = head.next;
            if (head.next == null) {
                tail = head; // 為空,遷移tail節點
            }
            return res;
        }
    }

    class WaitNode {
        boolean released;
        WaitNode next;
        WaitNode() {
            released = false;
            next = null;
        }

        synchronized void doWait() throws InterruptedException {
            try {
                while (!released) {
                    wait();
                }             
            } catch (InterruptedException e) {
                if (!released) {
                    released = true;
                    throw e;
                } else {
                    // 如果是NOTIFY之後收到中斷的信号,不能抛出異常;需要做RELAY處理
                    Thread.currentThread().interrupt();
                }
            }
        }

        synchronized boolean doNotify() {
            if (!released) {
                released = true;
                notify();
                // 明确釋放了一個線程,傳回true
                return true;
            } else {
                // 沒有釋放新的線程,傳回false
                return false;
            }
        }
    }
    // 省略 Node 的定義
}           
  1. 核心是替換狀态追蹤變量為同步節點, WaitNode;
  2. WaitNode 通過簡單的同步隊列組織實作 FIFO 協定,每個線程等待各自的 WaitNode 螢幕;
  3. WaitNode 内部維持 released 狀态,辨別線程阻塞狀态是否被釋放,主要是為了進行中斷的問題;
  4. WaitQueue 本身是全同步的,由于已解決了讀寫競争已經讀寫内部競争的問題, WaitQueue 同步并不會造成問題;
  5. WaitQueue 是無界隊列,是一個潛在的問題;但由于其隻做同步的追蹤,而且追蹤的通常是線程,通常并不是問題;
  6. 最終的公平有界隊列實作,無論是入隊還是出隊,首先衛式語句判定是否需要入隊等待,如果入隊等待,通過公平性協定等待;

當信号釋放時,借助讀寫鎖同步更新隊列;最後同樣借助讀寫鎖,觸發隊列更新消息;

7 等待時間的問題

并發場景下,等待通常會設定為限時等待 TIMED_WAITING ,避免死鎖或損失系統活性;

實作同步隊列的限時等待,并沒想象的那麼困難

class TimeoutException extends InterruptedException {}

class WaitNode {
    boolean released;
    WaitNode next;
    WaitNode() {
        released = false;
        next = null;
    }

    synchronized void doWait(long milliSeconds) throws InterruptedException {
        try {
            long startTime = System.currentTimeMillis();
            long toWait = milliSeconds;
            for (;;) {
                wait(toWait);
                if (released) {
                    return;
                }
                long now = System.currentTimeMillis();
                toWait = toWait - (now - startTime);
                if (toWait <= 0) {
                    throw new TimeoutException();
                }
            }
        } catch (InterruptedException e) {
            if (!released) {
                released = true;
                throw e;
            } else {
                // 如果已經釋放信号量,此處不抛出異常;但恢複中斷狀态
                Thread.currentThread().interrupt();
            }
        }
    }

    synchronized boolean doNotify() {
        if (!released) {
            released = true;
            notify();
            return true;
        } else {
            return false;
        }
    }           

由于所有的等待都阻塞在 WaitNode 螢幕,以上

  • 首先定義逾時異常,此處隻是為了友善異常處理,繼承 InterruptedException;
  • 此處依賴于 wait(long timeout) 的逾時等待實作,這通常不是問題;

最後,将 WaitNode 逾時等待的邏輯,帶入到 FairnessBoundedBlockingQueue 實作中,即可。

四 總結

本文通過一步步疊代,最終借助 JAVA 同步原語實作初版的公平有界隊列。疊代實作過程中可以看到以下幾點:

  1. 觀念的轉變,将調用一個類的方法思維轉換為:在滿足一定條件下方法才可以調用,在調用前需要滿足不變式,調用後滿足不變式;由于并發的問題很難測試,通常要采用衛式表達證明并發的正确性;
  2. 在疊代實作中會看到很多模式,比如,讀寫分離時,其實可以抽象為讀鎖和寫鎖;就得到了一個抽象的 Lock 的定義;比如,讀寫狀态追蹤,可以采用 Exchanger 抽象表達;
  3. 另外,本文的實作遠非完善,還需要考慮支援 Iterator 周遊、狀态查詢及資料遷移等操作;

最後,相信大家再看 JUC 的工具包實作,定有不一樣的體會。

阿裡雲資源編排ROS使用教程

資源編排(Resource Orchestration)是一種簡單易用的雲計算資源管理和自動化運維服務。使用者通過模闆描述多個雲計算資源的依賴關系、配置等,并自動完成所有資源的建立和配置,以達到自動化部署、運維等目的。編排模闆同時也是一種标準化的資源和應用傳遞方式,并且可以随時編輯修改,使基礎設施即代碼(Infrastructure as Code)成為可能。

點選這裡

,檢視詳情!