
Spark Streaming Integration with HDFS and Spark SQL

Maven Dependencies

<dependencies>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.11</artifactId>
		<version>2.2.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming_2.11</artifactId>
		<version>2.2.2</version>
	</dependency>
	<!-- spark-sql provides the SparkSession used in the Spark SQL integration example below -->
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-sql_2.11</artifactId>
		<version>2.2.2</version>
	</dependency>
</dependencies>
           

First, test the environment to verify that we can connect to Spark Streaming.

WordCount.scala

package blog

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Test the Spark Streaming connection
  **/

object WordCount {
  def main(args: Array[String]): Unit = {
    if (args == null || args.length < 2) {
      println(
        """
          |Usage: <host> <port>
        """.stripMargin)
      System.exit(-1)
    }
    val Array(host, port) = args
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WordCount")
    //batchInterval: how often a new batch is produced, i.e. how often a Spark Streaming job is submitted
    val batchInterval = Seconds(2)
    //entry point for Spark Streaming
    val ssc = new StreamingContext(conf, batchInterval)

    //business logic

    //streaming data cannot be recovered once lost, so persist it (serialized, replicated, memory and disk) for fault tolerance
    val input: ReceiverInputDStream[String] = ssc.socketTextStream(host, port.toInt, StorageLevel.MEMORY_AND_DISK_SER_2)
    //wordcount
    val retDStream: DStream[(String, Int)] = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    //print the results
    retDStream.print()
    //start the computation
    ssc.start()
    //keep the streaming job running until it is terminated
    ssc.awaitTermination()
  }
}
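
An optional tweak: in local mode Spark's INFO logging can bury the printed batches, so lowering the log level right after the StreamingContext is created makes them easier to spot, for example:

//optional: reduce log noise so the printed word counts are easier to see
ssc.sparkContext.setLogLevel("WARN")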
           

Set the program arguments to hadoop01 9999


Test it with nc

First, install nc:

sudo yum -y install nc
           

Open port 9999:

nc -lk hadoop01 9999
           

Start the program and send some input:


You can see that the word counts are successfully printed to the console.


Once the test environment works, move on to the HDFS integration.

StreamingHDFS.scala

package blog

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Spark Streaming integration with HDFS
  **/

//Spark Streaming + HDFS integration: read files newly added to HDFS
object StreamingHDFS {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingHDFS")
      .setMaster("local")
    val batchInterval = Seconds(2)
    val ssc = new StreamingContext(conf, batchInterval)
    //        val input:DStream[String] = ssc.textFileStream("file:///F:/data/")//read from a local directory instead
    //read files from HDFS, monitoring the directory for newly created files
    val input: DStream[String] = ssc.textFileStream("hdfs://bde/data/words")
    val ret = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    //iterate over each batch's RDD
    ret.foreachRDD((rdd, time) => {
      //print only when the RDD is non-empty
      if (!rdd.isEmpty()) {
        println(s"Time: $time")
        rdd.foreach(println)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
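
If you also want to persist each batch's counts back to HDFS instead of only printing them, DStream provides saveAsTextFiles. A minimal sketch, continuing from the ret DStream above; the output prefix hdfs://bde/data/wordcount/result is only an example path, adjust it to your own layout:

//writes each batch under a directory named <prefix>-<batch time in ms>.txt
ret.saveAsTextFiles("hdfs://bde/data/wordcount/result", "txt")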
           

Copy hdfs-site.xml and core-site.xml into the current project directory (on the classpath), otherwise the hdfs://bde nameservice cannot be resolved!


Prepare a data file to upload to HDFS

1.txt

hello
word
hello
ww
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop
hello
word
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop
           

Start the program, then upload the file to HDFS:

hdfs dfs -put 1.txt /data/words/
           

Spark Streaming detects any file that newly appears in the monitored directory, so you can also write your own program that writes a file into it.

WriteFile.scala

package blog

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

/**
  * @Author Daniel
  * @Description Write a file into HDFS for the streaming job to pick up
  **/

object WriteFile {
  def main(args: Array[String]): Unit = {
    //set the HDFS user name to avoid permission errors
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val uri = new URI("hdfs://bde/")
    val fs = FileSystem.newInstance(uri, new Configuration())
    val fos = fs.create(new Path("/data/words/write.txt"))
    fos.write("hello spark\nhello streaming\nhello successfully".getBytes())
    //close the output stream so the data is flushed before closing the FileSystem
    fos.close()
    fs.close()

  }
}
           
Next, integrate Spark Streaming with Spark SQL to count, for each product category, how many sales each brand has, and show the top 3 brands per category.

StreamingSQL.scala

package sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Spark Streaming integration with Spark SQL
  *              Count the sales of each brand within each product category
  *              Data format:
  *              001|mi|moblie
  *              005|huawei|moblie
  *
  **/
object StreamingSQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingSQL")
      .setMaster("local[*]")
    //batch interval (how often a batch is submitted)
    val batchInterval = Seconds(2)
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, batchInterval)
    val lines = ssc.socketTextStream("hadoop01", 9999)
    //updateStateByKey needs a checkpoint directory to manage its state
    ssc.checkpoint("file:///E:/data/checkpoint/ck2")
    //compute the cumulative state up to the current batch
    val usb = lines.map(line => {
      val key = line.substring(line.indexOf("|") + 1)
      (key, 1)
      //Spark Streaming stateful operator
    }).updateStateByKey[Int]((seq: Seq[Int], option: Option[Int]) => Option(seq.sum + option.getOrElse(0)))
    //top 3 brands in each category
    usb.foreachRDD((rdd) => {
      if (!rdd.isEmpty()) {
        import spark.implicits._
        //map each (brand|category, count) tuple to a case class row
        val rowRDD = rdd.map { case (brandCategory, count) => {
          val brand = brandCategory.substring(0, brandCategory.indexOf("|"))
          val category = brandCategory.substring(brandCategory.indexOf("|") + 1)
          MyRow(category, brand, count)
        }
        }
        val df = rowRDD.toDF("category", "brand", "count")
        df.createOrReplaceTempView("sale")
        //the window alias cannot be filtered directly with having, so wrap the
        //ranking query in a subquery and filter on rank there
        val sql =
          """
            |select category, brand, count, rank
            |from (
            |  select
            |    category,
            |    brand,
            |    count,
            |    row_number() over(partition by category order by count desc) as rank
            |  from sale
            |) t
            |where rank < 4
          """.stripMargin
        spark.sql(sql).show()
      }
    })


    ssc.start()
    ssc.awaitTermination()
  }
}

case class MyRow(category: String, brand: String, count: Int)
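
The top-3 ranking can also be written with the DataFrame API instead of SQL. A sketch that assumes the same df DataFrame built inside foreachRDD above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

//rank brands inside each category by their cumulative count, highest first
val w = Window.partitionBy("category").orderBy(desc("count"))
df.withColumn("rank", row_number().over(w))
  .where(col("rank") < 4)
  .show()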
           
Sample data, sent line by line through nc:

001|mi|moblie
002|mi|moblie
003|mi|moblie
004|mi|moblie
005|huawei|moblie
006|huawei|moblie
007|huawei|moblie
008|Oppo|moblie
009|Oppo|moblie
010|uniqlo|colthing
011|uniqlo|colthing
012|uniqlo|colthing
013|uniqlo|colthing
014|uniqlo|colthing
015|selected|colthing
016|selected|colthing
017|selected|colthing
018|Armani|colthing
019|lining|sports
020|nike|sports
021|adidas|sports
022|nike|sports
023|anta|sports
024|lining|sports
025|lining|sports
           
nc -lk hadoop01 9999
           
