天天看點

用redis實作支援優先級的消息隊列

用redis實作支援優先級的消息隊列

為什麼需要消息隊列

系統中引入消息隊列機制是對系統一個非常大的改善。例如一個web系統中,使用者做了某項操作後需要發送郵件通知到使用者郵箱中。你可以使用同步方式讓使用者等待郵件發送完成後回報給使用者,但是這樣可能會因為網絡的不确定性造成使用者長時間的等待進而影響使用者體驗。

有些場景下是不可能使用同步方式等待完成的,那些需要背景花費大量時間的操作。例如極端例子,一個線上編譯系統任務,背景編譯完成需要30分鐘。這種場景的設計不可能同步等待後在回饋,必須是先回報使用者随後異步處理完成,再等待處理完成後根據情況再此回報使用者與否。

另外适用消息隊列的情況是那些系統處理能力有限的情況下,先使用隊列機制把任務暫時存放起來,系統再一個個輪流處理掉排隊的任務。這樣在系統吞吐量不足的情況下也能穩定的處理掉高并發的任務。

消息隊列可以用來做排隊機制,隻要系統需要用到排隊機制的地方就可以使用消息隊列來作。

rabbitmq的優先級做法 

目前成熟的消息隊列産品有很多,著名的例如rabbitmq。它使用起來相對還是比較簡單的,功能也相對比較豐富,一般場合下是完全夠用的。但是有個很煩人的就是它不支援優先級。

例如一個發郵件的任務,某些特權使用者希望它的郵件能夠更加及時的發送出去,至少比普通使用者要優先對待。預設情況下rabbitmq是無法處理掉的,扔給rabbitmq的任務都是FIFO先進先出。但是我們可以使用一些變通的技巧來支援這些優先級。建立多個隊列,并為rabbitmq的消費者設定相應的路由規則。

例如預設情況下有這樣一個隊列,我們拿list來模拟 [task1, task2, task3],消費者輪流按照FIFO的原則一個個拿出task來處理掉。如果有高優先級的任務進來,它也隻能跟在最後被處理[task1, task2, task3, higitask1]. 但是如果使用兩個隊列,一個高優先級隊列,一個普通優先級隊列。 普通優先級[task1, task2, task3], 高優先級[hightask1 ] 然後我們設定消費者的路由讓消費者随機從任意隊列中取資料即可。

并且我們可以定義一個專門處理高優先級隊列的消費者,它空閑的時候也不處理低優先級隊列的資料。這類似銀行的VIP櫃台,普通客戶在銀行取号排隊,一個VIP來了他雖然沒有從取号機裡拿出一個排在普通會員前面的票,但是他還是可以更快地直接走VIP通道。

使用rabbitmq來做支援優先級的消息隊列的話,就像是上面所述同銀行VIP會員一樣,走不同的通道。但是這種方式隻是相對的優先級,做不到絕對的優先級控制,例如我希望某一個優先級高的任務在絕對意義上要比其他普通任務優先處理掉,這樣上面的方案是行不通的。因為rabbitmq的消費者隻知道再自己空閑的情況下從自己關心的隊列中“随機”取某一個隊列裡面的第一個資料來處理,它沒法控制優先取找哪一個隊列。或者更加細粒度的優先級控制。或者你系統裡面設定的優先級有10多種。這樣使用rabbitmq也是很難實作的。

但是如果使用redis來做隊列的話上面的需求都可以實作。

使用redis怎麼做消息隊列

首先redis它的設計是用來做緩存的,但是由于它自身的某種特性使得他可以用來做消息隊列。它有幾個阻塞式的API可以使用,正是這些阻塞式的API讓他有做消息隊列的能力。

試想一下在”資料庫解決所有問題“的思路下,不使用消息隊列也是可以完成你的需求的。我們把任務全部存放在資料庫然後通過不斷的輪詢方式來取任務處理。這種做法雖然可以完成你的任務但是做法很粗劣。但是如果你的資料庫接口提供一個阻塞的方法那麼就可以避免輪詢操作了,你的資料庫也可以用來做消息隊列,隻不過目前的資料庫還沒有這樣的接口。

另外做消息隊列的其他特性例如FIFO也很容易實作,隻需要一個List對象從頭取資料,從尾部塞資料即可實作。

redis能做消息隊列得益于他list對象blpop brpop接口以及Pub/Sub(釋出/訂閱)的某些接口。他們都是阻塞版的,是以可以用來做消息隊列。

redis消息隊列優先級的實作

一些基礎redis基礎知識的說明

redis> blpop tasklist 0

"im task 01"

這個例子使用blpop指令會阻塞方式地從tasklist清單中取頭一個資料,最後一個參數就是等待逾時的時間。如果設定為0則表示無限等待。另外redis存放的資料都隻能是string類型,是以在任務傳遞的時候隻能是傳遞字元串。我們隻需要簡單的将負責資料序列化成json格式的字元串,然後消費者那邊再轉換一下即可。

