天天看點

CompletionService、Future

一.completionservice接口提供了可以操作異步任務的功能,其唯一實作的api為executorcompletionservice。此api隻是可以擷取異步任務執行的結果,它不是executorservice。

其有5個核心方法:

future<v> poll():同步操作,擷取并移除第一已經完成的任務,否則傳回null。

future<v> poll(timeout):同步操作,擷取并移除第一個已經完成的任務,阻塞時間為timeout,否則傳回null;支援interruptedexception。

future<v> submit(callable<v> task):送出任務,并擷取任務執行結果的句柄。

future<v> submit(runnable,v result):送出任務,并擷取任務執行結果的句柄。

future<v> take():擷取并移除第一個執行完成的任務,阻塞,直到有任務傳回。支援interruptedexception。

 executorcompletionservice之說以能夠提供此功能,原因就是其内部持有一個bockingqueue(此queue可以通過構造器傳入指定)。

同時這還要借助future/futuretask的功能。

public executorcompletionservice(executor executor,blockingqueue<future<v>> completionqueue):需要指定一個現有的executor和用于存儲future的隊列,此後通過submit送出的任務都将有executor來執行,并将"future句柄"添加到隊列中;這個api很像一個"修飾者".

二.future:提供了可以檢視異步執行的結果。此接口提供了多個友善的方法,以便檢測和控制任務的操作。

boolean cancel(boolean interruptifrunning):試圖取消任務的執行,如果任務已經完成或者取消,此操作将無效。如果任務尚未啟動(start),那麼任務将不會被執行,如果任務正在執行,則interruptifrunning參數決定是否中斷任務線程(線程需要相應“中斷”)。此方法傳回後,isdone将傳回true;如果方法取消成功,則iscancelled()則傳回true。

v get():等待并擷取執行結果。此方法會阻塞,知道結果傳回。此方法會線上程中斷時抛出interruptexception,如果任務被取消,将;抛出異常。

v get(timeout):阻塞指定的時間。如果時間逾時,仍未能執行完成,則抛出timeoutexception。

runnablefuture接口擴充了future接口和runnable,隻提供(覆寫)run()方法,其作用非常簡單,就是标示其子類具有可執行run方法,且擷取future結果。

三.futuretask就是runnablefuture的子類,具有future接口的可取消任務的能力,以及擷取異步計算結果的能力。futuretask可以認為是一個runnable和callable任務的橋梁類,其構造函數可以接受這兩種任務。

futuretask(callable<v> callable) 

futuretask(runnable runnable, v result):當運作結束後,将傳回指定的result。

 此外,還有幾個特殊的方法:

protected void done():可重寫的方法,當任務執行結束後,将會調用此方法執行額外的操作。

protected void set(v v):會被run方法内部調用,用來設定執行結果,此結果可以通過get擷取。

runnable類型的任務,會在futuretask中轉化成callable(參見executors.callable(runnable,result),原理很簡單,建立一個callable執行個體,即在調用call時間接的調用run(),

并在執行結束後,傳回指定的result)。

四.executorcompletionservice:送出給executorcompletionservice的任務,會被封裝成一個queueingfuture(一個futuretask子類),此類的唯一作用就是在done()方法中,增加了将執行的futuretask加入了内部隊列,此時外部調用者,就可以take到相應的執行結束的任務。(take就是從blockingqueue中依次擷取)

CompletionService、Future

public class excutorcomplementservicetest {  

    /** 

    * @param args 

    */  

    public static void main(string[] args) throws exception{  

        executor executor = executors.newfixedthreadpool(3);  

        completionservice<integer> cs = new executorcompletionservice<integer>(executor);  

        //list<future<integer>> result = new arraylist<future<integer>>(10);  

        for(int i=0; i< 10; i++){  

            cs.submit(new callable<integer>() {          

                @override  

                public integer call() throws exception {  

                    random r = new random();  

                    int init = 0;  

                    for(int i = 0; i<100; i++){  

                        init += r.nextint();  

                        thread.sleep(100);  

                    }  

                    return integer.valueof(init);  

                }  

            });  

        }  

        for(int i=0; i<10; i++){  

            future<integer> future = cs.take();  

            if(future != null){  

                system.out.println(future.get());  

            }  

        }   

    }  

}  

