天天看點

Linux下使用hiredis庫與libevent實作異步接口的I/O複用

1 前言

    之前的一篇文章《Linux下使用hiredis庫實作優先級隊列》,用的同步的接口實踐;

    後來遇到一個場景,同時需要處理Redis訂閱的消息,又需要處理其他網絡socket操作、定時器操作,當然多線程是一個思路,本文嘗試從Reactive模式上解決這個問題,即用redis的異步接口,與libevent進行對接。

    其實最終目的就是就是Redis接入到這篇文章程式設計執行個體裡面:《Linux下使用libevent庫實作伺服器端程式設計》

2.接口

2.1 async.h

/usr/include/hiredis/aync.h

裡面,定義了異步接口:

連接配接與釋放的接口:

/* Functions that proxy to hiredis */
redisAsyncContext *redisAsyncConnect(const char *ip, int port);
void redisAsyncDisconnect(redisAsyncContext *ac);
           

可以注冊連接配接、斷連接配接的回調函數:

(上述異步connect是直接傳回成功的,需要在callback裡面判斷是否連接配接redis成功)

int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
           

讀寫處理,具體怎麼用先不糾結:

/* Handle read/write events */
void redisAsyncHandleRead(redisAsyncContext *ac);
void redisAsyncHandleWrite(redisAsyncContext *ac);
           

與同步接口用法類似

redisCommand

,異步指令用如下接口,但是通過redisCallbackFn擷取操作的結果:

/* Command functions for an async context. Write the command to the
 * output buffer and register the provided callback. */
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
           

異步執行個體的結構體定義:

/* Context for an async connection to Redis */
typedef struct redisAsyncContext {
    /* Hold the regular context, so it can be realloc'ed. */
    redisContext c;

    /* Setup error flags so they can be used directly. */
    int err;
    char *errstr;

    /* Not used by hiredis */
    void *data;

    /* Event library data and hooks */
    struct {
        void *data;

        /* Hooks that are called when the library expects to start
         * reading/writing. These functions should be idempotent. */
        void (*addRead)(void *privdata);
        void (*delRead)(void *privdata);
        void (*addWrite)(void *privdata);
        void (*delWrite)(void *privdata);
        void (*cleanup)(void *privdata);
    } ev; 

    /* Called when either the connection is terminated due to an error or per
     * user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */
    redisDisconnectCallback *onDisconnect;

    /* Called when the first write event was received. */
    redisConnectCallback *onConnect;

    /* Regular command callbacks */
    redisCallbackList replies;

    /* Subscription callbacks */
    struct {
        redisCallbackList invalid;
        struct dict *channels;
        struct dict *patterns;
    } sub;
} redisAsyncContext;
           

2.2 adapters接口

接口然後下一步看看怎麼跟libevent結合,能在結構體看到裡面有個

ev

結構,這個地方就是事件觸發的核心。

這塊hiredis已經幫我們考慮到了,直接用async.h比較麻煩,是以他都給适配了大部分事件庫的接口:

/usr/include/hiredis/adapters
├── ae.h
├── glib.h
├── ivykis.h
├── libevent.h
├── libev.h
├── libuv.h
├── macosx.h
└── qt.h
           

與libevent關聯,我們需要額外調用一個接口

redisLibeventAttach

static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
    redisContext *c = &(ac->c);
    redisLibeventEvents *e; 

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redisLibeventEvents*)malloc(sizeof(*e));
    e->context = ac; 

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisLibeventAddRead;
    ac->ev.delRead = redisLibeventDelRead;
    ac->ev.addWrite = redisLibeventAddWrite;
    ac->ev.delWrite = redisLibeventDelWrite;
    ac->ev.cleanup = redisLibeventCleanup;
    ac->ev.data = e;

    /* Initialize and install read/write events */
    e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e); 
    e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e); 
    event_add(e->rev, NULL);
    event_add(e->wev, NULL);
    return REDIS_OK;
}
           

3 程式設計執行個體

以下程式設計執行個體準備實作:

  1. Redis異步連接配接,斷線重連機制(定時器);
  2. Redis-LIST-POP功能,外部有事件或逾時了,進行觸發;
  3. Libevent定時器,模拟其他業務功能(定時器);
  4. Libevent信号處理;

3.1 類定義

定義類

class mod_redisev

如下:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
#include <event2/event.h>
#include <string>

class mod_redisev
{
private:
    uint16_t _port;
    std::string _addr;
    int _status; // 辨別是否連接配接成功

    struct redisAsyncContext *_rc; // Redis異步執行個體
    struct event_base *_base; // libevent執行個體

