天天看點

Java 多線程 & 鎖原理 | Java Debug 筆記

後端講求的是高并發、高性能、高可用(3H),但是要實作 3H,通常是通過優化架構(橫向分層,縱向分割)、使用緩存、分布式化和叢集化等手段來實作。很少會自己寫線程代碼,日常開發在需要用到多線程的地方也大多都交給架構處理,對多線程和鎖原理的了解一直不夠深入。

趁着周末有時間,從簡單的線程建立方式開始講起,逐漸深入了解關于線程的幾種狀态和鎖原理。針對一些以前沒寫過獨立文章的鎖實作,還會展開來講。

Java 建立線程的幾種方式

先簡單介紹一下建立線程的幾種方式:

  • 繼承 Thread
• class SubThread extends Thread {
@Override
public void run() {
System.out.println(getName());
}      
public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            SubThread subThread = new SubThread();
            subThread.start();
        }
    }
}      
  • 實作 Runnable 接口
• class RunnableImpl implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}      
public static void main(String[] args) {
        RunnableImpl runnable = new RunnableImpl();
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
    }
}      

由于 Runnable 接口隻有一個 run 方法需要我們實作,是以也可以在建立 Thread 執行個體的時候傳入 lambda 表達式,這裡不再贅述。

  • 線程池
• class RunnableImpl implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}      
public static void main(String[] args) {
        final int count = 10;
        ExecutorService threadPool = Executors.newFixedThreadPool(count);
        RunnableImpl runnable = new RunnableImpl();
        for (int i = 0; i < count; i++) {
            threadPool.submit(runnable);
        }
    }
}      

Executors 作為線程池工具類提供了諸多快速建立線程池的 API 。一部分底層使用的是 ThreadPoolExecutor ,一部分底層使用的是 ForkJoinPool 。

使用線程池将不再需要顯式建立線程,而隻需要将任務送出到線程池(線程池内部使用一個 BlockingQueue 存放),線程池内部會自動建立/銷毀線程來完成任務。

更多與線程池相關的内容可以看:深入了解 Java ThreadPool

線程的幾種狀态

首先 Java 中的線程和作業系統的線程是一一對應的關系。線程狀态可以通過調用線程的 getState() 方法擷取,傳回值是一個狀态枚舉:

public enum State {
    NEW,
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED;
}      

前面也說了建立線程的幾種方式,無論是通過何種方式建立的線程,總離不開以下 6 種線程狀态:

  1. New :一個線程被 New 出來之後,調用 start() 方法之前,就是 New 狀态
  2. Runnable :一個被建立出來的線程調用 start() 方法,線程開始進入 Runnable 狀态
  1. Ready :就緒狀态,指線程可以被 CPU 執行,所有的處于 Ready 狀态的線程會被存放在一個等待隊列裡。當線程被排程器選中之後,會從 Ready 狀态轉換為 Running 狀态
  2. Running :運作狀态,指線程正在被 CPU 執行。當線程被挂起或調用 Thread.yleid() ,會從 Runniing 切換為 Ready 狀态
  1. TimedWaiting :線程在 Running 狀态調用以下方法會進入 TimedWaiting 狀态,等待時間到了之後,線程會重新變為 Ready 狀态(回到就緒隊列當中)
  • Thread.sleep(millis);
  • o.wait(timeout);
  • thread.join(millis);
  • LockSupport.parkNanos(nanos);
  • LockSupport.parkUntil(deadline);
  1. Waiting :線程在 Runnable 裡的 Running 狀态調用以下 ① 方法會進入 Waiting 狀态,直到調用以下 ② 方法回到 Ready 狀态
  1. 從 Running 變為 Waiting 狀态的方法:
  • o.wait();
  • thread.join();
  • LockSupport.park();
  1. 從 Waiting 狀态回到 Ready 狀态的方法:
  • o.notify();
  • o.notifyAll();
  • LockSupport.unpark(currentThread);
  1. Blocked :一個處于 Running 狀态的線程試圖擷取進入同步代碼塊的鎖失敗的時候,會進入 Blocked 狀态。直到擷取到進入同步代碼塊的鎖,回到 Ready 狀态
  2. Teminated :當線程任務正常完成後的線程狀态

