天天看點

基于spark的流式資料處理—SparkStreaming開發demo—檔案流

概述

本文主要完成一個spark streaming的檔案流demo,如果是編寫一個獨立的Spark Streaming程式,而不是在spark-shell中運作,則需要通過如下方式建立StreamingContext對象:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 建立StreamingContext對象
val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
           

檔案流

在spark-shell中建立檔案流,我這裡的建立腳本如下:

cd /opt/IdeaProjects/
mkdir streaming
mkdir streaming/logfile
cd streaming/logfile/
           

運作完之後,記住這個檔案路徑:

/opt/IdeaProjects/streaming/logfile
           

建立檔案流監聽代碼:

package sparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Author: Garrett Wang
 * @Description: 測試spark streaming的檔案輸入流測試
 * @Date:Create:in 2019/12/25 17:04
 * @Modified By:
 * @Parameters
 */
object LzSparkStreamingFileInput {

  def main(args: Array[String]): Unit = {

    // 建立StreamingContext對象
    val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // 建立檔案流,這裡的檔案路徑切記,如果是本地檔案已定是三個斜杠,當然也可以hdfs檔案
    val lines = ssc.textFileStream("file:///opt/IdeaProjects/streaming/logfile")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_+_)
    wordCounts.print()

    // 執行這一步之後,程式就開始自動進入循環監聽狀态
    ssc.start()
    // 當出現異常時退出
    ssc.awaitTermination()

  }

}

           

在剛才的目錄下建立一個檔案,并編輯内容,如下所示:

vim log1.txt
           

内容如下:

Hello,my name is Garrett Wang
Hello,my name is Garrett Wang
           

運作上面代碼啟動流計算,運作指令如下:

spark2-submit --class sparkStreaming.LzSparkStreamingFileInput /opt/IdeaProjects/LzScalaSparkTest/target/scala-2.11/lzscalasparktest_2.11-0.3.jar
           

運作結果如下,每秒鐘都會又一次重新整理:

基于spark的流式資料處理—SparkStreaming開發demo—檔案流

在監聽的檔案路徑下面再建立一個檔案,log2.txt,運作如下指令:

vim log2.txt
           

内容就用上述代碼内容:

package sparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Author: Garrett Wang
 * @Description: 測試spark streaming的檔案輸入流測試
 * @Date:Create:in 2019/12/25 17:04
 * @Modified By:
 * @Parameters
 */
object LzSparkStreamingFileInput {

  def main(args: Array[String]): Unit = {

    // 建立StreamingContext對象
    val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // 建立檔案流,這裡的檔案路徑切記,如果是本地檔案已定是三個斜杠,當然也可以hdfs檔案
    val lines = ssc.textFileStream("file:///opt/IdeaProjects/streaming/logfile")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_+_)
    wordCounts.print()

    // 執行這一步之後,程式就開始自動進入循環監聽狀态
    ssc.start()
    // 當出現異常時退出
    ssc.awaitTermination()

  }

}
           

輸出結果如下:

基于spark的流式資料處理—SparkStreaming開發demo—檔案流

繼續閱讀