天天看點

ACE的架構及其核心

ACE設計架構和基礎子產品的關聯

*一、案例描述

視訊電警開發,是基于ACE架構上的一次重複開發,本文檔拟對ACE架構做一個梳理,以期對他人進行基于ace的開發有所幫助。

*二、系統安裝

ACE的安裝是一件比較麻煩的事情,這裡簡單的記錄了我在VS2005下安裝ACE的過程,希望能給大家一個參考。

安裝環境:

l 作業系統:Windows XP 專業版

l 編譯環境:VS2005中文版

l ACE版本:ACE-5.5.1

安裝過程:

a) 下載下傳安裝包。Ace的安裝檔案可以在​​http://download.dre.vanderbilt.edu/​​中下載下傳到,由于我是在windows環境下安裝并且不需要TAO等其它庫,便下載下傳了ACE-5.5.1.zip。

b) 下載下傳完成後将其解壓。我的解壓路徑為D:\Develop\ACE_wrappers。

c) 設定環境變量

d) 在作業系統添加一個名為ACE_ROOT的使用者環境變量,值為剛才ace的解壓路徑D:\Develop\ACE_wrappers。

e) 添加使用者的Path環境變量,值為%ACE_ROOT%\lib,這樣才能保證系統能找到ace生成的動态連接配接庫。

f) 設定VS2005的C++開發項目資訊,依次打開菜單 工具-選項-項目和解決方案-VC++目錄 ,在右側目錄清單中選擇"包含目錄",添加$(ACE_ROOT),在右側目錄清單中選擇"庫檔案",添加$(ACE_ROOT)\lib。

g) 編譯ACE,在ACE_ROOT\ace目錄建立一個名為 config.h的檔案。編輯檔案并加入以下内容:

#define ACE_HAS_STANDARD_CPP_LIBRARY 1

#include "ace/config-win32.h"

其中第一行是因為我想用标準C++跨平台,第二行則是必須要的,表明目前是在win32的環境下進行ace的項目。

h) 進入ACE_ROOT\ace目錄中,能發現ACE現在已經帶VS2005的編譯項目了,直接打開ace_vc8.sln,直接生成ACE項目的Debug版和Release版,編譯過程還比較快,大概就幾分鐘的樣子。編譯連結完成後,在ACE_ROOT\lib中一共生成了四個檔案,分别是"ACE.dll","ACE.lib", "ACEd.dll","ACEd.lib",其中帶"d"表示的是Debug版本。

i) 檢驗 ACE

j) 打開VS2005,建立一個空項目,将ACE程式員手冊中的第一個程式拷入其中。

k) 配置屬性->連結器->正常->附加依賴項,添入ACEd.lib。

l) 編譯,如果不出意外的話就能看到你的ace版的" hello world"啦。

注意:

ACE項目的字元集設定是"未設定",而VS2005的c++項目預設字元集是"使用 Unicode 字 符集",如果用到了ACE連結庫時需要将字元集改為"未設定"(在"項目屬性->配置屬性->項目預設值->字元集"中配置),否則可能出現連結錯誤。

至此,ACE的安裝工作便算完成.下面是完成unicode編譯的ACE設定:

*三、ACE的使用及其核心子產品講解等

下面為本人在使用ACE中遇到的一些問題的彙總,隻介紹了大體的思路,具體的細節還需進佐證。

1. ACE配置子產品的使用

就一個正常項目而言,一個配置檔案是必不可少的,那就先從這裡入手了。linux/unix 程式可能經常用到指令行方式,不過我還是比較喜歡windows 的 ini 格式的,當然,有xml 的更好,不過 ACE 裡暫時沒有提供。配置檔案的使用很簡單,ACE 提供的類也很友好。代碼如下:

​​

ACE的架構及其核心

​​

2. ACE的互斥管理機制

2.1、ACE Lock類屬

鎖類屬包含的類包裝簡單的鎖定機制,比如互斥體、信号量、讀/寫互斥體和令牌等。這裡我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進行一些簡單的說明。

互斥體的使用

互斥體用于保護共享的易變代碼,也就是全局或靜态資料。這樣的資料必須通過互斥體進行保護,以防止它們在多個線程同時通路時損壞。在ACE中可以通過ACE_Thread_Mutex實作線程的通路互斥,下面的例子示範ACE_Thread_Mutex類的使用。

#include "ace/Thread.h"

#include "ace/Synch.h"

#include <iostream>

using namespace std;

ACE_Thread_Mutex mutex;

void* Thread1(void *arg)

{

    mutex.acquire();

    ACE_OS::sleep(3);

    cout<<endl<<"hello thread1"<<endl;

    mutex.release();

return NULL;

}

void* Thread2(void *arg)

{

    mutex.acquire();

    cout<<endl<<"hello thread2"<<endl;

    mutex.release();

return NULL;

}

int main(int argc, char *argv[])

{

    ACE_Thread::spawn((ACE_THR_FUNC)Thread1);

//Thread2 比Thread1晚建立1秒鐘,故後嘗試擷取互斥體

    ACE_OS::sleep(1);

    ACE_Thread::spawn((ACE_THR_FUNC)Thread2);

while(true)

        ACE_OS::sleep(10);    return 0;

}

ACE_Thread_Mutex主要有兩個方法:

acquire():用來擷取互斥體,如果無法擷取,将阻塞至擷取到為止。

release():用來釋放互斥體,進而使自己或者其它線程能夠擷取互斥體。

當線程要通路共享資源時,首先調用acquire()方法擷取互斥體,進而擷取對改互斥體所保護的共享資源的唯一通路權限,通路結束時調用釋放互斥體,使得其它線程能擷取共享資源的通路權限。

在此例中,本來Thread2的列印消息在Thread1之前,但由于Thread1先獲得互斥體,故Thread2隻有待Thread1結束後才能進入臨界區。讀者朋友們可以通過将ACE_Thread_Mutex替換為ACE_NULL_Mutex看一下不加鎖的執行結果。

ACE Lock類屬簡介,清單如下:

名字

描述

ACE_Mutex

封裝互斥機制(根據平台,可以是mutex_t、pthread_mutex_t等等)的包裝類,用于提供簡單而有效的機制來使對共享資源的通路序列化。它與二進制信号量(binary semaphore)的功能相類似。可被用于線程和程序間的互斥。

ACE_Thread_Mutex

可用于替換ACE_Mutex,專用于線程同步。

ACE_Process_Mutex

可用于替換ACE_Mutex,專用于程序同步。

ACE_NULL_Mutex

提供了ACE_Mutex接口的"無為"(do-nothing)實作,可在不需要同步時用作替換。

ACE_RW_Mutex

封裝讀者/作者鎖的包裝類。它們是分别為讀和寫進行擷取的鎖,在沒有作者在寫的時候,多個讀者可以同時進行讀取。

ACE_RW_Thread_Mutex

可用于替換ACE_RW_Mutex,專用于線程同步。

ACE_RW_Process_Mutex

可用于替換ACE_RW_Mutex,專用于程序同步。

ACE_Semaphore

這些類實作計數信号量,在有固定數量的線程可以同時通路一個資源時很有用。在OS不提供這種同步機制的情況下,可通過互斥體來進行模拟。

ACE_Thread_Semaphore

應被用于替換ACE_Semaphore,專用于線程同步。

ACE_Process_Semaphore

應被用于替換ACE_Semaphore,專用于程序同步。

ACE_Token

提供"遞歸互斥體"(recursive mutex),也就是,目前持有某令牌的線程可以多次重新擷取它,而不會阻塞。而且,當令牌被釋放時,它確定下一個正阻塞并等待此令牌的線程就是下一個被放行的線程。

ACE_Null_Token

令牌接口的"無為"(do-nothing)實作,在你知道不會出現多個線程時使用。

ACE_Lock

定義鎖定接口的接口類。一個純虛類,如果使用的話,必須承受虛函數調用開銷。

ACE_Lock_Adapter

基于模闆的擴充卡,允許将前面提到的任意一種鎖定機制适配到ACE_Lock接口。

可以簡單的分為以下幾類:

· 互斥鎖

互斥鎖(通常稱為"互斥體"或"二進制信号量")用于保護多線程控制并發通路的共享資源的完整性。互斥體通過定義臨界區來序列化多線程控制的執行,在臨界區中每一時刻隻有一個線程在執行它的代碼。互斥體簡單而高效(時間和空間)。

ACE線程庫提供了Mutex式的類(是一組互斥體對象,擁有類似的接口),他是一種簡單而高效的類型是"非遞歸"互斥體。非遞歸互斥體不允許目前擁有互斥體的線程在釋放它之前重新擷取它。否則,将會立即發生死鎖。遞歸互斥體在ACE Recursive_Thread_Mutex類中可移植地實作。

· 讀者/作者鎖

讀者/作者鎖與互斥體相類似。例如,擷取讀者/作者鎖的線程也必須釋放它。多個線程可同時擷取一個讀者/作者鎖用于讀,但隻有一個線程可以擷取該鎖用于寫。當互斥體保護的資源用于讀遠比用于寫要頻繁時,讀者/作者互斥體有助于改善并發的執行。

