使用 go redis list
語言基于
寫了一個簡單的消息隊列 源碼位址 使用demo redis的
非常的靈活,可以從左邊或者右邊添加元素,當然也以從任意一頭讀取資料
添加資料和擷取資料的操作也是非常簡單的
LPUSH
從左邊插入資料 RPUSH
大右邊插入資料 LPOP
從左邊取出一個資料 RPOP
從右邊取出一個資料 127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"
或者使用
BLPOP
BRPOP
來讀取資料,不同之處是取資料時,如果沒有資料會等待指定的時間,
如果這期間有資料寫入,則會讀取并傳回,沒有資料則會傳回空
在一個
視窗1
讀取
127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"
在另一個
視窗2
寫入
127.0.0.1:6379> RPUSH list1 a b c
(integer) 3
再開一個
視窗3
讀取,第二次讀取時,
list
是空的,是以等待1秒後傳回空。
127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"
127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)
簡單消息隊列的實作
如果我們隻從一邊新增元素,向另一邊取出元素,這就不是一個消息隊列麼。但我估計你會有一個疑問,在消費資料時,同一個消息會不會同時被多個
consumer
消費掉?
當然不會,因為redis是單線程的,在從
list
取資料時天然不會出現并發問題。但是這是一個簡單的消息隊列,消費不成功怎麼處理還是需要我們自己寫代碼來實作的
下面我說一下使用
list
實作一個簡單的消息隊列的整體思路
comsumer的實作
consumer
主要做的就是從list裡讀取資料,使用
LPOP
或者
BLPOP
都可以,
這裡做了一個開關
options
的
UseBLopp
如果為
true
時會使用
BLPOP
。
type consumer struct {
once sync.Once
redisCmd redis.Cmdable
ctx context.Context
topicName string
handler Handler
rateLimitPeriod time.Duration
options ConsumerOptions
_ struct{}
}
type ConsumerOptions struct {
RateLimitPeriod time.Duration
UseBLPop bool
}
看一下建立
consumer
的代碼,最後面的
opts
參數是可選的配置
type Consumer = *consumer
func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
consumer := &consumer{
redisCmd: redisCmd,
ctx: ctx,
topicName: topicName,
}
for _, o := range opts {
o(&consumer.options)
}
if consumer.options.RateLimitPeriod == 0 {
consumer.options.RateLimitPeriod = time.Microsecond * 200
}
return consumer
}
讀取資料後具體怎麼進行處理調用者可以根據自己的業務邏輯進行相應處理
有一個小的
interface
調用者根據自己的邏輯去實作
type Handler interface {
HandleMessage(msg *Message)
}
讀取資料的邏輯使用一個gorouting實作
func (s *consumer) startGetMessage() {
go func() {
ticker := time.NewTicker(s.options.RateLimitPeriod)
defer func() {
log.Println("stop get message.")
ticker.Stop()
}()
for {
select {
case <-s.ctx.Done():
log.Printf("context Done msg: %#v \n", s.ctx.Err())
return
case <-ticker.C:
var revBody []byte
var err error
if !s.options.UseBLPop {
revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
} else {
revs := s.redisCmd.BLPop(time.Second, s.topicName)
err = revs.Err()
revValues := revs.Val()
if len(revValues) >= 2 {
revBody = []byte(revValues[1])
}
}
if err == redis.Nil {
continue
}
if err != nil {
log.Printf("LPOP error: %#v \n", err)
continue
}
if len(revBody) == 0 {
continue
}
msg := &Message{}
json.Unmarshal(revBody, msg)
if s.handler != nil {
s.handler.HandleMessage(msg)
}
}
}
}()
}
Producer 的實作
Producer
還是很簡單的就是把資料推送到
reids
type Producer struct {
redisCmd redis.Cmdable
_ struct{}
}
func NewProducer(cmd redis.Cmdable) *Producer {
return &Producer{redisCmd: cmd}
}
func (p *Producer) Publish(topicName string, body []byte) error {
msg := NewMessage("", body)
sendData, _ := json.Marshal(msg)
return p.redisCmd.RPush(topicName, string(sendData)).Err()
}