天天看點

[源碼解析] 并行分布式任務隊列 Celery 之 負載均衡

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。本文介紹 Celery 的負載均衡機制。

目錄

[源碼解析] 并行分布式任務隊列 Celery 之 負載均衡

0x00 摘要

0x01 負載均衡

1.1 哪幾個 queue

1.1.1 _brpop_start 選擇下次讀取的queue

1.1.2 round_robin_cycle 設定下次讀取的 queue

1.2 哪一個worker

1.3 哪一個程序

1.3.1 政策

1.3.2 公平排程

1.3.3 公平排程 in Celery

0x02 Autoscaler

2.1 調用時機

2.2 具體實作

2.2.1 bgThread

2.2.2 定義

0xEE 個人資訊

0xFF 參考

Autoscaler 的作用 實際就是線上調節程序池大小。這也和緩解負載相關,是以放在這裡一起論述。

Celery 的負載均衡其實可以分為三個層次,而且是與 Kombu 高度耦合(本文 broker 以 Redis 為例)。

在 worker 決定 與 哪幾個 queue 互動,有一個負載均衡(對于 queues );

在 worker 決定與 broker 互動,使用 brpop 擷取消息時候有一個負載均衡(決定哪一個 worker 來處理任務);

在 worker 獲得 broker 消息之後,内部 具體 調用 task 時候,worker 内部進行多程序配置設定時候,有一個負載均衡(決定 worker 内部哪幾個程序)。

注意,這個順序是從 worker 讀取任務處理任務的角度 出發,而不是從系統架構角度出發。

因為從系統架構角度說,應該是 <code>which worker ----&gt; which queue in the worker ----&gt; which subprocess in the worker</code> 這個角度。

我們下面按照 "worker 讀取任務處理任務角度" 的順序進行分析。

Kombu 事實上是使用 redis 的 BRPOP 功能來完成對具體 queue 中消息的讀取。

Kombu 是循環調用,每次調用會制定讀取哪些内部queues的消息;

queue 這個邏輯概念,其實就是對應了 redis 中的一個 實體key,從 queue 讀取,就代表 BRPOP 需要指定 監聽的 key。

Kombu 是在每一次監聽時候,根據這些 queues 得到 其在 redis 之中對應的實體keys,即都指定監聽哪些 redis keys;

brpop是個多key指令,當給定多個 key 參數時,按參數 key 的先後順序依次檢查各個清單,彈出第一個非空清單的頭元素。這樣就得到了這些 邏輯queue 對應的消息。

因為 task 可能會 用到多個 queue,是以具體從哪幾個queue 讀取?這時候就用到了政策。

Kombu 在每次監聽時候,調用 _brpop_start 完成監聽。其作用就是 選擇下一次讀取的queues。

_brpop_start 如下:

此時變量如下:

是以<code>_brpop_start</code> 就是從 self._queue_cycle 獲得幾個需要讀取的queue。

具體如下圖:

從上面代碼中,我們可以知道 consume 就是傳回 round_robin_cycle 中前幾個 queue,即 return self.items[:n]。

而 self.items 的維護,是通過 rotate 完成的,就是把 最近用的 那個 queue 放到隊列最後,這樣給其他 queue 機會,就是 round robin 的概念了。

比如在如下代碼中,當讀取到消息之後,就會調用 <code>self._queue_cycle.rotate(dest)</code> 進行調整。

如果多個 worker 同時去使用 brpop 擷取 broker 消息,那麼具體哪一個能夠讀取到消息,其實這就是有一個 競争機制,因為redis 的單程序處理,是以隻能有一個 worker 才能讀到。

這本身就是一個負載均衡。這個和 spring quartz 的負載均衡實作非常類似。

spring quartz 是 多個節點讀取 同一個資料庫記錄決定誰能開始下一次處理,哪一個得到了資料庫鎖 就是哪個。

Kombu 是通過 多個 worker 讀取 redis "同一個或者一組key" 的 實際結果 來決定 "哪一個 worker 能開始下一次處理"。

程序池中,使用了政策來決定具體使用哪一個程序來處理任務。

先講解 strategy。在 AsynPool 啟動有如下,配置了政策:

于是我們看看 strategy 定義如下,基本由名字可以知道其政策意義:

我們講講公平排程的概念。

不同系統對于公平排程的了解大同小異,我們舉幾個例子看看。

Linux 中,排程器必須在各個程序之間盡可能公平地共享CPU時間,而同時又要考慮不同的任務優先級。一般原理是:按所需配置設定的計算能力,向系統中每個程序提供最大的公正性,或者從另外一個角度上說, 試圖確定沒有程序被虧待。

Hadoop 中,公平排程是一種賦予作業(job)資源的方法,它的目的是讓所有的作業随着時間的推移,都能平均的擷取等同的共享資源。當單獨一個作業在運作時,它将使用整個叢集。當有其它作業被送出上來時,系統會将任務(task)空閑時間片(slot)賦給這些新的作業,以使得每一個作業都大概擷取到等量的CPU時間。

Yarn 之中,Fair Share指的都是Yarn根據每個隊列的權重、最大,最小可運作資源計算的得到的可以配置設定給這個隊列的最大可用資源。

在 asynpool之中,有設定,看看"是否為 fair 排程":

基于 is_fair_strategy 這個變量,Celery 的公平排程有幾處展現。

在開始 poll 時候,如果是 fair,則需要 存在 idle worker 才排程,這樣就給了 idler worker 一個排程機會。

在具體釋出 寫操作 時候,也會看看是否 worker 已經正在忙于執行某一個 task,如果正在執行,就不排程,這樣就給了其他 不忙worker 一個排程的機會。

具體邏輯如下:

在 WorkerComponent 中可以看到,為 AutoScaler 注冊了兩個調用途徑:

注冊在 consumer 消息響應方法中,這樣消費時候如果有需要,就會調整;

利用 Hub 的 call_repeatedly 方法注冊了周期任務,即周期看看是否需要調整。

這樣就會最大程度的加大調用頻率。

Autoscaler 是Background thread,這樣 AutoScaler就可以在背景運作:

Autoscaler 的定義如下,可以看到其邏輯就是定期判斷是否需要調整:

如果目前并發已經到了最大,則下調;

如果到了最小并發,則上調;

則具體上調下調的,都是通過具體線程池函數做到的,這就是要根據具體作業系統來進行分析,此處略過。

★★★★★★關于生活和技術的思考★★★★★★

微信公衆賬号:羅西的思考

如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,敬請關注。

[源碼解析] 并行分布式任務隊列 Celery 之 負載均衡

Hadoop公平排程器指南

淺析Linux中完全公平排程——CFS

yarn公平排程詳細分析(一)