天天看點

《Spark大資料分析實戰》——3.2節Spark Streaming

本節書摘來自華章社群《spark大資料分析實戰》一書中的第3章,第3.2節spark streaming,作者高彥傑 倪亞宇,更多章節内容可以通路雲栖社群“華章社群”公衆号檢視

3.2 spark streaming

spark streaming是一個批處理的流式計算架構。它的核心執行引擎是spark,适合處理實時資料與曆史資料混合處理的場景,并保證容錯性。下面将對spark streaming進行詳細的介紹。

3.2.1 spark streaming簡介

spark streaming是建構在spark上的實時計算架構,擴充了spark流式大資料處理能力。spark streaming将資料流以時間片為機關進行分割形成rdd,使用rdd操作處理每一塊資料,每塊資料(也就是rdd)都會生成一個spark job進行處理,最終以批處理的方式處理每個時間片的資料。請參照圖3-6。

《Spark大資料分析實戰》——3.2節Spark Streaming

spark streaming程式設計接口和spark很相似。在spark中,通過在rdd上用transformation(例如:map, f?ilter等)和action(例如:count, collect等)算子進行運算。在spark streaming中通過在dstream(表示資料流的rdd序列)上進行算子運算。圖3-7為spark streaming轉

化過程。

《Spark大資料分析實戰》——3.2節Spark Streaming

圖3-7中spark streaming将程式中對dstream的操作轉換為dstream有向無環圖(dag)。對每個時間片,dstream dag會産生一個rdd dag。在rdd中通過action算子觸發一個job,然後spark streaming會将job送出給jobmanager。jobmanager會将job插入維護的job隊列,然後jobmanager會将隊列中的job逐個送出給spark dagscheduler,然後spark會排程job并将task分發到各節點的executor上執行。

(1)優勢及特點

1)多範式資料分析管道:能和spark生态系統其他元件融合,實作互動查詢和機器學習等多範式組合處理。

2)擴充性:可以運作在100個節點以上的叢集,延遲可以控制在秒級。

3)容錯性:使用spark的lineage及記憶體維護兩份資料進行備份達到容錯。rdd通過lineage記錄下之前的操作,如果某節點在運作時出現故障,則可以通過備援備份資料在其他節點重新計算得到。

《Spark大資料分析實戰》——3.2節Spark Streaming

5)實時性:spark streaming也是一個實時計算架構,spark streaming能夠滿足除對實時性要求非常高(例如:高頻實時交易)之外的所有流式準實時計算場景。目前spark streaming最小的batch size的選取在0.5~2s(對比:storm目前最小的延遲是100ms左右)。

(2)适用場景

spark streaming适合需要曆史資料和實時資料結合進行分析的應用場景,對于實時性要求不是特别高的場景也能夠勝任。

3.2.2 spark streaming架構

通過圖3-10,讀者可以對spark streaming的整體架構有宏觀把握。

《Spark大資料分析實戰》——3.2節Spark Streaming

元件介紹:

network inputtracker:通過接收器接收流資料,并将流資料映射為輸入dstream。

job scheduler:周期性地查詢dstream圖,通過輸入的流資料生成spark job,将spark job送出給job manager進行執行。

jobmanager:維護一個job隊列,将隊列中的job送出到spark進行執行。

通過圖3-10可以看到d-stream lineage graph進行整體的流資料的dag圖排程,taskscheduler負責具體的任務分發,block tracker進行塊管理。在從節點,如果是通過網絡輸入的流資料會将資料存儲兩份進行容錯。input receiver源源不斷地接收輸入流,task execution負責執行主節點分發的任務,block manager負責塊管理。spark streaming整體架構和spark很相近,很多思想是可以遷移了解的。

3.2.3 spark streaming原理剖析

下面将由一個example示例,通過源碼呈現spark streaming的底層機制。

1.?初始化與接收資料

spark streaming通過分布在各個節點上的接收器,緩存接收到的流資料,并将資料包裝成spark能夠處理的rdd的格式,輸入到spark streaming,之後由spark streaming将作業送出到spark叢集進行執行,如圖3-11所示。

《Spark大資料分析實戰》——3.2節Spark Streaming

初始化的過程主要可以概括為兩點。

1)排程器的初始化。

排程器排程spark streaming的運作,使用者可以通過配置相關參數進行調優。

2)将輸入流的接收器轉化為rdd在叢集進行分布式配置設定,然後啟動接收器集合中的每個接收器。

針對不同的資料源,spark streaming提供了不同的資料接收器,分布在各個節點上的每個接收器可以認為是一個特定的程序,接收一部分流資料作為輸入。

使用者也可以針對自身生産環境狀況,自定義開發相應的資料接收器。

如圖3-12所示,接收器分布在各個節點上。通過下面代碼,建立并行的、在不同worker節點分布的receiver集合。

《Spark大資料分析實戰》——3.2節Spark Streaming

2.?資料接收與轉化

在“初始化與接收資料”部分中已經介紹過,receiver集合轉換為rdd,在叢集上分布式地接收資料流。那麼每個receiver是怎樣接收并處理資料流的呢?讀者可以通過圖3-13,對輸入流的處理有一個全面的了解。圖3-13為spark streaming資料接收與轉化的示意圖。

圖3-13的主要流程如下。

1)資料緩沖:在receiver的receive函數中接收流資料,将接收到的資料源源不斷地放入到blockgenerator.currentbuffer。

2)緩沖資料轉化為資料塊:在blockgenerator中有一個定時器(recurringtimer),将目前緩沖區中的資料以使用者定義的時間間隔封裝為一個資料塊block,放入到blockgenerator的blocksforpush隊列中(這個隊列)。

3)資料塊轉化為spark資料塊:在blockgenerator中有一個blockpushingthread線程,不斷地将blocksforpush隊列中的塊傳遞給blockmanager,讓blockmanager将資料存儲為塊。blockmanager負責spark中的塊管理。

4)中繼資料存儲:在pusharraybuffer方法中還會将已經由blockmanager存儲的中繼資料資訊(例如:block的id号)傳遞給receivertracker,receivertracker會将存儲的blockid放到對應streamid的隊列中。

《Spark大資料分析實戰》——3.2節Spark Streaming

圖中部分元件的作用如下:

感興趣的讀者可以參照圖中所示的類和方法進行更加具體的機制的了解。篇幅所限,對這個資料生成過程不再做具體的代碼剖析。

3.?生成rdd與送出spark job

spark streaming根據時間段,将資料切分為rdd,然後觸發rdd的action送出job,job被送出到job manager中的job queue中由job scheduler排程,之後job scheduler将job送出到spark的job排程器,然後将job轉換為大量的任務分發給spark叢集執行,如圖3-14所示。

《Spark大資料分析實戰》——3.2節Spark Streaming

job generator中通過下面的方法生成job進行排程和執行。

從下面的代碼可以看出job是從outputstream中生成的,然後再觸發反向回溯執行整個dstream dag,類似rdd的機制。

spark streaming在保證明時處理的要求下還能夠保證高吞吐與容錯性。使用者的資料分析中很多情況下也存在需要分析圖資料,運作圖算法,通過graphx可以簡便地開發分布式圖分析算法。

繼續閱讀