天天看點

libevent源碼解析(二)event接口函數

一.前言

  在上一篇中,我們提到了event結構注釋裡說明了可用的接口函數,即

*    event_new(), event_free(), event_assign(), event_get_assignment(),
 *    event_add(), event_del(), event_active(), event_pending(),
 *    event_get_fd(), event_get_base(), event_get_events(),
 *    event_get_callback(), event_get_callback_arg(),
 *    event_priority_set()
           

  本文對這些接口函數逐個進行分析。這些函數實作了事件的建立、添加、删除、釋放、查詢、激活、閑置等等功能,即事件的基礎功能實作。其實從這些接口函數的名字很容易就可以知道其作用,本文的主要目的是分析這些接口函數的實作原理和内在源碼。這些接口函數均在event.c中實作。其中幾個get函數其實就是簡單的傳回操作,是以本文掠過不提。

二.接口函數分析

(1)event_new()函數

  配置設定并指派新的event結構,準備用于添加和删除,即event_add() 或 event_del()。

參數包括:

  (1)base 新事件屬于的事件庫event_base

  (2)fd 檔案描述符或者信号

  (3)events 對應的控制事件: bitfield of EV_READ, EV_WRITE, EV_SIGNAL, EV_PERSIST, EV_ET.

  (4)callback 事件發生時的回調函數

  (5)callback_arg 回調函數傳參

  fd和event決定了什麼情況會觸發該事件,回調函數和回調傳參決定了事件觸發時應做什麼。

傳回值:

  傳回新的結構體event,必須由event_free()釋放或者置為NULL

  若events包括 EV_READ, EV_WRITE, 或 EV_READ|EV_WRITE,則fd是檔案描述符或者套接字,并可以用于讀、寫。若event包括EV_SIGNAL則fd是信号事件。若沒有任何标記,則事件僅可以在逾時或手動激活時(調用event_actifve())生效。

  代碼非常的簡單,源碼和分析如下所示:

/*建立事件*/
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
	/*建立指針*/
	struct event *ev;
	ev = mm_malloc(sizeof(struct event));

	/*錯誤判斷*/
	if (ev == NULL)
		return (NULL);

	/*調用event_assign*/
	if (event_assign(ev, base, fd, events, cb, arg) < 0) {
		mm_free(ev);
		return (NULL);
	}

	return (ev);
}
           

(2)event_free()函數

  源碼和分析如下所示:

/*釋放事件(包括資源釋放)*/
void
event_free(struct event *ev)
{
	/* This is disabled, so that events which have been finalized be a
	 * valid target for event_free(). That's */
	// event_debug_assert_is_setup_(ev);

	/* make sure that this event won't be coming back to haunt us. */
	event_d el(ev);
	event_debug_note_teardown_(ev);/*關閉針對ev的debug*/
	mm_free(ev);

}
           

(3)event_assign()函數

  源碼和分析如下所示:

/*事件的配置:指派以及異常處理*/
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
	/*異常處理*/
	if (!base)
		base = current_base;
	if (arg == &event_self_cbarg_ptr_)
		arg = ev;

	event_debug_assert_not_added_(ev);

	/*将事件的堆屬性指派為已有的事件堆base*/
	ev->ev_base = base;

	/*指派*/
	ev->ev_callback = callback;
	ev->ev_arg = arg;
	ev->ev_fd = fd;
	ev->ev_events = events;
	ev->ev_res = 0;
	ev->ev_flags = EVLIST_INIT;
	ev->ev_ncalls = 0;
	ev->ev_pncalls = NULL;

	/*根據事件類型分開進行錯誤判斷*/
	/*信号類型*/
	if (events & EV_SIGNAL) {
		/*信号類型不允許有IO類型的讀寫關閉标記位*/
		if ((events & (EV_READ|EV_WRITE|EV_CLOSED)) != 0) {
			event_warnx("%s: EV_SIGNAL is not compatible with "
			    "EV_READ, EV_WRITE or EV_CLOSED", __func__);
			return -1;
		}
		ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL;
	} else {
		/*對永久事件,逾時置零*/
		if (events & EV_PERSIST) {
			evutil_timerclear(&ev->ev_io_timeout);
			ev->ev_closure = EV_CLOSURE_EVENT_PERSIST;
		} else {
			ev->ev_closure = EV_CLOSURE_EVENT;
		}
	}

	min_heap_elem_init_(ev);

	/*優先級:預設放在隊列中間*/
	if (base != NULL) {
		/* by default, we put new events into the middle priority */
		ev->ev_pri = base->nactivequeues / 2;
	}

	/*debug功能開啟*/
	event_debug_note_setup_(ev);

	return 0;
}
           

