天天看點

ReentrantLock筆記一、Lock接口二、ReentrantLock源碼 三、AQS

最近看了Java的并發包concurrent的ReentrantLock類源碼,做下筆記。

一、Lock接口

//在java的concurrent包中有個Lock接口,用來規範定義鎖的行為

public interface Lock{

    /**
    * 獲得鎖,如果鎖不可用,目前線程會一直休眠(即使目前線程被其他線程中斷的情況也會一直休眠)直到鎖可獲得。
    */
    void lock();

    /**
    * 獲得鎖,如果鎖不可用,目前線程會一直休眠直到目前線程被其他線程中斷則會抛出InterruptedException異常。
    */
    void lockInterruptibly() throws InterruptedException;

    /**
    * 鎖是空閑的則立即傳回true,如果鎖是不可用的則立即傳回false。
    */
    boolean tryLock();


    /**
    * 如果鎖可用,獲得鎖,傳回true;
    * 如果鎖不可用,目前線程會一直休眠直到目前線程被其他線程中斷則會抛出InterruptedException異常;
    * 如果已經超過指定的時間,還沒獲得鎖,傳回false。
    */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
    /*
    * 釋放鎖
    */
    void unlock();

    /*
    * 傳回一個綁定到目前Lock執行個體的Condition執行個體。
    */
    Condition newCondition();
}
           

..

二、ReentrantLock源碼

//===============================================================

進入主題,ReentrantLock源碼

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    //同步器(很多功能繼承于抽象類AbstractQueuedSynchronizer)
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        //在非公平鎖方式下,嘗試獲得鎖
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes this lock instance from a stream.
         * @param s the stream
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    //非公平鎖的同步器
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires); //調用父類Sync.nonfairTryAcquire(int acquires)
        }
    }

    //公平鎖的同步器
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        //嘗試獲得鎖,成功傳回true,失敗則傳回false
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();//調用了AbstractQueuedSynchronizer.getStatus()==>>state
            if (c == 0) { //state為0,表示沒有線程獲得鎖
                //!hasQueuedPredecessors():隊列裡沒有排在目前線程之前的線程,
                //compareAndSetState(0, acquires):則将鎖的狀态設定為已被擷取
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//目前線程就是擷取得鎖的線程
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

}
           

..

三、AQS

//==========================================

其中鎖背後真正的英雄是AQS(AbstractQueuedSynchronizer類,下面簡稱為AQS)的一些代碼,當然,還有一些方法沒寫上來。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronzier{

    static final class Node {
        //表示線程已經取消
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        //表示線程需要喚醒
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        //waitStatus的值有:CANCELLED,SIGNAL,CONDITION,PROPAGATE
        volatile int waitStatus;
        //省略其他代碼
    }
   
    //同步器的狀态,volatile辨別則保證了屬性值的對多線程的可見性
    private volatile int state;

    //獲得arg個許可
    public final void acquire(int arg) {
        //Node.EXCLUSIVE:線程節點标記為獨占模式
        if ( ! tryAcquire(arg)  && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) {

            selfInterrupt();

        }
    }

    //建立一個等待線程,插入隊列
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {//隊尾不為空,則表示隊尾已經被初始化
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {//将目前線程節點銜接到隊尾
                pred.next = node;
                return node;
            }
        }
        //代碼執行到這裡,表示隊尾為空,即隊尾還沒被初始化
        enq(node);
        return node;
    }

   public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }


    //線程節點入隊
    private Node enq(final Node node) {
        //如果隊列還沒被初始化,則初始化隊頭為(new Node()),隊尾也指向隊頭
        //然後将線程節點銜接到隊尾
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))//使用CAS原子操作設定隊頭
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {//使用CAS原子操作設定隊尾
                    t.next = node;
                    return t;
                }
            }
        }
    }

    //隊列裡有優先獲得鎖的線程
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;

        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
            //隊頭head和隊尾tail的指向不同
            //(因為隊列還沒初始化時,head和tail都為null,隊列剛初始化時,它們的指向也是相同的)
            //隊尾沒有在排隊的線程或者排在隊尾的線程是其他線程
    }

    排隊獲得鎖(CLH鎖)
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//排線上程節點前面的節點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果前趨節點p是正在阻塞等待鎖的狀态(shouldParkAfterFailedAcquire方法),則目前線程挂起(parkAndCheckInterrupt方法)。
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) //SIGNAL:-1,前趨節點的狀态是需要喚醒的狀态
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { //waitStatus > 0,即是前趨節點的鎖被取消的值(Node.CANCELLED,值是1)
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev; //修改目前節點的前趨節點,往前移一個節點。
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //前趨節點設定成需要喚醒的狀态。
        }
        return false;
    }

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }           

}
           

