天天看點

Java8總結之并發增強

java8中對并發進行了一些增強優化。簡單總結一下

原子值

從 Java5 開始,java.util.concurrent.atonic包提供了用于支援無鎖可變變量的類。
/**
 * 測試java8的并發增強
 * @author lianghaining
 *
 */
public class TestThread {

	public static AtomicLong num = new AtomicLong();
	
	@Test
	public void testCAS() {
		
		Long id = num.incrementAndGet();
		
	}
	
}
           

incrementAndGet 方法會自動将AtomicLong 的值加 1,并傳回增加後的值,并保證該操作不能被打斷。同時在多線程同時并發通路同一個執行個體,也能夠計算并傳回正确的值。

在 java5 中提供很多設定、增加、減少值的原子操作。但如果你想要進行更複雜的更新操作,就必須使用compareAndSet方法。看下面的代碼

public static AtomicLong num = new AtomicLong();
	@Test
	public void testGetMaxNum() throws InterruptedException {
        Thread t1 = new Thread(new LoopVolatile());
        t1.start();
          
        Thread t2 = new Thread(new LoopVolatile2());
        t2.start();
          
        while (t1.isAlive() || t2.isAlive()) {
        }
  
        System.out.println("final val is: " + num.get());
	}
	
	 private static class LoopVolatile implements Runnable {
	        public void run() {
	            long val = 0;
	            while (val < 10000000L) {
	            	num.set(num.get()+1L);
	                val++;
	            }
	        }
	    }
	      
	    private static class LoopVolatile2 implements Runnable {
	        public void run() {
	            long val = 0;
	            while (val < 10000000L) {
	            	num.set(num.get()+1L);
	                val++;
	            }
	        }
	    }
	    傳回結果:
	    final val is: 8596441
	    final val is: 8379896
           

看傳回結果可知如果是線程安全的應為:20000000,但不是,而且每一次運作都不一樣。這是因為

num.set(num.get()+1L)

并不是原子性的。可以看AtomicLong 中set方法。

private volatile long value; 
    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(long newValue) {
        value = newValue;
    }
           

可以看出set隻保證了可見性,并不能保證原子性。有興趣可以看另一個人的部落格為什麼volatile不能保證原子性而Atomic可以你應該在一個循環裡使用comparenAndSet來計算新值

如下:

private static class LoopVolatile implements Runnable {
	        public void run() {
	            long val = 0;
	            while (val < 10000000L) {
	            	//num.set(num.get()+1L);
	            	long update ;
	            	long oldValue ;
	            	do{
	            		oldValue = num.get();
	            		update = oldValue + 1;
	            	}while(!num.compareAndSet(oldValue, update));
	            	//num.updateAndGet(x->(x+1));
	                val++;
	            }
	        }
	    }
	      
	    private static class LoopVolatile2 implements Runnable {
	        public void run() {
	            long val = 0;
	            while (val < 10000000L) {
	            	//num.set(num.get()+1L);
	            	long update ;
	            	long oldValue ;
	            	do{
	            		oldValue = num.get();
	            		update = oldValue + 1;
	            	}while(!num.compareAndSet(oldValue, update));
	            	//num.updateAndGet(x->(x+1));
	                val++;
	            }
	        }
	    }
	    傳回結果:final val is: 20000000
           

如果另一個線程也在更新

num

,很可能它已經先更新成功了。那麼随後compareAndSet會傳回false,并不會設定新值。此時程式再次嘗試循環,讀取更新後的值并試圖改變它。最終,它成功地将已有值替換為新值。這遠比有鎖要快得多。

Now。在Java8中,你不必再編寫循環了,隻需要提供一個用來更新值的lambda表達式,更新操作自動完成。如下

private static class LoopVolatile implements Runnable {
	        public void run() {
	            long val = 0;
	            while (val < 10000000L) {
	            	num.updateAndGet(x->(x+1));
	            	//或num.accumulateAndGet(1,(x,y)-> x+y); 
	                val++;
	            }
	        }
	    }
	      
	    private static class LoopVolatile2 implements Runnable {
	        public void run() {
	            long val = 0;
	            while (val < 10000000L) {
	            	num.updateAndGet(x->(x+1));
	            	//或num.accumulateAndGet(1,(x,y)-> x+y); 
	                val++;
	            }
	        }
	    }
           

