天天看點

AIGC 利器 Ray 雲原生探索之路——Ray Core 篇 (上)

作者:DaoCloud 道客
AIGC 利器 Ray 雲原生探索之路——Ray Core 篇 (上)

在上一篇《AIGC 利器 Ray 雲原生探索之路--總覽篇》中,整體的介紹了一下在雲原生和分布式的場景下,AIGC 的一些探索。接下來介紹一下 Ray 分布式計算的底座--Ray Core 是怎麼樣的一種分布式的計算架構,也是本篇文章重點要講的部分。

01 基本概念

Ray Core 是 Ray 的底層的分布式的計算架構,使用基于 actor 模型來實作的一套計算架構,它可以将 Python 的一個 Class 或者一個 Function 轉成分布式的 actor 和 task,在所有的機器上分布式的運作,并且 tasks/actor 之間可以通過分布式的對象存儲能力來共享和傳遞 object 來完成分布式的一些協作。Ray Core 的這些能力為其它的更高層的子產品提供了一個分布式計算的基礎架構。

說明:

  • class Counter 在标注了@ray.remote 注解之後,同時使用 remote()調用,如 counter = Counter.remote(),這裡的 counter 就是一個 actor 的 handle,而 Counter 就是一個 actor。而 class 中定義的 function 在使用 remote()調用的時候,就表現成了一個 task。actor 是有狀态,對于 Synchronous, Single-Threaded 類型的 actor,對于同一個調用者中按照順序調用類成員方法的時候,需要保證順序,才能保證結果的正确性,但是不同的調用者之間是可以并發處理的。對于 Asynchronous or Threaded 類型的 actor,即使是同一個調用者在調用成員方法的時候,也是可以不用保證調用順序,進行異步的執行。
  • 在一般的 function(不是 class 的成員方法)的定義上也标注了@ray.remote 注解之後,在調用的時候使用 remote(),就表現成一個 task。task 可以分布在不同機器上,并行的進行運作,而且是異步的方式,同時是無狀态的。

02 整體架構

driver process : driver 是程式的根,或者了解成 main 程式,這裡指的是運作了 ray.init()方法的 code。driver process 是一種特殊的 worker process ,用于執行 driver 的 process,它會執行頂層的應用(如, `__main__` in Python)。它能送出 task,但是它不是自己去執行 task。同時 driver processes 能運作在任何節點上,隻是預設是運作在 head 節點。driver process 也會處理來自 log monitor 發送過來的 worker 節點上日志。同時,jobs 和 drivers 的關系是 1:1 的關系。

worker process :worker process 負責 task 的送出和執行。一個 worker process 就是一個 python 的程序。worker process 可以是無狀态的,這種 worker process 可以被用來重複地執行一般的 task,這裡一般的 task 指的就是使用了 ray.remote 标注的 function。worker process 也可以用于執行有狀态的 actor,這裡有狀态的 actor 指的是使用了 ray.remote 的 class,在一個 actor 被執行個體化的時候,就會建立一個對應的 worker process,專門用于這個 actor 的運作,以及 actor 内的成員方式所對應的 task。每一個 worker process 和一個特定的 job 是相關的,預設初始化的 worker 的數量和所在機器的 cpu 核數一樣。

Distributed Scheduler:Distributed Scheduler 是 Raylet 元件的一部分,負責資源管理,task 放置的位置,保證 task 運作所需要的參數對象可以從分布式對象存儲的對象中擷取。每個 Raylet 元件會跟蹤本地節點的資源,當一個資源請求被同意之後,Raylet 就會減少本地可用資源,一旦資源被使用完之後返還回來了,Raylet 又會增加本地的可用資源,是以 Raylet 有一個強一緻性的本地的可用資源的視圖。同時,Raylet 也接收來自控制平面 GCS 服務的一些關于其它節點的資源使用的資訊,這個資訊對于分布式排程是很有用的。比如,在叢集範圍内,可以均衡一點的排程 task。GCS 會周期性的去所有的 Raylet 拉取每一個 Raylet 的本地節點的可用資源,然後将這些資訊通過廣播的方式告知所有節點的 Raylet。一般的 task 送出是使用的分布式的方式送出 task,也就是對于這些 task 的排程已經是去中心化了,舉例來說,一個 task 的調用者在排程 task 的時候,首先會選擇一個合适的 Raylet 發送 RequestWorkerLease PRC 請求,預設是發給本地節點的 Raylet 中的排程器,去申請運作 task 需要的資源。但是也會根據資料本地化的原則,節點粘性的一些特性選擇其它節點的 Raylet。當 Raylet 中的排程器經過分析之後,覺得目前的節點就是合适的節點,那這個 task 就可以在本地的節點運作,如果本地 Raylet 覺得本地節點是不合适的,那會告知任務的送出者去一個合适節點去申請資源,此時,任務的送出者就會去被推薦的節點的 Raylet 的排程器去申請運作 task 的資源。以此類推,以類似的機制處理排程的請求,直到找到一個合适的節點,這個過程就是一個去中心化的分布式的排程方式。

