天天看點

Active Object 并發模式在 Java 中的應用

簡介:  Active Object 是并發程式設計實踐中典型的設計模式,Active Object 模式的核心是通過解耦合方法的調用與執行來提高程式的并發度。本文将從典型 Active Object 設計模式入手,從一個新的視角來探讨 Active Object 并發模式在 Java 中的應用。

本文主要從以下兩個方面進行闡述:

  • 使用 C++ 語言,來描述 Active Object 設計模式。

    Java 類庫對于這樣一個典型的模式做了很好的類庫層面的封裝,是以對于 Java 的開發者來說,很多關于該設計模式本身的東西被屏蔽掉了。本文試圖使用 Native C++ 語言,幫助讀者從本質上對 Active Object 設計模式有一個更全面的認識。

  • 結合 C++ 版本的 Active Object 設計模式,引領讀者對于 Active Object 設計模式在 Java 類庫中的支援,有一個更深刻的認識,幫助讀者正确并且有效地使用這些類庫。

預備知識

并發對象 (Concurrent Object)

在這裡,我們先定義一下,什麼是并發對象。不同于一般的對象,并發對象指的是該對象方法的調用與方法的執行不在同一個線程內,也即:該對象方 法被異步執行。這其實是在多線程程式設計環境下典型的計算特征,線程引入了異步。從另一個角度來看,并發對象其實是面向對象的概念應用于多線程計算環境下的産 物。

Active Object 設計模式 C++ 描述

我們将從以下幾個方面來讨論 Active Object 模式。

問題描述

我們都知道,基于多線程機制的并發程式設計模式可以極大提高應用的 QoS(Quality of Service)。典型的例子如,我們在開發伺服器端的應用時,通行的做法就是通過多線程機制并發地服務用戶端送出上來的請求,以期提高伺服器對用戶端的 反應度 (Responsiveness)。同時,相比于單線程的應用,并發的多線程應用相對要複雜得多。在多線程的計算環境裡,并發對象被所有的調用者線程所共 享。一般來說,并發對象的設計實作需要考慮下面的幾個重要因素:

  • 并發對象的任何一次的方法執行,不允許無限地或者長時間阻止其它方法的調用執行,進而影響應用的 QoS。
  • 由于并發對象被調用者線程所共享,其内部狀态必須保證是線程安全的,必須受限于某些線程同步限制,并且這些限制對調用者來說是透明的,不可見的。從調用者的角度來看,并發對象與普通對象沒有多大差別。
  • 并發對象的設計實作,應該能夠透明地利用底層平台所提供的并發機制。這樣做的好處是,當并發對象運作在諸如多核處理器這類底層硬體平台上時,我們的應用能夠充分挖掘底層平台帶來的并發優勢,以獲得足夠好的應用性能。

我們使用 Active Object 設計模式來解決這些問題。

Active Object 設計模式的本質是解耦合方法的調用 (Method invocation) 與方法的執行 (Method execution),方法調用發生在調用者線程上下文中,而方法的執行發生在獨立于調用者線程的 Active Object 線程上下文中。并且重要的一點是,該方法與其它普通的對象成員方法對于調用者來說,沒有什麼特别的不同。從運作時的角度來看,這裡涉及到兩類線程,一個是 調用者線程,另外一個是 Active Object 線程,我們會在下面更詳細地談到。

結構

在 Active Object 模式中,主要有以下幾種類型的參與者:

  • 代理 (Proxy) :代理是 Active Object 所定義的對于調用者的公共接口。運作時,代理運作在調用者線程的上下文中,負責把調用者的方法調用轉換成相應的方法請求 (Method Request),并将其插入相應的 Activation List,最後傳回給調用者 Future 對象。
  • 方法請求:方法請求定義了方法執行所需要的上下文資訊,諸如調用參數等。
  • Activation List:負責存儲所有由代理建立的,等待執行的方法請求。從運作時來看,Activation List 會被包括調用者線程及其 Active Object 線程并發存取通路,是以,Activation List 實作應當是線程安全的。
  • 排程者 (Scheduler):排程者運作在 Active Object 線程中,排程者來決定下一個執行的方法請求,而排程政策可以基于很多種标準,比如根據方法請求被插入的順序 FIFO 或者 LIFO,比如根據方法請求的優先級等等。
  • Servant: Servant 定義了 Active Object 的行為和狀态,它是 Proxy 所定義的接口的事實實作。
  • Future: 調用者調用 Proxy 所定義的方法,獲得 Future 對象。調用者可以從該 Future 對象獲得方法執行的最終結果。在真實的實作裡,Future 對象會保留一個私有的空間,用來存放 Servant 方法執行的結果。

