二、瞬間了解SparkStreaming本質
選擇SparkStreaming架構源碼研究、二次開發的原因
1、Spark起初隻有Spark Core基礎架構沒有其他的子架構(Spark SQL、Spark Streaming、Spark ML、Spark Graphx、Spark R等),在其後加入了各種子架構來滿足不同的需求。而分析這些子架構發現,選擇Spark Streaming架構來研究,是最明智的選擇,通過研究Spark Core上的Spark Streaming子架構,是邁向精通Spark力量源泉和解決所有問題之道。
2、Spark有很多子架構,我們選擇Spark Streaming而為什麼不用其他架構?
Spark SQL涉及了很多SQL文法細節的解析和優化,當然分析其解析、優化進而集中精力去研究Spark而言是一件重要的事情,但不是最重要的事情,是以Spark SQL不太适合作為具體的子架構值得我們去研究。
目前Spark R現在不成熟,支撐功能有限。
Mechine Learning在封裝了Vector向量、Metrics建構了衆多的算法庫,進而涉及了太多的數學知識,所有選擇ML其實也不是太好的選擇。
最後篩選出SparkStreaming子架構才是最佳的研究切入黃金點。
整個Spark的所有應用程式,哪些程式容易出問題?肯定是SparkStreaming程式是最容易出問題的,因為資料是不斷流入的,ss要動态的控制資料的流入、作業的切分、資料的處理,是以最容易出問題,但最容易出問題的地方同時也是最受關注的地方也是展示大資料最神奇魅力的地方。這些特色結合的話,也是最需要技術人才的地方。關注SparkStreaming在Spark的發展,你會很清晰知道,ss跟其他自架構不同之處,ss很象SparkCore上的一個應用程式。
正如世界萬物發展一樣,任何技術都有其關鍵點或轉折點,SparkStreaming運作在SparkCore上,是以很多性能調優都是建立在SparkCore上的;Spark是大資料的龍脈,SparkStreaming是龍脈的穴位。
接下來感受一下龍脈和穴位
研究SparkStreaming時,有困惑你的東西,SparkStreaming資料不斷流進來,根據batchInterval時間片不斷生成Job,并将Job送出叢集處理,如果能清晰的看到資料的流入和資料的處理,你心裡會很很踏實。
如何能清晰的看到資料的處理過程呢?隻需要一個小技巧:就是把SparkStreaming中的batchInterval放的足夠大,例如說從30秒調整為1分鐘一次batch,或者5分鐘一次batch,你會很清晰的看到整個流程式的運作過程。
以廣告點選線上黑名單的過濾為例
調整時間次元:

我們把時間從30秒調至300秒:
讀取SparkStreaming Socket端口的資料:
打包程式釋出至叢集部署環境:
檢查叢集程序:
通過webui檢查HDFS啟動情況:
啟動history-server監控程序及其對應的webui監控界面:
至此整個叢集環境啟動一切OK。
開始啟動運作SparkStreaming應用程式
啟動外部請求SparkStreaming服務端口的用戶端:
輸入待處理的資料流:
看結果如下:
看webui控制台:
點選連結進入後産生了0~4個Job:
有意思的是SparkStreaming應用程式啟動實際執行的是一個Job,但真正執行的是5個Job,其分别是Receiver Job,Output Job,Output Job,Output Job,Start Job.
第 0 個Job是不是我們邏輯中的代碼?不是的,不是reduceByKey的執行結果Job,如下圖:
SparkStreaming在啟動的過程中會自動啟動一些Job,如start操作:
SparkStreaming最像一個應用程式,就算是算一次,也執行了好幾個Job,就像spark應用程式一樣,可以啟動不同的Job完成不同的功能。
繼續看Job1:
通過Job告訴你内幕:通過追蹤Receiver發現其會産生makeRDD,實際上作為整個Job獨立的一個stage,隻在一台機器上執行,而且執行了1.5分鐘,剛才啟動SparkStreaming,沒有任務執行1.5分鐘的,如下圖:
思考一下什麼東西執行了1.5分鐘,而整個Job隻運作了2分鐘?
答案就是ReceiverTracker接收器運作的,它需要接收流入的資料。這個Job就是Receiver,并且執行了1.5分鐘,而啟動的Receiver就是一個Job。
結論:
SparkStreaming啟動Receiver的是一個Job,在具體的Cluster的Worker上的executor中,啟動Receiver是通過Job啟動的。通過作業的運作時間看出,整個SparkStreaming運作的時間是2分鐘,其中有個Job運作了1.5分鐘,這個Job就是Receiver,其實指的是Receiver啟動運作的時間,Receiver是在executor中運作的,也就是說SparkStreaming架構在啟動Recevier是通過Job啟動的。而且Receiver(可以啟動多個receiver接收資料)就是在一個executor中運作且通過一個Task去接收我們的資料:
從這個角度講Receiver接收資料和普通job有什麼差別?沒有差別。轉過來給我們啟發:在一個Spark application中可以啟動很多的job,這些job之間可以互相配合。例如:SparkStreaming架構預設啟動job給你接收資料,然後為後續的處理做準備,為你寫複雜的應用程式奠定了一個良好的基礎。這就是你寫非常複雜的Spark應用程式的黃金切入點,複雜的程式一般都是有多個job構成的。
上圖的Process_local即記憶體節點,SparkStreaming在預設情況下接收資料是memory_and_disk_ser_2的方式,也就是說接收的資料量比較少記憶體能存下的話預設情況下是不會存儲磁盤的,在這裡直接使用記憶體中。
看下第0個job:
在4個worker上啟動4個executor,是在最大化的使用計算資源,通過第1個job 不斷接收資料。
這裡處理資料有shuffle read,shuffle write,通過socketTextStream即rdd,這裡叫blockRdd,而且blockrdd來自于socketTextStream的方法:
其實是inputStream幫我們在固定時間間隔内會産生固定的rdd,接收資料是在一個executor的task中接收的,但現在處理資料是transform操作發生在executor裡面的發生在4個executor,這個結果告訴我們在一台機器上接收資料,但實際上是在四台機器上處理資料的。最大化利用叢集資源處理資料。SparkStreaming程式執行時就是一個batch級别的Job,裡面做了很多事情。整個處理,其實隻有一個Job真正在執行,但産生很多Job互相協調來完成複雜的業務處理,這個情況告訴我們SparkStreaming并不是網絡、部落格、書籍、官網上講的那麼簡單。
SparkStreaming本身是随着流進來的資料按照時間為機關生成job,然後觸發job在Cluster上執行的流式處理的引擎,它本身是加上以時間為次元的批處理,執行個體中以300秒為會産生一批資料,基于這一批資料會生成rdd,基于rdd會觸發job,rdd的生成、job的觸發,都是SparkStreaming架構去做的。SparkStreaming中有個至關隻要的東西叫DStream,我們每隔一定時間都會生成rdd,産生rdd的依賴或觸發job具體的執行。每隔時間,是以弄了一個DStream,DStream代表時空的概念,時間為次元,随着時間的推進不斷産生rdd,實際上DStream就是rdd的集合,隻不過是有時間的先後順序;空間次元實際上是DStream的處理層面,我們對DStream進行處理實際上是對DStream裡面的每個rdd的處理。整個時空是一個很大的概念,時間固定的話,可以鎖定對空間的操作,操作其實就是transform,對DStream的操作會建構DStream
Graph。
總結:
随着時間為次元有個DStream Graph,同時在時間次元下有個空間次元,空間次元就是操作,空間次元确定的情況下時間不斷推進的時候他就不斷把空間次元的DStream Graph執行個體化成rdd的graph,然後觸發具體的job進行執行。
一、解密SparkStreaming運作機制
sql,SparkStreaming,Spark ml,Spark graphx子架構都是後面開發出來的,我們要洞悉Spark Core 的話,SparkStreaming是最好的切入方式。
進入Spark官網,可以看到SparkCore和其他子架構的關系:
SparkStreaming啟動後,資料不斷通過inputStream流進來,根據時間劃分成不同的job、就是batchs of input data,每個job有一序列rdd的依賴。Rdd的依賴有輸入的資料,是以這裡就是不同的rdd依賴構成的batch,這些batch是不同的job,根據spark引擎來得出一個個結果。DStream是邏輯級别的,而RDD是實體級别的。DStream是随着時間的流動内部将集合封裝RDD。對DStream的操作,轉過來是對其内部的RDD操作。
我是使用SparkCore 程式設計都是基于rdd程式設計,rdd間有依賴關系,如下圖右側的依賴關系圖,SparkStreaming運作時,根據時間為次元不斷的運作。Rdd的dag依賴是空間次元,而DStream在rdd的基礎上加上了時間次元,是以構成了SparkStreaming的時空次元。
SparkStreaming在rdd的基礎上增加了時間次元,運作時可以清晰看到jobscheduler、mappartitionrdd、shuffledrdd、blockmaanager等等,這些都是SparkCore的内容,而DStream、jobgenerator、socketInputDstream等等都是SparkStreaming的内容,如下圖運作過程可以很清晰的看到:
現在通過SparkStreaming的時空次元來細緻說明SparkStreaming運作機制
時間次元:按照固定時間間隔不斷地産生job對象,并在叢集上運作:
包含有batch interval,視窗長度,視窗滑動時間等
空間次元:代表的是RDD的依賴關系構成的具體的處理邏輯的步驟,是用DStream來表示的:
1、需要RDD,DAG的生成模闆
2、TimeLine的job控制器、
3、InputStream和outputstream代表的資料輸入輸出
4、具體Job運作在Spark Cluster之上,此時系統容錯就至關重要
5、事務處理,在處理出現奔潰的情況下保證Exactly once的事務語義一緻性
随着時間的流動,基于DStream Graph不斷生成RDD Graph,也就是DAG的方式生成job,并通過Job Scheduler的線程池的方式送出給Spark Cluster不斷的執行,
由上圖可知,RDD 與 DStream之間的關系如下:
1、RDD是實體級别的,而 DStream 是邏輯級别的;
2、DStream是RDD的封裝模闆類,是RDD進一步的抽象;
3、DStream要依賴RDD進行具體的資料計算;
Spark Streaming源碼解析
1、StreamingContext方法中調用JobScheduler的start方法:
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("Master", 9999)
......//業務處理代碼略
ssc.start()
ssc.awaitTermination()
我們進入JobScheduler start方法的内部繼續分析:
1、JobScheduler 通過onReceive方法接收各種消息并存入enventLoop消息循環體中。
2、通過rateController對流入SparkStreaming的資料進行限流控制。
3、在JobScheduler的start内部會構造JobGenerator和ReceiverTacker,并且調用JobGenerator和ReceiverTacker的start方法。
ReceiverTacker的啟動方法:
1、ReceiverTracker啟動後會建立ReceiverTrackerEndpoint這個消息循環體,來接收運作在Executor上的Receiver發送過來的消息。
2、ReceiverTracker啟動後會在Spark Cluster中啟動executor中的Receivers。
JobGenerator的啟動方法:
1、JobGenerator啟動後會啟動以batchInterval時間間隔發送GenerateJobs消息的定時器
b. Spark Streaming Job 容錯架構和運作機制
注:本講内容基于Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。
上節課談到Spark Streaming是基于DStream程式設計。DStream是邏輯級别的,而RDD是實體級别的。DStream是随着時間的流動内部将集合封裝RDD。對DStream的操作,歸根結底還是對其RDD進行的操作。
如果将Spark Streaming放在坐标系中,并以Y軸表示對RDD的操作,RDD的依賴關系構成了整個job的邏輯應用,以X軸作為時間。随着時間的流逝,以固定的時間間隔(Batch Interval)産生一個個job執行個體,進而在叢集中運作。
同時也為大家詳細總結并揭秘 Spark Streaming五大核心特征:特征1:邏輯管理、特征2:時間管理、特征3:流式輸入和輸出、特征4:高容錯、特征5:事務處理。最後結合Spark Streaming源碼做了進一步解析。
**
由上一講可以得知,以固定的時間間隔(Batch Interval)産生一個個job執行個體。那麼在時間次元和空間次元組成的時空次元的Spark Streaming中,Job的架構和運作機制、及其容錯架構和運作機制是怎樣的呢?
那我們從愛因斯坦的相對時空講起吧:
a、時間和空間是緊密聯系的統一體,也稱為時空連續體。
b、時空是相對的,不同的觀察者看到的時間,長度,品質都可以不一樣。
c、對于兩個沒有聯系的事件,沒有絕對的先後順序。但是因果關系可以确定事件的先後,比如Job的執行個體産生并運作在叢集中,那麼Job執行個體的産生事件必然發生在Job運作叢集中之前。
就是說Job的執行個體産生和單向流動的時間之間,沒有必然的聯系;在這裡時間隻是一種假象。
怎麼更好的了解這句話呢?那我們就得從以下方面為大家逐漸解答。
什麼是Spark Streaming Job 架構和運作機制 ?
對于一般的Spark應用程式來說,是RDD的action操作觸發了Job的運作。那對于SparkStreaming來說,Job是怎麼樣運作的呢?我們在編寫SparkStreaming程式的時候,設定了BatchDuration,Job每隔BatchDuration時間會自動觸發,這個功能是Spark Streaming架構提供了一個定時器,時間一到就将編寫的程式送出給Spark,并以Spark job的方式運作。
通過案例透視Job架構和運作機制
案例代碼如下:
将上述代碼打成JAR包,再上傳到叢集中運作
叢集中運作結果如下
運作過程總圖如下
案例詳情解析
a、 首先通過StreamingContext調用start方法,其内部再啟動JobScheduler的Start方法,進行消息循環;
(StreamingContext.scala,610行代碼)
(JobScheduler.scala,83行代碼)
b、 在JobScheduler的start内部會構造JobGenerator和ReceiverTacker;
(JobScheduler.scala,82、83行代碼)
c、 然後調用JobGenerator和ReceiverTacker的start方法執行以下操作:
(JobScheduler.scala,79、98行代碼)
(ReceiverTacker.scala,149、157行代碼)
JobGenerator啟動後會不斷的根據batchDuration生成一個個的Job ;
(JobScheduler.scala,208行代碼)
ReceiverTracker的作用主要是兩點:
1.對Receiver的運作進行管理,ReceiverTracker啟動時會調用lanuchReceivers()方法,進而會使用rpc通信啟動Receiver(實際代碼中,Receiver外面還有一層包裝ReceiverSupervisor實作高可用)
(ReceiverTracker.scala,423行代碼)
2.管理Receiver的中繼資料,供Job對資料進行索引,中繼資料的核心結構是receivedBlockTracker
(ReceiverTracker.scala,106~112行代碼)
d、 在Receiver收到資料後會通過ReceiverSupervisor存儲到Executor的BlockManager中 ;
e、 同時把資料的Metadata資訊發送給Driver中的ReceiverTracker,在ReceiverTracker内部會通過ReceivedBlockTracker來管理接受到的中繼資料資訊;
這裡面涉及到兩個Job的概念:
為什麼使用線程池呢?
a 、作業不斷生成,是以為了提升效率,我們需要線程池;這和在Executor中通過線程池執行Task有異曲同工之妙;
b 、有可能設定了Job的FAIR公平排程的方式,這個時候也需要多線程的支援;
Spark Streaming Job 容錯架構和運作機制
Spark Streaming是基于DStream的容錯機制,DStream是随着時間流逝不斷的産生RDD,也就是說DStream是在固定的時間上操作RDD,容錯會劃分到每一次所形成的RDD。
Spark Streaming的容錯包括 Executor 與 Driver兩方面的容錯機制 :
a、 Executor 容錯:
1. 資料接收:分布式方式、wal方式,先寫日志再儲存資料到Executor
2. 任務執行安全性 Job基于RDD容錯 :
b、Driver容錯 : checkpoint 。
基于RDD的特性,它的容錯機制主要就是兩種:
1. 基于checkpoint;
在stage之間,是寬依賴,産生了shuffle操作,lineage鍊條過于複雜和冗長,這時候就需要做checkpoint。
2. 基于lineage(血統)的容錯:
一般而言,spark選擇血統容錯,因為對于大規模的資料集,做檢查點的成本很高。
考慮到RDD的依賴關系,每個stage内部都是窄依賴,此時一般基于lineage容錯,友善高效。
總結: stage内部做lineage,stage之間做checkpoint。
<a target="_blank" href="http://blog.csdn.net/qq_21234493/article/details/51362044#"></a>