Distributed Object Store:負責存儲和轉移大對象。這裡是分布式的方式支援對象存儲。在本地存儲的大對象,在其它的節點上也是可以引用,在 task 執行的時候,如果 task 被排程的節點上,沒有對應需要的對象,就會到這個對象的節點去擷取這個對象到本地,然後執行 task。這樣不同節點的 task 之間是可以共享和引用這些對象的。

Global Control Service:GCS 是一個 server,它是 ray 的控制平面,在新版的 ray 中,GCS 加強了容錯能力,這樣 GCS 就可以運作在任意的節點,以及多個節點,而不是僅僅運作在 head 節點。GCS 服務主要包含以下能力:

1. 節點管理:管理節點的增加和删除,同時廣播這些資訊給所有節點的 Raylets,這樣所有的 Raylet 就能感覺到節點的變化。

2. 資源管理:廣播每一個 Raylet 可用的資源給整個叢集,這樣可以保證每一個 Raylet 都可以及時的更新自己的全局的可用資源的視圖,在分布式排程 task 的時候是很重要的。Ray 會先自動探測節點的資源資料作為可以排程的邏輯資源的設定。

3. Actor 管理:管理 actor 的建立和删除請求,同時也監控 actor 的存活情況,以及在 actor 失敗的時候重新建立 actor。所有的 actor 在被排程之前是需要先注冊到 GCS 中。

4. Placement group 管理:配合完成 Placement group 的建立和删除。管理 Placement group 的生命周期,同時使用 two phase commit protocol 建立 Placement group。Placement group 是屬于 job 或者 actor 的,當 job 或者 actor 是處于 dead 狀态,那 Placement group 就會被删除,這時所有使用了 Placement group 預留資源的 actors 和 tasks 都會被 kill,同時釋放資源。

5. 中繼資料存儲:提供了一個鍵值對的存儲能力,這些資料可以讓任何 worker 通路到。這裡隻存儲一些小的中繼資料。主要包括叢集的 dashboard address,Remote function 的定義(當一個 worker 被指定運作一個 task,它會去 GCS 下載下傳該 task 對應的 function 的定義),Runtime environment 的資料(預設情況下,Runtime environment 的工作目錄是儲存在 GCS),還有一些其它的元件也将中繼資料儲存在 GCS,比如 Ray Serve 元件将部署的資訊就儲存在 GCS 中。

6. Worker 管理器:處理 Raylet 上報上來的失敗的 worker。

7. 運作時環境:管理運作時環境的包,同時也會統計這些包的使用量,以及回收。運作時環境的包是可以共享的,在資源不足的時候會回收一部分。

API server:不僅僅包含了 dashboard server,這裡的 dashboard server 也可以了解成 dashboard 的 backend,同時還是送出 job 的 api 入口,也提供了一些叢集狀态的 api。

API agent:收集本地節點的名額資料,用于去聚合叢集層面的健康資料。也負責給 task 和 actor 去安裝運作時環境用于執行 task 和 actor。

Autoscaler:基于叢集的資源的使用情況,以及 job 對資源的需求,去增加和移除叢集的節點。

Log monitor:當 task 和 actor 列印 logs 到它的 stdout or stderr,這些輸出會被自動的重定向到對應的 worker 的日志檔案。在每一個節點上都會運作一個 log monitor,它會定期的從本地的 ray 日志檔案中讀取這些日志檔案,然後通過 GCS pubsub 的機制,釋出這些日志消息到 head 的 driver 中去。