在前面的文章中我們講述了建立線程的2種方式,一種是直接繼承thread,另外一種就是實作runnable接口。

  這2種方式都有一個缺陷就是:在執行完任務之後無法擷取執行結果。

  如果需要擷取執行結果,就必須通過共享變量或者使用線程通信的方式來達到效果,這樣使用起來就比較麻煩。

  而自從java 1.5開始,就提供了callable和future,通過它們可以在任務執行完畢之後得到任務執行結果。

  今天我們就來讨論一下callable、future和futuretask三個類的使用方法。以下是本文的目錄大綱:

  一.callable與runnable

  二.future

  三.futuretask

  四.使用示例

  若有不正之處請多多諒解,并歡迎批評指正。

  請尊重作者勞動成果,轉載請标明原文連結:

  http://www.cnblogs.com/dolphin0520/p/3949310.html

  

  先說一下java.lang.runnable吧,它是一個接口,在它裡面隻聲明了一個run()方法:

1

2

3

<code>public</code> <code>interface</code> <code>runnable {</code>

<code>    </code><code>public</code> <code>abstract</code> <code>void</code> <code>run();</code>

<code>}</code>

   由于run()方法傳回值為void類型,是以在執行完任務之後無法傳回任何結果。

  callable位于java.util.concurrent包下,它也是一個接口,在它裡面也隻聲明了一個方法,隻不過這個方法叫做call():

4

5

6

7

8

9

<code>public</code> <code>interface</code> <code>callable&lt;v&gt; {</code>

<code>    </code><code>/**</code>

<code>     </code><code>* computes a result, or throws an exception if unable to do so.</code>

<code>     </code><code>*</code>

<code>     </code><code>* @return computed result</code>

<code>     </code><code>* @throws exception if unable to compute a result</code>

<code>     </code><code>*/</code>

<code>    </code><code>v call() </code><code>throws</code> <code>exception;</code>

   可以看到,這是一個泛型接口,call()函數傳回的類型就是傳遞進來的v類型。

  那麼怎麼使用callable呢?一般情況下是配合executorservice來使用的,在executorservice接口中聲明了若幹個submit方法的重載版本:

<code>&lt;t&gt; future&lt;t&gt; submit(callable&lt;t&gt; task);</code>

<code>&lt;t&gt; future&lt;t&gt; submit(runnable task, t result);</code>

<code>future&lt;?&gt; submit(runnable task);</code>

  第一個submit方法裡面的參數類型就是callable。

  暫時隻需要知道callable一般是和executorservice配合來使用的,具體的使用方法講在後面講述。

  一般情況下我們使用第一個submit方法和第三個submit方法,第二個submit方法很少使用。

  future就是對于具體的runnable或者callable任務的執行結果進行取消、查詢是否完成、擷取結果。必要時可以通過get方法擷取執行結果,該方法會阻塞直到任務傳回結果。

  future類位于java.util.concurrent包下,它是一個接口:

<code>public</code> <code>interface</code> <code>future&lt;v&gt; {</code>

<code>    </code><code>boolean</code> <code>cancel(</code><code>boolean</code> <code>mayinterruptifrunning);</code>

<code>    </code><code>boolean</code> <code>iscancelled();</code>

<code>    </code><code>boolean</code> <code>isdone();</code>

<code>    </code><code>v get() </code><code>throws</code> <code>interruptedexception, executionexception;</code>

<code>    </code><code>v get(</code><code>long</code> <code>timeout, timeunit unit)</code>

<code>        </code><code>throws</code> <code>interruptedexception, executionexception, timeoutexception;</code>

   在future接口中聲明了5個方法,下面依次解釋每個方法的作用:

cancel方法用來取消任務,如果取消任務成功則傳回true,如果取消任務失敗則傳回false。參數mayinterruptifrunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設定true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayinterruptifrunning為true還是false,此方法肯定傳回false,即如果取消已經完成的任務會傳回false;如果任務正在執行,若mayinterruptifrunning設定為true,則傳回true,若mayinterruptifrunning設定為false,則傳回false;如果任務還沒有執行,則無論mayinterruptifrunning為true還是false,肯定傳回true。

iscancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則傳回 true。

isdone方法表示任務是否已經完成,若任務完成,則傳回true;

get()方法用來擷取執行結果,這個方法會産生阻塞,會一直等到任務執行完畢才傳回;

get(long timeout, timeunit unit)用來擷取執行結果,如果在指定時間内,還沒擷取到結果,就直接傳回null。

  也就是說future提供了三種功能:

  1)判斷任務是否完成;

  2)能夠中斷任務;

  3)能夠擷取任務執行結果。

  因為future隻是一個接口,是以是無法直接用來建立對象使用的,是以就有了下面的futuretask。

  我們先來看一下futuretask的實作:

<code>public</code> <code>class</code> <code>futuretask&lt;v&gt; </code><code>implements</code> <code>runnablefuture&lt;v&gt;</code>

   futuretask類實作了runnablefuture接口,我們看一下runnablefuture接口的實作:

<code>public</code> <code>interface</code> <code>runnablefuture&lt;v&gt; </code><code>extends</code> <code>runnable, future&lt;v&gt; {</code>

<code>    </code><code>void</code> <code>run();</code>

   可以看出runnablefuture繼承了runnable接口和future接口,而futuretask實作了runnablefuture接口。是以它既可以作為runnable被線程執行,又可以作為future得到callable的傳回值。

  futuretask提供了2個構造器:

<code>public</code> <code>futuretask(callable&lt;v&gt; callable) {</code>

<code>public</code> <code>futuretask(runnable runnable, v result) {</code>

  事實上,futuretask是future接口的一個唯一實作類。

  1.使用callable+future擷取執行結果

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

<code>public</code> <code>class</code> <code>test {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>        </code><code>executorservice executor = executors.newcachedthreadpool();</code>

<code>        </code><code>task task = </code><code>new</code> <code>task();</code>

<code>        </code><code>future&lt;integer&gt; result = executor.submit(task);</code>

<code>        </code><code>executor.shutdown();</code>

<code>        </code> 

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>thread.sleep(</code><code>1000</code><code>);</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e1) {</code>

<code>            </code><code>e1.printstacktrace();</code>

<code>        </code><code>}</code>

<code>        </code><code>system.out.println(</code><code>"主線程在執行任務"</code><code>);</code>

<code>            </code><code>system.out.println(</code><code>"task運作結果"</code><code>+result.get());</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>e.printstacktrace();</code>

<code>        </code><code>} </code><code>catch</code> <code>(executionexception e) {</code>

<code>        </code><code>system.out.println(</code><code>"所有任務執行完畢"</code><code>);</code>

<code>    </code><code>}</code>

<code>class</code> <code>task </code><code>implements</code> <code>callable&lt;integer&gt;{</code>

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>integer call() </code><code>throws</code> <code>exception {</code>

<code>        </code><code>system.out.println(</code><code>"子線程在進行計算"</code><code>);</code>

<code>        </code><code>thread.sleep(</code><code>3000</code><code>);</code>

<code>        </code><code>int</code> <code>sum = </code><code>0</code><code>;</code>

<code>        </code><code>for</code><code>(</code><code>int</code> <code>i=</code><code>0</code><code>;i&lt;</code><code>100</code><code>;i++)</code>

<code>            </code><code>sum += i;</code>

<code>        </code><code>return</code> <code>sum;</code>

   執行結果:

CompletionService、Future

 view code

  2.使用callable+futuretask擷取執行結果

38

39

40

41

42

43

44

45

<code>        </code><code>//第一種方式</code>

<code>        </code><code>futuretask&lt;integer&gt; futuretask = </code><code>new</code> <code>futuretask&lt;integer&gt;(task);</code>

<code>        </code><code>executor.submit(futuretask);</code>

<code>        </code><code>//第二種方式,注意這種方式和第一種方式效果是類似的,隻不過一個使用的是executorservice,一個使用的是thread</code>

<code>        </code><code>/*task task = new task();</code>

<code>        </code><code>futuretask&lt;integer&gt; futuretask = new futuretask&lt;integer&gt;(task);</code>

<code>        </code><code>thread thread = new thread(futuretask);</code>

<code>        </code><code>thread.start();*/</code>

<code>            </code><code>system.out.println(</code><code>"task運作結果"</code><code>+futuretask.get());</code>

   如果為了可取消性而使用 future 但又不提供可用的結果,則可以聲明 future&lt;?&gt; 形式類型、并傳回 null 作為底層任務的結果。

原文連結:[http://wely.iteye.com/blog/2228719]

繼續閱讀