天天看點

業務代碼最常用也最容易犯錯的元件

作者:一個即将退役的碼農

在程式中,我們會用各種池化技術來緩存建立昂貴的對象,比如線程池、連接配接池、記憶體池。一般是預先建立一些對象放入池中,使用的時候直接取出使用,用完歸還以便複用,還會通過一定的政策調整池中緩存對象的數量,實作池的動态伸縮。

由于線程的建立比較昂貴,随意、沒有控制地建立大量線程會造成性能問題,是以短平快的任務一般考慮使用線程池來處理,而不是直接建立線程。

今天,我們就針對線程池這個話題展開讨論,通過三個生産事故,來看看使用線程池應該注意些什麼。

線程池的聲明需要手動進行

Java 中的 Executors 類定義了一些快捷的工具方法,來幫助我們快速建立線程池。《阿裡巴巴 Java 開發手冊》中提到,禁止使用這些方法來建立線程池,而應該手動 new ThreadPoolExecutor 來建立線程池。這一條規則的背後,是大量血淋淋的生産事故,最典型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因為資源耗盡導緻 OOM 問題。

首先,我們來看一下 newFixedThreadPool 為什麼可能會出現 OOM 的問題。

我們寫一段測試代碼,來初始化一個單線程的 FixedThreadPool,循環 1 億次向線程池送出任務,每個任務都會建立一個比較大的字元串然後休眠一小時:

@GetMapping("oom1")
public void oom1() throws InterruptedException {

    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

    //列印線程池的資訊,稍後我會解釋這段代碼
    printStats(threadPool); 

    for (int i = 0; i < 100000000; i++) {

        threadPool.execute(() -> {
            String payload = IntStream.rangeClosed(1, 1000000)
                    .mapToObj(__ -> "a")
                    .collect(Collectors.joining("")) + UUID.randomUUID().toString();
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
            }

            log.info(payload);

        });
    }

    threadPool.shutdown();
    threadPool.awaitTermination(1, TimeUnit.HOURS);
}
           

執行程式後不久,日志中就出現了如下 OOM:

Exception in thread "http-nio-45678-ClientPoller" java.lang.OutOfMemoryError: GC overhead limit exceeded
           

翻看 newFixedThreadPool 方法的源碼不難發現,線程池的工作隊列直接 new 了一個 LinkedBlockingQueue,而預設構造方法的 LinkedBlockingQueue 是一個 Integer.MAX_VALUE 長度的隊列,可以認為是無界的:

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

}

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    ...



    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
...
}
           

雖然使用 newFixedThreadPool 可以把工作線程控制在固定的數量上,但任務隊列是無界的。如果任務較多并且執行較慢的話,隊列可能會快速積壓,撐爆記憶體導緻 OOM。

我們再把剛才的例子稍微改一下,改為使用 newCachedThreadPool 方法來獲得線程池。程式運作不久後,同樣看到了如下 OOM 異常:

