
Spark Streaming WordCount demo

I. First, prepare a virtual machine

Install nc (netcat):

yum -y install nc      

Then start a listener on port 9999:

nc -lk 9999      

II. Open IDEA

(1) Create a Maven project and modify the pom file as follows:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.kgc.demo.scala</groupId>
    <artifactId>scala-demo-project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>scala-demo-project</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.kgc.kafak.demo.ThreadProducer</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>      

(2) After the dependencies finish downloading, configure the SDK and grant the file permissions.


III. Four demos

Before starting, stop the firewall on the virtual machine:

systemctl stop firewalld      

1. WordCount

On the virtual machine, run nc -lk 9999 as shown above, then run the code below in IDEA and type words into the nc session; each 5-second batch is word-counted.

Note that the hostname "nodefive" used in the code must be replaced with your own virtual machine's host mapping (or its IP address), otherwise the connection will fail.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object WordCountForSocket {
  /**
    * Which parts of this code run on the Driver and which run on the executors?
    *
    * @param args
    */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

    // do one word count every 5 seconds
    val ssc = new StreamingContext(conf, Durations.seconds(5))

    ssc.sparkContext.setLogLevel("Error")

    // runs on the Driver (executed once)
    println("------------------------------------------------------------------------------------start...")

    // start nc -lk 9999 first
    val lines = ssc.socketTextStream("nodefive", 9999)

    // runs on the executors
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    result.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
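For example (a sketch of the expected behavior, not captured output): typing the following line into the nc session during one 5-second batch

hello world hello

should make result.print() emit something like

-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(hello,2)
(world,1)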

2. Stateful counting (updateStateByKey)

A checkpoint directory has to be set; everything else is the same as above, including replacing the hostname with your own mapping.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object UpdateStateByKey {

  def main(args: Array[String]): Unit = {

    // Count the cumulative total of each word (the same pattern would give, say,
    // the cumulative sales of each product). The result of the previous batch has
    // to be kept, which is why a checkpoint directory is required.
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Durations.seconds(5))

    ssc.sparkContext.setLogLevel("Error")

    ssc.checkpoint("./updatestatebykey")

    val lines = ssc.socketTextStream("nodefive", 9999)

    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

    // in Scala, try to convey as much information as possible through parameter names
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {
      Some(preValue.getOrElse(0) + currentValues.sum)
    })
    result.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
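To make the two parameters of the update function explicit, here is the same lambda pulled out and commented (a sketch; the name updateFunc is not part of the original code):

// currentValues: all the 1s seen for this key in the current 5-second batch
// preValue:      the running total restored from the checkpointed state (None the first time the key is seen)
// returning None instead of Some(...) would drop the key from the state
val updateFunc = (currentValues: Seq[Int], preValue: Option[Int]) => {
  Some(preValue.getOrElse(0) + currentValues.sum)
}

wordAndOne.updateStateByKey(updateFunc) behaves exactly like the inline version above.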

3. Driver HA

The file structure is shown below (the KAFAK.DEMO directory is not needed).
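A sketch of the layout implied by the paths used in the code below (the parent directory is whatever your own project uses):

data/
  copyFileWord/
    word.txt              <- the source file that gets copied
  streamingCopyFile/      <- monitored by textFileStream; the copies land here
  streamingCheckpoint/    <- created by the checkpoint (relative to the working directory)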

(1) First create the folders and put the word.txt file into the copyFileWord directory.

word.txt

hello world
hello java
hello scala      

It really comes down to one input directory and one output directory.


(2) First run CopyFileToDirectory in the util package to start copying files.

Note that the two paths have to be replaced with your own; in IDEA, select the corresponding file/folder and right-click → Copy Path.

package util

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.UUID

object CopyFileToDirectory {

  def main(args: Array[String]): Unit = {
    // every 5 seconds, copy word.txt into the monitored directory under a new, unique name
    while (true) {
      Thread.sleep(5000)
      val uuid = UUID.randomUUID().toString
      println(uuid)
      copyFile(
        new File("D:\\IDEA_project\\aa2_17\\src\\test\\java\\data\\copyFileWord\\word.txt"),
        new File("D:\\IDEA_project\\aa2_17\\src\\test\\java\\data\\streamingCopyFile\\" + uuid + "----words.txt"))
    }
  }

  /**
    * Copy a file into the target directory.
    */
  def copyFile(fromFile: File, toFile: File): Unit = {
    val ins = new FileInputStream(fromFile)
    val out = new FileOutputStream(toFile)
    val buffer = new Array[Byte](1024 * 1024)
    // read first, then write only as many bytes as were actually read
    var size = ins.read(buffer)
    while (size != -1) {
      out.write(buffer, 0, size)
      size = ins.read(buffer)
    }
    ins.close()
    out.close()
  }

}
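If you prefer not to hand-roll the byte copy, java.nio does the same job; a minimal sketch (the helper name copyFileNio is ours, the paths stay the same as above):

import java.nio.file.{Files, Paths, StandardCopyOption}

// copy from -> to, overwriting the target if it already exists
def copyFileNio(from: String, to: String): Unit = {
  Files.copy(Paths.get(from), Paths.get(to), StandardCopyOption.REPLACE_EXISTING)
}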

(3) Once files are being copied successfully, run SparkStreamingDriverHA.

package sparkstreaming.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object SparkStreamingDriverHA {

  val checkpointDir = "data/streamingCheckpoint"

  def main(args: Array[String]): Unit = {

    // restore the context from the checkpoint if one exists, otherwise create a new one
    val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }

  def createStreamingContext() = {
    println("-----------------Create new StreamingContext ----------------")
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("DriverHA")
    val ssc: StreamingContext = new StreamingContext(conf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("Error")

    /**
      * By default the checkpoint stores:
      *     1. the configuration
      *     2. the DStream operation graph
      *     3. the progress of unfinished jobs
      *     4. offsets
      */

    ssc.checkpoint(checkpointDir)

    // watch the directory; every new file that appears there is processed
    val lines: DStream[String] = ssc.textFileStream("D:\\IDEA_project\\aa2_17\\src\\test\\java\\data\\streamingCopyFile")
    val words: DStream[String] = lines.flatMap(line => {
      line.trim.split(" ")
    })
    val pairWords: DStream[(String, Int)] = words.map(word => {
      (word, 1)
    })
    val result: DStream[(String, Int)] = pairWords.reduceByKey((v1: Int, v2: Int) => {
      v1 + v2
    })

    result.print()

    /**
      * Changed logic (kept commented out below)
      */
//    result.foreachRDD(pairRDD => {
//      pairRDD.filter(one => {
//        println("*********** filter *********")
//        true
//      })
//      pairRDD.foreach(println)
//    })

    ssc
  }

}      
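A note on getOrCreate: on the first run the checkpoint directory does not exist, so createStreamingContext is called and the "Create new StreamingContext" line is printed; when the program is restarted, the context (configuration, DStream graph and unfinished batches) is rebuilt from data/streamingCheckpoint and the function is not called again. This is also why the "changed logic" block above is left commented out: once a checkpoint exists, edits to the DStream logic generally take effect only after the checkpoint directory is cleared.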

4. WindowOperatorDemo

package sparkstreaming.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object WindowOperatorDemo {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("Error")

    val lines = ssc.socketTextStream("192.168.202.143", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

    // window length: the range of data each computation covers
    // slide interval: how often the computation is triggered
    // both must be integer multiples of the batch interval
    // e.g. "the previous 24 hours, triggered once per hour"
//    val result = wordAndOne.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2,
//      Durations.seconds(15),
//      Durations.seconds(5))

    ssc.checkpoint("./windowoprator")
    val result = wordAndOne.reduceByKeyAndWindow((total: Int, enterValue: Int) => total + enterValue,
      (total: Int, leaveValue: Int) => total - leaveValue,
      Durations.seconds(15),
      Durations.seconds(5))


    result.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }

}      
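With a 5-second batch interval, a 15-second window and a 5-second slide, each output covers the last three batches. The incremental form used above adds the batch that just entered the window and subtracts the batch that just left it, which is why it needs a checkpoint; the commented-out form recomputes the whole window every time. For reference, a hedged equivalent written with window() instead, reusing wordAndOne from above:

// gather the last 15 seconds of (word, 1) pairs every 5 seconds
// and reduce them from scratch; no inverse function or checkpoint needed
val windowedResult: DStream[(String, Int)] =
  wordAndOne
    .window(Durations.seconds(15), Durations.seconds(5))
    .reduceByKey(_ + _)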
