天天看點

ReentrantLock加鎖與釋放過程

背景

通過

lock

unlock

研究ReentrantLock相關實作的原理 :

  • 通過

    state

    變量标志目前資源占用
  • ExclusiveThread

    标志目前占用CPU的線程
  • LockSupport.park/unpark

    來申請和釋放CPU資源
  • Unsafe.compareAndSwapObject

    來完成CAS操作
  • Unsafe.putObject

    來完成原子操作

重要變量

  • state

    變量
    • 為0 : 代表該節點為初創節點 , 什麼都沒幹
    • 大于0 : 即代表CANCELLED , 也就是所有大于0的節點都是被取消的節點
    • 小于0 : SIGNAL = -1 : 代表該節點等待unpark

      CONDITION = -2 : 代表該節點等待condition的觸發

      PROPAGATE = -3 : 主要用于共享鎖 , 代表該節點需要無條件傳遞上一個節點

lock流程簡述

  1. 通過CAS設定

    state

    (0-->1) , 如果設定成功 , 則獨占該鎖
  2. 設定失敗 , 通過

    addWaiter

    建立本線程的Node , 插入隊列中
  3. 在添加完節點後 , 如果目前線程未搶占成功 , 則會周遊删除Cancel節點
  4. 最後會找到可以執行的Node , 開始通過

    park

    來讓出目前線程的CPU
  5. 公平鎖會在申請鎖的時候判斷隊列後繼節點是否存在 , 如果目前節點不是該後繼節點的話 , 就不會申請鎖.

unlock流程簡述

  1. 嘗試釋放相應的資源
  2. 釋放成功的話 , 則會把獨占線程讓出
  3. 會周遊Node隊列 , 從頭節點開始尋找可以unpark的線程

非公平鎖的加鎖與釋放

lock的實作

  1. 當調用到

    ReentrantLock.lock()

// 以非公平鎖為例
    static final class NonfairSync extends Sync {
    
        // 擷取鎖
        final void lock() {
            // 通過CAS操作設定state的狀态
            if (compareAndSetState(0, 1))
                 // 如果設定成功的話 , 則代表目前獨占鎖的線程為目前線程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 嘗試擷取鎖
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            // 重寫子類的Acquire , 調用nonfairTryAcquire
            return nonfairTryAcquire(acquires);
        }
    }           

複制

  1. acquire

    中 , 會擷取鎖 , 或者插入等待隊列中
public final void acquire(int arg) {
        //  tryAcquire嘗試擷取鎖 , 如果擷取失敗的話 , 
        // 則通過acquireQueued将目前線程添加到隊列中
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果成功的話 , 則中斷目前線程 , 僅僅隻是中斷
            selfInterrupt();
    }           

複制

  1. 在Sync重寫的

    tryAcquire

    中 , 會嘗試擷取鎖
  • 如果目前鎖沒有被占用(即state=0)時 , 則獨占該鎖
  • 如果目前獨占線程是本線程的話 , 則将State遞增
  • 否則 , 則傳回false , 将本線程插入隊列中
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 擷取目前State狀态
            int c = getState();
            if (c == 0) {
                // 如果目前State為0的話 , 則獨占該鎖 ,并且傳回true,不需要插入隊列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                //  如果目前線程就是獨占的線程的話 , 則将State加一 , 并且傳回true
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }           

複制

  1. 通過

    addWaiter(Node.EXCLUSIVE)

    建立Node , 由于是通過

    Unsafe.putObject

    , 是以也是保證線程間的可見性的.
private Node addWaiter(Node mode) {
        // 通過構造函數 , 将Node.EXCLUSIVE作為nextWaiter放入該節點
        Node node = new Node(mode);
        for (;;) {
            // 找到尾節點 , head/tail都是volatile的
            Node oldTail = tail;
            // 尾節點不為空 , 雙線連結清單增加尾節點操作
            if (oldTail != null) {
                // 将tail節點放到node.prev中
                U.putObject(node, Node.PREV, oldTail);
                // 通過cas操作 , 設定tail
                if (compareAndSetTail(oldTail, node)) {
                    // 如果設定成功的話 , 則将tail.next設定成該節點
                    oldTail.next = node;
                    // 傳回node
                    return node;
                }
            } else {
                // 如果沒有頭尾節點 , 則建立head/tail節點
                initializeSyncQueue();
            }
        }
    }           