[11:30:30.487] [http-nio-45678-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause

java.lang.OutOfMemoryError: unable to create new native thread 
           

從日志中可以看到,這次 OOM 的原因是無法建立線程,翻看 newCachedThreadPool 的源碼可以看到,這種線程池的最大線程數是 Integer.MAX_VALUE,可以認為是沒有上限的,而其工作隊列 SynchronousQueue 是一個沒有存儲空間的阻塞隊列。這意味着,隻要有請求到來,就必須找到一條工作線程來處理,如果目前沒有空閑的線程就再建立一條新的。

由于我們的任務需要 1 小時才能執行完成,大量的任務進來後會建立大量的線程。我們知道線程是需要配置設定一定的記憶體空間作為線程棧的,比如 1MB,是以無限制建立線程必然會導緻 OOM:

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
           

其實,大部分 Java 開發同學知道這兩種線程池的特性,隻是抱有僥幸心理,覺得隻是使用線程池做一些輕量級的任務,不可能造成隊列積壓或開啟大量線程。

但,現實往往是殘酷的。我之前就遇到過這麼一個事故:使用者注冊後,我們調用一個外部服務去發送短信,發送短信接口正常時可以在 100 毫秒内響應,TPS 100 的注冊量,CachedThreadPool 能穩定在占用 10 個左右線程的情況下滿足需求。在某個時間點,外部短信服務不可用了,我們調用這個服務的逾時又特别長,比如 1 分鐘,1 分鐘可能就進來了 6000 使用者,産生 6000 個發送短信的任務,需要 6000 個線程,沒多久就因為無法建立線程導緻了 OOM,整個應用程式崩潰。

是以,我同樣不建議使用 Executors 提供的兩種快捷的線程池,原因如下:

我們需要根據自己的場景、并發情況來評估線程池的幾個核心參數,包括核心線程數、最大線程數、線程回收政策、工作隊列的類型,以及拒絕政策,確定線程池的工作行為符合需求,一般都需要設定有界的工作隊列和可控的線程數。

任何時候,都應該為自定義線程池指定有意義的名稱,以友善排查問題。當出現線程數量暴增、線程死鎖、線程占用大量 CPU、線程執行出現異常等問題時,我們往往會抓取線程棧。此時,有意義的線程名稱,就可以友善我們定位問題。

除了建議手動聲明線程池以外,我還建議用一些監控手段來觀察線程池的狀态。線程池這個元件往往會表現得任勞任怨、默默無聞,除非是出現了拒絕政策,否則壓力再大都不會抛出一個異常。如果我們能提前觀察到線程池隊列的積壓,或者線程數量的快速膨脹,往往可以提早發現并解決問題。

線程池線程管理政策詳解

在之前的 Demo 中,我們用一個 printStats 方法實作了最簡陋的監控,每秒輸出一次線程池的基本内部資訊,包括線程數、活躍線程數、完成了多少任務,以及隊列中還有多少積壓任務等資訊:

private void printStats(ThreadPoolExecutor threadPool) {

   Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {

        log.info("=========================");
        log.info("Pool Size: {}", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
        log.info("=========================");

    }, 0, 1, TimeUnit.SECONDS);

}
           

接下來,我們就利用這個方法來觀察一下線程池的基本特性吧。

首先,自定義一個線程池。這個線程池具有 2 個核心線程、5 個最大線程、使用容量為 10 的 ArrayBlockingQueue 阻塞隊列作為工作隊列,使用預設的 AbortPolicy 拒絕政策,也就是任務添加到線程池失敗會抛出 RejectedExecutionException。此外,我們借助了 Jodd 類庫的 ThreadFactoryBuilder 方法來構造一個線程工廠,實作線程池線程的自定義命名。

然後,我們寫一段測試代碼來觀察線程池管理線程的政策。測試代碼的邏輯為,每次間隔 1 秒向線程池送出任務,循環 20 次,每個任務需要 10 秒才能執行完成,代碼如下:

@GetMapping("right")
public int right() throws InterruptedException {

    //使用一個計數器跟蹤完成的任務數
    AtomicInteger atomicInteger = new AtomicInteger();

    //建立一個具有2個核心線程、5個最大線程,使用容量為10的ArrayBlockingQueue阻塞隊列作為工作隊列的線程池,使用預設的AbortPolicy拒絕政策
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            2, 5,
            5, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get(),
            new ThreadPoolExecutor.AbortPolicy());

    printStats(threadPool);

    //每隔1秒送出一次,一共送出20次任務
    IntStream.rangeClosed(1, 20).forEach(i -> {

        try {
            TimeUnit.SECONDS.sleep(1);

        } catch (InterruptedException e) {
            e.printStackTrace();

        }

        int id = atomicInteger.incrementAndGet();

        try {
            threadPool.submit(() -> {
                log.info("{} started", id);

                //每個任務耗時10秒
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {

                }
                log.info("{} finished", id);
            });

        } catch (Exception ex) {

            //送出出現異常的話,列印出錯資訊并為計數器減一
            log.error("error submitting task {}", id, ex);
            atomicInteger.decrementAndGet();

        }

    });

    TimeUnit.SECONDS.sleep(60);

    return atomicInteger.intValue();

}
           

60 秒後頁面輸出了 17,有 3 次送出失敗了:

業務代碼最常用也最容易犯錯的元件

并且日志中也出現了 3 次類似的錯誤資訊:

[14:24:52.879] [http-nio-45678-exec-1] [ERROR] [.t.c.t.demo1.ThreadPoolOOMController:103 ] - error submitting task 18

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@163a2dec rejected from java.util.concurrent.ThreadPoolExecutor@18061ad2[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 2]
           

我們把 printStats 方法列印出的日志繪制成圖表,得出如下曲線:

業務代碼最常用也最容易犯錯的元件

