bufferevent,帶buffer的event
struct bufferevent {
struct event_base *ev_base;
const struct bufferevent_ops *be_ops; // backend
struct event ev_read; // 讀事件
struct event ev_write; // 寫事件
struct evbuffer *input; // 讀緩沖
struct evbuffer *output; // 寫緩沖
bufferevent_data_cb readcb; // 讀事件回調
bufferevent_data_cb writecb; // 寫事件回調
bufferevent_event_cb errorcb; // error回調
void *cbarg; // 回調函數參數
struct event_watermark wm_read;
struct event_watermark wm_write;
struct timeval timeout_read;
struct timeval timeout_write;
short enabled;
};
下面簡單分析bufferevent相關函數(示例DEMO)
bufferevent_socket_new
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
int options)
{
struct bufferevent_private *bufev_p;
struct bufferevent *bufev;
#ifdef WIN32
if (base && event_base_get_iocp(base))
return bufferevent_async_new(base, fd, options);
#endif
if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
return NULL;
if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
options) < 0) {
mm_free(bufev_p);
return NULL;
}
bufev = &bufev_p->bev;
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
evbuffer_freeze(bufev->input, 0);
evbuffer_freeze(bufev->output, 1);
return bufev;
}
函數做了4件事:
- 設定backend: bufferevent_ops_socket
- 設定fd的讀事件回調: bufferevent_readcb
- 設定fd的寫事件回調: bufferevent_writecb
- 設定輸出緩沖區的回調: bufferevent_socket_outbuf_cb
backend結構如下:
const struct bufferevent_ops bufferevent_ops_socket = {
"socket",
evutil_offsetof(struct bufferevent_private, bev),
be_socket_enable,
be_socket_disable,
be_socket_destruct,
be_socket_adj_timeouts,
be_socket_flush,
be_socket_ctrl,
};
bufferevent_setcb
void
bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg)
{
BEV_LOCK(bufev);
bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = eventcb;
bufev->cbarg = cbarg;
BEV_UNLOCK(bufev);
}
該函數主要設定使用者回調函數。
bufferevent_enable
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
int r = 0;
_bufferevent_incref_and_lock(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;
bufev->enabled |= event;
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
_bufferevent_decref_and_unlock(bufev);
return r;
}
static int
be_socket_enable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
return -1;
}
if (event & EV_WRITE) {
if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
return -1;
}
return 0;
}
#define be_socket_add(ev, t) \
_bufferevent_add_event((ev), (t))
int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
if (tv->tv_sec == 0 && tv->tv_usec == 0)
return event_add(ev, NULL);
else
return event_add(ev, tv);
}
該函數将fd加入到epoll中。
事件流程
當fd上有讀事件發生時,首先調用bufferevent_readcb。
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
struct evbuffer *input;
int res = 0;
short what = BEV_EVENT_READING;
ev_ssize_t howmuch = -1, readmax=-1;
_bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
/* Note that we only check for event==EV_TIMEOUT. If
* event==EV_TIMEOUT|EV_READ, we can safely ignore the
* timeout, since a read has occurred */
what |= BEV_EVENT_TIMEOUT;
goto error;
}
input = bufev->input;
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0) {
howmuch = bufev->wm_read.high - evbuffer_get_length(input);
/* we somehow lowered the watermark, stop reading */
if (howmuch <= 0) {
bufferevent_wm_suspend_read(bufev);
goto done;
}
}
readmax = _bufferevent_get_read_max(bufev_p);
if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
* uglifies this code. XXXX */
howmuch = readmax;
if (bufev_p->read_suspended)
goto done;
evbuffer_unfreeze(input, 0);
res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
evbuffer_freeze(input, 0);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
/* error case */
what |= BEV_EVENT_ERROR;
} else if (res == 0) {
/* eof case */
what |= BEV_EVENT_EOF;
}
if (res <= 0)
goto error;
_bufferevent_decrement_read_buckets(bufev_p, res);
/* Invoke the user callback - must always be called last */
if (evbuffer_get_length(input) >= bufev->wm_read.low)
_bufferevent_run_readcb(bufev);
goto done;
reschedule:
goto done;
error:
bufferevent_disable(bufev, EV_READ);
_bufferevent_run_eventcb(bufev, what);
done:
_bufferevent_decref_and_unlock(bufev);
}
通過evbuffer_read(内部調用recv),将fd上的資料讀入輸入緩沖input中。
通過_bufferevent_run_readcb調用使用者回調函數
(如果出錯,通過_bufferevent_run_eventcb調用errorcb)
void
_bufferevent_run_readcb(struct bufferevent *bufev)
{
/* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->readcb == NULL)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1;
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
} else {
bufev->readcb(bufev, bufev->cbarg);
}
}
在使用者回調函數中,可以通過 bufferevent_read 取出輸入緩沖input中的資料。
size_t
bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
{
return (evbuffer_remove(bufev->input, data, size));
}
/* Reads data from an event buffer and drains the bytes read */
int
evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
{
ev_ssize_t n;
EVBUFFER_LOCK(buf);
n = evbuffer_copyout(buf, data_out, datlen);
if (n > 0) {
if (evbuffer_drain(buf, n)<0)
n = -1;
}
EVBUFFER_UNLOCK(buf);
return (int)n;
}
參考資料:
Libevent源碼分析-----evbuffer結構與基本操作
Libevent源碼分析-----更多evbuffer操作函數
轉載于:https://www.cnblogs.com/gattaca/p/7827479.html