天天看點

個人學習筆記:java常用線程池及相關的接口

1、Runnable接口與Callable接口

(1)Runnable接口

//Runnable接口的唯一方法,無傳回值,也不會抛出異常
void run()
           

(2)Callable接口

//callable的唯一方法
//傳回計算結果,或者如果無法執行則抛出異常。
V	call() throws Exception
           
2、Future架構

(1)Future接口主要方法:

//嘗試取消執行此任務。如果任務已完成,已取消或由于某些其他原因無法取消,則此嘗試将失敗。
boolean cancel(boolean mayInterruptIfRunning)
//如果此任務在正常完成之前被取消則傳回true。
boolean isCancelled()
//如果此任務完成,則傳回true。
//完成可能是由于正常終止,例外或取消,在所有這些情況下,此方法将傳回true。
boolean isDone()
//需要等待計算完成,然後擷取其結果。
V get()throws InterruptedException,ExecutionException
V get(long timeout,TimeUnit unit)throws InterruptedException,ExecutionException,TimeoutException
           

(2)RunnableFuture接口,是Runnable接口Future接口的子接口。除此之外它本身隻有一個run方法。

(3)FutureTask類,實作了RunnableFuture接口。

構造函數:

//建立一個FutureTask在運作時執行給定的Callable。
FutureTask(Callable<V> callable)
//在運作時建立一個FutureTask執行給定的Runnable,get并在成功完成時傳回給定的結果result。
FutureTask(Runnable runnable, V result)
           

并可通過get方法擷取計算結果。

3、Executor接口:

這個接口隻有一個方法,在将來的某個時間執行給定的指令。根據Executor實作的判斷,該指令可以在新線程,池化線程或調用線程中執行。

4、ExecutorService接口(Executor接口的子接口)

主要方法submit。

//送出值傳回任務以執行并傳回表示任務的挂起結果的Future。
//Future的get方法将在成功完成後傳回任務的結果。
<T> Future<T> submit(Callable<T> task)
//送出Runnable任務以執行并傳回表示該任務的Future。
//Future的get方法将在成功完成後傳回給定的結果。
//task要送出的任務,result傳回的結果
<T> Future<T> submit(Runnable task,T result)
//送出Runnable任務以執行并傳回表示該任務的Future。
//Future的get方法将null在成功完成後傳回
Future<?> submit(Runnable task)
           
5、Executors類

這個提供了幾個非常有用的方法來建立線程池。

(1)newCachedThreadPool()方法。(線程數量可變)

建立一個根據需要建立新線程的線程池,但在它們可用時将重用以前構造的線程。

這些池通常會提高執行許多短期異步任務的程式的性能。 調用execute将重用以前構造的線程(如果可用)。如果沒有可用的現有線程,則将建立一個新線程并将其添加到池中。未使用60秒的線程将終止并從緩存中删除。是以,長時間閑置的池不會消耗任何資源。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

(2)newFixedThreadPool()方法

建立一個大小固定線程池,該線程池重用在共享的無界隊列中運作的固定數量的線程。池中的線程将一直存在,直到它顯式出現shutdown。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           

(3)newScheduledThreadPool()方法

建立一個線程池,可以排程指令在給定的延遲後運作,或者定期執行。

//corePoolSize  池中保留的線程數,即使它們處于空閑狀态
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
           

(4)newSingleThreadScheduledExecutor()方法。

建立一個單線程執行程式,保證任務按順序執行,并且不會有多個任務處于活動狀态在任何給定的時間。

(5)newWorkStealingPool()方法

建立一個工作竊取線程池 ,産生的線程是精靈線程(守護線程、背景線程)

//Creates a work-stealing thread pool using all available processors as 
//its target parallelism level.
public static ExecutorService newWorkStealingPool()
           
6、ForkJoinPool類

可以将将一個任務拆分成多個“小任務”并行計算,再把多個“小任務”的結果合并成總的計算結果,與MapReduce計算的過程有相似之處。任務應該是RecursiveAction類或者RecursiveTask的子類。

使用示例:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {

    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
    }


    static class AddTask extends RecursiveTask<Long> {

        private static final long serialVersionUID = 1L;
        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            }

            int middle = start + (end-start)/2;

            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();

            return subTask1.join() + subTask2.join();
        }

    }

    public static void main(String[] args) throws IOException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        forkJoinPool.execute(task);
        long result = task.join();
        System.out.println(result);
    }
}
           
7、ThreadPoolExecutor類

可以看到Executors中建立newCachedThreadPool和newFixedThreadPool都是調用這個類的構造方法。而newScheduledThreadPool是先調用ScheduledThreadPoolExecutor類的一個構造方法,然後在ScheduledThreadPoolExecutor的構造方法中調用其父類ThreadPoolExecutor類的一個構造方法來建立的。也就是說,newCachedThreadPool、newFixedThreadPool和ScheduledThreadPoolExecutor這三個建立線程池的方法最終都是通過調用ThreadPoolExecutor這個類的構造方法來建立的。