03Job 介紹

每個 job 由其對應專用的 job supervisor actor 來管理,job supervisor actor 是運作在 head 節點上,後續會支援将 job supervisor actor 也可以運作在非 head 節點來減小 head 節點的壓力。job supervisor actor 運作在使用者指定的運作環境下面,job 的 entrypoint command 作為其子程序進行運作,同時會繼承 job supervisor actor 運作的環境。這個 actor 在 ray 的 dashboard 中可以檢視到,它是一個内部類型的 actor。

job manager 會管理這些 job supervisor actor 以及它的日志,同時 job 自己的 entrypoint script 的輸出也會被寫到 head 節點,這些日志檔案可以通過 Http 的方式通路,也可以在 ray dashborad 上檢視。

job 會報告自己的狀态(如,PENDING、RUNNING、SUCCEEDED)和一些消息,這些被存儲在 GCS 中,同時可以通過 API 的方式擷取到。

如果想要停止一個 job,需要給對應的 job supervisor actor 發送停止事件,這個過程本身是異步的, job supervisor actor 會負責中斷 job 的子程序以及更新 job 的狀态。

一個 job 會包含相關的 actors、tasks、objects 和 runtime env。jobs 和 drivers 的關系是 1:1 的關系。

04Actor 介紹

4.1 actor 的生命周期:

  • actor 的生命周期和它的中繼資料都是被 GCS 服務所管理的,每一個 actor 的 client 會緩存這個中繼資料在本地,這些中繼資料的資訊中有和發送 Tasks 相關的資訊,比如 actor 的位址,client 可以通過這個位址直接通過 gRPC 發送 task 給 actor,這裡發送的 task 主要指 actor 的成員方法所對應的 task。不像一般 task 的送出,一般 task 的任務送出完全是由調用者控制什麼時候排程以及運作,通過分布式排程完成排程工作。而 actor 的整個生命周期都是由 GCS 集中式的管理,包括 actor 的排程請求也是由 GCS 發起的,而不是 actor 的調用者。actor 被執行個體化之後會建立一個 worker process,這個 worker 是目前這個 actor 專享的(不像給一般 task 使用的 worker,這種 worker 是可以被 task 重複使用的),所有這個 actor 的成員方法的調用(也就是 task),會被排程這個指定的 worker,這些方法可以通路和修改這個 worker 的狀态。
  • 當建立一個 actor 的時候,負責建立 actor 的 worker 會建構一個 task,這個 task 叫做 actor creation task,actor creation task 會運作 actor 的構造函數。負責建立這個 actor 的 worker 會等待 actor creation task 運作所依賴的一些外部條件就緒了之後,會異步的注冊這個 actor 到 GCS,對于 detached actors,注冊的處理過程是同步的方式,對于 non-detached actors 的注冊處理過程是異步的方式,然後 GCS 會負責排程 actor creation task 去建立 actor,就好像 GCS 是這個 actor 的調用者一樣。
  • 同時,就像方法調用一樣,建立的 actor 會傳回一個 actor handle 給調用者。這個調用者就可以用這個 actor handle 去調用 actor 的方法,也可以了解成發送 Tasks,因為 actor 的成員方法也是一個 remote 的分布式的 task。當一個 task 的參數是用的 actor handle,這個 task 不會立即排程,而是要等到 “actor creation task” 這個任務完成。
  • actor 的 tasks 執行和一般的 task 是類似的,這些 actor tasks 通過 gRPC 被直接送出給 actor process,但是這些 task 不會直接運作,除非它的依賴都已經是 ready 的。但是 actor 的 task 和一般的 task 有兩個主要的不同的地方:1. 預設情況下,actor task 執行是不需要資源申請的,因為 actor 在建立的時候已經申請了資源。actor task 就是 actor 的成員方法被調用的建立出來的方法。2. 對于每一個 actor 的調用者來說,同一個調用者調用的 actor 的方法時,也就是 task 是按照調用順序執行的,這樣就可以保證 actor 有狀态的特性,這種模式的 actor 是 Synchronous, Single-Threaded Actor。還有一種是 Asynchronous or Threaded Actor,這種 actor 的方法是可以異步,并且無順序要求的。
  • 預設情況下,actors 在失敗的時候是不會重新開機的,但是為了支援故障恢複,可以設定相關參數讓其支援失敗之後重新開機的能力,這裡可以設定 max_restarts 和 max_task_retries 選項在 ray.remote() 和.options() 裡面。

