天天看點

JVM 并發性: Java 和 Scala 并發性基礎(1)

轉載于:http://www.itxuexiwang.com/a/javajishu/jvm_jdk_yuanmafenxi/2016/0129/50.html?1454076380

處理器速度數十年來一直持續快速發展,并在世紀交替之際走到了終點。從那時起,處理器制造商更多地是通過增加核心來提高晶片性能,而不再通過增加時鐘速率來提高晶片性能。多核系統現在成為了從手機到企業伺服器等所有裝置的标準,而這種趨勢可能繼續并有所加速。開發人員越來越需要在他們的應用程式代碼中支援多個核心,這樣才能滿足性能需求。

在本系列文章中,您将了解一些針對 Java 和 Scala 語言的并發程式設計的新方法,包括 Java 如何将 Scala 和其他基于 JVM 的語言中已經探索出來的理念結合在一起。第一期文章将介紹一些背景,通過介紹 Java 7 和 Scala 的一些最新技術,幫助了解 JVM 上的并發程式設計的全景。您将了解如何使用 Java 

ExecutorService

 和 

ForkJoinPool

 類來簡化并發程式設計。還将了解一些将并發程式設計選項擴充到純 Java 中的已有功能之外的基本 Scala 特性。在此過程中,您會看到不同的方法對并發程式設計性能有何影響。後續幾期文章将會介紹 Java 8 中的并發性改進和一些擴充,包括用于執行可擴充的 Java 和 Scala 程式設計的 Akka 工具包。

Java 并發性支援

在 Java 平台誕生之初,并發性支援就是它的一個特性,線程和同步的實作為它提供了超越其他競争語言的優勢。Scala 基于 Java 并在 JVM 上運作,能夠直接通路所有 Java 運作時(包括所有并發性支援)。是以在分析 Scala 特性之前,我首先會快速回顧一下 Java 語言已經提供的功能。

Java 線程基礎

在 Java 程式設計過程中建立和使用線程非常容易。它們由 

java.lang.Thread

 類表示,線程要執行的代碼為 

java.lang.Runnable

 執行個體的形式。如果需要的話,可以在應用程式中建立大量線程,您甚至可以建立數千個線程。在有多個核心時,JVM 使用它們來并發執行多個線程;超出核心數量的線程會共享這些核心。

Java 5:并發性的轉折點

Java 從一開始就包含對線程和同步的支援。但線上程間共享資料的最初規範不夠完善,這帶來了 Java 5 的 Java 語言更新中的重大變化 (JSR-133)。Java Language Specification for Java 5 更正并規範化了 

synchronized#p#分頁标題#e#

 和 

volatile

 操作。該規範還規定不變的對象如何使用多線程。(基本上講,隻要在執行構造函數時不允許引用 “轉義”,不變的對象始終是線程安全的。)以前,線程間的互動通常需要使用阻塞的 

synchronized

 操作。這些更改支援使用 

volatile

 線上程間執行非阻塞協調。是以,在 Java 5 中添加了新的并發集合類來支援非阻塞操作 — 這與早期僅支援阻塞的線程安全方法相比是一項重大改進。

線程操作的協調難以讓人了解。隻要從程式的角度讓所有内容保持一緻,Java 編譯器和 JVM 就不會對您代碼中的操作重新排序,這使得問題變得更加複雜。例如:如果兩個相加操作使用了不同的變量,編譯器或 JVM 可以安裝與指定的順序相反的順序執行這些操作,隻要程式不在兩個操作都完成之前使用兩個變量的總數。這種重新排序操作的靈活性有助于提高 Java 性能,但一緻性隻被允許應用在單個線程中。硬體也有可能帶來線程問題。現代系統使用了多種緩存記憶體級别,一般來講,不是系統中的所有核心都能同樣看到這些緩存。當某個核心修改記憶體中的一個值時,其他核心可能不會立即看到此更改。

由于這些問題,在一個線程使用另一個線程修改的資料時,您必須顯式地控制線程互動方式。Java 使用了特殊的操作來提供這種控制,在不同線程看到的資料視圖中建立順序。基本操作是,線程使用 

synchronized

 關鍵字來通路一個對象。當某個線程在一個對象上保持同步時,該線程将會獲得此對象所獨有的一個鎖的獨占通路。如果另一個線程已持有該鎖,等待擷取該鎖的線程必須等待,或者被阻塞,直到該鎖被釋放。當該線程在一個 

synchronized

 代碼塊内恢複執行時,Java 會保證該線程可以 “看到了” 以前持有同一個鎖的其他線程寫入的所有資料,但隻是這些線程通過離開自己的 

synchronized

 鎖來釋放該鎖之前寫入的資料。這種保證既适用于編譯器或 JVM 所執行的操作的重新排序,也适用于硬體記憶體緩存。一個 

synchronized

 塊的内部是您代碼中的一個穩定性孤島,其中的線程可依次安全地執行、互動和共享資訊。