這裡我們的示例語言使用python,連結redis的庫使用redis-py. 如果你有些程式設計基礎把它切換成自己喜歡的語言應該是沒問題的。

1.簡單的FIFO隊列

import redis, time     
def handle(task):
    print task
    time.sleep(4)     
def main():
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
    r = redis.Redis(connection_pool=pool)
    while 1:
        result = r.brpop('tasklist', 0)
        handle(result[1])     
if __name__ == "__main__":
    main()      

上例子即使一個最簡單的消費者,我們通過一個無限循環不斷地從redis的隊列中取資料。如果隊列中沒有資料則沒有逾時的阻塞在那裡,有資料則取出往下執行。

一般情況取出來是個複雜的字元串,我們可能需要将其格式化後作為再傳給處理函數,但是為了簡單我們的例子就是一個普通字元串。另外例子中的處理函數不做任何處理,僅僅sleep 用來模拟耗時的操作。

我們另開一個redis的用戶端來模拟生産者,自帶的用戶端就可以。多往tasklist 隊列裡面塞上一些資料。

redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> lpush tasklist 'im task 03'
redis> lpush tasklist 'im task 04'
redis> lpush tasklist 'im task 05'      

随後在消費者端便會看到這些模拟出來的任務被挨個消費掉。

2.簡單優先級的隊列

假設一種簡單的需求,隻需要高優先級的比低優先級的任務率先處理掉。其他任務之間的順序一概不管,這種我們隻需要在在遇到高優先級任務的時候将它塞到隊列的前頭,而不是push到最後面即可。

因為我們的隊列是使用的redis的 list,是以很容易實作。遇到高優先級的使用rpush 遇到低優先級的使用lpush

redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> rpush tasklist 'im high task 01'
redis> rpush tasklist 'im high task 01'
redis> lpush tasklist 'im task 03'
redis> rpush tasklist 'im high task 03'      

随後會看到,高優先級的總是比低優先級的率先執行。但是這個方案的缺點是高優先級的任務之間的執行順序是先進後出的。

3.較為完善的隊列

例子2中隻是簡單的将高優先級的任務塞到隊列最前面,低優先級的塞到最後面。這樣保證不了高優先級任務之間的順序。

假設當所有的任務都是高優先級的話,那麼他們的執行順序将是相反的。這樣明顯違背了隊列的FIFO原則。

不過隻要稍加改進就可以完善我們的隊列。

跟使用rabbitmq一樣,我們設定兩個隊列,一個高優先級一個低優先級的隊列。高優先級任務放到高隊列中,低的放在低優先隊列中。redis和rabbitmq不同的是它可以要求隊列消費者從哪個隊列裡面先讀。

def main():
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
    r = redis.Redis(connection_pool=pool)
    while 1:
        result = r.brpop(['high_task_queue', 'low_task_queue'], 0)
        handle(result[1])      

上面的代碼,會阻塞地從'high_task_queue', 'low_task_queue'這兩個隊列裡面取資料,如果第一個沒有再從第二個裡面取。

是以隻需要将隊列消費者做這樣的改進便可以達到目的。

redis> lpush low_task_queue low001
redis> lpush low_task_queue low002
redis> lpush low_task_queue low003
redis> lpush low_task_queue low004
redis> lpush high_task_queue low001
redis> lpush high_task_queue low002
redis> lpush high_task_queue low003
redis> lpush high_task_queue low004      

通過上面的測試看到,高優先級的會被率先執行,并且高優先級之間也是保證了FIFO的原則。

這種方案我們可以支援不同階段的優先級隊列,例如高中低三個級别或者更多的級别都可以。

4.優先級級别很多的情況

假設有個這樣的需求,優先級不是簡單的高中低或者0-10這些固定的級别。而是類似0-99999這麼多級别。那麼我們第三種方案将不太合适了。

雖然redis有sorted set這樣的可以排序的資料類型,看是很可惜它沒有阻塞版的接口。于是我們還是隻能使用list類型通過其他方式來完成目的。

有個簡單的做法我們可以隻設定一個隊列,并保證它是按照優先級排序号的。然後通過二分查找法查找一個任務合适的位置,并通過 lset 指令插入到相應的位置。 

例如隊列裡面包含着寫優先級的任務[1, 3, 6, 8, 9, 14],當有個優先級為7的任務過來,我們通過自己的二分算法一個個從隊列裡面取資料出來反和目标資料比對,計算出相應的位置然後插入到指定地點即可。

因為二分查找是比較快的,并且redis本身也都在記憶體中,理論上速度是可以保證的。但是如果說資料量确實很大的話我們也可以通過一些方式來調優。

繼續閱讀