天天看點

并發程式設計-ReentrantLook底層設計

作者:網際網路進階架構師

重入鎖

顧名思義:就是可以重入的互斥鎖,但是這個重入是有條件的,允許同一個線程多次獲得同一個鎖,避免了死鎖的發生。

重入鎖在實作上比 synchronized 關鍵字更加靈活,提供了一些額外的特性,比如可定時的鎖等待(tryLock)、可中斷的鎖等待(lockInterruptibly)、公平性等。另外,使用重入鎖需要手動加鎖和解鎖,使得線程控制更加精細。

使用重入鎖可以避免死鎖的發生,提高程式的可靠性。但是,過多地使用重入鎖可能會影響程式的性能,因為重入鎖需要進行額外的狀态管理和線程切換操作。是以,在使用重入鎖時需要謹慎考慮鎖的範圍和粒度,以及鎖的公平性和性能等方面的問題。

這裡可能有人對tryLock和lockInterruptibly有疑問 具體的使用方式如下:

  • tryLock
ReentrantLock lock = new ReentrantLock();
 if (lock.tryLock()) {
     try {
         // 獲得鎖後的操作
     } finally {
         lock.unlock();
     }
 } else {
     // 擷取鎖失敗的操作
 }           

它的作用可以讓線程在擷取鎖的時候最多等待一定的時間,如果在等待時間内沒有擷取到鎖,則傳回false,表示擷取鎖失敗

  • lockInterruptibly
Thread t = new Thread(() -> {
      ReentrantLock lock = new ReentrantLock();
      try {
          lock.lockInterruptibly();
          try {
              // 獲得鎖後的操作
              while (!Thread.currentThread().isInterrupted()) {
                  System.out.println("1");
              }
          } finally {
              lock.unlock();
          }
      } catch (InterruptedException e) {
          // 目前線程被中斷的操作
          Thread.currentThread().interrupt();
      }
  });
  t.start();
  Thread.sleep(1000);
  t.interrupt();           

作用就是可以抛出InterruptedException異常,是以線程可以使用interrupt的方式被中斷

Lock

Lock(鎖)是一種線程同步機制,用于實作互斥和協作通路共享資源。與 synchronized 關鍵字相比,Lock 接口提供了更加靈活的鎖實作,可以實作更加精細的線程控制和進階功能。

其實Lock就是一個接口,定義了鎖的一些方法:

并發程式設計-ReentrantLook底層設計
  1. lock():擷取鎖,如果鎖被其他線程持有,則線程會阻塞等待鎖的釋放。
  2. unlock():釋放鎖,如果目前線程持有鎖,則釋放鎖,否則會抛出 IllegalMonitorStateException 異常。
  3. tryLock():嘗試擷取鎖,如果鎖目前沒有被其他線程持有,則擷取鎖并立即傳回 true,否則立即傳回 false,不會阻塞線程。
  4. tryLock(long time, TimeUnit unit):在一定時間内嘗試擷取鎖,如果鎖在給定的時間内未被其他線程持有,則擷取鎖并立即傳回 true,否則等待指定時間,如果在指定時間内還未擷取到鎖,則傳回 false。
  5. newCondition():擷取與該鎖關聯的 Condition 對象,用于實作等待/通知模式。

ReentrantLock

ReentrantLock就是Lock的一種實作:

并發程式設計-ReentrantLook底層設計

而ReentrantLock是基于AbstractQueuedSynchronizer實作的可重入鎖

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer基于 FIFO 等待隊列的同步器架構,它為自定義同步器的實作提供了一種底層架構和算法,可以友善地實作各種類型的同步器,用于存儲等待擷取鎖的線程 我們先看一下AQS的代碼結構:

并發程式設計-ReentrantLook底層設計

lock源碼實作

并發程式設計-ReentrantLook底層設計

發現是調用了sync.lock();而sync就是一個内部類并且內建AQS:

并發程式設計-ReentrantLook底層設計

并且我們可以發現sync是個抽象類一共有兩個實作:

并發程式設計-ReentrantLook底層設計

也就是公平鎖和非公平鎖(下文會說到詳細的實作)至于用哪個是在執行個體化的時候指派的如下:

并發程式設計-ReentrantLook底層設計

接着看sync.lock()源碼實作如下:

并發程式設計-ReentrantLook底層設計

這裡我們就拿非公平鎖舉例:源碼實作是這樣的

initialTryLock

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    if (compareAndSetState(0, 1)) { // first attempt is unguarded
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
        int c = getState() + 1;
        if (c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
        return false;
}           

compareAndSetState方法使用了unsafe類的一個方法具體實作在jvm裡面

并發程式設計-ReentrantLook底層設計
并發程式設計-ReentrantLook底層設計

是以這個方法呢就是對比state狀态如果是0 則說明目前沒有線程占用則會指派1然後調用setExclusiveOwnerThread,而setExclusiveOwnerThread方法就是給exclusiveOwnerThread指派記錄一下目前這個鎖是被這個線程占用的:

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}           

如果不是state不是0則說明目前的鎖被占用了,是以此時需要判斷被占用的線程是否是自己。