ACE線程庫提供了一個叫作RW_Mutex的類,在C++封裝類中可移植地實作了讀者/作者鎖的語義。讀者/作者鎖将優先選擇權給作者。因而,如果有多個讀者和一個作者在鎖上等待,作者将會首先擷取它。

計數信号量

在概念上,計數信号量是可以原子地增減的整數。如果線程試圖減少一個值為零的信号量的值,它就會阻塞,直到另一個線程增加該信号量的值。

計數信号量用于追蹤共享程式狀态的變化。它們記錄某種特定事件的發生。因為信号量維護狀态,它們允許線程根據該狀态來作決定,即使事件是發生在過去。

信号量比互斥體效率要低,但是,它們要更為通用,因為它們無需被最初擷取它們的同一線程擷取和釋放。這使得它們能夠用于異步的執行上下文中(比如信号處理器)。ACE線程庫提供一個叫作Semaphore的類來可移植地在C++包裝類中實作信号量語義。

2.2、ACE Guard類屬

與C一級的互斥體API相比較,Mutex包裝為同步多線程控制提供了一種優雅的接口。但是,Mutex潛在地容易出錯,因為程式員有可能忘記調用release方法(當然,C級的互斥體API更容易出錯)。這可能由于程式員的疏忽或是C++異常的發生而發生,然而,其導緻及其嚴重的後果--死鎖。

是以,為改善應用的健壯性,ACE同步機制有效地利用C++類構造器和析構器的語義來確定Mutex鎖被自動擷取和釋放。

ACE提供了一個稱為Guard、Write_Guard和Read_Guard的類族,確定在進入和退出C++代碼塊時分别自動擷取和釋放鎖。

Guard類是最基本的守衛機制,定義可以簡化如下(實際定義比這相對要複雜而完善一點):

template <class LOCK>

class Guard

{

public:

    Guard (LOCK &l): lock_ (&l){ lock_.acquire (); }

    ˜Guard (void) {    lock_.release (); }

private:

    LOCK lock_;

}

Guard類的對象定義一"塊"代碼,在其上鎖被自動擷取,并在退出塊時自動釋放,即使是程式抛異常也能保證自動解鎖。這種機制也能為Mutex、RW_Mutex和Semaphore同步封裝工作。

對于讀寫鎖,由于加鎖接口不一樣,ace也提供了相應的Read_Guard和Write_Guard類,Read_Guard和Write_Guard類有着與Guard類相同的接口。但是,它們的acquire方法分别對鎖進行讀和寫。

預設地, Guard類構造器将會阻塞程式,直到鎖被擷取。會有這樣的情況,程式必須使用非阻塞的acquire調用(例如,防止死鎖)。是以,可以傳給ACE Guard的構造器第二個參數(請參看原始代碼,而不是我這裡的簡化代碼),訓示它使用鎖的try_acquire方法,而不是acquire。随後調用者可以使用Guard的locked方法來原子地測試實際上鎖是否已被擷取。

用Guard重寫上一節的Thread1方法如下(注釋了的部分是原有代碼):

void* Thread1(void *arg)

{

    ACE_Guard<ACE_Thread_Mutex> guard(mutex);

    //mutex.acquire();

    ACE_OS::sleep(3);

    cout<<endl<<"hello thread1"<<endl;

    //mutex.release();

    return NULL;

}

相比較而言,使用Guard更加簡潔,并且會自動解鎖,免除了一部分後顧之憂。

注意:

Guard隻能幫你自動加解鎖,并不能解決死鎖問題,特别是對于那些非遞歸的互斥體來說使用Guard尤其要注意防止死鎖。

Guard是在Guard變量析構時解鎖,如果在同一函數中兩次對同一互斥體變量使用Guard要注意其對象生命周期,否則容易造成死鎖。

2.3、ACE Condition類屬

ACE Condition類屬(條件變量)提供風格與互斥體、讀者/作者鎖和計數信号量不同的鎖定機制。當持有鎖的線程在臨界區執行代碼時,這三種機制讓協作線程進行等待。相反,條件變量通常被一個線程用于使自己等待,直到一個涉及共享資料的條件表達式到達特定的狀态。當另外的協作線程訓示共享資料的狀态已發生變化,排程器就喚醒一個在該條件變量上挂起的線程。于是新喚醒的線程重新對它的條件表達式進行求值,如果共享資料已到達合适狀态,就恢複處理。

ACE線程庫提供一個叫作Condition的類來可移植地在C++包裝類中實作條件變量語義。定義方式如下:

ACE_Thread_Mutex mutex;

ACE_Condition<ACE_Thread_Mutex> cond(mutex);

該對象有兩個常用方法。

signal()//向使用該條件變量的其它線程發送滿足條件信号。

wait()//查詢是否滿足條件,如果滿足,則繼續往下執行;如果不滿足條件,主線程就等待在此條件變量上。條件變量随即自動釋放互斥體,并使主線程進入睡眠。

條件變量總是與互斥體一起使用。這是一種可如下描述的一般模式:

while( expression NOT TRUE ) wait on condition variable;

條件變量不是用于互斥,往往用于線程間的協作,下面例子示範了通過條件變量實作線程協作。

#include "ace/Thread.h"

#include "ace/Synch.h"

#include <iostream>

using namespace std;

ACE_Thread_Mutex mutex;

ACE_Condition<ACE_Thread_Mutex> cond(mutex);

void* worker(void *arg)

{

    ACE_OS::sleep(2);        //保證eater線程的cond.wait()在worker線程的cond.signal()先執行

    mutex.acquire();

    ACE_OS::sleep(1);

    cout<<endl<<"produce"<<endl;

    cond.signal();

    mutex.release();

return NULL;

}

void* eater(void *arg)

{

    mutex.acquire();

    cond.wait();

    cout<<endl<<"eat"<<endl;

    mutex.release();

return NULL;

}

int main(int argc, char *argv[])

{

    ACE_Thread::spawn((ACE_THR_FUNC)worker);

    ACE_OS::sleep(1);

    ACE_Thread::spawn((ACE_THR_FUNC)eater);

while(true)

        ACE_OS::sleep(10);

return 0;

}

這個例子中,首先建立了一個生産者線程worker和一個消費者線程eater,消費者線程執行比生産者快,兩個線程不加限制并發執行會導緻先消費,後生産的情況(隻是加互斥鎖也不能很好的解決,以為無法保證生産者一定先獲得互斥體)。是以這裡通過條件變量的通知方式保證線程的順序執行:

a) 消費者線程擷取互斥體,等待條件滿足(生産者生産了食品)。同時釋放互斥體,進入休眠狀态。

b) 生産者擷取互斥體(雖然是消費者先擷取的互斥體,但消費者調用的wait函數會釋放消費者的互斥體),生産商品後,通過條件變量發送信号(調用signal函數)通知消費者生産完成,結束生産過程,釋放互斥體。

c) 消費者收到信号後,重新擷取互斥體,完成消費過程。

使用條件變量的注意事項:

l 條件變量必須和互斥體一起使用,也就是說使用前必須加鎖(調用互斥體acquire函數),使用完後需釋放互斥體。

條件變量中的wait()和signal()成對使用的話,必須保證wait()函數在signal()之前執行,這樣才能保證wait()能收到條件滿足通知,不至于一直等待下去,形成死鎖(worker線程中的第一句話就是起的這個作用)。

3. ACE的線程管理機制

2.1、ACE Lock類屬

不同的作業系統下用c++進行過多線程程式設計的朋友對那些線程處理的API可能深有體會,這些API提供了相同或是相似的功能,但是它們的API的差别卻極為懸殊。

ACE_Thread提供了對不同OS的線程調用的簡單包裝,通過一個通用的接口進行處理線程建立、挂起、取消和删除等問題。

一. 線程入口函數

所有線程必須從一個指定的函數開始執行,該函數稱為線程函數,它必須具有下列原型:

void* worker(void *arg) {}

該函數輸入一個void *型的參數,可以在建立線程時傳入。

注意:

所有的線程啟動函數(方法)必須是靜态的或全局的(就如同直接使用OS線程API時所要求的一樣)。

二.線程基本操作

1.建立一個線程

一個程序的主線程是由作業系統自動生成,如果你要讓一個主線程建立額外的線程,可以通過ACE_Thread::spawn()實作,該函數一般的使用方式如下:

    ACE_thread_t threadId;

    ACE_hthread_t threadHandle;

    ACE_Thread::spawn(

        (ACE_THR_FUNC)worker,        //線程執行函數

        NULL,                        //執行函數參數

        THR_JOINABLE | THR_NEW_LWP,

        &threadId,

        &threadHandle

        );

為了簡化,也可以使用其預設參數使用ACE_Thread::spawn((ACE_THR_FUNC)worker) 來建立一個worker的線程。

另外,ACE還提供了ACE_Thread::spawn_n函數來建立多個線程。

2.終止線程

線上程函數體中ACE_Thread::exit()調用即可終止線程執行。

3.設定線程的相對優先級

當一個線程被首次建立時,它的優先級等同于它所屬程序的優先級。一個線程的優先級是相對于其所屬的程序的優先級而言的。可以通過調用ACE_Thread::setprio函數改變線程的相對優先級,該函數的調用方式如下:

ACE_Thread::setprio(threadHandle,ACE_DEFAULT_THREAD_PRIORITY)

4.挂起及恢複線程

挂起線程可以通過來實作,它能暫停一個線程的執行,其調用方式如下ACE_Thread::suspend(threadHandle) 。

相應的,可以通過ACE_Thread::resume(threadHandle) 恢複被挂起的線程的執行。

5.等待線程結束

在主函數中調用ACE_Thread::join(threadHandle)可阻塞主函數,直道線程結束才能繼續執行。

6.停止線程

在主函數中調用ACE_Thread::cancel (threadHandle)可停止線程的執行(在Unix底下可以,而在windows下好像不起作用,有待檢驗)。

三.程式示例

下面例子示範了如何用ace建立一個線程。

#include "ace/Thread.h"

#include "ace/Synch.h"

#include <iostream>

using namespace std;

void* worker(void *arg)

{

    for(int i=0;i<10;i++)

    {

        ACE_OS::sleep(1);

        cout<<endl<<"hello world"<<endl;

    }

    return NULL;

}

int main(int argc, char *argv[])

{

    ACE_thread_t threadId;

    ACE_hthread_t threadHandle;

    ACE_Thread::spawn(

        (ACE_THR_FUNC)worker,        //線程執行函數

        NULL,                        //執行函數參數

        THR_JOINABLE | THR_NEW_LWP,

        &threadId,

        &threadHandle

        );

    ACE_Thread::join(threadHandle);

    return 0;

}

在這個簡單的例子中,建立了1個工作者線程,執行程式中定義的worker()函數。然後阻塞主函數,待線程結束後退出程式。

4. ACE的網絡通訊機制

4.1、TCP通訊

傳輸控制協定TCP(Transmission Control Protocol):TCP提供可靠的、面向連接配接的運輸服務,用于高可靠性資料的傳輸。TCP協定的可靠性是指保證每個tcp封包能按照發送順序到達用戶端。

Tcp通信過程一般為如下步驟:

a) 伺服器綁定端口,等待用戶端連接配接。

b) 用戶端通過伺服器的ip和伺服器綁定的端口連接配接伺服器。

c) 伺服器和用戶端通過網絡建立一條資料通路,通過這條資料通路進行資料互動。

常用API:

1. ACE_INET_Addr類。

ACE"位址"類ACE_Addr的子類,表示TCP/IP和UDP/IP的位址。它通常包含機器的ip和端口資訊,通過它可以定位到所通信的程序。

定義方式:

ACE_INET_Addr addInfo(3000,"192.168.1.100");

常用方法:

l get_host_name    擷取主機名

l get_ip_address    擷取ip位址

l get_port_number    擷取端口号

2. ACE_SOCK_Acceptor類。

服務期端使用,用于綁定端口和被動地接受連接配接。

常用方法:

l open 綁定端口

l accept建立和客戶段的連接配接

3. ACE_SOCK_Connector類。

用戶端使用,用于主動的建立和伺服器的連接配接。

常用方法:

l connect()    建立和服務期的連接配接。

4. ACE_SOCK_Stream類。

用戶端和伺服器都使用,表示客戶段和伺服器之間的資料通路。

常用方法:

l send ()    發送資料

l recv ()    接收資料

l close()    關閉連接配接(實際上就是斷開了socket連接配接)。

代碼示例:

下面例子示範了如何如何用ACE建立TCP通信的Server端。

#include "ace/SOCK_Acceptor.h"

#include "ace/SOCK_Stream.h"

#include "ace/INET_Addr.h"

#include "ace/OS.h"

#include <string>

#include <iostream>

using namespace std;

int main(int argc, char *argv[])

{

    ACE_INET_Addr port_to_listen(3000);        //綁定的端口

    ACE_SOCK_Acceptor acceptor;

    if (acceptor.open (port_to_listen, 1) == -1)     //綁定端口

    {

        cout<<endl<<"bind port fail"<<endl;

        return -1;

    }

    while(true)

    {

        ACE_SOCK_Stream peer;        //和用戶端的資料通路

        ACE_Time_Value timeout (10, 0);

        if (acceptor.accept (peer) != -1)    //建立和用戶端的連接配接

        {

            cout<<endl<<endl<<"client connect. "<<endl;

            char buffer[1024];

            ssize_t bytes_received;

            ACE_INET_Addr raddr;

            peer.get_local_addr(raddr);

            cout<<endl<<"local port\t"<<raddr.get_host_name()<<"\t"<<raddr.get_port_number()<<endl;

while ((bytes_received =

                peer.recv (buffer, sizeof(buffer))) != -1)    //讀取用戶端發送的資料

            {

                peer.send(buffer, bytes_received);    //對用戶端發資料

            }

            peer.close ();

        }

    }

    return 0;

}

這個例子實作的功能很簡單,伺服器端綁定3000号端口,等待一個用戶端的連接配接,然後将從用戶端讀取的資料再次轉發給用戶端,也就是實作了一個EchoServer的功能。

相應的用戶端程式也比較簡單,代碼如下:

#include <ace/SOCK_Stream.h>

#include <ace/SOCK_Connector.h>

#include <ace/INET_Addr.h>

#include <ace/Time_Value.h>

#include <string>

#include <iostream>

using namespace std;

int main(int argc, char *argv[])

{

    ACE_INET_Addr addr(3000,"127.0.0.1");

    ACE_SOCK_Connector connector;   

    ACE_Time_Value timeout(5,0);

    ACE_SOCK_Stream peer;

    if(connector.connect(peer,addr,&timeout) != 0)

    {

        cout<<"connection failed !"<<endl;

        return 1;

    }

    cout<<"conneced !"<<endl;

    string s="hello world";

    peer.send(s.c_str(),s.length());    //發送資料

    cout<<endl<<"send:\t"<<s<<endl;

    ssize_t bc=0;            //接收的位元組數

    char buf[1024];

    bc=peer.recv(buf,1024,&timeout);    //接收資料

    if(bc>=0)

    {

        buf[bc]='\0';

        cout<<endl<<"rev:\t"<<buf<<endl;

    }

    peer.close();

    return 0;

}

下表給出了伺服器端和用戶端的傳輸過程的比較:

操作

用戶端

伺服器端

初始化

不需要

調用acceptor.open()綁定端口

建立連接配接

調用connector.connect()方法

調用acceptor.accept()方法

傳輸資料

發送:調用peer.recv()方法

接收:調用peer.send()方法

關閉連接配接

調用peer.close()方法

4.2、UDP服務。

在ace中,通過ACE_SOCK_Dgram類提供udp通信服務,ACE_SOCK_Dgram和ACE_SOCK_Stream的API非常類似,一樣提供了send,recv及close等常用操作,這裡就不再累述了。

udp通信時無需像tcp那樣建立連接配接和關閉連接配接,tcp程式設計時需要通過accept和connect來建立連接配接,而udp通信省略了這一步驟,相對來說程式設計更為簡單。

由于udp通信時無建立連接配接,伺服器端不能像Tcp通信那樣在建立連接配接的時候就獲得用戶端的位址資訊,故伺服器端不能主動對用戶端發送資訊(不知道用戶端的位址),隻有等到收到用戶端發送的udp資訊時才能确定用戶端的位址資訊,進而進行通信。

udp通信過程如下:

l 伺服器端綁定一固定udp端口,等待接收用戶端的通信。

l 用戶端通過伺服器的ip和位址資訊直接對伺服器端發送消息。

l 伺服器端收到用戶端發送的消息後擷取用戶端的ip和端口資訊,通過該位址資訊和用戶端通信。

下面代碼為EchoServer的udp版:

//server.cpp

#include <ace/SOCK_Dgram.h>

#include <ace/INET_Addr.h>

#include <ace/Time_Value.h>

#include <string>

#include <iostream>

using namespace std;

int main(int argc, char *argv[])

{

    ACE_INET_Addr port_to_listen(3000);    //綁定的端口

    ACE_SOCK_Dgram peer(port_to_listen);    //通信通道    char buf[100];

    while(true)

    {

        ACE_INET_Addr remoteAddr;    //所連接配接的遠端位址

        int bc = peer.recv(buf,100,remoteAddr);    //接收消息,擷取遠端位址資訊

        if( bc != -1)

        {

            string s(buf,bc);

            cout<<endl<<"rev:\t"<<s<<endl;

        }

        peer.send(buf,bc,remoteAddr);    //和遠端位址通信

    }    return 0;

}

相應的用戶端程式如下:

//client.cpp

#include <ace/SOCK_Dgram.h>

#include <ace/INET_Addr.h>

#include <ace/Time_Value.h>

#include <string>

#include <iostream>

using namespace std;

int main(int argc, char *argv[])

{

    ACE_INET_Addr remoteAddr(3000,"127.0.0.1");    //所連接配接的遠端位址

    ACE_INET_Addr localAddr;    //本地位址資訊

    ACE_SOCK_Dgram peer(localAddr);    //通信通道

    peer.send("hello",5,remoteAddr);    //發送消息

    char buf[100];

    int bc = peer.recv(buf,100,remoteAddr);    //接收消息

    if( bc != -1)

    {

        string s(buf,bc);

        cout<<endl<<"rev:\t"<<s<<endl;

    }

    return 0;

}

