天天看點

Java開發實踐:合理使用線程池及線程變量

作者:阿裡技術

作者:劉永幸

本文将從線程池和線程變量的原理和使用出發,結合執行個體給出最佳使用實踐,幫助各開發人員建構出穩定、高效的java應用服務。

一、背景

随着計算技術的不斷發展,3納米制程晶片已進入試産階段,摩爾定律在現有工藝下逐漸面臨巨大的實體瓶頸,通過多核處理器技術來提升伺服器的性能成為提升算力的主要方向。

在伺服器領域,基于java建構的後端伺服器占據着領先地位,是以,掌握java并發程式設計技術,充分利用CPU的并發處理能力是一個開發人員必修的基本功,本文結合線程池源碼和實踐,簡要介紹了線程池和線程變量的使用。

二、線程池概述

2.1 什麼是線程池

線程池是一種“池化”的線程使用模式,通過建立一定數量的線程,讓這些線程處于就緒狀态來提高系統響應速度,線上程使用完成後歸還到線程池來達到重複利用的目标,進而降低系統資源的消耗。

2.2 為什麼要使用線程池

總體來說,線程池有如下的優勢:

1)降低資源消耗:通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。

2)提高響應速度:當任務到達時,任務可以不需要等到線程建立就能立即執行。

3)提高線程的可管理性:線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,調優和監控。

三、線程池的使用

3.1 線程池建立&核心參數設定

在java中,線程池的實作類是ThreadPoolExecutor,構造函數如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit timeUnit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)           

可以通過new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)來建立一個線程池。

  • corePoolSize參數

在構造函數中,corePoolSize為線程池核心線程數。預設情況下,核心線程會一直存活,但是當将allowCoreThreadTimeout設定為true時,核心線程逾時也會回收。

  • maximumPoolSize參數

在構造函數中,maximumPoolSize為線程池所能容納的最大線程數。

  • keepAliveTime參數

在構造函數中,keepAliveTime表示線程閑置逾時時長。如果線程閑置時間超過該時長,非核心線程就會被回收。如果将allowCoreThreadTimeout設定為true時,核心線程也會逾時回收。

  • timeUnit參數

在構造函數中,timeUnit表示線程閑置逾時時長的時間機關。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

  • blockingQueue參數

在構造函數中,blockingQueue表示任務隊列,線程池任務隊列的常用實作類有:

ArrayBlockingQueue :一個數組實作的有界阻塞隊列,此隊列按照FIFO的原則對元素進行排序,支援公平通路隊列。

LinkedBlockingQueue :一個由連結清單結構組成的可選有界阻塞隊列,如果不指定大小,則使用Integer.MAX_VALUE作為隊列大小,按照FIFO的原則對元素進行排序。

PriorityBlockingQueue :一個支援優先級排序的無界阻塞隊列,預設情況下采用自然順序排列,也可以指定Comparator。

DelayQueue:一個支援延時擷取元素的無界阻塞隊列,建立元素時可以指定多久以後才能從隊列中擷取目前元素,常用于緩存系統設計與定時任務排程等。

SynchronousQueue:一個不存儲元素的阻塞隊列。存入操作必須等待擷取操作,反之亦然。

LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列,與LinkedBlockingQueue相比多了transfer和tryTranfer方法,該方法在有消費者等待接收元素時會立即将元素傳遞給消費者。

LinkedBlockingDeque:一個由連結清單結構組成的雙端阻塞隊列,可以從隊列的兩端插入和删除元素。

  • threadFactory參數

在構造函數中,threadFactory表示線程工廠。用于指定為線程池建立新線程的方式,threadFactory可以設定線程名稱、線程組、優先級等參數。如通過Google工具包可以設定線程池裡的線程名:

new ThreadFactoryBuilder().setNameFormat("general-detail-batch-%d").build()           
  • RejectedExecutionHandler參數

在構造函數中,rejectedExecutionHandler表示拒絕政策。當達到最大線程數且隊列任務已滿時需要執行的拒絕政策,常見的拒絕政策如下:

ThreadPoolExecutor.AbortPolicy:預設政策,當任務隊列滿時抛出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:丢棄掉不能執行的新任務,不抛任何異常。

ThreadPoolExecutor.CallerRunsPolicy:當任務隊列滿時使用調用者的線程直接執行該任務。

ThreadPoolExecutor.DiscardOldestPolicy:當任務隊列滿時丢棄阻塞隊列頭部的任務(即最老的任務),然後添加目前任務。

3.2 線程池狀态轉移圖

ThreadPoolExecutor線程池有如下幾種狀态:

  • RUNNING:運作狀态,接受新任務,持續處理任務隊列裡的任務;
  • SHUTDOWN:不再接受新任務,但要處理任務隊列裡的任務;
  • STOP:不再接受新任務,不再處理任務隊列裡的任務,中斷正在進行中的任務;
  • TIDYING:表示線程池正在停止運作,中止所有任務,銷毀所有工作線程,當線程池執行terminated()方法時進入TIDYING狀态;
  • TERMINATED:表示線程池已停止運作,所有工作線程已被銷毀,所有任務已被清空或執行完畢,terminated()方法執行完成。
Java開發實踐:合理使用線程池及線程變量

3.3 線程池任務排程機制

Java開發實踐:合理使用線程池及線程變量

線程池送出一個任務時任務排程的主要步驟如下:

1)當線程池裡存活的核心線程數小于corePoolSize核心線程數參數的值時,線程池會建立一個核心線程去處理送出的任務;

2)如果線程池核心線程數已滿,即線程數已經等于corePoolSize,新送出的任務會被嘗試放進任務隊列workQueue中等待執行;

3)當線程池裡面存活的線程數已經等于corePoolSize了,且任務隊列workQueue已滿,再判斷目前線程數是否已達到maximumPoolSize,即最大線程數是否已滿,如果沒到達,建立一個非核心線程執行送出的任務;

4)如果目前的線程數已達到了maximumPoolSize,還有新的任務送出過來時,執行拒絕政策進行處理。

核心代碼如下:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }           

3.4 Tomcat線程池分析

Tomcat請求處理過程

Java開發實踐:合理使用線程池及線程變量

Tomcat的整體架構包含連接配接器和容器兩大部分,其中連接配接器負責與外部通信,容器負責内部邏輯處理。在連接配接器中:

1)使用ProtocolHandler接口來封裝I/O模型和應用層協定的差異,其中I/O模型可以選擇非阻塞I/O、異步I/O或APR,應用層協定可以選擇HTTP、HTTPS或AJP。ProtocolHandler将I/O模型和應用層協定進行組合,讓EndPoint隻負責位元組流的收發,Processor負責将位元組流解析為Tomcat Request/Response對象,實作功能子產品的高内聚和低耦合,ProtocolHandler接口繼承關系如下圖示。

2)通過擴充卡Adapter将Tomcat Request對象轉換為标準的ServletRequest對象。

Java開發實踐:合理使用線程池及線程變量

Tomcat為了實作請求的快速響應,使用線程池來提高請求的處理能力。下面我們以HTTP非阻塞I/O為例對Tomcat線程池進行簡要的分析。

Tomcat線程池建立

Java開發實踐:合理使用線程池及線程變量

at中,通過AbstractEndpoint類提供底層的網絡I/O的處理,若使用者沒有配置自定義公共線程池,則AbstractEndpoint通過createExecutor方法來建立Tomcat預設線程池。

核心部分代碼如下:

public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }           

其中,TaskQueue、ThreadPoolExecutor分别為Tomcat自定義任務隊列、線程池實作。

Tomcat自定義ThreadPoolExecutor

Tomcat自定義線程池繼承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成員變量來更高效地統計已經送出但尚未完成的任務數量(submittedCount),包括已經在隊列中的任務和已經交給工作線程但還未開始執行的任務。

