天天看點

大資料系列之Spark Streaming

Spark Streaming是建構在Spark上的實時流計算架構,可以進行實時流資料處理。本文簡要介紹了Spark Streaming的基本概念和基本算子的使用。

1、Spark Streaming介紹

Spark是一個類似于MapReduce的分布式計算架構,其核心是彈性分布式資料集,提供了比MapReduce更豐富的模型,可以在快速在記憶體中對資料集進行多次疊代,以支援複雜的資料挖掘算法和圖形計算算法。Spark Streaming是一種建構在Spark上的實時計算架構,它擴充了Spark處理大規模流式資料的能力。Spark Streaming的優勢在于:

  1. 能運作在100+的結點上,并達到秒級延遲。
  2. 使用基于記憶體的Spark作為執行引擎,具有高效和容錯的特性。
  3. 能內建Spark的批處理和互動查詢。
  4. 為實作複雜的算法提供和批處理類似的簡單接口。
    大資料系列之Spark Streaming

Spark Streaming使用“微批次”的架構,把流式計算當作一系列連續的小規模批處理來對待。Spark Streaming從各種輸入源中讀取資料,并把資料分組為小的批次,新的批次按均勻的時間間隔建立出來。在每個時間區間開始的時候,一個新的批次就建立出來,在該區間内收到的資料都會被添加到這個批次中。在時間區間結束時,批次停止增長。時間區間的大小是由批次間隔這個參數決定的,批次間隔一般設在500毫秒到幾秒之間,由應用開發者配置。每個輸入批次都形成一個RDD,以Spark作業的方式處理并生成其他的RDD。處理的結果可以以批處理的方式傳給外部系統。

大資料系列之Spark Streaming

從原理上看,把傳統的spark批處理程式變成streaming程式,spark需要建構四個内容:

  1. 一個靜态的RDD DAG的模闆,來表示處理邏輯;
  2. 一個動态的工作控制器,将連續的streaming data切分資料片段,并按照模闆複制出新的RDD DAG的執行個體,對資料片段進行處理;
  3. Receiver進行原始資料的産生和導入;Receiver将接收到的資料合并為資料塊并存到記憶體或硬碟中,供後續batch RDD進行消費;
  4. 對長時運作任務的保障,包括輸入資料的失效後的重構,處理任務的失敗後的重調。
大資料系列之Spark Streaming

下圖提供了Spark的driver、workers、流資料源和目标之間的資料流:

大資料系列之Spark Streaming

從Spark Streaming Context開始,由圖中的ssc.start()表示:

  1. 當Spark Streaming Context啟動時,driver配置設定給executor(即workers)一個長期任務
  2. Executor(本圖中的Executor1)上的接收者從流式源接收資料流。随着輸入的資料流,接收器将流分成塊并将這些塊儲存在記憶體中
  3. 這些資料塊也被複制到另一個Executor,以避免資料丢失
  4. 資料塊的 ID 資訊 被傳送到 driver 中的 Block Management Master上
  5. 對于Spark Streaming Context中配置的每個批處理間隔(通常這是每1秒),driver将啟動Spark任務處理塊。然後将這些塊儲存到任意數量的目标資料存儲中,包括雲存儲(例如S3/WASB等),關系資料存儲(例如MySQL,PostgreSQL等),以及NoSQL存儲

2、Sample程式-Wordcount

1)Import StreamingContext,建立本地StreamingContext,包括2個線程,批量間隔為1s

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
           

2)建立DStream,TCP源資料作為資料流,指定hostname和port

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
           

3)對DStream中的記錄進行分詞處理

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
- flatMap:一對多的DStream操作,從源DStream記錄中分成多個DStream記錄

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
- map(lambda word: (word, 1)):生成(word, 1)的DStream對

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
           

4)開始處理操作

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
           

5)開兩個終端,其中一個輸入網絡指令輸入詞組:

[[email protected] ipynotebook]# nc -lk 9999
           

6)另一個終端執行腳本如下:

[[email protected] spark-2.3.0]# spark-submit /usr/local/spark/ipynotebook/02-Streaming-01.py localhost 9999
           

7)輸出結果如下:

大資料系列之Spark Streaming

3、Spark Streaming操作

3.1 初始化StreamingContext

從SparkContext中建立StreamingContext:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
           
  • Master是Spark、Mesos或YARN或“local[*]”運作在local mode
  • Batch interval是根據應用的延遲設定的
3.2 Input DStream和Receivers

Spark Streaming有兩種内置的Streaming源:

  • Basic source:StreamingContext API可用的源,比如檔案系統、socket連接配接
  • Advanced source:比如kafka、flume等

1)File system:HDFS API相容的任何檔案系統,通過以下方式建立DStream:

StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
           

注:Python API中不支援filesystem,隻支援textFileStream

streamingContext.textFileStream(dataDirectory)
           

