天天看点

RPC框架的异步处理

RPC异步调用(以tars rpc框架为示例说明)

引入工作线程池和io收发线程池将工作线程和io收发线程两者的同步关系解除。RPC中的上下文十分重要,因为请求包的发送,响应包的callback回调不在同一个工作线程中完成,需要一个context来记录一个请求的上下文,把请求-响应-回调等一些信息匹配起来。通过rpc框架的内部请求id作为key,来保存调用开始时间time,超时时间timeout,回调函数callback,超时回调timeout_callback等信息。注意:请求id由client端服务调用时生成,会序列化成字节流发送给server端,server端会返回该请求id。

RPC框架的异步处理

servantProxy和Objproxyd的关系

ObjectProxy:一个网络线程上的某个服务实体A;ServantProxy:RPC服务句柄,所有网络线程上的某ObjectProxy(服务实体)的总代理; ObjectProxy类是一个服务实体,注意与ServantProxy类是一个服务代理相区别,前者表示一个网络线程上的某个服务实体A,后者表示对所有网络线程上的某服务实体A的总代理。

ServantProxy内含多个服务实体ObjectProxy,能够帮助用户在同一个服务代理内进行负载均衡。ObjectProxy对象的个数,其个数由客户端的网络线程数决定,每个网络线程有一个ObjectProxy。

举例有一个Demo.StringServer.StringServantObj的服务,提供一个RPC接口是append,传入两个string类型的变量,返回两个string类型变量的拼接结果。而且假设有两台服务器,socket标识分别是192.112.112.112:112与192.112.112.113:113,设置客户端的网络线程数为3,那么执行如下代码:

Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy("[email protected] -h 192.112.112.113 -p 113:tcp -h 192.112.112.112 -p 112", _proxy);
           
RPC框架的异步处理
//communicatorepoll客户端
class CommunicatorEpoll : public TC_Thread ,public TC_ThreadRecMutex{
	 /* * ObjectProxy的工厂类 */
    ObjectProxyFactory *   _objectProxyFactory; //用于创建objproxy
    /* * 异步线程数组*/
    AsyncProcThread *      _asyncThread[MAX_CLIENT_ASYNCTHREAD_NUM];

    /* * 异步线程数目  */
    size_t                 _asyncThreadNum;
	/* * 分发给异步线程的索引seq*/
    size_t                 _asyncSeq;
    /** 网络线程的id号*/
    size_t                 _netThreadSeq;
};
ObjectProxy * CommunicatorEpoll::getObjectProxy(const string & sObjectProxyName,const string& setName)
{
    return _objectProxyFactory->getObjectProxy(sObjectProxyName,setName);
}
---
class ServantProxy : public TC_HandleBase, public TC_ThreadMutex
{
	/** * 通信器 */
    Communicator *            _communicator;
    /** * 保存ObjectProxy对象的指针数组 */
    ObjectProxy **            _objectProxy;
}


//servant初始化
ServantPrx::element_type* ServantProxyFactory::getServantProxy(const string& name,const string& setName)
{
    TC_LockT<TC_ThreadRecMutex> lock(*this);
    string tmpObjName = name + ":" + setName;
    map<string, ServantPrx>::iterator it = _servantProxy.find(tmpObjName);
    if(it != _servantProxy.end())
    {
        return it->second.get();
    }
    ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()];
    assert(ppObjectProxy != NULL);
    for(size_t i = 0; i < _comm->getClientThreadNum(); ++i)
    {
        ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
    }
    ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());
	...
    _servantProxy[tmpObjName] = sp;
    return sp.get();
}
           
RPC框架的异步处理

1、在每一个网络线程CommunicatorEpoll的初始化过程中,会创建_asyncThreadNum条异步线程,等待异步调用的时候处理响应数据。

CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
{
 ......
   //异步线程数
    _asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3"));

    if(_asyncThreadNum == 0)
    {
        _asyncThreadNum = 3;
    }

    if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
    {
        _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
    }
 ......
    //异步队列的大小
    size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000"));
    if(iAsyncQueueCap < 10000)
    {
        iAsyncQueueCap = 10000;
    }
 ......
    //创建异步线程
    for(size_t i = 0; i < _asyncThreadNum; ++i)
    {
        _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
        _asyncThread[i]->start();
    }
 ......
}
           

2、tars2cpp的文件中定义了回调函数基类,要继承回调函数基类实现自己的回调函数。

class ConfigAdminPrxCallback: public tars::ServantProxyCallback
{
    public:
        virtual ~ConfigAdminPrxCallback(){}
        virtual void callback_AddConfig(tars::Int32 ret,  const std::string& result)
        { throw std::runtime_error("callback_AddConfig() override incorrect."); }
        virtual void callback_AddConfig_exception(tars::Int32 ret)
        { throw std::runtime_error("callback_AddConfig_exception() override incorrect."); }
 }
           

3、客户端桩函数代理调用异步请求函数,并传入实现的回调函数的类指针。

class ConfigAdminProxy : public tars::ServantProxy
    {
    public:
        //同步调用
        tars::Int32 AddConfig(const tars::AddConfigInfo & config,std::string &result,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
        {}
		//异步调用
        void async_AddConfig(ConfigAdminPrxCallbackPtr callback,const tars::AddConfigInfo &config,const map<string, string>& context = TARS_CONTEXT())
        {}
};
           

4、客户端调用。 ServantProxy::invoke中进行负载均衡调用,并且实例化此次请求msg包,实例化该请求的objproxy。CommunicatorEpoll会中的

CommunicatorEpoll::handle

会调用msg->obj->invoke进行请求的发送。每个客户端线程中含有跟客户端网络线程通信的队列

ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]; //队列数组

。请求包在invoke中push到相应的队列中,comunicatorEpoll解包出msg请求体进行数据传输。

_reqNo:每个主调线程会被分配一个唯一的空闲序列号,这个序列号对应到网络线程的epoll事件数据通知fd。SeqManager类负责分配该序列号。

void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
{
   ...
    ObjectProxy * pObjProxy = NULL;
    ReqInfoQueue * pReqQ    = NULL;
    //选择网络线程
    selectNetThreadInfo(pSptd,pObjProxy,pReqQ);
    //调用发起时间
    msg->iBeginTime   = TNOWMS;
    msg->pObjectProxy = pObjProxy;//实例化msg请求包中的obj实例
  ...
    //通知网络线程
    bool bEmpty = false;
    bool bSync  = (msg->eType == ReqMessage::SYNC_CALL);
    if(!pReqQ->push_back(msg,bEmpty))
    {
    ...
        pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

        throw TarsClientQueueException("client queue full");
    }
    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
 ...
}
           

5、客户端负载均衡: 每个客户端调用线程跟客户端的网络线程池通信的队列 ,在业务客户端根据负载均衡选择网络线程池中的网络线程进行传输数据。每条caller线程与每条客户端网络线程CommunicatorEpoll进行信息交互的桥梁——通信队列ReqInfoQueue数组,数组中的每个ReqInfoQueue元素负责与一条网络线程进行交互。数组里面的ReqInfoQueue元素便是该数组对应的caller线程与两条网络线程的通信桥梁,一条网络线程对应着数组里面的一个元素,通过网络线程ID进行数组索引。整个关系有点像生产者消费者模型,生产者Caller线程向自己的线程私有数据ReqInfoQueue[]中的第N个元素ReqInfoQueue[N] push请求包,消费者客户端第N个网络线程就会从这个队列中pop请求包。

RPC框架的异步处理

(1)第一层负载均衡:轮询选择ObjectProxy(CommunicatorEpoll)和与之相对应的ReqInfoQueue

ServantProxy::invoke中进行负载均衡调用,并且实例化此次请求msg包,实例化该请求的objproxy。CommunicatorEpoll中的

CommunicatorEpoll::handle

会调用obj->invoke进行请求的发送。

