package com.scala.my
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext
/**
*
* @author root
* 過濾黑名單
步驟:
* 1\在h15上開啟端口9999:#nc -lk 9999
* 2\run本程式
* 3\在h15的dos界面輸入日志内容:(注意格式是"日期+名稱")
* 0509 yasaka
* 0509 lily
* 0509 cassie
* 4\如果console沒有列印lily,那麼測試通過
* 測試結果:通過
*/
object TransformBlackList {
def main(args: Array[String]): Unit = {
//擷取streamingContext
val sc=new StreamingContext(new SparkConf().setAppName("transform").setMaster("local[2]"),Durations.seconds(8))
/**
* 建立模拟資料
*/
val black=List(("lily",true))
//需要sparkContext
val blackRDD=sc.sparkContext.parallelize(black)
//監聽h15上的9999端口
val logs=sc.socketTextStream("h15", 9999)
//分隔map
val ds=logs.map { x => (x.split(" ")(1),x)}
//建立transform操作
val endDs =ds.transform( my=>{
//左内連接配接:對于rdd和DStream連接配接 join是rdd和rdd連接配接
val joinsRDD=my.leftOuterJoin(blackRDD)
//過濾
val endRDD=joinsRDD.filter(tuple=>{
/**
* 舉例說明:
* val cd=scores.getOrElse("Bob", 0)
* 如果scores包含Bob,那麼傳回Bob,如果不包含,那麼傳回0
*/
//意思是:tuple._2._2能get到值,傳回值,如果不能得到值,傳回false
if (tuple._2._2.getOrElse(false)) {
false
}else{
true
}
})
//傳回值
endRDD.map(_._2._1)
})
//列印
endDs.print()
//開啟
sc.start()
//等待
sc.awaitTermination()
//關閉資源
sc.stop()
}
}