注意:當一個線程的處于 Teminated 狀态時,不能通過調用 start() 重新回到 Runnable 狀态。

AQS

在講具體的鎖之前,先來了解一下 AQS ( java.util.concurrent.locks.AbstractQueuedSynchronizer )。

AQS 為 Java 中的各種 CAS 鎖提供了上層抽象,AQS 中最為核心的四部分内容:

  1. Node 内部類。由于 AQS 中使用雙向連結清單存儲想要擷取鎖的線程,Node 作為雙向連結清單中的節點類,與線程進行綁定,同時記錄前一位和後一位節點,同時設立了 Node 一些狀态屬性。
1. static final class Node {
/** Marker to indicate a node is waiting in shared mode/
static final Node SHARED = new Node();
/* Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;      
static final int CANCELLED =  1;
 static final int SIGNAL    = -1;
 static final int CONDITION = -2;
 static final int PROPAGATE = -3;

 volatile int waitStatus;

 volatile Node prev;
 volatile Node next;
 volatile Thread thread;

 Node nextWaiter;

 final boolean isShared() {
     return nextWaiter == SHARED;
 }

 final Node predecessor() throws NullPointerException {
     Node p = prev;
     if (p == null)
         throw new NullPointerException();
     else
         return p;
 }

 Node() { }

 Node(Thread thread, Node mode) {     // Used by addWaiter
     this.nextWaiter = mode;
     this.thread = thread;
 }

 Node(Thread thread, int waitStatus) { // Used by Condition
     this.waitStatus = waitStatus;
     this.thread = thread;
 }      

}

  1. state 屬性值。一個被 volatile 修飾的 int 類型屬性(保證了線程可見性)。提供了基本的 Setter & Getter。至于 state 代表的含義是什麼要看具體的子類實作。
1. public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private volatile int state;      
protected final int getState() {
     return state;
 }

 protected final void setState(int newState) {
     state = newState;
 }      

...

}

  1. 各種 CAS 操作。
...
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}      
  1. ...
  2. 骨架邏輯。不難發現,AQS 封裝了不少骨架邏輯,使得子類隻需要實作部分方法即可以完成自定義鎖。

    ...

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
...      

有了以上的基礎認知,現在可以來看看 Java 的幾種鎖類型。

synchronized

synchronized 是最常用的實作線程安全的通用方案,synchronized 本質是鎖更新過程。

使用 synchronized 的方式有兩種。一種是直接使用同步代碼塊;另一種是将 synchronized 加在方法上,如果修飾的方法是執行個體方法,則使用 this 作為鎖對象,如果修飾的方法是靜态方法,則使用所在類的類對象作為鎖對象。整個鎖更新過程為:偏向鎖 -> 輕量級鎖 -> OS 鎖 。

關于 synchronized 更詳細的内容可以檢視:深入了解 synchronized

ReentrantLock

ReentrantLock : 基于 CAS ,相比于 synchronized 能夠更好的控制鎖的狀态。 synchronized 隻要進入代碼塊就代表上鎖,離開代碼塊就是釋放鎖;而 ReentranLock 需要手動的 lock 和 unlock,同時還提供 tryLock 方法 ,能夠讓我們在嘗試擷取鎖失敗後進行自定義操作。

ReentrantLock 還支援在初始化的實時指定 fair 參數,代表是否使用公平政策。公平鎖使用的是 ReentrantLock 内部的 FairSync ;非公平鎖使用的是 NonfairSync 。兩者都間接繼承自 AQS ,對 AQS 中的 state 的運用是用作記錄是否上鎖以及目前重入次數。

而 tryLock 方法,能夠指定一個 timeout 參數,會在指定時間内進行嘗試加鎖,并傳回加鎖結果,不會像 lock 方法那樣一直阻塞直到擷取成功為止。

關于 ReentrantLock 更詳細的内容可以檢視:深入了解 ReentrantLock

ReadWriteLock

ReadWriteLock : 讀寫鎖,其實是包含讀鎖(共享鎖)和寫鎖(排它鎖)。

讀鎖(共享鎖): 當添加的是讀鎖,允許其他的線程同樣使用讀鎖進入(其他的讀線程),不允許使用寫鎖進入(寫線程)。即讀讀并發,讀寫不并發。

寫鎖(排它鎖): 當添加的是寫鎖的時候,其他使用讀鎖或者寫鎖的線程都不能進入。

關于 synchronized 更詳細的内容可以檢視:深入了解 ReadWriteLock

LockSupport

LockSupport :

  • synchronized 的實作原理 & 鎖更新問題

關于 LockSupport 更詳細的内容可以檢視:深入了解 LockSupport

CountDownLatch

CountDownLatch : 倒計時,初始化的時候指定一個倒計時數值,當倒計時結束後,調用 wait 的線程會往下執行:

public static void main(String[] args) throws InterruptedException {
    int count = 100;
    // 設定倒計時數為 100
    final CountDownLatch countDownLatch = new CountDownLatch(count);

    for (int i = 0; i < count; i++) {
        new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
                // 每個線程調用一次 countDown 代表倒計時減 1
                countDownLatch.countDown();
            }
        }).start();
    }

    System.out.println("countdown start");

    // 在倒計時結束前(count 數為 0)一直阻塞,直到倒計時結束
    countDownLatch.await();

    System.out.println("countdown end");
}      

沒寫過獨立分析 CountDownLatch 的文章。這裡簡單分析下 CountDownLatch 的實作。CountDownLatch 的源碼也十分簡單,本質就是将 AQS 中的 state 作為計數:

public class CountDownLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        /**
        對目前 state 值固定減一,沒有使用 release 參數
        也就是說隻能每次調用 countDown 來進行倒計時減一操作
        而且隻有 state 減為 0 ,才算是真正的鎖釋放,AQS 中的 doReleaseShared 方法才被執行,對線程執行 unpark 操作
        */
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
    嘗試擷取鎖,如果擷取失敗,對線程進行 park 操作
    */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }
}      