//第一层:选取一个网络线程对应的信息
void ServantProxy::selectNetThreadInfo(ServantProxyThreadData * pSptd, ObjectProxy * & pObjProxy, ReqInfoQueue * & pReqQ)
{
    //指针为空 就new一个
    if(!pSptd->_queueInit)
    {
        for(size_t i=0;i<_objectProxyNum;++i)
        {
            pSptd->_reqQueue[i] = new ReqInfoQueue(_queueSize);
        }
        pSptd->_objectProxyNum = _objectProxyNum;
        pSptd->_objectProxy    = _objectProxy;
        pSptd->_queueInit      = true;
    }
    if(_objectProxyNum == 1)
    {
        pObjProxy = *_objectProxy;
        pReqQ     = pSptd->_reqQueue[0];
    }
    else
    {
        if(pSptd->_netThreadSeq >= 0)
        {
            //网络线程发起的请求
            assert(pSptd->_netThreadSeq < static_cast<int>(_objectProxyNum));
            pObjProxy = *(_objectProxy + pSptd->_netThreadSeq);
            pReqQ     = pSptd->_reqQueue[pSptd->_netThreadSeq];
        }
        else
        {
            //用线程的私有数据来保存选到的seq
            pObjProxy = *(_objectProxy + pSptd->_netSeq);//选取obj
            pReqQ     = pSptd->_reqQueue[pSptd->_netSeq];
            pSptd->_netSeq++;

            if(pSptd->_netSeq == _objectProxyNum)
                pSptd->_netSeq = 0;
        }
    }
}
           

(2)第二层负载均衡: 通过EndpointManager选择AdapterProxy,负载均衡算法(Hash、权重、轮询)

objproxy中的invoke调用selectAdapterProxy。

bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
{
    pAdapterProxy = NULL;
    //刷新主控
    refreshReg(E_DEFAULT,"");

    //无效的数据 返回true
    if(!_valid)
    {
        return true;
    }

    //如果有hash,则先使用hash策略
    if (msg->bHash)
    {
        pAdapterProxy = getHashProxy(msg->iHashCode, msg->bConHash);

        return false;
    }
    
    if(_weightType == E_STATIC_WEIGHT)
    {
        //权重模式
        bool bStaticWeighted = false;
        if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
            bStaticWeighted = true;

        pAdapterProxy = getWeightedProxy(bStaticWeighted);
    }
    else
    {
        //普通轮询模式
        pAdapterProxy = getNextValidProxy();
    }

    return false;
}
           

6、同步调用的话,创建完条件变量,通知网络线程进行请求包的发送。

//通知网络线程
    bool bEmpty = false;
    bool bSync  = (msg->eType == ReqMessage::SYNC_CALL);

    if(!pReqQ->push_back(msg,bEmpty))
    {
        TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);

        delete msg;
        msg = NULL;

        pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

        throw TarsClientQueueException("client queue full");
    }

    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
           

7、数据在客户端epoll中监听发送。 在CommunicatorEpoll::notify()中,caller线程往请求事件通知数组NotifyInfo _notify[]中添加请求事件,通知CommunicatorEpoll进行请求包的发送。注意了,这个函数的作用仅仅是通知网络线程准备发送数据,通过TC_Epoller::mod()或者TC_Epoller::add()触发一个EPOLLIN事件,从而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的网络线程CommunicatorEpoll被唤醒,并设置唤醒后的epoll_event中的联合体epoll_data变量为&_notify[iSeq].stFDInfo。

_iSeq为每个主调线程会被分配一个唯一的序列号,这个序列号对应到网络线程的epoll事件数据列表。SeqManager类负责分配该序列号。

void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{
    assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);

    if(_notify[iSeq].bValid)
    {
        _ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
        assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
    }
    else
    {
        _notify[iSeq].stFDInfo.iType   = FDInfo::ET_C_NOTIFY;
        _notify[iSeq].stFDInfo.p       = (void*)msgQueue;
        _notify[iSeq].stFDInfo.fd      = _notify[iSeq].eventFd;
        _notify[iSeq].stFDInfo.iSeq    = iSeq;
        _notify[iSeq].notify.createSocket();
        _notify[iSeq].bValid           = true;
		//主调线程占用第iSeq个通知事件fd, 用于向communicatorEpoll注册发送事件。
        _ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
    }
}
           

