天天看點

java中的線程池,這一篇就夠了

java中的線程池,這一篇就夠了

java高并發系列第18篇文章。

本文主要内容

什麼是線程池

線程池實作原理

線程池中常見的各種隊列

自定義線程建立的工廠

常見的飽和政策

自定義飽和政策

線程池中兩種關閉方法有何不同

擴充線程池

合理地配置線程池

線程池中線程數量的配置

大家用jdbc操作過資料庫應該知道,操作資料庫需要和資料庫建立連接配接,拿到連接配接之後才能操作資料庫,用完之後銷毀。資料庫連接配接的建立和銷毀其實是比較耗時的,真正和業務相關的操作耗時是比較短的。每個資料庫操作之前都需要建立連接配接,為了提升系統性能,後來出現了資料庫連接配接池,系統啟動的時候,先建立很多連接配接放在池子裡面,使用的時候,直接從連接配接池中擷取一個,使用完畢之後傳回到池子裡面,繼續給其他需要者使用,這其中就省去建立連接配接的時間,進而提升了系統整體的性能。

線程池和資料庫連接配接池的原理也差不多,建立線程去處理業務,可能建立線程的時間比處理業務的時間還長一些,如果系統能夠提前為我們建立好線程,我們需要的時候直接拿來使用,用完之後不是直接将其關閉,而是将其傳回到線程中中,給其他需要這使用,這樣直接節省了建立和銷毀的時間,提升了系統的性能。

簡單的說,在使用了線程池之後,建立線程變成了從線程池中擷取一個空閑的線程,然後使用,關閉線程變成了将線程歸還到線程池。

當向線程池送出一個任務之後,線程池的處理流程如下:

判斷是否達到核心線程數,若未達到,則直接建立新的線程處理目前傳入的任務,否則進入下個流程

線程池中的工作隊列是否已滿,若未滿,則将任務丢入工作隊列中先存着等待處理,否則進入下個流程

是否達到最大線程數,若未達到,則建立新的線程處理目前傳入的任務,否則交給線程池中的飽和政策進行處理。

流程如下圖:

舉個例子,加深了解:

咱們作為開發者,上面都有開發主管,主管下面帶領幾個小弟幹活,CTO給主管授權說,你可以招聘5個小弟幹活,新來任務,如果小弟還不到吳哥,立即去招聘一個來幹這個新來的任務,當5個小弟都招來了,再來任務之後,将任務記錄到一個表格中,表格中最多記錄100個,小弟們會主動去表格中擷取任務執行,如果5個小弟都在幹活,并且表格中也記錄滿了,那你可以将小弟擴充到20個,如果20個小弟都在幹活,并且存放任務的表也滿了,産品經理再來任務後,是直接拒絕,還是讓産品自己幹,這個由你自己決定,小弟們都盡心盡力在幹活,任務都被處理完了,突然公司業績下滑,幾個員工沒事幹,打醬油,為了節約成本,CTO主管把小弟控制到5人,其他15個人直接被幹掉了。是以作為小弟們,别讓自己閑着,多幹活。

原理:先找幾個人幹活,大家都忙于幹活,任務太多可以排期,排期的任務太多了,再招一些人來幹活,最後幹活的和排期都達到上層上司要求的上限了,那需要采取一些其他政策進行處理了。對于長時間不幹活的人,考慮将其開掉,節約資源和成本。

java中的線程池

jdk中提供了線程池的具體實作,實作類是:java.util.concurrent.ThreadPoolExecutor,主要構造方法:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)           

corePoolSize:核心線程大小,當送出一個任務到線程池時,線程池會建立一個線程來執行任務,即使有其他空閑線程可以處理任務也會創新線程,等到工作的線程數大于核心線程數時就不會在建立了。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前把核心線程都創造好,并啟動

maximumPoolSize:線程池允許建立的最大線程數。如果隊列滿了,并且以建立的線程數小于最大線程數,則線程池會再建立新的線程執行任務。如果我們使用了無界隊列,那麼所有的任務會加入隊列,這個參數就沒有什麼效果了

