天天看點

C++高性能服務架構revolver:同時支援100萬個的定時事件的定時器

在高性能的伺服器程式當中,定時器是必不可少的部件,而且定時器的效率是直接影響到服務的性能。在衆多的開源項目中,定時器設計都有各有各的方法,例如ACE和libEvent都采用了最小堆的算法實作,還有其他的開源項目采用平衡二叉樹來做定時的器管理算法。不管是最小堆還是平衡二叉樹,其定時器掃描都是O(1),但定時器插入和删除都是O(logN)的複雜度。在定時事件少的情況下,這種算法是足夠的,如果超過上百萬的定時事件,效率會成為瓶頸。是以revolver在定時器的實作上并沒有使用通用的平衡二叉樹和最小堆,而是采用了輪轉HASH算法來做定時器管理。

什麼是輪轉HASH算法?輪轉HASH是通過4個時間輪的轉動來觸發定時事件,就像時鐘的秒針輪、分針輪、時針輪之間的關系一樣。如圖:

C++高性能服務架構revolver:同時支援100萬個的定時事件的定時器

每個輪的有256個刻度,4個輪剛好是一個uint32_t整型數。最小輪的一刻度表示一個reactor的event loop時間(5ms)

1.定時器的掃描

1)擷取目前的系統時間cur_ts,判斷與上次掃描時刻prev_ts之間差距超過最小刻度,計算需要轉動最小刻度的數量

scale = (cur_ts - prev_ts) / loop_delay;

2) 進行第一輪的輪轉,轉動scale個刻度,所有轉動的刻度中的定時事件全部進行觸發。如果轉動後沒有超過最大刻度256,輪轉結束。

3)如果超過最大刻度256,那麼從它的父輪(第二輪)上輪轉1刻度。将父輪上輪轉的刻度所有的定時事件全部按時間映射到子輪上。相當于重複2步。這是個疊代過程,如果父輪到了256,那麼繼續輪轉父輪的父輪。直到到達第四輪轉動為止。這和秒鐘走一圈就是分針走一個刻度的原理是一樣的。

代碼如下:

template<class HANDLER, class FUNCTOR, class LOCK>
uint32_t CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::expire()
{
  BASE_GUARD_RETURN(LOCK, cf_mon, mutex_, 0);
  uint32_t ret = SELECT_DELAY;  //預設20MS

  CBaseTimeValue cur_timer = CBaseTimeValue::get_time_value();
  
  if(cur_timer > prev_time_)
  {
    uint32_t scale = static_cast<uint32_t>((cur_timer.msec() - prev_time_.msec()) / SELECT_DELAY);
    if(scale > 0)
    {
      ret = revolver(scale);
      prev_time_ = cur_timer;
    }
  }

  return ret;
}

template<class HANDLER, class FUNCTOR, class LOCK>
uint32_t CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::revolver(uint32_t scale)
{
  //std::cout << "pos, first = " << rings_[0].get_pos() << ", second = " << rings_[1].get_pos()
  //  << ", third = " << rings_[2].get_pos() << ", fourth = " << rings_[3].get_pos() <<std::endl;

  uint32_t ret = SELECT_DELAY;

  uint8_t index = 0;
  uint32_t rewind_scale = scale;
  while(rewind_scale > 0)
  {
    index = 0;
    if(rings_[index].cycle(rewind_scale, this)) //掃描第一輪
    {
      index ++;
      uint32_t sc = 1;
      while(rings_[index].cycle(sc, this))//掃描下一輪,刻度隻往前推進1格
      {
        sc = 1;
        index ++;
        if(index >= RINGS_SIZE)
        {
          start_time_ = CBaseTimeValue::get_time_value();
          break;
        }
      }
    }
  }

  return ret;
}      

2.定時事件的插入

1)首先計算需觸發的時刻與定時器輪目前時刻的距離

d = (cur_ts - start_ts + delay) / loop_delay;

2)分别計算在4個輪的位置

first = (uint8_t)(timeout_stamp_ / FIRST_ROUND);

second = (uint8_t)((timeout_stamp_ % FIRST_ROUND) / SECOND_ROUND);

third =  (uint8_t)((timeout_stamp_ % SECOND_ROUND) / THIRD_ROUND);

fourth = (uint8_t) (timeout_stamp_ % THIRD_ROUND);

3)通過計算得到的為止,儲存到對應的輪刻度上,插入就完畢了。例子,如果計算的first = 0, second = 2, third = 30, fouth = 1,就會儲存到第3輪的第2刻度上。

代碼如下:

template<class HANDLER, class FUNCTOR, class LOCK>
uint32_t CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::schedule(HANDLER handler, const void *act, uint32_t delay, uint32_t interval)
{
  BASE_GUARD_RETURN(LOCK, cf_mon, mutex_, 0);

  BaseTimerNode_T<HANDLER>* timer_obj = node_pool_.pop_obj();
  if(timer_obj != NULL)
  {
    uint32_t timer_id = get_free_node();
    CBaseTimeValue cur_timer = CBaseTimeValue::get_time_value();
    //計算距離
    uint64_t distance = delay / SELECT_DELAY; //直接以目前時間作為坐标,相差一個掃描間隔20MS
    if(cur_timer > start_time_)
      distance = (cur_timer.msec() - start_time_.msec() + delay) / SELECT_DELAY;
    distance = distance % (UNINT32_MAX);
    timer_obj->set(handler, act, (uint32_t)(core_max(distance, 1)), interval, timer_id);
    heap_[timer_id] = timer_obj;

    used_num_ ++;
    //插入事件
    insert_node(timer_obj);
    upcall_functor().registration(timer_obj->get_handler(), timer_id);
    return timer_id;
  }
  return 0;
}

