天天看點

利用redis實作帶優先級的消息隊列

前言

以前一直有使用celery的優先級機制(基于redis的任務隊列),一直很好奇它的實作機制,在查閱了部分資料後,決定寫這篇文章,作為總結。

1. 利用Sorted Set 實作

使用Sorted Set 做優先級隊列最大的優點是直覺明了。

ZADD key score member [[score member] [score member] ...]
           

score 作為優先級,member 作為相應的任務

在Sorted Set 中,score 小的,位于優先級隊列的頭部,即優先級較高

由于score 就是menber的優先級,是以非常直覺

可以使用

MULTI
ZRANGE key   WITHSCORES
ZREMRANGEBYRANK task_list  
EXEC
           

來擷取任務隊列中優先級最高的元素

ZRANGE 用于擷取任務,ZREMRANGEBYRANK 用于從消息隊列中移除

注意:由于Sorted Set本身是一個set,是以消息隊列中的消息不能重複,否則新加入的消息會覆寫以前加入的消息

注意:對于score 相同的消息,Sorted Set 會按照字典序進行排序

2. 利用List實作

應該一下就能想到,list 是作為消息隊列的最理想的選擇,但這裡使用list 實作帶優先級的消息隊列也可以有好幾種不同的實作方式。

2.1 準備

首先,如果我們假定消息隊列中的消息,從消息隊列的右側推入(RPUSH),從左側取出(LPOP)

那麼單個list 很容易構造成一個FIFO 隊列。但是如果優先級隻有兩級,高和低,那麼我們可以把高優先級的消息,使用LPUSH 推入隊列左側,把低優先級的消息,使用RPUSH推入到隊列右側, 這樣單個list就可以實作2級的帶優先級的消息隊列。

2.2 使用BLPOP

redis 提供了清單的阻塞式(blocking)彈出原語。

BLPOP key [key ...] timeout
           

當給定多個 key 參數時,按參數 key 的先後順序依次檢查各個清單,彈出第一個非空清單的頭元素。

這樣我們可以建立三個隊列,high,normal, low ,分别代表高優先級,普通優先級,低優先級

BLPOP high normal low
           

2.3 基于多個key 的LPOP

有時候我們并不想要阻塞式的原語,那麼在業務層,我們可以在多個隊列中周遊,查找來擷取消息

queue_list = ["high", "normal", "low"]
def get_msg():
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None
           

RQ – 異步任務系統,從任務隊列中擷取任務的代碼如下。它封裝了redis的LPOP 既支援阻塞式,也支援非阻塞式的擷取優先級最高的任務。

@classmethod
def lpop(cls, queue_keys, timeout, connection=None):
    connection = resolve_connection(connection)
    if timeout is not None:  # blocking variant
        if timeout == :
            raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.')
            result = connection.blpop(queue_keys, timeout)
            if result is None:
                raise DequeueTimeout(timeout, queue_keys)
            queue_key, job_id = result
            return queue_key, job_id
        else:  # non-blocking variant
            for queue_key in queue_keys:
                blob = connection.lpop(queue_key)
                if blob is not None:
                    return queue_key, blob
            return None
           

2.4 擴充

如果我們需要10個優先級的消息隊列,可以想到我們需要至少5個隊列(參考2.1)

這時候我們的消息隊列的命名可能就需要采取某種規則

比如,原打算命名的消息隊列的名稱為 msg_queue

那麼這5個消息隊列就可以被命名為

msg_queue-0

msg_queue-1

msg_queue-2

msg_queue-3

msg_queue-4

如果再結合

KEYS pattern
           

我們就可以得到對任意多個優先級支援的消息隊列

# priority 1 ~ 10
# push message into list
def push_message(queue, priority, message):
    num = (priority - ) / 
    target_queue = queue + "-" + str(num)
    # direct
    if priority %  == :
        redis_db.lpush(target_queue, message)
    else:
        redis_db.rpush(target_queue, message)
# fetch  a message
def fetch_message(queue):
    queue_list = redis_db.keys(queue + "-?")
    queue_list = sorted(queue_list)
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None
           

注意:采用這種做法,同一優先級的消息,并不滿足FIFO

繼續閱讀