天天看點

Clojure的并發(四)Agent深入分析和Actor

<a href="http://www.blogjava.net/killme2008/archive/2010/07/archive/2010/07/14/326027.html">clojure 的并發(一) ref和stm</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/07/archive/2010/07/17/326362.html">clojure 的并發(二)write skew分析</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/07/archive/2010/07/17/326389.html">clojure 的并發(三)atom、緩存和性能</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/07/archive/2010/07/19/326540.html">clojure 的并發(四)agent深入分析和actor</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/07/archive/2010/07/23/326976.html">clojure 的并發(五)binding和let</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/07/30/327606.html">clojure的并發(六)agent可以改進的地方</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/08/04/327985.html">clojure的并發(七)pmap、pvalues和pcalls</a>

<a href="http://www.blogjava.net/killme2008/archive/2010/08/08/328230.html">clojure的并發(八)future、promise和線程</a>

四、 agent和actor

   除了用于協調同步的ref,獨立同步的ref,還有一類非常常見的需求:你可能希望狀态的更新是異步,你通常不關心更新的結果,這時候你可以考慮下使用agent。

1、建立agent:

user=&gt; (def counter (agent 0))

#'user/counter

user=&gt; counter

#&lt;agent@9444d1: 0&gt;

通過agent函數你就可以建立一個agent,指向一個不可變的初始狀态。

2、取agent的值,這跟ref和atom沒啥兩樣,都是通過deref或者@宏:

user=&gt; @counter

user=&gt; (deref counter)

3、更新agent,通過send或者send-off函數給agent發送任務去更新agent:

user=&gt; (send counter inc)

  send傳回agent對象,内部的值仍然是0,而非inc遞增之後的1,這是因為send是異步發送,更新是在另一個線程執行,兩個線程(repl主線程和更新任務的線程)的執行順序沒有同步,顯示什麼取決于兩者誰更快。更新肯定是發生了,檢視counter的值:

1

   果然更新到了1了。send的方法簽名:

(send a f &amp; args)

   其中f是更新的函數,它的定義如下:

(f state-of-agent &amp; args)

   也就是它會在第一個參數接收目前agent的狀态,而args是send附帶的參數。

   還有個方法,send-off,它的作用于send類似:

user=&gt; (send-off counter inc)

#&lt;agent@9444d1: 1&gt;

2

   send和send-off的差別在于,send是将任務交給一個固定大小的線程池執行

final public static executorservice pooledexecutor =

        executors.newfixedthreadpool(2 + runtime.getruntime().availableprocessors());

   預設線程池大小是cpu核數加上2。是以send執行的任務最好不要有阻塞的操作。而send-off則使用沒有大小限制(取決于記憶體)的線程池:

final public static executorservice soloexecutor = executors.newcachedthreadpool();

   是以,send-off比較适合任務有阻塞的操作,如io讀寫之類。請注意,所有的agent是共用這些線程池,這從這些線程池的定義看出來,都是靜态變量。

4、異步轉同步,剛才提到send和send-off都是異步将任務送出給線程池去處理,如果你希望同步等待結果傳回,那麼可以使用await函數:

 (do (send counter inc) (await counter) (println @counter))

send一個任務之後,調用await等待agent所有派發的更新任務結束,然後列印agent的值。await是阻塞目前線程,直到至今為止所有任務派發執行完畢才傳回。await沒有逾時,會一直等待直到條件滿足,await-for則可以接受等待的逾時時間,如果超過指定時間沒有傳回,則傳回nil,否則傳回結果。

 (do (send counter inc) (await-for 100 counter) (println @counter))

await-for接受的機關是毫秒。

5、錯誤處理

   agent也可以跟ref和atom一樣設定validator,用于限制驗證。由于agent的更新是異步的,你不知道更新的時候agent是否發生異常,隻有等到你去取值或者更新的時候才能發現:

user=&gt; (def counter (agent 0 :validator number?))

user=&gt; (send counter (fn[_] "foo"))

#&lt;clojure.lang.agent@4de8ce62: 0&gt;

   強制要求counter的值是數值類型,第二個表達式我們給counter發送了一個更新任務,想将狀态更新為字元串"foo",由于是異步更新,傳回的結果可能沒有顯示異常,當你取值的時候,問題出現了:

java.lang.exception: agent has errors (no_source_file:0)

  告訴你agent處于不正常的狀态,如果你想擷取詳細資訊,可以通過agent-errors函數:

user=&gt; (.printstacktrace (agent-errors counter))

java.lang.illegalargumentexception: no matching field found: printstacktrace for class clojure.lang.persistentlist (no_source_file:0)

   你可以恢複agent到前一個正常的狀态,通過clear-agent-errors函數:

user=&gt; (clear-agent-errors counter)

nil

6、加入事務

