Spark Streaming
Transformations
DStream transformations are similar to RDD transformations: they turn one DStream into a new DStream. Most of the common DStream operators are used in the same way as their Spark RDD counterparts.
map
//1,zhangsan,true
lines.map(line=> line.split(","))
.map(words=>(words(0).toInt,words(1),words(2).toBoolean))
.print()
flatMap
//hello spark
lines.flatMap(line=> line.split("\\s+"))
.map((_,1)) //(hello,1)(spark,1)
.print()
filter
//keep only records containing "hello": records for which the predicate returns true pass through
lines.filter(line => line.contains("hello"))
.flatMap(line=> line.split("\\s+"))
.map((_,1))
.print()
repartition (change the number of partitions)
lines.repartition(10) //change the parallelism (number of partitions)
.filter(line => line.contains("hello"))
.flatMap(line=> line.split("\\s+"))
.map((_,1))
.print()
union (merge two streams)
val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10)
.filter(line => line.contains("hello"))
.flatMap(line=> line.split("\\s+"))
.map((_,1))
.print()
Note: this uses two Receivers, so allocate at least three cores to the application.
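A minimal configuration sketch for that core allocation (the app name and the 1-second batch interval are assumptions, not taken from the original):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

//each socket receiver occupies one core, so reserve at least one more core for processing
val conf = new SparkConf()
  .setAppName("UnionWordCount") //assumed name
  .setMaster("local[3]")        //2 receiver cores + at least 1 processing core
val ssc = new StreamingContext(conf, Seconds(1))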
count
val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10)
.flatMap(line=> line.split("\\s+"))
.count() //count the number of elements in each micro-batch's RDD
.print()
reduce(func)
Merges all elements of each micro-batch into a single result.
val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10) // aa bb
.flatMap(line=> line.split("\\s+"))
.reduce(_+"|"+_)//aa|bb
.print()
countByValue (count occurrences of each value)
val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10) // a a b c
.flatMap(line=> line.split("\\s+"))
.countByValue() //(a,2) (b,1) (c,1)
.print()
reduceByKey(func, [numTasks])
var lines:DStream[String]=ssc.socketTextStream("CentOS",9999) //this is spark this
lines.repartition(10)
.flatMap(line=> line.split("\\s+").map((_,1)))
.reduceByKey(_+_)// (this,2)(is,1)(spark ,1)
.print()
join(otherStream, [numTasks])
//sample input on port 9999: 1 zhangsan
val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
//sample input on port 8888: 1 apple 1 4.5
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
val userPair:DStream[(String,String)]=stream1.map(line=>{
var tokens= line.split(" ")
(tokens(0),tokens(1))
})
val orderItemPair:DStream[(String,(String,Double))]=stream2.map(line=>{
//line is the incoming record
val tokens = line.split(" ")
//return (id, (item, quantity * price))
(tokens(0),(tokens(1),tokens(2).toInt * tokens(3).toDouble))
})
userPair.join(orderItemPair).map(t=>(t._1,t._2._1,t._2._2._1,t._2._2._2))//1 zhangsan apple 4.5
.print()
//shape of the raw join output before the map: (1,(zhangsan,(apple,4.5)))
The records to be joined from the two streams must fall into the same micro-batch (the same RDD time interval); otherwise the join cannot happen, so this operator is of limited practical use.
transform
Allows computation that mixes the stream with an RDD: transform exposes the underlying micro-batch RDD, which makes a stream-batch join possible.
//1 apple 2 4.5
val orderLog: DStream[String] = ssc.socketTextStream("CentOS",8888)
var userRDD=ssc.sparkContext.makeRDD(List(("1","zhangs"),("2","wangw")))
//value type is (String,(String,Double))
val orderItemPair:DStream[(String,(String,Double))]=orderLog.map(line=>{
val tokens = line.split(" ")
(tokens(0),(tokens(1),tokens(2).toInt * tokens(3).toDouble))
})
//join the dynamic (streaming) RDD with the static RDD
orderItemPair.transform(rdd=> rdd.join(userRDD))
.print()
updateStateByKey (stateful computation, full output)
//a checkpoint directory must be set to store and back up the computation state
ssc.checkpoint("hdfs://zly:9000/spark-checkpoint")//state snapshots
val lines: DStream[String] = ssc.socketTextStream("zly",9999)
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
//newValues holds all values for the same key in the current micro-batch;
//add them to the previous state if it exists, otherwise start from 0. The state is kept in memory.
val newCount = newValues.sum + runningCount.getOrElse(0)
Some(newCount)
}
lines.flatMap(_.split("\\s+"))
.map((_,1))
.updateStateByKey(updateFunction)
.print()
A checkpoint directory must be set to store the program's state. Memory consumption is fairly heavy, because the state of every key is kept in memory.
mapWithState (stateful computation, incremental output)
Only keys updated in the current batch are emitted; updated keys are loaded into memory, while keys that were not updated stay on disk.
ssc.checkpoint("hdfs://zly:9000/spark-checkpoint")//state snapshots
val lines: DStream[String] = ssc.socketTextStream("zly",9999)
lines.flatMap(_.split("\\s+"))
.map((_,1))
//incremental output: the function receives the key, the new value, and the state
.mapWithState(StateSpec.function((k:String,v:Option[Int],state:State[Int])=>{
var historyCount=0
//if the state exists, read it into historyCount and then add the new value
//if it does not exist, start from 0 and add the new value
if(state.exists()){
historyCount=state.get()
}
historyCount += v.getOrElse(0)
//update the state
state.update(historyCount)
(k,historyCount)
}))
.print()
A checkpoint directory must be set to store the program's state.
DStream故障恢複
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
object SparkWordCountFailRecorver {
def main(args: Array[String]): Unit = {
var checkpointDir="hdfs://zly:9000/spark-checkpoint1"
//first try to recover from the checkpoint; if recovery is not possible, run recoveryFunction
var ssc= StreamingContext.getOrCreate(checkpointDir,recoveryFunction)
ssc.sparkContext.setLogLevel("FATAL")
ssc.checkpoint("hdfs://zly:9000/spark-checkpoint1")//state snapshots
//start the streaming computation
ssc.start()
ssc.awaitTermination()
}
var recoveryFunction=()=>{
println("======recoveryFunction========")
Thread.sleep(3000)
val conf = new SparkConf()
.setAppName("SparkWordCountTopology")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
val lines: DStream[String] = ssc.socketTextStream("zly",9999)
//state update function (defined here but not referenced below; mapWithState is used instead)
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)
Some(newCount)
}
lines.flatMap(_.split("\\s+"))
.map((_,1))
.mapWithState(StateSpec.function((k:String,v:Option[Int],state:State[Int])=>{
var historyCount=0
if(state.exists()){
historyCount=state.get()
}
historyCount += v.getOrElse(0)
//update the state
state.update(historyCount)
(k,historyCount)
}))
.print()
//return the StreamingContext
ssc
}
}
诟病:⼀旦狀态持久化後,⽤戶修改代碼就不可⻅了,因為系統并不會調
⽤ recoveryFunction ,如果希望修改的代碼⽣效,必須⼿動删除檢查點⽬錄。
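If the checkpoint directory has to be removed so that new code takes effect, it can be deleted with hdfs dfs -rm -r or programmatically; the sketch below (illustrative, not part of the original example) uses the Hadoop FileSystem API:
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val checkpointDir = "hdfs://zly:9000/spark-checkpoint1"
val fs = FileSystem.get(URI.create(checkpointDir), new Configuration())
//recursive delete; afterwards StreamingContext.getOrCreate falls back to recoveryFunction
if (fs.exists(new Path(checkpointDir))) {
  fs.delete(new Path(checkpointDir), true)
}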
Window Operations
Spark Streaming also provides windowed computations, which let you apply transformations over a sliding window of data. The following figure illustrates the sliding window.

As the figure shows, every time the window slides over the source DStream, the source RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. In the figure, the operation is applied over the last 3 time units of data and slides by 2 time units. This means every window operation needs two parameters.
① Window length: the duration of the window (3 time units in the figure).
② Slide interval: the interval at which the window operation is performed (2 time units in the figure).
Note: both parameters must be multiples of the source DStream's batch interval, because the micro-batch is the atomic, smallest unit of processing for a DStream. In stream computing, if window length = slide interval, the window is called a tumbling window and consecutive windows have no overlapping elements; if window length > slide interval, it is called a sliding window and consecutive windows overlap. In general the window length should be >= the slide interval, because otherwise some data would be skipped.
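For example (an illustrative sketch assuming a 1-second batch interval and the socket source used elsewhere in this section), the same word stream can be windowed in both styles:
val words = ssc.socketTextStream("zly", 9999).flatMap(_.split("\\s+"))
//tumbling window: length == slide, no overlap between consecutive windows
val tumbling = words.window(Seconds(4), Seconds(4))
//sliding window: length > slide, consecutive windows overlap by 2 seconds
val sliding = words.window(Seconds(4), Seconds(2))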
Some common window operations are listed below. All of them take the two parameters described above: windowLength and slideInterval.
window(windowLength, slideInterval)
Merges the micro-batches that fall into the same window into one large RDD.
ssc.socketTextStream("zly", 9999)
.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.window(Seconds(2),Seconds(2))
.reduceByKey((v1, v2) => v1 + v2)
.print()
countByWindow(windowLength, slideInterval)
Returns the number of elements in each window.
ssc.checkpoint("hdfs://zly:9000/spark-checkpoints")
ssc.socketTextStream("zly", 9999)
.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.countByWindow(Seconds(2),Seconds(2))
.print()
Equivalent to applying window first and then count; a checkpoint directory must be set.
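For reference, a sketch of that equivalent window-then-count formulation (same source and window parameters as above; counting the words directly gives the same total as counting the (word, 1) pairs):
ssc.socketTextStream("zly", 9999)
.flatMap(line => line.split("\\s+"))
.window(Seconds(2),Seconds(2))
.count()
.print()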
reduceByWindow(func, windowLength, slideInterval)
ssc.socketTextStream("zly", 9999)
.flatMap(line => line.split("\\s+"))
.reduceByWindow(_+" | "+_,Seconds(2),Seconds(2))
.print()
Equivalent to applying window first and then reduce.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
ssc.socketTextStream("zly", 9999)
.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.reduceByKeyAndWindow(_+_,Seconds(2),Seconds(2))
.print()
Equivalent to applying window first and then reduceByKey.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
ssc.checkpoint("hdfs://zly:9000/spark-checkpoints")
ssc.sparkContext.setLogLevel("FATAL")
ssc.socketTextStream("zly", 9999)
.flatMap(line => line.split("\\s+"))
.map((_,1))
.reduceByKeyAndWindow(
(v1,v2)=>v1+v2,//add values from elements sliding into the window
(v1,v2)=>v1-v2,//subtract values from elements sliding out of the window
Seconds(4),Seconds(1),
filterFunc = t=> t._2 > 0) //drop keys whose count has fallen to 0
.print()
This inverse-function variant is more efficient only when more than half of a window's elements overlap with the previous window; note that it requires a checkpoint directory, as set above.
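The idea behind the inverse function, shown with plain Scala arithmetic (the per-batch counts below are made up purely for illustration):
val previousWindowBatches = Seq(3, 5, 2, 4)   //per-batch counts inside the old window
val previousTotal = previousWindowBatches.sum //14
val expiredBatch = 3                          //batch sliding out of the window
val incomingBatch = 6                         //batch sliding into the window
//incremental update instead of re-summing every batch still inside the window
val newTotal = previousTotal - expiredBatch + incomingBatch //17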
Output Operations
Output operations push a DStream's data out to external systems such as databases or file systems. Because output operations are what let external systems consume the transformed data, they trigger the actual execution of all DStream transformations (similar to actions on RDDs). The built-in output operations are print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, and foreachRDD; the example below builds a custom Kafka sink that is driven from foreachRDD.
Kafka Sink
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
object KafkaSink {
def createKafkaConnection(): KafkaProducer[String, String] = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"zly:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer].getName)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")//enable idempotence
props.put(ProducerConfig.RETRIES_CONFIG,"2")//number of retries
props.put(ProducerConfig.BATCH_SIZE_CONFIG,"100")//producer batch size
props.put(ProducerConfig.LINGER_MS_CONFIG,"1000")//linger for at most 1000 ms
new KafkaProducer[String,String](props)
}
lazy val kafkaProducer:KafkaProducer[String,String]= createKafkaConnection()
Runtime.getRuntime.addShutdownHook(new Thread(){
override def run(): Unit = {
kafkaProducer.close()
}
})
def save(vs: Iterator[(String, Int)], topic: String): Unit = {
try{
vs.foreach(tuple=>{
val record = new ProducerRecord[String,String] (topic,tuple._1,tuple._2.toString)
kafkaProducer.send(record)
})
}catch {
case e:Exception=> println("write to Kafka failed, send an alert email")
}
}
}
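A usage sketch (the upstream pipeline and the topic name topic01 are assumptions): the sink is typically driven from foreachRDD, handing each partition's iterator to KafkaSink.save so that one producer is created lazily per executor JVM:
ssc.socketTextStream("zly", 9999)
.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    KafkaSink.save(partition, "topic01") //assumed topic name
  })
})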
DStream Integration with DataFrame and SQL
//requires: import java.util.Properties and import org.apache.spark.sql.{SaveMode, SparkSession}
ssc.socketTextStream("zly", 9999)
.flatMap(line => line.split("\\s+"))
.map((_,1))
.reduceByKeyAndWindow(
(v1,v2)=>v1+v2,//add values from elements sliding into the window
(v1,v2)=>v1-v2,//subtract values from elements sliding out of the window
Seconds(4),Seconds(2),
filterFunc = t=> t._2 > 0) //drop keys whose count has fallen to 0
.foreachRDD(rdd=>{
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val wordsDataFrame = rdd.toDF("word","count")
val props = new Properties()
props.put("user", "root")
props.put("password", "123456")
wordsDataFrame.write
.mode(SaveMode.Append)
.jdbc("jdbc:mysql://zly:3306/mysql","t_wordcount",props)
})