天天看點

Java高效并發(十)----ForkJoin架構分而治之政策ForkJoinPool線程池 ForkJoinTask任務

分而治之政策

當我們要處理很大的資料,一個重要的思想就是把問題劃分成若幹個小問題處理,然後把小問題的結果進行整合,得到最終的結果。在JDK中有一個ForkJoin線程池,使用fork/join方法處理資料。Fork/Join 模式有自己的适用範圍。如果一個應用能被分解成多個子任務,并且組合多個子任務的結果就能夠獲得最終的答案,那麼這個應用就适合用 Fork/Join 模式來解決

ForkJoinPool線程池

java.util.concurrent.ForkJoinPool 繼承了 AbstractExecutorService類,這個線程池提供了下面幾種執行任務的方法,有些有傳回值有些沒有傳回值。有些要求任務是Runnable對象,有些要求任務是Callable對象

Java高效并發(十)----ForkJoin架構分而治之政策ForkJoinPool線程池 ForkJoinTask任務

 它的無參數構造方法,傳回跟系統CPU數量一樣的線程

public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
           

 其中一個重要的方法是,送出ForkJoinTask對象的任務,傳回ForkJoinTask對象的結果 

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }
           

 ForkJoinTask任務

 ForkJoinTask任務就是支援fork()分解和join()等待的任務。

ForkJoinTask是一個J.U.C下的一個抽象類

public abstract class ForkJoinTask<V> implements Future<V>, Serializable

這個類中有兩個方法

fork()方法,當ForkJoinTask任務執行fork()方法,會把ForkJoinTask任務送出給ForkJoinPool

join方法,ForkJoinTask任務執行join()方法,會把結果傳回。

Java高效并發(十)----ForkJoin架構分而治之政策ForkJoinPool線程池 ForkJoinTask任務

它有兩個實作類

①  public abstract class RecursiveTask<V> extends ForkJoinTask<V>  帶有傳回結果的任務

②  public abstract class RecursiveAction extends ForkJoinTask<Void>  沒有傳回結果的任務

 通常我們并不直接繼承 ForkJoinTask,它包含了太多的抽象方法。針對特定的問題,我們可以選擇 ForkJoinTask 的不同子類來完成任務。RecursiveAction 是 ForkJoinTask 的一個子類,它代表了一類最簡單的 ForkJoinTask:不需要傳回值,當子任務都執行完畢之後,不需要進行中間結果的組合。如果我們從 RecursiveAction 開始繼承,那麼我們隻需要重載 

protected void compute()

 方法

RecursiveTask 是 ForkJoinTask 的一個子類,它是需要傳回值的任務,當子任務都執行完畢之後,進行中間結果的組合。如果我們從RecursiveTask 開始繼承,那麼我們隻需要重載 

protected void compute()

 方法,它可使用泛型指定一個傳回值的類型

 下面這個例子解決這樣一個問題就用到帶有傳回結果的任務。

如何充分利用多核CPU,計算很大List中所有整數的和?

那一個大的任務分成幾個小任務來執行。而且在ForkJoinPool線程池中,每一個線程對應一個任務隊列,如果線程A已經完成了自己的任務,發現B線程的任務隊列還有很多,它會去B的任務隊列從後面拿任務來執行,如果任務時ForkJoinTask類型的,就支援join(),一起傳回。

package xidian.lili.reentrantlock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long>{
	public int start;
	public int end;
	public final int THROLDSOLD=1000;
	public CountTask(int start, int end) {
		this.start = start;
		this.end = end;
	}
	protected Long compute() {
		Long sum=0L;
		if((end-start)<THROLDSOLD){
			for(int i=start;i<=end;i++){
				sum+=i;
			}
		}else{
			int step=(start+end)/100;
			ArrayList<CountTask> subtasks=new ArrayList();
			int pos=start;
			for(int i=0;i<100;i++){
				int lastone=pos+step;
				if(lastone>end)lastone=end;
				CountTask subtask=new CountTask(pos,lastone);
				subtasks.add(subtask);
				pos=lastone+1;
				subtask.fork();				
			}
			for(CountTask subtask:subtasks){
				sum+=subtask.join();
			}
		}
		return sum;
	}	
	public static void main(String[] args) {
		CountTask task=new CountTask(0,10000);
		ForkJoinPool pool=new ForkJoinPool();
		ForkJoinTask<Long> result=pool.submit(task);
		try {
			long res=result.get();
			System.out.println(res);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			
			e.printStackTrace();
		}

	}

}
           

 問題

使用Fork/Join模式,如果任務的層次劃分的很深,一直得不到傳回值,一可能就是任務太多,系統内的線程數多,導緻系統性能下降。二可能調用層次太深導緻棧溢出

當我們上述任務的子任務劃分更小,也就是調用層次更深,出現了OOM異常

Java高效并發(十)----ForkJoin架構分而治之政策ForkJoinPool線程池 ForkJoinTask任務

繼續閱讀