4.2 actor task 的執行:

  • actor task 指的是 actor 的成員方法被調用的時候所建立的 task。一個 actor 可能有無數個調用者,一個 actor handle 就代表一個調用者,同時 actor handle 中包含了 actor 的 RPC 位址,這樣調用者就可以連接配接到 actor 進行送出 actor task 給 actor 對應的 worker process。
  • 一個 actor 可以并發處理很多的調用,上圖僅僅顯示了一次調用。

4.3 actor dead:

  • actor 可能是 detached 類型的也可能是 non-detached 類型,detached 是同步的場景,non-detached 是異步的場景,在使用中推薦使用 non-detached 的類型,同時也是預設的方式,當一個 actor 的所有的 actor handle 超出範圍了(通過引用計數的方式來跟蹤),這個時候,actor 的建立者會通知 GCS,然後 GCS 發送一個 KillActor RPC 給對應的 actor,之後 actor 将會退出它的程序,actor 可以被自動的垃圾回收。或者當 GCS 檢測到 actor 的建立者退出了,也會中斷對應的 actor,actor 可以被自動的垃圾回收。或者是 actor 對應的 job 退出了, 也會中斷對應的 actor,actor 也可以被自動地垃圾回收。不管是哪種情況,送出給 actor 的,但是還沒有執行的 tasks 就會報 RayActorError 錯誤。而對于 detached 類型的 actor,在不再需要的時候,是需要應用自己手動的方式去删除的。
  • actor 在運作期間,也有可能遇到不可預期的錯誤導緻退出,那送出給 actor 的 task 也是會報 RayActorError 錯誤。
  • Ray 中支援設定 actor 失敗之後的重新開機次數,如果設定了 max_restarts,那在 actor 的 owner 還存活的時候,GCS 會嘗試通過 actor creation task 的方式,去重新開機 actor。在這個過程中,所有想要調用 actor 的用戶端就會将要調用的 tasks 緩存起來,在 actor 重新開機成功了之後再送出過去,如果 actor 重新開機固定次數之後,還是沒有啟動好,那緩存起來的,還沒有運作的 actor task 就會全部被标記成失敗。
  • 同時在 actor 重新開機成功之後,也有一個參數 max_restarts 來控制 actor task 的重新開機次數。

05 Task 介紹

5.1 task 的生命周期:

  • 這裡的 task 指的是一般的 task(NORMAL_TASK)。task 的持有者會負責保證一個被送出的 task 的執行,這裡的 task 的持有者可以了解成調用 function 的某一個 process,這個 process 也是 function 傳回值的持有者。task 的持有者會去負責和 raylet 通信去申請執行 task 需要的資源,這裡為什麼是和 raylet 通信,因為 raylet 中包含了分布式排程器。上圖中 Driver 是 task A 的持有者,Worker1 是 task B 的持有者,是以 Driver 會負責申請 task A 執行需要的資源,而 Worker1 會負責 task B 執行需要的資源。
  • 在申請資源之前,首先需要解決的 task 的依賴,才會去分布式排程器去申請資源。這裡的 task 的依賴主要指的是 task(function)的參數依賴,因為參數可以是其它的 task 的傳回值。分布式排程器會嘗試去申請資源,同時去從分布式對象存儲中去擷取 task 的依賴(ObjectRef)到本地節點,一旦資源和依賴都就緒了,分布式排程器會同意資源的申請,然後傳回一個可以執行這個 task 的 worker 的位址給到 task 的持有者。
  • 然後 task 的持有者就會通過 gRPC 的方式,去送出這個 task 到分布式排程器傳回的 worker 中去運作。在 task 執行成功之後,負責執行 task 的 worker 需要儲存 task 的傳回值。如果 task 的傳回值很小,worker 就直接将傳回值傳回給 task 的持有者,如果 task 的傳回值比較大,那就存到分布式對象存儲中,傳回給 task 的持有者一個存放的位置。

