天天看點

Java SE 7新特性

java SE 7對juc包進行了更新,增加了新的輕量級任務執行架構fork/join和多階段thread同步工具。

lightweight task execute framework- ---fork/join

    該架構的目的是為了更好的利用底層平台上的多核cpu和多處理器來進行并行處理,解決問題時通過使用分治(divide and conquer)算法或map/reduce算法來進行。該名稱來源于使用時的兩個基本的操作fork,join,可以類比于map/reduce中的map和reduce操作。fork操作是把一個大的問題劃分成若幹個較小的問題,這個劃分過程一般是遞歸進行的,直到得到可以直接進行計算的粒度合适的子問題。在劃分時需要恰當選取子問題的大小。太大的子問題不利于通過并行方式來提高性能,而太小子問題則會帶來較大的額外開銷。每個子問題在計算完成之後,可以得到關于整個問題的部分解。join操作就是把這些部分解或結果收集并組織起來,得到最終完整的結果。調用join也可能是遞歸進行的,與fork操作相對應。

    相對于一般threadpool的實作,fork/join優勢展現在對其中包含的任務的處理方式上。一般threadpool中如果一個thread正在執行的任務由于某些原因無法繼續運作,則該thread牌等待狀态。而在fork/join架構實作中,如果某個子問題由于等待另外一個子問題的完成而無法運作,則處理該子問題的thread會主動尋找其他尚未運作的子問題來執行,這種方式減少了thread等待時間,提高了性能。為了高效運作,每個fork/join子問題中應該避免使用synchronized關鍵詞或其他方式來進行同步,也不應使用阻塞式io操作或過多的通路共享變量。在理想情況下,每個子問題實作中都應該隻進行cpu去處,而且隻使用每個問題的内部對象,唯一的同步應該隻發生在子問題和建立它的父問題之間。

    一個fork/join架構執行的任務由ForkJoinTask類表示。它實作了Future接口,可以照future接口的方式來使用。中間有兩個最重要的方法form和join,fork方法以異步方式啟動任務的執行,而join則等待任務完成并傳回執行的結果。建立自己的任務時,最好不要直接繼承自ForkJoinTask,而是繼承自它的子類RecursiveTask或RecursiveAction類。兩者差別是前者表示的任務可以傳回結果,後者則不可以。

    任務的執行由ForkJoinPool類的對象來完成。它實作了ExecutorService接口,還可以使用一般的Callable/Runnable接口來表示任務。forkjoinpool中執行的任務可以兩類,一是通過execute,invoke,submit方法直接送出的任務,另外一類是在執行過程中産生的子任務,并通過fork方法來運作。一般做法是表示整個問題的forkjointask類的對象用第一類形式送出,而在執行過程中産生的子任務并不需要進行處理,forkjoinpool類對象會負責子任務的執行。

    示例使用fork/join查找數組中最大值。每個查找任務都在一定數組序号區間範圍内進行。如果目前區間範圍較大,則将其劃分成兩個子區間,對每個子區間建立子任務來進行查找,遞歸整個過程。直到範圍足夠小,再在這個區間中進行順序比較來查找最大值。通過fork啟動子任務,通過join方法來擷取子任務的執行結果。子任務執行完成後,再把得到的部分結果合并到最終結果中,這就是一典型的使用分治算法實作的方式。

    public class MaxVAlue{
        private static final int RANGE_LENGTH=2000;
        private final ForkJoinPool forkJoinPool = new ForkJoinPool();
        private static class MaxValueTask extends RecursiveTask<Long>{
            private final long[] array;
            private final int start;
            private final int end;
            MaxValueTask(long[] array,int start,int end){
                this.array = array;
                this.start = start;
                this.end = end;
            }
            protected Long compute(){
                long max = Long.MIN_VALUE;
                if(end - start <= RANG_LENGTH){
                    for(int i = start;i < end;i++){
                        if(array[i]>max){
                            max = array[i];
                        }
                    }
                }else{
                    int mid = (start+end)/2;
                    MaxValueTask lowTask = new MaxValueTask(array,start,mid);
                    MaxValueTask highTask = new MaxValueTask(array,mid,end);
                    lowTask.fork();
                    highTask.fork();
                    max= Math.max(max,lowTask.join());
                    max = Math.max(max,highTask.join());
                    
                }
                return max;
            }
        }
        //
        public void calculate(long[] array){
            MaxValueTask task = new MaxValueTask(array,0,array.length);
            Long result = forkJoinPool.invoke(task);
            System.out.println(result);
        }
    }                

    以上代碼實作方式的效率要低于直接對整個數組進行順序比較的方式。隻用來做一個示範。因為多thread所帶來的額外開銷過大。實際應用場景中,在一個目錄中包含所有文本檔案中搜尋某個關鍵詞時,可以為每個檔案建立一個子任務。這種實作方式性能要優于單thread的查找方法。如果相關功能可以用遞歸和分治算法來解決就适合使用fork/join架構。

多階段thread同步工具

    Phaser類是新增一個實用同步工具。所提供的功能和靈活性比之前介紹的倒數閘門和循環屏障要強很多。在fork/join架構中子任務間進行同步時,要優先使用Phaser類對象。它的特點是把多thread協作執行的任務切分為多個階段phase,每個階段上都可以有任意個參與者參與。thread可以随時注冊并參與到某個階段的執行中來。當一階段中所有thread都成功完成後,phaser類對象會自動進入下一階段。如果循環下去直到phaser類對象中不再包含任何參與者,此phaser對象運作自動結束。該對象被建立出來後,其初始階段編号為0,在構造方法中可以指定初始的參與者的個數也可以在建立後使用register方法或bulkregister方法動态添加一個或多個參與者。當某參與者完成其任務後調用arriave方法來進行聲明。有的參與者完成一次執行後不再繼續參與,可以調用arriveandderegister方法在聲明完成後取消自己的注冊。如果還需要等待其他參與者完成,可以調用arriveandawaitadvance方法,此時thread會阻塞,直到phaser類對象成功進入下個階段。

版權聲明:本文為CSDN部落客「weixin_33912453」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_33912453/article/details/92469655