在變量上對 

volatile

 關鍵字的使用,為線程間的安全互動提供了一種稍微較弱的形式。

synchronized

 關鍵字可確定在您擷取該鎖時可以看到其他線程的存儲,而且在您之後,擷取該鎖的其他線程也會看到您的存儲。

volatile

 關鍵字将這一保證分解為兩個不同的部分。如果一個線程向

volatile

 變量寫入資料,那麼首先将會擦除它在這之前寫入的資料。如果某個線程讀取該變量,那麼該線程不僅會看到寫入該變量的值,還會看到寫入的線程所寫入的其他所有值。是以讀取一個 #p#分頁标題#e#

volatile

 變量會提供與輸入 一個 

synchronized

 塊相同的記憶體保證,而且寫入一個

volatile

 變量會提供與離開 一個 

synchronized

 塊相同的記憶體保證。但二者之間有很大的差别:

volatile

 變量的讀取或寫入絕不會受阻塞。

抽象 Java 并發性

同步很有用,而且許多多線程應用程式都是在 Java 中僅使用基本的 

synchronized

 塊開發出來的。但協調線程可能很麻煩,尤其是在處理許多線程和許多塊的時候。確定線程僅在安全的方式下互動并 避免潛在的死鎖(兩個或更多線程等待對方釋放鎖之後才能繼續執行),這很困難。支援并發性而不直接處理線程和鎖的抽象,這為開發人員提供了處理常見用例的更好方法。

java.util.concurrent

 分層結構包含一些集合變形,它們支援并發通路、針對原子操作的包裝器類,以及同步原語。這些類中的許多都是為支援非阻塞通路而設計的,這避免了死鎖的問題,而且實作了更高效的線程。這些類使得定義和控制線程之間的互動變得更容易,但他們仍然面臨着基本線程模型的一些複雜性。

java.util.concurrent

 包中的一對抽象,支援采用一種更加分離的方法來處理并發性:

Future<T>

 接口、

Executor

 和

ExecutorService

 接口。這些相關的接口進而成為了對 Java 并發性支援的許多 Scala 和 Akka 擴充的基礎,是以更詳細地了解這些接口和它們的實作是值得的。

Future<T>

 是一個 

T

 類型的值的持有者,但奇怪的是該值一般在建立 

Future

 之後才能使用。正确執行一個同步操作後,才會獲得該值。收到

Future

 的線程可調用方法來:

  • 檢視該值是否可用
  • 等待該值變為可用#p#分頁标題#e#
  • 在該值可用時擷取它
  • 如果不再需要該值,則取消該操作

Future

 的具體實作結構支援處理異步操作的不同方式。

Executor

 是一種圍繞某個執行任務的東西的抽象。這個 “東西” 最終将是一個線程,但該接口隐藏了該線程處理執行的細節。

Executor

 本身的适用性有限,

ExecutorService

 子接口提供了管理終止的擴充方法,并為任務的結果生成了 

Future

Executor

 的所有标準實作還會實作

ExecutorService

,是以實際上,您可以忽略根接口。

線程是相對重量級的資源,而且與配置設定并丢棄它們相比,重用它們更有意義。

ExecutorService

 簡化了線程間的工作共享,還支援自動重用線程,實作了更輕松的程式設計和更高的性能。

ExecutorService

 的 

ThreadPoolExecutor

 實作管理着一個執行任務的線程池。

應用 Java 并發性

并發性的實際應用常常涉及到需要與您的主要處理邏輯獨立的外部互動的任務(與使用者、存儲或其他系統的互動)。這類應用很難濃縮為一個簡單的示例,是以在示範并發性的時候,人們通常會使用簡單的計算密集型任務,比如數學計算或排序。我将使用一個類似的示例。

任務是找到離一個未知的輸入最近的已知單詞,其中的最近 是按照Levenshtein 距離 來定義的:将輸入轉換為已知的單詞所需的最少的字元增加、删除或更改次數。我使用的代碼基于 Wikipedia 上的 Levenshtein 距離 文章中的一個示例,該示例計算了每個已知單詞的 Levenshtein 距離,并傳回最佳比對值(或者如果多個已知的單詞擁有相同的距離,那麼傳回結果是不确定的)。

清單 1 給出了計算 Levenshtein 距離的 Java 代碼。該計算生成一個矩陣,将行和列與兩個對比的文本的大小進行比對,在每個次元上加 1。為了提高效率,此實作使用了一對大小與目标文本相同的數組來表示矩陣的連續行,将這些數組包裝在每個循環中,因為我隻需要上一行的值就可以計算下一行。#p#分頁标題#e#