5.2 分布式 task 排程:

下圖中,task A 已經在 worker1 中運作,task B 和 task C 已經被送出到了 worker1,是以在 worker1 的本地的關系表中已經包含了 X 和 Y 對象,X 代表 task B,Y 代表了 task C。首先來看看怎麼樣來排程 task B。

  • worker1 去詢問本地的 raylet 中的排程器去給 task B 去申請資源。
  • 本地排程器告訴 worker1 去嘗試 node2 去請求排程。
  • worker1 更新了本地的關系表去表示 task B 現在是阻塞在 node2 了。
  • worker1 去詢問 node2 的排程器,去為 task B 申請資源。
  • node2 的排程器同意了 worker1 的資源請求,同時傳回了 worker2 的位址給了 worker1。隻要 task B 還在占用 worker2 去運作,node2 的排程器會保證沒有其它的 tasks 會被再指定到 worker2 上去。
  • 之後,worker1 就發送了 task B 到 worker2 去執行了。

5.3 Task 的執行:

  • 依賴解決:task 的調用者在去分布式排程器去為 task 申請資源之前,會等待所有 task 運作所需要的依賴都就緒了。很多時候,任務的調用者也是其它任務的持有者,比如 foo.remote(bar.remote()),在這個例子中,task 的調用者是執行 foo.remote 這個方法的程序,同時在 foo.remote()中又依賴了 bar.remote 這個 task 的傳回值,而 bar.remote 這個 task 也是屬于 foo.remote 調用者的,是以這個時候不會直接排程 foo 這個 task,而是會等 bar 這個 task 已經完成了,才會去調用 foo 這個 task。在這個情況下,bar 這個 task 的傳回值會被儲存在本地的程序中,直接給到 foo 這個 task 作為參數進行運作 foo 這個 task。如果 foo 這個 task 依賴的參數是不在本地的,比如是存在分布式對象存儲中的大對象,那在排程 foo 這個 task 之前會将 foo 依賴的大對象拉到本地來,然後再完成排程和運作 foo 這個 task。
  • 資源滿足:一個 task 的調用者通過發送資源請求給一個更合适的 raylet,也就是更合适的分布式排程器,那怎麼選擇哪個 raylet 是更合适的,從以下三個方面考慮:1. 通過資料本地化優先, 如果一個 task 依賴了很多對象參數,那就會現在那些節點上能包含了絕大多數參數對象的節點,因為不需要再去讀取很多的參數對象了,因為大部分都在某一個節點了,那就排程這個節點上,至于還有少部分沒有的對象參數,隻好從分布式對象存儲中去讀取到本地了。2. 通過節點粘性,如果目标 raylet 是通過NodeAffinitySchedulingStrategy 政策指定的,那就直接到對應的節點的 raylet 去。3. 也是預設的一種方式,就是認為本地的 raylet 就是開始認為最合适的排程器,雖然最終可能不一定被排程在本地。
  • 在選擇好了合适的 raylet 之後,raylet 首先會将資源的請求放到隊列中,如果這個 raylet 同意了資源請求,就傳回給調用者目前 raylet 它的本地的 worker 的位址。

06 Object 介紹

Object 是一個應用的 value,這些 value 是由 task 傳回的或通過 ray.put 建立的。一個 Object 的 value 是不可變的值,一旦建立就不能修改,它可以被存儲,也可以被引用,隻要是在 Ray cluster 叢集範圍内。

task(function)是可以有傳回值的,這個傳回值是一個 Object,當在某一個 worker 中,去調用這個 function 的時候就會去送出建立這個 task 的請求,那這個 task 的傳回值就屬于這個 worker 的,這個 worker 就是 owner。是以在這個情況下,Object 的 owner 是通過送出 creating task 來建立初始 ObjectRef 的 worker。同時這個 worker 會儲存這個 Object 的引用計數。

ray.put(obj) 這種方式也是可以建立一個 Object,當在一個 worker 中,調用 ray.put 的時候,是以在這種情況下,Object 的 owner 是通過調用 ray.put 來建立初始 ObjectRef 的 worker。同時這個 worker 會儲存這個 Object 的引用計數。

