天天看點

關于ZeroC Ice C++異步invoke,整合ProtoBuf對象

導讀

      • 前奏
      • 問題馬上就來了
      • 沒辦法隻能自己摸索了
        • 1. 先來看看其定義
        • 2. 參數的準備
        • 3. 準備回調函數
        • 4. 正式調用
        • 5. 到此用戶端就完成了,服務端怎麼辦呢?
      • 現在大家喜歡用ProtoBuf,怎麼來整合到Ice中

前奏

本文并不是一篇教程,隻是提供了一個思路,最好先熟悉了Ice相關概念,動手調試運作了官方Demo後,再來看。

我們知道Ice的一般使用方式,是通過Slice檔案生成的對應的類來操作,Ice為我們封裝了網絡通信的過程,隻需調用Slice中定義類方法即可,如下Slice檔案定義所示:

module Demo {
    interface AskPerson {
        string AskNumber(string sName);
        string AskName(string sName);
    };
};
           

用戶端通過該類的Proxy直接調用該類的方,對于用戶端來說Proxy就是代表着該類。我們可以按照如下的方式調用,也可以學習官方Demo。

// 從配置檔案讀取屬性資訊
Ice::InitializationData initData;
initData.properties = Ice::createProperties();
initData.properties->load("config.client");

ic = Ice::initialize(argc, argv,initData);

// 從配置檔案中讀取Ice::Object即我們的目标對象的辨別号
Ice::ObjectPrx base = ic->propertyToProxy("AskPerson.Proxy");
AskPersonPrx askerPrx = AskPersonPrx::checkedCast(base);
if (!askerPrx)
{
    throw "Invalid proxy";
}

std::cout<<"client get"<<askerPrx->AskName("xx")<<std::endl;

           

很簡單,讀一下Demo和網上的文章基本就能學會使用,但是可能這個技術有點Old,是以好多文章都是2012年之前的,也鮮有人讨論。

問題馬上就來了

假設一個項目中有50個這樣的對象,各個業務子產品,難道都需要建立對象askPerson1~50?且萬一類名改變了怎麼辦?在業務子產品經常因為這樣的變動而修改代碼,肯定沒有人會感覺到很舒服。

首先想到的是封裝一層,這些生成的對象都有Ice基類的,但方法名是個麻煩事兒,Ice不可能在Ice::Object中提前定義好我們需要的虛函數。

幸好,Ice提供了invoker方法,我們隻需提供函數名和參數,invoker會幫我們發起RPC調用。

同步方式調用invoker很簡單,跟直接調AskName差不多,隻是傳回值,變成了輸出參數,官方Demo也有比較簡單的例子,就不贅述了。invoker定義如下:

bool ice_invoke(const ::std::string& operation,
                ::Ice::OperationMode mode,
                const ::std::vector< ::Ice::Byte>& inParams,
                ::std::vector< ::Ice::Byte>& outParams)
           

可奇怪的是,官方Demo中隻講了生成類AskPerson直接調成員方法的async,沒有講invoker的async。網上的文章也幾乎沒有讨論的。

沒辦法隻能自己摸索了

直接上結論了,Ice提供了begin_ice_invoke,其實就是圍繞它來完成異步的。我們按照begin_ice_invoke的要求,準備參數,準備回調函數。

1. 先來看看其定義

::Ice::AsyncResultPtr begin_ice_invoke(
    const ::std::string& operation,
    ::Ice::OperationMode mode,
    const ::std::vector< ::Ice::Byte>& inParams, //RPC調用的參數
    const ::Ice::Context& __ctx,
    const ::Ice::CallbackPtr& __del,             //回調函數指針
    const ::Ice::LocalObjectPtr& __cookie = 0)
           

其中operation就是slice檔案中定義的成員方法,例如下面代碼中的GoNameAsync:

#ifndef ASKPERSON_ICE
#define ASKPERSON_ICE

[["cpp:include:person.pb.h"]]
[["cpp:include:StreamProtobuf.h"]]

module Demo {
    ["cpp:type:PersonModule::Person"] sequence<byte> Person;
    interface AskPerson {
        string AskNumber(string sName);
        string AskName(string sName);
        ["amd"] void GoNameAsync(string sName);
        ["amd"] void GoNumberAsync(int nGid,string sName);
        ["amd"] void GoPerson(Person p);
    };
};

#endif
           

而inParams就是GoNameAsync的參數,隻不過是序列化後的;Ice::CallbackPtr是一個回調函數的指針,用于異步的方式傳回資料。

GoNameAsynce的傳回值為void,伺服器也可以傳回一個序列化的資料,目前還不清楚内部實作。

2. 參數的準備

利用Ice提供的輸入輸出流,将資料直接序列化為buffer,如下代碼所示,将字元串write到Ice的位元組序列ByteSeq中,ByteSeq的實質是std::vector< ::Ice::Byte>

