天天看點

P2psim源代碼分析四

P2psim源代碼分析四

Kejieleung

Channel與RPC

這次重點分析Channel與RPC。Channel作為底層task的資料通道,用于傳輸task之間的資料。channel結點定義為(注意在定義task.h裡,還有相關的操作):

struct Channel

{

      unsigned int               bufsize;

      unsigned int               elemsize;

      unsigned char            *buf;

      unsigned int               nbuf;

      unsigned int               off;

      Altarray asend;

      Altarray arecv;

      char                     *name;

};

在Packet、RPCHandle、Network、EventQueue裡都有使用到.

EventQueue的構造函數如下:

EventQueue::EventQueue() : _time(0)

{

//kejie: Channel *_gochan;

  _gochan = chancreate(sizeof(Event*), 0);

  assert(_gochan);

//kejie:add self run to the threadmanage

  thread();

}

之後再通過EventQueue::run()線程函數時調用 recvp(_gochan) 接收底層task發送到channel的資料。詳細的實作可以進一步看看task裡的定義,不過反正這部分不是重點,隻要了解一下就可以。

   一般意義的RPC使用client/server模型。請求程式是client,而服務提供程式則為server。就像一般的本地過程調用一樣,且RPC是一個同步操作,直到遠端過程結果傳回請求程式才可以挂起。在P2PSim裡的RPC機制在是單機裡的模拟。通過底層的channel來傳遞請求。先看看RPCHandle的定義:

class RPCHandle { public:

  RPCHandle(Channel*, Packet*);

  ~RPCHandle();

  Channel *channel() { return _c; }

  Packet *packet() { return _p; }

private:

  Channel* _c;

  Packet* _p;

};

定義很簡單,主要是看用法和相關操作,集中在Node.h裡使用,且多數操作是以模闆形式,還用到了成員函數指針,是以看起來比較費勁呵呵,不過沒關系,慢慢來。關于Node類的詳細分析留在下一章進行,這裡先抽出RPC相關操作與資料結構。

相關操作有:

// Send an RPC from a Node on one Node to a method

// of the same Node sub-class with a different ip

template<class BT, class AT, class RT>

 bool doRPC(IPAddress dst, void (BT::* fn)(AT *, RT *), AT *args, RT *ret, Time timeout = 0)

// Same as doRPC, but this one is asynchronous

template<class BT, class AT, class RT>

unsigned asyncRPC(IPAddress dst,

  void (BT::* fn)(AT *, RT *), AT *args, RT *ret, Time timeout = 0, unsigned token = 0)

// returns one of the RPCHandle's for which a reply has arrived. BLOCKING.( Used in asyncRPC)

unsigned rcvRPC(RPCSet*, bool&);

void _deleteRPC(unsigned);

typedef set<unsigned> RPCSet;

HashMap<unsigned, RPCHandle*> _rpcmap;

// RPC machinery

template<class BT, class AT, class RT>

class Thunk

bool _doRPC(IPAddress, void (*fn)(void *), void *args, Time timeout = 0);

RPCHandle* _doRPC_send(IPAddress, void (*)(void *), void (*)(void*), void *, Time = 0);

bool _doRPC_receive(RPCHandle*);           

// creates a Thunk object with the necessary croft for an RPC

//kejie:  Node::Thunk<BT, AT, RT> * 為傳回值

template<class BT, class AT, class RT>

Node::Thunk<BT, AT, RT> *

  _makeThunk(IPAddress dst, BT *target, void (BT::*fn)(AT*, RT*),AT *args, RT *ret)

具體的實作都在Node.h裡,由于部分模闆函數比較複雜,等到具體協定實作時再回過頭來分析,這裡先放一下,隻分析一下recvRPC析流程。

recvRPC用于傳回一個已有回應到達的RPCHandle,注釋裡說是一個阻塞操作,了解為有立即傳回結果即可:

(1)   根據hset裡辨別的索引,在_rpcmap裡找出要接收資訊的channel資訊,記錄在Alt數組裡

Alt定義為(task.h):

struct Alt

{

       Channel               *c;

       void                            *v;

       unsigned int        op;

       Task                            *task;

       Alt                        *xalt;

};

注意還有一個alt是定義為

#define  alt          chanalt

int          chanalt(Alt *alts);

(2)   傳入alt(a)->[ chanalt(Alt*a) ] 處理執行

(3)   在alt中,首先要确定傳入的alt的長度,設定為taskrunning

(4)   檢查每個a[i]是否可運作,記錄可運作的數目,如果多于一個就随機選擇一個

(5)   确認是否有資料傳回,再删除索引

詳細代碼如下:

//kejie: 接收hset裡的一個資料, 首先在 _rpcmap => Alt數組 a / index2token

// returns one of the RPCHandle's for which a reply has arrived. BLOCKING.(typedef set<unsigned> RPCSet;)

unsigned

Node::rcvRPC(RPCSet *hset, bool &ok)

{

int na = hset->size() + 1; //多出的一個用于結束辨別 a[na].op = CHANEND

Alt *a = (Alt *) malloc(sizeof(Alt) * na); // might be big, take off stack!

Packet *p;

unsigned *index2token = (unsigned*) malloc(sizeof(unsigned) * hset->size());

int i = 0;

//HashMap<unsigned, RPCHandle*> _rpcmap;

for(RPCSet::const_iterator j = hset->begin(); j != hset->end(); j++) {

assert(_rpcmap[*j]);

a[i].c = _rpcmap[*j]->channel();

a[i].v = &p; //Packet

a[i].op = CHANRCV;

index2token[i] = *j; //第i個任務是對應該hset 裡的*j索引号

i++;

}

a[i].op = CHANEND;

//#define alt chanalt =>int chanalt(Alt *alts); -- implement in libtask/channel

//kejie: chanalt裡會運作 channel 的任務, 傳回執行了的任務channel号

if((i = alt(a)) < 0) {

cerr << "interrupted" << endl;

assert(false);

}

assert(i < (int) hset->size());

unsigned token = index2token[i];

assert(token);

//清除已執行任務辨別

hset->erase(token);

_deleteRPC(token);

//p:(packet) 在 alt執行時會将接收到的資料放到p裡

// a[i].v = &p; //Packet

if( !p ) {

// if there's no packet, then this must be a wakeup

// from a non-network source (like a condition variable)

ok = true;

} else {

ok = p->ok();

delete p;

}

free(a);

free(index2token);

//傳回執行的索引

return token;

}

整體的RPC操作可以參考以下UML圖,再對照源代碼:

(由于暫時上傳不了圖檔,以後再補上)

也可以留下E-mail我将word版本傳過去哈~~~最好直接發到我的郵箱因為有時沒上空間也不知道呢呵呵~~

[email protected]

繼續閱讀