天天看點

Java多線程之AQS源碼分析AQS源碼分析

AQS源碼分析

AQS(AbstractQueueedSynchronizer)使用一個int成員變量表示同步狀态,通過内置的FIFO隊列完成資源擷取的排隊工作

Java多線程之AQS源碼分析AQS源碼分析

voaltile state 是為了保證state變量線程的可見性,

AQS改變state的方法主要有以下3個

getState()
setState()
compareAndSetState()
           

采用CAS添加節點到隊列中好處

不必鎖住整個連結清單就可以實作添加節點

lock()方法獨占式

首先調用sync的acquire方法

public void lock() {
        sync.acquire(1);
    }
           

進入AQS,調用acqurie()方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

第一個分支執行tryAcquire(),

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
           

點進去之後發現執行的nonfairTryAcquire()

/**
*嘗試擷取鎖,成功傳回true,失敗傳回false
**/
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //擷取目前的标志的狀态
            int c = getState();
            //c==0表示沒人擷取這把鎖
            if (c == 0) {
            	//使用CAS操作嘗試擷取這把鎖
                if (compareAndSetState(0, acquires)) {
                	//如果擷取成功,設定為獨占線程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果該線程已經占有了該鎖,将标志位加1,這就是可重入的原理
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
           

如果tryAccquire()傳回false,進入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)分支

首先,來看下addWaiter()的源碼

private Node addWaiter(Node mode) {
		//建立一個新的節點
        Node node = new Node(mode);
		//for是個死循環,不達目的不罷休
        for (;;) {
        	//tail是等待隊列的尾巴節點
            Node oldTail = tail;
            if (oldTail != null) {
            	//尾部節點不為空,用setPrevRelaxed()函數設定
            	//node的上一節點
                node.setPrevRelaxed(oldTail);
                //通過CAS操作加入等待隊列
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }
    //初始化等待隊列
private final void initializeSyncQueue() {
        Node h;
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }
           

下面檢視acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
        	//死循環
            for (;;) {
            	//p為node的前驅
                final Node p = node.predecessor();
                //如果p為頭節點,tryAcquire成功
                if (p == head && tryAcquire(arg)) {
                	//設定node為頭節點,也就是擷取鎖
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }

    /**
     * 如果嘗試擷取鎖失敗,檢查并更新節點狀态.
     * 如果節點線程被block住就傳回true
     *
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             * 大于0跳過節點
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }


           

waitStatus主要有以下幾種狀态

static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled. */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking. 後繼unpark*/
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition. condition隊列*/
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate.
         */
        static final int PROPAGATE = -3;

           

Signal

The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases orcancels. To avoid races, acquire methods mustfirst indicate they need a signal, then retry the atomic acquire, and then,on failure, block.

目前節點必須unpark()後繼節點當目前節點釋放資源或取消掉的時候

unLock()獨占式

進入release

public void unlock() {
        sync.release(1);
    }
           
public final boolean release(int arg) {
    	//釋放資源成功
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            //喚醒後繼
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

tryRelease()

//當c=0,釋放資源
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
           

讀寫鎖

main(){
ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
        reentrantLock.readLock().lock();
}
           

調用sync.acqurireShared()

public void lock() {
            sync.acquireShared(1);
        }
           

調用模闆方法

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
           

tryAcquireShared(int arg)方法傳回值為int類型,當傳回值大于等于0時,表示能夠擷取到同步狀态

doAcquireShared ()加入等待隊列

Java多線程之AQS源碼分析AQS源碼分析

标志位為32位

讀狀态高16位,寫狀态低16位

寫狀态=

state&0x0000FFFF

讀狀态=

state>>>16

寫狀态加1=state+1

讀狀态加1=

state+(1<<<16)

當state>0,

state&0x0000FFFF=0,state>>>16 > 0

,讀鎖擷取

state>0

state&0x0000FFFF>0,state>>>16 >=0

,寫鎖擷取`

鎖降級

鎖降級指的是寫鎖降級成為讀鎖。如果目前線程擁有寫鎖,然後将其釋放,最後再擷取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(目前擁有的)寫鎖,再擷取到讀鎖,随後釋放(先前擁有的)寫鎖的過程