天天看點

玩轉redis-簡單消息隊列

使用

go

語言基于

redis

寫了一個簡單的消息隊列 源碼位址 使用demo redis的

list

非常的靈活,可以從左邊或者右邊添加元素,當然也以從任意一頭讀取資料

添加資料和擷取資料的操作也是非常簡單的

LPUSH

從左邊插入資料

RPUSH

大右邊插入資料

LPOP

從左邊取出一個資料

RPOP

從右邊取出一個資料

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"           

或者使用

BLPOP

BRPOP

來讀取資料,不同之處是取資料時,如果沒有資料會等待指定的時間,

如果這期間有資料寫入,則會讀取并傳回,沒有資料則會傳回空

在一個

視窗1

讀取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"           

在另一個

視窗2

寫入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3           

再開一個

視窗3

讀取,第二次讀取時,

list

是空的,是以等待1秒後傳回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)           

簡單消息隊列的實作

如果我們隻從一邊新增元素,向另一邊取出元素,這就不是一個消息隊列麼。但我估計你會有一個疑問,在消費資料時,同一個消息會不會同時被多個

consumer

消費掉?

當然不會,因為redis是單線程的,在從

list

取資料時天然不會出現并發問題。但是這是一個簡單的消息隊列,消費不成功怎麼處理還是需要我們自己寫代碼來實作的

下面我說一下使用

list

實作一個簡單的消息隊列的整體思路

comsumer的實作

consumer

主要做的就是從list裡讀取資料,使用

LPOP

或者

BLPOP

都可以,

這裡做了一個開關

options

UseBLopp

如果為

true

時會使用

BLPOP

type consumer struct {
    once            sync.Once
    redisCmd        redis.Cmdable
    ctx             context.Context
    topicName       string
    handler         Handler
    rateLimitPeriod time.Duration
    options         ConsumerOptions
    _               struct{}
}

type ConsumerOptions struct {
    RateLimitPeriod time.Duration
    UseBLPop        bool
}
           

看一下建立

consumer

的代碼,最後面的

opts

參數是可選的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
    consumer := &consumer{
        redisCmd:  redisCmd,
        ctx:       ctx,
        topicName: topicName,
    }
    for _, o := range opts {
        o(&consumer.options)
    }
    if consumer.options.RateLimitPeriod == 0 {
        consumer.options.RateLimitPeriod = time.Microsecond * 200
    }
    return consumer
}
           

讀取資料後具體怎麼進行處理調用者可以根據自己的業務邏輯進行相應處理

有一個小的

interface

調用者根據自己的邏輯去實作

type Handler interface {
    HandleMessage(msg *Message)
}           

讀取資料的邏輯使用一個gorouting實作

func (s *consumer) startGetMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get message.")
            ticker.Stop()
        }()
        for {
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var revBody []byte
                var err error
                if !s.options.UseBLPop {
                    revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
                } else {
                    revs := s.redisCmd.BLPop(time.Second, s.topicName)
                    err = revs.Err()
                    revValues := revs.Val()
                    if len(revValues) >= 2 {
                        revBody = []byte(revValues[1])
                    }
                }
                if err == redis.Nil {
                    continue
                }
                if err != nil {
                    log.Printf("LPOP error: %#v \n", err)
                    continue
                }

                if len(revBody) == 0 {
                    continue
                }
                msg := &Message{}
                json.Unmarshal(revBody, msg)
                if s.handler != nil {
                    s.handler.HandleMessage(msg)
                }
            }
        }
    }()
}
           

Producer 的實作

Producer

還是很簡單的就是把資料推送到

reids

type Producer struct {
    redisCmd redis.Cmdable
    _        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
    return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
    msg := NewMessage("", body)
    sendData, _ := json.Marshal(msg)
    return p.redisCmd.RPush(topicName, string(sendData)).Err()
}