
Spark Streaming (Day 2)

Spark Streaming

Transformations

DStream transformations are similar to RDD transformations: they turn one DStream into a new DStream. Many common DStream operators are used the same way as their Spark RDD counterparts.
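
All of the snippets below assume a StreamingContext (ssc) and a socket-based DStream (lines) have already been created. A minimal sketch of that setup, where the application name, master, 1-second batch interval, host and port are illustrative assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// assumed context for the operator examples in this section
val conf = new SparkConf().setAppName("DStreamTransformations").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1)) // 1-second micro-batches
val lines: DStream[String] = ssc.socketTextStream("CentOS", 9999)

// ... operators from the sections below go here ...

ssc.start()            // start the streaming computation
ssc.awaitTermination() // block until the job is stopped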

map

//1,zhangsan,true
lines.map(line=> line.split(","))
 .map(words=>(words(0).toInt,words(1),words(2).toBoolean))
 .print()
           

flatMap

//hello spark
lines.flatMap(line=> line.split("\\s+"))
 .map((_,1)) //(hello,1)(spark,1)
 .print()
           

filter

//keep only the records that contain "hello": records for which the predicate returns true pass through
lines.filter(line => line.contains("hello"))
 .flatMap(line=> line.split("\\s+"))
 .map((_,1))
 .print()
           

repartition (change the number of partitions)

lines.repartition(10) // change the parallelism (number of partitions)
 .filter(line => line.contains("hello"))
 .flatMap(line=> line.split("\\s+"))
 .map((_,1))
 .print()
           

union(将兩個流合并)

val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10)
 .filter(line => line.contains("hello"))
 .flatMap(line=> line.split("\\s+"))
 .map((_,1))
 .print()
           
Note: this is effectively two Receivers, so allocate at least three cores (one per receiver plus at least one for processing).
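
A minimal sketch of that sizing rule, assuming a local master (the "local[3]" value is illustrative): each socket receiver permanently occupies one thread, so two receivers plus at least one processing thread need at least three.

// two receivers occupy two threads; at least one more is needed to process the batches
val conf = new SparkConf().setAppName("UnionExample").setMaster("local[3]")
val ssc = new StreamingContext(conf, Seconds(1))
val stream1: DStream[String] = ssc.socketTextStream("CentOS", 9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS", 8888)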

count

val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10)
 .flatMap(line=> line.split("\\s+"))
 .count() // count the number of elements in each micro-batch RDD
 .print()
           

reduce(func)

将結果進行合并

val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10) // aa bb
 .flatMap(line=> line.split("\\s+"))
 .reduce(_+"|"+_)//aa|bb
 .print()
           

countByValue (count occurrences per value)

val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
stream1.union(stream2).repartition(10) // a a b c
 .flatMap(line=> line.split("\\s+"))
 .countByValue() //(a,2) (b,1) (c,1)
 .print()
           

reduceByKey(func, [numTasks])

var lines:DStream[String]=ssc.socketTextStream("CentOS",9999) //this is spark this
 lines.repartition(10)
 .flatMap(line=> line.split("\\s+").map((_,1)))
 .reduceByKey(_+_)// (this,2)(is,1)(spark ,1)
 .print()
           

join(otherStream, [numTasks])

//1 zhangsan
val stream1: DStream[String] = ssc.socketTextStream("CentOS",9999)
//1 apple 1 4.5
val stream2: DStream[String] = ssc.socketTextStream("CentOS",8888)
val userPair:DStream[(String,String)]=stream1.map(line=>{
 var tokens= line.split(" ")
 (tokens(0),tokens(1))
})
val orderItemPair:DStream[(String,(String,Double))]=stream2.map(line=>{
//line is the incoming record
 val tokens = line.split(" ")
 //return the (key, (item, quantity * price)) pair
 (tokens(0),(tokens(1),tokens(2).toInt * tokens(3).toDouble))
})
userPair.join(orderItemPair).map(t=>(t._1,t._2._1,t._2._2._1,t._2._2._2))//1 zhangsan apple 4.5
.print()
           

Join result before the final map: (1,(zhangsan,(apple,4.5)))

Records from the two streams can only be joined if they land in the same micro-batch (the same RDD time interval); otherwise the join cannot be completed, so this operator is of limited practical use.

transform

transform lets you compute with a stream and an RDD together: it exposes the underlying micro-batch RDD, which makes a stream-batch join possible.

//1 apple 2 4.5
val orderLog: DStream[String] = ssc.socketTextStream("CentOS",8888)
var userRDD=ssc.sparkContext.makeRDD(List(("1","zhangs"),("2","wangw")))

//the element type is (String,(String,Double))
val orderItemPair:DStream[(String,(String,Double))]=orderLog.map(line=>{
 val tokens = line.split(" ")
 (tokens(0),(tokens(1),tokens(2).toInt * tokens(3).toDouble))
})
//join the per-batch (dynamic) RDD with the static RDD
orderItemPair.transform(rdd=> rdd.join(userRDD))
.print()
           

updateStateByKey (stateful computation, full output)

// a checkpoint directory must be set so the computation state can be stored and backed up
ssc.checkpoint("hdfs://zly:9000/spark-checkpoint") // state snapshots
val lines: DStream[String] = ssc.socketTextStream("zly",9999)

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
 // newValues holds all new values for this key in the current batch;
 // add the previous state if present, otherwise start from 0 (the state is kept in memory)
 val newCount = newValues.sum + runningCount.getOrElse(0)
 Some(newCount)
}
lines.flatMap(_.split("\\s+"))
 .map((_,1))
 .updateStateByKey(updateFunction)
 .print()
           
A checkpoint directory must be set to store the program's state information. Memory consumption is fairly heavy.

mapWithState (stateful computation, incremental output)

Only keys updated in the current batch are output; updated keys are loaded into memory, while keys that were not updated stay on disk.

ssc.checkpoint("hdfs://zly:9000/spark-checkpoint") // state snapshots
val lines: DStream[String] = ssc.socketTextStream("zly",9999)
lines.flatMap(_.split("\\s+"))
.map((_,1))
// incremental output: the state function receives (key, Option[value], State[Int])
.mapWithState(StateSpec.function((k:String,v:Option[Int],state:State[Int])=>{
 var historyCount=0
 //如果狀态存在,就先擷取狀态的值進行historyCount累加,再與新值相加
 //如果狀态不存在,0+新值
 if(state.exists()){
 historyCount=state.get()
  }
  
 historyCount += v.getOrElse(0)
 // update the state
 state.update(historyCount)
 (k,historyCount)
}))
.print()
           
A checkpoint directory must be set to store the program's state information.

DStream fault recovery

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
object SparkWordCountFailRecorver {
 def main(args: Array[String]): Unit = {
	 var checkpointDir="hdfs://zly:9000/spark-checkpoint1"
	 // first try to restore the context from the checkpoint; if that is not possible, run recoveryFunction
	 var ssc= StreamingContext.getOrCreate(checkpointDir,recoveryFunction)
	 ssc.sparkContext.setLogLevel("FATAL")
	 ssc.checkpoint("hdfs://zly:9000/spark-checkpoint1") // state snapshots
	 // 5. start the streaming computation
	 ssc.start()
	 ssc.awaitTermination()
 }
 var recoveryFunction=()=>{
	 println("======recoveryFunction========")
	 Thread.sleep(3000)
	 val conf = new SparkConf()
	 .setAppName("SparkWordCountTopology")
	 .setMaster("local[*]")
	 val ssc = new StreamingContext(conf, Seconds(1))
	 val lines: DStream[String] = ssc.socketTextStream("zly",9999)
	 def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
	 val newCount = newValues.sum + runningCount.getOrElse(0)
	 Some(newCount)
	 }
	 lines.flatMap(_.split("\\s+"))
	 .map((_,1))
	 .mapWithState(StateSpec.function((k:String,v:Option[Int],state:State[Int])=>{
	 var historyCount=0
	 if(state.exists()){
	 historyCount=state.get()
	 }
	 historyCount += v.getOrElse(0)
	 // update the state
	 state.update(historyCount)
	 (k,historyCount)
	 }))
	 .print()
 // return the StreamingContext
 ssc
 }
}
           

Drawback: once the state has been persisted, changes the user makes to the code are no longer picked up, because the system will not call recoveryFunction again. For modified code to take effect, the checkpoint directory has to be deleted manually.

Window Operations

Spark Streaming also provides windowed computations, which let you apply transformations over a sliding window of data. The following figure illustrates this sliding window.

[Figure: a sliding window over the source DStream]

As the figure shows, every time the window slides over the source DStream, the source RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. In the figure, the operation is applied over the last 3 time units of data and slides by 2 time units. This means any window operation needs to specify two parameters.

① Window length: the duration of the window (3 time units in the figure).

② Slide interval: the interval at which the window operation is performed (2 time units in the figure).

Note: both parameters must be multiples of the source DStream's batch interval, because for a DStream the micro-batch is the atomic, smallest unit of processing. In stream processing, when window length = slide interval the window is called a tumbling window and its elements do not overlap; when window length > slide interval it is called a sliding window and its elements do overlap. In general the window length should be >= the slide interval, because a window length smaller than the slide interval would skip some data.
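
A minimal sketch of the two cases, reusing the lines DStream from the earlier examples (the window and slide values are illustrative):

// tumbling window: window length == slide interval, windows do not overlap
lines.window(Seconds(4), Seconds(4))

// sliding window: window length > slide interval, consecutive windows overlap
lines.window(Seconds(4), Seconds(2))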

Some common window operations are listed below. All of them take the two parameters described above, windowLength and slideInterval.


window(windowLength, slideInterval)

Merges the micro-batches that fall into the same window into one larger RDD.

ssc.socketTextStream("zly", 9999)
 .flatMap(line => line.split("\\s+"))
 .map(word => (word, 1))
 .window(Seconds(2),Seconds(2))
 .reduceByKey((v1, v2) => v1 + v2)
 .print()
           

countByWindow(windowLength, slideInterval)

Returns the number of elements in the window.

ssc.checkpoint("hdfs://zly:9000/spark-checkpoints")
ssc.socketTextStream("zly", 9999) 
.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.countByWindow(Seconds(2),Seconds(2))
.print()
           
Equivalent to applying window first and then the count operator; a checkpoint directory must be set.
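
As a rough sketch of that equivalence (not a literal re-implementation of countByWindow), the same pipeline could be written with window followed by count:

ssc.socketTextStream("zly", 9999)
 .flatMap(line => line.split("\\s+"))
 .map(word => (word, 1))
 .window(Seconds(2),Seconds(2)) // merge the micro-batches of the window first
 .count()                       // then count the elements of the merged RDD
 .print()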

reduceByWindow(func, windowLength, slideInterval)

ssc.socketTextStream("zly", 9999) 
.flatMap(line => line.split("\\s+"))
.reduceByWindow(_+" | "+_,Seconds(2),Seconds(2))
.print()
           
Equivalent to applying window first and then the reduce operator.

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

ssc.socketTextStream("zly", 9999) 
.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.reduceByKeyAndWindow(_+_,Seconds(2),Seconds(2))
.print()
           
Equivalent to applying window first and then the reduceByKey operator.

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

ssc.checkpoint("hdfs://zly:9000/spark-checkpoints")
ssc.sparkContext.setLogLevel("FATAL")
ssc.socketTextStream("zly", 9999) 
.flatMap(line => line.split("\\s+"))
.map((_,1))
.reduceByKeyAndWindow(
 (v1,v2)=>v1+v2, // add the elements that slid into the window
 (v1,v2)=>v1-v2, // subtract the elements that slid out of the window
 Seconds(4),Seconds(1),
 filterFunc = t=> t._2 > 0) // drop keys whose count has fallen to 0
.print()
           
This variant is efficient when more than half of the window's elements overlap between consecutive windows.

Output Operations

Output operations allow a DStream's data to be pushed out to external systems such as databases or file systems. Because output operations are what actually let external systems consume the transformed data, they trigger the actual execution of all the DStream transformations (similar to actions on RDDs). The following output operations are currently defined:

[Table: DStream output operations]
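
A short sketch of the most common output operations, assuming wordCounts is a DStream[(String, Int)] built as in the earlier examples (the HDFS path prefix is illustrative):

// print the first elements of every micro-batch on the driver
wordCounts.print()

// write each micro-batch as text files named <prefix>-<time in ms>[.<suffix>]
wordCounts.saveAsTextFiles("hdfs://zly:9000/results/wordcount", "txt")

// generic escape hatch: run arbitrary RDD code against each micro-batch
wordCounts.foreachRDD(rdd => rdd.foreach(println))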

Kafka Sink

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object KafkaSink {
 def createKafkaConnection(): KafkaProducer[String, String] = {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zly:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // enable idempotence
  props.put(ProducerConfig.RETRIES_CONFIG, "2")                // number of retries
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, "100")           // batch size in bytes
  props.put(ProducerConfig.LINGER_MS_CONFIG, "1000")           // linger for at most 1000 ms
  new KafkaProducer[String, String](props)
 }
 // one producer per JVM, created lazily and closed on shutdown
 lazy val kafkaProducer: KafkaProducer[String, String] = createKafkaConnection()
 Runtime.getRuntime.addShutdownHook(new Thread() {
  override def run(): Unit = {
   kafkaProducer.close()
  }
 })
 def save(vs: Iterator[(String, Int)], topic: String): Unit = {
  try {
   vs.foreach(tuple => {
    val record = new ProducerRecord[String, String](topic, tuple._1, tuple._2.toString)
    kafkaProducer.send(record)
   })
  } catch {
   case e: Exception => println("error while writing to Kafka, send an alert email")
  }
 }
}
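
The object above only creates the producer and a save helper; it still needs to be wired into a stream. A minimal usage sketch, assuming a wordCounts DStream[(String, Int)] and an illustrative topic name "topic01":

// call KafkaSink.save once per partition, so the lazy producer is created
// and reused once per executor JVM rather than once per record
wordCounts.foreachRDD(rdd => {
 rdd.foreachPartition(partition => KafkaSink.save(partition, "topic01"))
})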
           

Integrating DStream with DataFrame/SQL

// requires: import java.util.Properties and org.apache.spark.sql.{SaveMode, SparkSession};
// the inverse-reduce window below also needs ssc.checkpoint(...) to be set (see above)
ssc.socketTextStream("zly", 9999)
 .flatMap(line => line.split("\\s+"))
 .map((_,1))
 .reduceByKeyAndWindow(
 (v1,v2)=>v1+v2, // add the elements that slid into the window
 (v1,v2)=>v1-v2, // subtract the elements that slid out of the window
 Seconds(4),Seconds(2),
 filterFunc = t=> t._2 > 0) // drop keys whose count has fallen to 0
 .foreachRDD(rdd=>{
	 val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
	 import spark.implicits._
	 val wordsDataFrame = rdd.toDF("word","count")
	 val props = new Properties()
	 props.put("user", "root")
	 props.put("password", "123456")
	 wordsDataFrame.write
	 .mode(SaveMode.Append)
	 .jdbc("jdbc:mysql://zly:3306/mysql","t_wordcount",props)
 })
           
