1 前言
之前的一篇文章《Linux下使用hiredis庫實作優先級隊列》,用的同步的接口實踐;
後來遇到一個場景,同時需要處理Redis訂閱的消息,又需要處理其他網絡socket操作、定時器操作,當然多線程是一個思路,本文嘗試從Reactive模式上解決這個問題,即用redis的異步接口,與libevent進行對接。
其實最終目的就是就是Redis接入到這篇文章程式設計執行個體裡面:《Linux下使用libevent庫實作伺服器端程式設計》
2.接口
2.1 async.h
/usr/include/hiredis/aync.h
裡面,定義了異步接口:
連接配接與釋放的接口:
/* Functions that proxy to hiredis */
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
void redisAsyncDisconnect(redisAsyncContext *ac);
可以注冊連接配接、斷連接配接的回調函數:
(上述異步connect是直接傳回成功的,需要在callback裡面判斷是否連接配接redis成功)
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
讀寫處理,具體怎麼用先不糾結:
/* Handle read/write events */
void redisAsyncHandleRead(redisAsyncContext *ac);
void redisAsyncHandleWrite(redisAsyncContext *ac);
與同步接口用法類似
redisCommand
,異步指令用如下接口,但是通過redisCallbackFn擷取操作的結果:
/* Command functions for an async context. Write the command to the
* output buffer and register the provided callback. */
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
異步執行個體的結構體定義:
/* Context for an async connection to Redis */
typedef struct redisAsyncContext {
/* Hold the regular context, so it can be realloc'ed. */
redisContext c;
/* Setup error flags so they can be used directly. */
int err;
char *errstr;
/* Not used by hiredis */
void *data;
/* Event library data and hooks */
struct {
void *data;
/* Hooks that are called when the library expects to start
* reading/writing. These functions should be idempotent. */
void (*addRead)(void *privdata);
void (*delRead)(void *privdata);
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
void (*cleanup)(void *privdata);
} ev;
/* Called when either the connection is terminated due to an error or per
* user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */
redisDisconnectCallback *onDisconnect;
/* Called when the first write event was received. */
redisConnectCallback *onConnect;
/* Regular command callbacks */
redisCallbackList replies;
/* Subscription callbacks */
struct {
redisCallbackList invalid;
struct dict *channels;
struct dict *patterns;
} sub;
} redisAsyncContext;
2.2 adapters接口
接口然後下一步看看怎麼跟libevent結合,能在結構體看到裡面有個
ev
結構,這個地方就是事件觸發的核心。
這塊hiredis已經幫我們考慮到了,直接用async.h比較麻煩,是以他都給适配了大部分事件庫的接口:
/usr/include/hiredis/adapters
├── ae.h
├── glib.h
├── ivykis.h
├── libevent.h
├── libev.h
├── libuv.h
├── macosx.h
└── qt.h
與libevent關聯,我們需要額外調用一個接口
redisLibeventAttach
:
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
redisContext *c = &(ac->c);
redisLibeventEvents *e;
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;
/* Create container for context and r/w events */
e = (redisLibeventEvents*)malloc(sizeof(*e));
e->context = ac;
/* Register functions to start/stop listening for events */
ac->ev.addRead = redisLibeventAddRead;
ac->ev.delRead = redisLibeventDelRead;
ac->ev.addWrite = redisLibeventAddWrite;
ac->ev.delWrite = redisLibeventDelWrite;
ac->ev.cleanup = redisLibeventCleanup;
ac->ev.data = e;
/* Initialize and install read/write events */
e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e);
e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e);
event_add(e->rev, NULL);
event_add(e->wev, NULL);
return REDIS_OK;
}
3 程式設計執行個體
以下程式設計執行個體準備實作:
- Redis異步連接配接,斷線重連機制(定時器);
- Redis-LIST-POP功能,外部有事件或逾時了,進行觸發;
- Libevent定時器,模拟其他業務功能(定時器);
- Libevent信号處理;
3.1 類定義
定義類
class mod_redisev
如下:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
#include <event2/event.h>
#include <string>
class mod_redisev
{
private:
uint16_t _port;
std::string _addr;
int _status; // 辨別是否連接配接成功
struct redisAsyncContext *_rc; // Redis異步執行個體
struct event_base *_base; // libevent執行個體
struct event *_ev_quit; // 信号處理
struct event *_ev_timer; // 業務定時器
struct event *_ev_kalive; // 保活定時器
public:
mod_redisev(const std::string &addr = "127.0.0.1", uint16_t port = 6379)
: _port(port)
, _addr(addr)
, _status(REDIS_ERR)
, _rc(NULL)
{
_base = event_base_new();
if (!_base) {
throw errno;
}
_ev_kalive = event_new(_base, -1, EV_PERSIST, on_keepalive, this);
_ev_timer = event_new(_base, -1, EV_PERSIST, on_do_something, this);
_ev_quit = evsignal_new(_base, SIGINT, on_signal_quit, this);
if (!_ev_kalive || !_ev_quit || !_ev_timer) {
throw errno;
}
}
~mod_redisev()
{
event_base_free(_base);
}
int redis_connect();
int dispatch();
void redis_close();
private:
void __do_keepalive();
void __on_auth(struct redisReply *reply);
void __on_pop(struct redisReply *reply);
void __on_connect(int status);
void __on_quit()
{
printf("quit...\n");
event_base_loopbreak(_base);
}
private:
static void on_do_something(int fd, short events, void *args)
{
printf("Do something...\n");
}
static void on_keepalive(int fd, short events, void *args)
{
((class mod_redisev *)args)->__do_keepalive();
}
static void on_signal_quit(int fd, short events, void *args)
{
((class mod_redisev *)args)->__on_quit();
}
static void on_redis_connect(const struct redisAsyncContext *rc, int status)
{
((class mod_redisev *)rc->data)->__on_connect(status);
}
static void on_redis_close(const struct redisAsyncContext *rc, int status)
{
printf("Redis disconnect...\n");
}
static void on_redis_auth(struct redisAsyncContext *rc, void *reply, void *args)
{
((class mod_redisev *)args)->__on_auth((struct redisReply *)reply);
}
static void on_redis_pop(struct redisAsyncContext *rc, void *reply, void *args)
{
((class mod_redisev *)args)->__on_pop((struct redisReply *)reply);
}
};
輕松愉快的main入口:
int main(int argc, char *argv[])
{
class mod_redisev redisev;
redisev.redis_connect();
redisev.dispatch();
redisev.redis_close();
exit(EXIT_SUCCESS);
}
3.2 Redis連接配接
異步連接配接功能,用
redisLibeventAttach
與libevent綁定:
設定兩個回調函數:
on_redis_connect
、
on_redis_close
int mod_redisev::redis_connect()
{
int res = -1;
_rc = redisAsyncConnect(_addr.c_str(), _port);
if (!_rc || _rc->err) {
printf("redisAsyncConnect: %s\n", _rc ? _rc->errstr : "error");
return -1;
}
_rc->data = this; // attch to class
res = redisLibeventAttach(_rc, _base);
if (0 != res) {
printf("redisLibeventAttach\n");
return -1;
}
(void)redisAsyncSetConnectCallback(_rc, on_redis_connect);
(void)redisAsyncSetDisconnectCallback(_rc, on_redis_close);
return 0;
}
連接配接的流程為:觸發連接配接成功了,設定
REDIS_OK
辨別:
void mod_redisev::__on_connect(int status)
{
_status = status;
switch (_status) {
case REDIS_OK:
printf("Redis connected...\n");
(void)redisAsyncCommand(_rc, on_redis_auth, this, "AUTH 123456");
break;
case REDIS_ERR:
printf("Redis reconnecting...\n");
break;
default:
break;
}
}
Redis的連接配接保活借助了定時器對
_status
的檢查,如果連接配接不正常,再次進行
connect
操作:
void mod_redisev::__do_keepalive()
{
if (_status != REDIS_OK) {
printf("Reconect...\n");
redis_connect();
}
}
3.2 認證
上述
__on_connnect
函數中,如果連接配接成功了,則發起認證指令
AUTH
AUTH并不是可選的,由于我redis-server配置了密碼:
/etc/redis.conf
requirepass 123456
認證結果的處理,如果成功,那麼進入下一個邏輯,
BRPOP
擷取資料
void mod_redisev::__on_auth(struct redisReply *reply)
{
if (!reply || reply->type == REDIS_REPLY_ERROR) {
printf("Reply: %s\n", reply ? reply->str : "error");
_status = REDIS_ERR;
return;
}
else if (reply->type != REDIS_REPLY_STATUS) {
printf("Reply unknown: %d\n", reply->type);
_status = REDIS_ERR;
return;
}
printf("AUTH success...\n");
(void)redisAsyncCommand(_rc, on_redis_pop, this, "BRPOP queue1 queue2 queue3 10");
}
注意常見的認證錯誤有下面這種:
Reply: NOAUTH Authentication required. // 沒有配置密碼,無須認證
Reply: ERR invalid password // 錯誤的密碼
3.3 拉取資料
認證成功後,進入拉取資料階段,這裡用了循環拉取的邏輯:
以阻塞10秒的形式,分别看 queue1 queue2 queue3 是否有資料過來:
BRPOP queue1 queue2 queue3 10
void mod_redisev::__on_pop(struct redisReply *reply)
{
if (!reply || reply->type == REDIS_REPLY_ERROR) {
printf("Reply: %s\n", reply ? reply->str : "error");
_status = REDIS_ERR;
return;
}
if (reply->type == REDIS_REPLY_NIL) {
printf("BRPOP: empty...\n");
}
else if (reply->type == REDIS_REPLY_ARRAY) {
if (reply->elements > 0) {
struct redisReply *rs = reply->element[1];
printf("BRPOP: %s\n", rs->str);
}
}
redisAsyncCommand(_rc, on_redis_pop, this, "BRPOP queue1 queue2 queue3 10");
}
這裡就展現出異步事件的優勢出來了,如果使用同步的請求接口,那麼就得阻塞10秒,期間什麼也不能幹;
每秒1次的,假裝幹活的接口:
class mod_redisev {
...
static void on_do_something(int fd, short events, void *args)
{
printf("Do something...\n");
}
...
};
3.4 信号處理
class mod_redisev {
...
static void on_signal_quit(int fd, short events, void *args)
{
((class mod_redisev *)args)->__on_quit();
}
void __on_quit()
{
printf("quit...\n");
event_base_loopbreak(_base);
}
...
};
最後剩下的兩個處理,dispatch與連接配接關閉:
int mod_redisev::dispatch()
{
struct timeval tv = {1};
event_add(_ev_timer, &tv);
event_add(_ev_kalive, &tv);
event_add(_ev_quit, NULL);
return event_base_dispatch(_base);
}
void mod_redisev::redis_close()
{
if (_rc && _status == REDIS_OK) {
printf("Redis disconnect...\n");
redisAsyncDisconnect(_rc); // __redisAsyncFree() called
_rc = NULL;
}
}
3.5 執行效果
編譯方法記得把
-lhiredis
、
-levent
加進來
g++ -o redis_async -std=c++11 -Wall -g2 -O3 -Os redis_async.cc -levent -lhiredis
Redis connected... // redis連接配接成功
AUTH success... // auth成功,發起BRPOP請求
Do something... // 每秒1次的 do-something
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
BRPOP: empty... // BRPOP 10秒逾時到,沒有資料
Do something... // 每秒1次的、執着的 do-something
Do something...
Do something...
Do something...
Do something...
Do something...
BRPOP: a // 外部我執行了指令,LPUSH queue1 a b c d e
BRPOP: b // BRPOP函數連續觸發出來,擷取 a b c d e
BRPOP: c
BRPOP: d
BRPOP: e
Do something...
Do something...
Do something...
Do something...
quit... // 外部 ctrl + c
Redis disconnect... // 回收資源,disconnect
4. 總結
本文通過libhiredis庫提供的async異步接口,利用了
adapters/libevent.h
實作快速與libevent銜接,主要适合的場景,還是單線程下的事件觸發模型;
主要是幾個心得:
- 需要習慣的地方,異步回調的程式設計思路,每個指令都需要一個回調函數處理;
- 如果中間還想穿插直接用同步接口,如redisCommand,似乎_rc->c是不能直接用;
- hiredis容易用錯出現double-free,需要外部_status進行綜合判斷多多檢查;
- valgrind檢查記憶體,中間過程倒是不洩露,但最後redis-libevent釋放有不幹淨的地方;
參考文章:
[1] https://github.com/redis/hiredis/blob/master/README.md