天天看點

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

【注】該系列文章以及使用到安裝包/測試資料 可以在《傾情大奉送–Spark入門實戰系列》擷取

1 執行個體示範

1.1 流資料模拟器

1.1.1 流資料說明

在執行個體示範中模拟實際情況,需要源源不斷地接入流資料,為了在示範過程中更接近真實環境将定義流資料模拟器。該模拟器主要功能:通過Socket方式監聽指定的端口号,當外部程式通過該端口連接配接并請求資料時,模拟器将定時将指定的檔案資料随機擷取發送給外部程式。

1.1.2 模拟器代碼

import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source

object StreamingSimulation {
  // 定義随機擷取整數的方法
  def index(length: Int) = {
    import java.util.Random
    val rdm = new Random
    rdm.nextInt(length)
  }

  def main(args: Array[String]) {
    // 調用該模拟器需要三個參數,分為為檔案路徑、端口号和間隔時間(機關:毫秒)
    if (args.length != ) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit()
    }

    // 擷取指定檔案總的行數
    val filename = args()
    val lines = Source.fromFile(filename).getLines.toList
    val filerow = lines.length

    // 指定監聽某端口,當外部程式請求時建立連接配接
    val listener = new ServerSocket(args().toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args().toLong)
            // 當該端口接受請求時,随機擷取某行資料發送給對方
            val content = lines(index(filerow))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.1.3 生成打封包件

【注】可以參見第3課《Spark程式設計模型(下)–IDEA搭建及實戰》進行打包

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

在打包配置界面中,需要在Class Path加入:/app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar ,各個jar包之間用空格分開,

點選菜單Build->Build Artifacts,彈出選擇動作,選擇Build或者Rebuild動作,使用如下指令複制打封包件到Spark根目錄下

cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-/
ll /app/hadoop/spark-/
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.2 執行個體1:讀取檔案示範

1.2.1 示範說明

在該執行個體中Spark Streaming将監控某目錄中的檔案,擷取在間隔時間段内變化的資料,然後通過Spark Streaming計算出改時間段内單詞統計數。

1.2.2 示範代碼

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object FileWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")

    // 建立Streaming的上下文,包括Spark的配置和時間間隔,這裡時間為間隔秒
    val ssc = new StreamingContext(sparkConf, Seconds())

    // 指定監控的目錄,在這裡為/home/hadoop/temp/
    val lines = ssc.textFileStream("/home/hadoop/temp/")

    // 對指定檔案夾變化的資料進行單詞統計并且列印
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)
    wordCounts.print()

     // 啟動Streaming
    ssc.start()
    ssc.awaitTermination()
  }
}
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.2.3 運作代碼

第一步 建立Streaming監控目錄

建立/home/hadoop/temp為Spark Streaming監控的目錄,通過在該目錄中定時添加檔案内容,然後由Spark Streaming統計出單詞個數

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第二步 使用如下指令啟動Spark叢集

$cd /app/hadoop/spark-
$sbin/start-all.sh
           

第三步 在IDEA中運作Streaming程式

在IDEA中運作該執行個體,由于該執行個體沒有輸入參數故不需要配置參數,在運作日志中将定時列印時間戳。如果在監控目錄中加入檔案内容,将輸出時間戳的同時将輸出單詞統計個數。

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.2.4 添加文本及内容

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.2.5 檢視結果

第一步 檢視IDEA中運作情況

在IDEA的運作日志視窗中,可以觀察到輸出時間戳的同時将輸出單詞統計個數

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第二步 通過webUI監控運作情況

在http://hadoop1:4040監控Spark Streaming運作情況,可以觀察到每20秒運作一次作業

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

并且與其他運作作業相比在監控菜單增加了”Streaming”項目,點選可以看到監控内容:

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.3 執行個體2:網絡資料示範

1.3.1 示範說明

在該執行個體中将由4.1流資料模拟以1秒的頻度發送模拟資料,Spark Streaming通過Socket接收流資料并每20秒運作一次用來處理接收到資料,處理完畢後列印該時間段内資料出現的頻度,即在各處理段時間之間狀态并無關系。

