天天看點

說說阿裡增量計算架構Galaxy

本文我會介紹一些我認為可以公開出來說的galaxy技術上的特點,讓技術人員對該計算架構有個更準确的認識。

首先明确根本的一點, galaxy是增量計算模型 ,不是"簡單"的流計算,這點在業界是沒有的。 增量計算是有狀态的計算 。批量計算裡,每一次的輸出結果隻與本次全量掃進來的資料有關,而且計算是幂等的。增量計算,每批計算結果,是由本批資料和曆史批次結果計算出來的,即newvalue

= function(currentbatchvalue, oldvalue),然後本批計算出來的newvalue會作為oldvalue參與下一批資料的計算中。這個公式看起來與疊代計算相似,實質上,疊代計算是增量計算的一種。

下面說明流計算與galaxy的關系。

網上對galaxy有過了解的技術朋友,包括阿裡内部的同僚,可能會認為galaxy隻是storm上的一層封裝。在這裡,我嚴肅地告訴大家,隻要你動腦子想一想, 絕對不是的 ,不但不是封裝,而且與storm有本質差別。

storm是流式計算,資料流進,經曆拓撲計算,資料流出,與增量模型沒有半毛錢關系。如果真要類比storm和galaxy,大家可以想一想trident。trident是storm上的封裝,暴露接口讓使用者可以操縱state,如此,批與批之間的計算結果的确可以通過state持久化起來了,并且可以參與下一批的計算,這看上去與galaxy做的相似。但還是有一個本質差別,trident的state不是它能掌控的,說白了,你state是額外的輔助存儲,不是與我這個引擎挂鈎的,而 galaxy中的state是與整個計算架構關聯起來的 ,這點太重要了,而且實作起來是很複雜的。

再說為什麼現在的galaxy版本需要依賴storm。galaxy目前舊的版本還是跑在storm上的,借助storm的拓撲拉起,worker排程和消息傳遞。galaxy隻是需要一個能夠拉起worker,傳遞消息或者作rpc的引擎而已,storm是當時一個的臨時選擇。目前,storm引擎已經不再适合galaxy,無論是其拓撲建構和拉起的耗時,還是拓撲的不可變性,或是消息格式及序列化方面的性能等等種種問題,都不再适合galaxy,而僅僅适合于流計算。是以,将來galaxy的引擎,不會是這樣。

簡單說幾點galaxy的想象空間。

現今spark可以做的場景,galaxy都是可以做的,而且galaxy在性能上 至少是準實時 的。spark的rdd代表了使用上的易用性和計算上的reuse資料。galaxy同樣有增量語義,表達能力同樣強大的算子層和增量計算模型天然引入的資料reuse。

galaxy的計算模型在業界是沒有的,其計算場景覆寫了流式計算,疊代計算,還可以輕松愉悅地做bsp模型。

在前一篇文章中,介紹到了galaxy的增量計算性質,其state是架構内部管理的,以及與storm的簡單對比。這篇文章将講述更多galaxy增量模型的事情,并介紹這套增量模型之上實作的galaxy sql和galaxy operator,同時會從增量角度對比spark streaming。

mrm模型全稱為mapreducemerge,比mapreduce做了一個merge操作。merge階段可與state互動,讀寫某個key的oldvalue,并且這個merge接口還具備rollback語義。在流計算場景下,資料按時間或條數切成不同的批,批内可以做普遍意義下的mapreduce操作,批之間需要merge階段做跨批聚合的計算。大家可以對比spark streaming的updatestatebykey操作,在一個dstream内,各個時間段内的rdd(即各批)可以通過這個接口更新一次任務内的state。而galaxy的merge本質上是一次add的過程,對應的rollback是一次delete的過程,從資料庫的語義看,兩個過程合起來相當于是update操作,而這倆過程都是根據一個primary

key來做的,是以這件事情與spark streaming的updatestatebykey做的事情是一樣的,但是細看的話,兩者還是存在很大的差異。

galaxy的state暴露給計算task是線程級别獨享的,spark streaming的state是任務内全局共享的。線程級别獨享的優點,就在于同一批資料,按key shuffle之後來到不同的merge計算節點,各自不會阻塞各自的計算過程,而spark streaming的updatestatebykey操作會阻塞其他rdd的計算,雖然spark streaming能做到dstream内各個rdd并發執行,但是隻要有state操作,最終還是落到了時間序列上的阻塞。本時間點staterdd的計算需要依賴前一時間點父staterdd的計算結果,而批内各個key對state操作是互相阻塞和影響的,是以着眼在這層barrier上的話,galaxy的merge過程更加精細,add和delete過程是分開的,批内的key是落到不同線程上計算而state是線程内獨享的。

