最近,在 PyCon China 2018 的北京主會場、成都和杭州分會場都分享了我們最新的工作 Mars,基于矩陣的統一計算架構。本文會以文字的形式對 PyCon 中國上的分享再進行一次闡述。
聽到 Mars,很多第一次聽說的同學都會靈魂三問:Mars 是什麼,能做什麼,怎麼做的。今天我們就會從背景,以及一個例子出發,來回答這幾個問題。
背景

首先是 scipy 技術棧的全景圖,numpy 是基礎,它提供了多元數組的資料結構,并提供了它上面的各種計算。再往上,重要的有 scipy,主要面向各種科學計算的操作;pandas,其中核心的概念是 DataFrame,他提供對表類型資料的處理、清洗等功能。往上一層,比較經典的庫,有 scikit-learn,它是最知名的機器學習架構之一。最上面一層,是各種垂直領域的庫,如 astropy 主要面向天文,biopython 面向生物領域等。
從 scipy 技術棧可以看出,numpy 是一個核心的地位,大量上層的庫都使用了 numpy 的資料結構和計算。
我們真實世界的資料,并不隻是表這種二維類型資料那麼簡單,很多時候,我們要面對的往往是多元資料,比如我們常見的圖檔處理,首先我們有圖檔的個數,然後有圖檔的長寬,以及 RGBA 通道,這就是四維的資料;這樣的例子不勝枚舉。有這樣多元的處理能力,就有處理各種更加複雜,甚至是科學領域的能力;同時,由于多元資料本身包含二維資料,是以,我們也是以具備表類型資料的處理能力。
另外,如果我們需要探究資料的内在,光靠對表資料進行一些統計等操作是絕對不夠的,我們需要更深層的“數學” 的方法,比如運用矩陣乘法、傅裡葉變換等等的能力,來對資料進行更深層次的分析。而 numpy 由于是數值計算的庫,加上各種上層的庫,我們認為它們很适合用來提供這方面的能力。
為什麼要做 Mars,從一個例子開始
那麼,為什麼要做 Mars 這個項目呢?我們不妨從一個例子來看。
我們試圖用蒙特卡洛方法來求解 pi,蒙特卡洛方法其實很簡單,就是用随機數的方法來解決特定的問題。如圖,這裡我們有個半徑為1的圓和邊長為2的正方形,我們生成很多随機點的方式,通過右下角的公式,我們就可以計算出 pi 的值為 4 乘以落在圓裡點的個數除以總的點個數。随機生成的點越多,計算出來的 pi 就越精确。
用純 Python 實作非常簡單,我們隻要周遊 N 次,生成 x 和 y 點,計算是不是落在圓内即可。運作1千萬個點,需要超過10秒的時間。
Cython 是常見加速 Python 代碼的方式,Cython 定義了 Python 語言的超集,把這個語言翻譯到 c/c++,然後再進行編譯來加速執行。這裡,我們增加了幾個變量的類型,可以看到比純 Python 提升了 40% 的性能。
Cython 現在已經成為 Python 項目的标配,核心的 Python 三方庫基本都使用 Cython 來加速 Python 代碼的性能。
我們這個例子裡的資料都是一個類型,我們可以想到用專門的數值計算的庫,通過矢量化的方式,能極快加速這個任務的性能。numpy 就是當仁不讓的選擇了,使用 numpy,我們需要的是面向 array 的思維方式,我們應當減少使用循環。這裡先用
numpy.random.uniform
來生成 N*2 的一個二維數組,然後
data ** 2
會對該數組裡的所有資料做平方操作,然後
sum(axis=1)
,會對 axis=1 也就是行方向上求和,這個時候,得到的是長度為 N 的 vector,然後我們用
numpy.sqrt
來對這個 vector 的每一個值求開方,<1 會得到一個布爾值的 vector,即每個點是不是都是落在圓裡,最後接一個 sum,就可以求出來總共的點的個數。初次上手 numpy 可能會不太習慣,但是用多了以後,就會發現這種寫法的友善,它其實是非常符合直覺的。
可以看到,通過使用 numpy,我們寫出了更簡單的代碼,但是性能确大幅提升,比純 Python 的寫法性能提升超過 10 倍。
那麼 numpy 的代碼還能夠優化麼,答案是肯定的,我們通過一個叫 numexpr 的庫,來将 numpy 的多個操作合并成一個操作執行,來加速 numpy 的執行。
可以看到,通過 numexpr 優化的代碼,性能比純 Python 代碼提升超過 25 倍。
此時的代碼運作已經相當快了,如果我們手上有 GPU,那麼我們可以利用硬體來加速任務執行。
這裡必須要安利一個庫,叫
cupy,他提供了和 numpy 一緻的 API,通過簡單的 import 替換,就能讓 numpy 代碼跑在英偉達的顯示卡之上。
這時可以看到,性能大幅提升超過 270 倍。真的非常誇張了。
為了讓蒙特卡洛方法計算的結果更加精确,我們把計算量擴大 1000 倍。會碰到什麼情況呢?
沒錯,這就是大家不時碰到的,OutOfMemory,記憶體溢出。更慘的是,在 jupyter 裡,有時候記憶體溢出導緻程序被殺,甚至會導緻之前跑的全部結果都丢失。
蒙特卡洛方法還是比較容易處理的,我把問題分解成 1000 個,每個求解1千萬資料就好了嘛,寫個循環,做個彙總。但此時,整個計算的時間來到了12分鐘多,太慢了。
此時我們會發現,整個運作過程中,其實隻有一個 CPU 在幹活,其他核都在原地吆喝。那麼,我們怎麼讓 numpy 并行化呢?
首先,numpy 裡有一些操作是能并行的,比如 tensordot 來做矩陣乘法,其他大部分操作都不能利用多核。那麼,要将 numpy 并行化,我們可以:
- 采用多線程/多程序編寫任務
- 分布式化
蒙特卡洛方法算 pi 改寫成多線程和多程序實作還是非常容易的,我們寫一個函數來處理1千萬的資料,我們把這個函數通過 concurrent.futures 的 ThreadPoolExecutor 和 ProcessPoolExecutor 來分别送出函數 1000 遍用多線程和多程序執行即可。可以看到性能能提升到 2倍和3倍。
但是呢,蒙特卡洛求解 pi 本身就很容易手寫并行,考慮更複雜的情況。
import numpy as np
a = np.random.rand(100000, 100000)
(a.dot(a.T) - a).std()
這裡建立了 10萬*10萬的矩陣 a,輸入就有大概 75G,我們拿 a 矩陣乘 a 的轉置,再減去 a 本身,最終求标準差。這個任務的輸入資料就很難塞進記憶體,後續的手寫并行就更加困難。
這裡問題就引出來了,我們需要什麼樣架構呢?
- 提供熟悉的接口,像 cupy 這樣,通過簡單的 import 替換,就能讓原來 numpy 寫的代碼并行。
- 具備可擴充性。小到單機,也可以利用多核并行;大到一個很大的叢集,支援上千台機器的規模來一起分布式處理任務。
- 支援硬體加速,支援用 GPU 等硬體來加速任務執行。
- 支援各種優化,比如操作合并,能利用到一些庫來加速執行合并的操作。
- 我們雖然是記憶體計算的,但不希望單機或者叢集記憶體不足,任務就會失敗。我們應當讓暫時用不到的資料 spill 到磁盤等等存儲,來保證即使記憶體不夠,也能完成整個計算。
Mars 是什麼,能做什麼事
Mars 就是這樣一個架構,它的目标就是解決這幾個問題。目前 Mars 包括了 tensor :分布式的多元矩陣計算。
100億大小的蒙特卡洛求解 pi的問題規模是 150G,它會導緻 OOM。通過 Mars tensor API,隻需要将
import numpy as np
替換成
import mars.tensor as mt
,後續的計算完全一緻。不過有一個不同,mars tensor 需要通過
execute
觸發執行,這樣做的好處是能夠對整個中間過程做盡量多的優化,比如操作合并等等。不過這種方式對 debug 不太友好,後續我們會提供 eager mode,來對每一步操作都觸發計算,這樣就和 numpy 代碼完全一緻了。
可以看到這個計算時間和手寫并行時間相當,峰值記憶體使用也隻是 1個多G,是以可以看到 Mars tensor 既能充分并行,又能節省記憶體的使用 。
目前,Mars 實作了 70% 的常見 numpy 接口,完整清單見
這裡。我們一緻在努力提供更多 numpy 和 scipy 的接口,我們最近剛剛完成了對逆矩陣計算的支援。
Mars tensor 也提供了對 GPU 和稀疏矩陣的支援。
eye
是建立機關對角矩陣,它隻有對角線上有值為1,如果用稠密的方式存儲會浪費存儲。不過目前,Mars tensor 還隻支援二維稀疏矩陣。
Mars 怎麼做到并行和更省記憶體
和所有的 dataflow 的架構一樣,Mars 本身也有計算圖的概念,不同的是,Mars 包含粗粒度圖和細粒度圖的概念,使用者寫的代碼在用戶端生成粗粒度圖,在送出到服務端後,會有 tile 的過程,将粗粒度圖 tile 成細粒度圖,然後我們會排程細粒度圖執行。
這裡,使用者寫下的代碼,在記憶體裡會表達成 Tensor 和 Operand 構成的粗粒度圖。
當使用者調用
execute
方法時,粗粒度的圖會被序列化到服務端,反序列化後,我們會把這個圖 tile 成細粒度圖。對于輸入 10002000 的矩陣,假設指定每個次元上的 chunk 大小都是 500,那它會被 tile 成 24 一共 8 個chunk。
後續,我們會對每個我們實作的 operand 也就是算子提供 tile 的操作,将一個粗粒度的圖 tile 成細粒度圖。這時,我們可以看到,在單機,如果有8個核,那麼我們就可以并行執行整個細粒度圖;另外給定 1/8 大小的記憶體,我們就可以完成整個圖的計算。
不過,我們在真正執行前,會對整個圖進行 fuse 也就是操作合并的優化,這裡的三個操作真正執行的時候,會被合并成一個算子。針對執行目标的不同,我們會使用 numexpr 和 cupy 的 fuse 支援來分别對 CPU 和 GPU 進行操作合并執行。
上面的例子,都是我們造出來很容易并行的任務。如我們先前提到的例子,通過 tile 之後生成的細粒度圖其實是非常複雜的。真實世界的計算場景,這樣的任務其實是很多的。
為了将這些複雜的細粒度圖能夠充分排程執行,我們必須要滿足一些基本的準則,才能讓執行足夠高效。
首先,初始節點的配置設定非常重要。比如上圖,假設我們有兩個 worker,如果我們把 1和3 配置設定到一個 worker,而将 2和4 配置設定到另一個 worker,這時當 5 或者 6 被排程的時候,他們就需要觸發遠端資料拉取,這樣執行效率會大打折扣。如果我們一開始将 1和2 配置設定到一個 worker,将 3和4 配置設定到另一個 worker,這時執行就會非常高效。初始節點的配置設定對整體的執行影響是很大的,這就需要我們對整個細粒度的圖有個全局的掌握,我們才能做到比較好的初始節點配置設定。
另外,深度優先執行的政策也是相當重要的。假設這時,我們隻有一個 worker,執行完 1和2 後,我們排程了 3 的話,就會導緻 1和2 的記憶體不能釋放,因為 5 此時還沒有被觸發執行。但是,如果我們執行完 1和2 後,排程了 5 執行,那麼當 5 執行完後,1和2 的記憶體就可以釋放,這樣整個執行過程中的記憶體就會是最省的。
是以,初始節點配置設定,以及深度優先執行是兩個最基本的準則,光有這兩點是遠遠不夠的,mars 的整個執行排程中有很多具有挑戰的任務,這也是我們需要長期優化的對象。
Mars 分布式
是以,Mars 本質上其實是一個細粒度的,異構圖的排程系統。我們把細粒度的算子排程到各個機器上,在真正執行的時候其實是調用 numpy、cupy、numexpr 等等的庫。我們充分利用了成熟的、高度優化的單機庫,而不是重複在這些領域造輪子。
在這個過程中,我們會遇到一些難點:
- 因為我們是 master-slave 架構,我們 master 如何避免單點?
- 我們的 worker 如何避免 Python 的 GIL(全局解釋器鎖)的限制?
- Master 的控制邏輯交錯複雜,我們很容易寫出來高耦合的,又臭又長的代碼,我們如何将代碼解耦?
我們的解法是使用 Actor model。Actor模型定義了并行的方式,也就是一切皆 Actor,每個 Actor 維護一個内部狀态,它們都持有郵箱,Actor 之間通過消息傳遞,消息收到會放在郵箱中,Actor 從郵箱中取消息進行處理,一個 Actor 同時隻能處理一個消息。Actor 就是一個最小的并行單元,由于一個 Actor 同時隻能處理一個消息,你完全不需要擔心并發的問題,并發應當是 Actor 架構來處理的。而所有 Actor 是不是在同一台機器上,這在 Actor 模型裡也變得不重要,Actor 在不同機器上,隻要能完成消息的傳遞就可以了,這樣 Actor 模型也天然支援分布式系統。
因為 Actor 是最小的并行單元,我們在寫代碼的時候,可以将整個系統分解成很多 Actor,每個 Actor 是單一職責的,這有點類似面向對象的思想,這樣讓我們的代碼得以解耦。
另外,Master 解耦成 Actor 之後,我們可以讓這些 Actor 分布在不同的機器上,這樣就讓 Master 不再成為單點。同時,我們讓這些 Actor 根據一緻性哈希來進行配置設定,後續如果有 scheduler 機器挂掉, Actor 可以根據一緻性哈希重新配置設定并重新建立來達到容錯的目的。
最後,我們的 actors 是跑在多程序上的,每個程序裡是很多的協程,這樣,我們的 worker 也不會受到 GIL 的限制。
像 Scala 或者 Java 這些 JVM 語言 可以使用 akka 這個 Actor 架構,對于 Python 來說,我們并沒有什麼标準做法,我們認為我們隻是需要一個輕量的 Actor 架構就可以滿足我們使用,我們不需要 akka 裡面一些高階的功能。是以,我們開發了 Mars actors,一個輕量的 Actor 架構,我們 Mars 整個分布式的 schedulers 和 workers 都在 Mars actors 層之上。
這是我們 Mars actors 的架構圖,在啟動 Actor pool 的時候,我們子程序會根據并發啟動若幹子程序。主程序上有 socket handler 來接受遠端 socket 連接配接傳遞消息,另外主程序有個 Dispatcher 對象,用來根據消息的目的地來進行分發。我們所有的 Actor 都在子程序上建立,當 Actor 收到一個消息處理時,我們會通過協程調用
Actor.on_receive(message)
方法。
一個 Actor 發送消息到另一個 Actor,分三種情況。
- 它們在同一個程序,那麼直接通過協程調用即可。
- 它們在一台機器不同程序,這個消息會被序列化通過管道送到主程序的 Dispatcher,dispatcher 通過解開二進制的頭部資訊得到目标的程序 ID,通過對應的管道送到對應子程序,子程序通過協程觸發相應 Actor 的消息處理即可。
- 它們在不同機器,那麼目前子程序會通過 socket 把序列化的消息發送到對應機器的主程序,該機器再通過 Dispatcher 把消息送到對應子程序。
由于使用協程作為子程序内的并行方式,而協程本身在 IO 處理上有很強的性能,是以,我們的 Actor 架構在 IO 方面也會有很好的性能。
上圖是裸用 Mars actors 來求解蒙特卡洛方法算 pi。這裡定義兩個 Actor,一個 Actor 是 ChunkInside,它接受一個 chunk 的大小,來計算落在圓内點的個數;另外一個 Actor 是 PiCaculator,它負責接受總的點個數,來建立 ChunkInside,這個例子就是直接建立 1000 個 ChunkInside,然後通過發送消息來觸發他們計算。
create_actor
時指定 address 可以讓 Actor 配置設定在不同的機器上。
這裡可以看到,我們裸用 Mars actors 的性能是要快過多程序版本的。
這裡我們總結一下,通過使用 Mars actors,我們能不受 GIL 限制,編寫分布式代碼變得非常容易,它讓我們 IO 變得高效,此外,因為 Actor 解耦,代碼也變得更容易維護。
現在讓我們看下 Mars 分布式的完整執行過程。現在有1個 client,3個 scheduler 和 5個worker。使用者建立一個 session,在服務端會建立一個 SessionActor 對象,通過一緻性哈希,配置設定到 scheduler1 上。此時,使用者運作了一個 tensor,首先 SessionActor 會建立一個 GraphActor,它會 tile 粗粒度圖,圖上假設有三個節點,則會建立三個 OperandActor,分别配置設定到不同的 scheduler 上。每個 OperandActor 會控制 operand 的送出、任務狀态的監督和記憶體的釋放等操作。此時 1 和 2 的 OperandActor 發現沒有依賴,并且叢集資源充足,那麼他們會把任務送出到相應的 worker 執行,在執行完成後,向 3 通知任務完成,3 發現 1和2 都執行完成後,因為資料在不同 worker 執行,決定好執行 worker 後,先觸發資料拉取操作,然後再執行。用戶端這邊,通過輪詢 GraphActor 得知任務完成,則會觸發資料拉取到本地的操作。整個任務就完成了。
我們對 Mars 分布式做了兩個 benchmark,第一個是對 36 億資料的每個元素加一再乘以2,圖中紅色叉是 numpy 的執行時間,可以看到,我們比 numpy 有數倍提升,藍色的虛線是理論運作時間,可以看到我們真實加速非常接近理論時間加速。第二個 benchmark,我們增加了資料量,來到 144 億資料,對每個元素加1乘以2後,再求和,可以看到單機 numpy 已經不能完成任務了,此時,針對這個任務,我們也可以取得不錯的加速比。
未來計劃
Mars 已經在 Github 上源代碼,讓更多同學來一起參與共建 Mars:
https://github.com/mars-project/mars。
在後續 Mars 的開發計劃上,如上文說,我們會支援 eager mode,讓每一步觸發執行,提升對性能不敏感的任務開發以及 debug 時的使用體驗;我們會支援更多 numpy 和 scipy 接口;後續很重要的一個是,我們會提供 100% 相容 pandas 的接口,由于利用了 mars tensor 作為基礎,我們也可以提供 GPU 的支援;我們會提供相容 scikit-learn 的機器學習的支援;我們還會提供在細粒度圖上排程自定義函數和自定義類的功能,增強靈活性;最後,因為我們用戶端其實并不依賴 Python,任意語言都可以序列化粗粒度圖,是以我們完全可以提供多語言的用戶端版本,不過這點,我們會視需求而定。
總之,開源對我們是很重要的,龐大的 scipy 技術棧的并行化,光靠我們的力量是不夠的,需要大家來一起幫我們來共建。
現場圖檔
最後再 po 一點現場圖檔吧,現場觀衆對 Mars 的問題還是蠻多的。我大緻總結下:
- Mars 在一些特定計算的性能,比如 SVD 分解,這裡我們有和使用者合作項目的一些測試資料,輸入資料是 8億*32的矩陣做 SVD 分解,分解完再矩陣乘起來和原矩陣做對比,這整個計算過程使用 100個 worker(8核),用7分鐘時間算完
- Mars 何時開源,我們已經開源:
- Mars 開源後會不會閉源,答:不會
- Mars actors 的詳細工作原理
- Mars 是靜态圖還是動态圖,目前是靜态圖,eager mode 做完後可以支援動态圖
- Mars 會不會涉及深度學習,答:目前不會
Mars user group 釘釘群掃碼加入。