template<class HANDLER, class FUNCTOR, class LOCK>
void CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::insert_node(BaseTimerNode_T<HANDLER>* node)
{
  uint32_t timer_id = node->get_timer_id();

  uint8_t poss[RINGS_SIZE] = {0};
  //擷取位置
  node->get_revolver_pos(poss[RINGS_SIZE - 1], poss[RINGS_SIZE - 2], poss[RINGS_SIZE - 3], poss[RINGS_SIZE - 4]);
  uint8_t index = RINGS_SIZE - 1;
  //進行插入
  while(!rings_[index].add_element(poss[index], timer_id))
  {
    if(index == 0)
      break ;

    index --;
  }
}      

3.定時事件的删除

删除和插入計算差不多步驟

1)首先計算需要删除定時事件觸發的時刻與定時器輪目前時刻的距離

d = (cur_ts - start_ts + delay) / loop_delay;

2)分别計算在4個輪的位置

first = (uint8_t)(timeout_stamp_ / FIRST_ROUND);

second = (uint8_t)((timeout_stamp_ % FIRST_ROUND) / SECOND_ROUND);

third =  (uint8_t)((timeout_stamp_ % SECOND_ROUND) / THIRD_ROUND);

fourth = (uint8_t) (timeout_stamp_ % THIRD_ROUND);

3)根據位置坐标找到對應的輪位置删除輪上的定時事件

代碼:

template<class HANDLER, class FUNCTOR, class LOCK>
void CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::cancel_timer(uint32_t timer_id, const void **act)
{
  BASE_GUARD(LOCK, cf_mon, mutex_);
  if(timer_id < heap_size_ && heap_[timer_id] != NULL)
  {
    //查找對應的定時事件内容
    BaseTimerNode_T<HANDLER>* timer_obj = heap_[timer_id];
    //删除輪上的定時事件
    delete_node(timer_obj);

    heap_[timer_id] = NULL;
    if(used_num_ > 0)
      used_num_ --;

    freeTimers_.push_back(timer_id);
    *act = timer_obj->get_act();
    upcall_functor().cancel_timer(timer_obj->get_handler(), timer_id);

    node_pool_.push_obj(timer_obj);
  }
}

template<class HANDLER, class FUNCTOR, class LOCK>
void CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::delete_node(BaseTimerNode_T<HANDLER>* node)
{
  uint32_t timer_id = node->get_timer_id();
  uint8_t poss[RINGS_SIZE] = {0};
  node->get_revolver_pos(poss[RINGS_SIZE - 1], poss[RINGS_SIZE - 2], poss[RINGS_SIZE - 3], poss[RINGS_SIZE - 4]);
  //删除掉對應的定時事件
  for(uint8_t index = 0; index < RINGS_SIZE; index ++) //在每個輪上進行删除
  {
    rings_[index].delete_element(poss[index], timer_id);
  }
}      

4.測試

在revovler的test工程中的main()函數中将test_timer_queue注釋去掉,就可以進行測試。以下是test_timer_queue的代碼:

void test_timer_queue()
{
  srand(time(NULL));

  CTimerFunctor functor;
  TIMEQUEUE  timer_queue(&functor);
  CTest_Event_Handler handler;

  handler.tq_ = &timer_queue;

  CBaseTimeValue  begin_timer = CBaseTimeValue::get_time_value();
  for(int i = 0; i < 1000000; i ++)
  {
    insert_timer(&handler, (rand() % 240) * 1000, timer_queue);
  }
  CBaseTimeValue stop_timer = CBaseTimeValue::get_time_value();
  stop_timer = stop_timer - begin_timer;
  std::cout << "insert 1000000 timer, delay = " << stop_timer.msec() << " MS" << std::endl;

  g_ts = stop_timer.get_time_value().msec();
#if _DEBUG
  //timer_queue.set_ring_id();
#endif
  std::cout << "exprie ......" << std::endl;
  while(1)
  {
    uint32_t ms = timer_queue.expire();
    usleep((1000));
  }
}      

這個函數可以測試插入100萬個定時事件的耗時多少,在100個定時事件在定時器管理的時候,CPU和記憶體都可以進行相對應的監控和檢視。我在window 7下面的release版本的資訊如下:

C++高性能服務架構revolver:同時支援100萬個的定時事件的定時器

從上圖看,插入100萬個定時事件耗時978MS,也就是說,當有幾十萬個定時事件在運作的時候,插入一個定時事件隻需要0.97微秒。

以下是100萬個定時事件在處理過程中的CPU和記憶體占用圖。

C++高性能服務架構revolver:同時支援100萬個的定時事件的定時器

5.總結

定時器的實作從效率和功能上實作都達到了最初設想的效率,但是記憶體使用上稍微過高(120M),如果用C的代碼實作,對記憶體做嚴格的控制和記憶體管理,應該會好很多,以後優化的工作應該重點在這。如果需要完整的代碼,請到:

​​https://github.com/yuanrongxi/revolver​​ 下載下傳。

繼續閱讀