天天看點

libev源碼解讀

1、源碼:

源碼參見官網或者我的github上

2、安裝使用:

安裝與基本使用參見我的另一篇部落格:傳送門

3、架構

Libev通過一個 struct ev_loop結結構表示一個事件驅動的架構。在這個架構裡面通過ev_xxx結構,ev_init、ev_xxx_set、ev_xxx_start接口向這個事件驅動的架構裡面注冊事件監控器,當相應的事件監控器的事件出現時,便會觸發該事件監控器的處理邏輯,去處理該事件。處理完之後,這些監控器進入到下一輪的監控中。符合一個标準的事件驅動狀态的模型。

4、源碼

對于libev這樣元件類的網絡庫閱讀源碼,我是按照以下的順序:

1.官方文檔2.看api寫幾個示例程式3.浏覽代碼,搞清楚結構組織4.根據API中的接口去源碼中看看是怎麼實作的,主幹是什麼5.把源碼中demo沒有的功能自己測試使用下

libev庫是用c代碼編寫的,共有八千行左右,下面分析每個檔案

ev.c:是libev的主要邏輯,大概有五千行,

ev_select.c:包括ev_select.c在内的這幾個都是對系統提供的IO複用機制的支援

libev源碼解讀

ev.h:是對一些接口api和變量、常量的定義

ev_wrap.h:各種宏定義,對各種變量的引用的掩蓋、封裝

ev.vars.h:結構的成員變量定義所有的這些代碼大緻都是分為3個部分,1、一連串的宏定義,根據環境定義哪些狀态變量2、針對不同的硬體平台及編譯器的跨平台實作3、程式的主要邏輯

如圖

libev源碼解讀

是以我們可以直接看代碼結構部分即可,具體分析如下:

首先我們來看ev.h,其中的對于每一項事件都有一個結構體/類的定義,先看一下基類watcher是怎麼定義的(都在ev.h中):

/* base class, nothing to see here unless you subclass */
typedef struct ev_watcher
{
  EV_WATCHER (ev_watcher)
} ev_watcher;
           

其中宏定義EV_WATCHER (ev_watcher)定義如下:

/* **shared by all watchers** */
#define EV_WATCHER(type)            \
  int active; /* private */         \
  int pending; /* private */            \
  EV_DECL_PRIORITY /* private */        \
  EV_COMMON /* rw */                \
  EV_CB_DECLARE (type) /* private */
           

實際上可以看做如下:

typedef struct ev_watcher
{
    int active; 
    int pending;
    int priority;
    void *data;    
    void (*cb)(struct ev_loop *loop, struct ev_watcher *w, int revents);
} ev_watcher;
           

宏定義太多了反而不利于我們閱讀,借助Windows下的IDE或者Linux下vim emacs或者vim+cscope或者vim+ctags來閱讀下載下傳的源碼,可以很友善的找到變量的定義位置。

除了watcher的基類,還有一個監控器list類:

typedef struct ev_watcher_list
{
  EV_WATCHER_LIST (ev_watcher_list)
} ev_watcher_list;
           

其也是基于上面EV_WATCHER(type),展開也就是如下:

typedef struct ev_watcher_list
{
    int active; 
    int pending;
    int priority;
    void *data;    
    void (*cb)(struct ev_loop *loop, struct ev_watcher_list *w, int revents);
    struct ev_watcher_list *next;
} ev_watcher_list;
           

同理,再看一個io watcher:

typedef struct ev_io
{
  EV_WATCHER_LIST (ev_io)

  int fd;     /* ro */
  int events; /* ro */
} ev_io;
           

展開就是:

typedef struct ev_io
{
    int active; 
    int pending;
    int priority;
    void *data;    
    void (*cb)(struct ev_loop *loop, struct ev_io *w, int revents);
    struct ev_watcher_list *next;

    int fd;
    int events; 
} ev_io;
           

