天天看點

P2psim源代碼分析三

P2psim源代碼分析三

Kejieleung

       繼續上一篇事件發生器EventGenerator的分析,這部分重點分析事件隊列EventQueue的設計。事件隊列,作為Observer模式中的ConcreteSubject,首先複習一下Observer模式吧。

可參考:

http://www.cppblog.com/zliner/archive/2006/09/10/12217.html

有一篇簡要介紹

觀察者模式把目标對觀察者的依賴進行抽象:使目标隻知道自己有若幹觀察者,但不知道這些觀察者具體是誰,可能有多少個;當目标狀态改變時隻要給這些觀察者一個通知,不必作更多的事情。這樣目标對觀察者的依賴就達到了抽象和最小,而目标對具體觀察者的依賴被解除了。

類圖如下:

P2psim源代碼分析三

Subject對象儲存一個Observer引用的清單,當我們讓一個ConcreteObserver對象觀察Subject對象時,調用後者的Attach()方法,将前者的引用加入該清單中。當Subject對象狀态改變時,它調用自身的Notify方法,該方法調用清單中每一個Observer的Update()方法。一個ConcreteObserver隻要重定義Update()就能收到通知,作為對通知的響應,Update()調用Subject對象的getStatus()擷取資料,然後更新自身。當不需要繼續觀察時,ConcreteObserver對象調用Subject對象的Detach()方法,其引用被從清單中移除。

OK看完上面的部分後,即可看一下EventQueue的觀察者模型的實作。可看下面類圖

P2psim源代碼分析三

Observerd對應的就是Subject了,EventQueue對應ConcreteSubject,按照觀察者模式,首先目标對象儲存觀察者對像,這部分的是由具體的實作ChurnEventGenerator的構造函數裡調用:

  1. //kejie: 注冊螢幕
  2.   EventQueue::Instance()->registerObserver(this);
  3. EventQueue的完整class聲明:
  4. class EventQueue : public Threaded, public Observed {
  5.   friend class EventQueueObserver;
  6. public:
  7.   static EventQueue* Instance();    //singletion
  8.   ~EventQueue();
  9.   void add_event(Event*);
  10.   Time time() { return _time; }
  11.   static Time fasttime() { return _instance?_instance->time():0; }
  12.   void go();
  13. private:
  14.   EventQueue();
  15.   struct eq_entry {
  16.     eq_entry() { ts = 0; events.clear(); }
  17.     eq_entry(Event *e) { ts = e->ts; events.clear(); }
  18.     Time ts;
  19.     vector<Event*> events;
  20.     sklist_entry<eq_entry> _sortlink;
  21.   };
  22.   skiplist<eq_entry, Time, &eq_entry::ts, &eq_entry::_sortlink> _queue;
  23.   static EventQueue *_instance;
  24.   Time _time;
  25.   Channel *_gochan;
  26.   virtual void run();   //kejie:線程函數
  27.   bool advance();
  28.   // for debuging
  29.   void dump();
  30. };

EventQueue這部分實作又使用了singleton單件模式,在第一次使用時執行個體化:

EventQueue*

EventQueue::Instance()

{

  if(_instance)

    return _instance;

  return (_instance = New EventQueue());

}

EventQueue的構造函數再建立一個底層channel和啟動線程,關于channel模型的分析會放在後,這裡先了解為底層線程隊列的實作即可。注意這裡的事件都是能過一個skiplist的資料結構來儲存,具體實作在./p2psim/skiplist.h,因為是使用了template是以聲明實作都放在這個檔案裡了。

EventQueue::EventQueue() : _time(0)

{

//kejie: Channel *_gochan;

  _gochan = chancreate(sizeof(Event*), 0);

  assert(_gochan);

//kejie:add self run to the threadmanage

  thread();

}

       這裡最主要的就是run函數了,本身也作為一個總線程的線程函數,當EventQueue有多于一個事件到達時,就會調用advance()處理,這是一個線程觸發函數,将事件分發到另一個線程中處理。

  1. //kejie:線程函數
  2. void
  3. EventQueue::run()
  4. {
  5.   // Wait for threadmain() to call go().
  6.   recvp(_gochan);
  7.   while(true) {
  8.     // let others run
  9.     while(anyready())
  10.       yield();
  11.     // time is going to move forward.
  12.     // everyone else is quiet.
  13.     // must be time for the next event.
  14.     // run events for next time in the queue
  15.     if(!advance())
  16.       break;
  17.   }
  18. }
  19. // moves time forward to the next event
  20. bool
  21. EventQueue::advance()
  22. {
  23. ...
  24.   for(vector<Event*>::const_iterator i = eqe->events.begin(); i != eqe->events.end(); ++i) {
  25.     assert((*i)->ts == eqe->ts &
  26.            (*i)->ts >= _time &
  27.            (*i)->ts < _time + 100000000);
  28.     // notify observers, who will not add eventsinto the eventqueue using EventQueueObserver::add_event
  29.    notifyObservers((ObserverInfo*) *i);
  30.     Event::Execute(*i); // new thread, execute(), delete Event
  31. ...
  32. }

在有事件到達時,通知EventQueueObserver更新狀态,調用Observerd的kick函數

void

Observed::notifyObservers(ObserverInfo *oi)

{

  if(!_hasObservers)

    return;

    for(set<Observer*>::const_iterator i = _observers.begin(); i != _observers.end(); ++i)

    (*i)->kick(this, oi);

}

kick函數是 Observer的純虛函數(Update),通知所有Observer更新狀态,為調用具本的實作函:ChurnEventGenerator::kick(Observed *o, ObserverInfo *oi)作相應的處理。

在ChurnEventGenerator處理三種基本的事類型:join,crash,lookup

最後分析一下ChurnEventGenerator::run()

事件發生器,整個模拟都是通這是裡發出的事件來推動 join/depatrture/die,繼承自 Threaded 的虛函數,以線程方式運作(包裝好的底層task)。通過預先随機計算所有結點的join,lookup 事件,将其加入到EventQueue中,crash事件是在lookup失效(指定失效機率)後再加到EventQueue的。在計算完成後,才調用EventQueue::Instance()->go();啟動

  1. void
  2. ChurnEventGenerator::run()
  3. {
  4. ...
  5.   vector<IPAddress> *_ips = Network::Instance()->getallfirstips();
  6.   IPAddress ip = 0;
  7.   for(u_int xxx = 0; xxx < _ips->size(); xxx++){
  8.     ip = (*_ips)[xxx];
  9.     Args *a = New Args();
  10.     (*a)["wellknown"] = _wkn_string;        //每個結點都要設定引導結點
  11.     (*a)["first"] = _wkn_string; //a hack   //first lookup node
  12.     u_int jointime;
  13.   ...
  14.     if( now() + jointime < _exittime ) {
  15.         P2PEvent *e = New P2PEvent(now() + jointime, ip, "join", a);    //kejie:生成單個P2P事件
  16.         add_event(e);       //加入事件隊列
  17.     } else {
  18.       delete a;
  19.       a = NULL;
  20.     }//end if
  21. ...
  22.     if( _lookupmean > 0 && now() + jointime + tolookup < _exittime ) {
  23.       P2PEvent *e = New P2PEvent(now() + jointime + tolookup, ip, "lookup", a); //kejie:查找事件
  24.       add_event(e);
  25.     } else {
  26.       delete a;
  27.     }//end if
  28.   }//end for

EventQueue::Instance()->go();        //kejie:确定好所有結點的加入/查找事件時間,就可以啟動事件運作.

P2psim源代碼分析三

繼續閱讀