執行序列圖

在 Active Object 設計模式中,在參與者之間将發生如下的協作過程:

  1. 方法請求的構造與排程。調用者調用 Proxy 的方法 method(),Proxy 建立相應的方法請求,把它傳給排程者 (Scheduler),排程者負責把該方法請求放入 Activation List 中。如果 method() 需要傳回執行結果,Proxy 傳回一個 Future 對象給調用者(圖 1 中步驟 1 到步驟 6)。
  2. 方法請求執行。排程者負責從 Activation List 隊列裡按照預先定義的規則拿出下一個可執行的方法請求,并把該請求綁定到相應 Servant 所定義的方法(圖 1 中步驟 7 到步驟 11)。
  3. 完成階段。儲存任何 Servant 方法執行的結果到 Future 對象中去(圖 1 中步驟 12)。重複第二步,排程者繼續輪詢 Activation List 隊列,看是否有下一個可執行的方法請求。

圖 1. Active Object Sequence Diagram.

Active Object 并發模式在 Java 中的應用

從圖 1 我們可以看到,步驟 1 到步驟 6 運作在調用者線程中,而步驟 7 到步驟 12 運作在 Active Object 的線程中。

實作

在本節中,我們給出 Active Object 的 C++ 示例實作。

調用者調用 Proxy 的 get() 方法,從 Active Object 獲得 Message。我們可以假定,在真實的應用中, get() 方法的實作受制于某些慢速的 IO 操作,比如需要通過 TCP Socket 從遠端的機器獲得 Message, 然後傳回給調用者。是以我們使用 Active Object 來實作該應用,通過線程的并發達到提高應用的 QoS。

  1. 實作 Servant,如清單 1 所示:

    清單 1. MQ_Servant

    class MQ_Servant {
    public:
        // Constructor and destructor.
        MQ_Servant (size_t mq_size);
        virtual ~MQ_Servant ();
    
        // Message queue implementation operations.
        void put (const Message &msg);
        Message get ();
    
         // Predicates.
        bool empty () const;
        bool full () const;
    private:
        // Internal queue representation, e.g., a circular
        // array or a linked list, that does not use any
        // internal synchronization mechanism.
    };
          
    MQ_Servant 是真正的服務提供者,實作了 Proxy 中定義的方法。put() 和 get() 方法用來操作底層的隊列。另外,Servant 的實作是純粹的應用邏輯實作,或者稱為商業邏輯實作,沒有混合任何的線程同步機制 , 這有利于我們進行應用邏輯的重用,而不需要考慮不同的線程同步機制。
  2. 實作 Proxy,如清單 2 所示:

    清單 2. MQ_Proxy

    class MQ_Proxy {
    public:
        // Bound the message queue size.
        enum { MQ_MAX_SIZE = 100 };
        MQ_Proxy (size_t size = MQ_MAX_SIZE)
        :scheduler_ (size), 
        servant_ (size) { 
        }
    
        // Schedule <put> to execute on the active object.
        void put (const Message &msg) {
            Method_Request *mr = new Put(servant_,msg);
            scheduler_.insert (mr);
        }
    
        // Return a <Message_Future> as the "future" result of
        // an asynchronous <get> method on the active object.
        Message_Future get () {
            Message_Future result;
            Method_Request *mr = new Get (&servant_,result);
            scheduler_.insert (mr);
            return result;
        }
    
        // empty() and full() predicate implementations ...
    private:
        // The servant that implements the active object
        // methods and a scheduler for the message queue.
        MQ_Servant servant_;
        MQ_Scheduler scheduler_;
    };
          
    同一個程序中的多個調用者線程可以共享同一個 Proxy。
  3. 實作 Method Request,如清單 3 所示:

    清單 3. Method_Request

    class Method_Request {
    public:
        // Evaluate the synchronization constraint.
        virtual bool can_run () const = 0
        // Execute the method. 
        virtual void call () = 0;
    };
    // Inherites from Method_Request
    class Get : public Method_Request {
    public:
        Get (MQ_Servant *rep, const Message_Future &f)
        :servant_ (rep), 
        result_ (f) 
        { 
        }
        virtual bool can_run () const {
            // Synchronization constraint: cannot call a 
            // <get> method until queue is not empty.
            return !servant_->empty ();
        }
    
        virtual void call () {
            // Bind dequeued message to the future result.
            result_ = servant_->get ();
        }
    private:
        MQ_Servant *servant_;
        Message_Future result_;
    };
          
  4. 實作 Activation List,如清單 4 所示:

    清單 4. Activation_List

    class Activation_List {
    public:
        // Block for an "infinite" amount of time waiting
        // for <insert> and <remove> methods to complete.
        enum { INFINITE = -1 };
    
        // Define a "trait".
        typedef Activation_List_Iterator iterator;
    
        Activation_List ();
    
        // Insert <method_request> into the list, waiting up
        // to <timeout> amount of time for space to become
        // available in the queue. Throws the <System_Ex>
        // exception if <timeout> expires.
        void insert (Method_Request *method_request,Time_Value *timeout = 0);
    
        // Remove <method_request> from the list, waiting up
        // to <timeout> amount of time for a <method_request>
        // to be inserted into the list. Throws the
        // <System_Ex> exception if <timeout> expires.
        void remove (Method_Request *&method_request, Time_Value *timeout = 0);
    
    private:
        // Synchronization mechanisms, e.g., condition
        // variables and mutexes, and the queue implementation, 
             // e.g., an array or a linked list, go here.
    };
          
    Activation List 的實際上就是一個線程同步機制保護下的 Method Request 隊列,對該隊列的所有操作 (insert/remove) 都應該是線程安全的。從本質上講,Activation List 所基于的就是典型的生産者 / 消費者并發程式設計模型,調用者線程作為生産者把 Method Request 放入該隊列,Active Object 線程作為消費者從該隊列拿出 Method Request, 并執行。
  5. 實作 Scheduler,如清單 5 所示:

    清單 5. MQ_Scheduler

    class MQ_Scheduler {
    public:
        // Initialize the <Activation_List> and make <MQ_Scheduler>
        // run in its own thread of control.
             // we call this thread as Active Object thread.
        MQ_Scheduler ()
        : act_list_() {
            // Spawn separate thread to dispatch method requests.
            // The following call is leveraging the parallelism available on native OS
            // transparently
            Thread_Manager::instance ()->spawn (&svc_run,this);
        }
        // ... Other constructors/destructors, etc.
    
        // Put <Method_Request> into <Activation_List>. This
        // method runs in the thread of its client,i.e.
        // in the proxy's thread.
        void insert (Method_Request *mr) {
            act_list_.insert (mr);
        }
    
        // Dispatch the method requests on their servant
        // in its scheduler's thread of control.
        virtual void dispatch () {
            // Iterate continuously in a separate thread(Active Object thread).
            for (;;) {
                Activation_List::iterator request;
                // The iterator's <begin> method blocks
                // when the <Activation_List> is empty.
                for(request = act_list_.begin (); request != act_list_.end ();++request){
                    // Select a method request whose
                    // guard evaluates to true.
                    if ((*request).can_run ()) {
                        // Take <request> off the list.
                        act_list_.remove (*request);
                        (*request).call () ;
                        delete *request;
                    }
    
                    // Other scheduling activities can go here,
                    // e.g., to handle when no <Method_Request>s
                    // in the <Activation_List> have <can_run>
                    // methods that evaluate to true.
    
                }
    
            }
        }
    
    private:
        // List of pending Method_Requests.
        Activation_List act_list_;
    
        // Entry point into the new thread.
        static void *svc_run (void *arg) {
            MQ_Scheduler *this_obj =    static_cast<MQ_Scheduler *> (args);
            this_obj->dispatch ();
        }
    };
          
  6. 實作 Future,如清單 6 所示:

    清單 6. Message_Future

    class Message_Future {
    public:
        // Initializes <Message_Future> to
        // point to <message> immediately.
        Message_Future (const Message &message);
    
        //Other implementatio……
    
        // Block upto <timeout> time waiting to obtain result
        // of an asynchronous method invocation. Throws
        // <System_Ex> exception if <timeout> expires.
        Message result (Time_Value *timeout = 0) const;
    private:
        //members definition here……
    };
          
    事實上,對于調用者來說,可以通過以下的方式從 Future 對象獲得真實的執行結果 Message:
    • 同步等待。調用者調用 Future 對象的 result() 方法同步等待,直到後端的 Servant 相應方法執行結束,并把結果存儲到了 Future 對象中來,result 傳回,調用者獲得 Message。
    • 同步逾時等待。調用者調用 Future 對象的 result(timeout) 方法。如果過了 timeout 時間之後,後端的 Servant 相應方法執行仍未結束,則調用失敗,否則,調用者線程被喚醒,result 方法傳回,調用者獲得 Message。
    • 異步查詢。調用者可以通過調用 Future 對象定義的查詢方法 ( 清單 6 沒有提供相應的定義 ),檢視真實的結果是否準備好了,如果準備好了,調用 result 方法,直接獲得 Message。

    清單 7 是使用該 Active Object 的示例。

    清單 7. Active Object 使用

    MQ_Proxy message_queue;
    
    //Optioin 1. Obtain future and block thread until message arrives.
    Message_Future future = message_queue.get();
    Message msg = future.result();
    //Handle received message here
    handle(msg);
    
    //2. Obtain a future (does not block the client).
    Message_Future future = message_queue.get ();
    
    //The current thread is not blocked, do something else here...
    //Evaluate future and block if result is not available.
    Message msg = future.result ();
    //Handle received message here
    handle(msg);
          
    從清單 7 可以看到,MQ_Proxy 對于調用者而言,和一個普通的 C++ 定義的對象并沒有差別,并發的實作細節已經被隐藏。