keepAliveTime:線程池的工作線程空閑後,保持存活的時間。如果沒有任務處理了,有些線程會空閑,空閑的時間超過了這個值,會被回收掉。如果任務很多,并且每個任務的執行時間比較短,避免線程重複建立和回收,可以調大這個時間,提高線程的使用率

unit:keepAliveTIme的時間機關,可以選擇的機關有天、小時、分鐘、毫秒、微妙、千分之一毫秒和納秒。類型是一個枚舉java.util.concurrent.TimeUnit,這個枚舉也經常使用,有興趣的可以看一下其源碼

workQueue:工作隊列,用于緩存待處理任務的阻塞隊列,常見的有4種,本文後面有介紹

threadFactory:線程池中建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字

handler:飽和政策,當線程池無法處理新來的任務了,那麼需要提供一種政策處理送出的新任務,預設有4種政策,文章後面會提到

調用線程池的execute方法處理任務,執行execute方法的過程:

判斷線程池中運作的線程數是否小于corepoolsize,是:則建立新的線程來處理任務,否:執行下一步

試圖将任務添加到workQueue指定的隊列中,如果無法添加到隊列,進入下一步

判斷線程池中運作的線程數是否小于maximumPoolSize,是:則新增線程處理目前傳入的任務,否:将任務傳遞給handler對象rejectedExecution方法處理

線程池的使用步驟:

調用構造方法建立線程池

調用線程池的方法處理任務

關閉線程池

線程池使用的簡單示例

上一個簡單的示例,如下:

package com.itsoku.chat16;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

  • 跟着阿裡p7學并發,微信公衆号:javacode2018

    */

public class Demo1 {

static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
        5,
        10,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        int j = i;
        String taskName = "任務" + j;
        executor.execute(() -> {
            //模拟任務内部處理耗時
            try {
                TimeUnit.SECONDS.sleep(j);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + taskName + "處理完畢");
        });
    }
    //關閉線程池
    executor.shutdown();
}           

}

輸出:

pool-1-thread-1任務0處理完畢

pool-1-thread-2任務1處理完畢

pool-1-thread-3任務2處理完畢

pool-1-thread-1任務3處理完畢

pool-1-thread-2任務4處理完畢

pool-1-thread-3任務5處理完畢

pool-1-thread-1任務6處理完畢

pool-1-thread-2任務7處理完畢

pool-1-thread-3任務8處理完畢

pool-1-thread-1任務9處理完畢

線程池中常見5種工作隊列

任務太多的時候,工作隊列用于暫時緩存待處理的任務,jdk中常見的5種阻塞隊列:

ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按照先進先出原則對元素進行排序

LinkedBlockingQueue:是一個基于連結清單結構的阻塞隊列,此隊列按照先進先出排序元素,吞吐量通常要高于ArrayBlockingQueue。靜态工廠方法Executors.newFixedThreadPool使用了這個隊列。

SynchronousQueue :一個不存儲元素的阻塞隊列,每個插入操作必須等到另外一個線程調用移除操作,否則插入操作一直處理阻塞狀态,吞吐量通常要高于LinkedBlockingQueue,靜态工廠方法Executors.newCachedThreadPool使用這個隊列

PriorityBlockingQueue:優先級隊列,進入隊列的元素按照優先級會進行排序

前2種隊列相關示例就不說了,主要說一下後面2種隊列的使用示例。

SynchronousQueue隊列的線程池

import java.util.concurrent.*;

public class Demo2 {

public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    for (int i = 0; i < 50; i++) {
        int j = i;
        String taskName = "任務" + j;
        executor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "處理" + taskName);
            //模拟任務内部處理耗時
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    executor.shutdown();
}           

pool-1-thread-1處理任務0

pool-1-thread-2處理任務1

pool-1-thread-3處理任務2

pool-1-thread-6處理任務5

pool-1-thread-7處理任務6

pool-1-thread-4處理任務3

pool-1-thread-5處理任務4

