天天看點

從線程池拒絕政策中我們可以學到什麼?一、背景二、可以學到什麼?三、總結

一、背景

很多人都知道或者用過線程池,線程池構造方法的參數中有一個參數為拒絕政策。

那麼,通過拒絕政策我們可以學到哪些思想?

下面簡單講講自己的了解。

二、可以學到什麼?

2.1 為什麼提供多種拒絕政策?

不知道你有沒有思考過,為什麼會有那麼多程式設計語言?為什麼會有那麼多算法?為什麼會有那麼多設計模式?為什麼會有那麼多存儲方式?為什麼會有那麼多線程池拒絕政策?

如果你稍微思考,就會發現不同的程式設計語言、算法、設計模式、存儲方式、線程池拒絕政策等适用不同的場景。

從線程池拒絕政策中我們可以學到什麼?一、背景二、可以學到什麼?三、總結

(圖檔來源:

美團技術

啟發:沒有最好的選擇,隻有最适合的選擇

我們要做的是在某個特定場景下選擇最适合的技術,而不是最新、最熱的技術。

2.2 如果我來設計拒絕政策,我能想到幾個?

如果workerCount >= maximumPoolSize,并且線程池内的阻塞隊列已滿, 則根據拒絕政策來處理該任務。

如果由你來設計線程池經典的處理政策你會提供怎樣的政策?

如果我們從來沒學過這一塊,可能就想不出這麼多經典的處理政策。

JDK 源碼提供的拒絕政策應該是經過深思熟慮,能夠覆寫到常見業務場景。

啟發:當我們面臨大資料量處理時,也可以參考這些政策根據其适用的場景去靈活處理。

2.3 為什麼預設政策是 AbortPolicy?

不知道你是否思考過,為什麼預設的政策是 AbortPolicy?

如果預設是 DiscardPolicy 或者 DiscardOldestPolicy,真正觸發拒絕政策時,使用者不容易感覺。

使用者對源碼了解不深入很容易出現不符合預期的運作效果。

顯然超出“承載能力” 丢棄任務并抛出異常是一個更适合的選擇。不提供服務,沒啥好說的,已經超過能力了,沒法提供服務。抛出異常為了讓上遊感覺到并做出相應處理。

另外我們可以看下 Redis 緩存淘汰政策:

1.noeviction(預設政策):對于寫請求不再提供服務,直接傳回錯誤(DEL請求和部分特殊請求除外)

2.allkeys-lru:從所有key中使用LRU算法進行淘汰

3.volatile-lru:從設定了過期時間的key中使用LRU算法進行淘汰

4.allkeys-random:從所有key中随機淘汰資料

5.volatile-random:從設定了過期時間的key中随機淘汰

6.volatile-ttl:在設定了過期時間的key中,淘汰過期時間剩餘最短的

我們會發現兩者也有“驚人”的相似性,都是不提供服務,傳回錯誤。

啟發1:當我們遇到超過承載量的場景時,需要提供不同的選擇,可以借鑒上述思想,并且預設需要考慮報錯,保護自身的同僚,讓上遊感覺到。

啟發2:可以通過對比經典技術對相似問題的處理來得到比較靠譜的方案 or 思考方向。

2.4 如果提供的政策無法滿足需求怎麼辦?

作為一個嚴謹的設計者,就應該預料到自己提供的政策不可能涵蓋所有場景。

是以需要支援使用者自定義才行。

比如 Dubbo 中提供了

AbortPolicyWithReport

拒絕政策,在觸發拒絕政策時可以報告一些資訊。

public class AbortPolicyWithReport extends AbortPolicy {
    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
    private final String threadName;
    private final URL url;
    private static volatile long lastPrintTime = 0L;
    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", this.threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), this.url.getProtocol(), this.url.getIp(), this.url.getPort());
        logger.warn(msg);
        this.dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
        long now = System.currentTimeMillis();
        if (now - lastPrintTime >= 600000L) {
            if (guard.tryAcquire()) {
                Executors.newSingleThreadExecutor().execute(new Runnable() {
                    public void run() {
                        String dumpPath = AbortPolicyWithReport.this.url.getParameter("dump.directory", System.getProperty("user.home"));
                        String OS = System.getProperty("os.name").toLowerCase();
                        SimpleDateFormat sdf;
                        if (OS.contains("win")) {
                            sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
                        } else {
                            sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
                        }

                        String dateStr = sdf.format(new Date());
                        FileOutputStream jstackStream = null;

                        try {
                            jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log." + dateStr));
                            JVMUtil.jstack(jstackStream);
                        } catch (Throwable var15) {
                            AbortPolicyWithReport.logger.error("dump jstack error", var15);
                        } finally {
                            AbortPolicyWithReport.guard.release();
                            if (jstackStream != null) {
                                try {
                                    jstackStream.flush();
                                    jstackStream.close();
                                } catch (IOException var14) {
                                }
                            }

                        }

                        AbortPolicyWithReport.lastPrintTime = System.currentTimeMillis();
                    }
                });
            }
        }
    }
}
           

PS: 這也是我們學習和了解 volatile、Semaphore 的一個機會。

再比如 Netty 中就提供了通過建立線程執行的政策:

NewThreadRunsPolicy

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e);
            }
        }
    }           

這些開源項目的拒絕政策都為我們開了不少腦洞。

我們也可以根據具體業務,扔到消息隊列裡再消費等方式處理。

啟發1:面向未來程式設計。

當我們設計一些通用工具時,也要留一些拓展性給别人。

啟發1: 帶着問題學技術或者把技術放在特定場景裡學習,更有興趣,也更容易掌握。

三、總結

本文簡單談下自己從線程池拒絕政策中學到的一點知識,希望能夠對大家有啟發。

希望大家在讀源碼時能多一些思考,多思考為什麼,而不是記憶結論。

多問幾個問題,如:

  • 這是為了解決什麼問題?
  • 作者為什麼這麼設計? 這麼設計的優點是什麼?
  • 這麼設計的核心原理是什麼(作者是怎麼設計的) ?
  • 如果是我會怎麼設計?
  • 别人是如何設計的?
  • 我能從中學到什麼,得到什麼對未來技術成長有幫助的啟發?

如果你覺得本文對你有幫助,歡迎點贊、收藏加關注三連!!

推薦:如果想深入了解如何更高效地閱讀源碼,可以看我的 GitChat 文章:

《你真的知道該怎麼讀源碼嗎?》
從線程池拒絕政策中我們可以學到什麼?一、背景二、可以學到什麼?三、總結