8、adapterProxy发送数据

Client发起请求时,如果发送缓冲区没有数据,就直接从连接发送出去,如果发送缓冲区有数据,则将发送请求先放入超时队列,网络线程从超时队列拉取请求进行发送。每个AdapterProxy有一个 超时队列 _timeoutQueue,存储了原始的msg结构体信息(含同步异步调用等信息)。**

//交给连接发送数据,连接连上,buffer不为空,直接发送数据成功
    if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size()) != Transceiver::eRetError)
    {
        //请求发送成功了,单向调用直接返回
        if(msg->eType == ReqMessage::ONE_WAY)
        {
       ...
            return 0;
        }

        bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
      ...
    }
    else
    {
 //请求发送失败了,放入超时队列等待重发
        bool bFlag = _timeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout+msg->iBeginTime, false);
       ...
        }
    }
           

adapterproxy调用transceiver向sever发送数据,并在communicatorEpoll中注册客户端socket,监听可读可写事件。

void Transceiver::connect()
{
    if(isValid())  {  return; }
    if(_connStatus == eConnecting || _connStatus == eConnected) {return;}
    int fd = -1;
    if (_ep.type() == EndpointInfo::UDP)
    {
        fd = NetworkUtil::createSocket(true, false, _ep.isIPv6());
        NetworkUtil::setBlock(fd, false);
        _connStatus = eConnected;
    }
    else
    {
        fd = NetworkUtil::createSocket(false, false, _ep.isIPv6());
        NetworkUtil::setBlock(fd, false);

        socklen_t len = _ep.isIPv6() ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
        bool bConnected = NetworkUtil::doConnect(fd, _ep.addrPtr(), len);
        if(bConnected)
        {
            setConnected();
        }
        else
        {
            _connStatus     = Transceiver::eConnecting;
            _conTimeoutTime = TNOWMS + _adapterProxy->getConTimeout();
        }
    }
    _fd = fd;
...
    //设置套接口选项
    vector<SocketOpt> &socketOpts = _adapterProxy->getObjProxy()->getSocketOpt();
    for(size_t i=0; i<socketOpts.size(); ++i)
    {
        if(setsockopt(_fd,socketOpts[i].level,socketOpts[i].optname,socketOpts[i].optval,socketOpts[i].optlen) == -1)
        {
            ...
    }

    _adapterProxy->getObjProxy()->getCommunicatorEpoll()->addFd(fd, &_fdInfo, EPOLLIN|EPOLLOUT);
}
           

9、发送和响应的数据包中含有请求ID字段,iRequestId是一个自增的id,用来关联请求和响应包,另外ReqMessage结构体中,有调用的类型字段用于区分异步同步等。

struct ReqMessage : public TC_HandleBase
{
    //调用类型
    enum CallType
    {
        SYNC_CALL = 1, //同步
        ASYNC_CALL,    //异步
        ONE_WAY,       //单向
        THREAD_EXIT    //线程退出的标识
    };
}



//请求包体
    struct RequestPacket
    {
        1  require short        iVersion;		//版本号
        2  require byte         cPacketType;	//包类型
        3  require int          iMessageType;//消息类型
        4  require int          iRequestId;	//请求ID
        5  require string       sServantName;	//servant名字
        6  require string       sFuncName;	//函数名称
        7  require vector<byte> sBuffer;		//二进制buffer
        8  require int          iTimeout;		//超时时间(毫秒)
        9  require map<string, string> context;	//业务上下文
        10 require map<string, string> status; 	//框架协议上下文
    };

    struct ResponsePacket
    {
        1 require short         iVersion;		//版本号
        2 require byte          cPacketType;	//包类型
        3 require int           iRequestId;		//请求ID
        4 require int           iMessageType;	//消息类型
        5 require int           iRet;			//返回值
        6 require vector<byte>  sBuffer;		//二进制流
        7 require map<string, string> status; 	//协议上下文
        8 optional string        sResultDesc;   //描述
        9 optional map<string, string> context;   //业务上下文
    };
    
           

10、客户端接收服务端发来的信息包,并根据响应包的请求ID(iRequestId),从超时队列中取出原发送的msg结构体,查验调用类型等。

void AdapterProxy::finishInvoke(ResponsePacket & rsp)
{
    ReqMessage * msg = NULL;

    //requestid 为0 是push消息
    if(rsp.iRequestId == 0)
    {
        msg               = new ReqMessage();
        msg->eStatus      = ReqMessage::REQ_RSP;
        msg->eType        = ReqMessage::ASYNC_CALL;
        msg->bFromRpc     = true;
        msg->bPush        = true;
        msg->proxy        = _objectProxy->getServantProxy();
        msg->pObjectProxy = _objectProxy;
        msg->adapter      = this;
        msg->callback     = _objectProxy->getPushCallback();
    }
    else
    {
        //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了
        bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);

        //找不到此请求id信息
        if (!retErase)
        {
            if(_timeoutLogFlag)
            {
                TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:"<< _objectProxy->name() << ",get req-ptr NULL,may be timeout,id:" << rsp.iRequestId 
                    << ",desc:" << _endpoint.desc() << endl);
            }
            return ;
        }

        assert(msg->eStatus == ReqMessage::REQ_REQ);

        msg->eStatus = ReqMessage::REQ_RSP;
    }

    msg->response = rsp;//完善msg的响应包

    finishInvoke(msg);
}
           