目前,worker 可以使用 ObjectRef 來引用對象。Object 的生命周期是由它的 owner 管理的。Ray 會保證,如果 Object 的 owner 是存活的,那麼 Object 最終可能被解析為它的 value(或者在 worker 失敗的情況下抛出錯誤)。如果 owner 挂了,嘗試擷取該 Object 的值将抛出異常,即使該 Object 仍然存在實體副本。

Object 是可以被儲存在 owner’s in-process memory store,也可以儲存在分布式對象存儲中。in-process memory store 是被配置設定在 owner 的堆記憶體的,同時不會限制記憶體的大小,因為在 Ray 中,隻有小的 Object 才會儲存在 worker 的 in-process memory store,如果小的 Object 太多了,也會導緻 worker 因 OOM 而被 kill。那些儲存在分布式對象存儲中的大的 Object 首先會被儲存在共享的記憶體對象存儲中。共享的記憶體對象存儲會強制一個使用者指定容量的限制(預設是機器記憶體的 30%),同時在達到容量限制的時候會儲存到本地磁盤。這樣做是為了減少每個 Object 的記憶體占用和解析時間。

有兩種方式可以擷取到 ObjectRef 對應的 Object 的 value。1. 一種是通過在一個 ObjectRef(s) 上,調用 ray.get 的方式;2. 另一種是傳遞一個 ObjectRef 作為 task 的參數,當 worker 去執行這個 task 的時候,就會解析這個 ObjectRef,然後使用解析出來的 value 去替換 task 的參數。

對于一個小的 Object,可以直接從它的 owner 的 in-process store 中去解析出來。如果是被儲存在分布式對象中的大的 Object,就必須通過分布式協定去解析這個大的 Object 了。

當一個 task A 的傳回值是一個大的 Object,它會被儲存在運作這個 task A 的節點 A 的 local shared memory store 中,當另一台節點 B 的 task B 要用 taskA 的傳回值作為 task B 的參數,那首先會從調用 task B 的調用者,也就是 taskB 的 owner 中查找 object directory,從中得知 task A 的傳回值這個大的 Object 的儲存為位置,這裡可以簡單了解為節點 A,接下來,task B 所在的 raylet 是會去節點 A 的 raylet 中的分布式對象存儲中去讀取 task A 的傳回值對象,這時會使用到分布式協定,将需要的對象複制到本地。這裡提一下 object directory,在 object directory 就儲存了 object 的位置資訊,在之前的版本是儲存在 GCS 中,現在的版本是儲存在 owners 中的(這裡的 owner 就是 task 的調用者)。當在節點 A 的 task C 要使用 task B 的傳回值,這個時候就會直接從節點 A 的 local shared memory store 中直接去擷取了。

07 PlacementGroup 管理

描述:Placement Groups 主要是用來申請預留一些資源用于去運作 actors 或 tasks。actors 或者 tasks 可以在運作的時候指定 PlacementGroupSchedulingStrategy。Placement Groups 在預留資源的時候的表現形式是使用 Bundles 來表示,Bundles 是一組資源的集合,如 bundle 0 為 {"CPU": 1}, bundle 1 為 {"CPU": 1, "GPU": 4}。一個 Placement Group 中可以包含多個 bundles,需要注意的是 bundle 是在 Placement Groups 的最小單元,而且一個 bundle 可以被預留的前提是,資源的申請不能超過單台機器的資源剩餘。舉例來說,叢集的機器中,具有最充裕資源的機器剩下了 cpu :5 ,gpu :5,如果定義了一個 Placement Group,Placement Group 中定義了一個 bundle,這個 bundle 定義了 {"CPU": 5, "GPU": 9},那這個 bundle 是不能被排程的,也可以了解成這個 Placement Group 是無法被滿足以及排程的。Placement Group 的資源預留也是可以跨機器預留的。比如叢集有 3 台機器,讓每一台機器預留 {"CPU": 1, "GPU": 1},然後 tasks 分布在這三個機器運作。通過 Placement Groups 的能力,可以讓 actors 或者 tasks 以 gang scheduling(組排程,要麼一起運作,要麼都不要運作)的方式運作。

繼續閱讀