Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.2</version>
    </dependency>
</dependencies>
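The Spark SQL integration example at the end of this post also uses SparkSession and toDF, which live in the spark-sql module; if you plan to follow along all the way, add it as well (same version as above):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.2</version>
</dependency>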
First, test the environment to make sure a simple Spark Streaming job can connect and run.
WordCount.scala
package blog

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Test the Spark Streaming connection
  **/
object WordCount {
  def main(args: Array[String]): Unit = {
    if (args == null || args.length < 2) {
      println(
        """
          |Usage: <host> <port>
        """.stripMargin)
      System.exit(-1)
    }
    val Array(host, port) = args
    val conf = new SparkConf()
      .setMaster("local[2]") // at least 2 local threads: the socket receiver occupies one of them
      .setAppName("WordCount")
    // The batch interval controls how often a new batch is generated, i.e. how often a Spark Streaming job is submitted
    val batchInterval = Seconds(2)
    // entry point of the streaming program
    val ssc = new StreamingContext(conf, batchInterval)
    // business logic
    // Streaming data cannot be replayed once lost, so persist it (serialized, replicated) for fault tolerance
    val input: ReceiverInputDStream[String] = ssc.socketTextStream(host, port.toInt, StorageLevel.MEMORY_AND_DISK_SER_2)
    // word count
    val retDStream: DStream[(String, Int)] = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    // print the results
    retDStream.print()
    // start the computation
    ssc.start()
    // keep the streaming job running until it is terminated
    ssc.awaitTermination()
  }
}
Set the program arguments to hadoop01 9999

Test it with nc.
First install it:
sudo yum -y install nc
Listen on port 9999:
nc -lk hadoop01 9999
Start the program and type some lines into the nc session.
The word counts are printed to the console.
Once the environment checks out, move on to the HDFS integration.
StreamingHDFS.scala
package blog

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Spark Streaming integration with HDFS
  **/
// Spark Streaming + HDFS: read files newly added to an HDFS directory
object StreamingHDFS {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingHDFS")
      .setMaster("local") // a single thread is enough here: textFileStream does not use a receiver
    val batchInterval = Seconds(2)
    val ssc = new StreamingContext(conf, batchInterval)
    // val input: DStream[String] = ssc.textFileStream("file:///F:/data/") // read from the local file system
    // Read files from HDFS, monitoring the directory for newly added files
    val input: DStream[String] = ssc.textFileStream("hdfs://bde/data/words")
    val ret = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    // process each RDD (one per batch)
    ret.foreachRDD((rdd, time) => {
      // print only if the RDD is not empty
      if (!rdd.isEmpty()) {
        println(s"Time: $time")
        rdd.foreach(println)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
Copy hdfs-site.xml and core-site.xml into the current project directory (so they end up on the classpath)!! They are what let the client resolve the hdfs://bde address used in the code.
Prepare a data file and upload it to HDFS.
1.txt
hello
word
hello
ww
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop
hello
word
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop
Start the program, then upload the file to HDFS:
hdfs dfs -put 1.txt /data/words/
Streaming picks up any file that newly appears in the monitored directory (files should land there atomically, e.g. via put or rename), so you can also write a small program that writes a file to HDFS yourself.
WriteFile.scala
package blog

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

/**
  * @Author Daniel
  * @Description Write a file to HDFS for the streaming job to pick up
  **/
object WriteFile {
  def main(args: Array[String]): Unit = {
    // set the user name to avoid HDFS permission errors
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val uri = new URI("hdfs://bde/")
    val fs = FileSystem.newInstance(uri, new Configuration())
    val fos = fs.create(new Path("/data/words/write.txt"))
    fos.write("hello spark\nhello streaming\nhello successfully".getBytes())
    // close the output stream so the data is flushed before closing the file system
    fos.close()
    fs.close()
  }
}
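Run StreamingHDFS first, then run WriteFile; the word counts from write.txt should show up in the next batch. To sanity-check that the file really landed on HDFS, you can cat it (standard HDFS CLI, path taken from the code above):
hdfs dfs -cat /data/words/write.txt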
With HDFS working, the last step is to integrate Spark Streaming with Spark SQL and rank the top 3 brands per product category.
StreamingSQL.scala
package sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Spark Streaming integration with Spark SQL
  *              Count sales per brand within each product category
  *              Data format:
  *              001|mi|mobile
  *              005|huawei|mobile
  **/
object StreamingSQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingSQL")
      .setMaster("local[*]")
    // batch interval
    val batchInterval = Seconds(2)
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, batchInterval)
    val lines = ssc.socketTextStream("hadoop01", 9999)
    // updateStateByKey needs a checkpoint directory to manage its state
    ssc.checkpoint("file:///E:/data/checkpoint/ck2")
    // compute the running totals accumulated up to the current batch
    val usb = lines.map(line => {
      val key = line.substring(line.indexOf("|") + 1) // brand|category
      (key, 1)
      // Spark Streaming stateful operator
    }).updateStateByKey[Int]((seq: Seq[Int], option: Option[Int]) => Option(seq.sum + option.getOrElse(0)))
    // top 3 brands per category
    usb.foreachRDD((rdd) => {
      if (!rdd.isEmpty()) {
        import spark.implicits._
        // map the (brand|category, count) pairs into rows
        val rowRDD = rdd.map { case (brandCategory, count) =>
          val brand = brandCategory.substring(0, brandCategory.indexOf("|"))
          val category = brandCategory.substring(brandCategory.indexOf("|") + 1)
          MyRow(category, brand, count)
        }
        val df = rowRDD.toDF("category", "brand", "count")
        df.createOrReplaceTempView("sale")
        val sql =
          """
            |select
            |  category,
            |  brand,
            |  count,
            |  row_number() over(partition by category order by count desc) as rank
            |from sale
            |having rank < 4
          """.stripMargin
        spark.sql(sql).show()
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
case class MyRow(category: String, brand: String, count: Int)
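For comparison, the same top-3-per-category ranking can also be expressed with the DataFrame window API instead of the SQL string. A minimal sketch, assuming it replaces the spark.sql(sql).show() call and reuses the df built inside foreachRDD above:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// rank brands by running count within each category
val byCategory = Window.partitionBy("category").orderBy(col("count").desc)
df.withColumn("rank", row_number().over(byCategory))
  .where(col("rank") < 4)
  .show()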
Test data:
001|mi|mobile
002|mi|mobile
003|mi|mobile
004|mi|mobile
005|huawei|mobile
006|huawei|mobile
007|huawei|mobile
008|Oppo|mobile
009|Oppo|mobile
010|uniqlo|clothing
011|uniqlo|clothing
012|uniqlo|clothing
013|uniqlo|clothing
014|uniqlo|clothing
015|selected|clothing
016|selected|clothing
017|selected|clothing
018|Armani|clothing
019|lining|sports
020|nike|sports
021|adidas|sports
022|nike|sports
023|anta|sports
024|lining|sports
025|lining|sports
As before, start nc first, then run StreamingSQL and paste the test data into the nc session:
nc -lk hadoop01 9999