pool-1-thread-8處理任務7

pool-1-thread-9處理任務8

pool-1-thread-10處理任務9

pool-1-thread-11處理任務10

pool-1-thread-12處理任務11

pool-1-thread-13處理任務12

pool-1-thread-14處理任務13

pool-1-thread-15處理任務14

pool-1-thread-16處理任務15

pool-1-thread-17處理任務16

pool-1-thread-18處理任務17

pool-1-thread-19處理任務18

pool-1-thread-20處理任務19

pool-1-thread-21處理任務20

pool-1-thread-25處理任務24

pool-1-thread-24處理任務23

pool-1-thread-23處理任務22

pool-1-thread-22處理任務21

pool-1-thread-26處理任務25

pool-1-thread-27處理任務26

pool-1-thread-28處理任務27

pool-1-thread-30處理任務29

pool-1-thread-29處理任務28

pool-1-thread-31處理任務30

pool-1-thread-32處理任務31

pool-1-thread-33處理任務32

pool-1-thread-38處理任務37

pool-1-thread-35處理任務34

pool-1-thread-36處理任務35

pool-1-thread-41處理任務40

pool-1-thread-34處理任務33

pool-1-thread-39處理任務38

pool-1-thread-40處理任務39

pool-1-thread-37處理任務36

pool-1-thread-42處理任務41

pool-1-thread-43處理任務42

pool-1-thread-45處理任務44

pool-1-thread-46處理任務45

pool-1-thread-44處理任務43

pool-1-thread-47處理任務46

pool-1-thread-50處理任務49

pool-1-thread-48處理任務47

pool-1-thread-49處理任務48

代碼中使用Executors.newCachedThreadPool()建立線程池,看一下的源碼:

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}           

從輸出中可以看出,系統建立了50個線程處理任務,代碼中使用了SynchronousQueue同步隊列,這種隊列比較特殊,放入元素必須要有另外一個線程去擷取這個元素,否則放入元素會失敗或者一直阻塞在那裡直到有線程取走,示例中任務處理休眠了指定的時間,導緻已建立的工作線程都忙于處理任務,是以新來任務之後,将任務丢入同步隊列會失敗,丢入隊列失敗之後,會嘗試建立線程處理任務。使用上面的方式建立線程池需要注意,如果需要處理的任務比較耗時,會導緻新來的任務都會建立新的線程進行處理,可能會導緻建立非常多的線程,最終耗盡系統資源,觸發OOM。

PriorityBlockingQueue優先級隊列的線程池

public class Demo3 {

static class Task implements Runnable, Comparable<Task> {

    private int i;
    private String name;

    public Task(int i, String name) {
        this.i = i;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "處理" + this.name);
    }

    @Override
    public int compareTo(Task o) {
        return Integer.compare(o.i, this.i);
    }
}

public static void main(String[] args) {
    ExecutorService executor = new ThreadPoolExecutor(1, 1,
            60L, TimeUnit.SECONDS,
            new PriorityBlockingQueue());
    for (int i = 0; i < 10; i++) {
        String taskName = "任務" + i;
        executor.execute(new Task(i, taskName));
    }
    for (int i = 100; i >= 90; i--) {
        String taskName = "任務" + i;
        executor.execute(new Task(i, taskName));
    }
    executor.shutdown();
}           

pool-1-thread-1處理任務100

pool-1-thread-1處理任務99

pool-1-thread-1處理任務98

pool-1-thread-1處理任務97

pool-1-thread-1處理任務96

pool-1-thread-1處理任務95

pool-1-thread-1處理任務94

pool-1-thread-1處理任務93

pool-1-thread-1處理任務92

pool-1-thread-1處理任務91

pool-1-thread-1處理任務90

pool-1-thread-1處理任務9

pool-1-thread-1處理任務8

pool-1-thread-1處理任務7

pool-1-thread-1處理任務6

pool-1-thread-1處理任務5

pool-1-thread-1處理任務4

pool-1-thread-1處理任務3

pool-1-thread-1處理任務2