    struct event *_ev_quit; // 信号處理
    struct event *_ev_timer; // 業務定時器
    struct event *_ev_kalive; // 保活定時器
public:

    mod_redisev(const std::string &addr = "127.0.0.1", uint16_t port = 6379)
        : _port(port)
        , _addr(addr)
        , _status(REDIS_ERR)
        , _rc(NULL)
    {
        _base = event_base_new();
        if (!_base) {
            throw errno;
        }

        _ev_kalive = event_new(_base, -1, EV_PERSIST, on_keepalive, this);
        _ev_timer = event_new(_base, -1, EV_PERSIST, on_do_something, this);
        _ev_quit = evsignal_new(_base, SIGINT, on_signal_quit, this);
        if (!_ev_kalive  || !_ev_quit || !_ev_timer) {
            throw errno;
        }
    }

    ~mod_redisev()
    {
        event_base_free(_base);
    }

    int redis_connect(); 
    int dispatch(); 
    void redis_close();

private:
    void __do_keepalive();
    void __on_auth(struct redisReply *reply);
    void __on_pop(struct redisReply *reply);
    void __on_connect(int status);
    void __on_quit()
    {
        printf("quit...\n");
        event_base_loopbreak(_base);
    }

private:
    static void on_do_something(int fd, short events, void *args)
    {
        printf("Do something...\n");
    }

    static void on_keepalive(int fd, short events, void *args)
    {
        ((class mod_redisev *)args)->__do_keepalive();
    }

    static void on_signal_quit(int fd, short events, void *args)
    {
        ((class mod_redisev *)args)->__on_quit();
    }

    static void on_redis_connect(const struct redisAsyncContext *rc, int status)
    {
        ((class mod_redisev *)rc->data)->__on_connect(status);
    }

    static void on_redis_close(const struct redisAsyncContext *rc, int status)
    {
        printf("Redis disconnect...\n");
    }

    static void on_redis_auth(struct redisAsyncContext *rc, void *reply, void *args)
    {
        ((class mod_redisev *)args)->__on_auth((struct redisReply *)reply);
    }

    static void on_redis_pop(struct redisAsyncContext *rc, void *reply, void *args)
    {
        ((class mod_redisev *)args)->__on_pop((struct redisReply *)reply);
    }
};
           

輕松愉快的main入口:

int main(int argc, char *argv[])
{
    class mod_redisev redisev;
    redisev.redis_connect();
    redisev.dispatch();
    redisev.redis_close();
    exit(EXIT_SUCCESS);
}
           

3.2 Redis連接配接

異步連接配接功能,用

redisLibeventAttach

與libevent綁定:

設定兩個回調函數:

on_redis_connect

on_redis_close

int mod_redisev::redis_connect()
{
    int res = -1;

    _rc = redisAsyncConnect(_addr.c_str(), _port);
    if (!_rc || _rc->err) {
        printf("redisAsyncConnect: %s\n", _rc ? _rc->errstr : "error");
        return -1;
    }
    _rc->data = this; // attch to class

    res = redisLibeventAttach(_rc, _base);
    if (0 != res) {
        printf("redisLibeventAttach\n");
        return -1;
    }

    (void)redisAsyncSetConnectCallback(_rc, on_redis_connect);
    (void)redisAsyncSetDisconnectCallback(_rc, on_redis_close);
    return 0;
}
           

連接配接的流程為:觸發連接配接成功了,設定

REDIS_OK

辨別:

void mod_redisev::__on_connect(int status)
{
    _status = status;

    switch (_status) {
    case REDIS_OK:
        printf("Redis connected...\n");
        (void)redisAsyncCommand(_rc, on_redis_auth, this, "AUTH 123456");
        break;

    case REDIS_ERR:
        printf("Redis reconnecting...\n");
        break;

    default:
        break;
    }
}
           

Redis的連接配接保活借助了定時器對

_status

的檢查,如果連接配接不正常,再次進行

connect

操作:

void mod_redisev::__do_keepalive()
{
    if (_status != REDIS_OK) {
        printf("Reconect...\n");
        redis_connect();
    }
}
           

3.2 認證

上述

__on_connnect

函數中,如果連接配接成功了,則發起認證指令

AUTH

AUTH并不是可選的,由于我redis-server配置了密碼:

/etc/redis.conf
requirepass 123456
           

認證結果的處理,如果成功,那麼進入下一個邏輯,

BRPOP

擷取資料