和tcp程式設計相比,udp無需通過acceptor,connector來建立連接配接,故代碼相對tcp程式設計來說要簡單許多。另外,由于udp是一種無連接配接的通信方式,ACE_SOCK_Dgram的執行個體對象中無法儲存遠端位址資訊(儲存了本地位址資訊),故通信的時候需要加上遠端位址資訊。

5. ACE的設計模式

5.1、主動對象模式

主動對象模式用于降低方法執行和方法調用之間的耦合。該模式描述了另外一種更為透明的任務間通信方法。

傳統上,所有的對象都是被動的代碼段,對象中的代碼是在對它發出方法調用的線程中執行的,當方法被調用時,調用線程将阻塞,直至調用結束。而主動對象卻不一樣。這些對象具有自己的指令執行線程,主動對象的方法将在自己的執行線程中執行,不會阻塞調用方法。

例如,設想對象"A"已在你的程式的main()函數中被執行個體化。當你的程式啟動時,OS建立一個線程,以從main()函數開始執行。如果你調用對象A的任何方法,該線程将"流過"那個方法,并執行其中的代碼。一旦執行完成,該線程傳回調用該方法的點并繼續它的執行。但是,如果"A"是主動對象,事情就不是這樣了。在這種情況下,主線程不會被主動對象借用。相反,當"A"的方法被調用時,方法的執行發生在主動對象持有的線程中。另一種思考方法:如果調用的是被動對象的方法(正常對象),調用會阻塞(同步的);而另一方面,如果調用的是主動對象的方法,調用不會阻塞(異步的)。

由于主動對象的方法調用不會阻塞,這樣就提高了系統響應速度,在網絡程式設計中是大有用武之地的。

在這裡我們将一個"Logger"(日志記錄器)對象對象為例來介紹如何将一個傳統對象改造為主動對象,進而提高系統響應速度。

Logger的功能是将一些系統事件的記錄在存儲器上以備查詢,由于Logger使用慢速的I/O系統來記錄發送給它的消息,是以對Logger的操作将會導緻系統長時間的等待。

其功能代碼簡化如下:

class Logger: public ACE_Task<ACE_MT_SYNCH>

{

public:

    void LogMsg(const string& msg)

    {

        cout<<endl<<msg<<endl;

        ACE_OS::sleep(2);

    }

};

為了實作記錄日志操作的主動執行,我們需要用指令模式将其封裝,進而使得記錄日志的方法能在合适的時間和地方主動執行,封裝方式如下:

class LogMsgCmd: public ACE_Method_Object

{

public:

    LogMsgCmd(Logger *plog,const string& msg)

    {

        this->log=plog;

        this->msg=msg;

    }

    int call()

    {

        this->log->LogMsg(msg);

        return 0;

    }

private:

    Logger *log;

    string msg;

};

class Logger: public ACE_Task<ACE_MT_SYNCH>

{

public:

    void LogMsg(const string& msg)

    {

        cout<<endl<<msg<<endl;

        ACE_OS::sleep(2);

    }

    LogMsgCmd *LogMsgActive(const string& msg)

    {

        new LogMsgCmd(this,msg);

    }

};

這裡對代碼功能做一下簡單的說明:

ACE_Method_Object是ACE提供的指令模式借口,指令接口調用函數為int call(),在這裡通過它可以把每個記錄檔的調用封裝為一個LogMsgCmd對象,這樣,當原來需要調用LogMsg的方法的地方隻要調用LogMsgActive即可生成一個LogMsgCmd對象,由于調用LogMsgActive方法,隻是對指令進行了封裝,并沒有進行日志操作,是以該方法會立即傳回。然後再新開一個線程,将LogMsgCmd對象作為參數傳入,在該線程中執行LogMsgCmd對象的call方法,進而實作無阻塞調用。

然而,每次對一個LogMsg調用都開啟一個新線程,無疑是對資源的一種浪費,實際上我們往往将生成的LogMsgCmd對象插入一個指令隊列中,隻新開一個指令執行線程依次執行指令隊列中的所有指令。并且,為了實作對象的封裝,指令隊列和指令執行線程往往也封裝到Logger對象中,代碼如下所示:

#include "ace/OS.h"

#include "ace/Task.h"

#include "ace/Method_Object.h"

#include "ace/Activation_Queue.h"

#include "ace/Auto_Ptr.h"

#include <string>

#include <iostream>

using namespace std;

class Logger: public ACE_Task<ACE_MT_SYNCH>

{

public:

    Logger()

    {

        this->activate();

    }

    int svc();

    void LogMsg(const string& msg);

    void LogMsgActive (const string& msg);

private:

    ACE_Activation_Queue cmdQueue;    //指令隊列

};

class LogMsgCmd: public ACE_Method_Object

{

public:

    LogMsgCmd(Logger *plog,const string& msg)

    {

        this->log=plog;

        this->msg=msg;

    }

    int call()

    {

        this->log->LogMsg(msg);

        return 0;

    }

private:

    Logger *log;

    string msg;

};

void Logger::LogMsg(const string& msg)

{

    cout<<endl<<msg<<endl;

    ACE_OS::sleep(2);

}

//以主動的方式記錄日志

void Logger::LogMsgActive(const string& msg)

{

    //生成指令對象,插入到指令隊列中

    cmdQueue.enqueue(new LogMsgCmd(this,msg));

}

int Logger::svc()

{

    while(true)

    {

        //周遊指令隊列,執行指令

        auto_ptr<ACE_Method_Object> mo

            (this->cmdQueue.dequeue ());

        if (mo->call () == -1)

            break;

    }

    return 0;

}

int main (int argc, ACE_TCHAR *argv[])

{

    Logger log;

    log. LogMsgActive ("hello");

    ACE_OS::sleep(1);

    log.LogMsgActive("abcd");

    while(true)

        ACE_OS::sleep(1);

    return 0;

}

在這裡需要注意一下指令隊列ACE_Activation_Queue對象,它是線程安全的,使用方法比較簡單,這裡我也不多介紹了。

主動對象的基本結構就是這樣,然而,由于主動對象是異步調用的,又引出了如下兩個新問題:

l 方法調用線程如何知道該方法已經執行完成?

l 如何或得方法的傳回值? 

要解決這兩個問題,首先得介紹一下ACE_Future對象,ACE_Future是表示一個會在将來被指派的"期貨"對象,可以通過ready()函數查詢它是否已經被指派。該對象建立的時候是未指派的,後期可以通過set()函數來進行指派,所賦的值可以通過get()函數來擷取。

下面代碼示範了它的基本用法:

#include "ace/Future.h"

#include <string>

#include <iostream>

using namespace std;

void get_info(ACE_Future<string> &fu)

{

    string state = fu.ready()?"ready":"not ready";

    cout<<endl<<state<<endl;

    if(fu.ready())

    {

        string value;

        fu.get(value);

        cout<<"value:\t"<<value<<endl;

    }

}

int main(int argc, char *argv[])

{

    ACE_Future<string> fu;

    get_info(fu);

    fu.set("12345");

    get_info(fu);

    return 0;

}

通過ACE_Future對象來解決上述兩個問題的方法如下:

l 首先建立ACE_Future對象用以保留傳回值。

l 調用主動指令時将ACE_Future對象作為參數傳入,生成的指令對象中儲存ACE_Future對象的指針。

l 指令執行線程執行完指令後,将傳回值通過set()函數設定到ACE_Future對象中。

l 調用線程可以通過ACE_Future對象的ready()函數查詢該指令是否執行完成,如果指令執行完成,則可通過get()函數來擷取傳回值。

使用的時候要注意一下ACE_Future對象的生命周期。

為了示範了如何擷取主動指令的執行狀态和結果,我将上篇文章中的代碼改動了一下,日志類記錄日志後,會将記錄的内容作為傳回值傳回,該傳回值會通過ACE_Future對象傳回,代碼如下:

#include "ace/OS.h"

#include "ace/Task.h"

#include "ace/Method_Object.h"

#include "ace/Activation_Queue.h"

#include "ace/Auto_Ptr.h"

#include "ace/Future.h"

#include <string>

#include <iostream>

using namespace std;

class Logger: public ACE_Task<ACE_MT_SYNCH>

{

public:

    Logger()

    {

        this->activate();

    }

    int svc();

    string LogMsg(const string& msg);

    void LogMsgActive (const string& msg,ACE_Future<string> *result);

private:

    ACE_Activation_Queue cmdQueue; //指令隊列

};

class LogMsgCmd: public ACE_Method_Object

{

public:

    LogMsgCmd(Logger *plog,const string& msg,ACE_Future<string> *result)

    {

        this->log=plog;

        this->msg=msg;

        this->result=result;

    }

    int call()

    {

        string reply = this->log->LogMsg(msg);

        result->set(reply);

        return 0;

    }

private:

    ACE_Future<string> *result;

    Logger *log;

    string msg;

};

string Logger::LogMsg(const string& msg)

{

    ACE_OS::sleep(2);

    cout<<endl<<msg<<endl;

    return msg;

}

//以主動的方式記錄日志