/**
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
 * If a RejectedExecutionHandler is not specified a default one will be configured
 * and that one will always throw a RejectedExecutionException
 *
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    /**
     * The number of tasks submitted but not yet finished. This includes tasks
     * in the queue and tasks that have been handed to a worker thread but the
     * latter did not start executing the task yet.
     * This number is always greater or equal to {@link #getActiveCount()}.
     */
    // 新增的submittedCount成員變量,用于統計已送出但還未完成的任務數
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    // 構造函數
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        // 預啟動所有核心線程
        prestartAllCoreThreads();
    }
​
}           

Tomcat在自定義線程池ThreadPoolExecutor中重寫了execute()方法,并實作對送出執行的任務進行submittedCount加一。Tomcat在自定義ThreadPoolExecutor中,當線程池抛出RejectedExecutionException異常後,會調用force()方法再次向TaskQueue中進行添加任務的嘗試。如果添加失敗,則submittedCount減一後,再抛出RejectedExecutionException。

@Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }
​
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }
​
        }
    }           

Tomcat自定義任務隊列

在Tomcat中重新定義了一個阻塞隊列TaskQueue,它繼承于LinkedBlockingQueue。在Tomcat中,核心線程數預設值為10,最大線程數預設為200,為了避免線程到達核心線程數後後續任務放入隊列等待,Tomcat通過自定義任務隊列TaskQueue重寫offer方法實作了核心線程池數達到配置數後線程的建立。

具體地,從線程池任務排程機制實作可知,當offer方法傳回false時,線程池将嘗試建立新新線程,進而實作任務的快速響應。TaskQueue核心實作代碼如下:

/**
 * As task queue specifically designed to run with a thread pool executor. The
 * task queue is optimised to properly utilize threads within a thread pool
 * executor. If you use a normal queue, the executor will spawn threads when
 * there are idle threads and you wont be able to force items onto the queue
 * itself.
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
​
    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }
​
    @Override
    public boolean offer(Runnable o) {
        // 1. parent為線程池,Tomcat中為自定義線程池執行個體
      //we can't do any checks
        if (parent==null) return super.offer(o);
        // 2. 當線程數達到最大線程數時,新送出任務入隊
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        // 3. 當送出的任務數小于線程池中已有的線程數時,即有空閑線程,任務入隊即可
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        // 4. 【關鍵點】如果目前線程數量未達到最大線程數,直接傳回false,讓線程池建立新線程
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        // 5. 最後的兜底,放入隊列
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }   
}           

Tomcat自定義任務線程

Tomcat中通過自定義任務線程TaskThread實作對每個線程建立時間的記錄;使用靜态内部類WrappingRunnable對Runnable進行包裝,用于對StopPooledThreadException異常類型的處理。

/**
 * A Thread implementation that records the time at which it was created.
 *
 */
public class TaskThread extends Thread {
​
    private final long creationTime;
​
    public TaskThread(ThreadGroup group, Runnable target, String name) {
        super(group, new WrappingRunnable(target), name);
        this.creationTime = System.currentTimeMillis();
    }
​
​
    /**
     * Wraps a {@link Runnable} to swallow any {@link StopPooledThreadException}
     * instead of letting it go and potentially trigger a break in a debugger.
     */
    private static class WrappingRunnable implements Runnable {
        private Runnable wrappedRunnable;
        WrappingRunnable(Runnable wrappedRunnable) {
            this.wrappedRunnable = wrappedRunnable;
        }
        @Override
        public void run() {
            try {
                wrappedRunnable.run();
            } catch(StopPooledThreadException exc) {
                //expected : we just swallow the exception to avoid disturbing
                //debuggers like eclipse's
                log.debug("Thread exiting on purpose", exc);
            }
        }
​
    }
​
}           

3.5 思考&小結

1)Tomcat為什麼要自定義線程池和任務隊列實作?

