P2psim源代碼分析四
Kejieleung
Channel與RPC
這次重點分析Channel與RPC。Channel作為底層task的資料通道,用于傳輸task之間的資料。channel結點定義為(注意在定義task.h裡,還有相關的操作):
struct Channel
{
unsigned int bufsize;
unsigned int elemsize;
unsigned char *buf;
unsigned int nbuf;
unsigned int off;
Altarray asend;
Altarray arecv;
char *name;
};
在Packet、RPCHandle、Network、EventQueue裡都有使用到.
EventQueue的構造函數如下:
EventQueue::EventQueue() : _time(0)
{
//kejie: Channel *_gochan;
_gochan = chancreate(sizeof(Event*), 0);
assert(_gochan);
//kejie:add self run to the threadmanage
thread();
}
之後再通過EventQueue::run()線程函數時調用 recvp(_gochan) 接收底層task發送到channel的資料。詳細的實作可以進一步看看task裡的定義,不過反正這部分不是重點,隻要了解一下就可以。
一般意義的RPC使用client/server模型。請求程式是client,而服務提供程式則為server。就像一般的本地過程調用一樣,且RPC是一個同步操作,直到遠端過程結果傳回請求程式才可以挂起。在P2PSim裡的RPC機制在是單機裡的模拟。通過底層的channel來傳遞請求。先看看RPCHandle的定義:
class RPCHandle { public:
RPCHandle(Channel*, Packet*);
~RPCHandle();
Channel *channel() { return _c; }
Packet *packet() { return _p; }
private:
Channel* _c;
Packet* _p;
};
定義很簡單,主要是看用法和相關操作,集中在Node.h裡使用,且多數操作是以模闆形式,還用到了成員函數指針,是以看起來比較費勁呵呵,不過沒關系,慢慢來。關于Node類的詳細分析留在下一章進行,這裡先抽出RPC相關操作與資料結構。
相關操作有:
// Send an RPC from a Node on one Node to a method
// of the same Node sub-class with a different ip
template<class BT, class AT, class RT>
bool doRPC(IPAddress dst, void (BT::* fn)(AT *, RT *), AT *args, RT *ret, Time timeout = 0)
// Same as doRPC, but this one is asynchronous
template<class BT, class AT, class RT>
unsigned asyncRPC(IPAddress dst,
void (BT::* fn)(AT *, RT *), AT *args, RT *ret, Time timeout = 0, unsigned token = 0)
// returns one of the RPCHandle's for which a reply has arrived. BLOCKING.( Used in asyncRPC)
unsigned rcvRPC(RPCSet*, bool&);
void _deleteRPC(unsigned);
typedef set<unsigned> RPCSet;
HashMap<unsigned, RPCHandle*> _rpcmap;
// RPC machinery
template<class BT, class AT, class RT>
class Thunk
bool _doRPC(IPAddress, void (*fn)(void *), void *args, Time timeout = 0);
RPCHandle* _doRPC_send(IPAddress, void (*)(void *), void (*)(void*), void *, Time = 0);
bool _doRPC_receive(RPCHandle*);
// creates a Thunk object with the necessary croft for an RPC
//kejie: Node::Thunk<BT, AT, RT> * 為傳回值
template<class BT, class AT, class RT>
Node::Thunk<BT, AT, RT> *
_makeThunk(IPAddress dst, BT *target, void (BT::*fn)(AT*, RT*),AT *args, RT *ret)
具體的實作都在Node.h裡,由于部分模闆函數比較複雜,等到具體協定實作時再回過頭來分析,這裡先放一下,隻分析一下recvRPC析流程。
recvRPC用于傳回一個已有回應到達的RPCHandle,注釋裡說是一個阻塞操作,了解為有立即傳回結果即可:
(1) 根據hset裡辨別的索引,在_rpcmap裡找出要接收資訊的channel資訊,記錄在Alt數組裡
Alt定義為(task.h):
struct Alt
{
Channel *c;
void *v;
unsigned int op;
Task *task;
Alt *xalt;
};
注意還有一個alt是定義為
#define alt chanalt
int chanalt(Alt *alts);
(2) 傳入alt(a)->[ chanalt(Alt*a) ] 處理執行
(3) 在alt中,首先要确定傳入的alt的長度,設定為taskrunning
(4) 檢查每個a[i]是否可運作,記錄可運作的數目,如果多于一個就随機選擇一個
(5) 确認是否有資料傳回,再删除索引
詳細代碼如下:
//kejie: 接收hset裡的一個資料, 首先在 _rpcmap => Alt數組 a / index2token
// returns one of the RPCHandle's for which a reply has arrived. BLOCKING.(typedef set<unsigned> RPCSet;)
unsigned
Node::rcvRPC(RPCSet *hset, bool &ok)
{
int na = hset->size() + 1; //多出的一個用于結束辨別 a[na].op = CHANEND
Alt *a = (Alt *) malloc(sizeof(Alt) * na); // might be big, take off stack!
Packet *p;
unsigned *index2token = (unsigned*) malloc(sizeof(unsigned) * hset->size());
int i = 0;
//HashMap<unsigned, RPCHandle*> _rpcmap;
for(RPCSet::const_iterator j = hset->begin(); j != hset->end(); j++) {
assert(_rpcmap[*j]);
a[i].c = _rpcmap[*j]->channel();
a[i].v = &p; //Packet
a[i].op = CHANRCV;
index2token[i] = *j; //第i個任務是對應該hset 裡的*j索引号
i++;
}
a[i].op = CHANEND;
//#define alt chanalt =>int chanalt(Alt *alts); -- implement in libtask/channel
//kejie: chanalt裡會運作 channel 的任務, 傳回執行了的任務channel号
if((i = alt(a)) < 0) {
cerr << "interrupted" << endl;
assert(false);
}
assert(i < (int) hset->size());
unsigned token = index2token[i];
assert(token);
//清除已執行任務辨別
hset->erase(token);
_deleteRPC(token);
//p:(packet) 在 alt執行時會将接收到的資料放到p裡
// a[i].v = &p; //Packet
if( !p ) {
// if there's no packet, then this must be a wakeup
// from a non-network source (like a condition variable)
ok = true;
} else {
ok = p->ok();
delete p;
}
free(a);
free(index2token);
//傳回執行的索引
return token;
}
整體的RPC操作可以參考以下UML圖,再對照源代碼:
(由于暫時上傳不了圖檔,以後再補上)
也可以留下E-mail我将word版本傳過去哈~~~最好直接發到我的郵箱因為有時沒上空間也不知道呢呵呵~~
[email protected]