複制

  1. 在添加完節點後 , 開始在隊列中找尋要喚醒的線程節點.
  • 如果

    shouldParkAfterFailedAcquire

    傳回true , 則會Park目前線程
  • 如果

    shouldParkAfterFailedAcquire

    傳回false , 就會一直自旋周遊前驅節點 , 直到前驅接節點為空 , 則将目前節點設定為head節點 , 可以擷取目前鎖
final boolean acquireQueued(final Node node, int arg) {
        //  其實node就是尾節點
        try {
            boolean interrupted = false;
            for (;;) {
                // 從尾節點往前周遊 , 擷取其前驅
                final Node p = node.predecessor();
                // 如果周遊到了head節點 , 并且擷取成功了, 則傳回false
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                // shouldParkAfterFailedAcquire:判斷在acquire後,是否應該park讓出CPU
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }           

複制

  1. shouldParkAfterFailedAcquire

    主要根據前驅節點狀态判斷目前節點是否需要Park讓出CPU
  • 如果前驅節點的ws為

    Node.SIGNAL

    的話 , 則傳回true , park目前線程
  • 如果前驅節點的ws為canceld的話 , 則從隊列中去除掉canceld節點
  • 如果前驅節點的ws為0或者

    PROPAGATE

    的話 , 則将前驅節點的ws設定成SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //  如果前驅節點的waitStatus為SIGNAL,就代表前驅節點可以被喚醒
            //  傳回true , park目前線程
            return true;
        if (ws > 0) {
            //  如果ws>0, 則代表前驅節點cancelled了 ,  則過濾掉canceld節點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //  删除掉cancedld節點
            pred.next = node;
        } else {
            // 如果ws為0或者PROPAGATE的話 , 則将前驅節點的waitStatus設定成SIGNAL , 但是先不park
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }           

複制

  1. 如果前驅節點要喚醒的話 , 則會Park目前線程
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }           

複制

unlock的實作

當某個線程在使用完鎖後 , 使用

unlock

來釋放鎖資源.

  1. 調用

    unlock

    也就是釋放

    1

    的資源
public void unlock() {
        sync.release(1);
    }           

複制

  1. 在AQS中的

    release

    會先嘗試釋放鎖 , 如果成功 ,則

    unpark

    前驅線程
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }           

複制

  1. Sync

    中的

    tryRelease

    中 , 會釋放相關資源 , 并且傳回鎖是否釋放成功
protected final boolean tryRelease(int releases) {
            // 将目前申請資源數 - 要釋放的數量
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                // 如果目前線程不是獨占線程的話 , 則抛出異常
                throw new IllegalMonitorStateException();
            // 是否釋放鎖
            boolean free = false;
            if (c == 0) {
                // 如果目前沒有資源占用的話 , 則認為可以釋放鎖
                free = true;
                // 将獨占線程設為空
                setExclusiveOwnerThread(null);
            }
            // 設定目前鎖所對應的資源數量
            setState(c);  
            //  傳回目前鎖是否會被釋放
            return free;           

複制

  1. 當鎖成功釋放後 , 會判斷前驅節點的

    waitStatus

    是否不為0 , 由于

    lock

    階段會清除所有的

    Cancel

    節點 , 是以此處判斷不為0 , 則代表前驅節點可以擷取資源占有鎖. 于是 , 調用

    unparkSuccessor

    讓前驅節點擷取節點
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果waitStatus小于0 , 
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        // 開始尋找可執行的前驅節點
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            // unpark對應的線程
            LockSupport.unpark(s.thread);
    }           

複制

公平鎖的加鎖與釋放

對于公平鎖(

FairSync

)而言在加鎖的過程中會有所不同 , 僅僅隻是在申請鎖的時候 , 加入了隊列的判斷 , 如果頭節點有後繼節點的話 , 則讓後繼節點擷取CPU

  1. 在申請鎖的時候 , 會判斷隊列是否有後繼節點 , 如果有後繼節點的話 , 則讓後繼節點優先獲得CPU
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 判斷隊列中是否有後繼節點
                if (!hasQueuedPredecessors() &&
                    // 如果沒有前驅的話 , 則會通過cas來設定state
                    compareAndSetState(0, acquires)) {
                    // 設定目前線程獨占該鎖
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }           

複制

  1. 判斷head/tail節點是否相同 , 如果不同的話 , 代表head節點仍有後繼節點
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
         // 傳回頭尾節點不相等 , 并且頭節點的next不為空 , 頭節點的線程不是目前線程
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }           

複制