2)Advanced source:需要和外部的non-spark庫進行互動,其中像kafka和flume有很複雜的依賴包,如果希望在Spark Shell中使用這些源,則需要将依賴包添加到classpath

大資料系列之Spark Streaming
3.3 DStream的轉換操作
  • UpdateStateByKey操作

UpdateStateByKey允許維護任意一種狀态,同時不斷更新新的資訊,需要完成兩步:

  1. 定義狀态,狀态可以是任意的資料類型
  2. 定義狀态更新函數,指定函數使用之前的狀态和新輸入的值更新狀态
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

runningCounts = pairs.updateStateByKey(updateFunction)
           
  • Transform操作

允許任意的RDD-RDD函數應用于DStream,例如,可以通過将輸入資料流與預先計算的垃圾資訊(也可以使用Spark生成)進行實時資料清理,然後基于此進行過濾。

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
           
  • Window操作

Spark Streaming支援視窗計算,這樣允許在滑動建立進行轉換,如下圖所示:

大資料系列之Spark Streaming

當視窗滑過源DStream,源RDDs被組合産生視窗DStream的RDDs。Windows需要2個參數:

  1. 視窗長度:視窗持續的時間
  2. 滑動間隔:視窗操作的間隔
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
           
  • Join操作

1)Stream-stream join

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
           

2)Stream-datasets join

dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
           
3.4 DStream的Output操作
  • foreachRDD設計模式

最通用的輸出操作,将函數func應用于從流中生成的每個RDD。此功能應将每個RDD中的資料推送到外部系統,例如将RDD儲存到檔案,或通過網絡将其寫入資料庫。注意,函數func在流應用程式的驅動程式程序中執行,并且通常會在其中執行RDD操作,強制執行流RDD的計算。通過維護一個靜态的對象連接配接池,在多個RDDs/batches之間重用連接配接,降低消耗:

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
           

注:連接配接池中的連接配接按需建立,并且一段時間不使用時會逾時,實作了将資料發送到外部系統的最有效方法。

3.5 DataFrame和SQL操作

Streaming data也可以使用DataFrame和SQL操作,首先需要通過StreamingContext正在使用的SparkContext建立SparkSession,隻有這樣在driver failure時候才能完成重新開機。

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)
           
3.6 Checkpoint機制

Streaming應用為了保證7x24運作,需要對與程式邏輯無關的故障具有恢複能力。為了實作這種可能,Spark Streaming需要對一個fault-tolerant系統checkpoint足夠多的資訊,這樣可以從故障中進行恢複。有兩種類型的資料可以checkpoint:

  1. MetaData checkpointing:将流式計算的定義資訊儲存在容錯存儲系統如HDFS中,用于從故障節點進行恢複,metadata包括
    1. Configuration:用來建立Streaming應用
    2. DStream操作:Streaming應用中定義的DStream操作集合
    3. Incomplete batches:在隊列中尚未完成的batches
  2. Data checkpointing:将生成的RDDs儲存在可靠的存儲系統中,這對于将多個批次的資料組合在一起的有狀态轉換是很有必要的。在這樣的轉換中,生成的RDD依賴于之前批次的RDD,導緻依賴鍊的長度随着時間的推移而不斷增加。為了避免恢複時間的這種無限增長(與依賴鍊成比例),有狀态轉換的中間RDD被周期性地檢查為可靠存儲(如HDFS)以切斷依賴鍊

總而言之,metadata checkpointing主要用于從驅動程式故障中恢複,而資料或RDD checkpointing對于使用有狀态轉換的基本功能是必須的。

  • Checkpointing的時間

當遇到以下情形時,需要使用到checkpoint功能:

  1. 使用了有狀态的轉換操作,應用中使用到updateStateByKey或reduceByKeyAndWindow,需要提供checkpoint目錄以備周期性的checkpoint動作
  2. 從驅動故障中恢複應用,meta checkpoint用于恢複運作資訊

    注:沒有上述狀态轉換的簡單streaming流應用程式可以啟用checkpointing

  • 配置Checkpoint

Checkpointing通過在容錯和高可靠性的檔案系統中設定目錄進行配置,用于儲存checkpoint資訊。這個功能使用streamingContext.checkpoint(checkpointDirectory)來實作,如下所示:

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()
           
  • 如果checkpointDirectory已經存在,context會從checkpoint data中重建
  • 如果checkpointDirectory不存在,會調用functionToCreateContext建立一個context
參考資料
  1. http://spark.apache.org/docs/latest/streaming-programming-guide.html
  2. https://github.com/lw-lin/CoolplaySpark/tree/
  3. https://github.com/apache/spark/tree/v2.3.1/examples/src/main/python/streaming

轉載請注明原文位址:https://blog.csdn.net/solihawk/article/details/116420919

文章會同步在公衆号“牧羊人的方向”更新,感興趣的可以關注公衆号,謝謝!

大資料系列之Spark Streaming

繼續閱讀