11、在函数

AdapterProxy::finishInvoke(ReqMessage)

中,程序在

CommunicatorEpoll::pushAsyncThreadQueue()

中,通过轮询的方式选择异步回调处理线程处理接收到的响应包,每个异步处理线程有一个任务处理队列,通过以下代码将信息包msg(带响应信息)放到异步回调处理线程中的队列中。异步处理线程数默认是3,最大是1024。

//异步回调,放入回调处理线程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);

void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
{
    //轮询的方式选择异步处理线程。
    _asyncThread[_asyncSeq]->push_back(msg);
    _asyncSeq ++;

    if(_asyncSeq == _asyncThreadNum)
    {
        _asyncSeq = 0;
    }
}
           

12、选取之后,通过

AsyncProcThread::push_back()

,将msg包放在响应包队列

AsyncProcThread::_msgQueue

中,然后通过

AsyncProcThread:: notify()

函数通知本异步回调处理线程进行处理,

AsyncProcThread:: notify()

函数可以令阻塞在

AsyncProcThread:: run()中的AsyncProcThread::timedWait()

的异步处理线程被唤醒。在

AsyncProcThread::run()

中,主要执行下面的程序进行函数回调:

if (_msgQueue->pop_front(msg))
{
 ......

    try
    {
        ReqMessagePtr msgPtr = msg;
        msg->callback->onDispatch(msgPtr);
    }
    catch (exception& e)
    {
        TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);
    }
    catch (...)
    {
        TLOGERROR("[TARS][AsyncProcThread exception.]" << endl);
    }
}
           

13、通过msg->callback,程序可以调用回调函数基类

ServantPrxCallback里面的onDispatch(

)函数。在

ServantPrxCallback:: onDispatch()

中,分析此次响应所对应的RPC方法名,获取响应结果,并通过动态多态,执行用户所定义好的派生类的虚函数。通过ReqMessagePtr的引用计数,还可以将

ReqNessage msg

删除掉,与同步调用不同,同步调用的msg的新建与删除都在caller线程中,而异步调用的msg在caller线程上构造,在异步回调处理线程中析构。

