天天看點

最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用

轉載位址:http://blog.csdn.net/w616589292/article/details/45694987

在開發Linux網絡程式時,通常需要維護多個定時器,如維護用戶端心跳時間、檢查多個資料包的逾時重傳等。如果采用Linux的SIGALARM信号實作,則會帶來較大的系統開銷,且不便于管理。

本文在應用層實作了一個基于時間堆的高性能定時器,同時考慮到定時的粒度問題,由于通過alarm系統調用設定的SIGALARM信号隻能以秒為機關觸發,是以需要采用其它手段實作更細粒度的定時操作,當然,這裡不考慮使用多線程+sleep的實作方法,理由性能太低。

通常的做法還有采用基于升序的時間連結清單,但升序時間連結清單的插入操作效率較低,需要周遊連結清單。是以本實作方案使用最小堆來維護多個定時器,插入O(logn)、删除O(1)、查找O(1)的效率較高。

首先是每個定時器的定義:

[cpp]  view plain copy

最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
  1. class heap_timer  
  2. {  
  3. public:  
  4.     heap_timer( int ms_delay )  
  5.     {  
  6.         gettimeofday( &expire, NULL );  
  7.         expire.tv_usec += ms_delay * 1000;  
  8.         if ( expire.tv_usec > 1000000 )  
  9.         {  
  10.             expire.tv_sec += expire.tv_usec / 1000000;  
  11.             expire.tv_usec %= 1000000;  
  12.         }  
  13.     }  
  14. public:  
  15.     struct timeval expire;  
  16.     void (*cb_func)( client_data* );  
  17.     client_data* user_data;  
  18.     ~heap_timer()  
  19.     {  
  20.         delete user_data;  
  21.     }  
  22. };  

包括一個逾時時間expire、逾時回調函數cb_func以及一個user_data變量,user_data用于存儲與定時器相關的使用者資料,使用者資料可以根據不同的應用場合進行修改,這裡實作的是一個智能博物館的網關,網關接收來自zigbee協調器的使用者資料,并為每個使用者維護一段等待時間T,在T到來之前,同一個使用者的所有資料都存放到user_data的target_list中,當T到來時,根據target_list清單選擇一個适當的target并發送到ip_address,同時删除定時器(有點扯遠了=。=)。總之,要實作的功能就是給每個使用者維護一個定時器,定時值到來時做一些操作。

[cpp]  view plain copy

最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
  1. class client_data  
  2. {  
  3. public:  
  4.     client_data(char *address):target_count(0)  
  5.     {  
  6.         strcpy(ip_address,address);  
  7.     }  
  8. private:  
  9.     char ip_address[32];  
  10.     target target_list[64];  
  11.     int target_count;  
  12.     ......  
  13. };  

以下是時間堆的類定義,包括了一些基本的堆操作:插入、删除、擴容,還包括了定時器溢出時的操作函數tick()

[cpp]  view plain copy