這裡的fd,events就是派生類的私有成員,分别表示監聽的檔案描述符fd和觸發的事件,可以看到,所有派生類的私有變量放在公共變量的後面,這樣,在使用c語言的指針強制轉換的時候,若把一個派生類ev_io類型對象的指針指派給一個基類ev_watcher類型的指針p後,就可以通過p->active,p->pending,p->priority等來通路派生類中的active成員了。(不是多态,我想多了)

同樣,定時器的watcher直接寫出如下:

typedef struct ev_watcher_time
{
    int active; 
    int pending;
    int priority;
    void *data;    
    void (*cb)(struct ev_loop *loop, struct ev_watcher_time *w, int revents);

    ev_tstamp at;
} ev_watcher_time;
           

這個at就是派生類中新的自有成員 ,表示的是at時間後,觸發這個定時器,at類型為double

信号signal的watcher如下:

typedef struct ev_signal
{
    int active; 
    int pending;
    int priority;
    void *data;    
    void (*cb)(struct ev_loop *loop, struct ev_signal *w, int revents);
    struct ev_watcher_list *next;

    int signum; 
} ev_signal;
           

這個signum就是派生類中新的自有成員 ,當給定的signum信号接收到時,觸發調用

其他的事件watcher跟這三大事件類似,也就是:

**ev_periodic:**invoked at some specific time, possibly repeating at regular intervals (based on UTC)

**ev_child:**invoked when sigchld is received and waitpid indicates the given pid

**ev_stat:**invoked each time the stat data changes for a given path

**ev_fork:**the callback gets invoked before check in the child process when a fork was detected

**ev_idle:**invoked when the nothing else needs to be done, keeps the process from blocking

ev_prepare:

invoked for each run of the mainloop, just before the blocking call ,you can still change events in any way you like

**ev_check:**invoked for each run of the mainloop, just after the blocking call

**ev_cleanup:**is invoked just before the loop gets destroyed

ev_embed:

used to embed an event loop inside another ,the callback gets invoked when the event loop has handled events, and can be 0

**ev_async:**invoked when somebody calls ev_async_send on the watcher

我數了一下,包括3個常用類型,一共是13個事件類型,代碼中還定義了一個可以容納所有watcher對象的類型ev_any_watcher,定義如下:

union ev_any_watcher
{
  struct ev_watcher w;
  struct ev_watcher_list wl;

  struct ev_io io;
  struct ev_timer timer;
  struct ev_periodic periodic;
  struct ev_signal signal;
  struct ev_child child;
#if EV_STAT_ENABLE
  struct ev_stat stat;
#endif
#if EV_IDLE_ENABLE
  struct ev_idle idle;
#endif
  struct ev_prepare prepare;
  struct ev_check check;
#if EV_FORK_ENABLE
  struct ev_fork fork;
#endif
#if EV_CLEANUP_ENABLE
  struct ev_cleanup cleanup;
#endif
#if EV_EMBED_ENABLE
  struct ev_embed embed;
#endif
#if EV_ASYNC_ENABLE
  struct ev_async async;
#endif
};
           

/割割割割割割割割

下面再看ev.c中主循環ev_loop定義如下:

struct ev_loop
  {
    ev_tstamp ev_rt_now;
    #define ev_rt_now ((loop)->ev_rt_now)
    #define VAR(name,decl) decl;
      #include "ev_vars.h"
    #undef VAR
  };
           

其中ev_tstamp 就是時間機關,實際上就是double類型

把宏定義替換後,看到完整的ev_loop定義如下:

struct ev_loop
{
    ev_tstamp ev_rt_now;
    ev_tstamp now_floor;
    int rfeedmax;
    ... .........;
}
           

類似這種加很多宏定義的

ev.h中還有如下的友善程式設計的define,可以使代碼簡潔不少,但同時也增加了閱讀的難度。