JUC原生線程池在送出任務時,當工作線程數達到核心線程數後,繼續送出任務會嘗試将任務放入阻塞隊列中,隻有目前運作線程數未達到最大設定值且在任務隊列任務滿後,才會繼續建立新的工作線程來處理任務,是以JUC原生線程池無法滿足Tomcat快速響應的訴求。

2)Tomcat為什麼使用無界隊列?

Tomcat在EndPoint中通過acceptCount和maxConnections兩個參數來避免過多請求積壓。其中maxConnections為Tomcat在任意時刻接收和處理的最大連接配接數,當Tomcat接收的連接配接數達到maxConnections時,Acceptor不會讀取accept隊列中的連接配接;這時accept隊列中的線程會一直阻塞着,直到Tomcat接收的連接配接數小于maxConnections(maxConnections預設為10000,如果設定為-1,則連接配接數不受限制)。acceptCount為accept隊列的長度,當accept隊列中連接配接的個數達到acceptCount時,即隊列滿,此時進來的請求一律被拒絕,預設值是100(基于Tomcat 8.5.43版本)。是以,通過acceptCount和maxConnections兩個參數作用後,Tomcat預設的無界任務隊列通常不會造成OOM。

/**
 * Allows the server developer to specify the acceptCount (backlog) that
 * should be used for server sockets. By default, this value
 * is 100.
 */
private int acceptCount = 100;
​
private int maxConnections = 10000;           

3.6 最佳實踐

避免用Executors 的建立線程池

Java開發實踐:合理使用線程池及線程變量

Executors常用方法有以下幾個:

1)newCachedThreadPool():建立一個可緩存的線程池,調用 execute 将重用以前構造的線程(如果線程可用)。如果沒有可用的線程,則建立一個新線程并添加到線程池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。CachedThreadPool适用于并發執行大量短期耗時短的任務,或者負載較輕的伺服器;

2)newFiexedThreadPool(int nThreads):建立固定數目線程的線程池,線程數小于nThreads時,送出新的任務會建立新的線程,當線程數等于nThreads時,送出新的任務後任務會被加入到阻塞隊列,正在執行的線程執行完畢後從隊列中取任務執行,FiexedThreadPool适用于負載略重但任務不是特别多的場景,為了合理利用資源,需要限制線程數量;

3)newSingleThreadExecutor() 建立一個單線程化的 Executor,SingleThreadExecutor适用于串行執行任務的場景,每個任務按順序執行,不需要并發執行;

4)newScheduledThreadPool(int corePoolSize) 建立一個支援定時及周期性的任務執行的線程池,多數情況下可用來替代 Timer 類。ScheduledThreadPool中,傳回了一個ScheduledThreadPoolExecutor執行個體,而ScheduledThreadPoolExecutor實際上繼承了ThreadPoolExecutor。從代碼中可以看出,ScheduledThreadPool基于ThreadPoolExecutor,corePoolSize大小為傳入的corePoolSize,maximumPoolSize大小為Integer.MAX_VALUE,逾時時間為0,workQueue為DelayedWorkQueue。實際上ScheduledThreadPool是一個排程池,其實作了schedule、scheduleAtFixedRate、scheduleWithFixedDelay三個方法,可以實作延遲執行、周期執行等操作;

5)newSingleThreadScheduledExecutor() 建立一個corePoolSize為1的ScheduledThreadPoolExecutor;

6)newWorkStealingPool(int parallelism)傳回一個ForkJoinPool執行個體,ForkJoinPool 主要用于實作“分而治之”的算法,适合于計算密集型的任務。

Executors類看起來功能比較強大、用起來還比較友善,但存在如下弊端:

1)FiexedThreadPool和SingleThreadPool任務隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,進而導緻OOM;

2)CachedThreadPool和ScheduledThreadPool允許建立的線程數量為Integer.MAX_VALUE,可能會建立大量的線程,進而導緻OOM;

使用線程時,可以直接調用 ThreadPoolExecutor 的構造函數來建立線程池,并根據業務實際場景來設定corePoolSize、blockingQueue、RejectedExecuteHandler等參數。

