天天看點

java ForkJoin架構使用執行個體

package com.test.thread.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
 * Created by jl on 2018/8/31 0031
 * fork join架構采用分治法将任務分解進而提高整體任務執行效率
 * forkJoinTask.fork()方法用于将任務繼續拆分或執行,如果是拆分,目前線程就充當了監工無法被配置設定到任務
 * forkJoinTask.invokeAll(task1,task2,...)方法是針對fork方法的優化,被invoke的n個任務中會将第一個任務留給目前線程
 * 去執行,這樣遞歸循環下去進而保證所有的線程都充當勞工角色(有些線程既是勞工也是監工,沒有隻是監工的線程)
 * forkJoinTask.join()方法用于傳回任務執行的結果
 */
public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName());

        ForkJoinPool forkJoinPool = new ForkJoinPool(5);
        CountTask task = new CountTask(1, 6);

        Future<Integer> result = forkJoinPool.submit(task);
        System.out.println(result.get());
    }
}
           
package com.test.thread.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * Created by jl on 2018/8/31 0031
 * 用于計算連續自然數相加的任務
 */
public class CountTask extends RecursiveTask<Integer> {

    //門檻值,任務大小低于該值則結束拆分
    private static final int THRESHOLD = 2;

    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            System.out.println(Thread.currentThread().getName() + "(勞工)");
            // 模拟工作耗時
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //進行運算
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            System.out.println(Thread.currentThread().getName() + "(監工)");
            //如果大于門檻值,就進行任務拆分
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //執行子任務
//            leftTask.fork();
//            rightTask.fork();
            // invokeAll的N個任務中,其中N-1個任務會使用fork()交給其它線程執行,
            // 但是,它還會留一個任務自己執行,這樣,就充分利用了線程池,保證沒有空閑的線程。
            invokeAll(leftTask, rightTask);
            //等待子任務執行完成,得到執行結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任務結果
            sum = leftResult + rightResult;
        }
        return sum;
    }
}