一.completionservice接口提供了可以操作異步任務的功能,其唯一實作的api為executorcompletionservice。此api隻是可以擷取異步任務執行的結果,它不是executorservice。
其有5個核心方法:
future<v> poll():同步操作,擷取并移除第一已經完成的任務,否則傳回null。
future<v> poll(timeout):同步操作,擷取并移除第一個已經完成的任務,阻塞時間為timeout,否則傳回null;支援interruptedexception。
future<v> submit(callable<v> task):送出任務,并擷取任務執行結果的句柄。
future<v> submit(runnable,v result):送出任務,并擷取任務執行結果的句柄。
future<v> take():擷取并移除第一個執行完成的任務,阻塞,直到有任務傳回。支援interruptedexception。
executorcompletionservice之說以能夠提供此功能,原因就是其内部持有一個bockingqueue(此queue可以通過構造器傳入指定)。
同時這還要借助future/futuretask的功能。
public executorcompletionservice(executor executor,blockingqueue<future<v>> completionqueue):需要指定一個現有的executor和用于存儲future的隊列,此後通過submit送出的任務都将有executor來執行,并将"future句柄"添加到隊列中;這個api很像一個"修飾者".
二.future:提供了可以檢視異步執行的結果。此接口提供了多個友善的方法,以便檢測和控制任務的操作。
boolean cancel(boolean interruptifrunning):試圖取消任務的執行,如果任務已經完成或者取消,此操作将無效。如果任務尚未啟動(start),那麼任務将不會被執行,如果任務正在執行,則interruptifrunning參數決定是否中斷任務線程(線程需要相應“中斷”)。此方法傳回後,isdone将傳回true;如果方法取消成功,則iscancelled()則傳回true。
v get():等待并擷取執行結果。此方法會阻塞,知道結果傳回。此方法會線上程中斷時抛出interruptexception,如果任務被取消,将;抛出異常。
v get(timeout):阻塞指定的時間。如果時間逾時,仍未能執行完成,則抛出timeoutexception。
runnablefuture接口擴充了future接口和runnable,隻提供(覆寫)run()方法,其作用非常簡單,就是标示其子類具有可執行run方法,且擷取future結果。
三.futuretask就是runnablefuture的子類,具有future接口的可取消任務的能力,以及擷取異步計算結果的能力。futuretask可以認為是一個runnable和callable任務的橋梁類,其構造函數可以接受這兩種任務。
futuretask(callable<v> callable)
futuretask(runnable runnable, v result):當運作結束後,将傳回指定的result。
此外,還有幾個特殊的方法:
protected void done():可重寫的方法,當任務執行結束後,将會調用此方法執行額外的操作。
protected void set(v v):會被run方法内部調用,用來設定執行結果,此結果可以通過get擷取。
runnable類型的任務,會在futuretask中轉化成callable(參見executors.callable(runnable,result),原理很簡單,建立一個callable執行個體,即在調用call時間接的調用run(),
并在執行結束後,傳回指定的result)。
四.executorcompletionservice:送出給executorcompletionservice的任務,會被封裝成一個queueingfuture(一個futuretask子類),此類的唯一作用就是在done()方法中,增加了将執行的futuretask加入了内部隊列,此時外部調用者,就可以take到相應的執行結束的任務。(take就是從blockingqueue中依次擷取)
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
public class excutorcomplementservicetest {
/**
* @param args
*/
public static void main(string[] args) throws exception{
executor executor = executors.newfixedthreadpool(3);
completionservice<integer> cs = new executorcompletionservice<integer>(executor);
//list<future<integer>> result = new arraylist<future<integer>>(10);
for(int i=0; i< 10; i++){
cs.submit(new callable<integer>() {
@override
public integer call() throws exception {
random r = new random();
int init = 0;
for(int i = 0; i<100; i++){
init += r.nextint();
thread.sleep(100);
}
return integer.valueof(init);
}
});
}
for(int i=0; i<10; i++){
future<integer> future = cs.take();
if(future != null){
system.out.println(future.get());
}
}
}
}
在前面的文章中我們講述了建立線程的2種方式,一種是直接繼承thread,另外一種就是實作runnable接口。
這2種方式都有一個缺陷就是:在執行完任務之後無法擷取執行結果。
如果需要擷取執行結果,就必須通過共享變量或者使用線程通信的方式來達到效果,這樣使用起來就比較麻煩。
而自從java 1.5開始,就提供了callable和future,通過它們可以在任務執行完畢之後得到任務執行結果。
今天我們就來讨論一下callable、future和futuretask三個類的使用方法。以下是本文的目錄大綱:
一.callable與runnable
二.future
三.futuretask
四.使用示例
若有不正之處請多多諒解,并歡迎批評指正。
請尊重作者勞動成果,轉載請标明原文連結:
http://www.cnblogs.com/dolphin0520/p/3949310.html
先說一下java.lang.runnable吧,它是一個接口,在它裡面隻聲明了一個run()方法:
1
2
3
<code>public</code> <code>interface</code> <code>runnable {</code>
<code> </code><code>public</code> <code>abstract</code> <code>void</code> <code>run();</code>
<code>}</code>
由于run()方法傳回值為void類型,是以在執行完任務之後無法傳回任何結果。
callable位于java.util.concurrent包下,它也是一個接口,在它裡面也隻聲明了一個方法,隻不過這個方法叫做call():
4
5
6
7
8
9
<code>public</code> <code>interface</code> <code>callable<v> {</code>
<code> </code><code>/**</code>
<code> </code><code>* computes a result, or throws an exception if unable to do so.</code>
<code> </code><code>*</code>
<code> </code><code>* @return computed result</code>
<code> </code><code>* @throws exception if unable to compute a result</code>
<code> </code><code>*/</code>
<code> </code><code>v call() </code><code>throws</code> <code>exception;</code>
可以看到,這是一個泛型接口,call()函數傳回的類型就是傳遞進來的v類型。
那麼怎麼使用callable呢?一般情況下是配合executorservice來使用的,在executorservice接口中聲明了若幹個submit方法的重載版本:
<code><t> future<t> submit(callable<t> task);</code>
<code><t> future<t> submit(runnable task, t result);</code>
<code>future<?> submit(runnable task);</code>
第一個submit方法裡面的參數類型就是callable。
暫時隻需要知道callable一般是和executorservice配合來使用的,具體的使用方法講在後面講述。
一般情況下我們使用第一個submit方法和第三個submit方法,第二個submit方法很少使用。
future就是對于具體的runnable或者callable任務的執行結果進行取消、查詢是否完成、擷取結果。必要時可以通過get方法擷取執行結果,該方法會阻塞直到任務傳回結果。
future類位于java.util.concurrent包下,它是一個接口:
<code>public</code> <code>interface</code> <code>future<v> {</code>
<code> </code><code>boolean</code> <code>cancel(</code><code>boolean</code> <code>mayinterruptifrunning);</code>
<code> </code><code>boolean</code> <code>iscancelled();</code>
<code> </code><code>boolean</code> <code>isdone();</code>
<code> </code><code>v get() </code><code>throws</code> <code>interruptedexception, executionexception;</code>
<code> </code><code>v get(</code><code>long</code> <code>timeout, timeunit unit)</code>
<code> </code><code>throws</code> <code>interruptedexception, executionexception, timeoutexception;</code>
在future接口中聲明了5個方法,下面依次解釋每個方法的作用:
cancel方法用來取消任務,如果取消任務成功則傳回true,如果取消任務失敗則傳回false。參數mayinterruptifrunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設定true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayinterruptifrunning為true還是false,此方法肯定傳回false,即如果取消已經完成的任務會傳回false;如果任務正在執行,若mayinterruptifrunning設定為true,則傳回true,若mayinterruptifrunning設定為false,則傳回false;如果任務還沒有執行,則無論mayinterruptifrunning為true還是false,肯定傳回true。
iscancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則傳回 true。
isdone方法表示任務是否已經完成,若任務完成,則傳回true;
get()方法用來擷取執行結果,這個方法會産生阻塞,會一直等到任務執行完畢才傳回;
get(long timeout, timeunit unit)用來擷取執行結果,如果在指定時間内,還沒擷取到結果,就直接傳回null。
也就是說future提供了三種功能:
1)判斷任務是否完成;
2)能夠中斷任務;
3)能夠擷取任務執行結果。
因為future隻是一個接口,是以是無法直接用來建立對象使用的,是以就有了下面的futuretask。
我們先來看一下futuretask的實作:
<code>public</code> <code>class</code> <code>futuretask<v> </code><code>implements</code> <code>runnablefuture<v></code>
futuretask類實作了runnablefuture接口,我們看一下runnablefuture接口的實作:
<code>public</code> <code>interface</code> <code>runnablefuture<v> </code><code>extends</code> <code>runnable, future<v> {</code>
<code> </code><code>void</code> <code>run();</code>
可以看出runnablefuture繼承了runnable接口和future接口,而futuretask實作了runnablefuture接口。是以它既可以作為runnable被線程執行,又可以作為future得到callable的傳回值。
futuretask提供了2個構造器:
<code>public</code> <code>futuretask(callable<v> callable) {</code>
<code>public</code> <code>futuretask(runnable runnable, v result) {</code>
事實上,futuretask是future接口的一個唯一實作類。
1.使用callable+future擷取執行結果
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<code>public</code> <code>class</code> <code>test {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
<code> </code><code>executorservice executor = executors.newcachedthreadpool();</code>
<code> </code><code>task task = </code><code>new</code> <code>task();</code>
<code> </code><code>future<integer> result = executor.submit(task);</code>
<code> </code><code>executor.shutdown();</code>
<code> </code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>thread.sleep(</code><code>1000</code><code>);</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e1) {</code>
<code> </code><code>e1.printstacktrace();</code>
<code> </code><code>}</code>
<code> </code><code>system.out.println(</code><code>"主線程在執行任務"</code><code>);</code>
<code> </code><code>system.out.println(</code><code>"task運作結果"</code><code>+result.get());</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code> </code><code>e.printstacktrace();</code>
<code> </code><code>} </code><code>catch</code> <code>(executionexception e) {</code>
<code> </code><code>system.out.println(</code><code>"所有任務執行完畢"</code><code>);</code>
<code> </code><code>}</code>
<code>class</code> <code>task </code><code>implements</code> <code>callable<integer>{</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>integer call() </code><code>throws</code> <code>exception {</code>
<code> </code><code>system.out.println(</code><code>"子線程在進行計算"</code><code>);</code>
<code> </code><code>thread.sleep(</code><code>3000</code><code>);</code>
<code> </code><code>int</code> <code>sum = </code><code>0</code><code>;</code>
<code> </code><code>for</code><code>(</code><code>int</code> <code>i=</code><code>0</code><code>;i<</code><code>100</code><code>;i++)</code>
<code> </code><code>sum += i;</code>
<code> </code><code>return</code> <code>sum;</code>
執行結果:
view code
2.使用callable+futuretask擷取執行結果
38
39
40
41
42
43
44
45
<code> </code><code>//第一種方式</code>
<code> </code><code>futuretask<integer> futuretask = </code><code>new</code> <code>futuretask<integer>(task);</code>
<code> </code><code>executor.submit(futuretask);</code>
<code> </code><code>//第二種方式,注意這種方式和第一種方式效果是類似的,隻不過一個使用的是executorservice,一個使用的是thread</code>
<code> </code><code>/*task task = new task();</code>
<code> </code><code>futuretask<integer> futuretask = new futuretask<integer>(task);</code>
<code> </code><code>thread thread = new thread(futuretask);</code>
<code> </code><code>thread.start();*/</code>
<code> </code><code>system.out.println(</code><code>"task運作結果"</code><code>+futuretask.get());</code>
如果為了可取消性而使用 future 但又不提供可用的結果,則可以聲明 future<?> 形式類型、并傳回 null 作為底層任務的結果。
原文連結:[http://wely.iteye.com/blog/2228719]