避免使用局部線程池

使用局部線程池時,若任務執行完後沒有執行shutdown()方法或有其他不當引用,極易造成系統資源耗盡。

合理設定線程池參數

在工程實踐中,通常使用下述公式來計算核心線程數:

nThreads=(w+c)/c*n*u=(w/c+1)*n*u

其中,w為等待時間,c為計算時間,n為CPU核心數(通常可通過 Runtime.getRuntime().availableProcessors()方法擷取),u為CPU目标使用率(取值區間為[0, 1]);在最大化CPU使用率的情況下,當處理的任務為計算密集型任務時,即等待時間w為0,此時核心線程數等于CPU核心數。

上述計算公式是理想情況下的建議核心線程數,而不同系統/應用在運作不同的任務時可能會有一定的差異,是以最佳線程數參數還需要根據任務的實際運作情況和壓測表現進行微調。

增加異常處理

為了更好地發現、分析和解決問題,建議在使用多線程時增加對異常的處理,異常處理通常有下述方案:

  • 在任務代碼處增加try...catch異常處理;
  • 如果使用的Future方式,則可通過Future對象的get方法接收抛出的異常;
  • 為工作線程設定setUncaughtExceptionHandler,在uncaughtException方法中處理異常。

優雅關閉線程池

public void destroy() {
        try {
            poolExecutor.shutdown();
            if (!poolExecutor.awaitTermination(AWAIT_TIMEOUT, TimeUnit.SECONDS)) {
                poolExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // 如果目前線程被中斷,重新取消所有任務
            pool.shutdownNow();
            // 保持中斷狀态
            Thread.currentThread().interrupt();
        }
    }           

為了實作優雅停機的目标,我們應當先調用shutdown方法,調用這個方法也就意味着,這個線程池不會再接收任何新的任務,但是已經送出的任務還會繼續執行。之後我們還應當調用awaitTermination方法,這個方法可以設定線程池在關閉之前的最大逾時時間,如果在逾時時間結束之前線程池能夠正常關閉則會傳回true,否則,逾時會傳回false。通常我們需要根據業務場景預估一個合理的逾時時間,然後調用該方法。

如果awaitTermination方法傳回false,但又希望盡可能線上程池關閉之後再做其他資源回收工作,可以考慮再調用一下shutdownNow方法,此時隊列中所有尚未被處理的任務都會被丢棄,同時會設定線程池中每個線程的中斷标志位。shutdownNow并不保證一定可以讓正在運作的線程停止工作,除非送出給線程的任務能夠正确響應中斷。

鷹眼上下文參數傳遞

/**
* 在主線程中,開啟鷹眼異步模式,并将ctx傳遞給多線程任務
**/
// 防止鷹眼鍊路丢失,需要傳遞
RpcContext_inner ctx = EagleEye.getRpcContext();
// 開啟異步模式
ctx.setAsyncMode(true);
​
​
/**
* 線上程池任務線程中,設定鷹眼rpc環境
**/
private void runTask() {
    try {
        EagleEye.setRpcContext(ctx);
        // do something...
​
    } catch (Exception e) {
        log.error("requestError, params: {}", this.params, e);
    } finally {
        // 判斷目前任務是否是主線程在運作,當Rejected政策為CallerRunsPolicy的時候,核對目前線程
        if (mainThread != Thread.currentThread()) {
            EagleEye.clearRpcContext();
        }
    }
​
}           

四、ThreadLocal線程變量概述

4.1 什麼是ThreadLocal

ThreadLocal類提供了線程本地變量(thread-local variables),這些變量不同于普通的變量,通路線程本地變量的每個線程(通過其get或set方法)都有其自己的獨立初始化的變量副本,是以ThreadLocal沒有多線程競争的問題,不需要單獨進行加鎖。

4.2 ThreadLocal使用場景

  • 每個線程都需要有屬于自己的執行個體資料(線程隔離);
  • 架構跨層資料的傳遞;
  • 需要參數全局傳遞的複雜調用鍊路的場景;
  • 資料庫連接配接的管理,在AOP的各種嵌套調用中保證事務的一緻性。

五、ThreadLocal的原理與實踐

對于ThreadLocal而言,常用的方法有get/set/initialValue三個方法。

衆所周知,在java中SimpleDateFormat有線程安全問題,為了安全地使用SimpleDateFormat,除了1)建立SimpleDateFormat局部變量;和2)加同步鎖 兩種方案外,我們還可以使用3)ThreadLocal的方案。

