// 根据自定义的规则判断出黑名单 IP,并将其保存起来。
// 在后续产生的数据中,与黑名单数据进行比对,过滤掉来自黑名单 IP 的记录。
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming blacklist-filter demo.
 *
 * Reads log lines from a socket (`node01:6666`) in 2-second micro-batches, keys each
 * line by the IP before the first `##` separator, and prints (up to 1000 per batch)
 * only the lines whose IP is NOT in a simulated blacklist — the "whitelist" traffic.
 *
 * Example input line:
 * {{{
 * 110.52.250.126##2017-05-30 17:38:20##GET /static/image/common/logo.png HTTP/1.1##200##4542
 * }}}
 */
object BlackListDemo {

  /** Field separator used in the incoming log lines. */
  private val FieldSeparator = "##"

  /**
   * Returns the IP field of a log line: the text before the first `##`, or the whole
   * line when it contains no separator.
   *
   * Unlike `line.split("##")(0)`, this never throws. For a line made only of
   * separators (e.g. `"##"`), `split` drops trailing empty strings and returns an
   * empty array, and indexing it would fail the whole batch with
   * `ArrayIndexOutOfBoundsException`.
   */
  private def extractIp(line: String): String = {
    val end = line.indexOf(FieldSeparator)
    if (end >= 0) line.substring(0, end) else line
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(BlackListDemo.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))

    // Step 1: build the blacklist as a pair RDD keyed by IP.
    // This is a hard-coded stand-in; a real job would load it from external storage.
    // An RDD cannot be passed to sc.broadcast directly ("Can not directly broadcast
    // RDDs"); collect() it first if a broadcast variable is wanted instead of a join.
    // Cached because it is joined against every micro-batch.
    val blackListRDD: RDD[(String, String)] =
      sc.parallelize(Seq(("27.19.74.143", ""), ("110.52.250.126", ""))).cache()

    // Step 2: key each incoming line by its IP, then left-outer-join every batch RDD
    // with the blacklist inside transform. After the join an element looks like
    //   ("110.52.250.126", (line, Some(""))) -- IP is blacklisted
    //   ("110.52.250.127", (line, None))     -- IP is not blacklisted
    // Step 3: keep only the non-blacklisted lines and print them.
    ssc.socketTextStream("node01", 6666, StorageLevel.MEMORY_ONLY)
      .map(line => (extractIp(line), line))
      .transform { batch =>
        batch
          .leftOuterJoin(blackListRDD)
          .collect { case (_, (line, None)) => line }
      }
      .print(1000)

    ssc.start()
    ssc.awaitTermination()
  }
}