pool-1-thread-1處理任務1

輸出中,除了第一個任務,其他任務按照優先級高低按順序處理。原因在于:建立線程池的時候使用了優先級隊列,進入隊列中的任務會進行排序,任務的先後順序由Task中的i變量決定。向PriorityBlockingQueue加入元素的時候,内部會調用代碼中Task的compareTo方法決定元素的先後順序。

自定義建立線程的工廠

給線程池中線程起一個有意義的名字,在系統出現問題的時候,通過線程堆棧資訊可以更容易發現系統中問題所在。自定義建立工廠需要實作java.util.concurrent.ThreadFactory接口中的Thread newThread(Runnable r)方法,參數為傳入的任務,需要傳回一個工作線程。

示例代碼:

import java.util.concurrent.atomic.AtomicInteger;

public class Demo4 {

static AtomicInteger threadNum = new AtomicInteger(1);

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10), r -> {
        Thread thread = new Thread(r);
        thread.setName("自定義線程-" + threadNum.getAndIncrement());
        return thread;
    });
    for (int i = 0; i < 5; i++) {
        String taskName = "任務-" + i;
        executor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "處理" + taskName);
        });
    }
    executor.shutdown();
}           

自定義線程-1處理任務-0

自定義線程-3處理任務-2

自定義線程-2處理任務-1

自定義線程-4處理任務-3

自定義線程-5處理任務-4

代碼中在任務中輸出了目前線程的名稱,可以看到是我們自定義的名稱。

通過jstack檢視線程的堆棧資訊,也可以看到我們自定義的名稱,我們可以将代碼中executor.shutdown();先給注釋掉讓程式先不退出,然後通過jstack檢視,如下:

4種常見飽和政策

當線程池中隊列已滿,并且線程池已達到最大線程數,線程池會将任務傳遞給飽和政策進行處理。這些政策都實作了RejectedExecutionHandler接口。接口中有個方法:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor)

參數說明:

r:需要執行的任務

executor:目前線程池對象

JDK中提供了4種常見的飽和政策:

AbortPolicy:直接抛出異常

CallerRunsPolicy:在目前調用者的線程中運作任務,即随丢來的任務,由他自己去處理

DiscardOldestPolicy:丢棄隊列中最老的一個任務,即丢棄隊列頭部的一個任務,然後執行目前傳入的任務

DiscardPolicy:不處理,直接丢棄掉,方法内部為空

需要實作RejectedExecutionHandler接口。任務無法處理的時候,我們想記錄一下日志,我們需要自定義一個飽和政策,示例代碼:

public class Demo5 {

static class Task implements Runnable {
    String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "處理" + this.name);
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "Task{" +
                "name='" + name + '\'' +
                '}';
    }
}

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
            1,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            Executors.defaultThreadFactory(),
            (r, executors) -> {
                //自定義飽和政策
                //記錄一下無法處理的任務
                System.out.println("無法處理的任務:" + r.toString());
            });
    for (int i = 0; i < 5; i++) {
        executor.execute(new Task("任務-" + i));
    }
    executor.shutdown();
}           

無法處理的任務:Task{name='任務-2'}

無法處理的任務:Task{name='任務-3'}

pool-1-thread-1處理任務-0

無法處理的任務:Task{name='任務-4'}

pool-1-thread-1處理任務-1

輸出結果中可以看到有3個任務進入了飽和政策中,記錄了任務的日志,對于無法處理多任務,我們最好能夠記錄一下,讓開發人員能夠知道。任務進入了飽和政策,說明線程池的配置可能不是太合理,或者機器的性能有限,需要做一些優化調整。

線程池中的2個關閉方法

線程池提供了2個關閉方法:shutdown和shutdownNow,當調用者兩個方法之後,線程池會周遊内部的工作線程,然後調用每個工作線程的interrrupt方法給線程發送中斷信号,内部如果無法響應中斷信号的可能永遠無法終止,是以如果内部有無線循環的,最好在循環内部檢測一下線程的中斷信号,合理的退出。調用者兩個方法中任意一個,線程池的isShutdown方法就會傳回true,當所有的任務線程都關閉之後,才表示線程池關閉成功,這時調用isTerminaed方法會傳回true。