Java 對 Active Object 支援

Java JDK 1.3 引入了 java.util.Timer 和 java.util.TimerTask,提供了對 timer-based 并發任務支援,Timer 和 TimerTask 可以看作是 Active Object 設計模式在 Java 中的實作。不過,在這裡我們不打算過多讨論 Timer 及其 TimerTask。由于 Timer 和 TimerTask 的缺陷性,例如 Timer 使用單線程執行 TimerTask 導緻的 Task 排程時間的不精确性等問題。從 Java1.5 開始,Java 建議使用 ScheduledThreadPoolExecutor 作為 Timer 的替代。

在這裡,我們讨論一下自 Java1.5 引入的 Executor Framework。Java1.5 的 Executor Framework 可以看作是 Active Object 設計模式在 Java 中的展現。不過 Java 的 Executor Framework 極大地簡化了我們前面所讨論的 Active Object 所定義的模式。

Java 的 Executor Framework 是一套靈活強大的異步任務執行架構,它提供了标準的方式解耦合任務的送出與任務的執行。Java Executor 架構中的任務指的是實作了 Runnable 或者 Callable 接口的對象。Executor 的示例用法如清單 8 所示:

清單 8. Java Executor 示例代碼

public class TaskExecutionTcpServer {
    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
                public void handleRequest(Socket connection) {
                    // Handle the incoming socket connection from                         
                    //individual client.
                }
            };
            exec.execute(task);
        }
    }
}
      

