天天看點

[Spark streaming舉例]--實作過濾黑名單

package com.scala.my

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext

/**
 * 
 * @author root 
 *   過濾黑名單 
          步驟:
 *          1\在h15上開啟端口9999:#nc -lk 9999
 *          2\run本程式
 *          3\在h15的dos界面輸入日志内容:(注意格式是"日期+名稱")
 *          0509 yasaka
 *          0509 lily
 *          0509 cassie
 *          4\如果console沒有列印lily,那麼測試通過
 *         測試結果:通過
 */
object TransformBlackList {
  def main(args: Array[String]): Unit = {
    //擷取streamingContext
    val sc=new StreamingContext(new SparkConf().setAppName("transform").setMaster("local[2]"),Durations.seconds(8))
    /**
     * 建立模拟資料
     */
    val black=List(("lily",true))
    //需要sparkContext
    val blackRDD=sc.sparkContext.parallelize(black)
    //監聽h15上的9999端口
    val logs=sc.socketTextStream("h15", 9999)
    //分隔map
    val ds=logs.map { x => (x.split(" ")(1),x)}
    //建立transform操作
    val endDs =ds.transform( my=>{
      //左内連接配接:對于rdd和DStream連接配接     join是rdd和rdd連接配接
      val joinsRDD=my.leftOuterJoin(blackRDD)
      //過濾  
      val endRDD=joinsRDD.filter(tuple=>{
        /**
         * 舉例說明:
         * val cd=scores.getOrElse("Bob", 0)
         * 如果scores包含Bob,那麼傳回Bob,如果不包含,那麼傳回0
         */
        //意思是:tuple._2._2能get到值,傳回值,如果不能得到值,傳回false
        if (tuple._2._2.getOrElse(false)) {
false
}else{
true
}
      })
      //傳回值
     endRDD.map(_._2._1)
    })
    //列印
    endDs.print()
    //開啟
    sc.start()
    //等待
    sc.awaitTermination()
    //關閉資源
    sc.stop()
  }
}      

繼續閱讀