調用shutdown方法之後,線程池将不再接口新任務,内部會将所有已送出的任務處理完畢,處理完畢之後,工作線程自動退出。

而調用shutdownNow方法後,線程池會将還未處理的(在隊裡等待處理的任務)任務移除,将正在進行中的處理完畢之後,工作線程自動退出。

至于調用哪個方法來關閉線程,應該由送出到線程池的任務特性決定,多數情況下調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。

雖然jdk提供了ThreadPoolExecutor這個高性能線程池,但是如果我們自己想在這個線程池上面做一些擴充,比如,監控每個任務執行的開始時間,結束時間,或者一些其他自定義的功能,我們應該怎麼辦?

這個jdk已經幫我們想到了,ThreadPoolExecutor内部提供了幾個方法beforeExecute、afterExecute、terminated,可以由開發人員自己去這些方法。看一下線程池内部的源碼:

try {

beforeExecute(wt, task);//任務執行之前調用的方法
Throwable thrown = null;
try {
    task.run();
} catch (RuntimeException x) {
    thrown = x;
    throw x;
} catch (Error x) {
    thrown = x;
    throw x;
} catch (Throwable x) {
    thrown = x;
    throw new Error(x);
} finally {
    afterExecute(task, thrown);//任務執行完畢之後調用的方法
}           

} finally {

task = null;
w.completedTasks++;
w.unlock();           

beforeExecute:任務執行之前調用的方法,有2個參數,第1個參數是執行任務的線程,第2個參數是任務

protected void beforeExecute(Thread t, Runnable r) { }

afterExecute:任務執行完成之後調用的方法,2個參數,第1個參數表示任務,第2個參數表示任務執行時的異常資訊,如果無異常,第二個參數為null

protected void afterExecute(Runnable r, Throwable t) { }

terminated:線程池最終關閉之後調用的方法。所有的工作線程都退出了,最終線程池會退出,退出時調用該方法

public class Demo6 {

static class Task implements Runnable {
    String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "處理" + this.name);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "Task{" +
                "name='" + name + '\'' +
                '}';
    }
}

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
            10,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            Executors.defaultThreadFactory(),
            (r, executors) -> {
                //自定義飽和政策
                //記錄一下無法處理的任務
                System.out.println("無法處理的任務:" + r.toString());
            }) {
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println(System.currentTimeMillis() + "," + t.getName() + ",開始執行任務:" + r.toString());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",任務:" + r.toString() + ",執行完畢!");
        }

        @Override
        protected void terminated() {
            System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",關閉線程池!");
        }
    };
    for (int i = 0; i < 10; i++) {
        executor.execute(new Task("任務-" + i));
    }
    TimeUnit.SECONDS.sleep(1);
    executor.shutdown();
}           

1564324574847,pool-1-thread-1,開始執行任務:Task{name='任務-0'}

1564324574850,pool-1-thread-3,開始執行任務:Task{name='任務-2'}

pool-1-thread-3處理任務-2

1564324574849,pool-1-thread-2,開始執行任務:Task{name='任務-1'}

pool-1-thread-2處理任務-1

1564324574848,pool-1-thread-5,開始執行任務:Task{name='任務-4'}

pool-1-thread-5處理任務-4

1564324574848,pool-1-thread-4,開始執行任務:Task{name='任務-3'}

pool-1-thread-4處理任務-3

1564324574850,pool-1-thread-7,開始執行任務:Task{name='任務-6'}

pool-1-thread-7處理任務-6

1564324574850,pool-1-thread-6,開始執行任務:Task{name='任務-5'}

1564324574851,pool-1-thread-8,開始執行任務:Task{name='任務-7'}

pool-1-thread-8處理任務-7

pool-1-thread-6處理任務-5

1564324574851,pool-1-thread-10,開始執行任務:Task{name='任務-9'}