void Logger::LogMsgActive(const string& msg,ACE_Future<string> *result)

{

    //生成指令對象,插入到指令隊列中

    cmdQueue.enqueue(new LogMsgCmd(this,msg,result));

}

int Logger::svc()

{

    while(true)

    {

        //周遊指令隊列,執行指令

        auto_ptr<ACE_Method_Object> mo

            (this->cmdQueue.dequeue ());

        if (mo->call () == -1)

            break;

    }

    return 0;

}

void get_info(ACE_Future<string> &fu)

{

    string state = fu.ready()?"ready":"not ready";

    cout<<endl<<state<<endl;

    if(fu.ready())

    {

        string value;

        fu.get(value);

        cout<<"value:\t"<<value<<endl;

    }

}

int main (int argc, ACE_TCHAR *argv[])

{

    ACE_Future<string> result;

    Logger log;

    log.LogMsgActive ("hello",&result);

    while(true)

    {

        get_info(result);

        if(result.ready())

            break;

        ACE_OS::sleep(1);

    }

    cout<<endl<<"cmd end"<<endl;

    while(true)

        ACE_OS::sleep(1);

    return 0;

}

這種查詢模式比較簡單有效,但存在一個問題:調用線程必須不斷輪詢ACE_Future對象以擷取傳回值,這樣的效率比較低。可以通過觀察者模式解決這個問題:在ACE_Future對象上注冊一個觀察者,當ACE_Future對象的值發生改變(異步指令執行完成)時主動通知該觀察者,進而擷取傳回值。

ACE中的觀察者模式可以通過ACE_Future_Observer來實作,使用方法如下:

#include "ace/Future.h"

#include <string>

#include <iostream>

using namespace std;

class MyObserver:public ACE_Future_Observer<string>

{

    virtual void update (const ACE_Future<string> &future)

    {

        string value;

        future.get(value);

        cout<<endl<<"change:\t"<<value<<endl;

    }

};

int main(int argc, char *argv[])

{

    MyObserver obv;

    ACE_Future<string> fu;

    fu.attach(&obv);

    ACE_OS::sleep(3);

    fu.set("12345");

    while(true)

        ACE_OS::sleep(3);

    return 0;

}

通過觀察者模式,可以更有效,及時的擷取異步指令的傳回值,但同時也增加了程式結構的複雜度并且難以調試,使用的時候應該根據需要選取合适的方式。

5.2、Reactor模式

主動對象模式用于降低方法執行和方法調用之間的耦合。該模式描述了另外一種更為透明的任務間通信方法。

反應器(Reactor):用于事件多路分離和分派的體系結構模式

通常的,對一個檔案描述符指定的檔案或裝置, 有兩種工作方式: 阻塞與非阻塞。所謂阻塞方式的意思是指, 當試圖對該檔案描述符進行讀寫時, 如果當時沒有東西可讀,或者暫時不可寫, 程式就進入等待狀态, 直到有東西可讀或者可寫為止。而對于非阻塞狀态, 如果沒有東西可讀, 或者不可寫, 讀寫函數馬上傳回, 而不會等待。

在前面的章節中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:當接收tcp資料時,如果遠端沒有資料可以讀,則會一直阻塞到讀到需要的資料為止。這種方式的傳輸和傳統的被動方法的調用類似,非常直覺,并且簡單有效,但是同樣也存在一個效率問題,如果你是開發一個面對着數千個連接配接的伺服器程式,對每一個用戶端都采用阻塞的方式通信,如果存在某個非常耗時的讀寫操作時,其它的用戶端通信将無法響應,效率非常低下。

一種常用做法是:每建立一個Socket連接配接時,同時建立一個新線程對該Socket進行單獨通信(采用阻塞的方式通信)。這種方式具有很高的響應速度,并且控制起來也很簡單,在連接配接數較少的時候非常有效,但是如果對每一個連接配接都産生一個線程的無疑是對系統資源的一種浪費,如果連接配接數較多将會出現資源不足的情況。

另一種較高效的做法是:伺服器端儲存一個Socket連接配接清單,然後對這個清單進行輪詢,如果發現某個Socket端口上有資料可讀時(讀就緒),則調用該socket連接配接的相應讀操作;如果發現某個Socket端口上有資料可寫時(寫就緒),則調用該socket連接配接的相應寫操作;如果某個端口的Socket連接配接已經中斷,則調用相應的析構方法關閉該端口。這樣能充分利用伺服器資源,效率得到了很大提高。

在Socket程式設計中就可以通過select等相關API實作這一方式。但直接用這些API控制起來比較麻煩,并且也難以控制和移植,在ACE中可以通過Reactor模式簡化這一開發過程。

反應器本質上提供一組更進階的程式設計抽象,簡化了事件驅動的分布式應用的設計和實作。除此而外,反應器還将若幹不同種類的事件的多路分離內建到易于使用的API中。特别地,反應器對基于定時器的事件、信号事件、基于I/O端口監控的事件和使用者定義的通知進行統一地處理。

ACE中的反應器與若幹内部和外部元件協同工作。其基本概念是反應器架構檢測事件的發生(通過在OS事件多路分離接口上進行偵聽),并發出對預登記事件處理器(event handler)對象中的方法的"回調"(callback)。該方法由應用開發者實作,其中含有應用處理此事件的特定代碼。

使用ACE的反應器,隻需如下幾步:

l 建立事件處理器,以處理他所感興趣的某事件。

l 在反應器上登記,通知說他有興趣處理某事件,同時傳遞他想要用以處理此事件的事件處理器的指針給反應器。

随後反應器架構将自動地:

l 在内部維護一些表,将不同的事件類型與事件處理器對象關聯起來。

l 在使用者已登記的某個事件發生時,反應器發出對處理器中相應方法的回調。

反應器模式在ACE中被實作為ACE_Reactor類,它提供反應器架構的功能接口。

如上面所提到的,反應器将事件處理器對象作為服務提供者使用。反應器内部記錄某個事件處理器的特定事件的相關回調方法。當這些事件發生時,反應器會建立這種事件和相應的事件處理器的關聯。

l 事件處理器

事件處理器就是需要通過輪詢發生事件改變的對象清單中的對象,如在上面的例子中就是連接配接的用戶端,每個用戶端都可以看成一個事件處理器。

l 回調事件

就是反應器支援的事件,如Socket讀就緒,寫就緒。拿上面的例子來說,如果某個用戶端(事件處理器)在反應器中注冊了讀就緒事件,當用戶端給伺服器發送一條消息的時候,就會觸發這個用戶端的資料可讀的回調函數。

在反應器架構中,所有應用特有的事件處理器都必須由ACE_Event_Handler的抽象接口類派生。可以通過重載相應的"handle_"方法實作相關的回調方法。

使用ACE_Reactor基本上有三個步驟:

l 建立ACE_Event_Handler的子類,并在其中實作适當的"handle_"方法,以處理你想要此事件處理器為之服務的事件類型。

l 通過調用反應器對象的register_handler(),将你的事件處理器登記到反應器。

l 在事件發生時,反應器将自動回調相應的事件處理器對象的适當的handle_"方法。

下面我就以一個Socket用戶端的例子為例簡單的說明反應器的基本用法。

#include <ace/OS.h>

#include <ace/Reactor.h>

#include <ace/SOCK_Connector.h>

#include <string>

#include <iostream>

using namespace std;

class MyClient:public ACE_Event_Handler

{

public:

    bool open()

    {

        ACE_SOCK_Connector connector;

        ACE_INET_Addr addr(3000,"127.0.0.1");

        ACE_Time_Value timeout(5,0);

        if(connector.connect(peer,addr,&timeout) != 0)

        {

            cout<<endl<<"connecetd fail";

            return false;

        }

        ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);

        cout<<endl<<"connecetd ";

        return true;

    }

    ACE_HANDLE get_handle(void) const

    {

        return peer.get_handle();

    }

    int handle_input (ACE_HANDLE fd)

    {

        int rev=0;

        ACE_Time_Value timeout(5,0);

        if((rev=peer.recv(buffer,1000,&timeout))>0)

        {

            buffer[rev]='\0';

            cout<<endl<<"rev:\t"<<buffer<<endl;

        }

        return 3;

    }

private:

    ACE_SOCK_Stream peer;

    char buffer[1024];

};

int main(int argc, char *argv[])

{

    MyClient client;

    client.open();

    while(true)

    {

        ACE_Reactor::instance()->handle_events();

    }

    return 0;

}

在這個例子中,用戶端連接配接上伺服器後,通過ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注冊了一個讀就緒的回調函數,當伺服器端給用戶端發消息的時候,會自動觸發handle_input()函數,将接收到的資訊列印出來。

下面對如何在Socket通信中使用反應器做進一步的介紹。

5.3、接收者-連接配接(Reactor-Connect)者模式

接受器-連接配接器設計模式(Acceptor-Connector)使分布式系統中的連接配接建立及服務初始化與一旦服務初始化後所執行的處理去耦合。

這樣的去耦合通過三種元件來完成:acceptor、connector 和 servicehandler(服務處理器)。

l 連接配接器主動地建立到遠地接受器元件的連接配接,并初始化服務處理器來處理在連接配接上交換的資料。

