Maven dependencies
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
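If you build with sbt instead of Maven, the equivalent dependencies would look roughly like this (a sketch; the Scala patch version 2.11.8 is an assumption, any 2.11.x works so that %% resolves to the _2.11 artifacts):
build.sbt
scalaVersion := "2.11.8" // assumption: any Scala 2.11.x
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.2",
  "org.apache.spark" %% "spark-streaming" % "2.2.2"
)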
First, test the environment to verify that Spark Streaming can connect and receive data.
WordCount.scala
package blog
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @Author Daniel
* @Description Test the Spark Streaming connection
**/
object WordCount {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
println(
"""
|Usage: <host> <port>
""".stripMargin)
System.exit(-1)
}
val Array(host, port) = args
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("WordCount")
//The batch interval determines how often a new batch is produced, i.e. how often a Spark Streaming job is submitted
val batchInterval = Seconds(2)
//Programming entry point
val ssc = new StreamingContext(conf, batchInterval)
//Business logic
//For fault tolerance: streaming data, once lost, cannot be recovered, so persist it (serialized, memory and disk, replicated to 2 nodes)
val input: ReceiverInputDStream[String] = ssc.socketTextStream(host, port.toInt, StorageLevel.MEMORY_AND_DISK_SER_2)
//wordcount
val retDStream: DStream[(String, Int)] = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
//Print the results
retDStream.print()
//Start the computation
ssc.start()
//Keep the streaming job running until it is terminated
ssc.awaitTermination()
}
}
Set the program arguments to hadoop01 9999

Test the connection with nc
Install it first
sudo yum -y install nc
Open port 9999
nc -lk hadoop01 9999
Start the program and type some messages into the nc session
You can see the results printed to the console
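For example, typing hello spark hello spark hello into the nc session produces console output for that batch roughly like the following (the timestamp will differ):
-------------------------------------------
Time: 1569382384000 ms
-------------------------------------------
(hello,3)
(spark,2)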
Once the environment works, integrate with HDFS.
StreamingHDFS.scala
package blog
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @Author Daniel
* @Description Spark Streaming integration with HDFS
**/
//Spark Streaming + HDFS integration: read files newly added to HDFS
object StreamingHDFS {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("StreamingHDFS")
.setMaster("local")
val batchInterval = Seconds(2)
val ssc = new StreamingContext(conf, batchInterval)
// val input:DStream[String] = ssc.textFileStream("file:///F:/data/")//read a local directory
//Read files from HDFS, monitoring the directory for newly added files
val input: DStream[String] = ssc.textFileStream("hdfs://bde/data/words")
val ret = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
//Process each RDD
ret.foreachRDD((rdd, time) => {
//Output only if the RDD is not empty
if (!rdd.isEmpty()) {
println(s"Time: $time")
rdd.foreach(println)
}
})
ssc.start()
ssc.awaitTermination()
}
}
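Note that textFileStream only picks up files that appear in the monitored directory after the job starts. If you need more control, the lower-level fileStream API it is built on lets you pass a filename filter and a newFilesOnly flag. A minimal sketch, reusing the ssc from StreamingHDFS above (the .txt filter is just an assumption):
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
//Equivalent to textFileStream, but with an explicit (assumed) filename filter and
//newFilesOnly = false so files already in the directory are also processed once
val filtered = ssc
  .fileStream[LongWritable, Text, TextInputFormat](
    "hdfs://bde/data/words",
    (path: Path) => path.getName.endsWith(".txt"),
    newFilesOnly = false)
  .map(_._2.toString)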
Copy hdfs-site.xml and core-site.xml into the current project directory (so they are on the classpath)!!
Prepare some data files and upload them to HDFS
1.txt
hello
word
hello
ww
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop
hello
word
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop
Start the program, then upload the file to HDFS
hdfs dfs -put 1.txt /data/words/
Any new file that appears in the monitored directory is picked up by Spark Streaming, so you can also write a small program of your own that writes a file to it
WriteFile.scala
package blog
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
/**
* @Author Daniel
* @Description Write a file to HDFS as streaming input
**/
object WriteFile {
def main(args: Array[String]): Unit = {
//Set the user name to avoid permission errors
System.setProperty("HADOOP_USER_NAME", "hadoop")
val uri = new URI("hdfs://bde/")
val fs = FileSystem.newInstance(uri, new Configuration())
val fos = fs.create(new Path("/data/words/write.txt"))
fos.write("hello spark\nhello streaming\nhello successfully".getBytes())
//Close the output stream to flush the data, then close the file system
fos.close()
fs.close()
}
}
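To generate fresh input repeatedly, a variant that writes to a new timestamped file on each run is handy. A sketch (the object name WriteFileRepeatable and the file name pattern are made up):
package blog
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
//Sketch: write a new timestamped file on each run so StreamingHDFS sees fresh input
object WriteFileRepeatable {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val fs = FileSystem.newInstance(new URI("hdfs://bde/"), new Configuration())
    //Hypothetical file name pattern: write-<timestamp>.txt in the monitored directory
    val fos = fs.create(new Path(s"/data/words/write-${System.currentTimeMillis()}.txt"))
    try fos.write("hello spark\nhello streaming\n".getBytes("UTF-8"))
    finally {
      fos.close()
      fs.close()
    }
  }
}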
With HDFS working, integrate Spark Streaming with Spark SQL.
StreamingSQL.scala
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @Author Daniel
* @Description Spark Streaming integration with Spark SQL
* Count sales for each brand within each product category
* Data format:
* 001|mi|moblie
* 005|huawei|moblie
*
**/
object StreamingSQL {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("StreamingSQL")
.setMaster("local[*]")
//Batch interval
val batchInterval = Seconds(2)
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, batchInterval)
val lines = ssc.socketTextStream("hadoop01", 9999)
//Use a checkpoint directory so updateStateByKey can manage its state
ssc.checkpoint("file:///E:/data/checkpoint/ck2")
//Compute the state accumulated up to the current batch
val usb = lines.map(line => {
val key = line.substring(line.indexOf("|") + 1)
(key, 1)
//Spark Streaming stateful operator
}).updateStateByKey[Int]((seq: Seq[Int], option: Option[Int]) => Option(seq.sum + option.getOrElse(0)))
//Top 3 per category
usb.foreachRDD((rdd) => {
if (!rdd.isEmpty()) {
import spark.implicits._
//Map each (brand|category, count) pair to a MyRow
val rowRDD = rdd.map { case (brandCategory, count) => {
val brand = brandCategory.substring(0, brandCategory.indexOf("|"))
val category = brandCategory.substring(brandCategory.indexOf("|") + 1)
MyRow(category, brand, count)
}
}
val df = rowRDD.toDF("category", "brand", "count")
df.createOrReplaceTempView("sale")
//A window-function result cannot be filtered in the same query's where/having,
//so compute the rank in a subquery and filter on it in the outer query
val sql =
"""
|select category, brand, count, rank
|from (
|  select
|    category,
|    brand,
|    count,
|    row_number() over(partition by category order by count desc) as rank
|  from sale
|) t
|where rank < 4
""".stripMargin
spark.sql(sql).show()
}
})
ssc.start()
ssc.awaitTermination()
}
}
case class MyRow(category: String, brand: String, count: Int)
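The same top-3-per-category ranking can also be expressed with the DataFrame API instead of SQL, which avoids the subquery entirely. A sketch, reusing the df defined above:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
//Rank brands within each category by count and keep the top 3
val w = Window.partitionBy("category").orderBy(col("count").desc)
df.withColumn("rank", row_number().over(w))
  .where(col("rank") < 4)
  .show()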
Test data:
001|mi|moblie
002|mi|moblie
003|mi|moblie
004|mi|moblie
005|huawei|moblie
006|huawei|moblie
007|huawei|moblie
008|Oppo|moblie
009|Oppo|moblie
010|uniqlo|colthing
011|uniqlo|colthing
012|uniqlo|colthing
013|uniqlo|colthing
014|uniqlo|colthing
015|selected|colthing
016|selected|colthing
017|selected|colthing
018|Armani|colthing
019|lining|sports
020|nike|sports
021|adidas|sports
022|nike|sports
023|anta|sports
024|lining|sports
025|lining|sports
Start nc, then paste the test data line by line into it:
nc -lk hadoop01 9999