在示例 8 中,我們建立了一個基于線程池的 Java Executor, 每當新的 TCP 連接配接進來的時候,我們就配置設定一個獨立的實作了 Runnable 任務來處理該連接配接,所有這些任務運作在我們建立的有 100 個線程的線程池上。

我們可以從 Active Object 設計模式的角度來審視一下 Java Executor 架構。Java Executor 架構以任務 (Task) 為中心,簡化了 Active Object 中的角色分工。可以看到,實作 Runnable 或者 Callable 接口的 Java Executor 任務整合了 Method Request 和 Servant 的角色 , 通過實作 run() 或者 call() 方法實作應用邏輯。Java Executor 架構并沒有顯式地定義 Proxy 接口,而是直接調用 Executor 送出任務,這裡的 Executor 相當于 Active Object 中排程者角色。從調用者的角度來看,這看起來并不像是在調用一個普通對象方法,而是向 Executor 送出了一個任務。是以,在這個層面上說,并發的底層細節已經暴露給了調用者。對于 Java 的開發者來說,如果你不擔心這樣的底層并發細節直接暴露給調用者,或者說你的應用并不需要像對待普通對象一樣對待并發對象,Java 的 Executor 架構是一個很好的選擇。相反,如果你希望隐藏這樣的并發細節,希望像操縱普通對象一樣操縱并發對象,那你就需要如本文上節所描述的那樣,遵循 Active Object 設計原則 , 清晰地定義各個角色,實作自己的 Active Object 模式。