清單 1. Java 中的 Levenshtein 距離計算
  1. private int editDistance(String word, int[] v0, int[] v1) {  
  2. #p#分頁标題#e#   
  3.     // initialize v0 (prior row of distances) as edit distance for empty 'word'  
  4.     for (int i = 0; i < v0.length; i++) {  
  5.         v0[i] = i;  
  6.     }  
  7.    #p#分頁标題#e#
  8.     // calculate updated v0 (current row distances) from the previous row v0  
  9.     for (int i = 0; i < word.length(); i++) {  
  10.         // first element of v1 = delete (i+1) chars from target to match empty 'word'  
  11. #p#分頁标題#e#        v1[0] = i + 1;  
  12.         // use formula to fill in the rest of the row  
  13.         for (int j = 0; j < targetText.length(); j++) {  
  14.             int#p#分頁标題#e# cost = (word.charAt(i) == targetText.charAt(j)) ? 0 : 1;  
  15.             v1[j + 1] = minimum(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost);  
  16.         }  
  17.         // swap v1 (current row) and v0 (previous row) for next iteration #p#分頁标題#e# 
  18.         int[] hold = v0;  
  19.         v0 = v1;  
  20.         v1 = hold;  
  21.     }  
  22.     // return final value representing best edit distance  #p#分頁标題#e#
  23.     return v0[targetText.length()];  

如果有大量已知詞彙要與未知的輸入進行比較,而且您在一個多核系統上運作,那麼您可以使用并發性來加速處理:将已知單詞的集合分解為多個塊,将每個塊作為一個獨立任務來處理。通過更改每個塊中的單詞數量,您可以輕松地更改任務分解的粒度,進而了解它們對總體性能的影響。清單 2 給出了分塊計算的 Java 代碼,摘自 示例代碼 中的 

ThreadPoolDistance

 類。清單 2 使用一個标準的 

ExecutorService

,将線程數量設定為可用的處理器數量。

清單 2. 在 Java 中通過多個線程來執行分塊的距離計算
  1. private final ExecutorService threadPool;  #p#分頁标題#e#
  2. private final String[] knownWords;  
  3. private final int blockSize;  
  4. public ThreadPoolDistance(String[] words, int block) {  
  5. #p#分頁标題#e#    threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());  
  6.     knownWords = words;  
  7.     blockSize = block;  
  8. }  
  9. public DistancePair bestMatch(String target) {  
  10. #p#分頁标題#e#    // build a list of tasks for matching to ranges of known words  
  11.     List<DistanceTask> tasks = new ArrayList<DistanceTask>();  
  12.     int size = 0;  
  13.     for (int base = 0#p#分頁标題#e#; base < knownWords.length; base += size) {  
  14.         size = Math.min(blockSize, knownWords.length - base);  
  15.         tasks.add(new DistanceTask(target, base, size));  
  16.     }  
  17.     DistancePair best;  
  18.     try {  
  19. #p#分頁标題#e#   
  20.         // pass the list of tasks to the executor, getting back list of futures  
  21.         List<Future<DistancePair>> results = threadPool.invokeAll(tasks);  
  22.         // find the best result, waiting for each future to complete  
  23.         best = DistancePair.WORST_CASE;  
  24. #p#分頁标題#e#        for (Future<DistancePair> future: results) {  
  25.             DistancePair result = future.get();  
  26.             best = DistancePair.best(best, result);  
  27.         }  
  28.     } catch (InterruptedException e) {  
  29. #p#分頁标題#e#        throw new RuntimeException(e);  
  30.     } catch (ExecutionException e) {  
  31.         throw new RuntimeException(e);  
  32.     }  
  33.     return best;  
  34. #p#分頁标題#e#}  
  35. public class DistanceTask implements Callable<DistancePair>  
  36. #p#分頁标題#e#{  
  37.     private final String targetText;  
  38.     private final int startOffset;  
  39.     private final int compareCount;  
  40. #p#分頁标題#e#   
  41.     public DistanceTask(String target, int offset, int count) {  
  42.         targetText = target;  
  43.         startOffset = offset;  
  44.         compareCount = count;  
  45.     }  
  46. #p#分頁标題#e#   
  47.     private int editDistance(String word, int[] v0, int[] v1) {  
  48.         ...  
  49.     }  
  50.     @Override 
  51.     public DistancePair call() throws Exception {  
  52. #p#分頁标題#e#        // directly compare distances for comparison words in range  
  53.         int[] v0 = new int[targetText.length() + 1];  
  54.         int[] v1 = new int[targetText.length() + 1];  
  55. #p#分頁标題#e#        int bestIndex = -1;  
  56.         int bestDistance = Integer.MAX_VALUE;  
  57.         boolean single = false;  
  58.         for (int i = 0; i < compareCount; i++) {  #p#分頁标題#e#
  59.             int distance = editDistance(knownWords[i + startOffset], v0, v1);  
  60.             if (bestDistance > distance) {  
  61.                 bestDistance = distance;  
  62.                 bestIndex = i + startOffset;  
  63.                 single = true;  
  64. #p#分頁标題#e#            } else if (bestDistance == distance) {  
  65.                 single = false;  
  66.             }  
  67.         }  
  68.         return single ? new DistancePair(bestDistance, knownWords[bestIndex]) :  
  69. #p#分頁标題#e#                new DistancePair(bestDistance);  
  70.     }  

清單 2 中的 

bestMatch()

 方法構造一個 

DistanceTask

 距離清單,然後将該清單傳遞給 

ExecutorService

。這種對 

ExecutorService

 的調用形式将會接受一個 

Collection<? extends Callable<T>>

 類型的參數,該參數表示要執行的任務。該調用傳回一個 

Future<T>

 清單,用它來表示執行的結果。

ExecutorService

 使用在每個任務上調用 

call()

 方法所傳回的值,異步填寫這些結果。在本例中,

T

 類型為

DistancePair

— 一個表示距離和比對的單詞的簡單的值對象,或者在沒有找到惟一比對值時近表示距離。

bestMatch()

 方法中執行的原始線程依次等待每個 

Future

 完成,累積最佳的結果并在完成時傳回它。通過多個線程來處理 

DistanceTask

 的執行,原始線程隻需等待一小部分結果。剩餘結果可與原始線程等待的結果并發地完成。

并發性性能

要充分利用系統上可用的處理器數量,必須為 

ExecutorService

 配置至少與處理器一樣多的線程。您還必須将至少與處理器一樣多的任務傳遞給#p#分頁标題#e#

ExecutorService

 來執行。實際上,您或許希望擁有比處理器多得多的任務,以實作最佳的性能。這樣,處理器就會繁忙地處理一個接一個的任務,近在最後才空閑下來。但是因為涉及到開銷(在建立任務和 future 的過程中,在任務之間切換線程的過程中,以及最終傳回任務的結果時),您必須保持任務足夠大,以便開銷是按比例減小的。

圖 1 展示了我在使用 Oracle 的 Java 7 for 64-bit Linux® 的四核 AMD 系統上運作測試代碼時測量的不同任務數量的性能。每個輸入單詞依次與 12,564 個已知單詞相比較,每個任務在一定範圍的已知單詞中找到最佳的比對值。全部 933 個拼寫錯誤的輸入單詞會重複運作,每輪運作之間會暫停片刻供 JVM 處理,該圖中使用了 10 輪運作後的最佳時間。從圖 1 中可以看出,每秒的輸入單詞性能在合理的塊大小範圍内(基本來講,從 256 到大于 1,024)看起來是合理的,隻有在任務變得非常小或非常大時,性能才會極速下降。對于塊大小 16,384,最後的值近建立了一個任務,是以顯示了單線程性能。

圖 1. 

ThreadPoolDistance

 性能
JVM 并發性: Java 和 Scala 并發性基礎(1)

Fork-Join

Java 7 引入了 

ExecutorService

 的另一種實作:

ForkJoinPool

 類。

ForkJoinPool

 是為高效處理可反複分解為子任務的任務而設計的,它使用 

RecursiveAction

 類(在任務未生成結果時)或 

RecursiveTask<T>

 類(在任務具有一個 

T

 類型的結果時)來處理任務。

RecursiveTask<T>

 提供了一種合并子任務結果的便捷方式,如清單 3 所示。

清單 3. 

RecursiveTask<DistancePair>

 示例
  1. #p#分頁标題#e#private ForkJoinPool threadPool = new ForkJoinPool();  
  2. private final String[] knownWords;  
  3. private final int blockSize;  
  4. #p#分頁标題#e#   
  5. public ForkJoinDistance(String[] words, int block) {  
  6.     knownWords = words;  
  7.     blockSize = block;  
  8. }  
  9. public DistancePair bestMatch(String target) {  #p#分頁标題#e#
  10.     return threadPool.invoke(new DistanceTask(target, 0, knownWords.length, knownWords));  
  11. }  
  12. public class DistanceTask extends RecursiveTask<DistancePair>  
  13. {  
  14.     private final String compareText;  
  15.     private final#p#分頁标題#e# int startOffset;  
  16.     private final int compareCount;  
  17.     private final String[] matchWords;  
  18.     public#p#分頁标題#e# DistanceTask(String from, int offset, int count, String[] words) {  
  19.         compareText = from;  
  20.         startOffset = offset;  
  21.         compareCount = count;  
  22.         matchWords = words;  
  23.     }  
  24. #p#分頁标題#e#   
  25.     private int editDistance(int index, int[] v0, int[] v1) {  
  26.         ...  
  27.     }  
  28. #p#分頁标題#e#     
  29.     @Override 
  30.     protected DistancePair compute() {  
  31.         if#p#分頁标題#e# (compareCount > blockSize) {  
  32.             // split range in half and find best result from bests in each half of range  
  33.             int half = compareCount / 2;  
  34.             DistanceTask t1 = new DistanceTask(compareText, startOffset, half, matchWords);  
  35. #p#分頁标題#e#            t1.fork();  
  36.             DistanceTask t2 = new DistanceTask(compareText, startOffset + half,  
  37.                 compareCount - half, matchWords);  
  38.             DistancePair p2 = t2.compute();  
  39.             return DistancePair.best(p2, t1.join());  
  40.         }  
  41. #p#分頁标題#e#   
  42.         // directly compare distances for comparison words in range  
  43.         int[] v0 = new int[compareText.length() + 1];  
  44.         int[] v1 = new int[compareText.length() + 1#p#分頁标題#e#];  
  45.         int bestIndex = -1;  
  46.         int bestDistance = Integer.MAX_VALUE;  
  47.         boolean single = false;  
  48.         for (int#p#分頁标題#e# i = 0; i < compareCount; i++) {  
  49.             int distance = editDistance(i + startOffset, v0, v1);  
  50.             if (bestDistance > distance) {  
  51.                 bestDistance = distance;  
  52.                 bestIndex = i + startOffset;  
  53.                 single = #p#分頁标題#e#true;  
  54.             } else if (bestDistance == distance) {  
  55.                 single = false;  
  56.             }  
  57.         }  
  58.         return#p#分頁标題#e# single ? new DistancePair(bestDistance, knownWords[bestIndex]) :  
  59.             new DistancePair(bestDistance);  
  60.     }  

圖 2 顯示了清單 3 中的 

ForkJoin

 代碼與 清單 2 中的 

ThreadPool

 代碼的性能對比。

ForkJoin

 代碼在所有塊大小中穩定得多,僅在您隻有單個塊(意味着執行是單線程的)時性能會顯著下降。标準的 

ThreadPool

 代碼僅在塊大小為 256 和 1,024 時會表現出更好的性能。

圖 2. 

ThreadPoolDistance

 與 

ForkJoinDistance

 的性能對比
JVM 并發性: Java 和 Scala 并發性基礎(1)

這些結果表明,如果可調節應用程式中的任務大小來實作最佳的性能,那麼使用标準 #p#分頁标題#e#

ThreadPool

 比 

ForkJoin

 更好。但請注意,

ThreadPool

的 “最佳性能點” 取決于具體任務、可用處理器數量以及您系統的其他因素。一般而言,

ForkJoin

 以最小的調優需求帶來了優秀的性能,是以最好盡可能地使用它。

Scala 并發性基礎

Scala 通過許多方式擴充了 Java 程式設計語言和運作時,其中包括添加更多、更輕松的處理并發性的方式。對于初學者而言,

Future<T>

 的 Scala 版本比 Java 版本靈活得多。您可以直接從代碼塊中建立 future,可向 future 附加回調來處理這些 future 的完成。清單 4 顯示了 Scala future 的一些使用示例。該代碼首先定義了 

futureInt()

 方法,以便按需提供 

Future<Int>

,然後通過三種不同的方式來使用 future。

清單 4. Scala 

Future<T>

 示例代碼
  1. import ExecutionContext.Implicits.global  
  2. #p#分頁标題#e#val lastInteger = new AtomicInteger  
  3. def futureInt() = future {  
  4.   Thread sleep 2000 
  5.   lastInteger incrementAndGet  
  6. }  
  7. // use callbacks for completion of futures  #p#分頁标題#e#
  8. val a1 = futureInt  
  9. val a2 = futureInt  
  10. a1.onSuccess {  
  11.     case i1 => {  
  12.       a2.onSuccess {  
  13.         case i2 => println("Sum of values is "#p#分頁标題#e# + (i1 + i2))  
  14.       }  
  15.     }  
  16. }  
  17. Thread sleep 3000 
  18. // use for construct to extract values when futures complete  
  19. #p#分頁标題#e#val b1 = futureInt  
  20. val b2 = futureInt  
  21. for (i1 <- b1; i2 <- b2) yield println("Sum of values is " + (i1 + i2))  
  22. Thread sleep 3000 
  23. // wait directly for completion of futures  
  24. #p#分頁标題#e#val c1 = futureInt  
  25. val c2 = futureInt  
  26. println("Sum of values is " + (Await.result(c1, Duration.Inf) +  
  27.   Await.result(c2, Duration.Inf))) 

清單 4 中的第一個示例将回調閉包附加到一對 future 上,以便在兩個 future 都完成時,将兩個結果值的和列印到控制台上。回調是按照建立它們的順序直接嵌套在 future 上,但是,即使更改順序,它們也同樣有效。如果在您附加回調時 future 已完成,該回調仍會運作,但無法保證它會立即運作。原始執行線程會在 

Thread sleep 3000

 行上暫停,以便在進入下一個示例之前完成 future。

第二個示例示範了使用 Scala 

for

 comprehension 從 future 中異步提取值,然後直接在表達式中使用它們。

for

 comprehension 是一種 Scala 結構,可用于簡潔地表達複雜的操作組合(

map

filter

flatMap

 和 

foreach

)。它一般與各種形式的集合結合使用,但 Scala future 實作了相同的單值方法來通路集合值。是以可以使用 future 作為一種特殊的集合,一種包含最多一個值(可能甚至在未來某個時刻之前之後才包含該值)的集合。在這種情況下,

for

 語句要求擷取 future 的結果,并在表達式中使用這些結果值。在幕後,這種技術會生成與第一個示例完全相同的代碼,但以線性代碼的形式編寫它會得到更容易了解的更簡單的表達式。和第一個示例一樣,原始執行線程會暫停,以便在進入下一個示例之前完成 future。#p#分頁标題#e#

第三個示例使用阻塞等待來擷取 future 的結果。這與 Java future 的工作原理相同,但在 Scala 中,一個擷取最大等待時間參數的特殊

Await.result()

 方法調用會讓阻塞等待變得更為明顯。

清單 4 中的代碼沒有顯式地将 future 傳遞給 

ExecutorService

 或等效的對象,是以如果沒有使用過 Scala,那麼您可能想知道 future 内部的代碼是如何執行的。答案取決于 清單 4 中最上面一行:

import ExecutionContext.Implicits.global

。Scala API 常常為代碼塊中頻繁重用的參數使用 

implicit

 值。

future { }

 結構要求 

ExecutionContext

 以隐式參數的形式提供。這個 

ExecutionContext

 是 Java

ExecutorService

 的一個 Scala 包裝器,以相同方式用于使用一個或多個托管線程來執行任務。

除了 future 的這些基本操作之外,Scala 還提供了一種方式将任何集合轉換為使用并行程式設計的集合。将集合轉換為并行格式後,您在集合上執行的任何标準的 Scala 集合操作(比如 

map

filter

 或 

fold

)都會自動地盡可能并行完成。(本文稍後會在 清單 7 中提供一個相關示例,該示例使用 Scala 查找一個單詞的最佳比對值。)

錯誤處理

Java 和 Scala 中的 future 都必須解決錯誤處理的問題。在 Java 中,截至 Java 7,future 可抛出一個 

ExecutionException

 作為傳回結果的替代方案。應用程式可針對具體的失敗類型而定義自己的 

ExecutionException

 子類,或者可連鎖異常來傳遞詳細資訊,但這限制了靈活性。

Scala future 提供了更靈活的錯誤處理。您可以通過兩種方式完成 Scala future:成功時提供一個結果值(假設要求一個結果值),或者在失敗時提供一個關聯的 

Throwable

。您也可以采用多種方式處理 future 的完成。在 清單 4 中,

onSuccess

 方法用于附加回調來處理 future 的成功完成。您還可以使用 

onComplete

 來處理任何形式的完成(它将結果或 throwable 包裝在一個 #p#分頁标題#e#

Try

 中來适應兩種情況),或者使用 

onFailure

 來專門處理錯誤結果。Scala future 的這種靈活性擴充到了您可以使用 future 執行的所有操作,是以您可以将錯誤處理直接內建到代碼中。

這個 Scala 

Future<T>

 還有一個緊密相關的 

Promise<T>

 類。future 是一個結果的持有者,該結果在某個時刻可能可用(或不可用 — 無法内在地確定一個 future 将完成)。future 完成後,結果是固定的,不會發生改變。promise 是這個相同契約的另一端:結果的一個一次性、可配置設定的持有者,具有結果值或 throwable 的形式。可從 promise 擷取 future,在 promise 上設定了結果後,就可以在該 future 上設定此結果。

應用 Scala 并發性

現在您已熟悉一些基本的 Scala 并發性概念,是時候來了解一下解決 Levenshtein 距離問題的代碼了。清單 5 顯示了 Levenshtein 距離計算的一個比較符合語言習慣的 Scala 實作,該代碼基本上與 清單 1 中的 Java 代碼類似,但采用了函數風格。

清單 5. Scala 中的 Levenshtein 距離計算
  1. val limit = targetText.length  
  2. #p#分頁标題#e#def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {  
  3.   val length = word.length  
  4.   @tailrec 
  5.   def distanceByRow(rnum: Int, r0: Array[Int], r1: Array[Int]): Int = {  
  6.     if (rnum >= length) r0(limit)  #p#分頁标題#e#
  7.     else {  
  8.       // first element of r1 = delete (i+1) chars from target to match empty 'word'  
  9.       r1(0) = rnum + 1 
  10. #p#分頁标題#e#      // use formula to fill in the rest of the row  
  11.       for (j <- 0 until limit) {  
  12.         val cost = if (word(rnum) == targetText(j)) 0 else 1 
  13.         r1(j + 1) = min(r1(j) + 1#p#分頁标題#e#, r0(j + 1) + 1, r0(j) + cost);  
  14.       }  
  15.       // recurse with arrays swapped for next row  
  16.       distanceByRow(rnum + 1, r1, r0)  
  17.     }  
  18. #p#分頁标題#e#  }  
  19.   // initialize v0 (prior row of distances) as edit distance for empty 'word'  
  20.   for (i <- 0 to limit) v0(i) = i  
  21.   // recursively process rows matching characters in word being compared to find best  #p#分頁标題#e#
  22.   distanceByRow(0, v0, v1)  

清單 5 中的代碼對每個行值計算使用了尾部遞歸 

distanceByRow()

 方法。此方法首先檢查計算了多少行,如果該數字與檢查的單詞中的字元數比對,則傳回結果距離。否則會計算新的行值,然後遞歸地調用自身來計算下一行(将兩個行數組包裝在該程序中,以便正确地傳遞新的最新的行值)。Scala 将尾部遞歸方法轉換為與 Java 

while

 循環等效的代碼,是以保留了與 Java 代碼的相似性。

但是,此代碼與 Java 代碼之間有一個重大差別。清單 5 中的 

for

 comprehension 使用了閉包。閉包并不總是得到了目前 JVM 的高效處理(參閱Why is using for/foreach on a Range slow?,了解有關的詳細資訊),是以它們在該計算的最裡層循環上增加了大量開銷。如上所述,清單 5 中的代碼的運作速度沒有 Java 版本那麼快。清單 6 重寫了代碼,将 

for

 comprehension 替換為添加的尾部遞歸方法。這個版本要詳細得多,但執行效率與 Java 版本相當。

清單 6. 為提升性能而重新構造的計算代碼
  1. #p#分頁标題#e#val limit = targetText.length  
  2. def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {  
  3.   val length = word.length  
  4. #p#分頁标題#e#  @tailrec 
  5.   def distanceByRow(row: Int, r0: Array[Int], r1: Array[Int]): Int = {  
  6.     if (row >= length) r0(limit)  
  7.     else {  
  8.       // first element of v1 = delete (i+1) chars from target to match empty 'word'  #p#分頁标題#e#
  9.       r1(0) = row + 1 
  10.       // use formula recursively to fill in the rest of the row  
  11.       @tailrec 
  12.       def distanceByColumn(col: Int): Unit = {  
  13. #p#分頁标題#e#        if (col < limit) {  
  14.           val cost = if (word(row) == targetText(col)) 0 else 1 
  15.           r1(col + 1) = min(r1(col) + 1, r0(col + 1) + 1, r0(col) + cost)  
  16. #p#分頁标題#e#          distanceByColumn(col + 1)  
  17.         }  
  18.       }  
  19.       distanceByColumn(0)  
  20.       // recurse with arrays swapped for next row  
  21. #p#分頁标題#e#      distanceByRow(row + 1, r1, r0)  
  22.     }  
  23.   }  
  24.   // initialize v0 (prior row of distances) as edit distance for empty 'word'  
  25.   @tailrec 
  26. #p#分頁标題#e#  def initArray(index: Int): Unit = {  
  27.     if (index <= limit) {  
  28.       v0(index) = index  
  29.       initArray(index + 1)  
  30.     }  
  31.   }  
  32.   initArray(0#p#分頁标題#e#)  
  33.   // recursively process rows matching characters in word being compared to find best  
  34.   distanceByRow(0, v0, v1)  

清單 7 給出的 Scala 代碼執行了與 清單 2 中的 Java 代碼相同的阻塞的距離計算。

bestMatch()

 方法找到由 

Matcher

 類執行個體處理的特定單詞塊中與目标文本最比對的單詞,使用尾部遞歸 

best()

 方法來掃描單詞。

*Distance

 類建立多個 

Matcher

 執行個體,每個對應一個單詞塊,然後協調比對結果的執行群組合。

清單 7. Scala 中使用多個線程的一次阻塞距離計算
#p#分頁标題#e#      
  1. class Matcher(words: Array[String]) {  
  2.   def bestMatch(targetText: String) = {  
  3.     val limit = targetText.length  
  4.     val v0 = new#p#分頁标題#e# Array[Int](limit + 1)  
  5.     val v1 = new Array[Int](limit + 1)  
  6.     def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {  
  7.       ...  
  8.     }  
  9. #p#分頁标題#e#   
  10.     @tailrec 
  11.     def best(index: Int, bestDist: Int, bestMatch: Option[String]): DistancePair =  
  12.       if (index < words.length) {  
  13.         val newDist = editDistance(words(index), v0, v1)  #p#分頁标題#e#
  14.         val next = index + 1 
  15.         if (newDist < bestDist) best(next, newDist, Some(words(index)))  
  16.         else if (newDist == bestDist) best(next, bestDist, None)  
  17.         else best(next, bestDist, bestMatch)  
  18. #p#分頁标題#e#      } else DistancePair(bestDist, bestMatch)  
  19.     best(0, Int.MaxValue, None)  
  20.   }  
  21. }  
  22. class ParallelCollectionDistance(words: Array[String], size: Int) #p#分頁标題#e#extends TimingTestBase {  
  23.   val matchers = words.grouped(size).map(l => new Matcher(l)).toList  
  24.   def shutdown = {}  
  25.   def blockSize = size  
  26. #p#分頁标題#e#   
  27.   def bestMatch(target: String) = {  
  28.     matchers.par.map(m => m.bestMatch(target)).  
  29.       foldLeft(DistancePair.worstMatch)((a, m) => DistancePair.best(a, m))  
  30.   }  
  31. }  
  32. #p#分頁标題#e#   
  33. class DirectBlockingDistance(words: Array[String], size: Int) extends TimingTestBase {  
  34.   val matchers = words.grouped(size).map(l => new Matcher(l)).toList  
  35.   def shutdown = {}  
  36. #p#分頁标題#e#   
  37.   def blockSize = size  
  38.   def bestMatch(target: String) = {  
  39.     import ExecutionContext.Implicits.global  
  40.     val futures = matchers.map(m => future { m.bestMatch(target) })  #p#分頁标題#e#
  41.     futures.foldLeft(DistancePair.worstMatch)((a, v) =>  
  42.       DistancePair.best(a, Await.result(v, Duration.Inf)))  
  43.   }  

清單 7 中的兩個 

*Distance

 類顯示了協調 

Matcher

 結果的執行群組合的不同方式。

ParallelCollectionDistance

 使用前面提到的 Scala 的并行集合 feature 來隐藏并行計算的細節,隻需一個簡單的 

foldLeft

 就可以組合結果。

DirectBlockingDistance

 更加明确,它建立了一組 future,然後在該清單上為每個結果使用一個 

foldLeft

 和嵌套的阻塞等待。

性能再分析

清單 7 中的兩個 

*Distance

 實作都是處理 

Matcher

 結果的合理方法。(它們不僅合理,而且非常高效。示例代碼#p#分頁标題#e# 包含我在試驗中嘗試的其他兩種實作,但未包含在本文中。)在這種情況下,性能是一個主要問題,是以圖 3 顯示了這些實作相對于 Java 

ForkJoin

 代碼的性能。

圖 3. 

ForkJoinDistance

 與 Scala 替代方案的性能對比
JVM 并發性: Java 和 Scala 并發性基礎(1)

圖 3 顯示,Java 

ForkJoin

 代碼的性能比每種 Scala 實作都更好,但 

DirectBlockingDistance

 在 1,024 的塊大小下提供了更好的性能。兩種 Scala 實作在大部分塊大小下,都提供了比 清單 1 中的 

ThreadPool

 代碼更好的性能。

這些性能結果僅是示範結果,不具權威性。如果您在自己的系統上運作計時測試,可能會看到不同的性能,尤其在使用不同數量的核心的時候。如果希望為距離任務獲得最佳的性能,那麼可以實作一些優化:可以按照長度對已知單詞進行排序,首先與長度和輸入相同的單詞進行比較(因為編輯距離總是不低于與單詞長度之差)。或者我可以在距離計算超出之前的最佳值時,提前退出計算。但作為一個相對簡單的算法,此試驗公平地展示了兩種并發操作是如何提高性能的,以及不同的工作共享方法的影響。

在性能方面,清單 7 中的 Scale 控制代碼與 清單 2 和 清單 3 中的 Java 代碼的對比結果很有趣。Scala 代碼短得多,而且(假設您熟悉 Scala!)比 Java 代碼更清晰。Scala 和 Java 可很好的互相操作,您可以在本文的 完整示例代碼 中看到:Scala 代碼對 Scala 和 Java 代碼都運作了計時測試,Java 代碼進而直接處理 Scala 代碼的各部分。得益于這種輕松的互操作性,您可以将 Scala 引入現有的 Java 代碼庫中,無需進行通盤修改。最初使用 Scala 為 Java 代碼實作高水準控制常常很有用,這樣您就可以充分利用 Scala 強大的表達特性,同時沒有閉包或轉換的任何重大性能影響。

清單 7 中的 

ParallelCollectionDistance

 Scala 代碼的簡單性非常具有吸引力。使用此方法,您可以從代碼中完全抽象出并發性,進而編寫類似單線程應用程式的代碼,同時仍然獲得多個處理器的優勢。幸運的是,對于喜歡此方法的簡單性但又不願意或無法執行 Scala 開發的人而言,Java 8 帶來了一種執行直接的 Java 程式設計的類似特性。#p#分頁标題#e#

結束語

現在您已經了解了 Java 和 Scala 并發性操作的基礎知識,本系列下一篇文章将介紹 Java 8 如何改進對 Java 的并發性支援(以及從長遠來講,可能對 Scala 的并發性支援)。Java 8 的許多改動您看起來可能都很熟悉(Scala 并發性特性中使用的許多相同的概念都包含在 Java 8 中),是以您很快就能夠在普通的 Java 代碼中使用一些 Scala 技術。請閱讀下一期文章,了解應該如何做。