1.Locks包

下面這個圖更友善了解:
juc-locks鎖架構中一共就三個接口:Lock、Condition、ReadWriteLock,接下來對這些接口作介紹。
1.1 Lock
Lock接口可以視為synchronized的增強版,提供了更靈活的功能。該接口提供了限時鎖等待、鎖中斷、鎖嘗試等功能。
Lock接口主要的實作是ReentrantLock重入鎖,另外還有ConcurrentHashMap中的Segment繼承了ReentrantLock,在ReentrantReadWriteLock中的WriteLock和ReadLock也實作了Lock接口。
談到并發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)(同步器)!
1.1.1 AQS
AbstractQueuedSynchronizer
就是大名鼎鼎的AQS類,怎麼說呢,這個類就是JUC鎖的根基,是JUC中不同鎖的共同抽象類,鎖的許多公共方法都是在這個類中實作的,我給你看看它類繼承結構你就知道了
有沒有發現,無論是帶有
Lock
字尾的類如
ReentrantLock
還是不帶
Lock
字尾的類如
Semaphore
它們裡面的内部類
Sync
都繼承了
AQS
。其實JUC中的各種鎖隻是一個表面裝飾,它們裡面真正實作功能的還是
Sync
。
AQS核心原理是,如果被請求的共享資源空閑,則将目前請求資源的線程設定為有效的工作線程,并且将共享資源設定為鎖定狀态。如果被請求的共享資源被占用,那麼就需要一套線程阻塞等待以及被喚醒時鎖配置設定的機制,這個機制AQS是用CLH隊列鎖實作的,即将暫時擷取不到鎖的線程加入到隊列中。
同步器的核心方法是acquire和release操作,其背後的思想也比較簡潔明确。
acquire操作是這樣的:
while (目前同步器的狀态不允許擷取操作) {
如果目前線程不在隊列中,則将其插入隊列
阻塞目前線程
}
如果線程位于隊列中,則将其移出隊列
release操作是這樣的:
更新同步器的狀态
if (新的狀态允許某個被阻塞的線程擷取成功){
解除隊列中一個或多個線程的阻塞狀
}
AQS定義兩種資源共享方式
- Exclusive(獨占):隻有一個線程能執行,如ReentrantLock。又可分為公平鎖和非公平鎖:
- 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖
- 非公平鎖:當線程要擷取鎖時,無視隊列順序直接去搶鎖,誰搶到就是誰的
- Share(共享):多個線程可同時執行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我們都會在後面講到。
一般來說,自定義同步器要麼是獨占方法,要麼是共享方式,他們也隻需實作
tryAcquire-tryRelease
、
tryAcquireShared-tryReleaseShared
中的一種即可。但AQS也支援自定義同步器同時實作獨占和共享兩種方式,如
ReentrantReadWriteLock
。
1.1.2 ReentrantLock
可重入原理,以非公平鎖為例:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//1\. 如果該鎖未被任何線程占有,該鎖能被目前線程擷取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//2.若被占有,檢查占有線程是否是目前線程
else if (current == getExclusiveOwnerThread()) {
// 3\. 再次擷取,計數加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReenTrantLock可重入鎖(和synchronized的差別)總結
2.Condition
Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),隻有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,進而重新争奪鎖。
Condition函數清單
// 造成目前線程在接到信号或被中斷之前一直處于等待狀态。
void await()
// 造成目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。
boolean await(long time, TimeUnit unit)
// 造成目前線程在接到信号、被中斷或到達指定等待時間之前一直處于等待狀态。
long awaitNanos(long nanosTimeout)
// 造成目前線程在接到信号之前一直處于等待狀态。
void awaitUninterruptibly()
// 造成目前線程在接到信号、被中斷或到達指定最後期限之前一直處于等待狀态。
boolean awaitUntil(Date deadline)
// 喚醒一個等待線程。
void signal()
// 喚醒所有等待線程。
void signalAll()
看一個生産者消費者的例子 (使用了signal喚醒生産線程)
/**
* 生産者、消費者示例
*/
public class ConditionTest {
private int storage;
private int putCounter;
private int getCounter;
private Lock lock = new ReentrantLock();
private Condition putCondition = lock.newCondition();
private Condition getCondition = lock.newCondition();
public void put() throws InterruptedException {
try {
lock.lock();
if (storage > 0) {
putCondition.await();
}
storage++;
System.out.println("put => " + ++putCounter );
getCondition.signal();
} finally {
lock.unlock();
}
}
public void get() throws InterruptedException {
try {
lock.lock();
lock.lock();
if (storage <= 0) {
getCondition.await();
}
storage--;
System.out.println("get => " + ++getCounter);
putCondition.signal();
} finally {
lock.unlock();
lock.unlock();
}
}
public class PutThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
put();
} catch (InterruptedException e) {
}
}
}
}
public class GetThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
get();
} catch (InterruptedException e) {
}
}
}
}
public static void main(String[] args) {
final ConditionTest test = new ConditionTest();
Thread put = test.new PutThread();
Thread get = test.new GetThread();
put.start();
get.start();
}