天天看點

Clojure的并發(八)future、promise和線程

<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/14/326027.html">clojure 的并發(一) ref和stm</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/17/326362.html">clojure 的并發(二)write skew分析</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/17/326389.html">clojure 的并發(三)atom、緩存和性能</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/19/326540.html">clojure 的并發(四)agent深入分析和actor</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/23/326976.html">clojure 的并發(五)binding和let</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/07/30/327606.html">clojure的并發(六)agent可以改進的地方</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/08/04/327985.html">clojure的并發(七)pmap、pvalues和pcalls</a>

八、future、promise和線程

1、clojure中使用future是啟動一個線程,并執行一系列的表達式,當執行完成的時候,線程會被回收:

user=&gt; (def myfuture (future (+ 1 2)))

#'user/myfuture

user=&gt; @myfuture

3

future接受一個或者多個表達式,并将這些表達式交給一個線程去處理,上面的(+ 1 2)是在另一個線程計算的,傳回的future對象可以通過deref或者@宏來阻塞擷取計算的結果。

future函數傳回的結果可以認為是一個類似java.util.concurrent.future的對象,是以可以取消:

user=&gt; (future-cancelled? myfuture)

false

user=&gt; (future-cancel myfuture)

也可以通過謂詞future?來判斷一個變量是否是future對象:

user=&gt; (future? myfuture)

true

2、future的實作,future其實是一個宏,它内部是調用future-call函數來執行的:

(defmacro future

  [&amp; body] `(future-call (fn [] ~@body)))

可以看到,是将body包裝成一個匿名函數交給future-call執行,future-call接受一個callable對象:

(defn future-call 

  [^callable f]

  (let [fut (.submit clojure.lang.agent/soloexecutor f)]

    (reify 

     clojure.lang.ideref 

      (deref [_] (.get fut))

     java.util.concurrent.future

      (get [_] (.get fut))

      (get [_ timeout unit] (.get fut timeout unit))

      (iscancelled [_] (.iscancelled fut))

      (isdone [_] (.isdone fut))

      (cancel [_ interrupt?] (.cancel fut interrupt?)))))i

将傳入的callable對象f送出給agent的soloexecuture

final public static executorservice soloexecutor = executors.newcachedthreadpool();

執行,傳回的future對象賦予fut,接下來是利用clojure 1.2引入的reify定義了一個匿名的資料類型,它有兩種protocol:clojure.lang.ideref和java.utill.concurrent.future。其中ideref定義了deref方法,而future則簡單地将一些方法委托給fut對象。protocol你可以了解成java中的接口,這裡就是類似多态調用的作用。

這裡有個地方值的學習的是,clojure定義了一個future宏,而不是直接讓使用者使用future-call,這符合使用宏的規則:避免匿名函數。因為如果讓使用者使用future-call,使用者需要将表達式包裝成匿名對象傳入,而提供一個宏就友善許多。

3、啟動線程的其他方法,在clojure中完全可以采用java的方式去啟動一個線程:

user=&gt; (.start (thread. #(println "hello")))

nil

hello

4、promise用于線程之間的協調通信,當一個promise的值還沒有設定的時候,你調用deref或者@想去解引用的時候将被阻塞:

user=&gt; (def mypromise (promise))

#'user/mypromise

user=&gt; @mypromise

在repl執行上述代碼将導緻repl被挂起,這是因為mypromise還沒有值,你直接調用了@mypromise去解引用導緻主線程阻塞。

如果在調用@宏之前先給promise設定一個值的話就不會阻塞:

user=&gt; (deliver mypromise 5)

#&lt;afn$ideref$db53459f@c0f1ec: 5&gt;

user=&gt; @mypromise               

5

通過調用deliver函數給mypromise傳遞了一個值,這使得後續的@mypromise直接傳回傳遞的值5。顯然promise可以用于不同線程之間的通信和協調。

5、promise的實作:promise的實作非常簡單,是基于countdownlatch做的實作,内部除了關聯一個countdownlatch還關聯一個atom用于存儲值:

(defn promise

  []

  (let [d (java.util.concurrent.countdownlatch. 1)

        v (atom nil)]

     clojure.lang.ideref

      (deref [_] (.await d) @v)

     clojure.lang.ifn

      (invoke [this x]

        (locking d

          (if (pos? (.getcount d))

            (do (reset! v x)

                (.countdown d)

                this)

            (throw (illegalstateexception. "multiple deliver calls to a promise"))))))))

d是一個countdownlatch,v是一個atom,一開始值是nil。傳回的promise對象也是通過reify定義的匿名資料類型,他也是有兩個protocol,一個是用于deref的ideref,簡單地調用d.await()阻塞等待;另一個是匿名函數,接受兩個參數,第一個是promise對象自身,第二個參數是傳入的值x,當d的count還大于0的請看下,設定v的值為x,否則抛出異常的多次deliver了。檢視下deliver函數,其實就是調用promise對象的匿名函數protocol:

(defn deliver

  {:added "1.1"}

  [promise val] (promise val))

文章轉自莊周夢蝶  ,原文釋出時間 2010-08-08