(4)event_get_assignment()函數

  源碼和分析如下所示:

/*擷取event的屬性:包括事件堆event_base, IO事件的fd, 信号事件的資訊,回調函數等*/
void
event_get_assignment(const struct event *event, struct event_base **base_out, evutil_socket_t *fd_out, short *events_out, event_callback_fn *callback_out, void **arg_out)
{
	event_debug_assert_is_setup_(event);

	if (base_out)
		*base_out = event->ev_base;
	if (fd_out)
		*fd_out = event->ev_fd;
	if (events_out)
		*events_out = event->ev_events;
	if (callback_out)
		*callback_out = event->ev_callback;
	if (arg_out)
		*arg_out = event->ev_arg;
}

           

(5)event_add()函數

  源碼和分析如下所示:

/*添加事件*/
int
event_add(struct event *ev, const struct timeval *tv)
{
	int res;

	/*異常處理,檢查是否有事件堆event_base*/
	if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
		event_warnx("%s: event has no event_base set.", __func__);
		return -1;
	}

	/*加鎖、調用event_add_nolock_添加事件,解鎖*/
	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

	res = event_add_nolock_(ev, tv, 0);

	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

	return (res);
}
           

其中調用的event_add_nolock_(),該函數較為複雜,源碼和分析如下:

/* Implementation function to add an event.  Works just like event_add,
 * except: 1) it requires that we have the lock.  2) if tv_is_absolute is set,
 * we treat tv as an absolute time, not as an interval to add to the current
 * time */
int
event_add_nolock_(struct event *ev, const struct timeval *tv,
    int tv_is_absolute)
{
	struct event_base *base = ev->ev_base;
	int res = 0;
	int notify = 0;

	/*上鎖判斷,debug判斷*/
	EVENT_BASE_ASSERT_LOCKED(base);
	event_debug_assert_is_setup_(ev);

	event_debug((
		 "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%s%scall %p",
		 ev,
		 EV_SOCK_ARG(ev->ev_fd),
		 ev->ev_events & EV_READ ? "EV_READ " : " ",
		 ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
		 ev->ev_events & EV_CLOSED ? "EV_CLOSED " : " ",
		 tv ? "EV_TIMEOUT " : " ",
		 ev->ev_callback));

	EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

	if (ev->ev_flags & EVLIST_FINALIZING) {
		/* XXXX debug */
		return (-1);
	}

	/*
	 * 新的timer事件,調用timer heap接口在堆上預留一個位置  
     * 注:這樣能保證該操作的原子性:  
     * 向系統I/O機制注冊可能會失敗,而當在堆上預留成功後,  
     * 定時事件的添加将肯定不會失敗;  
     * 而預留位置的可能結果是堆擴充,但是内部元素并不會改變
	 * prepare for timeout insertion further below, if we get a
	 * failure on any step, we should not change any state.
	 */
	if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
		if (min_heap_reserve_(&base->timeheap,
			1 + min_heap_size_(&base->timeheap)) == -1)
			return (-1);  /* ENOMEM == errno */
	}


	/* If the main thread is currently executing a signal event's
	 * callback, and we are not the main thread, then we want to wait
	 * until the callback is done before we mess with the event, or else
	 * we can race on ev_ncalls and ev_pncalls below. */
#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (base->current_event == event_to_event_callback(ev) &&
	    (ev->ev_events & EV_SIGNAL)
	    && !EVBASE_IN_THREAD(base)) {
		++base->current_event_waiters;
		EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
	}
