背景
我在的公司雖然是移動支付領域的公司。但是我做的業務類似于管理系統,是以一開始寫代碼的時候沒有考慮到資料的量的問題。開始有一個統計頁面,大概要統計的資料分為十多個次元,然後每個次元需要考慮十個左右的方面。也就是統計頁面輕輕地點選一個查詢按鈕,要進行100次左右的資料庫查詢。開始資料量小的時候,查詢還能夠使用,頁面不會逾時。到後面資料量越來越大,最大的一張表資料量已經超過1億。這時候悲催的事情發生了--- 頁面點選查詢直接未響應.....
方案考慮
其實當時的方案我想了兩種:
- 優化業務實作邏輯,其實在查詢的時候一個表的多個次元的查詢可以傳一個次元清單進去。查出來結果之後,背景在進行分組計算。
- 采用多線程,對每一個次元采用一個線程去執行。這樣其實每個線程的查詢次數在10次左右,總的時間差不多可以縮短10倍。
當然,當時我知道最好的方案是混合使用1和2。但當時1的實作存在以下問題,最終讓我決定選擇方案2:
- 因為每個次元涉及多張表, 不同的表歸屬于不同的子產品。如果某張表的查詢條件不支援次元清單,那麼需要提需求給對應子產品開發...... (何年何月能完)
- 方案1的改動涉及代碼變動較多,且不好封裝每個線程任務,寫出來的代碼邏輯有點繞,背景存在大量的重新分次元統計的代碼。簡而言之就是不優雅
實作考慮
既然最終標明方案2的話,那麼自然考慮到選擇線程池。那麼選啥線程池呢?Single?Schedule肯定不用想直接PASS。Cached?其實目前來說是可行的,因為目前線上的次元也就十多個。以Cached線程池的特性,隻要同時并發的線程數量不至于太大,也不至于給系統太大壓力導緻系統癱瘓。但是因為次元會随着業務的增長而越來越多,如果後續次元增加到20甚至30,那麼對系統的壓力就無法預估了。
思前想後,我最終決定選擇Fix線程池,将線程池固化大小為10個。但這時候我又想,其實統計頁面一天查詢的次數并不多。可能就每天早上點選查詢一次,後面可能就不再點查詢。那麼這時候又出現了兩種蛋疼的選擇:
- 直接在查詢的方法内部初始化線程池
- 在類的屬性中初始化線程池
第1種方案的話,每次查詢都要重新初始化線程池,造成很大的資源消耗。如果連續查詢多次,并不會後面比前面快,反而可能由于不停的線程池銷毀建立導緻越來越慢。最終我選擇了第2種方案,當然我并沒有選擇餓漢模式直接初始化,而是選擇了懶漢模式在方法中進行線程池初始化,且通過鎖保證隻初始化一次。
想法嘗試
到這裡你如果覺得我的定制初探就完了,那你就too young too naive。我不追求可行,隻追求完美~
這時候我就在想,其實根據使用者的操作習慣,統計頁面的查詢按鈕,要麼就隔着幾個小時不按,要麼就可能一時心血來潮,連續查詢幾天或者同一天分多個次元查詢多次。而大家都知道Fix線程池固化了線程池的大小,即使後面連續幾個小時沒有任務來,仍然會一直保持着初始大小的線程數。那麼能不能實作即能夠控制線程數量Fix,又可以在空閑的時候銷毀核心線程呢?答案當然是有的,關鍵點在于:ThreadPoolExecutor的allowCoreThreadTimeOut方法
/**
* Sets the policy governing whether core threads may time out and
* terminate if no tasks arrive within the keep-alive time, being
* replaced if needed when new tasks arrive. When false, core
* threads are never terminated due to lack of incoming
* tasks. When true, the same keep-alive policy applying to
* non-core threads applies also to core threads. To avoid
* continual thread replacement, the keep-alive time must be
* greater than zero when setting <tt>true</tt>. This method
* should in general be called before the pool is actively used.
* @param value <tt>true</tt> if should time out, else <tt>false</tt>
* @throws IllegalArgumentException if value is <tt>true</tt>
* and the current keep-alive time is not greater than zero.
*
* @since 1.6
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
allowCoreThreadTimeOut = value;
}
從源碼的注釋來看,該方法可以支援線程池的keep-alive time的設定同時對核心線程和非核心線程生效。具體為啥,後面我分析線程池源碼的時候會講到,現在我們隻需要看看用到該處的源碼(在ThreadPoolExecutor的getTask方法中):
/**
* Gets the next task for a worker thread to run. The general
* approach is similar to execute() in that worker threads trying
* to get a task to run do so on the basis of prevailing state
* accessed outside of locks. This may cause them to choose the
* "wrong" action, such as trying to exit because no tasks
* appear to be available, or entering a take when the pool is in
* the process of being shut down. These potential problems are
* countered by (1) rechecking pool state (in workerCanExit)
* before giving up, and (2) interrupting other workers upon
* shutdown, so they can recheck state. All other user-based state
* changes (to allowCoreThreadTimeOut etc) are OK even when
* performed asynchronously wrt getTask.
*
* @return the task
*/
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
關鍵在于
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
,該方法表示嘗試從等待隊列中擷取任務,如果超過keepAlive time,則直接傳回null。如果傳回null的話,work線程就會被終止。
好了這些都是後話,在我看了線程池的源碼之後才能夠清楚地知道為啥這個參數有這個作用。那麼在之前,我是怎麼測試驗證我的想法的呢?其實很簡單:
- 我先參照線程池的預設的
定義自己的線程工廠,目的是為了擷取線程工廠内的DefaultThreadFactory
屬性,因為ThreadGroup
類有一個ThreadGroup
方法,該方法可以擷取線程組内活躍的線程個數。activeCount
class MyThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
MyThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
// 我所增加的方法,為了擷取線程組
public ThreadGroup getThreadGroup() {
return this.group;
}
}
- 萬事俱備,隻欠東風了,我隻需要構造兩種不同的情況驗證我的猜想即可!
MyThreadFactory myThreadFactory = new MyThreadFactory();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), myThreadFactory);
// executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <= 20; i++) {
executor.submit(new MyRunnable());
}
System.out.println(myThreadFactory.getThreadGroup().activeCount()); // 6
Thread.sleep(20000);
System.out.println("After destroy, active thread count:" + myThreadFactory.getThreadGroup().activeCount()); // 6/1
executor.shutdown();
運作的結果:
- 如果不執行
兩個activeCount的結果都是6
executor.allowCoreThreadTimeOut(true);
- 如果執行
第一個activeCount的結果為6,第二個activeCount的結果為1
executor.allowCoreThreadTimeOut(true);
最終實作
好了,終于到最終定制實作了。我的代碼實作如下(類為Spring管理的類,最終線程池shutdown在PreDestroy的時候):
private volatile ThreadPoolExecutor searchExecutors;
private final Object lock = new Object();
/**
* 初始化線程池,開始不進行初始化,免得浪費系統資源
*/
private void initExecutor() {
if (searchExecutors != null) {
return;
}
synchronized (lock) {
if (searchExecutors == null) {
// 設定一個固定大小為10,核心線程如果超過10分鐘空閑也可銷毀的線程池
ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
tempExecutor.allowCoreThreadTimeOut(true);
this.searchExecutors = tempExecutor;
}
}
}
@PreDestroy
public void destroy() {
if (searchExecutors != null) {
searchExecutors.shutdown();
}
}
這裡再說兩點
- 這個初始化方法采用了double-check-lock的方式,來保證多線程并發擷取到的是同一個線程池執行個體
- 注意到在設定屬性searchExecutors之前借助了一個tempExecutor。這樣也是為了防止ThreadPoolExecutor對象已經被初始化,但是allowCoreThreadTimeOut還未被執行的問題。(對象過早逃逸導緻屬性與預期不符)。
總結
通過這次線程池定制初探,發現其實看起來再沒有技術含量的工作,如果細細想下去還是會有很多可以深入研究的東西。而做軟體其實也要像做藝術品一樣,多考慮不同的實作可能,盡量選擇最完美的解決方案。
黎明前最黑暗,成功前最絕望!