檢視精彩回放: https://developer.aliyun.com/live/246411
内容簡要
一、Vineyard的建立初衷
二、Vineyard的定義及功能
三、Vineyard和雲原生結合
四、案例示範
五、Roadmap
(一)為什麼需要一個分布式記憶體資料管理引擎?
首先,對于單機資料計算,PyData生态已經成為事實标準,這裡資料在不同的Python庫間0拷貝的共享非常的簡單,因為資料都在一台機器的記憶體裡,彼此間通過傳遞參數的引用就可以實作。

這裡先是聲明了一個numpy的array,然後另一個庫pytorch可以從這個numpy array構造一個tensor,這個過程是0拷貝。可以看到,如果我們修改了tensor的值,array的值同樣被修改了,因為它們指向同一個程序裡的同一段記憶體。
那麼,如果對于更複雜的計算場景,我們需要在不同的程序或者運作時之間0拷貝的共享資料呢?
一種方法是使用Apache Arrow的Plasma,它使用程序間記憶體映射來實作0拷貝,但這還是在單機環境下。
對于我們業務中常見的無法放在一台機器上的大資料,如果還要實作跨系統的0拷貝資料共享,應該如何解決呢?
Vineyard可以提供這種分布式環境下跨系統資料0拷貝共享的能力,進一步,如果和Kubernetes結合的話,還可以産生一種更靈活的計算範式,下面我将逐漸展開。
(二)現實生活中的大資料應用
- 我們先看一個例子,如上所示,一個經典的反作弊工作流大緻包含以下四個部分:
1) 購買記錄的提取,這裡會用到一些分布式的SQL引擎;
2) 從原始資料構造使用者商品關系圖,并通過标簽傳播等圖算法計算節點風險,是以會引入圖計算系統進行處理;
3) 用機器學習的方法,來提升風險檢測的精度;
4) 對于得到的高風險使用者和商品,進行線上人工處理,這裡會用到一些資料可視化的系統。
可以看到,一個經典的大資料應用工作流中,往往會出現多種不同的計算需求,需要用到各種專業性的計算系統來逐一解決,而且往往都會使用一個分布式檔案系統,例如HDFS,來解決跨系統資料共享的問題,這也導緻每個子任務隻有在前序子任務完全結束以後才能被啟動。
這就帶來如下三個問題:
1) 每個專業性的計算系統想要達到生産可用都非常困難,光是開發各種資料平台資料類型的I/O接口就要花費大量時間;
2) 使用檔案系統交換資料帶來大量的I/O以及序列化和反序列化開銷;
3) 由于工作流被檔案系統人為割裂,導緻全局性優化方法無法實施。
以圖計算為例,由于圖計算的複雜性,針對各種場景下的具體問題,這些年從學界和工業界湧現了大量的針對性的圖計算系統,但是它們卻很少能達到生産可用。
這裡面一個主要原因就是光是I/O,分布式部署,容錯機制,系統擴充性,以及各類資料源資料格式的接入就需要花費大量的時間和人力。
(三)設計初衷
針對以上三個問題,我們研發Vineyard。
首先,Vineyard不僅為上層應用提供存儲,同時采用可擴充設計,為上層系統提供I/O,分塊,擴容和容錯能力,進而減少上層系統在這些方面的開發工作;
其次,Vineyard采用基于共享記憶體的0拷貝資料共享,減少I/O和資料拷貝的開銷;
最後,Vineyard提供系統間的Stream,提升工作流的整體并行效率。
(一)定義
首先,Vineyard管理的是分布式的記憶體中的不可變資料,也就是說,這些資料被存放在Vineyard以後,一經封存,就不再允許被更改。
這類資料在大資料應用中非常常見,一般來說,在一個大資料工作流中,上一個計算步驟結束後,它要傳給下個步驟的資料就固定了,下一個計算系統對它的使用僅僅是讀的操作。
其次,這類資料通過Vineyard傳遞給下一個步驟是0拷貝的,這是通過記憶體映射實作的。
同時,Vineyard還為這些資料提供開箱即用的高層次抽象,友善計算系統對這些資料的使用,例如Graph圖、Tensor張量和DataFrame等等。
這使得上層系統可以像使用本地資料變量一樣使用Vineyard上的分布式資料;
最後,Vineyard内建了大量的資料分塊,讀寫,檢查的驅動,同時采用可擴充設計,允許新的驅動以即插即用的形式在工作流中動态擴充Vineyard的功能,進而更好的支援上層工作流的開發和計算。
(二)系統架構
Vineyard系統架構圖
具體來看,Vineyard的系統架構如上方所示。
首先,Vineyard裡的每一個資料對象都包含資料負載和中繼資料兩個部分,其中資料負載儲存資料本身。
例如對于一個DataFrame而言,就是每一行每一列實際的值,而中繼資料包含DataFrame的shape、schema,以及DataFrame在分布式環境中是如何被切分和部署的資訊等。這裡中繼資料可以完備的解釋資料負載,進而實作資料正确的複用。在Vineyard裡,資料負載存儲在共享記憶體中,而中繼資料通過後端的ETCD在叢集上共享。
其次,Vineyard會在叢集的每個節點上運作一個守護程序,負責這個節點上的資料負載。資料負載隻能被IPC連接配接擷取,并且通過共享記憶體實作0拷貝的資料共享,而中繼資料還可以通過RPC連接配接來擷取。
這是因為對于一個經典的并行資料處理場景,我們可以先通過RPC連接配接通路中繼資料來獲得資料在叢集上的分布情況,然後再分布式的啟動一系列工作程序,并通過IPC連接配接來真正擷取資料負載并進行處理。
最後,Vineyard以驅動的形式,為其存儲的資料對象提供各種内建的和即插即用的功能。
(三)案例說明
上圖為一個分布式DataFrame的例子。
首先我們在第一台機器上,通過IPC域套接字連接配接到Vineyard,如下方所示:
然後我們通過對象ID獲得這個分布式DataFrame,我們可以看到,它被分成多塊,每個分塊是一個單獨的DataFrame;
在這裡,這個大的分布式DataFrame是Vineyard的一個全局對象,而這個編号為0的分塊是一個局部對象;
通常來說,一個Vineyard全局對象,由多個局部對象組成,并且這些局部對象分布在多台機器上。
這裡我們可以看到,編号為0的分塊存儲在第一台機器上,相對于第一台機器,它是本地資料;
相反,編号為1的分塊資料存儲在第三台機器上,它不是第一台機器的本地資料。
我們繼續擷取編号為0的分塊的資料負載,這裡資料通過共享記憶體映射到目前程序,實作了資料0拷貝的共享。
而編号為1的分塊不是本地資料,我們無法擷取資料負載,但是可以擷取它的中繼資料。
我們已經了解了Vineyard的基本概念,接下來我們來看Vineyard和雲原生的結合的情況,以及我們打造的全新的計算範式。
我們會先介紹如何在Kubernetes上部署Vineyard,然後介紹Vineyard如何利用Kubernetes的能力,去實作資料和工作的共同排程,進而産生一種全新的大資料計算範式。
(一)Vision: 大資料任務的雲原生範式
如上所示,還是之前的例子,在我們全新的計算範式下,它是這樣被部署的:
首先,Vineyard以Daemonset的形式部署在Kubernetes上;
其次,每個步驟的計算系統也被遷移到Kubernetes上,這使得它們獲得了動态擴充的能力;
最後,這些系統間以CRD,也就是Kubernetes上Vineyard自定義資源的形式,抽象中間資料。同時,Vineyard會利用Kubernetes的能力,協同部署中間資料和工作程序,確定每個工作程序都能獲得它需要的資料,進而實作計算工作流的高效運作。
(二)在Kubernetes上部署Vineyard
目前Vineyard已經可以通過Helm來安裝。
同時,Kubernetes上的資料共享是這樣實作的:
首先,每個節點上的Vineyard服務Pod和其他工作Pod依然通過IPC連接配接來實作記憶體共享。
在Kubernetes環境中,域套接字可以通過被挂載到Pod中的hostPath或者PVC在不同Pod之間共享。
而對于Vineyard服務容器和工作容器被綁定在同一個Pod的情形,可以通過一個emptyDir類型的Volume在兩個容器之間共享。
(三)Kubernetes的能力: CRDs
在使用Kubernetes的能力方面,首先Vineyard存儲的對象會被抽象為Kubernetes的自定義資源,這個CRD中包含其對應對象的中繼資料。
同時,如果是局部對象,也就是比如剛剛說到的一個分布式DataFrame的某個分塊,還會包含位置資訊,也就是這個分塊存儲在哪個節點上。。
(四)Kubernetes的能力:協同排程資料和工作負載
由于局部對象的CRD中包含位置資訊,我們可以利用它來協同排程資料和工作負載:
- 在工作負載Pod裡描述這個工作需要的Vineyard對象的ID(通常是一個全局對象);
- 通過Kubernetes的scheluder-plugin來增強排程器的排程能力,使得當工作Pod被排程時,排程器會先通過CRD檢視它需要的Vineyard對象所包含的局部對象的位置資訊,進而優先将工作Pod排程到相應的節點上。
- 如果因為種種原因,無法排程成功,資料遷移則會被自動觸發,進而保證工作Pod總是能夠通路到它需要的資料。
下面,我們通過一個Demo來展示Vineyard如何借助Scheduler Plugin實作資料和工作的協同排程。
這裡使用一個簡化版本來做Demo,我們會針對作弊交易資料構造一個作弊交易的分類器,在這個Demo裡面我們會用Pandas和Mars分别在單機和分布式的情況下來預處理資料,同時用Pytorch訓練一個作弊交易的分類器。
好,我們先來看看資料。如上所示,通常情況下Vineyard以及它上面的系統用到的資料都是非常大規模的資料,這裡我們用一個小資料來做Demo,否則的話光是加載這個資料就要花費大量的時間。
這裡有三張表,分别是使用者表、商品表,還有交易表。使用者表和商品表主要包含使用者和商品的ID,以及它們各自Feature的向量。而這個交易表每一條記錄表示一個使用者購買一個商品,還有一個标簽Frod來辨別這條記錄是否是一個作弊的交易,同時一些關于這些交易的Feature也都存在這個表裡面。
對于這樣一個問題,我們在單機的情況下是如何實作的呢?我們看一下 Pandas版本的代碼,如下所示。
首先我們會加載資料,用Pandas讀這三個檔案得到三個DataFrame,然後把這三個DataFrame合并成一個大的Dataset,這裡用到了DataFrame的Join,這是第一步。
第二步我們通過Dataset來訓練模型,通過Dataset來構造Feature資料和Label資料。
然後使用一個最簡單的線性模型來訓練分類器,這是單機版本的實作。
如果資料非常大,無法在一台機器上存儲的話,那麼這個單機版本的代碼顯然是無法實作這樣的功能,因為Pandas甚至都無法去讀這樣的一個檔案,這種情況下我們如何實作呢?
這裡我們使用Mars來做第一步DataFrame處理的工作。
Mars是集團開源的一個項目,支援大規模DataFrame的計算。
對于一個Mars中的DataFrame分類,它和Vineyard類似,包含多個分塊,并且分布在多台機器上,是以這裡可以先用Vineyard将檔案讀成一個GlobalDataFrame,然後直接加載為Mars的一個DataFrame。
同時Mars支援DataFrame的計算,比如Join操作。Mars通過多台機器上多個分塊間的并行計算,實作分布式DataFrame的交易操作。最後,Mars合并的大的DataFrame會以Global DataFrame的形式存進Vineyard。
第二步使用Distributed Pytorch來進行計算。這裡Distributed Pytorch要從Vineyard裡Log出GlobalDataFrame,這個Distributed Pytorch會起多個工作程序,每個工作程序會得到相應的分塊。
首先會建立一個Vineyard Client連接配接到VINEYARD_IPC_SOCKET,然後獲得本地分塊的ID,随後Client會拿到這些資料負載,并且把它們合并成一個總的表,并且通過總表和之前一樣建構Pytorch Dataset,接下來訓練的内容就和剛剛一緻了。
上圖為實際執行Demo過程。
首先我們有一個幹淨的Kubernetes環境,接着用Helm安裝Vineyard,安裝了Vineyard以後得到了兩個CRD,一個是全局對象,一個是本地對象。Vineyard在叢集上運作是以Demon Set的形式,也就是說每個Log上都有一個Vineyard Pod。
上圖為Run Mars,Mars會啟動一個叢集,然後分布式地做這個計算,得到一個GlobalDataFrame與它的ID。
同樣的,在Kubernetes裡面有一個和GlobalDataFrame對應的全局對象,這個全局對象包含多個分塊,也就是多個局部對象。可以看到全局對象分布在192和193兩台機器上,可以登入192檢視其中一個分塊具體長什麼樣。
通過Import Vineyard然後建立一個IPC Socket Client,然後選取一個192上面的一個分塊來Get。
可以看到我們得到了DataFrame,它是我們剛剛大的DataFrame的一個分塊。
我們再試圖get一下193上面的分塊,可以看到我們無法拿到這個分塊,是以如果我們的工作Pod在被排程的時候,沒有和資料所在的節點對齊,就會發生無法擷取資料的問題。應該如何解決呢?我們看一下Pytorch的Yaml。
在InitContainers裡面我們增加了一些内容,它可以確定當資料和工作負載沒有被對齊的時候,資料遷移會自動地被觸發,下面我們來Run Pytorch。
首先檢視全局對象并拿到 ID,然後把ID輸入給要Run 的Pytorch。
Pytorch部署到Kubernetes後,可以看到其中兩個Pod運作在188和187上,和剛剛的192、193沒有對齊,那麼這裡會發生什麼呢?我們看一下worker-0的Log,可以看到資料被自動地遷移了,但是資料遷移的過程是很花費時間的。
這裡由于資料遷移,我們會産生重複的局部對象,比如31a也被複制了,它從193被複制到了188,進而我們的Worker可以使用它。
可以看到,資料遷移的過程不僅會花費時間,而且還會浪費記憶體,是以我們應該想辦法去盡量減少這種資料遷移的發生,這也就是為什麼我們要用Scheduler Plugin來增強Kubernetes的排程能力,進而減少這種資料遷移的發生。
下面把環境清理一下,重新Run以上過程,不過這一次會使用Scheduler Plugin。
重新安裝Vineyard,重新再執行一遍Mars,又得到了一個GlobalDataFrame,那麼我們接下來檢查一下,全局對象所包含的局部對象在我們叢集中的位置。
可以看到,它依然是在192和193這兩台機器上,接下來要安裝Scheduler Plugin。
Scheduler Plugin會根據工作負載所需要的Vineyard對象的位置來排程這個工作的Pod。後面會通過觀察Scheduler Plugin的Log來檢視這個過程。
再次打開Pytorch的Yaml,可以看到我們把需要的Vineyard對象的ID寫在了這裡,Scheduler Plugin可以通過讀取這個Spec來知道工作負載是需要哪一個Vineyard對象。
我們運作Pytorch拿到 ID,Set Scheduler Plugin,可以看到兩個Pod已經運作在192和193上面,接着我們去看一下Scheduler Plugin的Log,可以看到Scheduler Plugin給193和192兩台機器打了最高分。回到Worker-0的Log,可以看到沒有資料遷移發生,也沒有重複的局部對象被建立,是因為沒有進行任何的資料遷移。
在這裡我們通過Scheduler Plugin成功地将資料和工作負載對齊,進行協同地排程,進而實作了高效的并行計算。
以上是Vineyard利用Scheduler Plugin實作協同排程的Demo,這為我們打造基于Vineyard和Kubernetes的高效且靈活的大資料計算範式,打開了一扇大門。
(一)項目狀态
目前,Vineyard在GitHub已經收獲375顆星,同時有6位同僚在維護這個項目,我們期待獲得來自社群的更多的支援,也希望有社群成員可以逐漸成為Vineyard的維護者。
(二)Roadmap
- 目前:
1) 通過Github Actions建構和測試;
2) 支援各種資料類型,如數組、圖形,并支援多種計算引擎,如pytorch, mars, GraphScope;
3) 在DockerHub和Quay上釋放;
4) 與Helm進行內建。
- 未來:
1) Vineyard Operator for Kubernetes;
2) 性能改進;
3) 更多語言SDK,如Java, Go等;
4) 存儲層次結構;
5) Scheduler-plugin:Fluid
在未來的規劃中,我們首先會在Kubernetes上做功能性更強的Vineyard Operator,比如檢查點,容錯等。此外,我們還會和集團的Fluid合作,進一步優化排程能力。