Ice::InitializationData initData;
initData.properties = Ice::createProperties();
initData.properties->load("config.client");

//ic = Ice::initialize(argc, argv,initData);
ic = Ice::initialize(initData);

...

Ice::ByteSeq inParams;
Ice::OutputStreamPtr out = Ice::createOutputStream(ic);
out->startEncapsulation();
out->write("tom");
out->endEncapsulation();
out->finished(inParams);
           

上例我們向輸出流裡寫入了一個字元串"tom",如果一個RPC調用有多個參數,隻需多次調用out->write()方法,該方法重載了多種類型,我們傳值就可以了。

最後直接将流裡面的資料,重新整理到緩沖區(Ice::ByteSeq位元組序列)。

3. 準備回調函數

這裡有點特殊,有參數類型要求,不是随便寫一個函數指針就可以的,多寫寫就習慣了。

class CallBack :public IceUtil::Shared
{
    // 這個回調函數名不限制,但是參數類型要給正确
    void response(const Ice::AsyncResultPtr& p)
    {
        Ice::ByteSeq datas;
        // 從invoke中擷取位元組序列
        p->getProxy()->end_ice_invoke(datas,p);
        
        // 通過輸入流将該位元組序,直接讀取
        Ice::InputStreamPtr in = Ice::createInputStream(p->getCommunicator(), datas);
        
        // 具體的資料是哪種類型,要看服務端怎麼發的
        std::string message;
        in->read(message);
        in->readPendingObjects();
        in->endEncapsulation();
        std::cout<<"get response"<<message<<std::endl;
    }
};
           

回調函數被調用後,我們首先擷取位元組序列,然後通過輸入流,直接讀取資料到對應的類型變量中。位元組序列+流的形式,封裝了解析細節,使用起來非常友善。

4. 正式調用

通過Ice::newCallback方法建立回調函數的指針。并發起async RPC調用。

CallbackPtr cb = new Callback();
// 這個context現在我還沒用過,初次使用直接建立對象吧,不要猶豫
Ice::Context context;

// CallbackPtr是ICE定義的,重點關注newCallback這個方法
Ice::CallbackPtr cbPtr = Ice::newCallback(cb, &Callback::response);

// 正式發起RPC調用,輸入參數,等待回調
Ice::AsyncResultPtr asyncPtr = base->begin_ice_invoke("AskNameAsync"
                               ,Ice::Normal,inParams
                               ,context,cbPtr);
asyncPtr->waitForCompleted();
           

5. 到此用戶端就完成了,服務端怎麼辦呢?

服務端可以采用同步方式處理(即使用戶端采用異步),也可以采用異步(AMD模式),兩端是獨立的,這是很基本的概念。

服務端采用異步方式,開線程搞個workqueue是一般步驟,下面來看看具體的目标服務對象怎麼定義的,由于是invoker方式,就不能搞一個類然後繼承Demo::AskPerson了。要像下面這樣:

class AskPersonInvokerAsync : public Ice::BlobjectAsync
{
public:

    AskPersonInvokerAsync(const WorkQueuePtr& workQueue):_workQueue(workQueue)
    {
    }

    virtual void ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& cb
                                  , const std::vector<Ice::Byte>&inParam
                                  , const Ice::Current& current)
    {
        _workQueue->add(cb,inParam,current);
    }

private:
    WorkQueuePtr _workQueue;
};
           

注意ice_invoke_async第一個參數(即回調函數的指針)的類型;WorkQueue就是官方Demo中的,直接拿來用的哈,流程就走完了。

現在大家喜歡用ProtoBuf,怎麼來整合到Ice中

原理就是不用PB來序列化,隻是用PB的對象,而采用Ice輸入輸出流,直接序列化PB對象到Ice::ByteSeq中。

https://github.com/zeroc-ice/ice-protobuf中有例子,但沒有invoke的,我們把它的序列化頭檔案StreamProtobuf.h直接拿來用。

該檔案中定義的模闆StreamHelper是一個特化模闆,将PB對象序列化的具體實作,Ice中也定義了這樣的一個模闆,如此一來,PB就可以很友善的整合進Ice的輸入輸出流,編譯時會自動比對這個特化模闆,使用時像如下代碼:

// 定義的一個PB類
PersonModule::Person testPerson;
testPerson.set_age(15);
testPerson.set_name("kyle");
...

Ice::ByteSeq inParams;
Ice::OutputStreamPtr out = Ice::createOutputStream(ic);
out->startEncapsulation();

// 注意通過輸出流write對象時,記得填模闆參數
out->write<PersonModule::Person>(testPerson);
out->endEncapsulation();
out->finished(inParams);

           
注意是用Ice::OutputStream來序列化,而不用PB來序列化

然後就可以傳參,調用了。

繼續閱讀