大家好,這篇文章我們來聊下動态線程池開源項目(DynamicTp)的通知告警子產品。目前項目提供以下通知告警功能,每一個通知項都可以獨立配置是否開啟、告警門檻值、告警間隔時間、平台等,具體代碼請看core子產品notify包。
大家好,這篇文章我們來聊下動态線程池開源項目(DynamicTp)的通知告警子產品。目前項目提供以下通知告警功能,每一個通知項都可以獨立配置是否開啟、告警門檻值、告警間隔時間、平台等,具體代碼請看core子產品notify包。
1.核心參數變更通知
2.線程池活躍度告警
3.隊列容量告警
4.拒絕政策告警
5.任務執行逾時告警
6.任務排隊逾時告警
DynamicTp項目位址
目前700star,感謝你的star,歡迎pr,業務之餘一起給開源貢獻一份力量
gitee位址:https://gitee.com/yanhom/dynamic-tp
github位址:https://github.com/lyh200/dynamic-tp
系列文章
美團動态線程池實踐思路,開源了
動态線程池架構(DynamicTp),監控及源碼解析篇
動态線程池(DynamicTp),動态調整Tomcat、Jetty、Undertow線程池參數篇
線程池解讀
上篇文章裡大概講到了JUC線程池的執行流程,我們這裡再仔細回顧下,上圖是JUC下線程池ThreadPoolExecutor類的繼承體系。
頂級接口Executor提供了一種方式,解耦任務的送出和執行,隻定義了一個execute(Runnable command)方法用來送出任務,至于具體任務怎麼執行則交給他的實作者去自定義實作。
ExecutorService接口繼承Executor,且擴充了生命周期管理的方法、傳回Futrue的方法、批量送出任務的方法
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService抽象類繼承ExecutorService接口,對ExecutorService相關方法提供了預設實作,用RunnableFuture的實作類FutureTask包裝Runnable任務,交給execute()方法執行,然後可以從該FutureTask阻塞擷取執行結果,并且對批量任務的送出做了編排
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
ThreadPoolExecutor繼承AbstractExecutorService,采用池化思想管理一定數量的線程來排程執行送出的任務,且定義了一套線程池的生命周期狀态,用一個ctl變量來同時儲存目前池狀态(高3位)和目前池線程數(低29位)。看過源碼的小夥伴會發現,ThreadPoolExecutor類裡的方法大量有同時需要擷取或更新池狀态和池目前線程數的場景,放一個原子變量裡,可以很好的保證資料的一緻性以及代碼的簡潔性。
// 用此變量儲存目前池狀态(高3位)和目前線程數(低29位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 可以接受新任務送出,也會處理任務隊列中的任務
// 結果:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 不接受新任務送出,但會處理任務隊列中的任務
// 結果:000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受新任務,不執行隊列中的任務,且會中斷正在執行的任務
// 結果:001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 任務隊列為空,workerCount = 0,線程池的狀态在轉換為TIDYING狀态時,會執行鈎子方法terminated()
// 結果:010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 調用terminated()鈎子方法後進入TERMINATED狀态
// 結果:010 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 低29位變為0,得到了線程池的狀态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 高3位變為為0,得到了線程池中的線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
核心入口execute()方法執行邏輯如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以總結出如下主要執行流程,當然看上述代碼會有一些異常分支判斷,可以自己順理加到下述執行主流程裡
1.判斷線程池的狀态,如果不是RUNNING狀态,直接執行拒絕政策
2.如果目前線程數 < 核心線程池,則建立一個線程來處理送出的任務
3.如果目前線程數 > 核心線程數且任務隊列沒滿,則将任務放入任務隊列等待執行
4.如果 核心線程池 < 目前線程池數 < 最大線程數,且任務隊列已滿,則建立新的線程執行送出的任務
5.如果目前線程數 > 最大線程數,且隊列已滿,則拒絕該任務
addWorker()方法邏輯
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 擷取目前池狀态
int rs = runStateOf(c);
// 1.判斷如果線程池狀态 > SHUTDOWN,直接傳回false,否則2
// 2.如果線程池狀态 = SHUTDOWN,并且firstTask不為null則直接傳回false,因為SHUTDOWN狀态的線程池不能在接受新任務,否則3
// 3.如果線程池狀态 = SHUTDOWN,并且firstTask == null,此時如果任務隊列為空,則直接傳回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 1.如果目前線程池線程數大于等于CAPACITY(理論上的最大值5億),則傳回fasle
// 2.如果建立核心線程情況下目前池線程數 >= corePoolSize,則傳回false
// 3.如果建立非核心線程情況下目前池線程數 >= maximumPoolSize,則傳回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas 增加目前池線程數量,成功則退出循環
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// cas 增加目前池線程數量失敗(多線程并發),則重新擷取ctl,計算出目前線程池狀态,如果不等于上述計算的狀态rs,則說明線程池狀态發生了改變,需要跳到外層循環重新進行狀态判斷,否則執行内部循環
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 至此說明線程池狀态校驗通過,且增加池線程數量成功,則建立一個Worker線程來執行任務
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 通路worker set時需要擷取mainLock全局鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 1.目前池狀态 < SHUTDOWN,也就是RUNNING狀态,如果已經started,抛出異常
// 2.目前池狀态 = SHUTDOWN,且firstTask == null,需要處理任務隊列中的任務,如果已經started,抛出異常
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 剛建立線程添加到workers集合中
workers.add(w);
int s = workers.size();
// 判斷更新曆史最大線程數量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動建立線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 啟動失敗,workerCount--,workers裡移除該worker
addWorkerFailed(w);
}
return workerStarted;
}
線程池中的線程并不是直接用的Thread類,而是定義了一個内部工作線程Worker類,實作了AQS以及Runnable接口,然後持有一個Thread類的引用及一個firstTask(建立後第一個要執行的任務),每個Worker線程啟動後會執行run()方法,該方法會調用執行外層runWorker(Worker w)方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 1.如果task不為空,則作為該線程的第一個任務直接執行
// 2.如果task為空,則通過getTask()方法從任務隊列中擷取任務執行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 線程池狀态 >= STOP,則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 實際執行任務前調用的鈎子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 實際執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任務執行後調用的鈎子方法
afterExecute(task, thrown);
}
} finally {
// 任務置為null,重新擷取新任務,完成數++
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 無任務可執行,執行worker銷毀邏輯
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法邏輯
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 以下兩種情況遞減工作線程數量
// 1. rs >= STOP
// 2. rs == SHUTDOWN && workQueue.isEmpty()
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 允許核心線程逾時 或者 目前線程數 > 核心線程數,有可能發生逾時關閉
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc什麼情況 > maximumPoolSize,調用setMaximumPoolSize()方法将maximumPoolSize調小了,會發生這種情況,此時需要關閉多餘線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 阻塞隊列擷取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 發生中斷,進行重試
timedOut = false;
}
}
}
以上内容比較詳細的介紹了ThreadPoolExecutor的繼承體系,以及相關的核心源碼,基于此,現在我們來看DynamicTp提供的告警通知能力。
核心參數變更通知
對應配置中心的監聽端監聽到配置變更後,封裝到DtpProperties中然後交由DtpRegistry類中的refresh()方法去做配置更新,同時通知時會高亮顯示有變更的字段
線程池活躍度告警
活躍度 = activeCount / maximumPoolSize
服務啟動後會開啟一個定時監控任務,每隔一定時間(可配置)去計算線程池的活躍度,達到配置的threshold門檻值後會觸發一次告警,告警間隔内多次觸發不會發送告警通知
隊列容量告警
容量使用率 = queueSize / queueCapacity
服務啟動後會開啟一個定時監控任務,每隔一定時間去計算任務隊列的使用率,達到配置的threshold門檻值後會觸發一次告警,告警間隔内多次觸發不會發送告警通知
拒絕政策告警
/**
* Do sth before reject.
* @param executor ThreadPoolExecutor instance
*/
default void beforeReject(ThreadPoolExecutor executor) {
if (executor instanceof DtpExecutor) {
DtpExecutor dtpExecutor = (DtpExecutor) executor;
dtpExecutor.incRejectCount(1);
Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
}
}
線程池線程數達到配置的最大線程數,且任務隊列已滿,再送出任務會觸發拒絕政策。DtpExecutor線程池用到的RejectedExecutionHandler是經過動态代理包裝過的,在執行具體的拒絕政策之前會執行RejectedAware類beforeReject()方法,此方法會去做拒絕數量累加(總數值累加、周期值累加)。且判斷如果周期累計值達到配置的門檻值,則會觸發一次告警通知(同時重置周期累加值為0及上次告警時間為目前時間),告警間隔内多次觸發不會發送告警通知
任務隊列逾時告警
重寫ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了執行逾時或排隊逾時值,則會用DtpRunnable包裝任務,同時記錄任務的送出時間submitTime,beforeExecute根據目前時間和submitTime的內插補點就可以計算到該任務在隊列中的等待時間,然後判斷如果內插補點大于配置的queueTimeout則累加排隊逾時任務數量(總數值累加、周期值累加)。且判斷如果周期累計值達到配置的門檻值,則會觸發一次告警通知(同時重置周期累加值為0及上次告警時間為目前時間),告警間隔内多次觸發不會發送告警通知
@Override
public void execute(Runnable command) {
if (CollUtil.isNotEmpty(taskWrappers)) {
for (TaskWrapper t : taskWrappers) {
command = t.wrap(command);
}
}
if (runTimeout > 0 || queueTimeout > 0) {
command = new DtpRunnable(command);
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (!(r instanceof DtpRunnable)) {
super.beforeExecute(t, r);
return;
}
DtpRunnable runnable = (DtpRunnable) r;
long currTime = System.currentTimeMillis();
if (runTimeout > 0) {
runnable.setStartTime(currTime);
}
if (queueTimeout > 0) {
long waitTime = currTime - runnable.getSubmitTime();
if (waitTime > queueTimeout) {
queueTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
}
}
super.beforeExecute(t, r);
}
任務執行逾時告警
重寫ThreadPoolExecutor的afterExecute()方法,根據目前時間和beforeExecute()中設定的startTime的內插補點即可算出任務的實際執行時間,然後判斷如果內插補點大于配置的runTimeout則累加排隊逾時任務數量(總數值累加、周期值累加)。且判斷如果周期累計值達到配置的門檻值,則會觸發一次告警通知(同時重置周期累加值為0及上次告警時間為目前時間),告警間隔内多次觸發不會發送告警通知
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (runTimeout > 0) {
DtpRunnable runnable = (DtpRunnable) r;
long runTime = System.currentTimeMillis() - runnable.getStartTime();
if (runTime > runTimeout) {
runTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
}
}
super.afterExecute(r, t);
}
告警通知相關配置項
如果想使用通知告警功能,配置檔案必須要配置platforms字段,且可以配置多個平台,如釘釘、企微等;notifyItems配置具體告警項,包括門檻值、平台、告警間隔等。
spring:
dynamic:
tp:
# 省略其他項
platforms: # 通知平台
- platform: wechat
urlKey: 38a98-0c5c3b649c
receivers: test
- platform: ding
urlKey: f80db3e801d593604f4a08dcd6a
secret: SECb5444a6f375d5b9d21
receivers: 17811511815
executors: # 動态線程池配置,都有預設值,采用預設值的可以不配置該項,減少配置量
- threadPoolName: dtpExecutor1
executorType: common # 線程池類型common、eager:适用于io密集型
corePoolSize: 2
maximumPoolSize: 4
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任務隊列,檢視源碼QueueTypeEnum枚舉類
rejectedHandlerType: CallerRunsPolicy # 拒絕政策,檢視RejectedTypeEnum枚舉類
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: dtp1 # 線程名字首
waitForTasksToCompleteOnShutdown: false # 參考spring線程池設計
awaitTerminationSeconds: 5 # 機關(s)
preStartAllCoreThreads: false # 是否預熱核心線程,預設false
runTimeout: 200 # 任務執行逾時門檻值,目前隻做告警用,機關(ms)
queueTimeout: 100 # 任務在隊列等待逾時門檻值,目前隻做告警用,機關(ms)
taskWrapperNames: ["ttl"] # 任務包裝器名稱,內建TaskWrapper接口
notifyItems: # 報警項,不配置自動會按預設值配置(變更通知、容量報警、活性報警、拒絕報警、任務逾時報警)
- type: capacity # 報警項類型,檢視源碼 NotifyTypeEnum枚舉類
threshold: 80 # 報警門檻值
platforms: [ding,wechat] # 可選配置,不配置預設拿上層platforms配置的是以平台
interval: 120 # 報警間隔(機關:s)
- type: change
- type: liveness
threshold: 80
interval: 120
- type: reject
threshold: 1
interval: 160
- type: run_timeout
threshold: 1
interval: 120
- type: queue_timeout
threshold: 1
interval: 140
總結
本文開頭介紹了線程池ThreadPoolExecutor的繼承體系,核心流程的源碼解讀。然後介紹了DynamicTp提供的以上6種告警通知能力,希望通過監控+告警可以讓我們及時感覺到我們業務線程池的執行負載情況,第一時間做出調整,防止事故的發生。
聯系我
對項目有什麼想法或者建議,可以加我微信交流,或者建立issues,一起完善項目
公衆号:CodeFox
微信:yanhom1314