天天看點

Apache SparkStreaming 簡介和程式設計模型

1. 簡介

Apache SparkStreaming 簡介和程式設計模型

圖5.22 SparkStreaming[16]

    Spark Streaming是Spark API核心擴充,提供對實時資料流進行流式處理,具備可擴充、高吞吐和容錯等特性。Spark Streaming支援從多種資料源中提取資料,例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,并提供了一些進階的API來表示複雜處理算法,如map、reduce、join、windows等,最後可以将得到的結果存儲到分布式檔案系統(如HDFS)、資料庫或者其他輸出,Spark的機器學習和圖計算的算法也可以應用于Spark Streaming的資料流中。Spark Streaming的本質實際上是一個微批處理系統,正因如此,Spark Streaming具有一些現有的流處理模型所沒有的特性。它可以對故障節點和慢節點實作秒級的恢複,且具有高吞吐量。但其實時計算延遲是在秒級的,而現有的流處理系統(如Storm)一般是在毫秒級,是以Spark Streaming不适用于一些實時性要求很高的場景,如實時金融系統等。

    許多資料需要實時進行處理,也就是說資料産生時的價值最大。例如,一個社交網絡想在分鐘級别内确定某個交流話題的趨勢,搜尋網站想根據使用者的通路訓練模型,服務商想在秒級内通過挖掘日志找到錯誤資訊。設計适用于這些場景的模型極具挑戰性,因為對于一些應用場景(如機器學習、實時日志分析),叢集規模會達到百級以上,在這樣的規模下會存在兩個主要問題:節點故障(faults)和慢節點(slow nodes)問題。這兩個問題在大規模叢集下都是經常存在的,是以快速恢複在流系統應用中是十分重要的,否則流式應用可能無法及時做出關鍵的決定。但現有的一些流處理系統在這兩個問題的處理上都十分有限,大多數流處理系統(如Storm、TimeStream、MapReduce Online等)都是基于純實時的計算模型(a-record-at-a-time,來一條資料就處理一條資料),雖然這個模型能夠有較小的計算時延,但是很難解決節點故障和慢節點的問題。一些傳統的流式處理方法在小規模叢集下運作較好,但在大規模情況下卻面臨着實質性的問題。

    Spark Streaming提供了一種抽象的連續資料流,即Discretized Stream(離散流),一個離散流本質上就是一個序列化的RDD(Resilient Distributed Datasets,彈性分布式資料集)。離散流模型利用其并行恢複(ParallelRecovery)解決了節點故障和減輕了慢節點所帶來的問題,還保證了一緻性語義。

2. 系統架構

    離散流是Spark Streaming提供的基礎抽象,它代表持續性的資料流,這些資料流既可以從外部源(如Kafka、Flume等)擷取,也可以通過離散流的算子操作來獲得。實質上,離散流由一組時間上連續的RDD組成,每個RDD都包含着一定時間片的資料,如圖5.23所示:

Apache SparkStreaming 簡介和程式設計模型

圖5.23Discretized Stream[17]

Apache SparkStreaming 簡介和程式設計模型

圖5.24 SparkStreaming 整體架構[18]

    如圖5.24所示,這是Spark Streaming系統的整體架構,它将實時的流資料分解成一系列很小的批處理作業。批處理引擎使用的是Spark Core,也就是把輸入資料按照一定的時間片(如1s)分成一段一段資料,每一段資料都會轉換成Spark的RDD輸入到Spark Core中,然後再将離散流的操作轉換為RDD的算子操作,RDD算子操作産生的中間結果會儲存在記憶體中,最後整個流式計算可以将中間結果輸出到外部。

3. 一緻性語義和容錯

    對于流式計算,容錯性的重要性在第一小節已經詳細說明過了。首先,我們需要回憶Spark中RDD的容錯機制。RDD是一個彈性不可變的分布式資料集,Spark記錄着确定性的RDD轉換的操作繼承關系(lineage),是以隻要輸入的資料是可容錯的,任何一個RDD的分區出錯時,都可以根據lineage對原始輸入資料進行轉換操作,進而重新計算。圖5.25是Spark Streaming的一個RDD繼承關系圖:

Apache SparkStreaming 簡介和程式設計模型

圖5.25 統計網頁浏覽量的lineragegraph[18]

    圖中每個橢圓代表的是一個RDD,橢圓中的每一個圓形是一個RDD的分區,圖中的每一列的所有RDD代表的是一個離散流(圖中一共有3個離散流),間隔[0,1)和[1,2)代表的是不同時間分片,圖中每一行的最後一個RDD代表的是中間結果RDD。

    并行恢複(Parallel Recovery):系統會周期性的checkpoint RDD的資料,異步的備份到其他節點(預設複制數是2),因為RDD是不可變的,是以checkpoint不會鎖住目前時間片的執行。一個Spark Streaming的流式應用,系統會每分鐘對中間結果RDD進行checkpoint。當一個節點發生故障了,系統監測出丢失的RDD,系統會選擇上一個checkpoint的資料來進行重新計算。離散流可以利用充分利用分區的并行性來達到更快的恢複速度:1)與批處理系統很相似的是,每個節點上運作多個task,每一個時間片的轉換操作會在每個節點建立多個RDD分區(例如在100個節點的叢集上有1000個RDD分區)。這樣當一個節點發生故障時,可以讓RDD的不同分區并行恢複。2)繼承關系圖(lineage graph)可以使不同時間片的資料并行恢複。如果一個機器節點發生故障,系統在每一個時間片可能丢失一些map操作的輸出,從圖5.26的浏覽量統計應用的lineage graph可以看出,不同時間片的map可以并行地恢複計算,是以并行恢複的速度是比上遊緩存政策更快的。

    慢節點問題:在現有的純實時流處理系統中,基本都沒有解決慢節點的問題。離散流則與批處理系統類似,通過運作慢任務的副本來減輕慢節點帶來的影響。Spark Streaming最開始采用一種簡單的門檻值來判斷一個任務是否是慢任務:當一個任務是這個階段(stage)的中間任務運作時間的1.4倍,則判斷這是個慢任務。

    一緻性語義:離散流還有一個好處就是提供了強一緻性。例如,考慮一個系統統計男女網頁浏覽量的比例,一個節點統計男性網頁浏覽量,另一個節點統計女性網頁浏覽量。如果一個節點落後于另一個節點,那麼最終的結果也将有誤。一些系統(如Borealis)利用同步節點來避免這個問題,而Storm就直接忽略了這個問題。而且Storm隻能保證一個記錄最少被處理一次,可能存在錯誤記錄被多次處理,這就會使可變更的狀态因更新兩次而導緻結果不正确,雖然Storm提供了Trident可以確定每條記錄有且僅被處理一次,但是非常慢且需要使用者去實作。使用離散流可以保證一緻性是很明顯的,因為時間被劃分成時間片,每一個時間片的輸出RDD都與這個時間片的輸入和前面時間片有關(參考圖5.26),而RDD是不可變的,是以最終的結果是不會改變的。