galaxy有三種model,分别是maponlymodel,mapreducemodel,mapreducemergemodel。即,你可以使用m model和mr model做普通的流計算或小批計算,當需要跨批操作的時候就使用mrm model。model之間是随意組合串聯的,接口相比mapreduce其實是相當靈活甚至過于靈活的,靈活的弊端是計算模型上帶來複雜性。

galaxy sql是一種streamsql,而且是目前業界沒有的。從文法上galaxy sql貼近hivesql,但又有些流計算語義上(無限資料流)不能支援的文法,比如limit, order by。

intel那邊搞了一個spark streaming + spark sql的結合,叫streamsql。利用spark sql裡的schemardd,為spark streaming流進來的rdd帶上了schema元資訊。借助spark streaming支援的操作,這種streamsql可以做滑窗效果的sql計算。但是真正跨批的增量語義(不僅僅是固定的window跨批計算),是支援不了的。galaxy sql可以做真正的增量流式sql。

舉個最簡單的例子,

第一句sql中,根據t1的a字段分組,求了個count值。第二句sql中,t2表分組的字段變為t1表裡count出來的cnt值。大家可以想象,在流計算場景裡,第一次a求count出來的值可能是100,下一個時間點,同一個a的key,count出來的值就是200了,這時候,100這個cnt已經丢到t2表裡計算出結果了,現在100已經更新到200了,200這個新的值的計算是簡單的,但問題是如何把t2裡之前100的計算結果撤銷呢?

可以仔細想想,streamsql是做不了這樣的sql的,本質上是因為spark streaming不支援這樣的操作。galaxy計算架構的merge階段可以做rollback操作,復原之前"錯誤"的狀态,使得galaxy sql可以做分布式流式sql。

galaxy operator是galaxy mrm程式設計接口之上的一層dag封裝,兼具易用性和表達能力。

算子層最終将映射成多個galaxy的mrm model,使使用者可以更加關注計算邏輯,屏蔽較複雜的mrm model,特别是merge階段。

算子層相當于是實體執行計劃,本身可以做節點合并、謂詞下推等優化的工作,即實體執行計劃的優化。從本質上,我認為類似hive、spark catalyst裡對執行計劃的優化工作,在算子層這個dag裡都是可以做的。通過算子這一層,理論上任何dsl都是可以映射之後在galaxy計算架構上運作的。

算子層提供五類正交的基礎算子:map, reduce,merge,shuffle,union。五類基礎算子可以互相組合,衍生成更進階的算子。

需要注意的是,reduce類的算子 ,針對的是 本批 内資料的聚合。增量語義下的reduce與批量語義下mapreduce中的reduce并不一樣,增量語義下的reduce針對的是本批,mapreduce中的reduce對應跨批的資料,更加類似增量語義下的merge。merge類的算子 ,針對的是 跨批 的聚合操作。merge()對應的是mrm模型裡的merge phase,可與oldvalue互動,是增量場景中的特性操作。通常用于實作count、sum等udaf操作,也可以實作top、distinct、類join的操作。

union類的算子 ,針對的是多流合并的場景。union()操作是将多條流合并成一條流輸出,要求各流的columns對齊且一緻。mix()操作也是多流合并成一條,但内部标明了資料來自左流還是右流,各流的column可以不一緻,後續可以銜接集合性的批内或跨批操作。mix()是 專門為集合性 操作而設計的接口。

功能上,算子層可以類比spark rdd。spark rdd 核心價值 有二:其一,在api層面,規避mapreduce模型的抽象和不舒适的原生接口,提供多種transformations和actions,友善開發者了解和使用,即 easy to use ;其二,在計算層面,通過持久化rdd做到了批量計算過程中對中間資料的複用,使spark誕生之初以适合疊代型計算的記憶體計算架構聞名,即 reuse

data 。反觀galaxy算子層,一方面,算子層與spark rdd一樣,在api設計上具備flumejava的設計理念,兼具易用性和表達能力;另一方面,galaxy之增量計算模型是 "有狀态的計算" ,天然做到了實時資料各批之間"狀态"的reuse(在merge phase)。

之後有時間,希望可以介紹下galaxy的任務模型、對于state的管理和容錯等方面的内容。

繼續閱讀