//==========================================================

一.公平鎖FairSync

使用公平鎖,建立ReentrantLock

ReentrantLock fairLock = new java.util.concurrent.locks.ReentrantLock(true);
		fairLock.lock();
		fairLock.unlock();
           

ReentrantLock.lock();==>>FairSync.lock()==>>AQS.acquire(1)實作:

1.!FairSync.tryAcquire(int acquires)//1為真,則表示沒有獲得鎖,注意,前面有個"!"(非邏輯)

并且

2.AQS.acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

   //Node.EXCLUSIVE就獨占式鎖,因為隻有一個線程能獲得鎖,是以使用獨占式鎖;

   //比如信号量Semaphore可以設定多個許可供給多個線程獲得,則Semaphore會使用共享式鎖Node.SHARED

(1和2)都為真 ==>>AQS.selfInterrupt()

//============================================================

FairSync.tryAcquire(int acquires)實作:

1.狀态為0,則表示鎖是空閑的

1.1 !AQS.hasQueuedPredecessors() //1.1為真,則表示等待隊列裡沒有優先等待鎖的線程節點

并且

1.2 AQS.compareAndSetState(0, 1) //使用CAS原子操作設定state值由0變為1

(1.1和1.2)都為真 ==>> 目前線程獲得鎖成功傳回true。

2.狀态不為0,則表示鎖已被某個線程獲得,

2.1判斷鎖的持有者是否目前線程

2.1.1是目前線程,則表示線程重複獲得鎖,将狀态值state增加acquires,獲得鎖成功傳回true。

2.1.2不是目前線程,嘗試獲得鎖失敗,傳回false。

3. 傳回false

//============================================================

總結:公平鎖

線程請求擷取鎖時,判斷狀态state的值

1.當狀态state為0時(即鎖還沒被線程占有),

1.1判斷隊列裡有沒前任優先線程節點,即,在目前線程之前有沒其他線程在等待鎖的釋放

1.1.1如果沒有優先線程,則目前線程可以獲得鎖

1.1.2如果有優先線程,則目前線程銜接在隊尾阻塞等待鎖空閑,等待其他線程喚醒來獲得鎖

2.當狀态state不為0時,說明鎖已經被某個線程占有,判斷占有線程是否目前線程

2.1是目前線程,則線程可重複獲得鎖,将狀态state值加1,表示線程又一次重複獲得鎖

2.2不是目前線程,則目前線程銜接在隊尾阻塞等待鎖空閑,等待其他線程喚醒來獲得鎖

//============================================================

總結:非公平鎖

線程請求擷取鎖時,首先比其他線程優先獲得鎖(搶占式地,直接使用CAS(調用:compareAndSetState(0, 1))設定狀态state值),

總之,如有剛好鎖空閑時,非公平鎖會優先獲得鎖,鎖已被占有時,再銜接到隊尾,排隊等待鎖。

不管是公平鎖,還是非公平鎖,沒獲得鎖進入阻塞排隊時,AQS都會使用CLH隊列(對應到上面的acquireQueued方法),

目前線程會判斷它的前趨節點,如果前趨節點的等待狀态waitStatus是SIGNAL,則說明前趨節點也是在等待鎖挂起中,那目前線程也會挂起等待鎖;

如果前趨節點是隊頭head,則說明已經輪到目前線程獲得鎖;

釋放鎖時,會從隊頭的後繼節點開始,喚醒挂起的後繼節點,則後繼獲得鎖。