1. 程序與線程
通常,一個任務就是一個程序(Process),比如打開一個浏覽器就是啟動一個浏覽器程序,打開一個Word就啟動了一個Word程序。大多時候一個程序需要同時幹很多件事情,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序内部,要同時幹多件事,就需要同時運作多個“子任務”,我們把程序内的這些“子任務”稱為線程(Thread)。即一個程式至少有一個程序,一個程序至少有一個線程。
我們大學學作業系統的時候,都知道程序是資源配置設定的基本機關,線程是執行和排程的基本機關,線程本身不擁有資源,資源來自于它的程序。也就是說,程序在執行過程中有自己獨立的記憶體空間,與其他程序互相隔離,是以程序間的通信需要另辟蹊徑,比如常見的有管道通信、消息隊列、信号量以及套接字等方法,同時,一個程序中有三大塊——程序控制塊(PCB)、資料段、代碼段,這會導緻程序間的會産生很大的開銷。而線程與線程之間因為共享程序申請的記憶體區域,它們之間可以互相通信,因為線程的粒度小,使得線程的切換速度比程序快很多,可以極大地提高程式的運作效率,如下,是線程和程序的關系(圖檔摘自知乎)。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISM9AnYldnJwAzN9c3PnBnauQ0MlQ0MlcnW3BXbMFTQ61keJRlT5VkaOhHM510MBpnT3NGRORTRU1UeBRVT1UERNlHMp1kMNpnTwEEVNZXQU1UNFRUT5hTaNJTT65EMBRVT2NmMiNnSywEd5ITW110MaZHetlVdO1GT0UERNl3YXJGc5kHT20ESjBjUIF2Lc12bj5SYphXa5VWen5WY35iclN3Ztl2Lc9CX6MHc0RHaiojIsJye.jpg)
線程分為兩種:使用者線程和守護線程。守護 線程,是指在程式運作的時候在背景提供一種通用服務的線程,比如垃圾回收線程就是一個很稱職的守護者,并且這種線程并不屬于程式中不可或缺的部分。守護線程是用來服務使用者線程的,一旦使用者線程全部運作結束,程式會終止,守護線程也會随之退出。
在進入多線程之前,可以先看看線程的幾種狀态:
- 1. 建立(NEW):新建立了一個線程對象。
- 2. 就緒(RUNNABLE或READY) :線程正在參與競争CPU的使用權。
- 3. 運作(RUNNING):線程取到了CPU的使用權,正在執行。
- 4. 阻塞(BLOCKED):阻塞狀态是線程因為某種原因放棄CPU使用權,暫時停止運作。直到滿足條件(比如逾時等待、喚醒)時,該線程重新回到就緒态,參與競争CPU使用權。
- 5. 等待(WAITING):線程無限等待某個對象的鎖,或等待另一個線程結束的狀态。
- 6. 計時等待(TIME_WAITING):線程在某一段時間内等待某個對象的“鎖”,或者主動休眠,亦或者等待一個線程結束,除非被中斷,時間一到,馬上回到就緒狀态,被中斷的方法則抛出異常。
- 7. 終止(Terminated):即線程終止(線程的的代碼被執行完畢)和執行過程出現異常或者被外界強制中斷。
狀态的轉換的具體轉換如下圖所示:
2. Thread類和Runnable接口
通過繼承Thread類,實run方法即可實作一個線程類,常用的API如下:
方法描述 | |
---|---|
start() | 從建立狀态轉化為就緒狀态,開始參與CPU使用權的競争。 |
run() | 直接調用該 Runnable 對象的 run 方法時直接取得CPU的使用權 |
interrupt() | 中斷線程。在程式代碼中搭配while (!Thread.interrupted()){..}使用。 |
isDaemon() | 判斷目前線程是否是守護線程。 |
setDaemon(boolean true) | 将目前線程設定為守護線程,必須在調用start()之後才有效。 |
setPriority(int priority) | 更改線程的優先級。 |
interrupt() | 中斷線程。 |
isAlive() | 測試線程是否處于活動狀态。 |
join(long millisec) | 等待該線程終止的時間最長為 millis 毫秒。 |
Thread.yield() | 暫停目前正在執行的線程對象(讓出目前線程的CPU,轉為就緒狀态),并執行其他線程。 |
Thread.currentThread() | 傳回對目前正在執行的線程對象的引用。 |
Thread.sleep(long millisec) | 在指定的毫秒數内讓目前正在執行的線程休眠(暫停執行),此操作受到系統計時器和排程程式精度和準确性的影響。 |
由于java中的類是單繼承的,而接口可以多繼承。一個類實作多個接口的情況,因為接口隻有抽象方法,具體方法隻能由實作接口的類實作,在調用的時候始終隻會調用實作類的方法(不存在歧義),是以在開發中通常使用Runnable。
public class Thread1 implements Runnable {
@Override
public void run() {
System.out.println("iii");
}
public static void main(String[] args) {
Thread1 rt = new Thread1();
Thread t = new Thread(rt);
t.start();
}
}
這裡補充一下線程中斷interrupt()函數,這個函數并不會中斷某個線程,而是向該線程發送一個信号量,如果要使某個線程中斷,則應該加上isInterrupt()函數去判斷,然後再去做中斷處理。如下代碼:
public class Thread3 implements Runnable{
@Override
public void run() {
while(true){
if(Thread.currentThread().isInterrupted()){
System.out.println("Something interrupted me.");
break;
}
else{
System.out.println("Thread is Going...");
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread3 thread3 = new Thread3();
Thread t = new Thread(thread3);
t.start();
Thread.sleep(3000);
t.interrupt();
}
}
3. 線程池
當線程的在某一時刻大量的建立與銷毀會消耗很多資源,我們可以提前建立好一些線程,将他們集中管理起來,形成一個線程池,需要使用的時候直接拿過來用,使用完後,放回線程池。
Executor架構
在 java.util.cocurrent 包下,通過該架構來控制線程的啟動、執行和關閉,可以簡化并發程式設計的操作,由Executors類的五個靜态工廠方法建立,其常用方法如下。
3.1 線程池的建立
- newFixedThreadPool:建立固定大小的線程池。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那麼線程池會補充一個新線程。
public class Executors { /* 函數功能:建立一個固定長度的的線程池,用于儲存任務的阻塞隊列為無限制長度的LinkedBlockingQueue。 線程池中的線程将會一直存在除非線程池shutdown,即線程池中的線程沒有受到存活時間的限制。 */ public static ExecutorService newFixedThreadPool(int nThreads) { /* 參數一 核心線程數大小(最小線程數),當線程數 < 參數一 ,會建立線程執行 runnable * 參數二 最大線程數, 當線程數 >= 參數二,會把runnable放入workQueue(參數5)中 * 參數三 保持存活時間,空閑線程能保持的最大時間。 * 參數四 時間機關 * 參數五 儲存任務的阻塞隊列 *public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueu */ return new ThreadPoolExecutor(nThreads1, nThreads2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //... } ExecutorService es = Executors.newFixedThreadPool(20); //如果線程池中線程數過大或過小,都會影響性能
- newCachedThreadPool:建立一個可緩存空閑線程60秒的線程池。如果線程池的大小超過了處理任務所需要的線程,那麼就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。
public class Executors { public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //... } ExecutorService es = Executors.newCachedThreadPool(); //缺點是在通路量突然很大的時候,會建立大量線程
- 建立一個單線程的線程池。這個線程池隻有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那麼會有一個新的線程來替代它。
ExecutorService es = Executors.newSingleThreadExecutor(); //等同于 ExecutorService es = Executors.newFixedThreadPool(1);
- newScheduledThreadPool:建立一個大小無限的線程池。此線程池支援定時以及周期性執行任務的需求。
public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(1); ses.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date()); } }, 1000/*第一個周期開始的時間*/, 2000/*每個周期間隔的時間*/, TimeUnit.MILLISECONDS); }
- newSingleThreadScheduledExecutor:建立一個單線程的線程池。此線程池支援定時以及周期性執行任務的需求。
3.2 線程池中線程的使用
通過Executors類去獲得的線程池都實作了ExecutorService這個接口。可以調用execute()或者submit()方法把相應的任務送出到線程池中去。
1. execute(Runnable): 這個方法接收一個Runnable執行個體,并且異步的執行。
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
// Future future =
es.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("run the thread.");
}
});
System.out.println("over");
}
/* output:
* over
* run the thread.
*/
2. submit(Runnable):
submit(Runnable)
和
execute(Runnable)
差別是前者可以傳回一個Future對象,通過傳回的Future對象,我們可以檢查送出的任務是否執行完畢。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Future future = es.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("run the thread.");
}
});
future.get(); //future.get()方法會産生阻塞,直到上面的線程完成,即等待一秒鐘
System.out.println("over");
}
/* output:
* run the thread.
* over
*/
3. submit(Callable):
submit(Callable)
和
submit(Runnable)
類似,也會傳回一個Future對象,但是參數Callable類中的call方法可以傳回一個值,而Runable不行。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Future future = es.submit(new Callable() {
@Override
public Object call() throws Exception {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "run the thread.";
}
});
String rs = (String)future.get(); //future.get()方法會産生阻塞
System.out.println("over and " + rs);
}
/* output:
* over and run the thread.
*/
4. invokeAny(Collection<? extends Callable<T>> tasks>): 方法輸入接受一個Callable集合類型的參數,啟動多個線程互相獨立的去執行對應線程的任務,一旦有一個線程執行完畢,則傳回,同時其他線程終止。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(3);
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return " first task";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return " second task";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return " third task";
}
});
String rs = es.invokeAny(callables);
System.out.println(rs);
}
/* output
* second task
*/
5. invokeAll(Collection<? extends Callable<T>> tasks>): 該方法則會并行的執行Callable集合類型的所有方法。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(3);
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return " first task";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return " second task";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return " third task";
}
});
List<Future<String>> list= es.invokeAll(callables);
for (Future<String> future:list) {
String s = future.get();
System.out.println(s);
}
}
6. shotdown(): 調用該方法後,關閉線程池,已送出的方法會繼續執行,執行結束後,線程池全部關閉,該方法是一個異步方法,一旦調用,立即傳回。
7.shotdownNow(): 調用該方法後,關閉線程池,已送出的方法也會被取消,線程池立即全部關閉,該方法是一個異步方法,一旦調用,立即傳回。
8. awaitTermination(timeout,unit): 調用該方法阻塞目前線程,使得線程池中的線程執行完畢,最長等待時間為timeout,此方法需要在調用shotdown/shotdownNow後才有效。
public class ThreadSafe implements Runnable {
private static int count = 0;
@Override
public void run() {
for (int i = 0; i < 10; i++) {
count++;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 20; i++) {
es.execute(new ThreadSafe());
}
es.shutdown(); //不允許添加線程,異步關閉連接配接池
es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接配接池的線程任務完成
System.out.println(count);
}
}
/* output
* 200
*/
參考文獻
- 龐永華. Java多線程與Socket:實戰微服務架構[M].電子工業出版社.2019.3
- Executors類中建立線程池的幾種方法的分析