至此,我們可以總結出線程池預設的工作行為:

不會初始化 corePoolSize 個線程,有任務來了才建立工作線程;

當核心線程滿了之後不會立即擴容線程池,而是把任務堆積到工作隊列中;

當工作隊列滿了後擴容線程池,一直到線程個數達到 maximumPoolSize 為止;

如果隊列已滿且達到了最大線程後還有任務進來,按照拒絕政策處理;

當線程數大于核心線程數時,線程等待 keepAliveTime 後還是沒有任務需要處理的話,收縮線程到核心線程數。

了解這個政策,有助于我們根據實際的容量規劃需求,為線程池設定合适的初始化參數。當然,我們也可以通過一些手段來改變這些預設工作行為,比如:

聲明線程池後立即調用 prestartAllCoreThreads 方法,來啟動所有核心線程;

傳入 true 給 allowCoreThreadTimeOut 方法,來讓線程池在空閑的時候同樣回收核心線程。

不知道你有沒有想過:Java 線程池是先用工作隊列來存放來不及處理的任務,滿了之後再擴容線程池。當我們的工作隊列設定得很大時,最大線程數這個參數顯得沒有意義,因為隊列很難滿,或者到滿的時候再去擴容線程池已經于事無補了。

那麼,我們有沒有辦法讓線程池更激進一點,優先開啟更多的線程,而把隊列當成一個後備方案呢?比如我們這個例子,任務執行得很慢,需要 10 秒,如果線程池可以優先擴容到 5 個最大線程,那麼這些任務最終都可以完成,而不會因為線程池擴容過晚導緻慢任務來不及處理。

限于篇幅,這裡我隻給你一個大緻思路:

由于線程池在工作隊列滿了無法入隊的情況下會擴容線程池,那麼我們是否可以重寫隊列的 offer 方法,造成這個隊列已滿的假象呢?

由于我們 Hack 了隊列,在達到了最大線程後勢必會觸發拒絕政策,那麼能否實作一個自定義的拒絕政策處理程式,這個時候再把任務真正插入隊列呢?

接下來,就請你動手試試看如何實作這樣一個“彈性”線程池吧。Tomcat 線程池也實作了類似的效果,可供你借鑒。

務必确認清楚線程池本身是不是複用的

不久之前我遇到了這樣一個事故:某項目生産環境時不時有報警提示線程數過多,超過 2000 個,收到報警後檢視監控發現,瞬時線程數比較多但過一會兒又會降下來,線程數抖動很厲害,而應用的通路量變化不大。

為了定位問題,我們線上程數比較高的時候進行線程棧抓取,抓取後發現記憶體中有 1000 多個自定義線程池。一般而言,線程池肯定是複用的,有 5 個以内的線程池都可以認為正常,而 1000 多個線程池肯定不正常。

在項目代碼裡,我們沒有搜到聲明線程池的地方,搜尋 execute 關鍵字後定位到,原來是業務代碼調用了一個類庫來獲得線程池,類似如下的業務代碼:調用 ThreadPoolHelper 的 getThreadPool 方法來獲得線程池,然後送出數個任務到線程池處理,看不出什麼異常。

@GetMapping("wrong")
public String wrong() throws InterruptedException {

    ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();

    IntStream.rangeClosed(1, 10).forEach(i -> {
        threadPool.execute(() -> {
            ...
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {

            }
        });
    });

    return "OK";
}
           

但是,來到 ThreadPoolHelper 的實作讓人大跌眼鏡,getThreadPool 方法居然是每次都使用 Executors.newCachedThreadPool 來建立一個線程池。

class ThreadPoolHelper {

    public static ThreadPoolExecutor getThreadPool() {

        //線程池沒有複用
        return (ThreadPoolExecutor) Executors.newCachedThreadPool();

    }

}
           

我們想到 newCachedThreadPool 會在需要時建立必要多的線程,業務代碼的一次業務操作會向線程池送出多個慢任務,這樣執行一次業務操作就會開啟多個線程。如果業務操作并發量較大的話,的确有可能一下子開啟幾千個線程。

那,為什麼我們能在監控中看到線程數量會下降,而不會撐爆記憶體呢?

回到 newCachedThreadPool 的定義就會發現,它的核心線程數是 0,而 keepAliveTime 是 60 秒,也就是在 60 秒之後所有的線程都是可以回收的。好吧,就因為這個特性,我們的業務程式死得沒太難看。