l 接受器被動地等待來自遠地連接配接器的連接配接請求,在這樣的請求到達時建立連接配接,并初始化服務處理器來處理在連接配接上交換的資料。

l 初始化的服務處理器執行應用特有的處理,并通過連接配接器和接受器元件建立的連接配接來進行通信。

5.3.1. 服務處理器(Service Handler):

Service Handler 實作應用服務,通常扮演客戶角色、伺服器角色,或同時扮演這兩種角色。它提供挂鈎方法,由 Acceptor 或 Connector 調用,以在連接配接建立時啟用應用服務。此外,Service Handler 還提供資料模式傳輸端點,其中封裝了一個 I/O 句柄。一旦連接配接和初始化後,該端點被 Service Handler 用于與和其相連的對端交換資料。

5.3.2. 接受器(Acceptor):

Acceptor 是一個工廠,實作用于被動地建立連接配接并初始化與其相關聯的 Service Handler 的政策。此外,Acceptor 包含有被動模式的傳輸端點工廠,它建立新的資料模式端點,由 Service Handler 用于在相連的對端間傳輸資料。通過将傳輸端點工廠綁定到網絡位址,比如 Acceptor 在其上偵聽的 TCP 端口号,Acceptor的 open 方法對該工廠進行初始化。

一旦初始化後,被動模式的傳輸端點工廠偵聽來自對端的連接配接請求。當連接配接請求到達時,Acceptor 建立 Service Handler,并使用它的傳輸端點工廠來将新連接配接接受進Service Handler 中。

5.3.3. 連接配接器(Connector):

Connector 是一個工廠,實作用于主動地建立連接配接并初始化與其相關聯的 Service Handler 的政策。它提供方法,由其發起到遠地 Acceptor 的連接配接。同樣地,它還提供另一個方法,完成對 Service Handler 的啟用;該處理器的連接配接是被同步或異步地發起的。Connector 使用兩個分開的方法來透 明地支援異步連接配接建立。

5.3.4. 分派器(Dispatcher):

為 Acceptor,Dispatcher 将在一或多個傳輸端點上接收到的連接配接請求多路分離給适當的 Acceptor。Dispatcher允許多個 Acceptor 向其登記,以偵聽同時在不同端口上從不同對端而來的連接配接。 為 Connector,Dispatcher 處理異步發起的連接配接的完成。在這種情況下,當異步連接配接被建立時,Dispatcher 回調 Connector。Dispatcher 允許多個 Service Handler 通過一個 Connector 來異步地發起和完成它們 的連接配接。注意對于同步連接配接建立,Dispatcher 并不是必需的,因為發起連接配接的線程控制也完成服務服務處 理器的啟用。

Dispatcher 通常使用事件多路分離模式來實作,這些模式由反應器(Reactor)或前攝器(Proactor) 來提供,它們分别處理同步和異步的多路分離。同樣地,Dispatcher 也可以使用主動對象(Active Obj ect)模式來實作為單獨的線程或程序。

Acceptor 元件協作

Acceptor 和 Service Handler 之間的協作。這些協作被劃分為三個階段:

1. 端點初始化階段:

為被動地初始化連接配接,應用調用 Acceptor 的 open 方法。該方法建立被動模式的傳 輸端點,将其綁定到網絡位址,例如,本地主機的 IP 位址和 TCP 端口号,并随後偵聽來自對端 Connector 的連接配接請求。其次,open 方法将 Acceptor 對象登記到 Dispatcher,以使分派器能夠在連接配接事件 到達時回調 Acceptor。最後,應用發起 Dispatcher 的事件循環,等待連接配接請求從對端 Connector 到來。

2. 服務初始化階段:

當連接配接請求到達時,Dispatcher 回調 Acceptor 的accept 方法。該方法裝配以下活動 所必需的資源:

l 建立新的 Service Handler,

l 使用它的被動模式傳輸端點工廠來将連接配接接受進 該處理器的資料模式傳輸端點中,

l 通過調用 Service Handler 的 open 挂鈎将其啟用。Servic e Handler 的 open 挂鈎可以執行服務特有的初始化,比如配置設定鎖、派生線程、打開日志檔案,和/或将 該 Service Handler 登記到 Dispatcher。

3. 服務處理階段:

在連接配接被動地建立和 Service Handler 被初始化後,服務處理階段開始了。在此階段, 應用級通信協定,比如 HTTP 或 IIOP,被用于在本地 Service Handler 和與其相連的遠地 Peer 之間、 經由前者的 peer_stream_端點交換資料。當交換完成,可關閉連接配接和 Service Handler,并釋放資源。

Connector 元件協作

Connector 元件可以使用同步和異步兩種方式來初始化它的 Service Handle,這裡僅介紹一下同步時的協作情況。

同步的 Connector 情況中的參與者之間的協作可被劃分為以下三個階段:

l 連接配接發起階段:

為在 Service Handler 和它的遠地 Peer 之間發起連接配接,應用調用 Connector 的 connect 方法。該方法阻塞調用線程的線程控制、直到連接配接同步完成,以主動地建立連接配接。

l 服務初始化階段:

在連接配接完成後,Connector 的 connect 方法調用 complete 方法來啟用 Service Handl er。complete 方法通過調用 Service_Handler 的 open 挂鈎方法來完成啟用;open 方法執行服務特有的 初始化。

l 服務處理階段:

此階段與 Service Handler 被 Acceptor 建立後所執行的服務處理階段相類似。特别地, 一旦 Service Handler 被啟用,它使用與和其相連接配接的遠地 Service Handler 交換的資料來執行應用特 有的服務處理。

實作及運作一般步驟:

l 建立 Service Handler;

l 被動地或主動地将 Service Handler 連接配接到它們的遠地對端;以及

l 一旦連接配接,啟用 Service Handler。

主要角色:Service Handler(服務處理器)、Acceptor 和 Connector。

服務處理器:該抽象類繼承自 Event_Handler,并為客戶、伺服器或同時扮演兩種角色的元件所提供 的服務處理提供通用接口。應用必須通過繼承來定制此類,以執行特定類型的服務。Service Handler 接口如下所示:

template <class PEER_STREAM>

class Service_Handler : public Event_Handler

{

public:

    //連接配接成功後的初始化入口函數 (子類定義).

    virtual int open (void) = 0;

    //傳回通信流的引用

    PEER_STREAM &peer (void)

    {

        return peer_stream_;

    }

};

一旦 Acceptor 或 Connector 建立了連接配接,它們調用 Service Handler 的 open 挂鈎。該純虛方法必須被 Concrete Service Handler 子類定義;後者執行服務特有的初始化和後續處理。

連接配接器:該抽象類實作主動連接配接建立和初始化 Service Handler 的通用政策。它的接口如下所示:

template <class SERVICE_HANDLER,class PEER_CONNECTOR>

class Connector : public Event_Handler

{

public:

    enum Connect_Mode

    {

        SYNC, //以同步方式連接配接

        ASYNC //以異步方式連接配接

    };

// 主動連接配接并激活服務處理器

    int connect (SERVICE_HANDLER *sh,

        const PEER_CONNECTOR::PEER_ADDR &addr,

        Connect_Mode mode);

protected:

    //定義連接配接激活政策

    virtual int connect_service_handler(SERVICE_HANDLER *sh,

        const PEER_CONNECTOR::PEER_ADDR &addr,

        Connect_Mode mode);

    // Defines the handler's concurrency strategy.

    virtual int activate_service_handler(SERVICE_HANDLER *sh);

    // 當以異步方式連接配接完成時激活服務處理器

    virtual int complete (HANDLE handle);

private:

    // IPC mechanism that establishes

    // connections actively.

    PEER_CONNECTOR connector_;

    };

Conncetor 通過特定類型的 PEER CONNECTOR 和 SERVICE HANDLER 被參數化。PEER CONNECTO R 提供的傳輸機制被 Connector 用于主動地建立連接配接,或是同步地、或是異步地。SERVICE HANDLER提供的服務對與相連的對端交換的資料進行處理。C++參數化類型被用于使(1)連接配接建立政策與(2)服務處理器類型、網絡程式設計接口和傳輸層連接配接協定去耦合。

參數化類型是有助于提高可移植性的實作決策。例如,它們允許整體地替換 Connector 所用的 IPC 機 制。這使得 Connector 的連接配接建立代碼可在含有不同網絡程式設計接口(例如,有 socket,但沒有 TLI;反之 亦然)的平台間進行移植。

Service Handler 的 open 挂鈎在連接配接成功建立時被調用。

接受器(Acceptor):該抽象類為被動連接配接建立和初始化 Service Handler 實作通用的政策。Acceptor 的接 口如下所示:

template <class SERVICE_HANDLER,

class PEER_ACCEPTOR>

class Acceptor : public Event_Handler

{

public:

    // Initialize local_addr transport endpoint factory

    // and register with Initiation_Dispatcher Singleton.

    virtual int open(const PEER_ACCEPTOR::PEER_ADDR &local_addr);

    // Factory Method that creates, connects, and

    // activates SERVICE_HANDLER's.

    virtual int accept (void);

protected:

    //定義服務處理器的建立政策