struct ev_loop;
# define EV_P  struct ev_loop *loop               /* a loop as sole parameter in a declaration */
# define EV_P_ EV_P,                              /* a loop as first of multiple parameters */
# define EV_A  loop                               /* a loop as sole argument to a function call */
# define EV_A_ EV_A,                              /* a loop as first of multiple arguments */
# define EV_DEFAULT_UC  ev_default_loop_uc_ ()    /* the default loop, if initialised, as sole arg */
# define EV_DEFAULT_UC_ EV_DEFAULT_UC,            /* the default loop as first of multiple arguments */
# define EV_DEFAULT  ev_default_loop (0)          /* the default loop as sole arg */
# define EV_DEFAULT_ EV_DEFAULT,                  /* the default loop as first of multiple arguments */
           

在struct ev_loop的定義下面可以看到這樣一句:

static struct ev_loop default_loop_struct;
 extern struct ev_loop *ev_default_loop_ptr = ; 
  /* needs to be initialised to make it a definition despite extern */
           

該操作就是建立了一個ev_loop類型的執行個體對象以及ev_loop類型的空指針,default_loop_struct表示的是預制事件驅動器(就是loop)。如果在代碼中使用的是預制事件驅動器,那麼後續的操作就都圍繞着這個資料結構展開了。

注意ev_default_loop_ptr是一個全局變量

//

下面我們用一個隻有io事件的例子,通過gdb設定斷點單步調試,來觀察代碼的過程,測試demo如下:

#include<ev.h>
#include <stdio.h>
#include <signal.h>
#include <sys/unistd.h>

ev_io io_w;
void io_action(struct ev_loop *main_loop,ev_io *io_w,int e)
{
        int rst;
        char buf[] = {'\0'};
        puts("in io cb\n");
        read(STDIN_FILENO,buf,sizeof(buf));
        buf[] = '\0';
        printf("Read in a string %s \n",buf);
        ev_io_stop(main_loop,io_w);

}
int main(int argc ,char *argv[])
{
    struct ev_loop *main_loop = ev_default_loop();

    ev_init(&io_w,io_action);
    ev_io_set(&io_w,STDIN_FILENO,EV_READ);  
    ev_io_start(main_loop,&io_w);

    ev_run(main_loop,);

    return ;
}
           

編譯該程式,注意帶上-g參數,這樣就是把調試資訊加到可執行檔案中,如果沒有-g,你将看不見程式的函數名、變量名,所代替的全是運作時的記憶體位址。編譯指令如下:

gcc -g libev_io.c -l ev -o libio
           

啟動gdb調試:

libev源碼解讀

我們從main開始一步步走。首先執行 struct ev_loop *main_loop = ev_default_loop(0)這一句,通過跟進代碼可以跟到函數 ev_default_loop 裡面去,通過查閱源碼我們可以看到這一塊:

ev_default_loop (unsigned int flags) EV_THROW
{
  if (!ev_default_loop_ptr)
    {
#if EV_MULTIPLICITY
      EV_P = ev_default_loop_ptr = &default_loop_struct;
#else
      ev_default_loop_ptr = ;
#endif

      **loop_init (EV_A_ flags);**

      if (ev_backend (EV_A))
        {
#if EV_CHILD_ENABLE
          ev_signal_init (&childev, childcb, SIGCHLD);
          ev_set_priority (&childev, EV_MAXPRI);
          ev_signal_start (EV_A_ &childev);
          ev_unref (EV_A); /* child watcher should not keep loop alive */
#endif
        }
      else
        ev_default_loop_ptr = ;
    }

  return ev_default_loop_ptr;
}
           

首先判斷上面說的在ev_loop中定義的全局對象指針ev_default_loop_ptr是否為空,也就是不曾使用預制的驅動器時,就讓他指向靜态變量default_loop_struct。

EV_P = ev_default_loop_ptr = &default_loop_struct;這一句表明同時在本函數裡面統一用名字”loop”來表示該預制驅動器的指針。進而與函數參數為 EV_P 以及 EV_A的寫法配合。也即是:

# define EV_P  struct ev_loop *loop               /* a loop as sole parameter in a declaration */
# define EV_P_ EV_P,                              /* a loop as first of multiple parameters */
# define EV_A  loop                               /* a loop as sole argument to a function call */
# define EV_A_ EV_A,                              /* a loop as first of multiple arguments */
           