1.3.2 示範代碼

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds())

    // 通過Socket擷取資料,該處需要提供Socket的主機名和端口号,資料儲存在記憶體和硬碟中
    val lines = ssc.socketTextStream(args(), args().toInt, StorageLevel.MEMORY_AND_DISK_SER)

    // 對讀入的資料進行分割、計數
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.3.3 運作代碼

第一步 啟動流資料模拟器

啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送/home/hadoop/upload/class7目錄下的people.txt資料檔案(該檔案可以在本系列配套資源目錄/data/class7中找到),其中people.txt資料内容如下:

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

模拟器Socket端口号為9999,頻度為1秒,

$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt  
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

在沒有程式連接配接時,該程式處于阻塞狀态

第二步 在IDEA中運作Streaming程式

在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1和端口号為9999

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.3.4 檢視結果

第一步 觀察模拟器發送情況

IDEA中的Spark Streaming程式運作與模拟器建立連接配接,當模拟器檢測到外部連接配接時開始發送測試資料,資料是随機的在指定的檔案中擷取一行資料并發送,時間間隔為1秒

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第二步 在監控頁面觀察執行情況

在webUI上監控作業運作情況,可以觀察到每20秒運作一次作業

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第三步 IDEA運作情況

在IDEA的運作視窗中,可以觀測到的統計結果,通過分析在Spark Streaming每段時間内單詞數為20,正好是20秒内每秒發送總數。

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.4 執行個體3:銷售資料統計示範

1.4.1 示範說明

在該執行個體中将由4.1流資料模拟器以1秒的頻度發送模拟資料(銷售資料),Spark Streaming通過Socket接收流資料并每5秒運作一次用來處理接收到資料,處理完畢後列印該時間段内銷售資料總和,需要注意的是各處理段時間之間狀态并無關系。

