“工欲善其事必先利其器”,有了這些并發工具,多線程控制變得So easy。

與文無關
JDK中已經給我們内置了很多并發工具,都屬于應用類型,知道具體如何使用就好,主要講以下幾個類:
- CountDownLatch
- CyclicBarrier
- Semaphore
- LockSupport
- BlockingQueue
這次的幾個案例都需要實際運作,看運作效果才明白怎麼回事,代碼可以直接複制粘貼。
多線程控制類,計數器栅欄,當計數器滿足條件的時候,再開始執行接下來的操作。
public class CountDownLatchTest {
static final int THREAD_COUNT = 10;
static final CountDownLatch end = new CountDownLatch(THREAD_COUNT);
public static void main(String[] args) throws InterruptedException {
Runnable demo = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("檢查完成");
end.countDown();
}
};
//線程池内有5個線程友善看效果
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < THREAD_COUNT; i++) {
executorService.submit(demo);
}
end.await();
System.out.println("一切就緒");
executorService.shutdown();
}
}
CountDownLatch 運作效果
循環栅欄,可以看做CountDownLatch的重複利用。當滿足一定的條件時候,才開始執行某線程。
// 當線程的數量滿足條件時候,才開始執行。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("一切就緒,準備出發");
}
});
Runnable task = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId() + ":就緒");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
};
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
}
CyclicBarrier運作結果
所有的線程都在等待,當等待的線程達到一定的數量,然後開始執行接下來的操作。
Semaphore,也是控制線程的一種手段,可以控制并發線程的數量,某些時候我們線程數過多,在通路有限的資源時候,可以使用Semaphore來控制線程的數量。
public class SemaphoreDemo implements Runnable {
Semaphore mSemaphore = new Semaphore(5);
@Override
public void run() {
try {
mSemaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + " done!");
mSemaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
SemaphoreDemo demo = new SemaphoreDemo();
for (int i = 0; i < 20; i++) {
executorService.submit(demo);
}
executorService.shutdown();
;
}
}
Semaphore 運作案例
LockSupport提供了一些靜态方法用于阻塞線程,和喚醒線程的功能。
處于park()挂起狀态的線程是Waiting狀态,park()方法阻塞的線程還支援中斷,不抛出中斷異常的同時設定中斷标志位,然後我們可以通過中斷标志位來檢查。
public class LockDemo implements Runnable{
public static Object sObject = new Object();
@Override
public void run() {
synchronized (sObject){
System.out.println("目前線程名稱:" + Thread.currentThread().getName());
LockSupport.park();
if (Thread.currentThread().isInterrupted()){
System.out.println( Thread.currentThread().getName() + "被中斷了");
}
System.out.println("執行結束");
}
}
public static void main(String[] args) throws InterruptedException {
LockDemo demo = new LockDemo();
Thread t1 = new Thread(demo,"t1");
Thread t2 = new Thread(demo,"t2");
t1.start();
Thread.sleep(3000);
t2.start();
t1.interrupt();
LockSupport.unpark(t2);
}
}
LockSupport的demo
Java的Queue也是面試中經常提到的知識點,這次因為我們隻涉及到并發相關知識,是以隻提一些并發相關的Queue,關于Queue的具體分析等後面的資料結構系列的時候再詳細解說。
BlockingQueue是Java中的阻塞隊列,JDK中提供了7個阻塞隊列
- ArrayBlockingQueue : 數組實作的有界隊列,對元素進行FIFO(先進先出)的原則排序。
- LinkedBlockingQueue: 連結清單組成的有界隊列,長度預設最大值為Integer.MAX_VALUE,元素按FIFO原則排序,性能好于ArrayBlockingQueue。
- PriorityBlockingQueue:支援優先級的無界阻塞隊列。
- DelayQueue: 支援延遲擷取元素的無界阻塞隊列
- SynchronousQueue:不存儲元素的阻塞隊列。每一個put操作必須等待take操作,否則不能繼續添加元素。
- LinkedTransferQueue:連結清單組成的無界傳輸隊列
- LinkedBlockingDeque:由連結清單組成的雙向阻塞隊列。可以從兩段插入和移除元素。
帶大家看一下LinkedBlockingQueue的幾個關鍵方法:
//LinkedBlockingQueue 方法探索
// 添加元素
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//如果隊列滿了,直接傳回false
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
// 建立新的節點
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 如果隊列不滿的話,就讓元素加入隊列。
//然後判斷,目前隊列元素是否滿了,不滿的話,通知notFull條件。
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 假如添加的是第一個元素,通知隊列不為空了。
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 當隊列滿的時候進行等待。若不滿入隊
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
// 同offer
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 同offer
if (c == 0)
signalNotEmpty();
}
可以看出添加元素上:
- 當隊列滿的時候,offer不添加元素,立刻傳回。put則會阻塞操作,直到隊列為不滿。
- 還有一個帶參數的offer方法,和put的唯一差別就是有逾時時間,在一段時間内隊列還不空的話,就傳回。
//LinkedBlockingQueue 方法探索
// 移除
public E poll() {
final AtomicInteger count = this.count;
// 隊列為空,傳回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//隊列有元素的話,取出元素
//取出元素後如果隊列是不為空,發出不為空的信号。
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//如果取出元素之前,隊列是滿的,因為取出了元素,現在發出不滿的信号
if (c == capacity)
signalNotFull();
return x;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 隊列為空的話,就等待隊列不為空。
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
可以看出LinkedBlockingQueue的移除操作poll和take方法:
- poll不阻塞,take會阻塞
- poll(long timeout, TimeUnit unit),當隊列為空的時候,等待指定的時間,如果隊列扔為空,那麼就傳回。
這次是帶領大家一起看了下LinkedBlockingQueue的關鍵方法,其它的隊列的操作也都類似,望大家自行檢視,JDK中Queue的實作并不難了解。
最後
這次主要介紹了幾個并發中可能會用到的工具類,最後說了下JDK并發包中的阻塞隊列,阻塞隊列相對比較重要,就簡單的分析了其實作。
希望能幫助到大家。
參考
- 《Java并發實戰》
- 《Java高并發程式設計》
- 《并發程式設計的藝術》