是以 CountDownLatch 顧名思義,就是将 AQS 中的 state 作為計數器使用,每次調用 countDown 則對 state 進行減一操作,隻有 state 減到 0 才算釋放鎖。

CyclicBarrier

CyclicBarrier : 循環障礙。對調用了 await 方法的線程進行等待,直到有達到數量的等待線程之後再集體釋放。

設定一個循環門檻值和 Runnable 對象,每達到一次循環門檻值,執行一次 Runnable 對象的 run 方法。

public static void main(String[] args) {
    int batchNumber = 10;

    final CyclicBarrier cyclicBarrier = new CyclicBarrier(batchNumber, new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + "數量達到規定,集體釋放等待隊列中的線程。");
        }
    });

    for (int i = 0; i < batchNumber; i++) {
        new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                for (int j = 0; j < 100; j++) {
                    System.out.println(Thread.currentThread().getName() + ": " + String.valueOf(j));
                    // 調用該方法會将目前線程放入 trip 等待隊列中,直到隊列中的數量達到門檻值,再集體喚醒隊列中的所有線程
                    cyclicBarrier.await();
                }
            }
        }).start();
    }
}      

上述實驗的列印結果是:

Thread-0: 0
Thread-4: 0
Thread-3: 0
Thread-2: 0
Thread-1: 0
Thread-6: 0
Thread-5: 0
Thread-7: 0
Thread-8: 0
Thread-9: 0
Thread-9: 數量達到規定,集體釋放等待隊列中的線程。
Thread-9: 1
Thread-0: 1
Thread-3: 1
Thread-6: 1
Thread-8: 1
Thread-1: 1
Thread-2: 1
Thread-4: 1
Thread-7: 1
Thread-5: 1
Thread-5: 數量達到規定,集體釋放等待隊列中的線程。
...      

每次線程列印一次目前計數之後被加入 trip 等待隊列,等待其他線程同樣的值列印完了相同的值之後(這時候 trip 達到規定數量),才會繼續執行(達到規定數量的 trip 隊列會對所有線程集體釋放)。

