【注】該系列文章以及使用到安裝包/測試資料 可以在《傾情大奉送–Spark入門實戰系列》擷取
1 執行個體示範
1.1 流資料模拟器
1.1.1 流資料說明
在執行個體示範中模拟實際情況,需要源源不斷地接入流資料,為了在示範過程中更接近真實環境将定義流資料模拟器。該模拟器主要功能:通過Socket方式監聽指定的端口号,當外部程式通過該端口連接配接并請求資料時,模拟器将定時将指定的檔案資料随機擷取發送給外部程式。
1.1.2 模拟器代碼
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object StreamingSimulation {
// 定義随機擷取整數的方法
def index(length: Int) = {
import java.util.Random
val rdm = new Random
rdm.nextInt(length)
}
def main(args: Array[String]) {
// 調用該模拟器需要三個參數,分為為檔案路徑、端口号和間隔時間(機關:毫秒)
if (args.length != ) {
System.err.println("Usage: <filename> <port> <millisecond>")
System.exit()
}
// 擷取指定檔案總的行數
val filename = args()
val lines = Source.fromFile(filename).getLines.toList
val filerow = lines.length
// 指定監聽某端口,當外部程式請求時建立連接配接
val listener = new ServerSocket(args().toInt)
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args().toLong)
// 當該端口接受請求時,随機擷取某行資料發送給對方
val content = lines(index(filerow))
println(content)
out.write(content + '\n')
out.flush()
}
socket.close()
}
}.start()
}
}
}
1.1.3 生成打封包件
【注】可以參見第3課《Spark程式設計模型(下)–IDEA搭建及實戰》進行打包
在打包配置界面中,需要在Class Path加入:/app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar ,各個jar包之間用空格分開,
點選菜單Build->Build Artifacts,彈出選擇動作,選擇Build或者Rebuild動作,使用如下指令複制打封包件到Spark根目錄下
cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-/
ll /app/hadoop/spark-/
1.2 執行個體1:讀取檔案示範
1.2.1 示範說明
在該執行個體中Spark Streaming将監控某目錄中的檔案,擷取在間隔時間段内變化的資料,然後通過Spark Streaming計算出改時間段内單詞統計數。
1.2.2 示範代碼
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
object FileWordCount {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")
// 建立Streaming的上下文,包括Spark的配置和時間間隔,這裡時間為間隔秒
val ssc = new StreamingContext(sparkConf, Seconds())
// 指定監控的目錄,在這裡為/home/hadoop/temp/
val lines = ssc.textFileStream("/home/hadoop/temp/")
// 對指定檔案夾變化的資料進行單詞統計并且列印
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)
wordCounts.print()
// 啟動Streaming
ssc.start()
ssc.awaitTermination()
}
}
1.2.3 運作代碼
第一步 建立Streaming監控目錄
建立/home/hadoop/temp為Spark Streaming監控的目錄,通過在該目錄中定時添加檔案内容,然後由Spark Streaming統計出單詞個數
第二步 使用如下指令啟動Spark叢集
$cd /app/hadoop/spark-
$sbin/start-all.sh
第三步 在IDEA中運作Streaming程式
在IDEA中運作該執行個體,由于該執行個體沒有輸入參數故不需要配置參數,在運作日志中将定時列印時間戳。如果在監控目錄中加入檔案内容,将輸出時間戳的同時将輸出單詞統計個數。
1.2.4 添加文本及内容
1.2.5 檢視結果
第一步 檢視IDEA中運作情況
在IDEA的運作日志視窗中,可以觀察到輸出時間戳的同時将輸出單詞統計個數
第二步 通過webUI監控運作情況
在http://hadoop1:4040監控Spark Streaming運作情況,可以觀察到每20秒運作一次作業
并且與其他運作作業相比在監控菜單增加了”Streaming”項目,點選可以看到監控内容:
1.3 執行個體2:網絡資料示範
1.3.1 示範說明
在該執行個體中将由4.1流資料模拟以1秒的頻度發送模拟資料,Spark Streaming通過Socket接收流資料并每20秒運作一次用來處理接收到資料,處理完畢後列印該時間段内資料出現的頻度,即在各處理段時間之間狀态并無關系。
1.3.2 示範代碼
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds())
// 通過Socket擷取資料,該處需要提供Socket的主機名和端口号,資料儲存在記憶體和硬碟中
val lines = ssc.socketTextStream(args(), args().toInt, StorageLevel.MEMORY_AND_DISK_SER)
// 對讀入的資料進行分割、計數
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
1.3.3 運作代碼
第一步 啟動流資料模拟器
啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送/home/hadoop/upload/class7目錄下的people.txt資料檔案(該檔案可以在本系列配套資源目錄/data/class7中找到),其中people.txt資料内容如下:
模拟器Socket端口号為9999,頻度為1秒,
$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt
在沒有程式連接配接時,該程式處于阻塞狀态
第二步 在IDEA中運作Streaming程式
在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1和端口号為9999
1.3.4 檢視結果
第一步 觀察模拟器發送情況
IDEA中的Spark Streaming程式運作與模拟器建立連接配接,當模拟器檢測到外部連接配接時開始發送測試資料,資料是随機的在指定的檔案中擷取一行資料并發送,時間間隔為1秒
第二步 在監控頁面觀察執行情況
在webUI上監控作業運作情況,可以觀察到每20秒運作一次作業
第三步 IDEA運作情況
在IDEA的運作視窗中,可以觀測到的統計結果,通過分析在Spark Streaming每段時間内單詞數為20,正好是20秒内每秒發送總數。
1.4 執行個體3:銷售資料統計示範
1.4.1 示範說明
在該執行個體中将由4.1流資料模拟器以1秒的頻度發送模拟資料(銷售資料),Spark Streaming通過Socket接收流資料并每5秒運作一次用來處理接收到資料,處理完畢後列印該時間段内銷售資料總和,需要注意的是各處理段時間之間狀态并無關系。
1.4.2 示範代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object SaleAmount {
def main(args: Array[String]) {
if (args.length != ) {
System.err.println("Usage: SaleAmount <hostname> <port> ")
System.exit()
}
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds())
// 通過Socket擷取資料,該處需要提供Socket的主機名和端口号,資料儲存在記憶體和硬碟中
val lines = ssc.socketTextStream(args(), args().toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.map(_.split(",")).filter(_.length == )
val wordCounts = words.map(x=>(, x().toDouble)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
1.4.3 運作代碼
第一步 啟動流資料模拟器
啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送第五課/home/hadoop/upload/class5/saledata目錄下的tbStockDetail.txt資料檔案(參見第五課《5.Hive(下)–Hive實戰》中2.1.2資料描述,該檔案可以在本系列配套資源目錄/data/class5/saledata中找到),其中表tbStockDetail字段分别為訂單号、行号、貨品、數量、金額,資料内容如下:
模拟器Socket端口号為9999,頻度為1秒
$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class5/saledata/tbStockDetail.txt
在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1和端口号為9999
1.4.4 檢視結果
第一步 觀察模拟器發送情況
IDEA中的Spark Streaming程式運作與模拟器建立連接配接,當模拟器檢測到外部連接配接時開始發送銷售資料,時間間隔為1秒
第二步 IDEA運作情況
在IDEA的運作視窗中,可以觀察到每5秒運作一次作業(兩次運作間隔為5000毫秒),運作完畢後列印該時間段内銷售資料總和。
第三步 在監控頁面觀察執行情況
在webUI上監控作業運作情況,可以觀察到每5秒運作一次作業
1.5 執行個體4:Stateful示範
1.5.1 示範說明
該執行個體為Spark Streaming狀态操作,模拟資料由4.1流資料模拟以1秒的頻度發送,Spark Streaming通過Socket接收流資料并每5秒運作一次用來處理接收到資料,處理完畢後列印程式啟動後單詞出現的頻度,相比較前面4.3執行個體在該執行個體中各時間段之間狀态是相關的。
1.5.2 示範代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
object StatefulWordCount {
def main(args: Array[String]) {
if (args.length != ) {
System.err.println("Usage: StatefulWordCount <filename> <port> ")
System.exit()
}
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 定義更新狀态方法,參數values為目前批次單詞頻度,state為以往批次單詞頻度
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft()(_ + _)
val previousCount = state.getOrElse()
Some(currentCount + previousCount)
}
val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
// 建立StreamingContext,Spark Steaming運作時間間隔為秒
val ssc = new StreamingContext(sc, Seconds())
// 定義checkpoint目錄為目前目錄
ssc.checkpoint(".")
// 擷取從Socket發送過來資料
val lines = ssc.socketTextStream(args(), args().toInt)
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, ))
// 使用updateStateByKey來更新狀态,統計從運作開始以來單詞總的次數
val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
1.5.3 運作代碼
第一步 啟動流資料模拟器
啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送/home/hadoop/upload/class7目錄下的people.txt資料檔案(該檔案可以在本系列配套資源目錄/data/class7中找到),其中people.txt資料内容如下:
模拟器Socket端口号為9999,頻度為1秒
$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt
在沒有程式連接配接時,該程式處于阻塞狀态,在IDEA中運作Streaming程式
在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1和端口号為9999
1.5.4 檢視結果
第一步 IDEA運作情況
在IDEA的運作視窗中,可以觀察到第一次運作統計單詞總數為1,第二次為6,第N次為5(N-1)+1,即統計單詞的總數為程式運作單詞數總和。
第二步 在監控頁面觀察執行情況
在webUI上監控作業運作情況,可以觀察到每5秒運作一次作業
第三步 檢視CheckPoint情況
在項目根目錄下可以看到checkpoint檔案
1.6 執行個體5:Window示範
1.6.1 示範說明
該執行個體為Spark Streaming視窗操作,模拟資料由4.1流資料模拟以1秒的頻度發送,Spark Streaming通過Socket接收流資料并每10秒運作一次用來處理接收到資料,處理完畢後列印程式啟動後單詞出現的頻度。相比前面的執行個體,Spark Streaming視窗統計是通過reduceByKeyAndWindow()方法實作的,在該方法中需要指定視窗時間長度和滑動時間間隔。
1.6.2 示範代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
object WindowWordCount {
def main(args: Array[String]) {
if (args.length != ) {
System.err.println("Usage: WindowWorldCount <filename> <port> <windowDuration> <slideDuration>")
System.exit()
}
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
// 建立StreamingContext
val ssc = new StreamingContext(sc, Seconds())
// 定義checkpoint目錄為目前目錄
ssc.checkpoint(".")
// 通過Socket擷取資料,該處需要提供Socket的主機名和端口号,資料儲存在記憶體和硬碟中
val lines = ssc.socketTextStream(args(), args().toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(","))
// windows操作,第一種方式為疊加處理,第二種方式為增量處理
val wordCounts = words.map(x => (x , )).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args().toInt), Seconds(args().toInt))
//val wordCounts = words.map(x => (x , )).reduceByKeyAndWindow(_+_, _-_,Seconds(args().toInt), Seconds(args().toInt))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
1.6.3 運作代碼
第一步 啟動流資料模拟器
啟動4.1打包好的流資料模拟器,在該執行個體中将定時發送/home/hadoop/upload/class7目錄下的people.txt資料檔案(該檔案可以在本系列配套資源目錄/data/class7中找到),其中people.txt資料内容如下:
模拟器Socket端口号為9999,頻度為1秒
$cd /app/hadoop/spark-
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt
在沒有程式連接配接時,該程式處于阻塞狀态,在IDEA中運作Streaming程式
在IDEA中運作該執行個體,該執行個體需要配置連接配接Socket主機名和端口号,在這裡配置參數機器名為hadoop1、端口号為9999、時間視窗為30秒和滑動時間間隔10秒
1.6.4 檢視結果
第一步 IDEA運作情況
在IDEA的運作視窗中,可以觀察到第一次運作統計單詞總數為4,第二次為14,第N次為10(N-1)+4,即統計單詞的總數為程式運作單詞數總和。
第二步 在監控頁面觀察執行情況
在webUI上監控作業運作情況,可以觀察到每10秒運作一次作業
參考資料:
(1) 《Spark Streaming》 http://blog.debugo.com/spark-streaming/