要修複這個 Bug 也很簡單,使用一個靜态字段來存放線程池的引用,傳回線程池的代碼直接傳回這個靜态字段即可。這裡一定要記得我們的最佳實踐,手動建立線程池。修複後的 ThreadPoolHelper 類如下:

class ThreadPoolHelper {

  private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    10, 50,
    2, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());

  public static ThreadPoolExecutor getRightThreadPool() {

    return threadPoolExecutor;
  }

}
           

需要仔細斟酌線程池的混用政策

線程池的意義在于複用,那這是不是意味着程式應該始終使用一個線程池呢?

當然不是。通過第一小節的學習我們知道,要根據任務的“輕重緩急”來指定線程池的核心參數,包括線程數、回收政策和任務隊列:

對于執行比較慢、數量不大的 IO 任務,或許要考慮更多的線程數,而不需要太大的隊列。

而對于吞吐量較大的計算型任務,線程數量不宜過多,可以是 CPU 核數或核數 *2(理由是,線程一定排程到某個 CPU 進行執行,如果任務本身是 CPU 綁定的任務,那麼過多的線程隻會增加線程切換的開銷,并不能提升吞吐量),但可能需要較長的隊列來做緩沖。

之前我也遇到過這麼一個問題,業務代碼使用了線程池異步處理一些記憶體中的資料,但通過監控發現處理得非常慢,整個處理過程都是記憶體中的計算不涉及 IO 操作,也需要數秒的處理時間,應用程式 CPU 占用也不是特别高,有點不可思議。

經排查發現,業務代碼使用的線程池,還被一個背景的檔案批處理任務用到了。

或許是夠用就好的原則,這個線程池隻有 2 個核心線程,最大線程也是 2,使用了容量為 100 的 ArrayBlockingQueue 作為工作隊列,使用了 CallerRunsPolicy 拒絕政策:

private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        2, 2,
        1, TimeUnit.HOURS,
        new ArrayBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("batchfileprocess-threadpool-%d").get(),
        new ThreadPoolExecutor.CallerRunsPolicy());
           

這裡,我們模拟一下檔案批處理的代碼,在程式啟動後通過一個線程開啟死循環邏輯,不斷向線程池送出任務,任務的邏輯是向一個檔案中寫入大量的資料:

@PostConstruct
public void init() {

    printStats(threadPool);
    new Thread(() -> {

        //模拟需要寫入的大量資料
        String payload = IntStream.rangeClosed(1, 1_000_000)
                .mapToObj(__ -> "a")
                .collect(Collectors.joining(""));

        while (true) {
            threadPool.execute(() -> {
                try {

                    //每次都是建立并寫入相同的資料到相同的檔案
                    Files.write(Paths.get("demo.txt"), Collections.singletonList(LocalTime.now().toString() + ":" + payload), UTF_8, CREATE, TRUNCATE_EXISTING);

                } catch (IOException e) {
                    e.printStackTrace();

                }
                log.info("batch file processing done");
            });
        }
    }).start();

}
           

可以想象到,這個線程池中的 2 個線程任務是相當重的。通過 printStats 方法列印出的日志,我們觀察下線程池的負擔:

業務代碼最常用也最容易犯錯的元件

可以看到,線程池的 2 個線程始終處于活躍狀态,隊列也基本處于打滿狀态。因為開啟了 CallerRunsPolicy 拒絕處理政策,是以當線程滿載隊列也滿的情況下,任務會在送出任務的線程,或者說調用 execute 方法的線程執行,也就是說不能認為送出到線程池的任務就一定是異步處理的。如果使用了 CallerRunsPolicy 政策,那麼有可能異步任務變為同步執行。從日志的第四行也可以看到這點。這也是這個拒絕政策比較特别的原因。

不知道寫代碼的同學為什麼設定這個政策,或許是測試時發現線程池因為任務處理不過來出現了異常,而又不希望線程池丢棄任務,是以最終選擇了這樣的拒絕政策。不管怎樣,這些日志足以說明線程池是飽和狀态。

可以想象到,業務代碼複用這樣的線程池來做記憶體計算,命運一定是悲慘的。我們寫一段代碼測試下,向線程池送出一個簡單的任務,這個任務隻是休眠 10 毫秒沒有其他邏輯:

private Callable<Integer> calcTask() {

    return () -> {
        TimeUnit.MILLISECONDS.sleep(10);
        return 1;

    };
}

@GetMapping("wrong")
public int wrong() throws ExecutionException, InterruptedException {

    return threadPool.submit(calcTask()).get();
}
           

我們使用 wrk 工具對這個接口進行一個簡單的壓測,可以看到 TPS 為 75,性能的确非常差。

業務代碼最常用也最容易犯錯的元件

細想一下,問題其實沒有這麼簡單。因為原來執行 IO 任務的線程池使用的是 CallerRunsPolicy 政策,是以直接使用這個線程池進行異步計算的話,當線程池飽和的時候,計算任務會在執行 Web 請求的 Tomcat 線程執行,這時就會進一步影響到其他同步處理的線程,甚至造成整個應用程式崩潰。

解決方案很簡單,使用獨立的線程池來做這樣的“計算任務”即可。計算任務打了雙引号,是因為我們的模拟代碼執行的是休眠操作,并不屬于 CPU 綁定的操作,更類似 IO 綁定的操作,如果線程池線程數設定太小會限制吞吐能力:

private static ThreadPoolExecutor asyncCalcThreadPool = new ThreadPoolExecutor(
  200, 200,
  1, TimeUnit.HOURS,
  new ArrayBlockingQueue<>(1000),
  new ThreadFactoryBuilder().setNameFormat("asynccalc-threadpool-%d").get());

@GetMapping("right")
public int right() throws ExecutionException, InterruptedException {
  return asyncCalcThreadPool.submit(calcTask()).get();

}
           

使用單獨的線程池改造代碼後再來測試一下性能,TPS 提高到了 1727:

業務代碼最常用也最容易犯錯的元件

可以看到,盲目複用線程池混用線程的問題在于,别人定義的線程池屬性不一定适合你的任務,而且混用會互相幹擾。這就好比,我們往往會用虛拟化技術來實作資源的隔離,而不是讓所有應用程式都直接使用實體機。

就線程池混用問題,我想再和你補充一個坑:Java 8 的 parallel stream 功能,可以讓我們很友善地并行處理集合中的元素,其背後是共享同一個 ForkJoinPool,預設并行度是 CPU 核數 -1。對于 CPU 綁定的任務來說,使用這樣的配置比較合适,但如果集合操作涉及同步 IO 操作的話(比如資料庫操作、外部服務調用等),建議自定義一個 ForkJoinPool(或普通線程池)。你可以參考第一講的相關 Demo。

重點回顧

線程池管理着線程,線程又屬于寶貴的資源,有許多應用程式的性能問題都來自線程池的配置和使用不當。在今天的學習中,我通過三個和線程池相關的生産事故,和你分享了使用線程池的幾個最佳實踐。

第一,Executors 類提供的一些快捷聲明線程池的方法雖然簡單,但隐藏了線程池的參數細節。是以,使用線程池時,我們一定要根據場景和需求配置合理的線程數、任務隊列、拒絕政策、線程回收政策,并對線程進行明确的命名友善排查問題。

第二,既然使用了線程池就需要確定線程池是在複用的,每次 new 一個線程池出來可能比不用線程池還糟糕。如果你沒有直接聲明線程池而是使用其他同學提供的類庫來獲得一個線程池,請務必檢視源碼,以确認線程池的執行個體化方式和配置是符合預期的。

第三,複用線程池不代表應用程式始終使用同一個線程池,我們應該根據任務的性質來選用不同的線程池。特别注意 IO 綁定的任務和 CPU 綁定的任務對于線程池屬性的偏好,如果希望減少任務間的互相幹擾,考慮按需使用隔離的線程池。

最後我想強調的是,線程池作為應用程式内部的核心元件往往缺乏監控(如果你使用類似 RabbitMQ 這樣的 MQ 中間件,運維同學一般會幫我們做好中間件監控),往往到程式崩潰後才發現線程池的問題,很被動。在設計篇中我們會重新談及這個問題及其解決方案。

思考與讨論

我們改進了 ThreadPoolHelper 使其能夠傳回複用的線程池。如果我們不小心每次都建立了這樣一個自定義的線程池(10 核心線程,50 最大線程,2 秒回收的),反複執行測試接口線程,最終可以被回收嗎?會出現 OOM 問題嗎?

你還遇到過線程池相關的其他坑嗎?

繼續閱讀