閱讀 CyclicBarrier 相關源碼:

public class CyclicBarrier {
    // 一個幫助記錄是否打斷控制的内部類,主要用作跳出死循環
    private static class Generation {
        boolean broken = false;
    }

    // 是否打斷控制的變量
    private Generation generation = new Generation();

    // 使用 ReentrantLock 作為鎖對象
    private final ReentrantLock lock = new ReentrantLock();
    // 使用 lock.newCondition() 來建立一個 trip 隊列
    private final Condition trip = lock.newCondition();
    // 儲存調用建構函數時指定的循環數值
    private final int parties;
    // 儲存調用建構函數時指定的 Runnable
    private final Runnable barrierCommand;
    // 目前的實際計數數值,當 count = parties,代表需要執行 Runnable 的 run 方法了
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
...
    // 每次調用 await 方法(指定逾時或者不指定逾時),都會執行該方法
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 每次都會對 count 進行減一操作
            int index = --count;
            // 達到門檻值(trip 隊列中線程數量達到 parties 個)
            if (index == 0) {
                boolean ranAction = false;
                try {
                    // 執行 Runnable 的 run 方法,通常在最後一位添加到 trip 的線程中執行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 該方法的作用是集體釋放 trip 等待隊列隊列中的線程,重置 count 值和 generation 控制變量
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 如果 trip 隊列中線程還沒達到門檻值,使用死循環一直對等待隊列執行 await
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                // 當數量滿足門檻值,generation 會被重置,跳出死循環
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}      

總結一下,CyclicBarrier 的底層使用 ReentrantLock 作為鎖,并調用 newCondition 方法來建立 trip 等待隊列,在等待隊列的線程數沒達到門檻值時,調用 await 方法讓隊列中的線程集體等待,直到等待隊列中線程數達到門檻值,調用 signalAll 來對隊列中的線程集體喚醒,同時重置計數器進行下一次的循環計數控制。

Phaser

Phaser : 相位器,也稱為階段器。可以看作是一個分段的 CyclicBarrier 。

當一個線程調用 arriveAndAwaitAdvance 的時候,代表該線程到達目前階段,進入等待隊列,直到所有線程都到達目前階段(等待隊列滿了),再集體釋放,進入下一階段或結束。

public static void main(String[] args) {
    int workerNum = 3;
    int phases = 4;

    Phaser phaser = new Phaser(workerNum) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println(String.format("phase: %s - parties: %s", phase, registeredParties));
            return registeredParties == 0;
        }
    };

    for (int i = 0; i < workerNum; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < phases; j++) {
                    System.out.println(Thread.currentThread().getName());
                    // 到達并等待前進,是指該線程到達這個階段,等待其他線程到達這個階段再一同前進。
                    phaser.arriveAndAwaitAdvance();
                }
            }
        }).start();
    }
}      

當任務涉及多個階段,并需要某一個階段的任務全部完成後才能開始下一階段的任務的時候,可以考慮使用 Phaser 。

Semaphore

Semaphore : 信号量,指定允許多少個線程同時執行(獲得鎖)。使用上與 ThreadPoolExecutor 的 maximumPoolSize 參數類似。最多允許有多少任務同時執行。

與 ReentrantLock 類似,支援公平鎖和非公平鎖。通過構造函數的 fair 參數進行指定。

public static void main(String[] args) {
    int count = 5;
    Semaphore semaphore = new Semaphore(count);

    for (int i = 0; i < count * 2; i++) {
        new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                try {
                    // 阻塞直到擷取鎖
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(5000);
                } finally {
                    // 釋放鎖
                    semaphore.release();
                }
            }
        }).start();
    }
}      

Semaphore 在實作上也 ReentrantLock 大緻相同,内部有一個繼承自 AQS 的内部類 Sync 。将 AQS 中的 state 作為計數器使用,記錄目前剩下多少個可運作名額。

