Spark Streaming初探
Spark Streaming是一個基于Spark核心的流式計算的擴充。
主要有以下兩個特點:
1. 高吞吐量
2. 容錯能力強
1.原理:
Spark Streaming支援多種資料源的輸入,向Flume,Kafka,HDFS,ZeroMQ,Twitter以及原始的TCP sockets。
資料可以使用Spark的RDD的Transformation,也可以應用很多Spark内置的機器學習算法,還有圖計算。
以下是Spark官方截圖:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0NXYFhGd192UvwVe0lmdhJ3ZvwFM38CXlZHbvN3cpR2Lc1TPB10QGtWUCpEMJ9CXsxWam9CXwADNvwVZ6l2c052bm9CXUJDT1wkNhVzLcRnbvZ2Lc1TPRJmd5IzY3ljMiZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39jN5czMwAjM3EDNxQDM0EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
1.1主要實作原理
Spark Streaming會接收線上的資料流,然後将資料流分成獨立的批次。
然後Spark Streaming Engine就會分别處理這些獨立的批資料,最後生成分批的結果。
Spark Streaming 提出了一種高度的抽象叫 DStream(discretized stream)離散流,代表了一段持續的資料流。
建立DStream可以從檔案建立,也可以從Kafka或者Flume,代表了一個RDD的序列。
2.WorkCount流式計算執行個體
摘自官方例子的代碼:
package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
首先我們需要建立一個發送資料的端口連接配接:
nc -lk 9999
程序會阻塞,當我們輸入資料的時候,會向9999這個端口發送我們的資料。
首先我們先建立一個Spark Streaming engine 現在我們需要Spark Streaming Engine來監聽這個端口。
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
這裡args(1)就是ip, args(2)就是port 我們看到這裡實踐是建立了一個socketTextStream,其實還有很多選擇。 StorageLevel是Memory_only,很明顯,建立的是一個RDD
val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
WordCount的邏輯很簡單,這裡不贅述。 最後要啟動Spark Streaming要使用這句:
ssc.start()
ssc.awaitTermination()
運作結果: 1.在終端輸入
# nc -lk 9999
what is your name ?
my name is sheng li ~ haha
-------------------------------------------
Time: 1397470893000 ms
-------------------------------------------
(is,1)
(what,1)
(your,1)
(?,1)
(name,1)
-------------------------------------------
Time: 1397470894000 ms
-------------------------------------------
Time: 1397470962000 ms
-------------------------------------------
(haha,1)
(my,1)
(is,1)
(~,1)
(li,1)
(sheng,1)
(name,1)
大緻流程: 1.Spark Streaming Engine 監聽9999這個端口發送的資訊,當作資料源,其實是一個RDD。
2.這裡會有一個lines是一個DStream,是一個不可變的分布式彈性資料集。 lines實際上是帶一個帶版本的RDD,每一個時刻它代表的是不同的資料,比如第一個輸入的是what is your name,那麼這個時刻Lines代表的是這句話。 第二次輸入的是my name is shengli ~ haha 這是另一個時刻的Lines。 3.DStream可以應用和RDD一樣的API,其中Transformation變形成其它的RDD。 這裡會變成Words DStream。
4.最後計算完成,輸出wordCounts.print()。
總結:
實際上每次的input都是一個[email protected] N
每個[email protected] N 都可以被transform成其它的RDD進行處理。
看起來相當簡單。
再來看下官方的解釋圖,會很好的了解:
參考文獻:http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html
原創文章,轉載注明出處http://blog.csdn.net/oopsoom/article/details/23692079