接着對該指針做 loop_init操作,即初始化預制的事件驅動器。這裡函數的調用了也是用到了 EV_A這樣的寫法進行簡化。

初始化之後如果配置中Libev支援子程序,那麼通過信号監控器實作了子程序監控器。

注意,在Libev的函數定義的時候,會看到 “EV_THROW” 這個東西,這裡可以不用管它,他是對CPP中”try … throw”的支援,和EV_CPP中的extern “C”一樣是一種編碼技巧。

下面看看驅動器loop的初始化過程,也就是這一句:

loop_init (EV_A_ flags);

比較長,很多都是根據環境判斷宏定義,來确定是否執行,代碼如下:

/* initialise a loop structure, must be zero-initialised */
static void noinline ecb_cold
loop_init (EV_P_ unsigned int flags) EV_THROW
{
  if (!backend)
    {
      origflags = flags;

#if EV_USE_REALTIME
      if (!have_realtime)
        {
          struct timespec ts;

          if (!clock_gettime (CLOCK_REALTIME, &ts))
            have_realtime = ;
        }
#endif

#if EV_USE_MONOTONIC
      if (!have_monotonic)
        {
          struct timespec ts;

          if (!clock_gettime (CLOCK_MONOTONIC, &ts))
            have_monotonic = ;
        }
#endif

      /* pid check not overridable via env */
#ifndef _WIN32
      if (flags & EVFLAG_FORKCHECK)
        curpid = getpid ();
#endif

      if (!(flags & EVFLAG_NOENV)
          && !enable_secure ()
          && getenv ("LIBEV_FLAGS"))
        flags = atoi (getenv ("LIBEV_FLAGS"));

      ev_rt_now          = ev_time ();
      mn_now             = get_clock ();
      now_floor          = mn_now;
      rtmn_diff          = ev_rt_now - mn_now;
#if EV_FEATURE_API
      invoke_cb          = ev_invoke_pending;
#endif

      io_blocktime       = ;
      timeout_blocktime  = ;
      backend            = ;
      backend_fd         = -;
      sig_pending        = ;
#if EV_ASYNC_ENABLE
      async_pending      = ;
#endif
      pipe_write_skipped = ;
      pipe_write_wanted  = ;
      evpipe []         = -;
      evpipe []         = -;
#if EV_USE_INOTIFY
      fs_fd              = flags & EVFLAG_NOINOTIFY ? - : -;
#endif
#if EV_USE_SIGNALFD
      sigfd              = flags & EVFLAG_SIGNALFD  ? - : -;
#endif

      if (!(flags & EVBACKEND_MASK))
        flags |= ev_recommended_backends ();

#if EV_USE_IOCP
      if (!backend && (flags & EVBACKEND_IOCP  )) backend = iocp_init   (EV_A_ flags);
#endif
#if EV_USE_PORT
      if (!backend && (flags & EVBACKEND_PORT  )) backend = port_init   (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
      if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
      if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init  (EV_A_ flags);
#endif
#if EV_USE_POLL
      if (!backend && (flags & EVBACKEND_POLL  )) backend = poll_init   (EV_A_ flags);
#endif
#if EV_USE_SELECT
      if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
#endif

      ev_prepare_init (&pending_w, pendingcb);

#if EV_SIGNAL_ENABLE || EV_ASYNC_ENABLE
      ev_init (&pipe_w, pipecb);
      ev_set_priority (&pipe_w, EV_MAXPRI);
#endif
    }
}
           

開始是根據系統變量或者環境來選擇性的執行一些步驟,這些在官方的Manual中有提到,主要就是影響預設支援的IO複用機制。接着是一連串的初始值的指派,接着是根據系統支援的IO複用機制,對其進行初始化操作。

譬如如果選擇select進行io複用,則會執行如下的初始化過程:

if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
           

select_init的過程具體在ev_select.c中,如下:

int inline_size
select_init (EV_P_ int flags)
{
//确定了要用的io複用方式,首先把名字統一了
  backend_mintime = ;
  backend_modify  = select_modify;
  backend_poll    = select_poll;
//讀和寫的fd_set的vector  ri用來裝select函數傳回後符合條件的fd
#if EV_SELECT_USE_FD_SET
  vec_ri  = ev_malloc (sizeof (fd_set)); FD_ZERO ((fd_set *)vec_ri);
  vec_ro  = ev_malloc (sizeof (fd_set));
  vec_wi  = ev_malloc (sizeof (fd_set)); FD_ZERO ((fd_set *)vec_wi);
  vec_wo  = ev_malloc (sizeof (fd_set));
  #ifdef _WIN32
  vec_eo  = ev_malloc (sizeof (fd_set));
  #endif
#else
  vec_max = ;
  vec_ri  = ;
  vec_ro  = ;
  vec_wi  = ;
  vec_wo  = ;
  #ifdef _WIN32
  vec_eo  = ;
  #endif
#endif

  return EVBACKEND_SELECT;
}
           

最後是判斷如果系統需要信号事件,那麼進行一系列的操作。

下面看我們的demo下一句監控器初始化:

ev_init(&io_w,io_action);

經過跟蹤檢視,發現這不是一個函數,僅僅是一個宏定義:

/* these may evaluate ev multiple times, and the other arguments at most once */
/* **either use ev_init + ev_TYPE_set, or the ev_TYPE_init macro, below, to first initialise a watcher** */
#define ev_init(ev,cb_) do {            \
  ((ev_watcher *)(void *)(ev))->active  =   \
  ((ev_watcher *)(void *)(ev))->pending = ;    \
  ev_set_priority ((ev), );            \
  ev_set_cb ((ev), cb_);            \
} while ()
           