/**
* 使用 ThreadLocal 定義一個全局的 SimpleDateFormat
*/
private static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal = new
ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
// 用法
String dateString = simpleDateFormatThreadLocal.get().format(calendar.getTime());           

5.1 ThreadLocal原理

Thread内部維護了一個ThreadLocal.ThreadLocalMap執行個體(threadLocals),ThreadLocal的操作都是圍繞着threadLocals來操作的。

threadLocal.get()方法

/**
     * Returns the value in the current thread's copy of this
     * thread-local variable.  If the variable has no value for the
     * current thread, it is first initialized to the value returned
     * by an invocation of the {@link #initialValue} method.
     *
     * @return the current thread's value of this thread-local
     */
public T get() {
    // 1. 擷取目前線程
    Thread t = Thread.currentThread();
    // 2. 擷取目前線程内部的ThreadLocalMap變量t.threadLocals;
    ThreadLocalMap map = getMap(t);
    // 3. 判斷map是否為null
    if (map != null) {
        // 4. 使用目前threadLocal變量擷取entry
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 5. 判斷entry是否為null
        if (e != null) {
            // 6.傳回Entry.value
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 7. 如果map/entry為null設定初始值
    return setInitialValue();
}
​
 /**
     * Variant of set() to establish initialValue. Used instead
     * of set() in case user has overridden the set() method.
     *
     * @return the initial value
     */
private T setInitialValue() {
    // 1. 初始化value,如果重寫就用重寫後的value,預設null
    T value = initialValue();
    // 2. 擷取目前線程
    Thread t = Thread.currentThread();
    // 3. 擷取目前線程内部的ThreadLocalMap變量
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 4. 不為null就set, key: threadLocal, value: value
        map.set(this, value);
    else
        // 5. map若為null則建立ThreadLocalMap對象
        createMap(t, value);
    return value;
}
​
/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
​
/**
 * Construct a new map initially containing (firstKey, firstValue).
 * ThreadLocalMaps are constructed lazily, so we only create
 * one when we have at least one entry to put in it.
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 1. 初始化entry數組,size: 16
    table = new Entry[INITIAL_CAPACITY];
    // 2. 計算value的index
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    // 3. 在對應index位置指派
    table[i] = new Entry(firstKey, firstValue);
    // 4. entry size
    size = 1;
    // 5. 設定threshold: threshold = len * 2 / 3;
    setThreshold(INITIAL_CAPACITY);
}
/**
 * Set the resize threshold to maintain at worst a 2/3 load factor.
 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}           

threadLocal.set()方法

/**
     * Sets the current thread's copy of this thread-local variable
     * to the specified value.  Most subclasses will have no need to
     * override this method, relying solely on the {@link #initialValue}
     * method to set the values of thread-locals.
     *
     * @param value the value to be stored in the current thread's copy of
     *        this thread-local.
     */
    public void set(T value) {
        // 1. 擷取目前線程
        Thread t = Thread.currentThread();
        // 2. 擷取目前線程内部的ThreadLocalMap變量
        ThreadLocalMap map = getMap(t);
        if (map != null)
            // 3. 設定value
            map.set(this, value);
        else
            // 4. 若map為null則建立ThreadLocalMap
            createMap(t, value);
    }           

ThreadLocalMap

從JDK源碼可見,ThreadLocalMap中的Entry是弱引用類型的,這就意味着如果這個ThreadLocal隻被這個Entry引用,而沒有被其他對象強引用時,就會在下一次GC的時候回收掉。

static class ThreadLocalMap {
​
        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;
​
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
    
    // ...
}           

5.2 ThreadLocal示例

鷹眼鍊路ThreadLocal的使用

EagleEye(鷹眼)作為全鍊路監控系統在集團内部被廣泛使用,traceId、rpcId、壓測标等資訊存儲在EagleEye的ThreadLocal變量中,并在HSF/Dubbo服務調用間進行傳遞。EagleEye通過Filter将資料初始化到ThreadLocal中,部分相關代碼如下:

EagleEyeHttpRequest eagleEyeHttpRequest = this.convertHttpRequest(httpRequest);
// 1. 初始化,将traceId、rpcId等資料存儲到鷹眼的ThreadLocal變量中
EagleEyeRequestTracer.startTrace(eagleEyeHttpRequest, false);
​
try {
    chain.doFilter(httpRequest, httpResponse);
} finally {
    // 2. 清理ThreadLocal變量值
    EagleEyeRequestTracer.endTrace(this.convertHttpResponse(httpResponse));
}           

在EagleEyeFilter中,通過EagleEyeRequestTracer.startTrace方法進行初始化,在前置入參轉換後,通過startTrace重載方法将鷹眼上下文參數存入ThreadLocal中,相關代碼如下:

Java開發實踐:合理使用線程池及線程變量
Java開發實踐:合理使用線程池及線程變量

EagleEyeFilter在finally代碼塊中,通過EagleEyeRequestTracer.endTrace方法結束調用鍊,通過clear方法将ThreadLocal中的資料進行清理,相關代碼實作如下:

Java開發實踐:合理使用線程池及線程變量

Bad case:XX項目權益領取失敗問題

在某權益領取原有鍊路中,通過app打開一級頁面後才能發起權益領取請求,請求經過淘系無線網關(Mtop)後到達服務端,服務端通過mtop sdk擷取目前會話資訊。

在XX項目中,對權益領取鍊路進行了更新改造,在一級頁面請求時,通過服務端同時發起權益領取請求。具體地,服務端在處理一級頁面請求時,同時通過調用hsf/dubbo接口來進行權益領取,是以在發起rpc調用時需要攜帶使用者目前會話資訊,在服務提供端将會話資訊進行提取并注入到mtop上下文,進而才能通過mtop sdk擷取到會話id等資訊。某開發同學在實作時,因ThreadLocal使用不當造成下述問題:

  • 問題1:因ThreadLocal初始化時機不當,造成擷取不到會話資訊,進而導緻權益領取失敗。
  • 問題2:請求完成時,因未清理ThreadLocal中的變量值,導緻髒資料。

問題1:權益領取失敗分析

在權益領取服務中,該應用建構了一套高效和線程安全的依賴注入架構,基于該架構的業務邏輯子產品通常抽象為xxxModule形式,Module間為網狀依賴關系,架構會按依賴關系自動調用init方法(其中,被依賴的module 的init方法先執行)。

在應用中,權益領取接口的主入口為CommonXXApplyModule類,CommonXXApplyModule依賴XXSessionModule。當請求來臨時,會按依賴關系依次調用init方法,是以XXSessionModule的init方法會優先執行;而開發同學在CommonXXApplyModule類中的init方法中通過調用recoverMtopContext()方法來期望恢複mtop上下文,因recoverMtopContext()方法的調用時機過晚,進而導緻XXSessionModule子產品擷取不到正确的會話id等資訊而導緻權益領取失敗。

Java開發實踐:合理使用線程池及線程變量

問題2:髒資料分析

權益領取服務在處理請求時,若目前線程曾經處理過權益領取請求,因ThreadLocal變量值未被清理,此時XXSessionModule通過mtop SDK擷取會話資訊時得到的是前一次請求的會話資訊,進而造成髒資料。

解決方案

在依賴注入架構入口處AbstractGate#visit(或在XXSessionModule中)通過recoverMtopContext方法注入mtop上下文資訊,并在入口方法的finally代碼塊清理目前請求的threadlocal變量值。

5.3 思考&小結

1)ThreadLocalMap中的Entry為什麼要設計為弱引用類型?

若使用強引用類型,則threadlocal的引用鍊為:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal對象)和value;在這種場景下,隻要這個線程還在運作(如線程池場景),若不調用remove方法,則該對象及關聯的所有強引用對象都不會被垃圾回收器回收。

