為什麼要使用Future
線程擷取到運作結果有幾種方式
java複制代碼public class Sum {
private Sum(){}
public static int sum(int n){
int sum = 0;
for (int i = 0; i < n; i++) {
sum += n;
}
return sum;
}
}
Thread.sleep()
java複制代碼private static int sum_sleep = 0;
Thread thread = new Thread(() -> sum_sleep = Sum.sum(100));
thread.start();
TimeUnit.SECONDS.sleep(1);
System.out.printf("get result by thread.sleep: %d\n", sum_sleep);
使用sleep()方法擷取,這種方法,有不可控性,也許sleep1秒鐘,但是線程還沒有執行完成,可能會導緻擷取到的結果不準确。
Thread.join()
java複制代碼private static int sum_join = 0;
Thread thread = new Thread(() -> sum_join = Sum.sum(100));
thread.start();
thread.join();
System.out.printf("get result by thread.join: %d\n", sum_join);
循環
java複制代碼private static int sum_loop = 0;
private static volatile boolean flag;
Thread thread = new Thread(() -> {
sum_loop = Sum.sum(100);
flag = true;
});
thread.start();
int i = 0;
while (!flag) {
i++;
}
System.out.printf("get result by loopLock: %d\n", sum_loop);
notifyAll() / wait()
java複制代碼private static class NotifyAndWaitTest {
private Integer sum = null;
private synchronized void sum_wait_notify() {
sum = Sum.sum(100);
notifyAll();
}
private synchronized Integer getSum() {
while (sum == null) {
try {
wait();
} catch (Exception e) {
e.printStackTrace();
}
}
return sum;
}
}
private static void getResultByNotifyAndWait() throws Exception {
NotifyAndWaitTest test = new NotifyAndWaitTest();
new Thread(test::sum_wait_notify).start();
System.out.printf("get result by NotifyAndWait: %d\n", test.getSum());
}
Lock & Condition
java複制代碼private static class LockAndConditionTest {
private Integer sum = null;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void sum() {
try {
lock.lock();
sum = Sum.sum(100);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public Integer getSum() {
try {
lock.lock();
while (Objects.isNull(sum)) {
try {
condition.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return sum;
}
}
private static void getResultByLockAndCondition() throws Exception {
LockAndConditionTest test = new LockAndConditionTest();
new Thread(test::sum).start();
System.out.printf("get result by lock and condition: %d\n", test.getSum());
}
BlockingQueue
java複制代碼BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
new Thread(() -> queue.offer(Sum.sum(100))).start();
System.out.printf("get result by blocking queue: %d\n", queue.take());
CountDownLatch
java複制代碼private static int sum_countDownLatch = 0;
private static void getResultByCountDownLatch() {
CountDownLatch latch = new CountDownLatch(1);
new Thread(
() -> {
sum_countDownLatch = Sum.sum(100);
latch.countDown();
})
.start();
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.printf("get result by countDownLatch: %d\n", sum_countDownLatch);
}
CyclicBarrier
java複制代碼private static int sum_cyclicBarrier = 0;
private static void getResultByCycleBarrier() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
new Thread(
() -> {
sum_cyclicBarrier = Sum.sum(100);
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
})
.start();
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.printf("get result by cyclicBarrier: %d\n", sum_cyclicBarrier);
}
Semaphore
java複制代碼private static int sum_semaphore = 0;
private static void getResultBySemaphore() {
Semaphore semaphore = new Semaphore(0);
new Thread(
() -> {
sum_semaphore = Sum.sum(100);
semaphore.release();
})
.start();
try {
semaphore.acquire();
System.out.printf("get result by semaphore: %d\n", sum_semaphore);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上面提到的擷取線程執行結果的方法,暫時基于之前學到的内容,我隻能想到這些。這些實作方式也不是很優雅,不是最佳實踐。
線程池,利用ThreadPoolExecutor的execute(Runnable command)方法,利用這個方法雖說可以送出任務,但是卻沒有辦法擷取任務執行結果。
那麼我們如果需要擷取任務的執行結果并且優雅的實作,可以通過Future接口和Callable接口配合實作, 本文将會通過具體的例子講解如何使用Future。
Future最主要的作用是,比如當做比較耗時運算的時候,如果我們一直在原地等待方法傳回,顯然是不明智的,整體程式的運作效率會大大降低。我們可以把運算的過程放到子線程去執行,再通過Future去控制子線程執行的計算過程,最後擷取到計算結果。這樣一來就可以把整個程式的運作效率提高,是一種異步的思想。
如何使用Future
要想使用Future首先得先了解一下Callable。Callable 接口相比于 Runnable 的一大優勢是可以有傳回結果,那這個傳回結果怎麼擷取呢?就可以用 Future 類的 get 方法來擷取 。是以,Future 相當于一個存儲器,它存儲了 Callable 的call方法的任務結果。
一般情況下,Future,Callable,ExecutorService是一起使用的,ExecutorService裡相關的代碼如下:
java複制代碼// 送出 Runnable 任務
// 由于Runnable接口的run方法沒有傳回值,是以,Future僅僅是用來斷言任務已經結束,有點類似join();
Future<?> submit(Runnable task);
// 送出 Callable 任務
// Callable裡的call方法是有傳回值的,是以這個方法傳回的Future對象可以通過調用其get()方法來擷取任務的執
//行結果。
<T> Future<T> submit(Callable<T> task);
// 送出 Runnable 任務及結果引用
// Future的傳回值就是傳給submit()方法的參數result。
<T> Future<T> submit(Runnable task, T result);
具體使用方法如下:
java複制代碼ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> Sum.sum(100));
System.out.printf("get result by Callable + Future: %d\n", future.get());
executor.shutdown();
Future實作原理
Future基本概述
Future接口5個方法:
java複制代碼// 取消任務
boolean cancel(boolean mayInterruptIfRunning);
// 判斷任務是否已取消
boolean isCancelled();
// 判斷任務是否已結束
boolean isDone();
// 獲得任務執行結果 阻塞,被調用時,如果任務還沒有執行完,那麼調用get()方法的線程會阻塞。直到任務執行完
// 才會被喚醒
get();
// 獲得任務執行結果,支援逾時
get(long timeout, TimeUnit unit);
- cancel(boolean mayInterruptIfRunning):
- 用來取消異步任務的執行。
- 如果異步任務已經完成或者已經被取消,或者由于某些原因不能取消,則會傳回false。
- 如果任務還沒有被執行,則會傳回true并且異步任務不會被執行。
- 如果任務已經開始執行了但是還沒有執行完成,若mayInterruptIfRunning為true,則會立即中斷執行任務的線程并傳回true,若mayInterruptIfRunning為false,則會傳回true且不會中斷任務執行線程。
- isCanceled():
- 判斷任務是否被取消。
- 如果任務在結束(正常執行結束或者執行異常結束)前被取消則傳回true,否則傳回false。
- isDone():
- ·判斷任務是否已經完成,如果完成則傳回true,否則傳回false。
- 任務執行過程中發生異常、任務被取消也屬于任務已完成,也會傳回true。
- get():
- 擷取任務執行結果,如果任務還沒完成則會阻塞等待直到任務執行完成。
- 如果任務被取消則會抛出CancellationException異常。
- 如果任務執行過程發生異常則會抛出ExecutionException異常。
- 如果阻塞等待過程中被中斷則會抛出InterruptedException異常。
- get(long timeout,Timeunit unit):
- 帶逾時時間的get()版本,上面講述的get()方法,同樣适用這裡。
- 如果阻塞等待過程中逾時則會抛出TimeoutException異常。
使用IDEA,檢視Future的實作類其實有很多,比如FutureTask,ForkJoinTask,CompletableFuture等,其餘基本是繼承了ForkJoinTask實作的内部類。
本篇文章主要講解FutureTask的實作原理
FutureTask基本概述
FutureTask 為 Future 提供了基礎實作,如擷取任務執行結果(get)和取消任務(cancel)等。如果任務尚未完成,擷取任務執行結果時将會阻塞。一旦執行結束,任務就不能被重新開機或取消(除非使用runAndReset執行計算)。FutureTask 常用來封裝 Callable 和 Runnable,也可以作為一個任務送出到線程池中執行。除了作為一個獨立的類之外,此類也提供了一些功能性函數供我們建立自定義 task 類使用。FutureTask 的線程安全由CAS來保證。
java複制代碼// 建立 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 建立線程池
ExecutorService es = Executors.newCachedThreadPool();
// 送出 FutureTask
es.submit(futureTask);
// 擷取計算結果
Integer result = futureTask.get();
java複制代碼// 建立 FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 建立并啟動線程
Thread T1 = new Thread(futureTask);
T1.start();
// 擷取計算結果
Integer result = futureTask.get();
FutureTask可以很容易擷取子線程的執行結果。
FutureTask實作原理
構造函數
java複制代碼public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask提供了兩個構造器
- Callable接口有傳回,将callable指派給this.callable。
- Runnable接口無傳回,如果想要擷取到執行結果,需要傳V result給FutureTask,FutureTask将Runnable和result封裝成Callable,再将callable指派給this.callable。
- 狀态初始化狀态為NEW
FutureTask内置狀态有:
java複制代碼private volatile int state; // 可見性
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
- NEW 初始狀态
- COMPLETING 任務已經執行完(正常或者異常),準備指派結果,但是這個狀态會時間會比較短,屬于中間狀态。
- NORMAL 任務已經正常執行完,并已将任務傳回值指派到結果
- EXCEPTIONAL 任務執行失敗,并将異常指派到結果
- CANCELLED 取消
- INTERRUPTING 準備嘗試中斷執行任務的線程
- INTERRUPTED 對執行任務的線程進行中斷(未必中斷到)
狀态轉換:
run()執行流程
java複制代碼public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set()
java複制代碼protected void set(V v) {
// state變量,通過CAS操作,将NEW->COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 将結果指派給outcome屬性
outcome = v;
// state狀态直接指派為NORMAL,不需要CAS
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
setException()
java複制代碼protected void setException(Throwable t) {
// state變量,通過CAS操作,将NEW->COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 将異常指派給outcome屬性
outcome = t;
// state狀态直接指派為EXCEPTIONAL,不需要CAS
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
finishCompletion()
set()和setException()兩個方法最後都調用了finishCompletion()方法,完成一些善後工作,具體流程如下:
java複制代碼private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 移除等待線程
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 自旋周遊等待線程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 喚醒等待線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 任務完成後調用函數,自定義擴充
done();
callable = null; // to reduce footprint
}
handlePossibleCancellationInterrupt()
java複制代碼private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
// 在中斷者中斷線程之前可能會延遲,是以我們隻需要讓出CPU時間片自旋等待
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
get()執行流程
java複制代碼public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// awaitDone用于等待任務完成,或任務因為中斷或逾時而終止。傳回任務的完成狀态。
s = awaitDone(false, 0L);
return report(s);
}
具體流程:
awaitDone()
java複制代碼private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
// 擷取到目前狀态
int s = state;
// 如果目前狀态不為NEW或者COMPLETING
if (s > COMPLETING) {
if (q != null)
q.thread = null;
// 直接傳回state
return s;
}
// COMPLETING是一個很短暫的狀态,調用Thread.yield期望讓出時間片,之後重試循環。
else if (s == COMPLETING)
Thread.yield();
// 如果阻塞線程被中斷則将目前線程從阻塞隊列中移除
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 新進來的線程添加等待節點
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
/*
* 這是Treiber Stack算法入棧的邏輯。
* Treiber Stack是一個基于CAS的無鎖并發棧實作,
* 更多可以參考https://en.wikipedia.org/wiki/Treiber_Stack
*/
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
// 逾時,移除棧中節點。
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
// 未超市并且狀态為NEW,阻塞目前線程
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
removeWaiter()
java複制代碼private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 如果目前節點仍有效,則置pred為目前節點,繼續周遊。
if (q.thread != null)
pred = q;
/*
* 目前節點已無效且有前驅,則将前驅的後繼置為目前節點的後繼實作删除節點。
* 如果前驅節點已無效,則重新周遊waiters棧。
*/
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
/*
* 目前節點已無效,且目前節點沒有前驅,則将棧頂置為目前節點的後繼。
* 失敗的話重新周遊waiters棧。
*/
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}
report()
java複制代碼private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
cancel()執行流程
java複制代碼public boolean cancel(boolean mayInterruptIfRunning) {
// 狀态機不是NEW 或CAS更新狀态 流轉到INTERRUPTING或者CANCELLED失敗,不允許cancel
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 如果要求中斷執行中的任務,則直接中斷任務執行線程,并更新狀态機為最終狀态INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
經典案例
引用極客時間-java并發程式設計課程的案例燒水泡茶:
并發程式設計可以總結為三個核心問題:分工,同步和互斥。編寫并發程式,首先要做分工。
- T1負責洗水壺,燒開水,泡茶這三道工序
- T2負責洗茶壺,洗茶杯,拿茶葉三道工序。
- T1在執行泡茶這道工序需要等到T2完成拿茶葉的工作。(join,countDownLatch,阻塞隊列都可以完成)
java複制代碼// 建立任務 T2 的 FutureTask
FutureTask<String> ft2
= new FutureTask<>(new T2Task());
// 建立任務 T1 的 FutureTask
FutureTask<String> ft1
= new FutureTask<>(new T1Task(ft2));
// 線程 T1 執行任務 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 線程 T2 執行任務 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待線程 T1 執行結果
System.out.println(ft1.get());
// T1Task 需要執行的任務:
// 洗水壺、燒開水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1 任務需要 T2 任務的 FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1: 洗水壺...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1: 燒開水...");
TimeUnit.SECONDS.sleep(15);
// 擷取 T2 線程的茶葉
String tf = ft2.get();
System.out.println("T1: 拿到茶葉:"+tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
}
}
// T2Task 需要執行的任務:
// 洗茶壺、洗茶杯、拿茶葉
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2: 洗茶壺...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2: 洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2: 拿茶葉...");
TimeUnit.SECONDS.sleep(1);
return " 龍井 ";
}
}
// 一次執行結果:
//T1: 洗水壺...
//T2: 洗茶壺...
//T1: 燒開水...
//T2: 洗茶杯...
//T2: 拿茶葉...
//T1: 拿到茶葉: 龍井
//T1: 泡茶...
//上茶: 龍井
作者:Shawn_Shawn
連結:https://juejin.cn/post/7257517788193472549