...
abstract static class Sync extends AbstractQueuedSynchronizer {

    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // remaining >= 0 代表還有可用名額,使用 CAS 嘗試擷取
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 釋放鎖就是歸還可用名額
            if (compareAndSetState(current, next))
                return true;
        }
    }

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

static final class NonfairSync extends Sync {

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

static final class FairSync extends Sync {

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 相比隻要有可用名額,調用 acquire 就有機會調用 CAS 嘗試擷取鎖的非公平版本不同
            // 公平版本會在多一步檢查,檢查是否已經有别的線程正在排隊
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
...      

Exchanger

Exchanger : 用作線程資料交換。

調用 exchange 方法後目前線程會阻塞,直到别的線程也調用 exchange 完成交換,才會往下執行。

public static void main(String[] args) {
    Exchanger<Object> exchanger = new Exchanger<>();

    new Thread(new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            String s = (String) exchanger.exchange("String from T1");
            System.out.println(Thread.currentThread().getName() + ": " + s);
        }
    }, "T1").start();

    new Thread(new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            String s = (String) exchanger.exchange("String from T2");
            System.out.println(Thread.currentThread().getName() + ": " + s);
        }
    }, "T2").start();
}      

查閱 Exchanger 的源碼:

...
// 用作交換資料的 Node 類,還用了 @sun.misc.Contended 注解保證一個對象能夠在一個獨立的緩存行裡
@sun.misc.Contended static final class Node {
    int index;              // Arena index
    int bound;              // Last recorded value of Exchanger.bound
    int collides;           // Number of CAS failures at current bound
    int hash;               // Pseudo-random for spins
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}

/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}
...      

CAS

  • ABA 問題

上面說到的幾種鎖,除了 synchronized ,其餘都是基于 CAS 實作的。也就是其實作線程同步的方式都是:

  1. 取出目前的值,指派給臨時變量;或者直接指定希望的初始值
  2. 計算要設定的值;或者直接指定希望的目标值
  3. 将目前最新值和之前指派的臨時變量進行比較
  1. 如果相同,說明期間沒有被其他線程修改過,将目标資源更新為要設定的值
  2. 如果不同,說明期間有其他線程修改過,重新從步驟一開始執行

以 ReentrantLock 的擷取鎖過程為例進行說明:無論是初始化是否指定 fair 參數的公平鎖或者非公平鎖(公平鎖時 sync 為内部類 FairSync 執行個體;非公平鎖時 sync 為内部類的 NonfairSync 執行個體),擷取鎖的時候都是調用 compareAndSetState(0, 1) (意思為期望目前狀态為無鎖狀态 0 ,期望設定成有鎖狀态 1)。

但這流程當中有個問題:在高并發環境下,如果在計算要設定的值期間,有其他線程将目标資源從 A 修改為 B,再從 B 重新修改為 A ,是能夠通過步驟三的判斷的。但資源又确實是被其他線程修改過的。

這時候如果目标資源是基本資料類型,其實并不影響。例如我要 compareAndSetInt(0,1) ,那麼代表我隻關心初始值為 0 ,設定為 1 的條件,至于在我擷取初始值(步驟一)和進行比較(步驟三)過程中發生了什麼。并不需要關心。

如果是引用類型的話呢?引用對象沒改變,但是對象中的某個屬性發生了改變又該如何處理?當然是重寫 equals 和 hashCode 方法,在步驟三中調用 equals 進行比較我們關心的屬性值。

還有一個問題,如果我們确實需要保證在步驟一和步驟三之間沒有被修改過,徹底避免 ABA 問題,能怎麼處理?兩種方案,一是為目标資源綁定一個 version,在步驟三中對 version 也進行比較;二是使用修改時間戳,同樣在步驟三中進行比較。

  • 高并發場景
  • 循環時間過長
  1. 可以延遲流水線執行指令(de-pipeline),使 CPU 不會消耗過多的執行資源,延遲的時間取決于具體實作的版本,在一些處理器上延遲時間是零。
  2. 可以避免在退出循環的時候因記憶體順序沖突(memory order violation)而引起 CPU 流水線被清空(CPU pipeline flush),進而提高 CPU 的執行效率。