P2psim源代碼分析三
Kejieleung
繼續上一篇事件發生器EventGenerator的分析,這部分重點分析事件隊列EventQueue的設計。事件隊列,作為Observer模式中的ConcreteSubject,首先複習一下Observer模式吧。
可參考:
http://www.cppblog.com/zliner/archive/2006/09/10/12217.html
有一篇簡要介紹
觀察者模式把目标對觀察者的依賴進行抽象:使目标隻知道自己有若幹觀察者,但不知道這些觀察者具體是誰,可能有多少個;當目标狀态改變時隻要給這些觀察者一個通知,不必作更多的事情。這樣目标對觀察者的依賴就達到了抽象和最小,而目标對具體觀察者的依賴被解除了。
類圖如下:

Subject對象儲存一個Observer引用的清單,當我們讓一個ConcreteObserver對象觀察Subject對象時,調用後者的Attach()方法,将前者的引用加入該清單中。當Subject對象狀态改變時,它調用自身的Notify方法,該方法調用清單中每一個Observer的Update()方法。一個ConcreteObserver隻要重定義Update()就能收到通知,作為對通知的響應,Update()調用Subject對象的getStatus()擷取資料,然後更新自身。當不需要繼續觀察時,ConcreteObserver對象調用Subject對象的Detach()方法,其引用被從清單中移除。
OK看完上面的部分後,即可看一下EventQueue的觀察者模型的實作。可看下面類圖
Observerd對應的就是Subject了,EventQueue對應ConcreteSubject,按照觀察者模式,首先目标對象儲存觀察者對像,這部分的是由具體的實作ChurnEventGenerator的構造函數裡調用:
- //kejie: 注冊螢幕
- EventQueue::Instance()->registerObserver(this);
- EventQueue的完整class聲明:
- class EventQueue : public Threaded, public Observed {
- friend class EventQueueObserver;
- public:
- static EventQueue* Instance(); //singletion
- ~EventQueue();
- void add_event(Event*);
- Time time() { return _time; }
- static Time fasttime() { return _instance?_instance->time():0; }
- void go();
- private:
- EventQueue();
- struct eq_entry {
- eq_entry() { ts = 0; events.clear(); }
- eq_entry(Event *e) { ts = e->ts; events.clear(); }
- Time ts;
- vector<Event*> events;
- sklist_entry<eq_entry> _sortlink;
- };
- skiplist<eq_entry, Time, &eq_entry::ts, &eq_entry::_sortlink> _queue;
- static EventQueue *_instance;
- Time _time;
- Channel *_gochan;
- virtual void run(); //kejie:線程函數
- bool advance();
- // for debuging
- void dump();
- };
EventQueue這部分實作又使用了singleton單件模式,在第一次使用時執行個體化:
EventQueue*
EventQueue::Instance()
{
if(_instance)
return _instance;
return (_instance = New EventQueue());
}
EventQueue的構造函數再建立一個底層channel和啟動線程,關于channel模型的分析會放在後,這裡先了解為底層線程隊列的實作即可。注意這裡的事件都是能過一個skiplist的資料結構來儲存,具體實作在./p2psim/skiplist.h,因為是使用了template是以聲明實作都放在這個檔案裡了。
EventQueue::EventQueue() : _time(0)
{
//kejie: Channel *_gochan;
_gochan = chancreate(sizeof(Event*), 0);
assert(_gochan);
//kejie:add self run to the threadmanage
thread();
}
這裡最主要的就是run函數了,本身也作為一個總線程的線程函數,當EventQueue有多于一個事件到達時,就會調用advance()處理,這是一個線程觸發函數,将事件分發到另一個線程中處理。
- //kejie:線程函數
- void
- EventQueue::run()
- {
- // Wait for threadmain() to call go().
- recvp(_gochan);
- while(true) {
- // let others run
- while(anyready())
- yield();
- // time is going to move forward.
- // everyone else is quiet.
- // must be time for the next event.
- // run events for next time in the queue
- if(!advance())
- break;
- }
- }
- // moves time forward to the next event
- bool
- EventQueue::advance()
- {
- ...
- for(vector<Event*>::const_iterator i = eqe->events.begin(); i != eqe->events.end(); ++i) {
- assert((*i)->ts == eqe->ts &
- (*i)->ts >= _time &
- (*i)->ts < _time + 100000000);
- // notify observers, who will not add eventsinto the eventqueue using EventQueueObserver::add_event
- notifyObservers((ObserverInfo*) *i);
- Event::Execute(*i); // new thread, execute(), delete Event
- ...
- }
在有事件到達時,通知EventQueueObserver更新狀态,調用Observerd的kick函數
void
Observed::notifyObservers(ObserverInfo *oi)
{
if(!_hasObservers)
return;
for(set<Observer*>::const_iterator i = _observers.begin(); i != _observers.end(); ++i)
(*i)->kick(this, oi);
}
kick函數是 Observer的純虛函數(Update),通知所有Observer更新狀态,為調用具本的實作函:ChurnEventGenerator::kick(Observed *o, ObserverInfo *oi)作相應的處理。
在ChurnEventGenerator處理三種基本的事類型:join,crash,lookup
最後分析一下ChurnEventGenerator::run()
事件發生器,整個模拟都是通這是裡發出的事件來推動 join/depatrture/die,繼承自 Threaded 的虛函數,以線程方式運作(包裝好的底層task)。通過預先随機計算所有結點的join,lookup 事件,将其加入到EventQueue中,crash事件是在lookup失效(指定失效機率)後再加到EventQueue的。在計算完成後,才調用EventQueue::Instance()->go();啟動
- void
- ChurnEventGenerator::run()
- {
- ...
- vector<IPAddress> *_ips = Network::Instance()->getallfirstips();
- IPAddress ip = 0;
- for(u_int xxx = 0; xxx < _ips->size(); xxx++){
- ip = (*_ips)[xxx];
- Args *a = New Args();
- (*a)["wellknown"] = _wkn_string; //每個結點都要設定引導結點
- (*a)["first"] = _wkn_string; //a hack //first lookup node
- u_int jointime;
- ...
- if( now() + jointime < _exittime ) {
- P2PEvent *e = New P2PEvent(now() + jointime, ip, "join", a); //kejie:生成單個P2P事件
- add_event(e); //加入事件隊列
- } else {
- delete a;
- a = NULL;
- }//end if
- ...
- if( _lookupmean > 0 && now() + jointime + tolookup < _exittime ) {
- P2PEvent *e = New P2PEvent(now() + jointime + tolookup, ip, "lookup", a); //kejie:查找事件
- add_event(e);
- } else {
- delete a;
- }//end if
- }//end for
EventQueue::Instance()->go(); //kejie:确定好所有結點的加入/查找事件時間,就可以啟動事件運作.