“工業流水線”的鼻祖,福特 T 型汽車[1]的電機裝配,将組裝過程拆成 29 道工序,将裝備時間由平均二十分鐘降到五分鐘,效率提升四倍 ,下圖圖源[2]。
T 型汽車裝配流水線
這種流水線的思想在資料處理過程中也随處可見。其核心概念是:
- 标準化的資料集合:對應待組裝對象,是對資料進行中各個環節輸入輸出的一種一緻性抽象。所謂一緻,就是一個任意處理環節的輸出,都可以作為任意處理環節的輸入。
- 可組合的資料變換:對應單道組裝工序,定義了對資料進行變換的一個原子操作。通過組合各種原子操作,可以具有強大的表達力。
則,資料處理的本質是:針對不同需求,讀取并标準化資料集後,施加不同的變換組合。
Unix 管道
Unix 管道是一項非常偉大的發明,展現了 Unix 的一貫哲學:
程式應該隻關注一個目标,并盡可能把它做好。讓程式能夠互相協同工作。應該讓程式處理文本資料流,因為這是一個通用的接口。
— Unix Pipe 機制發明者 Malcolm Douglas McIlroy
上述三句話哲學正展現了我們提到的兩點,标準化的資料集合——來自标準輸入輸出的文本資料流,可組合的資料變換——能夠協同工作的程式(如像 sort, head, tail 這種 Unix 自帶的工具,和使用者自己編寫的符合管道要求的程式)。
讓我們來看一個使用 Unix tools 和管道來解決實際問題的例子。假設我們有一些關于服務通路的日志檔案(var/log/nginx/access.log ,例子來自 DDIA[3] 第十章),日志的每一行格式如下:
shell複制代碼// $remote_addr - $remote_user [$time_local] "$request"
// $status $body_bytes_sent "$http_referer" "$http_user_agent"
216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"
我們的需求是,統計出日志檔案中最受歡迎的五個網頁。使用 Unix Shell ,我們會寫出類似的指令:
shell複制代碼cat /var/log/nginx/access.log | # 讀取檔案,打入标準輸出
awk '{print $7}' | # 取出每行按空格分割的第七個字段
sort | # 對每行按字面值進行排序
uniq -c | # 歸并重複行,并給出重複次數
sort -r -n | # 按重複次數降序進行排序
head -n 5 # 輸出前五行
可以看出上述 Shell 指令有以下幾個特點:
- 每個指令實作的功能都很簡單(高内聚)
- 所有指令通過管道進行組合(低耦合),當然這也要求可組合的程式隻面向标準輸入、标準輸出進行程式設計,無其他副作用(比如輸出到檔案)
- 輸入輸出面向文本而非二進制
此外,Unix 的管道的另一大優點是——流式的處理資料。也即所有程式中間結果并非都計算完成之後,才送入下一個指令,而是邊算邊送,進而達到多個程式并行執行的效果,這就是流水線的精髓了。
當然,管道也有缺點——隻能進行線性的流水線排布,這也限制了他的表達能力。
GFS 和 MapReduce
MapReduce 是谷歌 2004 年的論文 MapReduce: Simplified Data Processing on Large Clusters[4] 提出的,用以解決大規模叢集、并行資料處理的一種算法。GFS 是與 MapReduce 配套使用的基于磁盤的分布式檔案系統。
MapReduce 算法主要分為三個階段:
- Map:在不同機器上并行的對每個資料分區執行使用者定義的 map() → List<Key, Value> 函數。
- Shuffle:将 map 的輸出結果(KV 對)按 key 進行重新分區,按 key 聚集送到不同機器上, Key→ List<Value>。
- Reduce:在不同機器上并行地對 map 輸出的每個 key 對應的List<Value> 調用 reduce 函數。
DDIA 第十章 MapReduce 執行示意圖
每個 MapReduce 程式就是對存儲在 GFS 上的資料集(标準化的資料集)的一次變換。理論上,我們可以通過組合多個 MapReduce 程式(可組合的變換),來滿足任意複雜的資料處理需求。
但與管道不同的是,每次 MapReduce 的輸出都要進行“物化”,即完全落到分布式檔案系統 GFS 上,才會執行下一個 MapReduce 程式。好處是可以進行任意的、非線性的 MapReduce 程式排布。壞處是代價非常高,尤其考慮到 GFS 上的檔案是多機多副本的資料集,這意味着大量的跨機器資料傳輸、額外的資料拷貝開銷。
但要考慮到曆史上開創式的創新,縱然一開始缺點多多,但會随着時間疊代而慢慢克服。GFS + MapReduce 正是這樣一種在工業界開創了在大規模叢集尺度上處理海量資料的先河。
Spark
Spark 便是為了解決 MapReduce 中每次資料集都要落盤的一種演進。
首先,Spark 提出了标準的資料集抽象——RDD[5],這是一種通過分片的形式分散在多機上、基于記憶體的資料集。基于記憶體可以使得每次處理結果不用落盤,進而處理延遲更低。基于分片可以使得在機器當機時,隻用恢複少量分片,而非整個資料集。邏輯上,我們可以将其當做一個整體來進行變換,實體上,我們使用多機記憶體承載其每個分片。
其次,基于 RDD,Spark 提供了多種可靈活組合的算子集,這相當于對一些常用的變換邏輯進行“構件化”,可以讓使用者開箱即用。(下面圖源 RDD 論文[6])
RDD 論文中列出的算子
基于此,使用者可以進行任意複雜資料處理,在實體上多個資料集(點)和算子(邊)會構成一個複雜的 DAG (有向無環圖)執行拓撲:
RDD 和算子構成的 DAG
關系型資料庫
關系型資料庫是資料處理系統的集大成者。一方面,它對外提供強大的聲明式查詢語言——SQL,兼顧了靈活性和易用性。另一方面,他對内使用緊湊、索引友好的存儲方式,可以支撐高效的資料查詢需求。關系型資料庫系統同時集計算和存儲于一身,又會充分利用硬碟,甚至網絡(分布式資料庫)特點,是對計算機各種資源全方位使用的一個典範。本文不去過分展開關系型資料庫實作的各個環節,而是聚焦本文重點——标準的資料集和可組合的算子。
關系型資料庫對使用者提供的資料基本組織機關是——關系,或者說表。在 SQL 模型中,這是一種由行列組成的、強模式的二維表。所謂強模式,可以在邏輯上了解為表格中每個單元所存儲的資料必須要符合該列“表頭”的類型定義。針對這種标準的二維表,使用者可以施加各種關系代數算子(選擇、投影、笛卡爾乘積)。
一條 SQL 語句,在進入 RDBMS 之後,經過解析、校驗、優化,最後轉化成算子樹進行執行。對應的 RDBMS 中的邏輯單元,我們通常稱之為——執行引擎,Facebook Velox[7] 就是專門針對該生态位的一個 C++ 庫。
傳統的執行引擎多使用火山模型,一種屬于拉( pull-based )流派的執行方式。其基本概念就是以樹形的方式組織算子,并從根節點開始,自上而下的進行遞歸調用,算子間自下而上的以行(row)或者批(batch)的粒度傳回資料。
基于 Pull 的算子執行
近些年來,基于推(push-based)的流派漸漸火起來了,DuckDB、Velox 都屬于此流派。類似于将遞歸轉化為疊代,自下而上,從葉子節點進行計算,然後推給父親節點,直到根節點。每個算子樹都可以拆解為多個可以并行執行的算子流水線(下圖源,Facebook Velox 文檔[8])
将執行計劃樹拆成 pipeline
我們把上圖順時針旋轉九十度,可以發現他和 Spark 的執行方式如出一轍,更多關于 velox 機制的解析,可以參考我寫的這篇文章[9]。
但無論推還是拉,其對資料集和算子的抽象都符合本文一開始提出的理論。
小結
考察完上述四種系統之後,可以看出,資料處理在某種角度上是大一統的——首先抽象出歸一化的資料集,然後提供施加于該資料集之上的運算集,最終通過組合的形式表達使用者的各種資料處理需求。
參考資料
[1]福特 T 型汽車: www.youtube.com/watch?v=As0…
[2]汽車流水線圖源: www.motor1.com/features/17…
[3]DDIA: ddia.qtmuniao.com/
[4]MapReduce 論文: research.google.com/archive/map…
[5]RDD 分析: www.qtmuniao.com/2019/11/14/…
[6]RDD 論文: www.usenix.org/system/file…
[7]Facebook Velox: github.com/facebookinc…
[8]Facebook Velox 文檔: facebookincubator.github.io/velox/devel…
[9]Facebook velox 運作機制解析: zhuanlan.zhihu.com/p/614918289