#endif

	/*如果事件ev不在已注冊或者激活連結清單中,則調用evbase注冊事件 */
	if ((ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL)) &&
	    !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
	    
	    /*判斷io或者信号分類添加*/
		if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
			res = evmap_io_add_(base, ev->ev_fd, ev);
		else if (ev->ev_events & EV_SIGNAL)
			res = evmap_signal_add_(base, (int)ev->ev_fd, ev);
		
		/*// 注冊成功,插入event到已注冊連結清單中 */
		if (res != -1)
			event_queue_insert_inserted(base, ev);
		if (res == 1) {
			/* evmap says we need to notify the main thread. */
			notify = 1;
			res = 0;
		}
	}

	/* 準備添加定時事件
	 * we should change the timeout state only if the previous event
	 * addition succeeded.
	 */
	if (res != -1 && tv != NULL) {
		struct timeval now;
		int common_timeout;
#ifdef USE_REINSERT_TIMEOUT
		int was_common;
		int old_timeout_idx;
#endif

		/* 
		 * for persistent timeout events, we remember the
		 * timeout value and re-add the event.
		 *
		 * If tv_is_absolute, this was already set.
		 */
		if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
			ev->ev_io_timeout = *tv;

		/*EVLIST_TIMEOUT表明event已經在定時器堆中了,删除舊的*/
#ifndef USE_REINSERT_TIMEOUT
		if (ev->ev_flags & EVLIST_TIMEOUT) {
			event_queue_remove_timeout(base, ev);
		}
#endif

		/* 如果事件已經是就緒狀态則從激活連結清單中删除 
		 * Check if it is active due to a timeout.  Rescheduling
		 * this timeout before the callback can be executed
		 * removes it from the active list. */
		if ((ev->ev_flags & EVLIST_ACTIVE) &&
		    (ev->ev_res & EV_TIMEOUT)) {
			if (ev->ev_events & EV_SIGNAL) {
				/* See if we are just active executing
				 * this event in a loop
				 * 将ev_callback調用次數設定為0以終止循環  
				 */
				if (ev->ev_ncalls && ev->ev_pncalls) {
					/* Abort loop */
					*ev->ev_pncalls = 0;
				}
			}

			event_queue_remove_active(base, event_to_event_callback(ev));
		}

		/* 計算時間,并插入到timer根堆中 */
		gettime(base, &now);

		common_timeout = is_common_timeout(tv, base);
#ifdef USE_REINSERT_TIMEOUT
		was_common = is_common_timeout(&ev->ev_timeout, base);
		old_timeout_idx = COMMON_TIMEOUT_IDX(&ev->ev_timeout);
#endif

		if (tv_is_absolute) {
			ev->ev_timeout = *tv;
		} else if (common_timeout) {
			struct timeval tmp = *tv;
			tmp.tv_usec &= MICROSECONDS_MASK;
			evutil_timeradd(&now, &tmp, &ev->ev_timeout);
			ev->ev_timeout.tv_usec |=
			    (tv->tv_usec & ~MICROSECONDS_MASK);
		} else {
			evutil_timeradd(&now, tv, &ev->ev_timeout);
		}

		event_debug((
			 "event_add: event %p, timeout in %d seconds %d useconds, call %p",
			 ev, (int)tv->tv_sec, (int)tv->tv_usec, ev->ev_callback));

#ifdef USE_REINSERT_TIMEOUT
		event_queue_reinsert_timeout(base, ev, was_common, common_timeout, old_timeout_idx);
#else
		event_queue_insert_timeout(base, ev);
#endif

		if (common_timeout) {
			struct common_timeout_list *ctl =
			    get_common_timeout_list(base, &ev->ev_timeout);
			if (ev == TAILQ_FIRST(&ctl->events)) {
				common_timeout_schedule(ctl, &now, ev);
			}
		} else {
			struct event* top = NULL;
			/* See if the earliest timeout is now earlier than it
			 * was before: if so, we will need to tell the main
			 * thread to wake up earlier than it would otherwise.
			 * We double check the timeout of the top element to
			 * handle time distortions due to system suspension.
			 */
			if (min_heap_elt_is_top_(ev))
				notify = 1;
			else if ((top = min_heap_top_(&base->timeheap)) != NULL &&
					 evutil_timercmp(&top->ev_timeout, &now, <))
				notify = 1;
		}
	}

	/* if we are not in the right thread, we need to wake up the loop */
	if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);

	event_debug_note_add_(ev);

	return (res);
}
           

