關于線程的基本操作。
前言
高并發開發的基礎便是看開發者如何很好地去掌控線程,線程安全,資料安全以及一次請求處理的效率優化,本篇文章為如何去簡單地操作線程,同時引出下一篇關于線程池的文章的講解。
在下面的示例程式中,我會用到許多的工具類方法來避免代碼的備援以及耦合度過高。工具類方法過多便以伸縮代碼塊的形式呈現:
工具類定義
public class ThreadUtil {
static public class CustomThreadFactory implements ThreadFactory {
private AtomicInteger threadNo = new AtomicInteger(1);
private String threadName;
private String tmp;
public CustomThreadFactory(String name){
this.tmp = name;
}
@Override
public Thread newThread(Runnable r) {
this.threadName = this.tmp+threadNo.get();
threadNo.incrementAndGet();
Print.tco("建立了一個線程,名稱為: "+this.threadName);
Thread thread = new Thread(r,threadName);
return thread;
}
}
public static void shutdownThreadPoolGracefully(ExecutorService threadPool){
//若已經關閉則傳回
if (!(threadPool instanceof ExecutorService) || threadPool.isTerminated()){
return;
}
try {
threadPool.shutdown();
}catch (SecurityException e){
return;
}catch (NullPointerException e){
return;
}
try {
//等待60秒,等待線程池中的任務完成執行
if (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){
threadPool.shutdownNow();
if (threadPool.awaitTermination(60,TimeUnit.SECONDS)){
System.err.println("線程池任務未正常執行結束");
}
}
} catch (InterruptedException e) {
threadPool.shutdownNow();
}
try {
if (!threadPool.isTerminated()) {
for (int i = 0; i < 100; i++) {
if (threadPool.awaitTermination(10,TimeUnit.MILLISECONDS)){
break;
}
threadPool.shutdownNow();
}
}
}catch (InterruptedException e){
System.err.println(e.getMessage());
}catch (Throwable e){
System.err.println(e.getMessage());
}
}
//懶漢式單例建立線程池:用于執行定時,順序執行
static class SeqOrScheduledTargetThreadPoolLazyHolder{
static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1,
new CustomThreadFactory("seq"));
static {
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("定時和順序任務線程池",()->{
shutdownThreadPoolGracefully(EXECUTOR);
return null;
})
);
}
}
//CPU核數
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//IO處理線程數
private static final int IO_MAX = Math.max(2,CPU_COUNT*2);
/**
* 空閑保活時限,機關秒
*/
private static final int KEEP_ALIVE_SECONDS = 30;
/**
* 有界隊列size
*/
private static final int QUEUE_SIZE = 128;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT;
//懶漢式單例建立線程池:用于IO密集型任務
private static class IoIntenseTargetThreadPoolLazyHolder{
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
IO_MAX,
IO_MAX,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE),
new CustomThreadFactory("io")
);
static {
EXECUTOR.allowCoreThreadTimeOut(true);
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("IO密集型任務線程池", new Callable() {
@Override
public Object call() throws Exception {
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}
}));
}
}
//懶漢式單例建立線程池:用于CPU密集型任務
private static class CpuIntenseTargetThreadPoolLazyHolder{
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
MAXIMUM_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE),
new CustomThreadFactory("cpu")
);
static {
EXECUTOR.allowCoreThreadTimeOut(true);
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread("CPU密集型任務線程池", new Callable() {
@Override
public Object call() throws Exception {
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}
}));
}
}
private static final int MIXED_MAX = 128;//最大線程
private static final String MIXED_THREAD_AMOUNT = "thread.amount";
private static class MixedTargetThreadPoolLazyHolder{
//首先從環境變量 thread.amount中擷取預配置的線程數
//如果沒有對thread.amount進行配置,就使用常量MIXED_MAX作為線程數
private static final int max = (null != System.getProperty(MIXED_THREAD_AMOUNT) ?
Integer.parseInt(System.getProperty(MIXED_THREAD_AMOUNT)) : MIXED_MAX);
public static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
max,
max,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE),
new CustomThreadFactory("mixed")
);
public MixedTargetThreadPoolLazyHolder(){
System.out.println("Mix類建立啦");
}
static {
EXECUTOR.allowCoreThreadTimeOut(true);
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("混合型任務線程池", new Callable() {
@Override
public Object call() throws Exception {
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}
})
);
}
}
public static ThreadPoolExecutor getMixedTargetThreadPool(){
return ThreadUtil.MixedTargetThreadPoolLazyHolder.EXECUTOR;
}
public static String getCurThreadName(){
return Thread.currentThread().getName();
}
public static void sleepMilliSeconds(int millisecond){
LockSupport.parkNanos(millisecond*1000L*1000L);
}
public static void execute(String cft){
synchronized (System.out){
System.out.println(cft);
}
}
public static Thread getCurThread(){
return Thread.currentThread();
}
public static Boolean getCyrThreadDaemon(){
return getCurThread().isDaemon();
}
public static void sleepSeconds(int second){
LockSupport.parkNanos(second * 1000L * 1000L * 1000L);
}
}
如果讀者要以筆者的代碼來示範一下各個功能的話,建議導包形式為:
import static *.ThreadUtil.*;
工具類中的ShutdownHookThread類:
package com.qzlnode.util;
import java.util.concurrent.Callable;
public class ShutdownHookThread extends Thread{
private final Callable callable;
private volatile boolean target = false;
public ShutdownHookThread(String name, Callable callable) {
super(name);
this.callable = callable;
}
@Override
public void run() {
synchronized (this){
try {
if (!target){
target = true;
callable.call();
}
}catch (Exception e){
System.err.println(e.getMessage());
}
}
}
}
為線程設定名稱
Thread類為設定線程的名稱提供了三種方式:
- 在建立線程時,可以通過Thread類構造方法去初始化建立線程的名稱。
- 可以在建立線程後,在啟動線程之前使用Thread類提供的setName方法為線程設定名稱。
- 在前面兩種方法都不用的情況下,系統會自動為線程設定名稱。預設是Thread-i。i為你順序建立的線程的序号
線上程執行使用者代碼邏輯塊的時候,代碼邏輯塊可以有setName()方法,允許線上程運作的過程中,動态地為目前運作的線程設定名稱。
不建議為線程取相同的名稱,前面的文章提到線程的名稱其實是給使用者看的,起相同的名稱容易在使用者使用jstack工具檢視線程時迷糊。
簡單線程名稱操作的例子
public class ThreadNameDemo {
private static int SLEEP_GAP = 500;
static class RunTarget implements Runnable{
@Override
public void run() {
Print.tco(getCurThreadName()+" doing~~~");
sleepMilliSeconds(SLEEP_GAP);
Print.tco(getCurThreadName()+" 執行完成.");
}
}
public static void main(String[] args) {
RunTarget target = new RunTarget();
new Thread(target).start();
new Thread(target).start();
new Thread(target, "線程-A").start();
new Thread(target,"線程-B").start();
sleepMilliSeconds(Integer.MAX_VALUE);
}
}
我們可以來看一下結果:

當我們以構造方法的方式為建立線程設定名稱時,我們在運作線程擷取目前線程名稱,就會看到我們自己設定的名稱,但當我們并沒有為建立線程做出設定名稱的動作時,擷取到的就是系統自動給我們賦的名稱:Thread-i。
讓線程休息一會
如果你有接觸過線程池的話,就知道每一個使用者代碼邏輯塊就是一個工作,我很喜歡把線程看做是流水線員工,它不知疲憊地處理着手中的工作,隻有主管(排程程式)不給它繼續工作下去(沒有獲得到CPU時間片)了,它才停下來但内心還是期待着下一次的配置設定來處理手上的工作。線程那麼地累,不如我們人為地讓它休息一會。
sleep的作用是讓目前正在執行的線程休眠,就線程狀态來說就是從執行狀态變為限時等待狀态。我們從Thread來檢視sleep究竟有幾種用法。
-
public static native void sleep(long millis) throws InterruptedException;
-
public static void sleep(long millis,int nanos) throws InterruptedException;
從Thread類中我們可以看到有兩個sleep重載方法,一個是讓線程休息使用者自定的毫秒時間,另一個則是讓線程休息使用者自定的毫秒加納秒時間。同時我們可以看到每一個方法都抛出了InterruptedException異常,這也要求我們要對此異常進行捕獲并處理。
線程在sleep的時候,線程狀态處于限時等待狀态,在等待狀态中,作業系統并不會為線程配置設定CPU時間片,而時間結束,中途沒有如interrupt喚醒操作的話,線程狀态會自動變為就緒狀态,等待作業系統為其配置設定CPU時間片。
接下來我們來寫個例子:
public class Sleep {
static class SleepThread extends Thread{
static int threadNo = 1;
public SleepThread() {
super("thread-"+threadNo);
threadNo++;
}
@Override
public void run() {
Print.tco("這個線程要開始睡覺咯~~~"+getState());
sleepSeconds(10);//每個線程休息10秒
Print.tco("這個線程睡起來了~~~"+getState());
}
}
public static void main(String[] args) {
Thread thread = null;
for (int i = 0; i < 5; i++) {
thread = new SleepThread();
thread.start();
}
Print.tco(getCurThreadName()+" 執行結束.");
}
}
在程式執行時,我們使用Jstack工具檢視JVM中的線程狀态,從下面的第一張提我們可以很明顯地看到線程處于TIME_WAITING狀态,随着程式運作結束,我們發現輸出在控制台中的結果顯示每一個線程都是Runnable狀态。這是正常的,隻有線程擷取到CPU時間片的時候才會運作
Print.tco()
語句,此時的線程肯定是運作狀态。
但需要注意的是:當線程休息時間到了後,線程的狀态應該是就緒狀态,等到作業系統給了線程CPU時間片才會去輸出這個線程睡起來了這個語句,此時線程的狀态肯定也是運作狀态(Runnable)。
叫醒線程的好夢
忽視線程池技術,一個線程什麼時候執行完成後退出呢?這個問題隻有線程自己能知道,當我們不小心未給線程設定睡醒的時間,線程什麼時候才能退出WAITING狀态呢?現在我們來介紹一下interrupt方法,interrupt與stop方法不同在于:
- stop方法就像你寫文章的時候把你電腦電源拔了,它會導緻資料不安全,資料的不一緻。
- interrupt方法并不是直接中斷線程而是将線程設定為中斷标志位,使用者可以循環檢查這個狀态,然後做出相應的處理。
現在我們來說明一下兩種場景:
(1) 當線程處于BLOACKED狀态時,使用interrupt方法,會立即退出阻塞,并抛出InterruptedException異常,此時我們可以捕獲該異常然後做出一些處理,然後讓線程退出。當然事先線程得準備好處理InterruptedException異常的準備。一般來說當我們調用無限時的sleep、wait、join操作時,線程會進入到阻塞狀态。
為什麼使用interrupt方法隻是設定中斷标志位卻能夠使阻塞退出,并且抛出InterruptException異常,我們從JDK源碼解答:
public void interrupt() {
if (this != Thread.currentThread()) {
checkAccess();
// thread may be blocked in an I/O operation
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupted = true;
interrupt0(); // inform VM of interrupt
b.interrupt(this);
return;
}
}
}
interrupted = true;
// inform VM of interrupt
interrupt0();
}
- 首先進入interrupt方法,會判斷你要喚醒的線程對象是不是等于目前獲得到CPU時間片正在運作的線程對象,如果不是則先擷取安全管理檔案檢查你的權限。
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
- 後獲得鎖 (我們可以看到blockerLock是被final修飾的,也就是說後面線程要通路blockerLock變量的話,都會進入阻塞狀态),執行同步代碼塊,會給b指派一個Interruptible類型的bloack。( 我們也可以看到block是由volatile修飾的,它是輕量級鎖,它保證了共享變量在多線程的可視性。)
什麼是Interruptible類型,從JDK的注釋中我們知道它是可以中斷有着可中斷I/O操作的線程的對象。sleep、wait、join方法就是可以中斷的I/O操作,是以這個b并不是空的。
- 由于運用無限時的sleep、wait、join等可中斷I/O操作方法,是以b不是空,進入if代碼塊,首先将boolean類型的interrupted設定為ture,友善後面清除中斷标志位。後調用interrupt0方法設定中斷标志位,最後調用Interruptible接口的interrupt方法去真正中斷線程,并接收一個InterruptedException異常,也就是說線程會抛出一個InterruptedException異常。
(2) 從上面的源碼讀下去,我們得知當線程正常運作時,使用interrupt方法,僅僅會設定中斷标志位,不會進入到裡面的if代碼塊。是以使用者程式可以在适當位置通過調用isInterrupted()方法來檢視自己是否被中斷,并退出線程。
這裡我們可以一個例子來示範:
@Test
@DisplayName("使用者程式自動檢測Interrupt屬性來判斷是否退出線程")
public void testInterrupted2() throws InterruptedException {
Thread thread = new Thread(){//匿名類
@Override
public void run() {
Print.tco("我起了");
//一直循環
while (true){
Print.tco(isInterrupted());
sleepMilliSeconds(5000);
if (isInterrupted()){
Print.tco("我被秒了");
return;
}
}
}
};
thread.start();
sleepSeconds(2);//等待2秒
thread.interrupt();//中斷線程
sleepSeconds(2);//等待2秒
thread.interrupt();
}
這裡@Test使用是Junit5版本,這個版本功能更豐富,也相容其他檢測引擎。
一起工作的線程倆
有時候一個線程要依賴于其他的線程來完成自己的工作,具體的依賴為:一個線程需要将另一個線程的執行流程合并到自己的執行流程中,這就涉及到了線程的合并。在Thread類中提供了join方法來讓線程之間進行合并,我們進入Thread類來看一下其方法的定義:
-
public final synchronized void join(final long millis) throw InterruptedException;
-
public final synchronized void join(long millis, int nanos) throw InterruptedException;
-
public final void join() throws InterruptedException;
我們可以看到有三種重載的join()方法。
- 調用第一種方法,調用線程會進入TIME_WAITING狀态,直到合并的線程完成了自己執行的任務或者等待合并的線程執行了mills(毫秒)的時間,調用線程才會回到就緒狀态,直到被配置設定到CPU時間片,才會執行自己後面的使用者代碼邏輯塊。
- 調用第二種方法,調用線程會進入TIME_WAITING狀态,直到合并的線程完成了自己執行的任務或者等待合并的線程執行了mills(毫秒)+nanos(納秒)的時間,調用線程才會回到就緒狀态,直到被配置設定到CPU時間片,才會執行自己後面的使用者代碼邏輯塊。
- 調用第三種方法,調用線程會進入WAITING狀态,直到合并的線程完成了自己執行的任務,調用線程才會回到就緒狀态,直到被配置設定到CPU時間片,才會執行自己後面的使用者代碼邏輯塊。
join()方法是執行個體方法,需要使用被合并的線程的句柄(引用對象)去調用。
我們寫一個簡單的代碼去示範其功能。
public class Join {
public static final int SLEEP_GAP = 5000;//睡眠時長
public static final int MAX_TURN = 50;//睡眠次數
static class SleepThread extends Thread {
static int threadNo = 0;
public SleepThread() {
super("sleepThread-" + threadNo++);
}
@Override
public void run() {
Print.tco(getName() + " 進行睡眠.");
sleepMilliSeconds(SLEEP_GAP);
Print.tco(getName() + " 執行結束.")
}
public static void main(String[] args) {
Thread thread1 = new SleepThread();
Print.tco("啟動tread1");
thread1.start();
try {
thread1.join();//合并線程1,不限時
} catch (InterruptedException e) {
e.printStackTrace();
}
Print.tco("啟動thread2");
Thread thread2 = new SleepThread();
thread2.start();
try {
thread2.join(1000);//限時合并,限時1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
Print.tco("線程運作結束.");
}
}
讓出機會的線程