并發程式設計-ReentrantLook底層設計

這裡也比較簡單就是state加一加好了,然後傳回true,目前線程擷取了是以可以執行對應的代碼塊否則就會傳回false

acquire

上文我們知道如果鎖被占用了initialTryLock會傳回false則此時會執行acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}           

首先是tryAcquire

protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}           

這裡又嘗試着擷取鎖,因為可能到這一步剛好這個鎖就釋放掉了是以就可以直接擷取到鎖

如果還是沒有擷取到鎖才會走到acquire方法

final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {
            node.status = WAITING;          // enable signal and recheck
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}           

梳理一下邏輯:

初始變量:

spins = 0,

postSpins = 0;

interrupted = false,

first = false;

pred = null;

node = null

share = false

第一次循環:

第一個if中 !first -> true (pred = null) != null -> false 是以第一個if不執行

第二個if (first || pred == null) pred == null -> true

是以此時執行了,執行的結果其實就是再一次嘗試擷取鎖,擷取成功就傳回了否自繼續往下執行

第三個if(node == null)第一個條件命中 node = new ExclusiveNode();

第一次循環結束

第二次循環

此時條件有一些變化

spins = 0,

postSpins = 0;

interrupted = false,

first = false;

pred = null;

node = new ExclusiveNode()

share = false

第一個if:(pred = (node == null) ? null : node.prev) != null -> false 是以沒有執行

第二個if: pred == null -> true 一樣的繼續嘗試擷取鎖

第三個if:走到了else if (pred == null) 這裡

這裡将waiter指派給目前線程

然後将node->prev指向tail,但是此時tail = null

繼續走到tryInitializeHead方法

private void tryInitializeHead() {

Node h = new ExclusiveNode();

if (U.compareAndSetReference(this, HEAD, null, h))

tail = h;

}

就是新建立一個node,然後head和tail都指向新節點h

第三次循環

第一個if和第二個if不說了還是一樣

到第三個else if (pred == null) {

此時就會将node結點的prev指針指向tail,然後tail的next指針指向node

至此一個連結清單的結構也就行程了

第四次循環

直接看最後一個if直接走到了最後一個else

LockSupport.park(this);

開始阻塞線程,等待被喚醒了

unlock源碼實作

看完上文的邏輯這裡就很簡單了

public void unlock() {
    sync.release(1);
}           
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}           
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //這裡其實就是将state-1
    int c = getState() - releases;
    if (getExclusiveOwnerThread() != Thread.currentThread())
       //如果目前線程和記錄的所占用的線程不符合則抛出異常
        throw new IllegalMonitorStateException();
    
    //判斷是否釋放成功了
    //釋放成功的條件是state == 0
    boolean free = (c == 0);
    if (free)
        //如果釋放成功則修改目前占用線程為null
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}           

如果沒有釋放成功則繼續不修改鎖的狀态,釋放成功了則會走到signalNext方法

private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        s.getAndUnsetStatus(WAITING);
        LockSupport.unpark(s.waiter);
    }
}           

可以發現是将目前等待的線程喚醒并且設定status為0(這裡也可以看出是FIFO結構,去的是頭結點的next結點)這又回到了上文中的循環會走到第二個if

if (first || pred == null) {
    boolean acquired;
    try {
        if (shared)
            acquired = (tryAcquireShared(arg) >= 0);
        else
            acquired = tryAcquire(arg);
    } catch (Throwable ex) {
        cancelAcquire(node, interrupted, false);
        throw ex;
    }
    if (acquired) {
        if (first) {
            node.prev = null;
            head = node;
            pred.next = null;
            node.waiter = null;
            if (shared)
                signalNextIfShared(node);
            if (interrupted)
                current.interrupt();
        }
        return 1;
    }
}           

嘗試擷取鎖,如果擷取不到

else if (node.status == 0) {
    node.status = WAITING;          // enable signal and recheck
}           

又繼續将status置為1

公平性展現

我們可以看一下initialTryLock代碼的實作

FairSync:

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (getExclusiveOwnerThread() == current) {
        if (++c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    }
    return false;
}           

在這裡會先去判斷一下目前的鎖是不是被占用了,被占用的是誰,而且這裡還多了一個判斷是線程隊列中是否有等待的線程,如果有直接傳回false了 而後續的喚醒線程都是從隊列的第一個開始喚醒一定是順序的,是以這個是公平的 NonfairSync:

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    if (compareAndSetState(0, 1)) { // first attempt is unguarded
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
        int c = getState() + 1;
        if (c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
        return false;
}           

當然後續的喚醒線程都是從隊列的第一個開始喚醒一定是順序的,這個是公平的,

但是差別于上文中的公平鎖的實作是他沒有判斷目前是否有等待的隊列,如果此時一個新的線程進來需要擷取鎖,恰巧一個老的線程正在釋放鎖已經将state置為0了是以此時新線程不就插隊成功了,是以非公平就展現在這裡

底層實作邏輯流程圖

并發程式設計-ReentrantLook底層設計

作者:Potato_洋芋

連結:https://juejin.cn/post/7231447658565124155

來源:稀土掘金

繼續閱讀