天天看點

spark Streaming介紹及執行個體編寫

1,引用一段官網介紹

spark Streaming是Spark core API的擴充,支援實時資料流的處理,并且具有可擴充,高吞吐量,容錯的特點。 

資料可以從許多來源擷取,如Kafka,Flume,Kinesis或TCP sockets,并且可以使用複雜的算法進行處理,這些算法使用諸如map,reduce,join和window等進階函數表示。

最後,處理後的資料可以推送到檔案系統,資料庫等。 實際上,您可以将Spark的機器學習和圖形處理算法應用于資料流。

spark Streaming介紹及執行個體編寫

輸入:可以從Kafka,Flume,HDFS等擷取資料 

計算:我們可以通過map,reduce,join等一系列算子通過spark計算引擎進行計算(基本和RDD一樣,使用起來更友善。) 

輸出:可以輸出到HDFS,資料庫,HBase等。

spark Streaming介紹及執行個體編寫

在内部,它的工作原理如下。 Spark Streaming接收實時輸入資料流并将資料分成批,然後由Spark引擎處理,以批量生成最終結果流。

嚴格來說spark streaming 并不是一個真正的實時架構,因為他是分批次進行處理的。

Spark Streaming提供了一個高層抽象,稱為discretized stream或DStream,它表示連續的資料流。 DStream可以通過Kafka,Flume和Kinesis等來源的輸入資料流建立,也可以通過在其他DStream上應用進階操作來建立。在内部,DStream表示為一系列RDD。

2,編寫一個稍複雜的資料累加示例

package cn.itcast.spark.day5
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object StateFulWordCount {


  //Seq這個批次某個單詞的次數
  //Option[Int]:以前的結果

  //分好組的資料
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    //iter.map{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
    //iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    iter.map{ case(word, current_count, history_count) => (word, current_count.sum + history_count.getOrElse(0)) }
  }

  def main(args: Array[String]) {
    //LoggerLevels.setStreamingLogLevels()
    //StreamingContext
    val conf = new SparkConf().setAppName("StateFulWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    //updateStateByKey必須設定setCheckpointDir, 計算過程中生成的rdd的公共緩存路徑
    sc.setCheckpointDir("E://ck")
    val ssc = new StreamingContext(sc, Seconds(5)) //設定每五秒産生一個批次

    val ds = ssc.socketTextStream("192.168.2.201", 8888) //在虛拟機上啟動一個nc 8888端口  ./netcat -l -p 8888
    //DStream是一個特殊的RDD
    //a a a a b b b b c c c
    val result = ds.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism), true) //調用方法 根據key值統計

    result.print()
    ssc.start()// 開始計算
    ssc.awaitTermination()// 等待計算結束
    //ssc.stop() 隻能手動停止
  }
}
           

在虛拟機上啟動一個nc socket端口 

./netcat -l -p 8888

啟動程式,然後再8888端口下輸入幾個測試資料

spark Streaming介紹及執行個體編寫

檢視程式背景輸出:

spark Streaming介紹及執行個體編寫

到這裡,一個資料累加執行個體就完成了。

3,Discretized Streams (DStreams) 同樣來自官網介紹

Discretized Streams或DStream是Spark Streaming提供的基本抽象。 它表示連續的資料流,即從源接收的輸入資料流或通過轉換輸入流生成的已處理資料流。 在内部,DStream由連續的RDD系清單示,這是Spark對不可變的分布式資料集的抽象。 DStream中的每個RDD都包含來自特定時間間隔的資料,如下圖所示。

spark Streaming介紹及執行個體編寫

在DStream上應用的任何操作都會轉化為對每個RDD的操作。例如,wordcount案例中(下面會進行代碼示範),flatMap操作應用于DStream行中的每個RDD,以生成單詞DStream的RDD。 這在下圖中顯示。 

spark Streaming介紹及執行個體編寫

這些基礎RDD轉換由Spark引擎計算。 DStream操作隐藏了大部分這些細節,并為開發人員提供了更進階别的API以友善使用。

想看更詳細的講解請自行檢視官網:(http://spark.apache.org/docs/latest/streaming-programming-guide.html)使用浏覽器将網頁文本整體翻譯成中文即可,英語7、8級的請随意。

繼續閱讀