agent跟atom不一樣,agent可以加入事務,在事務裡調用send發送一個任務,當事務成功的時候該任務将隻會被發送一次,最多最少都一次。利用這個特性,我們可以實作在事務操作的時候寫檔案,達到acid中的d——持久性的目的:

(def backup-agent (agent "output/messages-backup.clj" ))

(def messages (ref []))

(use '[clojure.contrib.duck-streams :only (spit)])

(defn add-message-with-backup [msg]

       (dosync

           (let [snapshot (commute messages conj msg)]

                (send-off backup-agent (fn [filename]

                                        (spit filename snapshot)

                                        filename))

           snapshot)))

定義了一個backup-agent用于儲存消息,add-message-with-backup函數首先将狀态儲存到messages,這是個普通的ref,然後調用send-off給backup-agent一個任務:

 (fn [filename]

          (spit filename snapshot)

         filename)

這個任務是一個匿名函數,它利用spit打開檔案,寫入目前的快照,并且關閉檔案,檔案名來自backup-agent的狀态值。注意到,我們是用send-off,send-off利用cache線程池,哪怕阻塞也沒關系。

利用事務加上一個backup-agent可以實作類似資料庫的acid,但是還是不同的,主要差別在于backup-agent的更新是異步,并不保證一定寫入檔案,是以持久性也沒辦法得到保證。

7、關閉線程池:

前面提到agent的更新都是交給線程池去處理,在系統關閉的時候你需要關閉這兩個線程吃,通過shutdown-agents方法,你再添加任務将被拒絕:

user=&gt; (shutdown-agents)

java.util.concurrent.rejectedexecutionexception (no_source_file:0)

user=&gt; (send counter inc)    

哪怕我重新建立了counter,送出任務仍然被拒絕,進一步證明這些線程池是全局共享的。

8、原理淺析

前文其實已經将agent的實作原理大體都說了,agent本身隻是個普通的java對象,它的内部維持一個狀态和一個隊列:

    volatile object state;

    atomicreference&lt;ipersistentstack&gt; q = new atomicreference(persistentqueue.empty);

任務送出的時候,是封裝成action對象,添加到此隊列

    public object dispatch(ifn fn, iseq args, boolean solo) {

        if (errors != null) {

            throw new runtimeexception("agent has errors", (exception) rt.first(errors));

        }

        //封裝成action對象

        action action = new action(this, fn, args, solo);

        dispatchaction(action);

        return this;

    }

    static void dispatchaction(action action) {

        lockingtransaction trans = lockingtransaction.getrunning();

        // 有事務,加入事務

        if (trans != null)

            trans.enqueue(action);

        else if (nested.get() != null) {

            nested.set(nested.get().cons(action));

        else {

            // 入隊

            action.agent.enqueue(action);

send和send-off都是調用agent的dispatch方法,隻是兩者的參數不一樣,dispatch的第二個參數 solo決定了是使用哪個線程池處理action:

(defn send

  [#^clojure.lang.agent a f &amp; args]

    (. a (dispatch f args false)))

(defn send-off

    (. a (dispatch f args true)))

send-off将solo設定為true,當為true的時候使用cache線程池:

   final public static executorservice soloexecutor = executors.newcachedthreadpool();

    final static threadlocal&lt;ipersistentvector&gt; nested = new threadlocal&lt;ipersistentvector&gt;();

        void execute() {

            if (solo)

                soloexecutor.execute(this);

            else

                pooledexecutor.execute(this);

執行的時候調用更新函數并設定新的狀态:

try {

                    object oldval = action.agent.state;

                    object newval = action.fn.applyto(rt.cons(action.agent.state, action.args));

                    action.agent.setstate(newval);

                    action.agent.notifywatches(oldval, newval);

                }

                catch (throwable e) {

                    // todo report/callback

                    action.agent.errors = rt.cons(e, action.agent.errors);

                    haderror = true;

9、跟actor的比較:

agent跟actor有一個顯著的不同,agent的action來自于别人發送的任務附帶的更新函數,而actor的action則是自身邏輯的一部分。是以,如果想用agent實作actor模型還是相當困難的,下面是我的一個嘗試:

(ns actor)

(defn receive [&amp; args]

   (apply hash-map args))

(defn self [] *agent*)

(defn spawn [recv-map]

    (agent recv-map))

(defn ! [actor msg]

    (send actor #(apply (get %1 %2)  (vector %2)) msg))

;;啟動一個actor

(def actor (spawn 

             (receive :hello #(println "receive "%))))

;;發送消息 hello

(! actor :hello)

   利用spawn啟動一個actor,其實本質上是一個agent,而發送通過感歎号!,給agent發送一個更新任務,它從recv-map中查找消息對應的處理函數并将消息作為參數來執行。難點在于消息比對,比對這種簡單類型的消息沒有問題,但是如果比對用到變量,暫時沒有想到好的思路實作,例如實作兩個actor的ping/pong。

文章轉自莊周夢蝶  ,原文釋出時間2010-07-19