前言
以前一直有使用celery的優先級機制(基于redis的任務隊列),一直很好奇它的實作機制,在查閱了部分資料後,決定寫這篇文章,作為總結。
1. 利用Sorted Set 實作
使用Sorted Set 做優先級隊列最大的優點是直覺明了。
ZADD key score member [[score member] [score member] ...]
score 作為優先級,member 作為相應的任務
在Sorted Set 中,score 小的,位于優先級隊列的頭部,即優先級較高
由于score 就是menber的優先級,是以非常直覺
可以使用
MULTI
ZRANGE key WITHSCORES
ZREMRANGEBYRANK task_list
EXEC
來擷取任務隊列中優先級最高的元素
ZRANGE 用于擷取任務,ZREMRANGEBYRANK 用于從消息隊列中移除
注意:由于Sorted Set本身是一個set,是以消息隊列中的消息不能重複,否則新加入的消息會覆寫以前加入的消息
注意:對于score 相同的消息,Sorted Set 會按照字典序進行排序
2. 利用List實作
應該一下就能想到,list 是作為消息隊列的最理想的選擇,但這裡使用list 實作帶優先級的消息隊列也可以有好幾種不同的實作方式。
2.1 準備
首先,如果我們假定消息隊列中的消息,從消息隊列的右側推入(RPUSH),從左側取出(LPOP)
那麼單個list 很容易構造成一個FIFO 隊列。但是如果優先級隻有兩級,高和低,那麼我們可以把高優先級的消息,使用LPUSH 推入隊列左側,把低優先級的消息,使用RPUSH推入到隊列右側, 這樣單個list就可以實作2級的帶優先級的消息隊列。
2.2 使用BLPOP
redis 提供了清單的阻塞式(blocking)彈出原語。
BLPOP key [key ...] timeout
當給定多個 key 參數時,按參數 key 的先後順序依次檢查各個清單,彈出第一個非空清單的頭元素。
這樣我們可以建立三個隊列,high,normal, low ,分别代表高優先級,普通優先級,低優先級
BLPOP high normal low
2.3 基于多個key 的LPOP
有時候我們并不想要阻塞式的原語,那麼在業務層,我們可以在多個隊列中周遊,查找來擷取消息
queue_list = ["high", "normal", "low"]
def get_msg():
for queue in queue_list:
msg = redis_db.lpop(queue)
if msg is not None:
return msg
return None
RQ – 異步任務系統,從任務隊列中擷取任務的代碼如下。它封裝了redis的LPOP 既支援阻塞式,也支援非阻塞式的擷取優先級最高的任務。
@classmethod
def lpop(cls, queue_keys, timeout, connection=None):
connection = resolve_connection(connection)
if timeout is not None: # blocking variant
if timeout == :
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.')
result = connection.blpop(queue_keys, timeout)
if result is None:
raise DequeueTimeout(timeout, queue_keys)
queue_key, job_id = result
return queue_key, job_id
else: # non-blocking variant
for queue_key in queue_keys:
blob = connection.lpop(queue_key)
if blob is not None:
return queue_key, blob
return None
2.4 擴充
如果我們需要10個優先級的消息隊列,可以想到我們需要至少5個隊列(參考2.1)
這時候我們的消息隊列的命名可能就需要采取某種規則
比如,原打算命名的消息隊列的名稱為 msg_queue
那麼這5個消息隊列就可以被命名為
msg_queue-0
msg_queue-1
msg_queue-2
msg_queue-3
msg_queue-4
如果再結合
KEYS pattern
我們就可以得到對任意多個優先級支援的消息隊列
# priority 1 ~ 10
# push message into list
def push_message(queue, priority, message):
num = (priority - ) /
target_queue = queue + "-" + str(num)
# direct
if priority % == :
redis_db.lpush(target_queue, message)
else:
redis_db.rpush(target_queue, message)
# fetch a message
def fetch_message(queue):
queue_list = redis_db.keys(queue + "-?")
queue_list = sorted(queue_list)
for queue in queue_list:
msg = redis_db.lpop(queue)
if msg is not None:
return msg
return None
注意:采用這種做法,同一優先級的消息,并不滿足FIFO