再看demo下一句,設定io watcher的觸發條件:

ev_io_set(&io_w,STDIN_FILENO,EV_READ);

跟蹤發現該句也是一個宏定義:

#define ev_io_set(ev,fd_,events_)            do { (ev)->fd = (fd_); (ev)->events = (events_) | EV__IOFDSET; } while ()
           

即把檢測的檔案描述符跟事件類型指派

下面看demo中io觀察器配置的最後一步:

ev_io_start(main_loop,&io_w);

在ev.c檔案中可以看到,該函數定義如下:

ev_io_start:

void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  int fd = w->fd;

  if (expect_false (ev_is_active (w)))
    return;

  assert (("libev: ev_io_start called with negative fd", fd >= ));
  assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));

  EV_FREQUENT_CHECK;

  **ev_start (EV_A_ (W)w, );**
  array_needsize (ANFD, anfds, anfdmax, fd + , array_init_zero);
  wlist_add (&anfds[fd].head, (WL)w);

  /* common bug, apparently */
  assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w));

  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);
  w->events &= ~EV__IOFDSET;

  EV_FREQUENT_CHECK;
}
           

首先是兩個assert,就是用來檢測錯誤的,可能是檢查内部資料結構啊,邊界值是否合理等等。

再下面就是ev_start,其代碼如下:

inline_speed void
ev_start (EV_P_ W w, int active)
{
  pri_adjust (EV_A_ w);
  w->active = active;
  ev_ref (EV_A);
}
           

在這裡面把active置1,并且++了activecnt,activecnt也就是指事件驅動器loop上面存在的監控器數量,因為現在建立了一個io watcher,是以數量要加1,而active就是指watcher是否正在監控。

下面就是:

array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
  wlist_add (&anfds[fd].head, (WL)w);
           

這就是libev裡面對動态數組的實作,判斷anfd數組的空間是否足夠fd+1,不夠的話就調整數組記憶體的大小。其中ADFD結構的定義如下:

/* file descriptor info structure */
typedef struct
{
  WL head;
  unsigned char events; /* the events watched for */
  unsigned char reify;  /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */
  unsigned char emask;  /* the epoll backend stores the actual kernel mask in here */
  unsigned char unused;
#if EV_USE_EPOLL
  unsigned int egen;    /* generation counter to counter epoll bugs */
#endif
#if EV_SELECT_IS_WINSOCKET || EV_USE_IOCP
  SOCKET handle;
#endif
#if EV_USE_IOCP
  OVERLAPPED or, ow;
#endif
} ANFD;
           

這是一個檔案描述符資訊的資料結構,包括WL類型也就是watcher的基類連結清單:ev_watcher_list *類型的變量head,一個ANFD就是一個檔案描述符所關聯的資訊,前面anfds就是ANFD類型的數組,數組的下标是通過fd進行索引的,這樣由于fd不會太大,是以配置設定的空間合理,索引速度也是o(1)級别的。

再看下面:

fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);
  w->events &= ~EV__IOFDSET;
           

對于fd_change:

/* something about the given fd changed */
inline_size void
fd_change (EV_P_ int fd, int flags)
{
  unsigned char reify = anfds [fd].reify;
  anfds [fd].reify |= flags;

  if (expect_true (!reify))
    {
      ++fdchangecnt;
      array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);
      fdchanges [fdchangecnt - ] = fd;
    }
}
           

**fd_change:如果對于給定的fd,在其上的event發生了,那麼就把fd添加到一個fdchanges的數組裡。

fd_reify:依次周遊fdchanges數組中的fd,并且比較fd上的handle與對應anfd上的handle是否相同,進而判斷監控條件是否發生改變,如果改變了則調用backend_modify也就是 select_modify或者epoll_ctl等調整系統對該fd的監控條件,代碼如下:**

SOCKET handle = EV_FD_TO_WIN32_HANDLE (fd);

if (handle != anfd->handle)
 {
   unsigned long arg;
    assert (("libev: only socket fds supported in this configuration", ioctlsocket (handle, FIONREAD, &arg) == ));

   /* handle changed, but fd didn't - we need to do it in two steps */
   backend_modify (EV_A_ fd, anfd->events, );
   anfd->events = ;
   anfd->handle = handle;
 }
           

fdchanges數組的作用就在于此,該數組記錄了anfds數組中fd對應的watcher監控條件可能被修改的檔案描述符,并且在适當的時候低啊用系統的io複用機制修改系統的監控條件。正如官方代碼注釋中說的:

make sure the external fd watch events are in-sync with the kernel/libev internal state

具體的anfd和fdchanges結構如下:

libev源碼解讀

至此,把io事件的注冊過程(ev_io_start)走了一遍,大緻就是從之前設定了監控條件的io watcher擷取檔案描述符fd,找到fd在anfds中對應的ANFD結構,然後把watcher加入到該結構的ev_watcher_list *類型的head鍊上去。因為對應該fd的監控條件肯定已經改變了(新加入的啊),是以在fdchanges數組中加入該fd,然後後續會通過io複用機制來修改對該fd的監控條件。

/

下面就是啟動我們的時間驅動器,也就是定義的loop/mainloop:

ev_run(main_loop,0);

這個函數直接從ev.c中就可以看到,比較長就不全部貼上來了,大緻的邏輯就是

do{

    //fork prepare...
    XXXXXX;

    /* update fd-related kernel structures */
    fd_reify (EV_A);

     /* calculate blocking time */
     XXXXXxxxX;
     //進入poll之前,先sleep io_blocktime
    backend_poll();//也就是select_poll或者poll_poll或者epoll_poll

}while(condition_is_ok)
           

先計算出poll之前要阻塞的時間,進入poll之前,先sleep io_blocktime,然後就進入backend_poll()函數,也就是select_poll或者poll_poll或者epoll_poll,我們以epoll舉例,epoll共有3個系統調用:

1)調用epoll_create()建立一個epoll對象(在epoll檔案系統中為這個句柄對象配置設定資源)