pool-1-thread-10處理任務-9

1564324574852,pool-1-thread-9,開始執行任務:Task{name='任務-8'}

pool-1-thread-9處理任務-8

1564324576851,pool-1-thread-2,任務:Task{name='任務-1'},執行完畢!

1564324576851,pool-1-thread-3,任務:Task{name='任務-2'},執行完畢!

1564324576852,pool-1-thread-1,任務:Task{name='任務-0'},執行完畢!

1564324576852,pool-1-thread-4,任務:Task{name='任務-3'},執行完畢!

1564324576852,pool-1-thread-8,任務:Task{name='任務-7'},執行完畢!

1564324576852,pool-1-thread-7,任務:Task{name='任務-6'},執行完畢!

1564324576852,pool-1-thread-5,任務:Task{name='任務-4'},執行完畢!

1564324576853,pool-1-thread-6,任務:Task{name='任務-5'},執行完畢!

1564324576853,pool-1-thread-10,任務:Task{name='任務-9'},執行完畢!

1564324576853,pool-1-thread-9,任務:Task{name='任務-8'},執行完畢!

1564324576853,pool-1-thread-9,關閉線程池!

從輸出結果中可以看到,每個需要執行的任務列印了3行日志,執行前由線程池的beforeExecute列印,執行時會調用任務的run方法,任務執行完畢之後,會調用線程池的afterExecute方法,從每個任務的首尾2條日志中可以看到每個任務耗時2秒左右。線程池最終關閉之後調用了terminated方法。

要想合理的配置線程池,需要先分析任務的特性,可以沖一下幾個角度分析:

任務的性質:CPU密集型任務、IO密集型任務和混合型任務

任務的優先級:高、中、低

任務的執行時間:長、中、短

任務的依賴性:是否依賴其他的系統資源,如資料庫連接配接。

性質不同任務可以用不同規模的線程池分開處理。CPU密集型任務應該盡可能小的線程,如配置cpu數量+1個線程的線程池。由于IO密集型任務并不是一直在執行任務,不能讓cpu閑着,則應配置盡可能多的線程,如:cup數量*2。混合型的任務,如果可以拆分,将其拆分成一個CPU密集型任務和一個IO密集型任務,隻要這2個任務執行的時間相差不是太大,那麼分解後執行的吞吐量将高于串行執行的吞吐量。可以通過Runtime.getRuntime().availableProcessors()方法擷取cpu數量。優先級不同任務可以對線程池采用優先級隊列來處理,讓優先級高的先執行。

使用隊列的時候建議使用有界隊列,有界隊列增加了系統的穩定性,如果采用無解隊列,任務太多的時候可能導緻系統OOM,直接讓系統當機。

線程池彙總線程大小對系統的性能有一定的影響,我們的目标是希望系統能夠發揮最好的性能,過多或者過小的線程數量無法有消息的使用機器的性能。咋Java Concurrency inPractice書中給出了估算線程池大小的公式:

Ncpu = CUP的數量

Ucpu = 目标CPU的使用率,0<=Ucpu<=1

W/C = 等待時間與計算時間的比例

為儲存處理器達到期望的使用率,最有的線程池的大小等于:

Nthreads = Ncpu × Ucpu × (1+W/C)

一些使用建議

在《阿裡巴巴java開發手冊》中指出了線程資源必須通過線程池提供,不允許在應用中自行顯示的建立線程,這樣一方面是線程的建立更加規範,可以合理控制開辟線程的數量;另一方面線程的細節管理交給線程池處理,優化了資源的開銷。而線程池不允許使用Executors去建立,而要通過ThreadPoolExecutor方式,這一方面是由于jdk中Executor架構雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等建立線程池的方法,但都有其局限性,不夠靈活;另外由于前面幾種方法内部也是通過ThreadPoolExecutor方式實作,使用ThreadPoolExecutor有助于大家明确線程池的運作規則,建立符合自己的業務場景需要的線程池,避免資源耗盡的風險。