天天看點

java.util.concurrent——Locks1.Locks包2.Condition

1.Locks包

java.util.concurrent——Locks1.Locks包2.Condition

下面這個圖更友善了解:

java.util.concurrent——Locks1.Locks包2.Condition

juc-locks鎖架構中一共就三個接口:Lock、Condition、ReadWriteLock,接下來對這些接口作介紹。

1.1 Lock

Lock接口可以視為synchronized的增強版,提供了更靈活的功能。該接口提供了限時鎖等待、鎖中斷、鎖嘗試等功能。

Lock接口主要的實作是ReentrantLock重入鎖,另外還有ConcurrentHashMap中的Segment繼承了ReentrantLock,在ReentrantReadWriteLock中的WriteLock和ReadLock也實作了Lock接口。

談到并發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)(同步器)!

1.1.1 AQS

AbstractQueuedSynchronizer

就是大名鼎鼎的AQS類,怎麼說呢,這個類就是JUC鎖的根基,是JUC中不同鎖的共同抽象類,鎖的許多公共方法都是在這個類中實作的,我給你看看它類繼承結構你就知道了

java.util.concurrent——Locks1.Locks包2.Condition

有沒有發現,無論是帶有

Lock

字尾的類如

ReentrantLock

還是不帶

Lock

字尾的類如

Semaphore

它們裡面的内部類

Sync

都繼承了

AQS

。其實JUC中的各種鎖隻是一個表面裝飾,它們裡面真正實作功能的還是

Sync

java.util.concurrent——Locks1.Locks包2.Condition

AQS核心原理是,如果被請求的共享資源空閑,則将目前請求資源的線程設定為有效的工作線程,并且将共享資源設定為鎖定狀态。如果被請求的共享資源被占用,那麼就需要一套線程阻塞等待以及被喚醒時鎖配置設定的機制,這個機制AQS是用CLH隊列鎖實作的,即将暫時擷取不到鎖的線程加入到隊列中。

  同步器的核心方法是acquire和release操作,其背後的思想也比較簡潔明确。

acquire操作是這樣的:

  while (目前同步器的狀态不允許擷取操作) {

          如果目前線程不在隊列中,則将其插入隊列

          阻塞目前線程

  }

  如果線程位于隊列中,則将其移出隊列  

release操作是這樣的:

  更新同步器的狀态

  if (新的狀态允許某個被阻塞的線程擷取成功){

         解除隊列中一個或多個線程的阻塞狀

       }

AQS定義兩種資源共享方式

  • Exclusive(獨占):隻有一個線程能執行,如ReentrantLock。又可分為公平鎖和非公平鎖:
    • 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖
    • 非公平鎖:當線程要擷取鎖時,無視隊列順序直接去搶鎖,誰搶到就是誰的
  • Share(共享):多個線程可同時執行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我們都會在後面講到。

一般來說,自定義同步器要麼是獨占方法,要麼是共享方式,他們也隻需實作

tryAcquire-tryRelease

tryAcquireShared-tryReleaseShared

中的一種即可。但AQS也支援自定義同步器同時實作獨占和共享兩種方式,如

ReentrantReadWriteLock

1.1.2 ReentrantLock

可重入原理,以非公平鎖為例:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //1\. 如果該鎖未被任何線程占有,該鎖能被目前線程擷取
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //2.若被占有,檢查占有線程是否是目前線程
    else if (current == getExclusiveOwnerThread()) {
        // 3\. 再次擷取,計數加一
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

ReenTrantLock可重入鎖(和synchronized的差別)總結

java.util.concurrent——Locks1.Locks包2.Condition

2.Condition

Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),隻有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,進而重新争奪鎖。

Condition函數清單

// 造成目前線程在接到信号或被中斷之前一直處于等待狀态。
void await()
// 造成目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。
boolean await(long time, TimeUnit unit)
// 造成目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。
long awaitNanos(long nanosTimeout)
// 造成目前線程在接到信号之前一直處于等待狀态。
void awaitUninterruptibly()
// 造成目前線程在接到信号、被中斷或到達指定最後期限之前一直處于等待狀态。
boolean awaitUntil(Date deadline)
// 喚醒一個等待線程。
void signal()
// 喚醒所有等待線程。
void signalAll()
           

   看一個生産者消費者的例子  (使用了signal喚醒生産線程)

/**
 * 生産者、消費者示例
 */
public class ConditionTest {
    private int storage;
    private int putCounter;
    private int getCounter;
    private Lock lock = new ReentrantLock();
    private Condition putCondition = lock.newCondition();
    private Condition getCondition = lock.newCondition();

    public void put() throws InterruptedException {
        try {
            lock.lock();
            if (storage > 0) {
                putCondition.await();
            }
            storage++;
            System.out.println("put => " + ++putCounter );
            getCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void get() throws InterruptedException {
        try {
            lock.lock();
            lock.lock();
            if (storage <= 0) {
                getCondition.await();
            }
            storage--;
            System.out.println("get  => " + ++getCounter);
            putCondition.signal();
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    public class PutThread extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    put();
                } catch (InterruptedException e) {
                }
            }
        }
    }

    public class GetThread extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    get();
                } catch (InterruptedException e) {
                }
            }
        }
    }

    public static void main(String[] args) {
        final ConditionTest test = new ConditionTest();
        Thread put = test.new PutThread();
        Thread get = test.new GetThread();
        put.start();
        get.start();
    }
           

繼續閱讀