virtual int onDispatch(tars::ReqMessagePtr msg)
        {
            static ::std::string __ConfigAdmin_all[]=
            {
                "AddConfig",
                 ...
            };
            pair<string*, string*> r = equal_range(__ConfigAdmin_all, __ConfigAdmin_all+14, string(msg->request.sFuncName));
            if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
            switch(r.first - __ConfigAdmin_all)
            {
                case 0:
                {
                    if (msg->response.iRet != tars::TARSSERVERSUCCESS)
                    {
                        callback_AddConfig_exception(msg->response.iRet);

                        return msg->response.iRet;
                    }
                    tars::TarsInputStream<tars::BufferReader> _is;

                    _is.setBuffer(msg->response.sBuffer);
                    tars::Int32 _ret;
                    _is.read(_ret, 0, true);

                    std::string result;
                    _is.read(result, 2, true);
                    CallbackThreadData * pCbtd = CallbackThreadData::getData();
                    assert(pCbtd != NULL);

                    pCbtd->setResponseContext(msg->response.context);

                    callback_AddConfig(_ret, result);

                    pCbtd->delResponseContext();

                    return tars::TARSSERVERSUCCESS;

        }
  }
           

二、异步并发模式 promise和future

Future & Promise 模式,属于一种实现异步调用的并发模式。 普通的异步调用是基于回调的异步,当服务规模和业务逻辑比较简单的时候,基于回调的异步调用能简单就搞定了,但是随着业务服务的规模扩大,业务逻辑慢慢变得复杂,一些需要进行多次异步串行调用和异步并行调用的业务需求慢慢显现出来,虽然基于回调的异步调用能够满足这些需求,但是使得异步调用的逻辑流程逻辑分散在不同地方,这点造成了对业务开发提出了很大的挑战,编码代码十分不便,代码逻辑难于理解和维护。代码采用同步模式编写异步的业务逻辑。

Future表示一个调用结果,而这个结果可能不会立即给出,代表了未来某个时刻会得到结果,而这个结果是由Promise来保证的,可以通过get()得到这个结果,其成员函数then,表示注册一个对get()得到的结果进行处理的函数。Promise表示给调用其成员函数getFuture()的Future的一个承诺,通过setValue设置承诺的结果。

异步串行化:

//服务对外接口,串行调用
taf::Int32 
AServantImp::queryResultSerial(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)
{
    //设置异步回包
    current->setResponse(false);

    // 向服务B发送异步请求,返回值的类型是
    // promise::Future<std::string>,
    // 意思就是服务B未来会返回一个string类型的数据
    promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);

    // f调用其成员函数then,给未来要到达的string类型的
    // 返回结果设置一个处理函数
    // 在handleBRspAndSendCReq中获取返回结果,
    // 并return sendCReq(),即f2,然后f2通过链式法则调用then
    f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
    .then(promise::bind(&handleCRspAndReturnClient, current));

    return 0;
}
promise::Future<std::string> sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)

{

//定义一个promise::Promise<std::string>类型的变量promise,

//其目的是承诺会在promise里面存放一个string类型的数据,

//然后把这个变量传到BServantCallback对象中,

//然后发起异步调用

//最后返回promise.getFuture(),意思是promise承诺的string类型数据

//可以通过promise::Future<std::string>类型的promise.getFuture()来获得

       promise::Promise<std::string> promise;

 

//其实这个的current可以不用传递给BServantCallback,当然传也没有影响

    Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);

 

       prx->async_queryResult(cb, sIn);

 

    return promise.getFuture();

}
           

异步并行化:

taf::Int32 AServantImp::queryResultParallel(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)

{
       current->setResponse(false);
       promise::Future<std::string> f1 = sendBReq(_pPrxB, sIn, current);
       promise::Future<std::string> f2 = sendCReq(_pPrxC, sIn, current); 

       promise::Future<promise::Tuple<promise::Future<std::string>, promise::Future<std::string> > > f_all = promise::whenAll(f1, f2);

       f_all.then(promise::bind(&handleBCRspAndReturnClient, current));
       return 0;

}