    virtual SERVICE_HANDLER *make_service_handler (void);

    // 定義服務處理器的連接配接政策

    virtual int accept_service_handler(SERVICE_HANDLER *);

    //定義服務處理器的激活政策

    virtual int activate_service_handler(SERVICE_HANDLER *);

    // Demultiplexing hooks inherited from Event_Handler,

    // which is used by Initiation_Dispatcher for

    // callbacks.

    virtual HANDLE get_handle (void) const;

    virtual int handle_close (void);

private:

    // IPC mechanism that establishes

    // connections passively.

    PEER_ACCEPTOR peer_acceptor_;

};

Acceptor 通過特定類型的 PEER ACCEPTOR 和 SERVICE HANDLER 被參數化。PEER ACCEPTOR 提供的傳輸機制被 Acceptor 用于被動地建立連接配接。SERVICE HANDLER 提供的服務對與遠地對端交換的 資料進行處理。注意 SERVICE HANDLER 是由應用層提供的具體的服務處理器。

參數化類型使 Acceptor 的連接配接建立政策與服務處理器的類型、網絡程式設計接口及傳輸層連接配接發起協定去耦合。就如同 Connector 一樣,通過允許整體地替換 Acceptor 所用的機制,參數化類型的使用有助于提高可移植性。這使得連接配接建立代碼可在含有不同網絡程式設計接口(比如有 socket,但沒有 TLI;反之亦然)的平台間移植。

make_service_handler 工廠方法定義 Acceptor 用于建立 SERVICE HANDLER 的預設政策。如下所示:

template <class SH, class PA> SH *

Acceptor<SH, PA>::make_service_handler (void)

{

    return new SH;

}

預設行為使用了"請求政策"(demand strategy),它為每個新連接配接建立新的 SERVICE HANDLER。但是, Acceptor 的子類可以重定義這一政策,以使用其他政策建立 SERVICE HANDLE,比如建立單獨的單體 (Singleton)[10]或從共享庫中動态連結 SERVICE HANDLER。

accept_service_handler 方法在下面定義 Acceptor 所用的 SERVICE HANDLER 連接配接接受政策:

template <class SH, class PA> int

Acceptor<SH, PA>::accept_service_handler(SH *handler)

{

    peer_acceptor_->accept (handler->peer ());

}

預設行為委托 PEER ACCEPTOR 所提供的 accept 方法。子類可以重定義 accept_service_handler 方法,以 執行更為複雜的行為,比如驗證客戶的身份,以決定是接受還是拒絕連接配接。

Activate_service_handler 定義 Acceptor 的 SERVICE HANDLER 并發政策:

程式示例:

在ACE中,預設的服務處理器是ACE_Svc_Handler,這也是一個模版類,可以通過相關的參數特化。由于ACE_Svc_Handler繼承自ACE_Task和ACE_Event_Handler,功能相當強大,同時也存在一定開銷,如果需要減小開銷可以自己寫一個僅繼承自ACE_Event_Handler的服務處理器。

為了示範簡單,我這裡就以一個EchoServer的伺服器端和用戶端為例,其中接收器和連接配接器都采用預設政策,并沒有進行重載。

伺服器端:

#include "ace/Reactor.h"

#include "ace/Svc_Handler.h"

#include "ace/Acceptor.h"

#include "ace/Synch.h"

#include "ace/SOCK_Acceptor.h"

class My_Svc_Handler;

typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;

class My_Svc_Handler:

    public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

{

public:

    int open(void*)

    {

        ACE_OS::printf("\nConnection established\n");

        //注冊相應事件

        ACE_Reactor::instance()->register_handler(this,

            ACE_Event_Handler::READ_MASK);

        return 0;

    }

    int handle_input(ACE_HANDLE)

    {

        int rev = peer().recv(data,1024);

        if(rev == 0)

        {

            delete this;

        }

        else

        {

            data[rev]='\0';

            ACE_OS::printf("<<rev:\t %s\n",data);

            peer().send(data,rev+1);

            return 0;

        }

    }

private:

    char data[1024];

};

int main(int argc, char* argv[])

{

    ACE_INET_Addr addr(3000);

    MyAcceptor acceptor(addr,ACE_Reactor::instance());

    while(1)

        ACE_Reactor::instance()->handle_events();

}

用戶端:

#include "ace/Reactor.h"

#include "ace/Svc_Handler.h"

#include "ace/Connector.h"

#include "ace/Synch.h"

#include "ace/SOCK_Connector.h"

class My_Svc_Handler;

typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;

class My_Svc_Handler:

    public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

{

public:

    int open(void*)

    {

        ACE_OS::printf("\nConnection established\n");

        //注冊相應事件

        ACE_Reactor::instance()->register_handler(this,

            ACE_Event_Handler::READ_MASK);

        return 0;

    }

    int handle_input(ACE_HANDLE)

    {

        int rev = peer().recv(data,1024);

        if(rev == 0)

        {

            delete this;

        }

        else

        {

            data[rev]='\0';

            ACE_OS::printf("<<rev:\t %s\n",data);

            return 0;

        }

    }

    int sendData(char *msg)

    {

        ACE_OS::printf("<<send:\t %s\n",msg);

        return peer().send(msg,strlen(msg));   

    }

private:

    char data[1024];

};

int main(int argc, char* argv[])

{

    ACE_INET_Addr addr(3000,"192.168.1.142");

    My_Svc_Handler *svchandler = new My_Svc_Handler();

    MyConnector connector;

    if(connector.connect(svchandler,addr)==-1)

    {

        ACE_OS::printf("Connect fail");

    }

    svchandler->sendData("hello wrold");

    while(1)

        ACE_Reactor::instance()->handle_events();

5.4、Proactor模式

當 OS 平台支援異步操作時,一種高效而友善的實作高性能 Web 伺服器的方法是使用前攝式事件分派。使用前攝式事件分派模型設計的 Web 伺服器通過一或多個線程控制來處理異步操作的完成。這樣,通過內建完成事件多路分離(completion event demultiplexing)和事件處理器分派,前攝器模式簡化了異步的 Web 伺服器。

異步的 Web 伺服器将這樣來利用前攝器模式:首先讓 Web 伺服器向 OS 發出異步操作,并将回調方法登記到 Completion Dispatcher(完成分派器),後者将在操作完成時通知 Web 伺服器。于是 OS 代表 Web 伺服器執行操作,并随即在一個周知的地方将結果排隊。Completion Dispatcher 負責使完成通知出隊,并執行适當的、含有應用特有的 Web 伺服器代碼的回調。

使用前攝器模式的主要優點是可以啟動多個并發操作,并可并行運作,而不要求應用必須擁有多個線程。操作被應用異步地啟動,它們在 OS 的 I/O 子系統中運作直到完成。發起操作的線程現在可以服務 另外的請求了。

在ACE中,可以通過ACE_Proactor實作前攝器模式。實作方式如下。

5.4.1、建立服務處理器:

Proactor架構中服務處理器均派生自ACE_Service_Handler,它和Reactor架構的事件處理器非常類似。當發生IO操作完成事件時,會觸發相應的事件完成會調函數。

5.4.2、實作服務處理器IO操作

Proactor架構中所有的IO操作都由相應的異步操作類來完成,這些異步操作類都繼承自ACE_Asynch_Operation。常用的有以下幾種。

l ACE_Asynch_Read_Stream, 提供從TCP/IP socket連接配接中進行異步讀操作.

l ACE_Asynch_Write_Stream, 提供從TCP/IP socket連接配接中進行異步寫操作.

使用這些操作類的一般方式如下:

l 初始化

将相關的操作注冊到服務處理器中,一般可通過調用其open方法實作。

l 發出IO操作

發出異步IO操作請求,該操作不會阻塞,具體的IO操作過程由作業系統異步完成。

l IO操作完成回調處理

異步IO操作完成後,OS會觸發服務處理器中的相應回調函數,可通過該函數的ACE_Asynch_Result參數擷取相應的傳回值。

5.2.3、使用連接配接器或接受器和遠端進行連接配接

ACE為Proactor架構提供了兩個工廠類來建立TCP/IP連接配接。

l ACE_Asynch_Acceptor, 用于被動地建立連接配接

l ACE_Asynch_Connector 用于主動地建立連接配接

當遠端連接配接建立時,連接配接器或接受器便會建立相應的服務處理器,進而可以實作服務處理。

5.2.4、啟動Proactor事件分發處理

啟動事件分發處理隻需如下調用:

while(true)

ACE_Proactor::instance()->handle_events();

2.4.5、程式示例

伺服器端:

伺服器端簡單的實作了一個EchoServer,流程如下:當用戶端建立連接配接時,首先發出一個異步讀的異步請求,當讀完成時,将所讀的資料列印出來,并發出一個新的異步請求。

#include "ace/Message_Queue.h"

#include "ace/Asynch_IO.h"

#include "ace/OS.h"

#include "ace/Proactor.h"

#include "ace/Asynch_Acceptor.h"

class HA_Proactive_Service : public ACE_Service_Handler

{

public:

~HA_Proactive_Service ()

{

if (this->handle () != ACE_INVALID_HANDLE)

ACE_OS::closesocket (this->handle ());

}

virtual void open (ACE_HANDLE h, ACE_Message_Block&)

{

this->handle (h);

if (this->reader_.open (*this) != 0 )

     {

         ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),

             ACE_TEXT ("HA_Proactive_Service open")));

delete this;

return;

     }

     ACE_Message_Block *mb = new ACE_Message_Block(buffer,1024);