最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
  1. class time_heap  
  2. {  
  3. public:  
  4.     time_heap( int cap  = 1) throw ( std::exception )  
  5.         : capacity( cap ), cur_size( 0 )  
  6.     {  
  7.         array = new heap_timer* [capacity];  
  8.         if ( ! array )  
  9.         {  
  10.             throw std::exception();  
  11.         }  
  12.         for( int i = 0; i < capacity; ++i )  
  13.         {  
  14.             array[i] = NULL;  
  15.         }  
  16.     }  
  17.     ~time_heap()  
  18.     {  
  19.         for ( int i =  0; i < cur_size; ++i )  
  20.         {  
  21.             delete array[i];  
  22.         }  
  23.         delete [] array;  
  24.     }  
  25. public:  
  26.     int get_cursize()  
  27.     {  
  28.         return cur_size;  
  29.     }  
  30.     void add_timer( heap_timer* timer ) throw ( std::exception )  
  31.     {  
  32.         if( !timer )  
  33.         {  
  34.             return;  
  35.         }  
  36.         if( cur_size >= capacity )  
  37.         {  
  38.             resize();  
  39.         }  
  40.         int hole = cur_size++;  
  41.         int parent = 0;  
  42.         for( ; hole > 0; hole=parent )  
  43.         {  
  44.             parent = (hole-1)/2;  
  45.             if ( timercmp( &(array[parent]->expire), &(timer->expire), <= ) )  
  46.             {  
  47.                 break;  
  48.             }  
  49.             array[hole] = array[parent];  
  50.         }  
  51.         array[hole] = timer;  
  52.     }  
  53.     void del_timer( heap_timer* timer )  
  54.     {  
  55.         if( !timer )  
  56.         {  
  57.             return;  
  58.         }  
  59.         // lazy delelte  
  60.         timer->cb_func = NULL;  
  61.     }  
  62.     int top(struct timeval &time_top) const  
  63.     {  
  64.         if ( empty() )  
  65.         {  
  66.             return 0;  
  67.         }  
  68.         time_top = array[0]->expire;  
  69.         return 1;  
  70.     }  
  71.     void pop_timer()  
  72.     {  
  73.         if( empty() )  
  74.         {  
  75.             return;  
  76.         }  
  77.         if( array[0] )  
  78.         {  
  79.             delete array[0];  
  80.             array[0] = array[--cur_size];  
  81.             percolate_down( 0 );  
  82.         }  
  83.     }  
  84.     void tick()  
  85.     {  
  86.         heap_timer* tmp = array[0];  
  87.         struct timeval cur;  
  88.         gettimeofday( &cur, NULL );  
  89.         while( !empty() )  
  90.         {  
  91.             if( !tmp )  
  92.             {  
  93.                 break;  
  94.             }  
  95.             if( timercmp( &cur, &(tmp->expire), < ) )  
  96.             {  
  97.                 break;  
  98.             }  
  99.             if( array[0]->cb_func )  
  100.             {  
  101.                 array[0]->cb_func( array[0]->user_data );  
  102.             }  
  103.             pop_timer();  
  104.             tmp = array[0];  
  105.         }  
  106.     }  
  107.     bool empty() const  
  108.     {  
  109.         return cur_size == 0;  
  110.     }  
  111.     heap_timer** get_heap_array()  
  112.     {  
  113.         return array;  
  114.     }  
  115. private:  
  116.     void percolate_down( int hole )  
  117.     {  
  118.         heap_timer* temp = array[hole];  
  119.         int child = 0;  
  120.         for ( ; ((hole*2+1) <= (cur_size-1)); hole=child )  
  121.         {  
  122.             child = hole*2+1;  
  123.             if ( (child < (cur_size-1)) && timercmp( &(array[child+1]->expire), &(array[child]->expire), < ) )  
  124.             {  
  125.                 ++child;  
  126.             }  
  127.             if ( timercmp( &(array[child]->expire), &(temp->expire), < ) )  
  128.             {  
  129.                 array[hole] = array[child];  
  130.             }  
  131.             else  
  132.             {  
  133.                 break;  
  134.             }  
  135.         }  
  136.         array[hole] = temp;  
  137.     }  
  138.     void resize() throw ( std::exception )  
  139.     {  
  140.         heap_timer** temp = new heap_timer* [2*capacity];  
  141.         for( int i = 0; i < 2*capacity; ++i )  
  142.         {  
  143.             temp[i] = NULL;  
  144.         }  
  145.         if ( ! temp )  
  146.         {  
  147.             throw std::exception();  
  148.         }  
  149.         capacity = 2*capacity;  
  150.         for ( int i = 0; i < cur_size; ++i )  
  151.         {  
  152.             temp[i] = array[i];  
  153.         }  
  154.         delete [] array;  
  155.         array = temp;  
  156.     }  
  157. private:  
  158.     heap_timer** array;  
  159.     int capacity;  
  160.     int cur_size;  
  161. };  

如何用epoll實作多個定時器的操作是本設計的關鍵,我們知道,epoll_wait的最後一個參數是阻塞等待的時候,機關是毫秒。可以這樣設計:

1、當時間堆中沒有定時器時,epoll_wait的逾時時間T設為-1,表示一直阻塞等待新使用者的到來;

2、當時間堆中有定時器時,epoll_wait的逾時時間T設為最小堆堆頂的逾時值,這樣可以保證讓最近觸發的定時器能得以執行;

3、在epoll_wait阻塞等待期間,若有其它的使用者到來,則epoll_wait傳回n>0,進行正常的處理,随後應重新設定epoll_wait為小頂堆堆頂的逾時時間。

為此,本實作對epoll_wait進行了封裝,名為tepoll_wait,調用接口與epoll_wait差不多,但傳回值有所不同:tepoll_wait不傳回n=0的情況(即逾時),因為逾時事件在tepoll_wait中進行處理,隻有等到n>0(即在等待過程中有使用者資料到來)或者n<0(出現錯誤)才進行傳回。

