天天看點

Java核心技術-并發程式設計-線程與線程池

在現在網際網路項目中,經常面對的是高并發的場景,而針對面對高并發的系統架構主要在緩存、分布式資料庫、消息隊列、分布式部署等進行分享。

如何提高單應用的響應能力就跟線程有莫大的關系了。本文将通過以下内容進行分享:

  • 任務
  • 線程
  • 線程狀态
  • 線程屬性
  • 線程池
  • 傳回結果任務

Part-1:任務

//定義任務:實作Runnable接口
public class LiftOff implements Runnable {
    protected int countDown = 10;
    private static int taskCount = 0;
    private final int id = taskCount++;
    public LiftOff() {    }
    public LiftOff(int countDown) {this.countDown = countDown;    }
    public String status() {
        return "#" + id + "(" + (countDown > 0 ? countDown : "LiftOff!") + ")";
    }
    @Override
    public void run() {
        while (countDown-- > 0) {
            System.out.print(status());
            Thread.yield();
        }
    }
}
           

Part-2:線程調用

Thread thread=new Thread(new LiftOff());
thread.start();

//線程初始化方法
public Thread(
    ThreadGroup group,  //線程分組,預設是空
    Runnable target,    //線程執行任務
    String name,        //線程名稱
    long stackSize      //線程棧大小,預設是0表示忽略
) 

           

Part-3:線程狀态

建立、可運作、被阻塞、等待、計時等待、被終止

Java核心技術-并發程式設計-線程與線程池

Part-4:線程屬性

  • 優先級
//線程優先級:最低1,最高10,預設為5
t.setPriority(int newPriority)
           
  • 守護線程
t.setDaemon(true)
           

PS:當程式僅剩下守護線程的時候,虛拟機就會退出

Part-5:線程池

建構一個線程有一定代價,因為涉及與作業系統的互動。如果程式中建立大量生命周期很短的線程,應該使用線程池(thread pool)。

  • 線程池建立基礎類
//Java線程池建構底層方法:
public ThreadPoolExecutor(
    int corePoolSize,                   //線程池工作線程池大小
    int maximumPoolSize,                //線程池最大工作線程數
    long keepAliveTime,                 //非核心線程空閑等待時長
    TimeUnit unit,                      //非核心線程空閑等待時間機關
    BlockingQueue<Runnable> workQueue,  //線程池任務等待隊列
    ThreadFactory threadFactory,        //線程建構工廠,預設DefaultThreadFactory
    RejectedExecutionHandler handler    //
)
           
  • 預設線程池建立方法
//java.util.concurrent.Executors 神明系統不同線程池
//固定線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, 
        nThreads,
        0L, 
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

//單線程執行個體線程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(
            1, 
            1,
            0L, 
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
}

//緩存線程池
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
            0, 
            Integer.MAX_VALUE,
            60L, 
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

//計劃線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//單個核心計劃線程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
           
  • 線程池參數比較
    Java核心技術-并發程式設計-線程與線程池
  • 拒絕政策
//執行指定任務
public static class CallerRunsPolicy implements RejectedExecutionHandler {
//抛出異常:RejectedExecutionException
public static class AbortPolicy implements RejectedExecutionHandler {
//新任務直接丢棄
public static class DiscardPolicy implements RejectedExecutionHandler {
//将等待隊列頭部節點丢棄,将新任務放入隊尾
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
           

Part-6:帶傳回類型任務

  • 實作Callable接口
import java.util.Random;
import java.util.concurrent.Callable;

public class RandomDataCallable implements Callable<Integer> {
    private final Random random = new Random();
    private final Integer randomLimit = 10000;

    @Override
    public Integer call() throws Exception {
        //模拟異步任務
        int randomResult = 0;
        do {
            Thread.currentThread().join(300);
            randomResult = random.nextInt(randomLimit);
        } while (randomResult % 21 != 0);
        return randomResult;
    }
}
           
  • 生成Future對象
List<Future<Integer>> futureResults = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
    futureResults.add(executorService.submit(randomDataCallable));
}
           
  • 擷取結果
int countTimes = 0;
while (futureResults.stream().filter(Future::isDone).count() != futureResults.size()) {
    Thread.currentThread().join(100);
    countTimes++;
    System.out.print("等待異步任務結果" + countTimes);
}

futureResults.forEach(futureResult->{
    try {
        System.out.println(futureResult.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
});
           

參考資料

  • Java核心技術(卷1)基礎知識
  • Java程式設計思想