寫在前面
這個包對象是JDK 1.5,1.6 ,1.8 之後,新增的,大概2009年,新增了 ,枚舉定義,基本類型包裝,注解,加強for循環,等新特性,提高了Java對并發程式設計的支援,也擴大了Java的市場,前景
JDK 1.8 下 JUC 下 共有 17(atomic包下) + 10(lock包下) + 58 個對象組成;
在後續的Java 高版本中,有更改 17(atomic包下) + 10(lock包下) + 61個對象組成,多了的 3 個對象是
- java.util.concurrent.Flow
- java.util.concurrent.Helpers
- java.util.concurrent.SubmissionPublisher
一、lock 包下
這個包下共 10 個對象(接口、類)
如圖,
下面簡單介紹,這10 個對象
1.1、AbstractOwnableSynchronizer
可參考文章,介紹了AbstractOwnableSynchronizer和AbstractQueuedSynchronizer,連結
1.2、AbstractQueuedLongSynchronizer
1.3、AbstractQueuedSynchronizer
這個就是 簡稱 AQS 的東西,
java.util.concurrent包中很多類都依賴于這個類所提供隊列式同步器,比如說常用的ReentranLock,Semaphore和CountDownLatch
代碼示例 RoboVM(java 建立 IOS APP架構)
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
1.4、Condition
在使用Lock之前,我們使用的最多的同步方式應該是synchronized關鍵字來實作同步方式了。配合Object的wait()、notify()系列方法可以實作等待/通知模式。Condition接口也提供了類似Object的螢幕方法,與Lock配合可以實作等待/通知模式
簡介,可參考,連結
1.4.1、apache-druid中使用示例
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
@VisibleForTesting
final ArrayDeque<T> objects;
private final ReentrantLock lock;
private final Condition notEnough;
private final int maxSize;
...
...
@Nullable
private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
return objects.pop();
}
finally {
lock.unlock();
}
}
1.5、Lock
Jdk中核心實作類包括一下
代碼示例 apache/hive 任務排程中
private final Lock scheduleLock = new ReentrantLock()
....
private void trySchedulingPendingTasks() {
scheduleLock.lock();
try {
pendingScheduleInvocations.set(true);
scheduleCondition.signal();
} finally {
scheduleLock.unlock();
}
}
1.6、LockSupport
代碼示例 h2oai/h2o-2
public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
1.7、ReadWriteLock
Synchronized存在明顯的一個性能問題就是讀與讀之間互斥,
ReadWriteLock管理一組鎖,一個是隻讀的鎖,一個是寫鎖。可以做到讀和讀互不影響,讀和寫互斥,寫和寫互斥,提高讀寫的效率
Java并發庫中ReetrantReadWriteLock實作了ReadWriteLock接口并添加了可重入的特性。
代碼示例 apache/rocketMQ
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
...
...
public String getAllConfigsFormatString() {
try {
readWriteLock.readLock().lockInterruptibly();
try {
return getAllConfigsInternal();
} finally {
readWriteLock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getAllConfigsFormatString lock error");
}
return null;
}
代碼示例 jenkinsci/jenkins
/*protected*/ void start(WorkUnit task) {
lock.writeLock().lock();
try {
this.workUnit = task;
super.start();
started = true;
} finally {
lock.writeLock().unlock();
}
}
1.8、ReentrantLock
java除了使用關鍵字synchronized外,還可以使用ReentrantLock實作獨占鎖的功能。而且ReentrantLock相比synchronized而言功能更加豐富,使用起來更為靈活,也更适合複雜的并發場景
構造參數為 true代表公平鎖,會根據入排隊時間,優先擷取鎖
@Override
public Lock get() {
return new ReentrantLock(false);
}
});
代碼示例 alibaba/druid
public void setEnable(boolean enable) {
lock.lock();
try {
this.enable = enable;
if (!enable) {
notEmpty.signalAll();
notEmptySignalCount++;
}
} finally {
lock.unlock();
}
}
1.9、ReentrantReadWriteLock
代碼示例 apache / hive
private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
...
...
private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
lock.lock();
try {
ReentrantReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
if (dagLock == null) {
dagLock = new ReentrantReadWriteLock();
dagSpecificLocks.put(queryIdentifier, dagLock);
}
return dagLock;
} finally {
lock.unlock();
}
}
代碼示例 oracle/opengrok
private RuntimeEnvironment() {
configuration = new Configuration();
configLock = new CloseableReentrantReadWriteLock();
watchDog = new WatchDogService();
lzIndexerParallelizer = LazilyInstantiate.using(() ->
new IndexerParallelizer(this));
lzSearchExecutor = LazilyInstantiate.using(() -> newSearchExecutor());
lzRevisionExecutor = LazilyInstantiate.using(() -> newRevisionExecutor());
}
...
...
/**
* Add repositories to the list.
* @param repositories list of repositories
*/
public void addRepositories(List<RepositoryInfo> repositories) {
Lock writeLock = configLock.writeLock();
try {
writeLock.lock();
configuration.addRepositories(repositories);
} finally {
writeLock.unlock();
}
}
1.10、StampedLock
這個對象是 JDK 1.8 之後出現的對象,作為讀寫鎖
StampedLock和ReadWriteLock相比,改進之處在于:讀的過程中也允許擷取寫鎖後寫入!這樣一來,我們讀的資料就可能不一緻,是以,需要一點額外的代碼來判斷讀的過程中是否有寫入,這種讀鎖是一種樂觀鎖。
樂觀鎖的意思就是樂觀地估計讀的過程中大機率不會有寫入,是以被稱為樂觀鎖。反過來,悲觀鎖則是讀的過程中拒絕有寫入,也就是寫入必須等待。顯然樂觀鎖的并發效率更高,但一旦有小機率的寫入導緻讀取的資料不一緻,需要能檢測出來,再讀一遍就行。
代碼示例 apache/pulsar
private final StampedLock rwLock = new StampedLock();
...
private final ArrayList<Item> heap = Lists.newArrayList();
...
public boolean isEmpty() {
long stamp = rwLock.tryOptimisticRead();
boolean isEmpty = heap.isEmpty();
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
isEmpty = heap.isEmpty();
} finally {
rwLock.unlockRead(stamp);
}
}
return isEmpty;
}