廢話不多說,看代碼最清楚:

[cpp]  view plain copy

最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
  1. void timer_handler()  
  2. {  
  3.     heap.tick();  
  4.     //setalarm();  
  5. }  
  6. int tepoll_wait( int epollfd, epoll_event *events, int max_event_number )  
  7. {  
  8.     struct timeval now;  
  9.     struct timeval tv;  
  10.     struct timeval *tvp;  
  11.     //tevent_t *tp;  
  12.     int n;  
  13.     for ( ;; )  
  14.     {  
  15.         if ( gettimeofday( &now, NULL ) < 0 )  
  16.             perror("gettimeofday");  
  17.         struct timeval time_top;  
  18.         if ( heap.top(time_top) )  
  19.         {  
  20.             tv.tv_sec = time_top.tv_sec - now.tv_sec;;  
  21.             tv.tv_usec = time_top.tv_usec - now.tv_usec;  
  22.             if ( tv.tv_usec < 0 )  
  23.             {  
  24.                 tv.tv_usec += 1000000;  
  25.                 tv.tv_sec--;  
  26.             }  
  27.             tvp = &tv;  
  28.         }  
  29.         else  
  30.             tvp = NULL;  
  31.         if(tvp == NULL)  
  32.             n = epoll_wait( epollfd, events, max_event_number, -1 );  
  33.         else  
  34.             n = epoll_wait( epollfd, events, max_event_number, tvp->tv_sec*1000 + tvp->tv_usec/1000 );  
  35.         if ( n < 0 )  
  36.             return -1;  
  37.         if ( n > 0 )  
  38.             return n;  
  39.         timer_handler();  
  40.     }  
  41. }  

代碼一目了然,在tepoll_wait中,是個死循環,隻有等到上述兩種情況發生時,才進行傳回,此時在調用方進行處理,處理過程跟epoll_wait一樣。

[cpp]  view plain copy

最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
最小堆定時器的實作以及與網絡程式設計中的多路IO複用的應用
  1. while( !stop_server )  
  2.     {  
  3.         number = tepoll_wait( epollfd, events, MAX_EVENT_NUMBER);  
  4.         for ( i= 0; i < number; i++ )  
  5.         {  
  6.             int fd = events[i].data.fd;  
  7.             if ( (events[i].events & EPOLLIN)&& (fd == uart_fd) )  
  8.             {  
  9.                //讀取使用者資料  
  10.                 if( (timer_id = find_exist_timer(ip_address)) != -1)  
  11.                 {  
  12.                     //add to the exist timer  
  13.                     heap_timer ** heap_array = heap.get_heap_array();  
  14.                     heap_array[timer_id]->user_data->add_target(RSSI,target_id);  
  15.                     continue;  
  16.                 }  
  17. <span style="white-space:pre">      </span>//new timer  
  18.                 heap_timer *timer = new heap_timer(200);  
  19.                 timer->cb_func = cb_func;  
  20.                 timer->user_data = new client_data(ip_address);  
  21.                 timer->user_data->add_target(RSSI,target_id);  
  22.                 heap.add_timer(timer);  
  23.             }  
  24.             else if( ( fd == pipefd[0] ) && ( events[i].events & EPOLLIN ) )  
  25.             {  
  26.                 //此處進行了統一信号源處理,通過雙向管道來擷取SIGTERM以及SIGINT的信号,在主循環中進行統一處理  
  27. <span style="white-space:pre">      </span>char signals[1024];  
  28.                 ret = recv( pipefd[0], signals, sizeof( signals ), 0 );  
  29.                 if( ret == -1 )  
  30.                 {  
  31.                     continue;  
  32.                 }  
  33.                 else if( ret == 0 )  
  34.                 {  
  35.                     continue;  
  36.                 }  
  37.                 else  
  38.                 {  
  39.                     for( int i = 0; i < ret; ++i )  
  40.                     {  
  41.                         switch( signals[i] )  
  42.                         {  
  43.                         case SIGTERM:  
  44.                         case SIGINT:  
  45.                         {  
  46.                             stop_server = true;  
  47.                         }  
  48.                         }  
  49.                     }  
  50.                 }  
  51.             }  
  52.         }  
  53.     }  
1

繼續閱讀