天天看點

Spark Streaming初探Spark Streaming初探

Spark Streaming初探

Spark Streaming是一個基于Spark核心的流式計算的擴充。

主要有以下兩個特點:

1. 高吞吐量

2. 容錯能力強

1.原理:

Spark Streaming支援多種資料源的輸入,向Flume,Kafka,HDFS,ZeroMQ,Twitter以及原始的TCP sockets。

資料可以使用Spark的RDD的Transformation,也可以應用很多Spark内置的機器學習算法,還有圖計算。

以下是Spark官方截圖:

Spark Streaming初探Spark Streaming初探

1.1主要實作原理

Spark Streaming會接收線上的資料流,然後将資料流分成獨立的批次。

然後Spark Streaming Engine就會分别處理這些獨立的批資料,最後生成分批的結果。

Spark Streaming初探Spark Streaming初探

Spark Streaming 提出了一種高度的抽象叫 DStream(discretized stream)離散流,代表了一段持續的資料流。

建立DStream可以從檔案建立,也可以從Kafka或者Flume,代表了一個RDD的序列。

2.WorkCount流式計算執行個體

摘自官方例子的代碼:

package org.apache.spark.streaming.examples

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
        "In local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
           

首先我們需要建立一個發送資料的端口連接配接:

nc -lk 9999
           

程序會阻塞,當我們輸入資料的時候,會向9999這個端口發送我們的資料。

首先我們先建立一個Spark Streaming engine 現在我們需要Spark Streaming Engine來監聽這個端口。

val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
           

這裡args(1)就是ip, args(2)就是port 我們看到這裡實踐是建立了一個socketTextStream,其實還有很多選擇。 StorageLevel是Memory_only,很明顯,建立的是一個RDD

val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
           

WordCount的邏輯很簡單,這裡不贅述。 最後要啟動Spark Streaming要使用這句:

ssc.start()
ssc.awaitTermination()
           

運作結果: 1.在終端輸入

# nc -lk 9999
what is your name ? 
my name is sheng li ~ haha
           
-------------------------------------------
Time: 1397470893000 ms
-------------------------------------------
(is,1)
(what,1)
(your,1)
(?,1)
(name,1)

-------------------------------------------
Time: 1397470894000 ms
           
-------------------------------------------
Time: 1397470962000 ms
-------------------------------------------
(haha,1)
(my,1)
(is,1)
(~,1)
(li,1)
(sheng,1)
(name,1)
           

大緻流程: 1.Spark Streaming Engine 監聽9999這個端口發送的資訊,當作資料源,其實是一個RDD。

2.這裡會有一個lines是一個DStream,是一個不可變的分布式彈性資料集。 lines實際上是帶一個帶版本的RDD,每一個時刻它代表的是不同的資料,比如第一個輸入的是what is your name,那麼這個時刻Lines代表的是這句話。 第二次輸入的是my name is shengli ~ haha 這是另一個時刻的Lines。 3.DStream可以應用和RDD一樣的API,其中Transformation變形成其它的RDD。 這裡會變成Words DStream。

4.最後計算完成,輸出wordCounts.print()。

總結:

實際上每次的input都是一個[email protected] N

每個[email protected] N 都可以被transform成其它的RDD進行處理。

看起來相當簡單。

再來看下官方的解釋圖,會很好的了解:

Spark Streaming初探Spark Streaming初探
Spark Streaming初探Spark Streaming初探

參考文獻:http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html

原創文章,轉載注明出處http://blog.csdn.net/oopsoom/article/details/23692079

繼續閱讀