java8中對并發進行了一些增強優化。簡單總結一下
原子值
從 Java5 開始,java.util.concurrent.atonic包提供了用于支援無鎖可變變量的類。
/**
* 測試java8的并發增強
* @author lianghaining
*
*/
public class TestThread {
public static AtomicLong num = new AtomicLong();
@Test
public void testCAS() {
Long id = num.incrementAndGet();
}
}
incrementAndGet 方法會自動将AtomicLong 的值加 1,并傳回增加後的值,并保證該操作不能被打斷。同時在多線程同時并發通路同一個執行個體,也能夠計算并傳回正确的值。
在 java5 中提供很多設定、增加、減少值的原子操作。但如果你想要進行更複雜的更新操作,就必須使用compareAndSet方法。看下面的代碼
public static AtomicLong num = new AtomicLong();
@Test
public void testGetMaxNum() throws InterruptedException {
Thread t1 = new Thread(new LoopVolatile());
t1.start();
Thread t2 = new Thread(new LoopVolatile2());
t2.start();
while (t1.isAlive() || t2.isAlive()) {
}
System.out.println("final val is: " + num.get());
}
private static class LoopVolatile implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.set(num.get()+1L);
val++;
}
}
}
private static class LoopVolatile2 implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.set(num.get()+1L);
val++;
}
}
}
傳回結果:
final val is: 8596441
final val is: 8379896
看傳回結果可知如果是線程安全的應為:20000000,但不是,而且每一次運作都不一樣。這是因為
num.set(num.get()+1L)
并不是原子性的。可以看AtomicLong 中set方法。
private volatile long value;
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(long newValue) {
value = newValue;
}
可以看出set隻保證了可見性,并不能保證原子性。有興趣可以看另一個人的部落格為什麼volatile不能保證原子性而Atomic可以你應該在一個循環裡使用comparenAndSet來計算新值
如下:
private static class LoopVolatile implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
//num.set(num.get()+1L);
long update ;
long oldValue ;
do{
oldValue = num.get();
update = oldValue + 1;
}while(!num.compareAndSet(oldValue, update));
//num.updateAndGet(x->(x+1));
val++;
}
}
}
private static class LoopVolatile2 implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
//num.set(num.get()+1L);
long update ;
long oldValue ;
do{
oldValue = num.get();
update = oldValue + 1;
}while(!num.compareAndSet(oldValue, update));
//num.updateAndGet(x->(x+1));
val++;
}
}
}
傳回結果:final val is: 20000000
如果另一個線程也在更新
num
,很可能它已經先更新成功了。那麼随後compareAndSet會傳回false,并不會設定新值。此時程式再次嘗試循環,讀取更新後的值并試圖改變它。最終,它成功地将已有值替換為新值。這遠比有鎖要快得多。
Now。在Java8中,你不必再編寫循環了,隻需要提供一個用來更新值的lambda表達式,更新操作自動完成。如下
private static class LoopVolatile implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.updateAndGet(x->(x+1));
//或num.accumulateAndGet(1,(x,y)-> x+y);
val++;
}
}
}
private static class LoopVolatile2 implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.updateAndGet(x->(x+1));
//或num.accumulateAndGet(1,(x,y)-> x+y);
val++;
}
}
}
除了它之外,Java 8還提供了傳回原始值的 getAndUpdate 方法和 getAndAccumulate 方法。
當你有大量線程通路同一個原子值時,由于AtomicXXX的樂觀鎖更新需要太多次重試,是以會導緻性能嚴重下降。為此,Java8 提供了LongAdder 和 LongAccumulator 來解決該問題。這兩個原碼我以後單獨說一下,如果感興趣可以看一下
從LONGADDER看更高效的無鎖實作 皓哥的部落格有介紹過。
這裡寫說結論:LongAdder 由多個變量組成,這些變量累加的值即為目前值。多個線程可以更新不同的被加數,當線程數量增加時,會自動增加新的被加數。由于通常情況下都是直到所有工作完成後才需要總和值,是以這種方法效率很高。
如果你的業務存在高度競争,那麼應該選擇 LongAdder 來代替AtomicLong.
LongAccumulator 将這個思想帶到了任意的累加操作中。如下
LongAccumulator adder = new LongAccumulator(Long::sum, 0);
與
LongAdder是一樣的
ConcurrentHashMap改進
在多線程中ConcurrentHashMap是常用的一個類型,在1.8相對于1.7有很大的不同。無論是哈希沖突下大數量查詢時雙向連結清單的效率低下,還是在多線程下,分段鎖實作的複雜。1.8中都做了不同的實作。
1. 1.7實作
資料結構
jdk1.7中采用Segment + HashEntry的方式進行實作,結構如下:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9UkaNpXU61UeFpmYvhnMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzUTO0MDNzYTM4ITMwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
ConcurrentHashMap初始化時,計算出Segment數組的大小ssize和每個Segment中HashEntry數組的大小cap,并初始化Segment數組的第一個元素;其中ssize大小為2的幂次方,預設為16,cap大小也是2的幂次方,最小值為2,最終結果根據根據初始化容量initialCapacity進行計算,計算過程如下:
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
其中Segment在實作上繼承了ReentrantLock,這樣就自帶了鎖的功能。
put實作
當執行put方法插入資料時,根據key的hash值,在Segment數組中找到相應的位置,如果相應位置的Segment還未初始化,則通過CAS進行指派,接着執行Segment對象的put方法通過加鎖機制插入資料,實作如下:
場景:線程A和線程B同時執行相同Segment對象的put方法
1、線程A執行tryLock()方法成功擷取鎖,則把HashEntry對象插入到相應的位置;
2、線程B擷取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會通過重複執行tryLock()方法嘗試擷取鎖,在多處理器環境下,重複次數為64,單處理器重複次數為1,當執行tryLock()方法的次數超過上限時,則執行lock()方法挂起線程B;
3、當線程A執行完插入操作時,會通過unlock()方法釋放鎖,接着喚醒線程B繼續執行;
size實作
因為ConcurrentHashMap是可以并發插入資料的,是以在準确計算元素時存在一定的難度,一般的思路是統計每個Segment對象中的元素個數,然後進行累加,但是這種方式計算出來的結果并不一樣的準确的,因為在計算後面幾個Segment的元素個數時,已經計算過的Segment同時可能有資料的插入或則删除,在1.7的實作中,采用了如下方式:
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
先采用不加鎖的方式,連續計算元素的個數,最多計算3次:
1、如果前後兩次計算結果相同,則說明計算出來的元素個數是準确的;
2、如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數;
2.1.8實作
資料結構
1.8中放棄了Segment臃腫的設計,取而代之的是采用Node + CAS + Synchronized來保證并發安全進行實作,結構如下:
隻有在執行第一次put方法時才會調用initTable()初始化Node數組,實作如下:
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
put實作
當執行put方法插入資料時,根據key的hash值,在Node數組中找到相應的位置,實作如下:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
1、如果相應位置的Node還未初始化,則通過CAS插入相應的資料;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
2、如果相應位置的Node不為空,且目前該節點不處于移動狀态,則對該節點加synchronized鎖,如果該節點的hash不小于0,則周遊連結清單更新節點或插入新節點;
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
3、如果該節點是TreeBin類型的節點,說明是紅黑樹結構,則通過putTreeVal方法往紅黑樹中插入節點;
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
4、如果binCount不為0,說明put操作對資料産生了影響,如果目前連結清單的個數達到8個,則通過treeifyBin方法轉化為紅黑樹,如果oldVal不為空,說明是一次更新操作,沒有對元素個數産生影響,則直接傳回舊值;
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
5、如果插入的是一個新節點,則執行addCount()方法嘗試更新元素個數baseCount;
size實作
1.8中使用一個volatile類型的變量baseCount記錄元素的個數,當插入新資料或則删除資料時,會通過addCount()方法更新baseCount,實作如下:
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
1、初始化時counterCells為空,在并發量很高時,如果存在兩個線程同時執行CAS修改baseCount值,則失敗的線程會繼續執行方法體中的邏輯,使用CounterCell記錄元素個數的變化;
2、如果CounterCell數組counterCells為空,調用fullAddCount()方法進行初始化,并插入對應的記錄數,通過CAS設定cellsBusy字段,隻有設定成功的線程才能初始化CounterCell數組,實作如下:
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
3、如果通過CAS設定cellsBusy字段失敗的話,則繼續嘗試通過CAS修改baseCount字段,如果修改baseCount字段成功的話,就退出循環,否則繼續循環插入CounterCell對象;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
是以在1.8中的size實作比1.7簡單多,因為元素個數儲存baseCount中,部分元素的變化個數儲存在CounterCell數組中,實作如下:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
通過累加baseCount和CounterCell數組中的數量,即可得到元素的總個數;
通過測試1.8的性能在get和size時有明顯的性能優勢,put的性能與1.7相當
CompletableFuture
這裡引入簡書的一個文章。常用的已經說了CompletableFuture詳解
CompletableFuture類實作了CompletionStage和Future接口。Future是Java 5添加的類,用來描述一個異步計算的結果,但是擷取一個結果時方法較少,要麼通過輪詢isDone,确認完成後,調用get()擷取值,要麼調用get()設定一個逾時時間。但是這個get()方法會阻塞住調用線程,這種阻塞的方式顯然和我們的異步程式設計的初衷相違背。
為了解決這個問題,JDK吸收了guava的設計思想,加入了Future的諸多擴充功能形成了CompletableFuture。
CompletionStage是一個接口,從命名上看得知是一個完成的階段,它裡面的方法也标明是在某個運作階段得到了結果之後要做的事情。
- 進行變換
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
首先說明一下已Async結尾的方法都是可以異步執行的,如果指定了線程池,會在指定的線程池中執行,如果沒有指定,預設會在ForkJoinPool.commonPool()中執行,下文中将會有好多類似的,都不詳細解釋了。關鍵的入參隻有一個Function,它是函數式接口,是以使用Lambda表示起來會更加優雅。它的入參是上一個階段計算後的結果,傳回值是經過轉化後結果。
例如:
@Test
public void thenApply() {
String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
System.out.println(result);
}
結果為:
hello world
- 進行消耗
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
thenAccept是針對結果進行消耗,因為他的入參是Consumer,有入參無傳回值。
例如:
@Test
public void thenAccept(){
CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}
結果為:
hello world
- 對上一步的計算結果不關心,執行下一個操作。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
thenRun它的入參是一個Runnable的執行個體,表示當得到上一步的結果時的操作。
例如:
@Test
public void thenRun(){
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenRun(() -> System.out.println("hello world"));
while (true){}
}
結果為:
hello world
- 結合兩個CompletionStage的結果,進行轉化後傳回
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
它需要原來的處理傳回值,并且other代表的CompletionStage也要傳回值之後,利用這兩個傳回值,進行轉換後傳回指定類型的值。
例如:
@Test
public void thenCombine() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> s1 + " " + s2).join();
System.out.println(result);
}
結果為:
hello world
- 結合兩個CompletionStage的結果,進行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
它需要原來的處理傳回值,并且other代表的CompletionStage也要傳回值之後,利用這兩個傳回值,進行消耗。
例如:
@Test
public void thenAcceptBoth() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> System.out.println(s1 + " " + s2));
while (true){}
}
結果為:
hello world
- 在兩個CompletionStage都運作完執行。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
不關心這兩個CompletionStage的結果,隻關心這兩個CompletionStage執行完畢,之後在進行操作(Runnable)。
例如:
@Test
public void runAfterBoth(){
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
while (true){}
}
結果為
hello world
- 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的轉化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
我們現實開發場景中,總會碰到有兩種管道完成同一個事情,是以就可以調用這個方法,找一個最快的結果進行處理。
例如:
@Test
public void applyToEither() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), s -> s).join();
System.out.println(result);
}
結果為:
hello world
- 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
例如:
@Test
public void acceptEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).acceptEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), System.out::println);
while (true){}
}
結果為:
hello world
- 兩個CompletionStage,任何一個完成了都會執行下一步的操作(Runnable)。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
例如:
@Test
public void runAfterEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
while (true) {
}
}
結果為:
hello world
- 當運作時出現了異常,可以通過exceptionally進行補償。
例如:
@Test
public void exceptionally() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
結果為:
java.lang.RuntimeException: 測試一下異常情況
hello world
- 當運作完成時,對結果的記錄。這裡的完成時有兩種情況,一種是正常執行,傳回值。另外一種是遇到異常抛出造成程式的中斷。這裡為什麼要說成記錄,因為這幾個方法都會傳回CompletableFuture,當Action執行完畢後它的結果傳回原始的CompletableFuture的計算結果或者傳回異常。是以不會對結果産生任何的作用。
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
例如:
@Test
public void whenComplete() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println(s);
System.out.println(t.getMessage());
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
結果為:
null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world
這裡也可以看出,如果使用了exceptionally,就會對最終的結果産生影響,它沒有口子傳回如果沒有異常時的正确的值,這也就引出下面我們要介紹的handle。
- 運作完成時,對結果的處理。這裡的完成時有兩種情況,一種是正常執行,傳回值。另外一種是遇到異常抛出造成程式的中斷。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
例如:
出現異常時
@Test
public void handle() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//出現異常
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}
結果為:
hello world
未出現異常時
@Test
public void handle() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}
結果為:
s1
上面就是CompletionStage接口中方法的使用執行個體,CompletableFuture同樣也同樣實作了Future,是以也同樣可以使用get進行阻塞擷取值,總的來說,CompletableFuture使用起來還是比較爽的,看起來也比較優雅一點。