<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/14/326027.html">clojure 的并發(一) ref和stm</a>
<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/17/326362.html">clojure 的并發(二)write skew分析</a>
<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/17/326389.html">clojure 的并發(三)atom、緩存和性能</a>
<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/19/326540.html">clojure 的并發(四)agent深入分析和actor</a>
<a href="http://www.blogjava.net/killme2008/archive/2010/08/archive/2010/07/23/326976.html">clojure 的并發(五)binding和let</a>
<a href="http://www.blogjava.net/killme2008/archive/2010/07/30/327606.html">clojure的并發(六)agent可以改進的地方</a>
<a href="http://www.blogjava.net/killme2008/archive/2010/08/04/327985.html">clojure的并發(七)pmap、pvalues和pcalls</a>
八、future、promise和線程
1、clojure中使用future是啟動一個線程,并執行一系列的表達式,當執行完成的時候,線程會被回收:
user=> (def myfuture (future (+ 1 2)))
#'user/myfuture
user=> @myfuture
3
future接受一個或者多個表達式,并将這些表達式交給一個線程去處理,上面的(+ 1 2)是在另一個線程計算的,傳回的future對象可以通過deref或者@宏來阻塞擷取計算的結果。
future函數傳回的結果可以認為是一個類似java.util.concurrent.future的對象,是以可以取消:
user=> (future-cancelled? myfuture)
false
user=> (future-cancel myfuture)
也可以通過謂詞future?來判斷一個變量是否是future對象:
user=> (future? myfuture)
true
2、future的實作,future其實是一個宏,它内部是調用future-call函數來執行的:
(defmacro future
[& body] `(future-call (fn [] ~@body)))
可以看到,是将body包裝成一個匿名函數交給future-call執行,future-call接受一個callable對象:
(defn future-call
[^callable f]
(let [fut (.submit clojure.lang.agent/soloexecutor f)]
(reify
clojure.lang.ideref
(deref [_] (.get fut))
java.util.concurrent.future
(get [_] (.get fut))
(get [_ timeout unit] (.get fut timeout unit))
(iscancelled [_] (.iscancelled fut))
(isdone [_] (.isdone fut))
(cancel [_ interrupt?] (.cancel fut interrupt?)))))i
将傳入的callable對象f送出給agent的soloexecuture
final public static executorservice soloexecutor = executors.newcachedthreadpool();
執行,傳回的future對象賦予fut,接下來是利用clojure 1.2引入的reify定義了一個匿名的資料類型,它有兩種protocol:clojure.lang.ideref和java.utill.concurrent.future。其中ideref定義了deref方法,而future則簡單地将一些方法委托給fut對象。protocol你可以了解成java中的接口,這裡就是類似多态調用的作用。
這裡有個地方值的學習的是,clojure定義了一個future宏,而不是直接讓使用者使用future-call,這符合使用宏的規則:避免匿名函數。因為如果讓使用者使用future-call,使用者需要将表達式包裝成匿名對象傳入,而提供一個宏就友善許多。
3、啟動線程的其他方法,在clojure中完全可以采用java的方式去啟動一個線程:
user=> (.start (thread. #(println "hello")))
nil
hello
4、promise用于線程之間的協調通信,當一個promise的值還沒有設定的時候,你調用deref或者@想去解引用的時候将被阻塞:
user=> (def mypromise (promise))
#'user/mypromise
user=> @mypromise
在repl執行上述代碼将導緻repl被挂起,這是因為mypromise還沒有值,你直接調用了@mypromise去解引用導緻主線程阻塞。
如果在調用@宏之前先給promise設定一個值的話就不會阻塞:
user=> (deliver mypromise 5)
#<afn$ideref$db53459f@c0f1ec: 5>
user=> @mypromise
5
通過調用deliver函數給mypromise傳遞了一個值,這使得後續的@mypromise直接傳回傳遞的值5。顯然promise可以用于不同線程之間的通信和協調。
5、promise的實作:promise的實作非常簡單,是基于countdownlatch做的實作,内部除了關聯一個countdownlatch還關聯一個atom用于存儲值:
(defn promise
[]
(let [d (java.util.concurrent.countdownlatch. 1)
v (atom nil)]
clojure.lang.ideref
(deref [_] (.await d) @v)
clojure.lang.ifn
(invoke [this x]
(locking d
(if (pos? (.getcount d))
(do (reset! v x)
(.countdown d)
this)
(throw (illegalstateexception. "multiple deliver calls to a promise"))))))))
d是一個countdownlatch,v是一個atom,一開始值是nil。傳回的promise對象也是通過reify定義的匿名資料類型,他也是有兩個protocol,一個是用于deref的ideref,簡單地調用d.await()阻塞等待;另一個是匿名函數,接受兩個參數,第一個是promise對象自身,第二個參數是傳入的值x,當d的count還大于0的請看下,設定v的值為x,否則抛出異常的多次deliver了。檢視下deliver函數,其實就是調用promise對象的匿名函數protocol:
(defn deliver
{:added "1.1"}
[promise val] (promise val))
文章轉自莊周夢蝶 ,原文釋出時間 2010-08-08