if (this->reader_.read (*mb, mb->space ()) != 0)

     {

         ACE_OS::printf("Begin read fail\n");

delete this;

return;

     }

return;

}

//異步讀完成後會調用此函數

virtual void handle_read_stream

(const ACE_Asynch_Read_Stream::Result &result)

{

     ACE_Message_Block &mb = result.message_block ();

if (!result.success () || result.bytes_transferred () == 0)

     {

         mb.release ();

delete this;

return;

     }

     mb.copy("");    //為字元串添加結束标記'\0'

     ACE_OS::printf("rev:\t%s\n",mb.rd_ptr());

     mb.release();

     ACE_Message_Block *nmb = new ACE_Message_Block(buffer,1024);

if (this->reader_.read (*nmb, nmb->space ()) != 0)

return;

}

private:

ACE_Asynch_Read_Stream reader_;

char buffer[1024];

};

int main(int argc, char *argv[])

{

int port=3000;

    ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;

if (acceptor.open (ACE_INET_Addr (port)) == -1)

return -1;

while(true)

        ACE_Proactor::instance ()->handle_events ();

return 0;

}

用戶端:

用戶端代碼比較簡單,就是每隔1秒鐘将目前的系統時間轉換為字元串形式通過異步形式發送給伺服器,發送完成後,釋放時間字元的記憶體空間。

#include "ace/Message_Queue.h"

#include "ace/Asynch_IO.h"

#include "ace/OS.h"

#include "ace/Proactor.h"

#include "ace/Asynch_Connector.h"

class HA_Proactive_Service : public ACE_Service_Handler

{

public:

~HA_Proactive_Service ()

{

if (this->handle () != ACE_INVALID_HANDLE)

ACE_OS::closesocket (this->handle ());

}

virtual void open (ACE_HANDLE h, ACE_Message_Block&)

{

this->handle (h);

if (this->writer_.open (*this) != 0 )

     {

         ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),

             ACE_TEXT ("HA_Proactive_Service open")));

delete this;

return;

     }

     ACE_OS::printf("connceted");

for(int i=0;i<10;i++)    //每隔秒中發送時間至伺服器

     {

         ACE_OS::sleep(1);

         time_t now = ACE_OS::gettimeofday().sec();

char *time = ctime(&now);        //擷取目前時間的字元串格式

         ACE_Message_Block *mb = new ACE_Message_Block(100);

         mb->copy(time);

if (this->writer_.write(*mb,mb->length()) !=0)

         {

             ACE_OS::printf("Begin read fail\n");

delete this;

return;

         }

     }

return;

}

//異步寫完成後會調用此函數

virtual void handle_write_dgram

(const ACE_Asynch_Write_Stream::Result &result)

{

     ACE_Message_Block &mb = result.message_block ();

     mb.release();

return;

}

private:

ACE_Asynch_Write_Stream writer_;

};

int main(int argc, char *argv[])

{

    ACE_INET_Addr addr(3000,"192.168.1.142");

    HA_Proactive_Service *client = new HA_Proactive_Service();

    ACE_Asynch_Connector<HA_Proactive_Service> connector;

    connector.open();

if (connector.connect(addr) == -1)

return -1;

while(true)

        ACE_Proactor::instance ()->handle_events ();

return 0;

}

6. ACE的消息存放對象

2.1、ACE Lock類屬

鎖類屬包含的類包裝簡單的鎖定機制,比如互斥體、信号量、讀/寫互斥體和令牌等。這裡我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進行一些簡單的說明。

ACE_Message_Block在Ace中用來表示消息的存放空間,可用做網絡通信中的消息緩沖區,使用非常頻繁,下面将在如下方簡單的介紹一下ACE_Message_Block相關功能。

l 建立消息塊

l 釋放消息塊

l 從消息塊中讀寫資料

l 資料的拷貝

l 其它常用函數

6.1、建立消息塊

建立消息塊的方式比較靈活,常用的有以下幾種方式 :

1、直接給消息塊配置設定記憶體空間建立。

ACE_Message_Block *mb = new ACE_Message_Block (30);

2、共享底層資料塊建立。

char buffer[100];

ACE_Message_Block *mb = new ACE_Message_Block (buffer,30);

這種方式共享底層的資料塊,被建立的消息塊并不拷貝該資料,也不假定自己擁有它的所有權。在消息塊mb被銷毀時,相關聯的資料緩沖區data将不會被銷毀。這是有意義的:消息塊沒有拷貝資料,是以記憶體也不是它配置設定的,這樣它也不應該負責銷毀它。

3、通過duplicate()函數從已有的消息塊中建立副本。

ACE_Message_Block *mb = new ACE_Message_Block (30);

ACE_Message_Block *mb2 = mb->duplicate();

這種方式下,mb2和mb共享同一資料空間,使用的是ACE_Message_Block的引用計數機制。它傳回指向要被複制的消息塊的指針,并在内部增加内部引用計數。

4、通過clone()函數從已有的消息塊中複制。

ACE_Message_Block *mb = new ACE_Message_Block (30);

ACE_Message_Block *mb2 = mb->clone();

clone()方法實際地建立整個消息塊的新副本,包括它的資料塊和附加部分;也就是說,這是一次"深拷貝"。

6.2、釋放消息塊

一旦使用完消息塊,程式員可以調用它的release()方法來釋放它。

l 如果消息資料記憶體是由該消息塊配置設定的,調用release()方法就也會釋放此記憶體。

l 如果消息塊是引用計數的,release()就會減少計數,直到到達0為止;之後消息塊和與它相關聯的資料塊才從記憶體中被移除。

l 如果消息塊是通過共享已配置設定的底層資料塊建立的,底層資料塊不會被釋放。

無論消息塊是哪種方式建立的,隻要在使用完後及時調用release()函數,就能確定相應的記憶體能正确的釋放。

6.3、從消息塊中讀寫資料

ACE_Message_Block提供了兩個指針函數以供程式員進行讀寫操作,rd_ptr()指向可讀的資料塊位址,wr_ptr()指向可寫的資料塊位址,預設情況下都執行資料塊的首位址。下面的例子簡單了示範它的使用方法。

#include "ace/Message_Queue.h"

#include "ace/OS.h"

int main(int argc, char *argv[])

{

    ACE_Message_Block *mb = new ACE_Message_Block (30);

    ACE_OS::sprintf(mb->wr_ptr(),"%s","hello");

    ACE_OS::printf("%s\n",mb->rd_ptr ());

    mb->release();

return 0;

}

注意:這兩個指針所指向的位置并不會自動移動,在上面的例子中,函數執行完畢後,執行的位置仍然是最開始的0,而不是最新的可寫位置5,程式員需要通過wr_ptr(5)函數手動移動寫指針的位置。

6.4、資料的拷貝

一般的資料的拷貝可以通過函數來實作資料的拷貝,copy()還會保證wr_ptr()的更新,使其指向緩沖區的新末尾處。

下面的例子示範了copy()函數的用法。

    mb->copy("hello");

    mb->copy("123",4);

注意:由于c++是以'\0'作為字元串結束标志的,對于上面的例子,底層資料塊中儲存的是"hello\0123\0",而用ACE_OS::printf("%s\n",mb->rd_ptr ());列印出來的結果是"hello",使用copy函數進行字元串連接配接的時候需要注意。

6.5、其它常用函數

length() 傳回目前的資料長度

next() 擷取和設定下一個ACE_Message_Block的連結。(用來建立消息隊列非常有用)

space() 擷取剩餘可用空間大小

size() 擷取和設定資料存儲空間大小。

注意:

這裡說一下ACE::read_n 的行為:

ACE::read_n 會試圖讀取buf長度的資料.如果遇到檔案結束(EOF)或者錯誤則傳回 0 或 -1;如果先到達了buf長度則傳回資料區長度;問題來了:如果資料讀取成功,但是沒有到達buf長度怎麼辦? 如何拿到已讀資料的長度? 這就要用到ACE::read_n的第4個參數,這個參數記錄了實際讀取的資料長度.

在上面的code裡還用到了幾個函數:

ACE_Message_Block::size 指資料區的長度, 就是初始化時指定的長度,這裡是10;

ACE_Message_Block::length 指資料的長度, 是 wr_ptr() - rd_ptr()的結果.

注意資料區和資料的差別....

ACE_Message_Block::cont ACE_Message_Block還實作了記憶體的連結清單結構;

7. 總結

一般文章整理的隻是ACE的基礎部分,如果需要深入了解ACE還需要通過檢視源代碼以進一步了解。可分為如下子產品:

1. 并發和同步

2. 程序間通信(IPC)

3. 記憶體管理

4. 定時器

5. 信号

6. 檔案系統管理

7. 線程管理

8. 事件多路分離和處理器分派

9. 連接配接建立和服務初始化

10. 軟體的靜态和動态配置、重配置

繼續閱讀