(6)event_del()函數

  源碼和分析如下所示:

int
event_del(struct event *ev)
{
	return event_del_(ev, EVENT_DEL_AUTOBLOCK);
}
           

調用的event_del_()如下:

static int
event_del_(struct event *ev, int blocking)
{
	int res;
	struct event_base *base = ev->ev_base;

	/*異常處理*/
	if (EVUTIL_FAILURE_CHECK(!base)) {
		event_warnx("%s: event has no event_base set.", __func__);
		return -1;
	}

	/*上鎖*/
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	res = event_del_nolock_(ev, blocking);
	EVBASE_RELEASE_LOCK(base, th_base_lock);

	return (res);
}
           

真正的删除和添加一樣,在上鎖之後調用函數執行:

/** 事件删除 Helper for event_del: always called with th_base_lock held.
 *
 * "blocking" must be one of the EVENT_DEL_{BLOCK, NOBLOCK, AUTOBLOCK,
 * EVEN_IF_FINALIZING} values. See those for more information.
 */
int
event_del_nolock_(struct event *ev, int blocking)
{
	struct event_base *base;
	int res = 0, notify = 0;

	event_debug(("event_del: %p (fd "EV_SOCK_FMT"), callback %p",
		ev, EV_SOCK_ARG(ev->ev_fd), ev->ev_callback));

	/* 異常處理,ev_base為NULL,表明ev沒有被注冊 
	 * An event without a base has not been added 
	 */
	if (ev->ev_base == NULL)
		return (-1);

	EVENT_BASE_ASSERT_LOCKED(ev->ev_base);

	if (blocking != EVENT_DEL_EVEN_IF_FINALIZING) {
		if (ev->ev_flags & EVLIST_FINALIZING) {
			/* XXXX Debug */
			return 0;
		}
	}

	base = ev->ev_base;

	EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

	/* 終止循環 See if we are just active executing this event in a loop */
	if (ev->ev_events & EV_SIGNAL) {
		if (ev->ev_ncalls && ev->ev_pncalls) {
			/* Abort loop */
			*ev->ev_pncalls = 0;
		}
	}

	if (ev->ev_flags & EVLIST_TIMEOUT) {
		/* 從逾時隊列中删除
		 * NOTE: We never need to notify the main thread because of a
		 * deleted timeout event: all that could happen if we don't is
		 * that the dispatch loop might wake up too early.  But the
		 * point of notifying the main thread _is_ to wake up the
		 * dispatch loop early anyway, so we wouldn't gain anything by
		 * doing it.
		 */
		event_queue_remove_timeout(base, ev);
	}

	/*從激活/等待激活隊列中删除*/
	if (ev->ev_flags & EVLIST_ACTIVE)
		event_queue_remove_active(base, event_to_event_callback(ev));
	else if (ev->ev_flags & EVLIST_ACTIVE_LATER)
		event_queue_remove_active_later(base, event_to_event_callback(ev));

	/*從對應的連結清單中删除事件 */
	if (ev->ev_flags & EVLIST_INSERTED) {
		event_queue_remove_inserted(base, ev);
		if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
			res = evmap_io_del_(base, ev->ev_fd, ev);
		else
			res = evmap_signal_del_(base, (int)ev->ev_fd, ev);
		if (res == 1) {
			/* evmap says we need to notify the main thread. */
			notify = 1;
			res = 0;
		}
		/* If we do not have events, let's notify event base so it can
		 * exit without waiting */
		if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base))
			notify = 1;
	}

	/* 多線程情況下,判斷是否在該線程執行 if we are not in the right thread, we need to wake up the loop */
	if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);

	event_debug_note_del_(ev);

	/* If the main thread is currently executing this event's callback,
	 * and we are not the main thread, then we want to wait until the
	 * callback is done before returning. That way, when this function
	 * returns, it will be safe to free the user-supplied argument.
	 */
#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (blocking != EVENT_DEL_NOBLOCK &&
	    base->current_event == event_to_event_callback(ev) &&
	    !EVBASE_IN_THREAD(base) &&
	    (blocking == EVENT_DEL_BLOCK || !(ev->ev_events & EV_FINALIZE))) {
		++base->current_event_waiters;
		EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
	}
