天天看点

Spark Streaming 过滤黑名单数据的简单案例

根据自定义的规则判断出黑名单 IP，并将其保存起来。

将后续产生的数据与黑名单进行比对，筛选掉其中属于黑名单的数据。

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Minimal Spark Streaming demo that drops blacklisted IPs from a socket text stream.
 *
 * Each incoming line is expected to look like
 * `110.52.250.126##2017-05-30 17:38:20##GET /static/image/common/logo.png HTTP/1.1##200##4542`,
 * i.e. `##`-separated fields with the client IP first. Lines whose IP is in the blacklist
 * are removed; the remaining ("whitelisted") lines are printed for every batch.
 *
 * Usage: `BlackListDemo [host] [port]`. Both arguments are optional and default to
 * [[BlackListDemo.DefaultHost]] and [[BlackListDemo.DefaultPort]].
 */
object BlackListDemo {

  /** Socket host used when no host argument is given. */
  private val DefaultHost: String = "node01"

  /** Socket port used when no port argument is given. */
  private val DefaultPort: Int = 6666

  /**
   * Entry point: builds the streaming job and blocks until it terminates.
   *
   * @param args optional `host` (index 0) and `port` (index 1) of the socket source
   * @throws NumberFormatException if a port argument is given but is not an integer
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(BlackListDemo.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    // 2-second micro-batches.
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))

    // Optional overrides for the socket source; without arguments the original
    // hard-coded endpoint is used.
    val host: String = args.lift(0).getOrElse(DefaultHost)
    val port: Int = args.lift(1).map(_.trim.toInt).getOrElse(DefaultPort)

    // Step 1: the blacklist as a pair RDD keyed by IP. The value is unused; only key
    // membership matters. This is mock data; a real job would load it from elsewhere.
    val blackListRDD: RDD[(String, String)] =
      sc.parallelize(Seq(("27.19.74.143", ""), ("110.52.250.126", "")))
    // NOTE: an RDD cannot be broadcast directly. sc.broadcast(blackListRDD) throws
    // "IllegalArgumentException: requirement failed: Can not directly broadcast RDDs";
    // that approach would need sc.broadcast(blackListRDD.collect()) instead.

    // Step 2: key each line by its IP, then left-outer-join every batch RDD against the
    // blacklist inside `transform`, which hands us the RDD behind each batch.
    // After the join, elements look like:
    //   ("110.52.250.126", ("110.52.250.126##...##4542", Some("")))  -> blacklisted
    //   ("110.52.250.127", ("110.52.250.127##...##4542", None))      -> not blacklisted
    // so keeping only the `None` side yields the whitelist.
    //
    // Step 3: print up to 1000 whitelisted lines per batch.
    ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY)
      .map { line =>
        val ip = line.split("##")(0)
        (ip, line)
      }
      .transform { batch =>
        val joined: RDD[(String, (String, Option[String]))] = batch.leftOuterJoin(blackListRDD)
        joined
          .filter { case (_, (_, blackListHit)) => blackListHit.isEmpty }
          .map { case (_, (line, _)) => line }
      }
      .print(1000)

    ssc.start()
    ssc.awaitTermination()
  }
}
           

继续阅读