總而言之,Java Executor 架構簡化了 Active Object 所定義的模式,模糊了 Active Object 中角色的分工,其基于生産者 / 消費者模式,生産者和消費者基于任務互相協作。

總結

最後,我們讨論一下 Active Object 設計模式的優缺點。

Active Object 給我們的應用帶來的好處:

  • 極大提高了應用的并發性以及簡化了線程同步帶來的複雜性。并發性的提高得益于調用者線程與 Active Object 線程的并發執行。簡化的線程同步複雜性主要表現在所有線程同步細節封裝在排程者内 ( 也就是 Java 的 Executor 對象 ),Active Object 調用者并不需要關心。
  • 在 Active Object 中,方法的執行順序可以不同于方法的調用順序。用 Java 的話說,也就是任務執行的順序可以不同于任務送出的順序。在一定情況下,這可以幫助優化我們應用的性能,提高應用的 QoS 及其 Responsiveness。在 Java Executor 架構下,你可以根據目前的計算資源,确定優化的執行政策 (Execution Policy),該執行政策的内容包括:任務将配置設定在多少線程上執行,以什麼順序執行,多少任務可以同時執行等等。

當然,Active Object 也有缺點:

  • 額外的性能開銷。這涉及到從調用者線程到 Active Object 線程的上下文切換,線程同步,額外的記憶體拷貝等。
  • 難于調試。Active Object 引入了方法的異步執行,從調試者的角度看,調試這樣的方法調用不像普通方法那樣直截了當,并且這其中涉及到了線程的排程,同步等。

參考資料

學習

  • “JDK 5.0 中的并發 ”(developerWorks,2004 年 12 月):本教程将介紹 JDK 5.0 提供的用于并發的新實用程式類,并通過與現有并發原語(synchronized、wait() 和 notify())相比較,說明這些類如何提高了可伸縮性。
  • “Java 多線程與并發程式設計專題 ” (developerWorks,2008 年 6 月):Java 平台提供了一套廣泛而功能強大的 API、工具和技術。其中,内建支援線程是它的一個強大的功能。這一功能為使用 Java 程式設計語言的程式員提供了并發程式設計這一誘人但同時也非常具有挑戰性的選擇。本專題彙集了與 Java 多線程與并發程式設計相關的文章和教程,幫助讀者了解 Java 并發程式設計的模式及其利弊,向讀者展示了如何更精确地使用 Java 平台的線程模型。
  • “Java 設計模式與模組化專題 ”(developerWorks,2008 年 1 月):本專題為 Java 軟體工程師們提供了面向 Java 的設計模式和模組化方面相關的文章和教程。幫助讀者了解、學習作為專業軟體工程師必需掌握的設計模式與模組化技術。
  • 技術書店 :浏覽關于這些和其他技術主題的圖書。
  • developerWorks Java 技術專區 :數百篇關于 Java 程式設計各個方面的文章。

原文:http://www.ibm.com/developerworks/cn/java/j-lo-activeobject/index.html?ca=drs-