2)使用static和不使用static修飾threadlocal變量有和差別?

若使用static關鍵字進行修飾,則一個線程僅對應一個線程變量;否則,threadlocal語義變為perThread-perInstance,容易引發記憶體洩漏,如下述示例:

public class ThreadLocalTest {
    public static class ThreadLocalDemo {
        private ThreadLocal<String> threadLocalHolder = new ThreadLocal();
​
        public void setValue(String value) {
            threadLocalHolder.set(value);
        }
​
        public String getValue() {
            return threadLocalHolder.get();
        }
    }
​
    public static void main(String[] args) {
        int count = 3;
        List<ThreadLocalDemo> list = new LinkedList<>();
        for (int i = 0; i < count; i++) {
            ThreadLocalDemo demo = new ThreadLocalDemo();
            demo.setValue("demo-" + i);
            list.add(demo);
        }
        System.out.println();
    }
}           

在上述main方法第22行debug,可見線程的threadLocals變量中有3個threadlocal執行個體。在工程實踐中,使用threadlocal時通常期望一個線程隻有一個threadlocal執行個體,是以,若不使用static修飾,期望的語義發生了變化,同時易引起記憶體洩漏。

Java開發實踐:合理使用線程池及線程變量

5.4 最佳實踐

ThreadLocal變量值初始化和清理建議成對出現