2)調用epoll_ctl向epoll對象中添加這100萬個連接配接的套接字

3)調用epoll_wait收集發生的事件的連接配接

具體select poll epoll的使用與分析見我的另一篇部落格:

傳送門

我們進入backend_poll也就是epoll_poll,首先進入epoll_wait()去收集發生的事件連接配接,傳回發生的事件數目eventcnt,然後依次周遊所有發生的事件,分别做處理。

從epoll傳回後,将監控中的檔案描述符fd以及其pending(滿足監控)的條件通過fd_event()做判斷監控條件是否改變,然後到fd_event_nocheck()裡面對anfds[fd]數組中的fd上的挂的監控器依次做檢測,如果pending條件符合,便通過ev_feed_event()将該監控器加入到pendings數組中pendings[pri]上的pendings[pri][old_lenght+1]的位置。ev_feed_event()代碼如下:

void noinline
ev_feed_event (EV_P_ void *w, int revents) EV_THROW
{
  W w_ = (W)w;
  int pri = ABSPRI (w_);

  if (expect_false (w_->pending))
    pendings [pri][w_->pending - ].events |= revents;
  else
    {
      w_->pending = ++pendingcnt [pri];
      array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
      pendings [pri][w_->pending - ].w      = w_;
      pendings [pri][w_->pending - ].events = revents;
    },
  pendingpri = NUMPRI - ;
}
           

這裡要介紹一個新的資料結構,他表示pending中的監控條件滿足了,但是還沒有觸發動作的狀态:

/* stores the pending event set for a given watcher */
typedef struct
{
  W w;
  int events; /* the pending event set for the given watcher */
} ANPENDING;
           

W就是 ev_watcher * 類型,上面用到的pendings[]就是ANPENDING類型的一個二維數組,其一級下标pri就是指watcher的優先級,該優先級上pending的監控器數目為二級下标,對應監控器中實際的pending值就是二維下标+1。

首先對于二維數組pendings[],其定義為ANPENDING *pendings [NUMPRI],其中以優先級為下标的每個元素都是ANPENDING類型的,這樣就可以把pending的event類型及watcher指針加入到二維數組pendings[]裡去,這是通過xxx_reify函數實作這個過程的。

下圖就是anfds數組跟pendings數組的對應結構與關系:

libev源碼解讀

再後面就是執行:

EV_INVOKE_PENDING;

其實就是

invoke_cb (EV_A)

,也就是調用loop->invoke_cb,也就是ev_invoke_pending()函數,其代碼如下:

void noinline
ev_invoke_pending (EV_P)
{
  pendingpri = NUMPRI;

  while (pendingpri) /* pendingpri possibly gets modified in the inner loop */
    {
      --pendingpri;

      while (pendingcnt [pendingpri])
        {
          ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri];

          p->w->pending = ;
          EV_CB_INVOKE (p->w, p->events);
          EV_FREQUENT_CHECK;
        }
    }
}
           

該函數會周遊pendings這個二維數組,按優先級執行在pending的每一個watcher上觸發事件對應的的回調函數。周遊結束後判斷是否結束整個loop,不的話就再次從io複用那兒等待來一遍,結束的話就清理、退出。

//

至此,我們就把一個簡單的io watcher的例子看完了,其他的事件觀察器也基本類似,整個邏輯都是圍繞watcher來做的,libev内部維護一個基類ev_watcher和一些特定監控器的派生類ev_xxx,如ev_io,當我們要使用一個監控器的時候,首先生成一個具體的watcher執行個體,并且通過派生類的私有成員設定觸發條件,監控fd。

然後就用anfds或者最小堆管理這些watchers,然後通過backend_poll代表的系統io複用機制如epoll,以及時間堆管理運算出pending的watcher,再調用reify函數把觸發事件的watcher按優先級加入到pendings[]二維數組中去。

最後就是依次按優先級調用pendings裡面watcher上觸發事件的回調函數,這樣就實作了一個按優先級的事件模型。

整個過程如下圖所示:

libev源碼解讀