前言
掌握線程池是後端程式員的基本要求,相信大家求職面試過程中,幾乎都會被問到有關于線程池的問題。我在網上搜集了幾道經典的線程池面試題,并以此為切入點,談談我對線程池的了解。如果有哪裡了解不正确,非常希望大家指出,接下來大家一起分析學習吧。
公衆号:Java爛豬皮
經典面試題
- 面試問題1:Java的線程池說一下,各個參數的作用,如何進行的?
- 面試問題2:按線程池内部機制,當送出新任務時,有哪些異常要考慮。
- 面試問題3:線程池都有哪幾種工作隊列?
- 面試問題4:使用無界隊列的線程池會導緻記憶體飙升嗎?
- 面試問題5:說說幾種常見的線程池及使用場景?
線程池概念
線程池: 簡單了解,它就是一個管理線程的池子。
- 它幫我們管理線程,避免增加建立線程和銷毀線程的資源損耗。因為線程其實也是一個對象,建立一個對象,需要經過類加載過程,銷毀一個對象,需要走GC垃圾回收流程,都是需要資源開銷的。
- 提高響應速度。 如果任務到達了,相對于從線程池拿線程,重新去建立一條線程執行,速度肯定慢很多。
- 重複利用。 線程用完,再放回池子,可以達到重複利用的效果,節省資源。
線程池的建立
線程池可以通過ThreadPoolExecutor來建立,我們來看一下它的構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
幾個核心參數的作用:
- corePoolSize: 線程池核心線程數最大值
- maximumPoolSize: 線程池最大線程數大小
- keepAliveTime: 線程池中非核心線程空閑的存活時間大小
- unit: 線程空閑存活時間機關
- workQueue: 存放任務的阻塞隊列
- threadFactory: 用于設定建立線程的工廠,可以給建立的線程設定有意義的名字,可友善排查問題。
- handler: 線城池的飽和政策事件,主要有四種類型。
任務執行
線程池執行流程,即對應execute()方法:
- 送出一個任務,線程池裡存活的核心線程數小于線程數corePoolSize時,線程池會建立一個核心線程去處理送出的任務。
- 如果線程池核心線程數已滿,即線程數已經等于corePoolSize,一個新送出的任務,會被放進任務隊列workQueue排隊等待執行。
- 當線程池裡面存活的線程數已經等于corePoolSize了,并且任務隊列workQueue也滿,判斷線程數是否達到maximumPoolSize,即最大線程數是否已滿,如果沒到達,建立一個非核心線程執行送出的任務。
- 如果目前的線程數達到了maximumPoolSize,還有新的任務過來的話,直接采用拒絕政策處理。
四種拒絕政策
- AbortPolicy(抛出一個異常,預設的)
- DiscardPolicy(直接丢棄任務)
- DiscardOldestPolicy(丢棄隊列裡最老的任務,将目前這個任務繼續送出給線程池)
- CallerRunsPolicy(交給線程池調用所在的線程進行處理)
為了形象描述線程池執行,我打個比喻:
- 核心線程比作公司正式員工
- 非核心線程比作外包員工
- 阻塞隊列比作需求池
- 送出任務比作提需求
- 當産品提個需求,正式員工(核心線程)先接需求(執行任務)
- 如果正式員工都有需求在做,即核心線程數已滿),産品就把需求先放需求池(阻塞隊列)。
- 如果需求池(阻塞隊列)也滿了,但是這時候産品繼續提需求,怎麼辦呢?那就請外包(非核心線程)來做。
- 如果所有員工(最大線程數也滿了)都有需求在做了,那就執行拒絕政策。
- 如果外包員工把需求做完了,它經過一段(keepAliveTime)空閑時間,就離開公司了。
好的,到這裡。面試問題1->Java的線程池說一下,各個參數的作用,如何進行的? 是否已經迎刃而解啦, 我覺得這個問題,回答:線程池構造函數的corePoolSize,maximumPoolSize等參數,并且能描述清楚線程池的執行流程 就差不多啦。
線程池異常處理
在使用線程池處理任務的時候,任務代碼可能抛出RuntimeException,抛出異常後,線程池可能捕獲它,也可能建立一個新的線程來代替異常的線程,我們可能無法感覺任務出現了異常,是以我們需要考慮線程池異常情況。
當送出新任務時,異常如何處理?
我們先來看一段代碼:
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
threadPool.submit(() -> {
System.out.println("current thread name" + Thread.currentThread().getName());
Object object = null;
System.out.print("result## "+object.toString());
});
}
顯然,這段代碼會有異常,我們再來看看執行結果
雖然沒有結果輸出,但是沒有抛出異常,是以我們無法感覺任務出現了異常,是以需要添加try/catch。 如下圖:
OK,線程的異常處理,我們可以直接try...catch捕獲。
線程池exec.submit(runnable)的執行流程
通過debug上面有異常的submit方法(建議大家也去debug看一下,圖上的每個方法内部是我打斷點的地方),處理有異常submit方法的主要執行流程圖:
//構造feature對象
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//線程池執行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
//捕獲異常
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
通過以上分析,submit執行的任務,可以通過Future對象的get方法接收抛出的異常,再進行處理。 我們再通過一個demo,看一下Future對象的get方法處理異常的姿勢,如下圖:
其他兩種處理線程池異常方案
除了以上1.在任務代碼try/catch捕獲異常,2.通過Future對象的get方法接收抛出的異常,再處理兩種方案外,還有以上兩種方案:
3.為工作者線程設定UncaughtExceptionHandler,在uncaughtException方法中處理異常
我們直接看這樣實作的正确姿勢:
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(
(t1, e) -> {
System.out.println(t1.getName() + "線程抛出的異常"+e);
});
return t;
});
threadPool.execute(()->{
Object object = null;
System.out.print("result## " + object.toString());
});
運作結果:
4.重寫ThreadPoolExecutor的afterExecute方法,處理傳遞的異常引用
這是jdk文檔的一個demo:
class ExtendedExecutor extends ThreadPoolExecutor {
// 這可是jdk文檔裡面給的例子。。
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}}
是以,被問到線程池異常處理,如何回答?
線程池的工作隊列
線程池都有哪幾種工作隊列?
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- PriorityBlockingQueue
- SynchronousQueue
ArrayBlockingQueue
ArrayBlockingQueue(有界隊列)是一個用數組實作的有界阻塞隊列,按FIFO排序量。
LinkedBlockingQueue
LinkedBlockingQueue(可設定容量隊列)基于連結清單結構的阻塞隊列,按FIFO排序任務,容量可以選擇進行設定,不設定的話,将是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool線程池使用了這個隊列
DelayQueue
DelayQueue(延遲隊列)是一個任務定時周期的延遲執行的隊列。根據指定的執行時間從小到大排序,否則根據插入到隊列的先後排序。newScheduledThreadPool線程池使用了這個隊列。
PriorityBlockingQueue
PriorityBlockingQueue(優先級隊列)是具有優先級的無界阻塞隊列;
SynchronousQueue
SynchronousQueue(同步隊列)一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool線程池使用了這個隊列。
針對面試題:線程池都有哪幾種工作隊列? 我覺得,回答以上幾種ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue等,說出它們的特點,并結合使用到對應隊列的常用線程池(如newFixedThreadPool線程池使用LinkedBlockingQueue),進行展開闡述, 就可以啦。
幾種常用的線程池
- newFixedThreadPool (固定數目線程的線程池)
- newCachedThreadPool(可緩存線程的線程池)
- newSingleThreadExecutor(單線程的線程池)
- newScheduledThreadPool(定時及周期執行的線程池)
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
線程池特點:
- 核心線程數和最大線程數大小一樣
- 沒有所謂的非空閑時間,即keepAliveTime為0
- 阻塞隊列為無界隊列LinkedBlockingQueue
工作機制:
- 送出任務
- 如果線程數少于核心線程,建立核心線程執行任務
- 如果線程數等于核心線程,把任務添加到LinkedBlockingQueue阻塞隊列
- 如果線程執行完任務,去阻塞隊列取任務,繼續執行。
執行個體代碼
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
IDE指定JVM參數:-Xmx8m -Xms8m :
run以上代碼,會抛出OOM:
是以,面試題:使用無界隊列的線程池會導緻記憶體飙升嗎?
答案 :會的,newFixedThreadPool使用了無界的阻塞隊列LinkedBlockingQueue,如果線程擷取一個任務後,任務的執行時間比較長(比如,上面demo設定了10秒),會導緻隊列的任務越積越多,導緻機器記憶體使用不停飙升, 最終導緻OOM。
使用場景
FixedThreadPool 适用于處理CPU密集型的任務,確定CPU在長期被工作線程使用的情況下,盡可能的少的配置設定線程,即适用執行長期的任務。
newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
線程池特點:
- 核心線程數為0
- 最大線程數為Integer.MAX_VALUE
- 阻塞隊列是SynchronousQueue
- 非核心線程空閑存活時間為60秒
當送出任務的速度大于處理任務的速度時,每次送出一個任務,就必然會建立一個線程。極端情況下會建立過多的線程,耗盡 CPU 和記憶體資源。由于空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。
工作機制
- 送出任務
- 因為沒有核心線程,是以任務直接加到SynchronousQueue隊列。
- 判斷是否有空閑線程,如果有,就去取出任務執行。
- 如果沒有空閑線程,就建立一個線程執行。
- 執行完任務的線程,還可以存活60秒,如果在這期間,接到任務,可以繼續活下去;否則,被銷毀。
執行個體代碼
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在執行");
});
}
運作結果:
使用場景
用于并發執行大量短期的小任務。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
線程池特點
- 核心線程數為1
- 最大線程數也為1
- 阻塞隊列是LinkedBlockingQueue
- keepAliveTime為0
工作機制
- 送出任務
- 線程池是否有一條線程在,如果沒有,建立線程執行任務
- 如果有,講任務加到阻塞隊列
- 目前的唯一線程,從隊列取任務,執行完一個,再繼續取,一個人(一條線程)夜以繼日地幹活。
執行個體代碼
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在執行");
});
}
運作結果:
使用場景
适用于串行執行任務的場景,一個任務一個任務地執行。
newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
線程池特點
- 最大線程數為Integer.MAX_VALUE
- 阻塞隊列是DelayedWorkQueue
- keepAliveTime為0
- scheduleAtFixedRate() :按某種速率周期執行
- scheduleWithFixedDelay():在某個延遲後執行
工作機制
- 添加一個任務
- 線程池中的線程從 DelayQueue 中取任務
- 線程從 DelayQueue 中擷取 time 大于等于目前時間的task
- 執行完後修改這個 task 的 time 為下次被執行的時間
- 這個 task 放回DelayQueue隊列中
執行個體代碼
/**
建立一個給定初始延遲的間隔性的任務,之後的下次執行時間是上一次任務從執行到結束所需要的時間+* 給定的間隔時間
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleWithFixedDelay(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在執行");
}, 1, 3, TimeUnit.SECONDS);
運作結果:
/**
建立一個給定初始延遲的間隔性的任務,之後的每次任務執行時間為 初始延遲 + N * delay(間隔)
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在執行");
}, 1, 3, TimeUnit.SECONDS);;
使用場景
周期性執行任務的場景,需要限制線程數量的場景
回到面試題:說說幾種常見的線程池及使用場景?
回答這四種經典線程池 :newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool,newScheduledThreadPool,分線程池特點,工作機制,使用場景分開描述,再分析可能存在的問題,比如newFixedThreadPool記憶體飙升問題 即可
線程池狀态
線程池有這幾個狀态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
//線程池狀态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
線程池各個狀态切換圖:
RUNNING
- 該狀态的線程池會接收新任務,并處理阻塞隊列中的任務;
- 調用線程池的shutdown()方法,可以切換到SHUTDOWN狀态;
- 調用線程池的shutdownNow()方法,可以切換到STOP狀态;
SHUTDOWN
- 該狀态的線程池不會接收新任務,但會處理阻塞隊列中的任務;
- 隊列為空,并且線程池中執行的任務也為空,進入TIDYING狀态;
STOP
- 該狀态的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運作的任務;
- 線程池中執行的任務為空,進入TIDYING狀态;
TIDYING
- 該狀态表明所有的任務已經運作終止,記錄的任務數量為0。
- terminated()執行完畢,進入TERMINATED狀态
TERMINATED
- 該狀态表示線程池徹底終止
為幫助開發者們提升面試技能、有機會入職BATJ等大廠公司,特别制作了這個專輯——這一次整體放出。
大緻内容包括了: Java 集合、JVM、多線程、并發程式設計、設計模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大廠面試題等、等技術棧!
歡迎大家關注公衆号【Java爛豬皮】,回複【666】,擷取以上最新Java後端架構VIP學習資料以及視訊學習教程,然後一起學習,一文在手,面試我有。
每一個專欄都是大家非常關心,和非常有價值的話題,如果我的文章對你有所幫助,還請幫忙點贊、好評、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!
作者:撿田螺的小男孩
連結:https://juejin.cn/post/6844903889678893063