如果不執行清理操作,則可能會出現:

1)記憶體洩漏:由于ThreadLocalMap的中key是弱引用,而Value是強引用。這就導緻了一個問題,ThreadLocal在沒有外部對象強引用時,發生GC時弱引用Key會被回收,而Value不會回收,進而Entry裡面的元素出現<null,value>的情況。如果建立ThreadLocal的線程一直持續運作,那麼這個Entry對象中的value就有可能一直得不到回收,這樣可能會導緻記憶體洩露。

2)髒資料:由于線程複用,在使用者1請求時,可能儲存了業務資料在ThreadLocal中,若不清理,則使用者2的請求進來時,可能會讀到使用者1的資料。

建議使用try...finally 進行清理。

ThreadLocal變量建議使用static進行修飾

我們在使用ThreadLocal時,通常期望的語義是perThread,若不使用static進行修飾,則語義變為perThread-perInstance;線上程池場景下,若不用static進行修飾,建立的線程相關執行個體可能會達到 M * N個(其中M為線程數,N為對應類的執行個體數),易造成記憶體洩漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。

謹慎使用ThreadLocal.withInitial

在應用中,謹慎使用ThreadLocal.withInitial(Supplier<? extends S> supplier)這個工廠方法建立ThreadLocal對象,一旦不同線程的ThreadLocal使用了同一個Supplier對象,那麼隔離也就無從談起了,如:

// 反例,實際上使用了共享對象obj而并未隔離,
private static ThreadLocal<Obj> threadLocal = ThreadLocal.withIntitial(() -> obj);           

六、總結

在java工程實踐中,線程池和線程變量被廣泛使用,因線程池和線程變量的不當使用經常造成安全生産事故,是以,正确使用線程池和線程變量是每一位開發人員必須修煉的基本功。本文從線程池和線程變量的使用出發,簡要介紹了線程池和線程變量的原理和使用實踐,各開發人員可結合最佳實踐和實際應用場景,正确地使用線程和線程變量,建構出穩定、高效的java應用服務。

繼續閱讀