文章目錄
- JUC
-
- 程序與線程
- 賣票問題(ReentanrLock)
- 生産者與消費者(Condition)
-
- Condition 的精确通知
- “8 鎖的現象”
- 多線程下的集合
-
- List
- Set
- HashMap
- Callable
- JUC 輔助類
-
- CountDownLatch
- CyclicBarrier
- Semaphore
- 讀寫鎖
- 阻塞隊列
-
- 種類分析
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- DelayQueue
- SynchronousQueue
- LinkedTransferQueue
- LinkedBlockingDeque
- 阻塞的實作
- 線程池
-
- ThreadPoolExecutor
JUC
java.util.concurrent 是 Java 的并發工具包;
程序與線程
程序(Process) 是系統進行資源配置設定和排程的基本機關,是作業系統結構的基礎。
線程(thread) **是作業系統能夠進行運算排程的最小機關。**它被包含在程序之中,是程序中的實際運作機關。一條線程指的是程序中一個單一順序的控制流,一個程序中可以并發多個線程,每條線程并行執行不同的任務。
- 線程是程式執行的最小機關,而程序是作業系統配置設定資源的最小機關;
- 一個程序由一個或多個線程組成,線程是一個程序中代碼的不同執行路線;
- 程序之間互相獨立,但同一個程序下的各個線程之間共享程式的記憶體空間(包括代碼段,資料集,堆等)及一些程序級的資源(如打開檔案和信号等),某程序内的線程在其他程序不可見;
- 排程和切換:線程上下文切換比程序上下文切換要快得多;
線程和程序一樣分為五個階段:建立(New)、就緒(Runnable)、運作(Running)、阻塞(Blocked)和死亡(Dead)。
在Java的線程中(Thread.State)存在六種基本狀态:建立(NEW),運作(RUNNABLE),阻塞(BLOCKED),等待(WAITING),時間等待(TIMED_WAITING,存在時間,逾時後不再等待),死亡(TERMINATED)。