void mod_redisev::__on_auth(struct redisReply *reply)
{
    if (!reply || reply->type == REDIS_REPLY_ERROR) {
        printf("Reply: %s\n", reply ? reply->str : "error");
        _status = REDIS_ERR;
        return;
    }
    else if (reply->type != REDIS_REPLY_STATUS) {
        printf("Reply unknown: %d\n", reply->type);
        _status = REDIS_ERR;
        return;
    }
    printf("AUTH success...\n");
    (void)redisAsyncCommand(_rc, on_redis_pop, this, "BRPOP queue1 queue2 queue3 10");
}
           

注意常見的認證錯誤有下面這種:

Reply: NOAUTH Authentication required.  // 沒有配置密碼,無須認證
Reply: ERR invalid password // 錯誤的密碼
           

3.3 拉取資料

認證成功後,進入拉取資料階段,這裡用了循環拉取的邏輯:

以阻塞10秒的形式,分别看 queue1 queue2 queue3 是否有資料過來:

BRPOP queue1 queue2 queue3 10
void mod_redisev::__on_pop(struct redisReply *reply)
{
    if (!reply || reply->type == REDIS_REPLY_ERROR) {
        printf("Reply: %s\n", reply ? reply->str : "error");
        _status = REDIS_ERR;
        return;
    }

    if (reply->type == REDIS_REPLY_NIL) {
        printf("BRPOP: empty...\n");
    }
    else if (reply->type == REDIS_REPLY_ARRAY) {
        if (reply->elements > 0) {
            struct redisReply *rs = reply->element[1];
            printf("BRPOP: %s\n", rs->str);
        }
    }
    redisAsyncCommand(_rc, on_redis_pop, this, "BRPOP queue1 queue2 queue3 10");
}
           

這裡就展現出異步事件的優勢出來了,如果使用同步的請求接口,那麼就得阻塞10秒,期間什麼也不能幹;

每秒1次的,假裝幹活的接口:

class mod_redisev {
	...
    static void on_do_something(int fd, short events, void *args)
    {
        printf("Do something...\n");
    }
    ...
};
           

3.4 信号處理

class mod_redisev {
	...
    static void on_signal_quit(int fd, short events, void *args)
    {
        ((class mod_redisev *)args)->__on_quit();
    }
    void __on_quit()
    {
        printf("quit...\n");
        event_base_loopbreak(_base);
    }
    ...
};
           

最後剩下的兩個處理,dispatch與連接配接關閉:

int mod_redisev::dispatch()
{
    struct timeval tv = {1};
    event_add(_ev_timer, &tv);
    event_add(_ev_kalive, &tv);
    event_add(_ev_quit, NULL);
    return event_base_dispatch(_base);
}

void mod_redisev::redis_close()
{
    if (_rc && _status == REDIS_OK) {
        printf("Redis disconnect...\n");
        redisAsyncDisconnect(_rc); // __redisAsyncFree() called
        _rc = NULL;
    }
}
           

3.5 執行效果

編譯方法記得把

-lhiredis

-levent

加進來

g++ -o redis_async -std=c++11 -Wall -g2 -O3 -Os redis_async.cc -levent -lhiredis
Redis connected...  // redis連接配接成功
AUTH success... // auth成功,發起BRPOP請求
Do something... // 每秒1次的 do-something
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
Do something...
BRPOP: empty... // BRPOP 10秒逾時到,沒有資料 
Do something... // 每秒1次的、執着的 do-something
Do something...
Do something...
Do something...
Do something...
Do something...
BRPOP: a         // 外部我執行了指令,LPUSH queue1 a b c d e
BRPOP: b         // BRPOP函數連續觸發出來,擷取 a b c d e
BRPOP: c
BRPOP: d
BRPOP: e
Do something...
Do something...
Do something...
Do something...
quit...          // 外部 ctrl + c
Redis disconnect... // 回收資源,disconnect
           

4. 總結

    本文通過libhiredis庫提供的async異步接口,利用了

adapters/libevent.h

實作快速與libevent銜接,主要适合的場景,還是單線程下的事件觸發模型;

    主要是幾個心得:

  1. 需要習慣的地方,異步回調的程式設計思路,每個指令都需要一個回調函數處理;
  2. 如果中間還想穿插直接用同步接口,如redisCommand,似乎_rc->c是不能直接用;
  3. hiredis容易用錯出現double-free,需要外部_status進行綜合判斷多多檢查;
  4. valgrind檢查記憶體,中間過程倒是不洩露,但最後redis-libevent釋放有不幹淨的地方;

參考文章:

[1] https://github.com/redis/hiredis/blob/master/README.md

繼續閱讀