除了它之外,Java 8還提供了傳回原始值的 getAndUpdate 方法和 getAndAccumulate 方法。

當你有大量線程通路同一個原子值時,由于AtomicXXX的樂觀鎖更新需要太多次重試,是以會導緻性能嚴重下降。為此,Java8 提供了LongAdder 和 LongAccumulator 來解決該問題。這兩個原碼我以後單獨說一下,如果感興趣可以看一下

從LONGADDER看更高效的無鎖實作 皓哥的部落格有介紹過。

這裡寫說結論:LongAdder 由多個變量組成,這些變量累加的值即為目前值。多個線程可以更新不同的被加數,當線程數量增加時,會自動增加新的被加數。由于通常情況下都是直到所有工作完成後才需要總和值,是以這種方法效率很高。

如果你的業務存在高度競争,那麼應該選擇 LongAdder 來代替AtomicLong.

LongAccumulator 将這個思想帶到了任意的累加操作中。如下

LongAccumulator adder = new LongAccumulator(Long::sum, 0);
與
LongAdder是一樣的
           

ConcurrentHashMap改進

在多線程中ConcurrentHashMap是常用的一個類型,在1.8相對于1.7有很大的不同。無論是哈希沖突下大數量查詢時雙向連結清單的效率低下,還是在多線程下,分段鎖實作的複雜。1.8中都做了不同的實作。

1. 1.7實作

資料結構

jdk1.7中采用Segment + HashEntry的方式進行實作,結構如下:

Java8總結之并發增強

ConcurrentHashMap初始化時,計算出Segment數組的大小ssize和每個Segment中HashEntry數組的大小cap,并初始化Segment數組的第一個元素;其中ssize大小為2的幂次方,預設為16,cap大小也是2的幂次方,最小值為2,最終結果根據根據初始化容量initialCapacity進行計算,計算過程如下:

if (c * ssize < initialCapacity)
    ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
    cap <<= 1;
           

其中Segment在實作上繼承了ReentrantLock,這樣就自帶了鎖的功能。

put實作

當執行put方法插入資料時,根據key的hash值,在Segment數組中找到相應的位置,如果相應位置的Segment還未初始化,則通過CAS進行指派,接着執行Segment對象的put方法通過加鎖機制插入資料,實作如下:

場景:線程A和線程B同時執行相同Segment對象的put方法

1、線程A執行tryLock()方法成功擷取鎖,則把HashEntry對象插入到相應的位置;

2、線程B擷取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會通過重複執行tryLock()方法嘗試擷取鎖,在多處理器環境下,重複次數為64,單處理器重複次數為1,當執行tryLock()方法的次數超過上限時,則執行lock()方法挂起線程B;

3、當線程A執行完插入操作時,會通過unlock()方法釋放鎖,接着喚醒線程B繼續執行;

size實作

因為ConcurrentHashMap是可以并發插入資料的,是以在準确計算元素時存在一定的難度,一般的思路是統計每個Segment對象中的元素個數,然後進行累加,但是這種方式計算出來的結果并不一樣的準确的,因為在計算後面幾個Segment的元素個數時,已經計算過的Segment同時可能有資料的插入或則删除,在1.7的實作中,采用了如下方式:

try {
    for (;;) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0)
                    overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}
           

先采用不加鎖的方式,連續計算元素的個數,最多計算3次:

1、如果前後兩次計算結果相同,則說明計算出來的元素個數是準确的;

2、如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數;

2.1.8實作

資料結構

1.8中放棄了Segment臃腫的設計,取而代之的是采用Node + CAS + Synchronized來保證并發安全進行實作,結構如下:

Java8總結之并發增強

隻有在執行第一次put方法時才會調用initTable()初始化Node數組,實作如下:

/**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
           

put實作

當執行put方法插入資料時,根據key的hash值,在Node數組中找到相應的位置,實作如下:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
           

1、如果相應位置的Node還未初始化,則通過CAS插入相應的資料;

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}
           

2、如果相應位置的Node不為空,且目前該節點不處于移動狀态,則對該節點加synchronized鎖,如果該節點的hash不小于0,則周遊連結清單更新節點或插入新節點;

if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
        K ek;
        if (e.hash == hash &&
            ((ek = e.key) == key ||
             (ek != null && key.equals(ek)))) {
            oldVal = e.val;
            if (!onlyIfAbsent)
                e.val = value;
            break;
        }
        Node<K,V> pred = e;
        if ((e = e.next) == null) {
            pred.next = new Node<K,V>(hash, key, value, null);
            break;
        }
    }
}
           

3、如果該節點是TreeBin類型的節點,說明是紅黑樹結構,則通過putTreeVal方法往紅黑樹中插入節點;

else if (f instanceof TreeBin) {
    Node<K,V> p;
    binCount = 2;
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
        oldVal = p.val;
        if (!onlyIfAbsent)
            p.val = value;
    }
}
           

4、如果binCount不為0,說明put操作對資料産生了影響,如果目前連結清單的個數達到8個,則通過treeifyBin方法轉化為紅黑樹,如果oldVal不為空,說明是一次更新操作,沒有對元素個數産生影響,則直接傳回舊值;

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}
           

5、如果插入的是一個新節點,則執行addCount()方法嘗試更新元素個數baseCount;

size實作

1.8中使用一個volatile類型的變量baseCount記錄元素的個數,當插入新資料或則删除資料時,會通過addCount()方法更新baseCount,實作如下:

if ((as = counterCells) != null ||
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell a; long v; int m;
    boolean uncontended = true;
    if (as == null || (m = as.length - 1) < 0 ||
        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
        !(uncontended =
          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
        fullAddCount(x, uncontended);
        return;
    }
    if (check <= 1)
        return;
    s = sumCount();
}
           

1、初始化時counterCells為空,在并發量很高時,如果存在兩個線程同時執行CAS修改baseCount值,則失敗的線程會繼續執行方法體中的邏輯,使用CounterCell記錄元素個數的變化;

2、如果CounterCell數組counterCells為空,調用fullAddCount()方法進行初始化,并插入對應的記錄數,通過CAS設定cellsBusy字段,隻有設定成功的線程才能初始化CounterCell數組,實作如下:

else if (cellsBusy == 0 && counterCells == as &&
         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    boolean init = false;
    try {                           // Initialize table
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}
           

3、如果通過CAS設定cellsBusy字段失敗的話,則繼續嘗試通過CAS修改baseCount字段,如果修改baseCount字段成功的話,就退出循環,否則繼續循環插入CounterCell對象;

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
    break;
           

是以在1.8中的size實作比1.7簡單多,因為元素個數儲存baseCount中,部分元素的變化個數儲存在CounterCell數組中,實作如下:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
 
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
           

通過累加baseCount和CounterCell數組中的數量,即可得到元素的總個數;

通過測試1.8的性能在get和size時有明顯的性能優勢,put的性能與1.7相當

CompletableFuture

這裡引入簡書的一個文章。常用的已經說了CompletableFuture詳解

CompletableFuture類實作了CompletionStage和Future接口。Future是Java 5添加的類,用來描述一個異步計算的結果,但是擷取一個結果時方法較少,要麼通過輪詢isDone,确認完成後,調用get()擷取值,要麼調用get()設定一個逾時時間。但是這個get()方法會阻塞住調用線程,這種阻塞的方式顯然和我們的異步程式設計的初衷相違背。

為了解決這個問題,JDK吸收了guava的設計思想,加入了Future的諸多擴充功能形成了CompletableFuture。

CompletionStage是一個接口,從命名上看得知是一個完成的階段,它裡面的方法也标明是在某個運作階段得到了結果之後要做的事情。

  1. 進行變換
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
           

首先說明一下已Async結尾的方法都是可以異步執行的,如果指定了線程池,會在指定的線程池中執行,如果沒有指定,預設會在ForkJoinPool.commonPool()中執行,下文中将會有好多類似的,都不詳細解釋了。關鍵的入參隻有一個Function,它是函數式接口,是以使用Lambda表示起來會更加優雅。它的入參是上一個階段計算後的結果,傳回值是經過轉化後結果。

例如:

@Test
 public void thenApply() {
   String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
   System.out.println(result);
   }
           

結果為:

hello world
           
  1. 進行消耗
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
           

thenAccept是針對結果進行消耗,因為他的入參是Consumer,有入參無傳回值。

例如:

@Test
public void thenAccept(){    
       CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}
           

結果為:

hello world
           
  1. 對上一步的計算結果不關心,執行下一個操作。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
           

thenRun它的入參是一個Runnable的執行個體,表示當得到上一步的結果時的操作。

例如:

@Test
    public void thenRun(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRun(() -> System.out.println("hello world"));
        while (true){}
    }
           

結果為:

hello world 
           
  1. 結合兩個CompletionStage的結果,進行轉化後傳回
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
           

它需要原來的處理傳回值,并且other代表的CompletionStage也要傳回值之後,利用這兩個傳回值,進行轉換後傳回指定類型的值。

例如:

@Test
    public void thenCombine() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> s1 + " " + s2).join();
        System.out.println(result);
    }
           

結果為:

hello world
           
  1. 結合兩個CompletionStage的結果,進行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);
           

它需要原來的處理傳回值,并且other代表的CompletionStage也要傳回值之後,利用這兩個傳回值,進行消耗。

例如:

@Test
    public void thenAcceptBoth() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> System.out.println(s1 + " " + s2));
        while (true){}
    }
           

結果為:

hello world
           
  1. 在兩個CompletionStage都運作完執行。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
           

不關心這兩個CompletionStage的結果,隻關心這兩個CompletionStage執行完畢,之後在進行操作(Runnable)。

例如:

@Test
    public void runAfterBoth(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true){}
    }
           

結果為

hello world
           
  1. 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的轉化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
           

我們現實開發場景中,總會碰到有兩種管道完成同一個事情,是以就可以調用這個方法,找一個最快的結果進行處理。

例如:

@Test
    public void applyToEither() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), s -> s).join();
        System.out.println(result);
    }
           

結果為:

hello world
           
  1. 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
           

例如:

@Test
    public void acceptEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), System.out::println);
        while (true){}
    }
           

結果為:

hello world
           
  1. 兩個CompletionStage,任何一個完成了都會執行下一步的操作(Runnable)。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
           

例如:

@Test
    public void runAfterEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true) {
        }
    }
           

結果為:

hello world
           
  1. 當運作時出現了異常,可以通過exceptionally進行補償。

例如:

@Test
    public void exceptionally() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }
           

結果為:

java.lang.RuntimeException: 測試一下異常情況
hello world
           
  1. 當運作完成時,對結果的記錄。這裡的完成時有兩種情況,一種是正常執行,傳回值。另外一種是遇到異常抛出造成程式的中斷。這裡為什麼要說成記錄,因為這幾個方法都會傳回CompletableFuture,當Action執行完畢後它的結果傳回原始的CompletableFuture的計算結果或者傳回異常。是以不會對結果産生任何的作用。
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
           

例如:

@Test
    public void whenComplete() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).whenComplete((s, t) -> {
            System.out.println(s);
            System.out.println(t.getMessage());
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }
           

結果為:

null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world
           

這裡也可以看出,如果使用了exceptionally,就會對最終的結果産生影響,它沒有口子傳回如果沒有異常時的正确的值,這也就引出下面我們要介紹的handle。

  1. 運作完成時,對結果的處理。這裡的完成時有兩種情況,一種是正常執行,傳回值。另外一種是遇到異常抛出造成程式的中斷。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
           

例如:

出現異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //出現異常
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }
           

結果為:

hello world
           

未出現異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }
           

結果為:

s1
           

上面就是CompletionStage接口中方法的使用執行個體,CompletableFuture同樣也同樣實作了Future,是以也同樣可以使用get進行阻塞擷取值,總的來說,CompletableFuture使用起來還是比較爽的,看起來也比較優雅一點。