JUC 其實也就是 java.util.concurrent 包,是一個對于線程的工具包
賣票問題(ReentanrLock)
三個核心:線程,公共資源,操作(對外方法);
java.util.concurrent.locks
-
實作提供比使用Lock
方法和語句可以獲得的更廣泛的鎖定操作。 它們允許更靈活的結構化,可能具有完全不同的屬性,并且可以支援多個相關聯的對象synchronized
;Condition
class Ticket{
private int ticketNum = 30;
private Lock lock = new ReentrantLock();
public void saleTicket(){
lock.lock();
try {
if(ticketNum > 0){
System.out.println(Thread.currentThread().getName() + "\t 正在銷售第" + ticketNum + "張票,剩餘" + --ticketNum + "張票");
}
}finally{
lock.unlock();
}
}
}
public class Test{
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(()->{for(int i = 1; i <=40; i++) ticket.saleTicket();},"A").start();;
new Thread(()->{for(int i = 1; i <=40; i++) ticket.saleTicket();},"B").start();;
new Thread(()->{for(int i = 1; i <=40; i++) ticket.saleTicket();},"C").start();;
}
}
生産者與消費者(Condition)
Condition 接口中的重要方法:
void await() throws InterruptedException;
void signal();
void signalAll();
之前的生産者與消費者模式:(注意:虛假喚醒問題)
核心:判斷,操作(生産/消費),通知;
//資源類
class AirConditioner{
private int number = 0;
public synchronized void increment() throws InterruptedException{
//1. 判斷
if(number != 0){
this.wait();
}
//2. 生産
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3. 通知
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException{
//1. 判斷
if(number == 0){
this.wait();
}
//2. 消費
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3. 通知
this.notifyAll();
}
}
public class ThreadWaitNotifyDemo{
public static void main(String[] args) {
AirConditioner airConditioner = new AirConditioner();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
airConditioner.increment();
} catch (InterruptedException e) {
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
airConditioner.decrement();
} catch (InterruptedException e) {
}
}
},"B").start();
}
}
上述代碼雖然可以正常運作,但是在多個生産者和消費者的情況下可能會出現虛假喚醒的情況;是以,我們必須解決虛假喚醒的狀況;
在資源類判斷時,不能使用 if 語句,而是 while 循環的判斷才能解決該問題;
原因: if 語句時,當生産者判斷為 true 時,進入等待狀态并釋放鎖資源,但下次進入的線程可能不是消費者而是生産者,這時生産者又進入等待狀态,再到下一次時肯定是消費者線程進入,當消費者消費完成後調用 notifyAll 方法會将之前等待的兩個消費者線程全部喚醒,緊接着就回去執行“生産語句”,最終就會出現列印的結果有 2 的情況了;
但如果是 while 循環判斷的話,當兩個消費者線程運作後,就會先再次判斷一次循環條件,而這時 number 公共資源已經不滿足條件,然後就會讓其中一個消費者線程去執行“生産語句”,另一個繼續等待,這時就會成功的避免上述的問題。
public synchronized void increment() throws InterruptedException{
//1. 判斷
while(number != 0){
this.wait();
}
//2. 生産
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3. 通知
this.notifyAll();
}
使用 JUC 中的 locks 下的 Condition 接口專注的去解決 wait 和 notify 等功能;
Lock
替換
synchronized
方法和語句的使用,
Condition
取代了對象螢幕方法的使用;
//資源類
class AirConditioner{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws InterruptedException{
lock.lock();
try {
//1. 判斷
while(number != 0){
condition.await();
}
//2. 生産
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3. 通知
condition.signalAll();
} finally{
lock.unlock();
}
}
public void decrement() throws InterruptedException{
lock.lock();
try {
//1. 判斷
while(number == 0){
condition.await();
}
//2. 生産
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3. 通知
condition.signalAll();
} finally{
lock.unlock();
}
}
}
Condition 的精确通知
當然, Condition 接口不僅僅是簡單的将 wait 和 notify 替換,它還能做精确通知(有順序的等待和喚醒);
示例:三個線程 A , B 和 C ,A 線程列印 A 5 次,B 線程列印 B 10 次, C 線程列印 C 15 次,共列印 10 輪;
相當于 A 線程對用的 number 值為1,以此類推;并且,在判斷時,隻要不等于線程對應的值時,就讓該線程進行等待;注意: condition1/2/3 隻是代表 condition 對象,而不是對應的線程的 condition ;
比如,方法 print5 :如果當 A 線程進行執行時并且 numner 也為 1 時(為 1 時,其他線程就會進行等待,如,B 線程的方法中的判斷就會為 true 進而執行 condition2.await() ,是以在 A 線程列印時,B 和 C 線程就會等待 A),判斷傳回 false 不會進行等待,就會去執行下面的 for 循環,當 for 執行完後,就需要 B 線程執行它的 for 循環(記住,是有順序的執行),是以将 number 的值修改為 2 ,并且讓正在等待的 B 線程喚醒;
//資源類
class ShareResource {
// 1:A ,2:B , 3:C
private int number = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() throws InterruptedException {
lock.lock();
try {
// 1. 判斷
while (number != 1) {
//隻要 number 不為 1 時,說明是其它線程對應的 number 值,是以讓該線程進行等待
condition1.await();
}
// 2. 操作
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3. 通知
number = 2;
//喚醒 condition2
condition2.signal();
} finally {
lock.unlock();
}
}
public void print10() throws InterruptedException {
lock.lock();
try {
// 1. 判斷
while (number != 2) {
//隻要 number 不為 2 時,說明是其它線程對應的 number 值,是以讓該線程進行等待
condition2.await();
}
// 2. 操作
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3. 通知
number = 3;
condition3.signal();
} finally {
lock.unlock();
}
}
public void print15() throws InterruptedException {
lock.lock();
try {
// 1. 判斷
while (number != 3) {
//隻要 number 不為 3 時,說明是其它線程對應的 number 值,是以讓該線程進行等待
condition3.await();
}
// 2. 操作
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3. 通知
number = 1;
condition1.signal();
} finally {
lock.unlock();
}
}
}
public class ConditionTest {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print5();
} catch (InterruptedException e) {
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print10();
} catch (InterruptedException e) {
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print15();
} catch (InterruptedException e) {
}
}
}, "C").start();
}
}
“8 鎖的現象”
示例:
class Phone{
public synchronized void sendEmail() throws Exception{
System.out.println("-----sendEmail");
}
public synchronized void sendSMS() throws Exception{
System.out.println("-----sendSMS");
}
}
public class Lock8{
public static void main(String[] args) throws Exception{
Phone phone = new Phone();
new Thread(()->{
phone.sendEmail();
},"A").start();
TimeUnit.MILLISECONDS.sleep(100);
new Thread(()->{
phone.sendSMS();
},"B").start();
}
}
問:
-
正常通路,先列印郵件還是短信?
郵件—>短信
-
郵件方法睡眠 3 秒,先列印郵件還是短信?(下面的問題都會在加了睡眠時間的基礎上進行)
郵件—>短信
上述 1,2 問題:
一個對象裡面如果存在多個 synchronized 方法,某一時刻内,隻要有一個線程去調用其中的一個 synchronized 方法,其它的線程都隻能等待,換句話說,某一時刻内,隻能有唯一一個線程去通路這些 synchronized 方法,并且鎖的是目前對象 this ,被鎖之後,其它的線程都不能進入到目前對象的其它的 synchronized 方法。
-
新增一個普通方法 hello() ,先列印郵件還是 hello ?
hello()—>郵件
class Phone{ public synchronized void sendEmail(){ try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) {} System.out.println("-----sendEmail"); } public synchronized void sendSMS(){ System.out.println("-----sendSMS"); } public void hello(){ System.out.println("-----hello"); } } public class Lock8{ public static void main(String[] args) throws Exception{ Phone phone = new Phone(); new Thread(()->{ phone.sendEmail(); },"A").start(); TimeUnit.MILLISECONDS.sleep(200); new Thread(()->{ phone.hello(); },"B").start(); } }
-
“兩部手機”,先列印郵件還是短信?
短信—>郵件
上述 3,4 問題:
普通方法和加同步鎖的方法無關;換成兩個對象後,它們鎖的不是同一個對象,即不是同一把鎖,是以情況就會不同;
public static void main(String[] args) throws Exception{ Phone phone = new Phone(); Phone phone2 = new Phone(); new Thread(()->{ phone.sendEmail(); },"A").start(); TimeUnit.MILLISECONDS.sleep(200); new Thread(()->{ phone2.sendSMS(); },"B").start(); }
-
發送郵件和短信的方法修改為靜态同步方法,“同一部手機”,先列印郵件還是短信?
郵件—>短信
-
發送郵件和短信的方法修改為靜态同步方法,“兩部手機”,先列印郵件還是短信?
郵件—>短信
-
一個普通同步方法發送短信 sendSMS() ,一個靜态同步方法 sendEmail() ,“一部手機”,先列印郵件還是短信?
短信—>郵件
-
一個普通同步方法發送短信 sendSMS() ,一個靜态同步方法 sendEmail() ,“兩部手機”,先列印郵件還是短信?
短信—>郵件
通過上述的問題,就可以很清晰地看出來 synchronized 實作同步的基礎:
Java 中的每一個對象都可以作為鎖;有 3 種表現形式:
- 對于普通同步方法,鎖的是目前執行個體對象;
- 對于靜态同步方法,鎖的是目前類的 class 對象;
對于同步代碼塊,鎖的是 synchronized 括号裡配置的對象。
是以這兩把鎖(this/class)是兩個不同的對象,是以靜态同步方法和普通同步方法之間是不會有競争條件的。
多線程下的集合
List
示例:
public class ListTest{
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
上述代碼運作結果如下,而且每次運作的結果“類型”可能都不同;
[null, 697f0ec8, b502b230]
[null, 697f0ec8, b502b230]
[null, 697f0ec8, b502b230]
如果再将線程數量調大,會發現有如下結果:
[null, c3d3c6ae, 34f4af7e, 9aa99807, 04816aec]
...
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)[null, c3d3c6ae, 34f4af7e, 9aa99807, 04816aec]
at java.util.AbstractCollection.toString(AbstractCollection.java:461)
at java.lang.String.valueOf(String.java:2994)
at java.io.PrintStream.println(PrintStream.java:821)
at ListTest.lambda$0(ListTest.java:11)
at java.lang.Thread.run(Thread.java:745)
java.util.ConcurrentModificationException
-
出現異常
java.util.ConcurrentModificationException ,并發修改異常
-
導緻原因
并發情況下多線程對集合進行增,删,改等操作,進而使給集合出現修改錯誤;
- 解決方案
- 使用 Vector 代替;
- 使用 Collections.synchronizedList() ;
- 使用 JUC 的 CopyOnWriteArrayList ;
- 優化建議
CopyOnWriteArrayList 的 add 方法:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWrite 容器,即寫時複制;往一個容器添加元素的時候,不直接往目前容器 Object[] 添加,而是先将目前的容器 Object[] 進行 copy 複制,複制出一個新的容器 Object[] newElements ,然後在新的容器 Object[] newElements 中添加元素,添加完畢後再将原容器的引用指向新的容器,也就是 setArray(newElements) ;這樣的好處就是可以對 CopyOnWrite 容器進行并發的讀取而不需要加鎖,因為目前容器不會添加任何元素。是以, CopyOnWrite 容器也是一種讀寫分離的思想。
Set
HashSet 也不是線程安全的,如果在并發情況下也會報出 java.util.ConcurrentModificationException 異常,使用 Collections 和 JUC 中的 CopyOnWriteArraySet 也解決該問題;
HashSet 底層是由 HashMap 實作的,而該 Map 的 value 值是一個 Object 的常量值 PRESENT ;
HashMap
Collections 和 JUC 的 ConcurrentHashMap 解決并發問題;
ConcurrentHashMap 單獨講解;
Callable
自從Java 1.5開始,就提供了接口 Callable 和 接口 Future ,通過它們可以在任務執行完畢之後得到任務執行結果;
Callable 接口中有未實作方法 V call() throws Exception ;
Callable 和 Runnable 接口的差別:
- Callable 接口可以在任務結束的時候提供一個傳回值,Runnable 無法提供這個功能;
- Callable 的 call 方法分可以抛出異常,而 Runnable 的 run 方法不能抛出異常;
示例
class MyThread implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("call 方法......");
return 666;
}
}
class CallableTest{
public static void main(String[] args) throws Exception{
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask,"A").start();
System.out.println(futureTask.get());
}
}
将實作了 Callable 接口的類中的 call 方法運作起來,就會使用到 FutureTask 類,而該類實作了 Runnable 接口和 Future 接口,是以在啟動該類的 call 方法時,要将該類作為參數傳入 Thread 的構造器中(Thread 構造器參數中可以傳入 Runnable 接口的實作類);
Thread 的構造方法有如下
public Thread(Runnable target, String name) {
init(null, target, name, 0);
}
傳入的是 Runnable ,而 FutureTask 其實也是實作了 Runnable 接口,這就是為什麼建立 FutureTask 對象了;
如果 new 多個 Thread 後接着啟動 call 方法,那麼最後隻能執行一個 call 方法;
JUC 輔助類
CountDownLatch
- CountDownLatch 這個類使一個線程等待其他線程各自執行完畢後再執行;
- 該類是通過一個計數器來實作的,計數器的初始值是線程的數量。每當一個線程執行完畢後,計數器的值就 -1 ,當計數器的值為 0 時,表示所有線程都執行完畢,因 await 方法阻塞的線程會被喚醒,然後繼續執行;
示例
public static void main(String[] args) {
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println("Test......");
}).start();
}
System.out.println(Thread.currentThread().getName() + " 結束");
}
上述的結果如下:
Test......
Test......
Test......
main 結束
Test......
Test......
Test......
會發現 main 線程在所有的列印未完成的情況下已經結束了,如果想讓 main 線程最後一個完成,可使用 CountDownLatch
public static void main(String[] args) throws InterruptedException {
//構造器傳入的值為線程的數量
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
//每次執行完成後就減一,直到為 0 就喚醒被阻塞的 main 線程
countDownLatch.countDown();
System.out.println("Test......");
}).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " 結束");
}
CyclicBarrier
示例
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("七顆龍珠已集齊,召喚神龍");
});
for (int i = 1; i <= 7; i++) {
final int t = i;
new Thread(()->{
System.out.println("收集到第"+ t +"顆龍珠");
try {
//線程等待
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},i+"").start();
}
}
CountDownLatch 和 CyclicBarrier 差別:
- CountDownLatch 是一個計數器,線程完成一個記錄一個,計數器遞減,隻能隻用一次;
- CyclicBarrier 的計數器更像一個閥門,需要所有線程都到達,然後繼續執行,計數器遞增,提供 reset 功能,可以多次使用;
Semaphore
在信号量上,有定義兩種操作
- acquire(擷取)目前一個線程調用 acquire 操作時,它要麼通過成功擷取信号量(信号量減 1 ),要麼一直等待下去,直到有線程釋放信号量或逾時;
- release(釋放)實際上會将信号量的值加 1 ,然後喚醒等待的線程;
信号量主要用于兩個目的:
- 多個共享資源的互斥使用;
- 并發線程數量的控制;
示例
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t搶到車位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "\t離開車位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
semaphore.release();
}
},"" + i).start();
}
}
讀寫鎖
多個線程讀取一個共享資源無需加鎖無任何問題,但其中一個線程對共享資源進行寫操作時,就不會讓其它線程對該資源進行讀或寫了;隻要存在一個寫操作時,就會被加鎖;
示例:模拟緩存讀寫
class TestCache {
private volatile Map<String,Object> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key,Object val){
Lock lock = this.lock.writeLock();
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t開始寫入資料");
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
map.put(key, val);
System.out.println(Thread.currentThread().getName() + "\t寫入資料結束");
}
public void get(String key){
Lock lock = this.lock.readLock();
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t開始讀取資料");
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t讀取資料結束,結果是:" + o);
}
}
public class ReadWriteLockTest {
public static void main(String[] args) {
TestCache testCache = new TestCache();
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(()->{
testCache.put(finalI+"", finalI);
},"" + i).start();
}
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(()->{
testCache.get(finalI+"");
},"" + i).start();
}
}
}
阻塞隊列
阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列;這兩個附加的操作支援阻塞的插入和移除方法:
- 支援阻塞的插入方法:當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿;
- 支援阻塞的移除方法:在隊列為空時,擷取元素的線程會等待隊列變為非空;
阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者是從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素、消費者用來擷取元素的容器;
種類分析
ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列;
LinkedBlockingQueue :一個由連結清單結構組成的無界(其實有界,是整數最大值)阻塞隊列;
PriorityBlockingQueue :一個支援優先級排序的無界阻塞隊列;
DelayQueue:一個使用優先級隊列實作的無界阻塞隊列;
SynchronousQueue:一個不存儲元素的阻塞隊列;
LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列;
LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列;
在阻塞隊列不可用時,這兩個附加操作提供了4種處理方式:
抛出異常:是指當阻塞隊列滿時候,再往隊列裡插入元素,會抛出 **IllegalStateException Queue full ** 異常;當隊列為空時,從隊列裡擷取元素時會抛出 NoSuchElementException 異常 ;
示例(添加)
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("d"));
}
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at 多線程.BlockingQueueTest.main(BlockingQueueTest.java:17)
傳回特殊值:插入方法會傳回是否成功,成功則傳回 true;移除方法,則是從隊列裡拿出一個元素,如果沒有則傳回 null ;
一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡 put 元素,隊列會一直阻塞生産者線程,直到拿到資料,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裡 take 元素,隊列也會阻塞消費者線程,直到隊列可用;
逾時退出:當阻塞隊列滿時,隊列會阻塞生産者線程一段時間,如果超過一定的時間,生産者線程就會退出;
ArrayBlockingQueue
ArrayBlockingQueue 是一個用數組實作的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序;
預設情況下不保證通路者公平的通路隊列;
所謂公平通路隊列是指阻塞的所有生産者線程或消費者線程,當隊列可用時,可以按照阻塞的先後順序通路隊列,即先阻塞的生産者線程,可以先往隊列裡插入元素,先阻塞的消費者線程,可以先從隊列裡擷取元素。通常情況下為了保證公平性會降低吞吐量。
可以使用以下代碼建立一個公平的阻塞隊列:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3,true);
//有參構造器構造器
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//使用 ReentrantLock 建立一個公平鎖
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//ReentrantLock 的構造器(建立一個公平鎖)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
LinkedBlockingQueue
LinkedBlockingQueue 是一個用連結清單實作的有界阻塞隊列。隊列的預設和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序;
PriorityBlockingQueue
PriorityBlockingQueue 是一個支援優先級的無界阻塞隊列。預設情況下元素采取自然順序升序排列。繼承 Comparable 類實作compareTo() 方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。需要注意的是不能保證同優先級元素的順序。
DelayQueue
DelayQueue 是一個支援延時擷取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實作。隊列中的元素必須實作 Delayed 接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時才能從隊列中提取元素
DelayQueue 非常有用,可以将 DelayQueue 運用在以下應用場景。
- 緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了;
- 定時任務排程:使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從DelayQueue中擷取到任務就開始執行,比如TimerQueue就是使用DelayQueue實作的;
實作DelayQueue的三個步驟
第一步:繼承Delayed接口;
第二步:實作getDelay(TimeUnit unit),該方法傳回目前元素還需要延時多長時間,機關是納秒;
第三步:實作compareTo方法來指定元素的順序;
SynchronousQueue
SynchronousQueue是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則不能繼續添加元素。它支援公平通路隊列。預設情況下線程采用非公平性政策通路隊列。使用以下構造方法可以建立公平性通路的 SynchronousQueue,如果設定為true,則等待的線程會采用先進先出的順序通路隊列;
SynchronousQueue的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue ;
LinkedTransferQueue
LinkedTransferQueue是一個由連結清單結構組成的無界阻塞 TransferQueue 隊列。相對于其他阻塞隊列,LinkedTransferQueue 多了tryTransfer 和 transfer 方法;
LinkedBlockingDeque
LinkedBlockingDeque 是一個由連結清單結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素;
種類分析引用簡書 高默思
阻塞的實作
ReentranLock + Condition 實作隊列的阻塞,ReentranLock 是鎖,Condition是條件狀态,通過等待/通知機制,來實作線程之間的通信;
ReentranLock + Condition的等待/通知機制和Object的wait()與notify()是類似的,通過synchronized,在鎖中使用wait()與notify()達到線程之間通信,在ReentranLock 的lock()和unlock()之間通過類似的await()和signal()達到線程之間的通信;
線程池
線程池主要是控制運作的線程數量,處理過程中将任務放入隊列,然後線上程建立後啟動這些任務,如果線程數量超過了最大線程數量(maxPoolSize),超出數量的線程就會排隊等待,等待其它線程執行完畢,再從隊列中取出任務來執行;
主要特點是:線程複用,控制量大的并發數,管理線程(降低系統資源消耗,提高系統響應速度);
使用 Executors 線程池的工具類可以提供工廠方法用來建立不同類型的線程池;
- newFixedThreadPool:建立固定線程數的線程池;
- newSingleThreadExecutor:建立一個隻有一個線程的線程池;
- newCachedThreadPool:可以根據需要建立新的線程,但如果已有線程是空閑的會重用已有線程(一池 N 個工作線程,類似擴容);
上述三個建立的線程池,底層都是由 ThreadPoolExecutor 來進行建立的;關閉線程池調用 shutdown 方法;
并且上述三個建立線程池的方法在實際中都不可采取(阿裡巴巴開發手冊),可能會導緻 OOM;一般我們自定義線程池;
public class ExecutorDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try {
for (int i = 1; i <= 10; i++) {
int num = i;
threadPool.execute(()->{
System.out.println(num);
});
}
} finally {
threadPool.shutdown();
}
}
}
ThreadPoolExecutor
public ThreadPoolExecutor( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){...}
- corePoolSize:線程池中保持的線程數量(常駐核心線程數),包括空閑線程在内,也就是線程池釋放的最小線程數量界限;
- maximumPoolSize:線程池中能夠容納最大線程數量(必須大于等于 1);
- keepAliveTime:目前線程數量大于核心線程數量,并且空閑線程保持線上程池中的時間大于存活時間 keepAliveTime 時,就釋放空閑的線程(maximumPoolSize - corePoolSize);
- TimeUnit(枚舉類) unit:是 keepAliveTime 參數時間的機關,可以是分鐘,秒,毫秒等等;
- BlockingQueue< Runnable > workQueue:任務隊列,當線程任務送出到線程池以後,首先放入隊列中,然後線程池按照該任務隊列依次執行相應的任務;可以使用的 workQueue 有很多,比如:LinkedBlockingQueue 等等;
- ThreadFactory threadFactory:表示生成線程池中工作線程的線程工廠,用于建立線程,一般使用預設即可;
- RejectedExecutionHandler handler:如果隊列已滿,并且工作線程大于了最大線程數(maximumPoolSize),就會按照指定的拒絕政策拒絕執行任務;(注意都是靜态内部類)
- ThreadPoolExecutor.AbortPolicy: 丢棄任務并抛出 RejectedExecutionException 異常(預設的);
- ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常;
- ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列中等待最久的任務,然後把目前任務加入到隊列中嘗試再次送出目前任務;
- ThreadPoolExecutor.CallerRunsPolicy:不會抛棄任務,也不抛出異常,而是将某些任務
,進而降低新任務的流量;回退到調用者
工作原理
- 在建立了線程池後,開始等待請求;
- 當調用 execute() 方法添加一個請求任務時,線程池會做出如下判斷:
- 如果正在運作的線程數量小于 corePoolSize ,那麼就會立刻建立線程運作這個任務;
- 如果正在運作的線程數量大于或等于 corePoolSize ,那麼将這個任務放入隊列;
- 如果這個時候隊列滿了且正在運作的線程數量還小于 maximumPoolSize ,那麼還是要建立非核心線程立刻運作這個任務;
- 如果隊列滿了且正在運作的線程數量大于或等于 maximumPoolSize ,那麼線程池會啟動飽和拒絕政策來執行;
- 當一個線程完成任務時,它會從隊列中取出一個任務來執行;
- 當一個線程無事可做超過一定的時間(keepAliveTime)時,線程會判斷:
- 如果目前運作的線程數大于 corePoolSize ,那麼這個線程就被停掉;是以線程池的所有任務完成後,它最終會收縮到corePoolSize的大小;
一般,編寫線程池時,如果該項目或該服務,方法使用到的 CPU 比較多,那麼最大線程數最好是 CPU 核數 + 1(CPU 密集型);而如果是 IO 密集型的,最大線程數一般為 CPU 核數 / 阻塞系數;