1.4.2 示範代碼

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object SaleAmount {
  def main(args: Array[String]) {
    if (args.length != ) {
      System.err.println("Usage: SaleAmount <hostname> <port> ")
      System.exit()
    }
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds())

   // 通過Socket擷取資料,該處需要提供Socket的主機名和端口号,資料儲存在記憶體和硬碟中
    val lines = ssc.socketTextStream(args(), args().toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.map(_.split(",")).filter(_.length == )
    val wordCounts = words.map(x=>(, x().toDouble)).reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.4.3 運作代碼

第一步 啟動流資料模拟器

啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送第五課/home/hadoop/upload/class5/saledata目錄下的tbStockDetail.txt資料檔案(參見第五課《5.Hive(下)–Hive實戰》中2.1.2資料描述,該檔案可以在本系列配套資源目錄/data/class5/saledata中找到),其中表tbStockDetail字段分别為訂單号、行号、貨品、數量、金額,資料内容如下:

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

模拟器Socket端口号為9999,頻度為1秒

$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class5/saledata/tbStockDetail.txt  
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1和端口号為9999

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.4.4 檢視結果

第一步 觀察模拟器發送情況

IDEA中的Spark Streaming程式運作與模拟器建立連接配接,當模拟器檢測到外部連接配接時開始發送銷售資料,時間間隔為1秒

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第二步 IDEA運作情況

在IDEA的運作視窗中,可以觀察到每5秒運作一次作業(兩次運作間隔為5000毫秒),運作完畢後列印該時間段内銷售資料總和。

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第三步 在監控頁面觀察執行情況

在webUI上監控作業運作情況,可以觀察到每5秒運作一次作業

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.5 執行個體4:Stateful示範

1.5.1 示範說明

該執行個體為Spark Streaming狀态操作,模拟資料由4.1流資料模拟以1秒的頻度發送,Spark Streaming通過Socket接收流資料并每5秒運作一次用來處理接收到資料,處理完畢後列印程式啟動後單詞出現的頻度,相比較前面4.3執行個體在該執行個體中各時間段之間狀态是相關的。

1.5.2 示範代碼

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {
  def main(args: Array[String]) {
    if (args.length != ) {
      System.err.println("Usage: StatefulWordCount <filename> <port> ")
      System.exit()
    }
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // 定義更新狀态方法,參數values為目前批次單詞頻度,state為以往批次單詞頻度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft()(_ + _)
      val previousCount = state.getOrElse()
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 建立StreamingContext,Spark Steaming運作時間間隔為秒
    val ssc = new StreamingContext(sc, Seconds())
    // 定義checkpoint目錄為目前目錄
    ssc.checkpoint(".")

    // 擷取從Socket發送過來資料
    val lines = ssc.socketTextStream(args(), args().toInt)
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, ))

    // 使用updateStateByKey來更新狀态,統計從運作開始以來單詞總的次數
    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.5.3 運作代碼

第一步 啟動流資料模拟器

啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送/home/hadoop/upload/class7目錄下的people.txt資料檔案(該檔案可以在本系列配套資源目錄/data/class7中找到),其中people.txt資料内容如下:

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

模拟器Socket端口号為9999,頻度為1秒

$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt  
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

在沒有程式連接配接時,該程式處于阻塞狀态,在IDEA中運作Streaming程式

在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1和端口号為9999

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.5.4 檢視結果

第一步 IDEA運作情況

在IDEA的運作視窗中,可以觀察到第一次運作統計單詞總數為1,第二次為6,第N次為5(N-1)+1,即統計單詞的總數為程式運作單詞數總和。

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第二步 在監控頁面觀察執行情況

在webUI上監控作業運作情況,可以觀察到每5秒運作一次作業

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第三步 檢視CheckPoint情況

在項目根目錄下可以看到checkpoint檔案

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.6 執行個體5:Window示範

1.6.1 示範說明

該執行個體為Spark Streaming視窗操作,模拟資料由4.1流資料模拟以1秒的頻度發送,Spark Streaming通過Socket接收流資料并每10秒運作一次用來處理接收到資料,處理完畢後列印程式啟動後單詞出現的頻度。相比前面的執行個體,Spark Streaming視窗統計是通過reduceByKeyAndWindow()方法實作的,在該方法中需要指定視窗時間長度和滑動時間間隔。

1.6.2 示範代碼

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length != ) {
      System.err.println("Usage: WindowWorldCount <filename> <port> <windowDuration> <slideDuration>")
      System.exit()
    }
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

     // 建立StreamingContext 
    val ssc = new StreamingContext(sc, Seconds())
     // 定義checkpoint目錄為目前目錄
    ssc.checkpoint(".")

    // 通過Socket擷取資料,該處需要提供Socket的主機名和端口号,資料儲存在記憶體和硬碟中
    val lines = ssc.socketTextStream(args(), args().toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(","))

    // windows操作,第一種方式為疊加處理,第二種方式為增量處理
    val wordCounts = words.map(x => (x , )).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args().toInt), Seconds(args().toInt))
//val wordCounts = words.map(x => (x , )).reduceByKeyAndWindow(_+_, _-_,Seconds(args().toInt), Seconds(args().toInt))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.6.3 運作代碼

第一步 啟動流資料模拟器

啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送/home/hadoop/upload/class7目錄下的people.txt資料檔案(該檔案可以在本系列配套資源目錄/data/class7中找到),其中people.txt資料内容如下:

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

模拟器Socket端口号為9999,頻度為1秒

$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt  
           
Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

在沒有程式連接配接時,該程式處于阻塞狀态,在IDEA中運作Streaming程式

在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1、端口号為9999、時間視窗為30秒和滑動時間間隔10秒

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

1.6.4 檢視結果

第一步 IDEA運作情況

在IDEA的運作視窗中,可以觀察到第一次運作統計單詞總數為4,第二次為14,第N次為10(N-1)+4,即統計單詞的總數為程式運作單詞數總和。

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

第二步 在監控頁面觀察執行情況

在webUI上監控作業運作情況,可以觀察到每10秒運作一次作業

Spark入門實戰系列--7.Spark Streaming(下)--Spark Streaming實戰

參考資料:

(1) 《Spark Streaming》 http://blog.debugo.com/spark-streaming/

繼續閱讀