4. Apache SparkStreaming程式設計模型

4.1 資料模型

    在第2節我們知道,Spark Streaming就是把資料流劃分為微批交給Spark Core處理的。Spark Core的處理的資料被抽象成了一個RDD,而Spark Streaming的處理資料被抽象成了一系列的DStream。實質上,離散流由一組時間上連續的RDD組成,每個RDD都包含着一定時間片的資料,如圖5.23所示。

4.2 計算模型

    Spark Streaming的程式設計模型可以看成是一個批處理Spark Core的程式設計模型,除了API是調用Spark Streaming的API,很多概念都是一樣的。在Spark Core編寫程式時,隻需要指定初始RDD的生成,然後對初始RDD進行一系列轉換的操作,不斷生成新的RDD,最後生成最終的結果RDD。

    Spark Streaming也是類似的計算模型,DStream本質是一組時間上連續的RDD組成的,RDD是依靠着分區(Partition)來保證并行性的。在編寫Spark Streaming程式的時候,我們需要指定初始DStream的輸入源,生成初始的DStream,然後定義一些轉換操作,這些DStream的操作最終都會轉換成RDD的操作,然後在每一個時間片内,可以獲得最終的結果DStream對應的RDD(也可以将結果選擇輸出到外部檔案中),可以參考後面單詞計數的執行個體分析。

PS:關于Spark Core中RDD的程式設計模型不屬于本章所要講的重點,在這裡就不做贅述。

4.3 基本操作

    從Spark Streaming的系統架構可知,Spark Streaming中對DStream的各種操作,最終會在Spark Core中轉換成RDD的操作,是以對DStream的操作是與Spark Core對RDD的操作是十分類似的。Spark Streaming在其資料模型DStream的模型下,為DStream提供了一系列的操作方法,這些操作大概可以分為3類:普通的轉換操作、視窗轉換操作和輸出操作。常用的普通轉換操作有flatMap、map、filter、reduceByKey、countByKey等操作,并且Spark Streaming支援将DStream的資料輸出到外部系統,如資料庫或檔案系統。具體Spark Streaming支援的所有操作,可以到官網檢視。

4.4 程式設計模型執行個體分析

       下面用最基本的wordcount例子來解釋其程式設計模型,其DStream的轉換如下所示:

Apache SparkStreaming 簡介和程式設計模型

     圖XXX:單詞計數的DStream轉換圖

    如上圖所示,一共定義了四個離散流,wordCounts的離散流是我們最終要的結果。LinesDStream可以從檔案系統、資料庫、kafka等擷取,然後對其進行flatMap操作,将每一行的文本分割成單詞,形成新的離散流words DStream,随即進行mapToPair操作,将其映射成<word,1>的模式,最後用reduceByKey操作對每個單詞進行計數,得到最終的結果離散流wordCountsDStream。

Java核心代碼如下:

//建立SparkConf對象
//與Spark Core的有一點不同,設定Master屬性的時候,使用local模式時,
// local後面必須跟一個方括号,裡面填寫一個數字,數字代表了用幾個線程執行Spark Streaming程式。
SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("WordCountLocal");


//建立SparkStreamingContext對象,還需指定每隔多長時間的資料劃分為一個batch,這裡是1s
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));

//首先,建立一個DStream,代表了從一個資料源(這裡是socket)來的持續不斷的實時資料流
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999);

//将一行行的文本用flatMap切分成多個單詞,words DStream的RDD元素類型為一個個單詞
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" ")).iterator();
    }
});

/
//接着開始進行mapToPair操作,将單詞映射成<word,1>的pair格式,得到離散流pairs
JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<String, Integer>(word,1);
    }
});

//對離散流pairs進行reduceByKey操作,進行單詞計數,得到wordCounts離散流
JavaPairDStream<String,Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
//最後,每次計算完,就列印這一秒鐘的單詞計數情況
wordCounts.print();
//必須調用JavaStreamingContext的start()方法,整個Java Streaming Application才會啟動執行
//否則,不會執行
jsc.start();
try {
    jsc.awaitTermination();//等待應用程式的終止,可以使用CTRL+C手動停止
    //也可以通過調用JavaStreamingContext的stop()方法來終止程式
} catch (InterruptedException e) {
    e.printStackTrace();
}
jsc.close();

           

繼續閱讀