#endif

	return (res);
}
           

(7)event_active()函數

  源碼和分析如下所示:

void
event_active(struct event *ev, int res, short ncalls)
{
	/*異常處理*/
	if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
		event_warnx("%s: event has no event_base set.", __func__);
		return;
	}

	/*上鎖*/
	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

	event_debug_assert_is_setup_(ev);

	event_active_nolock_(ev, res, ncalls);

	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}
           

event_active_nolock_()代碼如下:

/*激活事件*/
void
event_active_nolock_(struct event *ev, int res, short ncalls)
{
	struct event_base *base;

	event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
		ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));

	base = ev->ev_base;
	EVENT_BASE_ASSERT_LOCKED(base);

	/*标記為終止則無法激活*/
	if (ev->ev_flags & EVLIST_FINALIZING) {
		/* XXXX debug */
		return;
	}

	/*根據标記位判斷立刻激活或者稍後激活*/
	switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
	default:
	case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
		EVUTIL_ASSERT(0);
		break;
	case EVLIST_ACTIVE:
		/* We get different kinds of events, add them together */
		ev->ev_res |= res;
		return;
	case EVLIST_ACTIVE_LATER:
		ev->ev_res |= res;
		break;
	case 0:
		ev->ev_res = res;
		break;
	}

	if (ev->ev_pri < base->event_running_priority)
		base->event_continue = 1;

	if (ev->ev_events & EV_SIGNAL) {
#ifndef EVENT__DISABLE_THREAD_SUPPORT
		if (base->current_event == event_to_event_callback(ev) &&
		    !EVBASE_IN_THREAD(base)) {
			++base->current_event_waiters;
			EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
		}
#endif
		ev->ev_ncalls = ncalls;
		ev->ev_pncalls = NULL;
	}

	/*将事件添加入激活清單中*/
	event_callback_activate_nolock_(base, event_to_event_callback(ev));
}
           

(8)event_pending()函數

  源碼和分析如下所示:

/* 檢測某事件是否待發生,傳回标記位
 * Checks if a specific event is pending or scheduled.
 */

int
event_pending(const struct event *ev, short event, struct timeval *tv)
{
	int flags = 0;

	/*異常檢測*/
	if (EVUTIL_FAILURE_CHECK(ev->ev_base == NULL)) {
		event_warnx("%s: event has no event_base set.", __func__);
		return 0;
	}

	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
	event_debug_assert_is_setup_(ev);

	/*檢查标記位*/
	if (ev->ev_flags & EVLIST_INSERTED)
		flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL));
	if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
		flags |= ev->ev_res;
	if (ev->ev_flags & EVLIST_TIMEOUT)
		flags |= EV_TIMEOUT;

	event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL);

	/* 添加逾時 See if there is a timeout that we should report */
	if (tv != NULL && (flags & event & EV_TIMEOUT)) {
		struct timeval tmp = ev->ev_timeout;
		tmp.tv_usec &= MICROSECONDS_MASK;
		/* correctly remamp to real time */
		evutil_timeradd(&ev->ev_base->tv_clock_diff, &tmp, tv);
	}

	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

	return (flags & event);
}
           

(9)event_priority_set()函數

  源碼和分析如下所示:

/* 設定優先級
 * Set's the priority of an event - if an event is already scheduled
 * changing the priority is going to fail.
 */

int
event_priority_set(struct event *ev, int pri)
{
	event_debug_assert_is_setup_(ev);

	/*優先級設定對已激活的事件無效*/
	if (ev->ev_flags & EVLIST_ACTIVE)
		return (-1);
	if (pri < 0 || pri >= ev->ev_base->nactivequeues)
		return (-1);

	ev->ev_pri = pri;

	return (0);
}
           

三.小結

  本文分析了event的接口函數,下一篇中會分析event_base的接口函數,并由此分析事件處理的中心部分——事件主循環,根據系統提供的事件多路分發機制執行事件循環,對已注冊的就緒事件,調用注冊事件的回調函數來處理事件。

歡迎關注本人公衆号,公衆号會更新一些不一樣的内容,相信一定會有所收獲。

libevent源碼解析(二)event接口函數