天天看点

Spark基础全解析SPARK全解析

SPARK全解析

标签(空格分隔): Spark

  • SPARK全解析
    • Spark是什么?
    • Spark 源码编译
    • Spark本地模式安装配置及Spark Shell基本使用
    • Spark集群
    • Spark Application开发、运行及监控(IDEA)
    • Spark 日志监控(HistoryServer)配置
    • Spark RDD
    • Spark共享变量
    • Spark 内核
      • Spark 依赖
      • Spark Shuffle
      • Spark Application添加jar包的三种方法
    • Spark SQL
      • Spark SQL的发展历程
      • DataFrame
      • SparkSQL案例
      • Spark 集成
      • SPARK中聚合函数
      • SPARK中定义UDF、UDAF
    • Spark Streaming
      • Spark Streaming工作原理
      • DStream
      • Spark Streaming编程模型
      • Spark Streaming 读取HDFS数据
      • Spark Streaming的集成
      • Spark Streaming常用API解析
      • Spark Streaming的窗口函数
    • 喜欢我的文章请关注微信公众号DTSpider

Spark是什么?

Spark是Apache的一个顶级项目,是一个快速、通用的大规模数据处理引擎。以下是它的几个特点 :

  • Speed

    存储在内存中的数据,Spark比Hadoop的MapReduce快100多倍,存储在磁盘中的数据要快10多倍。

  • Easy of Use

    开发Spark应用程序可以使用Java、Scala、Python、R等编程语言

  • Generality

    Spark提供了SparkSQL、Streaming、MLlib、GraphX,功能强大。一站式解决需求。

  • Runs Everywhere

    Spark可以运行在Hadoop的Yarn上、Mesos上、以及它自身的standalone上,处理的文件系统包括HDFS、Cassandra、HBase、S3.

    以上部分摘自官网: http://spark.apache.org/

Spark 源码编译

本文以 spark1.6.1版本为例

  • (1)下载源码包
    Spark基础全解析SPARK全解析
    • (2)准备环境

      Spark1.6.1版本编译需要Maven 3.3.3 or newer and Java 7+ 环境

      Spark基础全解析SPARK全解析
    • (3)编译

      –1 解压spark源码

      –2 在执行编译前修改$SPARK_HOME下的make-distribution.sh文件如下

      Spark基础全解析SPARK全解析

      –3 编译apache hadoop,需要配置镜像文件

      路径:

      /opt/modules/apache-maven-3.3.3/conf/settings.xml

      配置内容:
      Spark基础全解析SPARK全解析

      如果是cdh版本hadoop,则必须去掉该镜像

      –配置域名解析服务器

      # vi /etc/resolv.conf

      内容:

      nameserver 8.8.8.8 nameserver 8.8.4.4

      –4 执行编译(根据所使用的Hadoop版本进行编译)

—-针对APACH HADOOP

./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0 -Phive -Phive-thriftserver -Pyarn

—- 针对CDH HADOOP

./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver -Pyarn

Spark基础全解析SPARK全解析

Spark本地模式安装配置及Spark Shell基本使用

1、Spark安装环境准备:

  • JAVA
  • HDFS(HDFS是否脱离了安全模式)
  • SCALA
    Spark基础全解析SPARK全解析

2、Spark安装

  • 将编译好的cdh版本的spark赋予执行权限,解压至指定目录
    Spark基础全解析SPARK全解析
  • 通过notepad++配置$SPARK_HOME目录下conf下的配置文件

    ①日志配置

    更改log4j.properties.template文件名为log4j.properties

    ②配置spark-env.sh

    Spark基础全解析SPARK全解析
  • 配置完成

3、测试Spark Shell命令行

使用Spark RDD进行简单测试:

  • 启动spark交互式命令行:bin/spark-shell 并编程测试wordcount
    • HDFS上的数据源
    Spark基础全解析SPARK全解析
    • 定义rdd读取数据源
    Spark基础全解析SPARK全解析
    • 使用rdd.map(line => line.split(“ ”)) 可以将文件按空格进行分割,分割之后会变成数组
    Spark基础全解析SPARK全解析
    • 再在其后面加上.collect之后查看输出
    Spark基础全解析SPARK全解析
    • 这里需使用flatMap代替map对该数组进行一个压平的操作,即: rdd.flatMap(line => line.split(” “)).collect,输出的为压平后的一个个单词
      Spark基础全解析SPARK全解析
    • 再使用map操作将其变为元组对,即:rdd.flatMap(line => line.split(” “)).map(word => (word,1)).collect

      输出结果:

      Spark基础全解析SPARK全解析
    • 进行到这一步,再使用reduceByKey()就可完成wordcount了,reduceByKey中的ByKey使数组中的元组对按key进行排序,reduce进行相加。

      即:

      rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => (a + b))

      Spark基础全解析SPARK全解析
    • 上一步即完成了对数据的处理,再对其赋值之后保存,即可完成wordcount

      赋值:

      val wordcountRDD = rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => (a + b))

      Spark基础全解析SPARK全解析
    • 保存:

      wordcountRDD.saveAsTextFile("/user/vin/wordcount/output")

      Spark基础全解析SPARK全解析

Spark集群

Spark Cluster 可以运行在Yarn上,由Yarn进行资源管理和任务调度,还可以运行在其自带的资源管理调度框架Standalone上,分为主节点Master(类似于yarn的resourcemanager)和从节点Work(类似于yarn的nodemanager),不同的是一台机器上可以运行多个Work。

Spark架构原理图:

Spark基础全解析SPARK全解析

说明:Job:包含了由Spark Action催生的多个Tasks,是一个并行计算

Stage:一个Job分为了多个彼此依赖的Stage,每个Stage又包含了一系列的Task,类似于MapReduce的Map阶段和reduce阶段。

- Spark集群安装部署

- 配置$SPARK_HOME目录下conf下的配置文件

1 配置spark-env.sh

参考官网:http://spark.apache.org/docs/1.6.1/spark-standalone.html#installing-spark-standalone-to-a-cluster

2 配置slaves

配置运行Work的主机名

3 启动

在sbin目录里使用

start-master.sh start-slaves.sh

//启动所有的从节点,使用此命令时,运行此命令的机器,必须要配置与其他机器的SSH无密钥登录,否则启动的时候会出现一些问题

Spark基础全解析SPARK全解析

4 Spark的的web监控端口为8080,URL为7070,Job监控4040,都是自动增长

Spark基础全解析SPARK全解析

5 测试Spark集群

spark-shell是spark的一个application,将其运行在spark standalone上,通过输入: bin/spark-shell –help 查看其运行方法

Spark基础全解析SPARK全解析

启动: bin/spark-shell –master spark://vin01:7077

Spark基础全解析SPARK全解析

Spark Application开发、运行及监控(IDEA)

  • 在IDEA中创建scala Project,并添加spark依赖包

    步骤:File -> Project Structure -> Libraries -> +号 -> java -> 选择编译好的spark目录下的lib依赖包

    Spark基础全解析SPARK全解析
  • 导入依赖包之后即可进行程序开发,新建包、在包中创建Scala class之SparkApp
  • 编程
  • 配置resurces(在新建scala Project时,创建resources)

    由于程序中需要读hdfs上的数据文件,所以需要将hadoop的配置文件hdfs-site.xml 与core-site.xml 文件拷贝到scala project的resources中

    Spark基础全解析SPARK全解析
  • 本地运行

    使用Idea工具可以直接运行在本地模式,无需插件

    运行查看输出:

    Spark基础全解析SPARK全解析
  • 打包在Spark shell上提交运行(bin/spark-submit …)

    步骤:

    1 打包

    File -> project structure -> Artifacts -> + -> jar

    Spark基础全解析SPARK全解析
    2 选择类
    Spark基础全解析SPARK全解析
    3 去除依赖包(因为集群上本身有)
    Spark基础全解析SPARK全解析
    4 上述步骤设置了打哪个包,还需要build进行打包
    Spark基础全解析SPARK全解析
    5 将jar包上传至linux下并赋予执行权限:此处为方便上传到Spark主目录下
    Spark基础全解析SPARK全解析
    6 提交任务
    Spark基础全解析SPARK全解析
    7 先测试本地
    Spark基础全解析SPARK全解析
    8 测试集群:此时需要将代码中的master注释掉再重新打包,重新打包直接用rebuild即可
    Spark基础全解析SPARK全解析

    9 启动spark Standalone

    查看8080端口是否有资源

    提交任务

    bin/spark-sumit --master spark://vin01:7077 scalaProject.jar

    Spark基础全解析SPARK全解析
    10 监控
    Spark基础全解析SPARK全解析

Spark 日志监控(HistoryServer)配置

Spark HistoryServer配置分为两个部分:

第一、设置SparkApplicaiton在运行时,需要记录日志信息

配置:配置$SPARK_HOME目录下conf下spark-defaults.conf文件

Spark基础全解析SPARK全解析

第二、启动HistoryServer,通过界面查看

配置Spark主目录下conf下spark-env.sh文件

SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://vin01/user/vin/sparkEventLogs"

配置完成启动服务
Spark基础全解析SPARK全解析
它端口号是18080
Spark基础全解析SPARK全解析

Spark RDD

  1. RDD是什么

    官方解释:

    RDD是Spark的基本抽象,是一个弹性分布式数据集,代表着不可变的,分区(partition)的集合,能够进行并行计算。也即是说:

    • 它是一系列的分片、比如说128M一片,类似于Hadoop的split;
    • 在每个分片上都有一个函数去执行/迭代/计算它
    • 它也是一系列的依赖,比如RDD1转换为RDD2,RDD2转换为RDD3,那么RDD2依赖于RDD1,RDD3依赖于RDD2。
    • 对于一个Key-Value形式的RDD,可以指定一个partitioner,告诉它如何分片,常用的有hash、range
    • 可选择指定分区最佳计算位置
  2. 创建RDD的两种方式
    • 方式一:

      将集合进行并行化操作

      List\Seq\Array

      演示:

      Spark基础全解析SPARK全解析
    • 方式二:

      外部存储系统

      HDFS, HBase, or any data source offering a Hadoop InputFormat.

      Spark基础全解析SPARK全解析
  3. RDD的三大Operations
    • Transformation

      从原有的一个RDD进行操作创建一个新的RDD,通常是一个lazy过程,例如map(func) 、filter(func),直到有Action算子执行的时候

    • Action

      返回给驱动program一个值,或者将计算出来的结果集导出到存储系统中,例如count() reduce(func)

    • Persist

      将数据存储在内存中,或者存储在硬盘中

      例如: cache() persist() unpersist()

      合理使用persist()和cache()持久化操作能大大提高spark性能,但是其调用是有原则的,必须在transformation或者textFile后面直接调用persist()或cache(),如果先创建的RDD,然后再起一行调用这两个方法,则会报错

  4. RDD的常用Transformation

    – map(func) :返回一个新的分布式数据集,由每个原元素经过func函数转换后组成

    spark shell本地测试:

val numbers = Array(, , , , )
    val numberRDD = sc.parallelize(numbers, )  
    val multipleNumberRDD = numberRDD.map ( num => num *  )
    multipleNumberRDD.foreach ( num => println(num) ) 
           
Spark基础全解析SPARK全解析

– filter(func) : 返回一个新的数据集,由经过func函数后返回值为true的原元素组成

val numbers = Array(, , , , , , , , , )
    val numberRDD = sc.parallelize(numbers, )
    val evenNumberRDD = numberRDD.filter { num => num %  ==  }
    evenNumberRDD.foreach { num => println(num) }  
           
Spark基础全解析SPARK全解析

– flatMap(func) : 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

val lineArray = Array("hello you", "hello me", "hello world")  
    val lines = sc.parallelize(lineArray, )
    val words = lines.flatMap { line => line.split(" ") }   
    words.foreach { word => println(word) }
           
Spark基础全解析SPARK全解析

– union(otherDataset) : 返回一个新的数据集,由原数据集和参数联合而成

– groupByKey([numTasks]) :在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task

val scoreList = Array(Tuple2("class1", ), Tuple2("class2", ),Tuple2("class1", ), Tuple2("class2", ))
    val scores = sc.parallelize(scoreList, )  
    val groupedScores = scores.groupByKey() 
     groupedScores.foreach(score => { 
      println(score._1); 
      score._2.foreach { singleScore => println(singleScore) };
      println("=============================")  })
           
Spark基础全解析SPARK全解析

– reduceByKey(func, [numTasks]) : 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。在实际开发中,能使reduceByKey实现的就不用groupByKey

val scoreList = Array(Tuple2("class1", ), Tuple2("class2", ),Tuple2("class1", ), Tuple2("class2", ))
    val scores = sc.parallelize(scoreList, )  
    val totalScores = scores.reduceByKey(_ + _)  
    totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2))  
           
Spark基础全解析SPARK全解析

– join(otherDataset, [numTasks]) :在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集

val studentList = Array(
        Tuple2(, "leo"),
        Tuple2(, "jack"),
        Tuple2(, "tom"));
   val scoreList = Array(
        Tuple2(, ),
        Tuple2(, ),
        Tuple2(, ));
    val students = sc.parallelize(studentList);
    val scores = sc.parallelize(scoreList);
    val studentScores = students.join(scores)  
    studentScores.foreach(studentScore => { 
      println("student id: " + studentScore._1);
      println("student name: " + studentScore._2._1)
      println("student socre: " + studentScore._2._2)  
      println("=======================================")  
    })  
           
Spark基础全解析SPARK全解析

– groupWith(otherDataset, [numTasks]) : 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup

– cartesian(otherDataset) : 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。

– repartition():重新分区,当数据处理到最后剩下很少的数据集时,可以使用repartition()进行重新分区

  1. 常用Action

    – reduce(func) : 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行

val numberArray = Array(, , , , , , , , , )
    val numbers = sc.parallelize(numberArray, )  
    val sum = numbers.reduce(_ + _)  
    println(sum) 
           

–collect() : 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用。

val numberArray = Array(, , , , , , , , , )
    val numbers = sc.parallelize(numberArray, )  
    val doubleNumbers = numbers.map { num => num *  }  
    val doubleNumberArray = doubleNumbers.collect()
    for(num <- doubleNumberArray) {
      println(num)  
    }
           

–count() : 返回数据集的元素个数

val numberArray = Array(, , , , , , , , , )
    val numbers = sc.parallelize(numberArray, )  
    val count = numbers.count()
    println(count)  
           

–take(n) : 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)

val numberArray = Array(, , , , , , , , , )
    val numbers = sc.parallelize(numberArray, )  
    val top3Numbers = numbers.take()
    for(num <- top3Numbers) {
      println(num)  
    }
           

–first() : 返回数据集的第一个元素(类似于take(1))

–saveAsTextFile(path) : 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本

–saveAsSequenceFile(path) : 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)

–foreach(func) : 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

Spark共享变量

  • 共享变量概念

共享变量是Spark一个非常重要的特性,在默认情况下,如果一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中,此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,这种方式是做不到的。

Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),一种是Accumulator(累加变量)。广播变量会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存的消耗。累加变量则可以然多个task共同操作一份变量,主要可以进行累加。

  • 共享变量示例

    Spark提供的Broadcast Variable是只读的,可以通过调用SparkContext的broadcast()方法来针对某个变量创建广播变量。每个节点可以使用广播变量的value()方法来获取值。

val factor = 
   val factorBroadcast = sc.broadcast(factor)

   val arr =Array(,,,,)
   val rdd = sc.parallelize(arr)
   val mutiRdd = rdd.map(num => num*factorBroadcast.value())
   mutiRdd.foreach(num => println(num))
           

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享的操作,task只能对Accumulator进行累加操作,不能读取它的值,只有Driver程序可以读取。

val sumAccumulator = sc.accumulator()

    val arr = Array(,,,,)
    val rdd = sc.parallelize(arr)
    rdd.foreach(num => sumAccumulator +=num)
    println(sumAccumulator.value)
           

Spark 内核

Spark 依赖

Spark依赖分为宽依赖和窄依赖:

  • 窄依赖
  • 子RDD的每个分区依赖于常数个(即与数据规模无关)
  • 输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatMap
  • 输入输出一对一,但结果RDD的分区结构发生了变化,如 union、coalesce
  • 从输入中选择部分元素的算子,如filter、distinct、subtract、sample
  • 宽依赖
  • 子RDD的每个分区依赖于所有父RDD分区
  • 对单个RDD基于key进行重组和reduce,如 groupByKey、reduceByKey‘
  • 对两个RDD基于key进行join和重组,如join

    区分宽依赖和窄依赖是根据父RDD的每个分区数据给多少个子RDD的每个分区数据:

    1 -> 1 :窄依赖

    1 -> N :宽依赖,有宽依赖的地方必有shuffle

    Spark基础全解析SPARK全解析

Spark Shuffle

  • Shuffle过程的解析

    在Spark RDD的计算处理的过程中,每个宽依赖都伴随着Shuffle。

    首先看Shuffle过程

    Spark基础全解析SPARK全解析

    依图所示:

    假设有一个节点,上面运行了3个shuffleMapTask,每个shuffleMapTask,都会为每个ReduceTask创建一份bucket缓存以及对应的ShuffleBlockFile磁盘文件,shuffleMapTask的输出,会作为MapStatus,发送到DAGScheduler的MapOutputTrackerMaster中,每个MapStatus包含了每个ReduceTask要拉取的数据的大小。

    假设有另外一个节点,上面也运行了4个ReduceTask,现在等着去获取ShuffleMapTask的输出数据,来完成程序定义的算子,而ReduceTask会用BlockStoreShuffleFetcher去MapOutputTrackerMaster获取自己要拉取的文件的信息,然后底层通过BlockManager将数据拉取过来。每个ReduceTask拉取过来的数据,其实就会组成一个内部的RDD,叫shuffleRDD,优先放入内存,其次如果内存不够,那么写入磁盘,最后每个ReduceTask针对数据进行聚合,最后生成MapPartitionRDD,就是执行reduceByKey等操作希望获得的RDD。

Spark Application添加jar包的三种方法

  • –jars

    在bin/spark-submit 后面直接以–jars方式将jar包添加,须写绝对路径

    示例:

    ${SPARK_HOME}/bin/spark-submit --master /opt/jars/sparkexternale/xx1.jar, /opt/jars/sparkexternale/xx2.jar

  • –driver-class-path

    示例:

bin/spark-shell \
--master local[] \
--driver-class-path /opt/modules/spark--bin--cdh5/exlibs/mysql-connector-java--bin.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-client--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-common--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-protocol--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-server--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/htrace-core-.jar:/opt/modules/spark--bin--cdh5/exlibs/protobuf-java-.jar:/opt/modules/spark--bin--cdh5/exlibs/guava-.jar:/opt/modules/spark--bin--cdh5/exlibs/hive-hbase-handler--cdh5.jar
           
  • SPARK_CLASSPATH

    配置此环境变量:通常在企业中,提交Application使用脚本的方式,比如spark-app-submit.sh脚本,通常一个App设置一个脚本,即设置一个SPARK CLASSPATH

    //spark-app-submit.sh:

#!/bin/sh
## SPARK_HOME
SPARK_HOME=/opt/cdh5./spark-.-bin-.-cdh5.
## SPARK CLASSPATH
SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar

${SPARK_HOME}/bin/spark-submit --master spark://vin01:7077 --deploy-mode cluster /opt/tools/scalaProject.jar
           

Spark SQL

Spark SQL的发展历程

Spark 作为一个优秀的大数据计算框架,必然少不了支持SQL的框架,基于Hive的性能以及它与Spark的兼容,Shark项目由此诞生,Shark即Hive on Spark,它通过HQL解析,将HQL翻译成Spark上的RDD操作,然后通过Hive的metadata获取数据库里的表信息,实际的HDFS上的数据和文件会有Spark获取并放到Spark上计算。

在Hive中,处理SQl的过程如下:

SQL –> 语法解析–>逻辑计划(优化)–>物理计划–>MapReduce

而Shark的诞生,是将某个Hive版本源码拿过来进行修改“物理计划”的部分,将其转化为Spark而不是MapReduce,这有很大的弊端,比如依赖于Hive。

Shark的SQL处理过程如下:

SQL –> 语法解析–>逻辑计划(优化)–>物理计划–>Spark

在1.0之后SparkSQL诞生,它涵盖了Shark的所有特性,SparkSQL不再使用Hive的解析引擎,即不再与Hive共用语法解析和逻辑计划,它有了自己的解析引擎Catalyst。

Spark基础全解析SPARK全解析
spark SQL的三大愿景就是:① Write less code ② Read less data ③ Let the optimizer do the hard work

DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二位表格,它与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二位数据集的每一列都带有名称和类型。使得SparkSQL得以洞察更多的结构信息,从而对其背后的额数据源以及作为用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标,反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单通用的流水线优化

Spark基础全解析SPARK全解析
  • DataFrame VS RDD 及DataFrame的创建

    在Spark安装目录下有示例数据,将其上传到HDFS上。

    Spark基础全解析SPARK全解析
    创建RDD,首选创建一个类来封装rdd
    Spark基础全解析SPARK全解析
    再创建rdd读取该数据文件,并处理之后传递给People类,这样就得到了一个存储类的rdd
    Spark基础全解析SPARK全解析
    得到的结果是:
    Spark基础全解析SPARK全解析

    从结果可以看出,创建出来的RDD只知道它存储的是一个类,具体类的参数名称的信息都不清楚。

    而如果创建DataFrame呢

    使用json格式数据创建DataFrame,(DataFrame中的read方法可以直接读取json格式的数据

    Spark基础全解析SPARK全解析
    创建DataFrame的入口是sqlContext或者HiveContext
    Spark基础全解析SPARK全解析
    得到的结果是:
    Spark基础全解析SPARK全解析

    从创建的DataFrame可以得到数据的字段名,数据类型等。

    DataFrame的创建

    1. 通过内置方法读取外部数据源,数据源可以是以下格式:

      json hive jdbc parquet/orc text

    2. 通过scala的CASE CLASS转换

      首先创建一个case class,名为Employee,并且定义id和name两个参数

      case class Employee(id: Int, name: String)

      我们可以通过很多方式来初始化Employee类,比如从关系型数据库中获取数据以此来定义Employee类。但是在本文为了简单起见,我将直接定义一个Employee类的List,如下:

      val listOfEmployees = List(Employee(1, "iteblog"), Employee(2, "Jason"), Employee(3, "Abhi"))

      我们将listOfEmployees列表传递给SQLContext类的createDataFrame 函数,这样我们就可以创建出DataFrame了!然后我们可以调用DataFrame的printuSchema函数,打印出该DataFrame的模式,我们可以看出这个DataFrame主要有两列:name和id,这正是我们定义Employee的两个参数,并且类型都一致。
val empFrame = sqlContext.createDataFrame(listOfEmployees)  
empFrame.printSchema
|-- id: integer (nullable = false)  
|-- name: string (nullable = true)  
           

然后可以使用Spark支持的SQL功能来查询相关的数据。在使用这个功能之前,我们必须先对DataFrame注册成一张临时表,我们可以使用registerTempTable函数实现,如下:

empFrame.registerTempTable("employeeTable")

注册为临时表就可以使用SQL语句来进行查询等操作了

val sortedByNameEmployees = sqlContext.sql("select * from employeeTable order by name desc")  
sortedByNameEmployees.show()  
+-----+-------+  
|empId|   name|  
+-----+-------+  
|    |iteblog|  
|    |  Jason|  
|    |   Abhi|  
+-----+-------+  
           

转换原理:createDataFrame函数可以接收一切继承scala.Product类的集合对象

查看Spark的DataFrame源码中createDataFrame函数

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame

而case class类就是继承了Product。我们所熟悉的TupleN类型也是继承了scala.Product类的,所以我们也可以通过TupleN来创建DataFrame

- 通过RDD进行转换

方式一:

The first method uses reflection to infer the schema of an RDD that contains specific types of objects.

这一种方式是通过自动推断,将RDD反射为DataFrame,但是这个Rdd必须是case class的类型。而且必须使用import sqlContext.implicits._来引包。

演示:

定义一个case class 类People

case class People(name: String, age: Int)

创建一个People类的RDD

val rdd = sc.textFile("/user/vin/people.txt").map(line => line.split(",")).map(x => People(x(0), x(1).trim.toInt))

Spark基础全解析SPARK全解析
转换:

val people_df = rdd.toDF()

Spark基础全解析SPARK全解析

在IDEA编程中需要使用import sqlContext.implicits._,否则jar包出错。

方式二:

The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.

a DataFrame can be created programmatically with three steps:

第一步:将RDD(可以是任何类型)转换为RDD[Row]

需要引包:

import org.apache.spark.sql._

演示:

val rdd = sc.textFile("/user/vin/people.txt") import org.apache.spark.sql._ val rowRdd = rdd.map(line => line.split(", ")).map(x => Row(x(0), x(1).toInt))

Spark基础全解析SPARK全解析

第二步:

创建schema

引包:

import org.apache.spark.sql.types._

val schema = StructType(StructField("name",StringType,true) :: StructField("age",IntegerType,true) :: Nil)

Spark基础全解析SPARK全解析
第三步:Apply the schema to the RDD of Rows

val people_df = sqlContext.createDataFrame(rowRdd, schema)

Spark基础全解析SPARK全解析
这种方式类似scala 的case class类创建DataFrame

SparkSQL案例

需求:将Hive中的emp表与mysql中的dept表进行连接查询

一、启动spark-shell

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/modules/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1 bin/spark-shell local[2]

二、引包并建立JDBC连接

val url = "jdbc:mysql://vin01:3306/test?user=root&password=123456" import java.util.Properties val props = new Properties()

Spark基础全解析SPARK全解析

三、创建DataFrame

Spark基础全解析SPARK全解析

四、jion

val join_df = hive_emp_df.join(mysql_dept_df, "deptno")

Spark基础全解析SPARK全解析

五、 将jion出来的值注册为临时表,方便查询

join_df.registerTempTable("join_emp_dept")

查询:

sqlContext.sql("select empno, ename, deptno, deptname, sal from join_emp_dept order by empno").show

Spark基础全解析SPARK全解析

Spark 集成

一、Spark与Hive的集成

Spark SQL通过sqlContext读取Hive中的数据,由于Spark需要读取Hive表中的元数据,所以需要将Hive conf下的hive-site.xml文件传递到Spark conf下,做软链接:

Spark基础全解析SPARK全解析

还需要指定mysql连接jar包:

Spark基础全解析SPARK全解析

测试语句:

Spark基础全解析SPARK全解析

Spark还提供直接使用SQL的命令行:

Spark基础全解析SPARK全解析
Spark基础全解析SPARK全解析

二、Spark与Hbase集成

读取hbase上的数据

在pom文件中添加maven依赖

版本:

<hbase.version>0.98.6-hadoop2</hbase.version>

依赖:

<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
           

因为需要读取hbase的配置文件,所以需要将hbase-site.xml文件拷贝到resource中

Spark基础全解析SPARK全解析

我们知道hbase的数据存储在hdfs上,spark读取hbase的数据与mapreduce读取hbase的数据方法是一样的,首先我们在spark core中sparkContext.scala类中找到两个读取hadoop文件的api

Spark基础全解析SPARK全解析
Spark基础全解析SPARK全解析

上面两个类就是读取hadoop文件的函数,这里使用新API进行解析

源码解析:

在newAPIHadoopRDD中,有四个参数conf 、fClass 、kClass 和 vClass,还有一个返回值RDD,

其中conf为设置配置文件,fClass是读取HDFS文件所使用的格式,在这里是TableInputFormat,vClass和KClass的具体含义与mapreduce和hbase的集成中一样。

完整代码:

package org.bigdata.spark.app.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkContext , SparkConf}
/**
 * Created by hp-pc on 2016/8/10.
 */
object SparkReadHbase {
  def main(args: Array[String]) {
    //step 0: SparkContext
    val sparkConf =new SparkConf()
      .setAppName("SparkReadHbase  Application")
      .setMaster("local[2]")
    //create SparkContext
    val sc = new SparkContext(sparkConf)
    /**
     *def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
      conf: Configuration = hadoopConfiguration,
      fClass: Class[F],* @param fClass Class of the InputFormat
      kClass: Class[K],* @param kClass Class of the keys
      vClass: Class[V]): RDD[(K, V)] = withScope {
    assertNotStopped()        * @param vClass Class of the values
     */
//创建一个rdd读取hbase ,读取hbase需要定义hbase配置,结合hbase与mapreduce集成时hbase配置定义方法,再设置读取哪张表
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE,"user")
    val hbaseRdd = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]

    )
    println(hbaseRdd.count()+"=============================================")
    sc.stop()
  }
}
           

这里只是统计hbase有多少行,如果需要输出hbase具体数据,就需要使用具体api,代码如下:

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkContext, SparkConf}

object SparkReadHBase {

  def main(args: Array[String]) {
    // step : SparkContext
    val sparkConf = new SparkConf()
      .setAppName("SparkReadHBase Applicaiton") // name
      .setMaster("local[2]") // --master local[] | spark://xx: | yarn

    // Create SparkContext
    val sc = new SparkContext(sparkConf)

    /**
     *
    conf: Configuration  hadoopConfiguration,
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V]

      RDD[(K, V)]
     */
    val conf = HBaseConfiguration.create()
    //   /** Job parameter that specifies the input table. */
    // val INPUT_TABLE: String = "hbase.mapreduce.inputtable"
    conf.set(TableInputFormat.INPUT_TABLE, "stu")

    // RDD[(ImmutableBytesWritable, Result)]
    val hbaseRdd = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
    // 上面必须填写,具体含义,与我们讲解的MapReduc与HBase集成是一样的

    println(hbaseRdd.count() + "============================")

    hbaseRdd.map(tuple => {
      val rowkey = Bytes.toString(tuple._1.get())
      val result = tuple._2

      var rowStr = rowkey + ", "

      for(cell <- result.rawCells()){
        rowStr += Bytes.toString(CellUtil.cloneFamily(cell)) + ":" +
          Bytes.toString(CellUtil.cloneQualifier(cell)) + "->" +
          Bytes.toString(CellUtil.cloneValue(cell)) + "----"
      }
      // return
      rowStr
    }).foreach(println)

    sc.stop()

  }

}
           

测试:

在命令行测试,需要导入包,前两种方法都不行,使用SPARK_CLASSPATH,代码如下:

export SPARK_CLASSPATH=/opt/modules/spark--bin--cdh5/exlibs/mysql-connector-java--bin.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-client--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-common--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-protocol--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/hbase-server--cdh5.jar:/opt/modules/spark--bin--cdh5/exlibs/htrace-core-.jar:/opt/modules/spark--bin--cdh5/exlibs/protobuf-java-.jar:/opt/modules/spark--bin--cdh5/exlibs/guava-.jar:/opt/modules/spark--bin--cdh5/exlibs/hive-hbase-handler--cdh5.jar
           

在命令行中粘贴代码:

使用paste

Spark基础全解析SPARK全解析

将读取hbase的代码以粘贴模式输入:

Spark基础全解析SPARK全解析

输出:

Spark基础全解析SPARK全解析

打印hbase信息的代码:

Spark基础全解析SPARK全解析

输出:

Spark基础全解析SPARK全解析

SPARK中聚合函数

在DataFrame类中,有如下五组函数

* @groupname basic

Basic DataFrame functions

* @groupname dfops

Language Integrated Queries

* @groupname rdd

RDD Operations

* @groupname output

Output Operations

* @groupname action

Actions

agg()支持SQL 语言and DSL(Domain)语言,有如下需求:在hbase中存储有emp表,使用agg函数进行统计:

首先创建dataframe:

val emp_df = sqlContext.sql(“select * from tmp”)

或者:

val emp_df = sqlContext.read.table(“emp”)

统计工资的平均值和comm的最大值的几种写法

(1)

emp_df.agg("sal" -> "avg", "comm" -> "max").show

(2)

emp_df.agg(Map("sal" -> "avg", "comm" -> "max")).show

(3)

emp_df.agg(max($"comm"), avg($"sal")).show //DSL写法

按照分组进行统计,即统计各个部门的comm最大值和sal平均值

emp_df.groupBy($"deptno").agg(max($"comm"), avg($"sal")).show

SPARK中定义UDF、UDAF

UDF:在实际开发中,通常在创建sqlContext后,注册UDF、 UDAF,语法:

sqlContext.udf.register( “”, //函数名称 .... //函数体 )

需求:针对hbase中的emp表,如果comm(奖金)是null的话,返回0.0,编写udf:

sqlContext.udf.register(
      "trans_comm", 
      (comm: Double) => {
        if(comm == null){
          
        }else{
          comm
        }
      }
) 
           

注册完成后可以在sql中使用该udf

UDAF:定义UDAF:UDAF是用户自定义聚合函数,它的特点是多对一,即输入多个值,输出一个值。定义UDAF需要继承UserDefinedAggregateFunction 这个基类,然后重写它的抽象方法。

解析:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{StructType, DataType}

/**
 * Created by hp-pc on 2016/8/17.
 */
object AvgUDAF  extends UserDefinedAggregateFunction{
  override def inputSchema: StructType = ???

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???

  override def bufferSchema: StructType = ???

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???

  override def initialize(buffer: MutableAggregationBuffer): Unit = ???

  override def deterministic: Boolean = ???

  override def evaluate(buffer: Row): Any = ???

  override def dataType: DataType = ???
           

** 其中inputSchema是传递给聚合函数的参数的类型,在sparkSql中都是DataFrame,而DataFrame都是StructType,在这里还要封装,使用StructField,具体实现(以工资sal字段为例、Double类型):

override def inputSchema: StructType = StructType(
  StructField("sal", DoubleType, true) :: Nil
)
           

true代表是否为空,Nil表示空List, :: 符号表示将前面的数组和Nil合并为一个List 。因为StructType里存放的是数组,所以这里把它转换为List,这一段代码就是指定输入字段的类型。

**其中的dataType是指定输出数据的类型,以工资为例的话就是DoubleType类型,具体代码如下:

override def dataType: DataType = DoubleType

**其中的bufferSchema就是依据需求定义的缓冲字段的类型和名称,以求平均工资为例,缓冲的是工资总和和个数两个字段,这里工资总和是Double类型,个数是Int类型,具体代码如下

/**
 * 依据需求定义缓冲数据字段的名称和类型
 */
override def bufferSchema: StructType = StructType(
  StructField("sal_total", DoubleType, true) ::
    StructField("sal_count", IntegerType, true) :: Nil
    )
           

**其中的evaluate函数就是将结果计算并返回,它的Any最后就是DataType ,代码实现如下:

override def evaluate(buffer: Row): Any = {
  val salTotal = buffer.getDouble()
  val salCount = buffer.getInt()

  // return
  salTotal / salCount
}
           

**其中的deterministic是确定唯一性,将其值设为true即可

override def deterministic: Boolean = true

**其中的initialize是初始化定义的字段,这里将Double初始化值为0.0,Int值初始化为0,分别为第一个数据和第二个数据,代码实现如下:

/**
 * 对缓冲数据的字段值进行初始化
 * @param buffer
 */
override def initialize(buffer: MutableAggregationBuffer): Unit = {
  buffer.update(, )
  buffer.update(, )
}
           

**其中的update函数是更新缓存数据的值,从这个函数的参数可以看出它是从bufferSchema函数中获取值,使用buffer.getDouble(0)来获取bufferSchema中的List中第一个值,及sal_total的值,使用buffer.getInt(1)来获取List中的第二个值,及sal_count的值,代码实现如下:

// 获取缓冲数据 val salTotal1 = buffer1.getDouble(0) val salCount1 = buffer1.getInt(1)

上面只是获取缓冲数据,还需要获取新传递的数据,传递的数据葱花inputSchema函数中获取。这些数据需要更新到缓冲中,代码实现如下:

// 获取传递进来的数据 val inputSal = input.getDouble(0)

接收到数据之后,将会更新到缓冲数据,最后计算的其实是更新完的缓冲的数据,在这个例子中的更新方法是将sal_total进行累加,将sal_count进行加一计数。代码实现如下:

// 更新缓冲数据
  buffer.update(, salTotal + inputSal)
  buffer.update(, salCount + )
}
           

**其中merge函数是在合并分区的时候用到的,我们读取的数据实在HDFS文件系统上,必然会被分为很多块,每个块都有自己的缓冲,有自己的task,当将这些缓冲合并在一起返回最终结果时就会用到merge,合并之后的缓冲会存储在buffer1中。代码实现如下:

/**
 * 从字面看,是合并
 * Merges two aggregation buffers
 *  and stores the updated buffer values back to `buffer1`.
 *  *  This is called when we merge two partially aggregated data together.
 *  * @param buffer1
 * @param buffer2
 */
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  // 获取缓冲数据
  val salTotal1 = buffer1.getDouble()
  val salCount1 = buffer1.getInt()

  // 获取缓冲数据
  val salTotal2 = buffer2.getDouble()
  val salCount2 = buffer2.getInt()

  // 更新缓冲数据
  buffer1.update(, salTotal1 + salTotal2)
  buffer1.update(, salCount1 + salCount2)
}
           

将上述代码合并之后即是一个求平均工资的UDAF,同样,在使用的时候也要对其进行注册。

注册方法:将上述代码在spark中使用paste模式进行粘贴执行,再执行下面方法进行注册,即可使用。

sqlContext.udf.register(
      "avg_sal",
      AvgSalUDAF
    )
           

Spark Streaming

Streaming,是一种数据传送技术,它把客户机收到的数据变成一个稳定连续的流,源源不断的送出,使用户听到的声音或者看到的图像十分平稳,而且用户在整个文件传送完之前就可以开始在屏幕上浏览文件。

三种流处理技术:

  • Apache Storm
  • Spark Streaming
  • Apache Samza

    上述三种实时计算系统都是开源的分布式系统,具有低延时、可扩展和容错性诸多优点,他们的共同特色在于:允许在运行数据流代码时,将任务分配到一系列具有容错能力的计算机上并行运行。

Spark Streaming工作原理

Spark Streaming是一个可扩展,高吞吐、具有容错率的流式计算框架,它从数据源(soket、flume 、kafka)得到数据,并将流式数据分成很多RDD,根据时间间隔以批次(batch)为单位进行处理,能实现实时统计,累加,和一段时间内的指标的统计。

当运行Spark Streaming 框架时,Application会执行StreamingContext,并且在底层运行的是SparkContext,然后Driver在每个Executor上一直运行一个Receiver来接受数据
Spark基础全解析SPARK全解析
Receiver通过input stream接收数据并将数据分成块(blocks),之后存储在Executor的内存中,blocks会在其他的Executor上进行备份
Spark基础全解析SPARK全解析
Executor将存储的blocks回馈给StreamingContext,当经过一定时间后,StreamingContext将在这一段时间内的blocks,也称为批次(batch)当作RDD来进行处理,并通过SparkContext运行Spark jobs,Spark jobs通过运行tasks在每个Executor上处理存储在内存中的blocks
Spark基础全解析SPARK全解析
这个循环每隔一个批次执行一次
Spark基础全解析SPARK全解析

DStream

DStream(Discretized Stream)是Spark Streaming的一个基本抽象,它表示一个连续的数据流,可以是从数据源接受的输入数据流,也可以是通过转换输入数据流而生成的新的待处理的数据流,实际上,DStream代表的是一系列连续的RDDs,每个DStream的RDD都包含了一个批次的数据,对DStream的操作就是对它的一系列RDD进行操作,它有两种方式创建,一是接收数据源的流数据创建,二是通过转换,每一个时间间隔会生成一个RDD。

Spark基础全解析SPARK全解析

Spark Streaming编程模型

首先导入包:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 
           

创建StreamingContext:(这里local2表示启动了两个线程,必须两个以上

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds())  //设置批次时间,测试可以用5秒
//在Spark-shell中可以通过传递sc方式创建StreamingContext:
val ssc = new StreamingContext(sc, Seconds())
           
  • 第一步:接收数据

    ssc创建好之后就可以读取数据源了,根据StreamingContext源码可以看到读取数据源的方法:

    Spark基础全解析SPARK全解析

    这里举例使用套接字作为数据源,即使用socketTextStream方法

    查看其注释:

/**
 - Create a input stream from TCP source hostname:port. Data is received using
 - a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited lines.
 - @param hostname      Hostname to connect to for receiving data
 - @param port          Port to connect to for receiving data
 - @param storageLevel  Storage level to use for storing the received objects
 -                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
           

从上面的源码中可以看出该方法有三个参数,最后一个有默认值,那么以最简单的方式创建一个Dstream:

val socketDStream= scc.socketTextStream(“vin01”,9999)

这里的socketDStream是一行一行的数据

- 第二步:基于DStream进行处理数据

// 将行数据分隔成单词

val words = socketDStream.flatMap(_.split(" "))

// 计算这一批次的词频,先将单词转换成元组,再reduce

val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _)

第三步:输出

wordCounts.print()

最后启动该应用即可:

ssc.start() // Start the computation

ssc.awaitTermination() // Wait for the computation to terminate

运行测试:

Spark基础全解析SPARK全解析

输出:

Spark基础全解析SPARK全解析

Spark Streaming 读取HDFS数据

读取HDFS上的数据,需要用到Spark Streaming类中的一个方法:

即:

/**
 * Create a input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them as text files (using key as LongWritable, value
 * as Text and input format as TextInputFormat). Files must be written to the
 * monitored directory by "moving" them from another location within the same
 * file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 */
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
           

同样,使用wordcount模板,更改的是创建DStream的方式

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds())

val socketDStream = ssc.textFileStream("/user/vin/sparkstreaming/hdfsfiles")

val words = socketDStream.flatMap(_.split(" "))
val pairs = words.map(word => (word, ))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()             
ssc.awaitTermination()  
           

将数据文件上传到HDFS上:

Spark基础全解析SPARK全解析

执行结果:

Spark基础全解析SPARK全解析

*通常,在开发测试中,通常把代码写入脚本中,然后再spark-shell中执行该脚本:

步骤:创建.scala文件,将代码拷贝至该文件中,在spark-shell中使用 :load +绝对路径执行*

Spark基础全解析SPARK全解析

Spark Streaming的集成

  • 与Flume进行集成

    spark与提供了与Flume集成时的通用类:flumeUtils,但是必须依赖flume的某些jar包,所以在开发时,maven工程中要添加依赖:

    (参考官网),如果在命令行测试,则需要添加classpath,这里在spark主目录下创建exlibs来存放jar包。

    flumeUtils.scala源码

    Spark基础全解析SPARK全解析
    启动spark-shell时,多个包之间用逗号隔开。
bin/spark-shell \
--master local[3] \
--jars exlibs/mysql-connector-java-5.1.27-bin.jar,\
exlibs/spark-streaming-flume_2.10-1.6.1.jar,\
exlibs/flume-avro-source-1.5.0-cdh5.3.6.jar,\
exlibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar
           

集成测试:在启动spark-shell前,首先配置flume,设置接收源,这里测试基于push的测试,

配置flume:创建flume-push-spark.properties文件,$FLUME_HOME/conf下,

通过notepad++进行配置

主要是配置sinks

Spark基础全解析SPARK全解析

注:spark-push.txt文件已经存在

flume配置完成后,编写与flume集成的第一个测试程序wordcount:flume.scala(存储在spark主目录下)

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._

val ssc = new StreamingContext(sc, Seconds())

// Step :Recevier Data From Where
// Flume: FlumeUtils, Kafka: KafkaUtils
val flumeDStream = FlumeUtils.createStream(ssc, "vin01", ).map(event => new String(event.event.getBody.array()))

// Step : Process Data Base DStream
// DStream[Long] 
val words = flumeDStream.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, ))
val wordCounts = pairs.reduceByKey(_ + _)

// Step : Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

sc.stop
           

启动Spark shell:

Spark基础全解析SPARK全解析

spark-shell application应用提交:

:load /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/flume.scala

启动flume来监控spark-shell.txt文件:

启动命令:(添加上将结果显示在控制台上的参数console)

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/flume-push-spark.properties -Dflume.root.logger=DEBUG,console
           

在flume启动前需要先在spark-shell中启动Flume的wordcount程序,否则运行报错:(先有接收的才能推给它)

Spark基础全解析SPARK全解析

测试:使用echo命令往spark-push.txt 里面追加数据

Spark基础全解析SPARK全解析

测试结果:

Spark基础全解析SPARK全解析
  • Spark Streaming与kafka的集成(基于直接取direct)

    SparkStreaming与kafka集成,基于Direct方式,没有Recevier,数据存储在kafka中,kafka中有Topic,Topic中有分区,所以当spark job运行时,会调用kafka消费者api,找到某个topic,从zookeeper上获取偏移量offset,从而创建rdd,然后再进行rdd数据处理,处理完后更新zookeeper上的偏移量,下一个job运行时同样的流程,只不过传递的参数不一样

    参考源码,创建最简单的DStream:

    Spark基础全解析SPARK全解析

    集成使用:

    若使用idea编程,需在pom文件中添加依赖包

    (参考官网)

    编程:(此程序使用了不仅进行wordcount,还对其进行了累加)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}

    val ssc = new StreamingContext(sc, Seconds(5))

    // set checkpoint
    ssc.checkpoint("sparkstreaming/kafka/")

    // Step : Recevier Data From Where
    val kafkaParams = Map("metadata.broker.list" -> "vin01:9092")
    val topics = Set("sparkPullTopic")

    // InputDStream[(K, V)]
    val socketDStream = KafkaUtils.createDirectStream[
      String, String,StringDecoder, StringDecoder](
      ssc,
      kafkaParams, // Map[String, String]
      topics  // Set[String]
    ).map(_._2)

    // Step : Process Data Base DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, ))
    /**
     * updateFunc: (Seq[V], Option[S]) => Option[S]
     *
     * DStream[(Key, Value)]
     * Seq[V]
     *    V: 代表的是 DStream中Value的类型,针对WordCount程序来说,V是Int
     * Option[S]
     *    Option\Some\None
     *    S: 代表的是状态State,存储的是以前分析的结果,针对WordCount程序来说,S是Count,Int
     *      S可以是任意类型,依据实际需求而定
     *
     */
    val wordCounts = pairs.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => {
        //获取当前的要计算的值
        val currentCount = values.sum
        // 获取以前状态中的值
        val previousCount = state.getOrElse()

        // update state and return
        Some(currentCount + previousCount)
      }
    )

    // Step : Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate

    sc.stop
           

kafka配置:

创建topic:

查看当前有哪些topic:

bin/kafka-topics.sh --list --zookeeper vin01:2181

创建sparkPull topic:(一个副本、一个分区)

bin/kafka-topics.sh --create --zookeeper vin01:2181 --replication-factor 1 --partitions 1 --topic sparkPull

Spark基础全解析SPARK全解析

打开producer:

bin/kafka-console-producer.sh --broker-list vin01:9092 --topic sparkPull

Spark基础全解析SPARK全解析

打开consumer以便于监控:

bin/kafka-console-consumer.sh --zookeeper vin01:2181 --topic sparkPull --from-beginning

Spark基础全解析SPARK全解析

启动spark-shell:

kafka依赖于几个包,在启动时必须指定(参考spark添加jar包的三种方式):

bin/spark-shell  \
--master local[3]  \
--jars exlibs/mysql-connector-java-5.1.27-bin.jar, \
exlibs/spark-streaming-kafka_2.10-1.6.1.jar, \
exlibs/kafka_2.10-0.8.2.1.jar, \
exlibs/kafka-clients-0.8.2.1.jar, \ 
exlibs/zkclient-0.3.jar, \
exlibs/metrics-core-2.2.0.jar
           
Spark基础全解析SPARK全解析

此时jar包就导入了

Spark基础全解析SPARK全解析

运行之前的kafka.scala应用程序:

:load /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/kafka.scala

运行成功

Spark基础全解析SPARK全解析

测试:在produce界面输入单词

第一次输入第一行回车

第二次输入第二行回车

Spark基础全解析SPARK全解析

在consumer界面看到输出:

Spark基础全解析SPARK全解析

查看统计结果:

第一次输出:

Spark基础全解析SPARK全解析

第二次输出:(实现了累加)

Spark基础全解析SPARK全解析

Spark Streaming常用API解析

  • UpdateStateByKey

    UpdateStateByKey通常用作更新记录使用,能将Spark Streaming之前处理的数据记录起来,进而实现累加功能

    updateStateBykey方法存在于PairDStreamFunctions.scala中,可以从DStream.scala中的 object DStream中的toPairDStreamFunctions方法中链接到PairDStreamFunctions.scala中。

    Spark基础全解析SPARK全解析
    Spark基础全解析SPARK全解析
    解析updateStateByKey:
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}
           

其中:

updateFunc: (Seq[V], Option[S]) => Option[S]

代表一个匿名函数,Options[S]是该函数粉返回类型,

Seq[V]表示泛型,因为DStream是[(key , value)]格式的,Seq[V]中V代表的是DStream中value的类型,针对wordcount程序来说,V为整型Int,Seq[V]表示一个集合,存储了value的类型。

Option[S]代表一个状态,Option有两个子类:Some、None,

这里的S代表的代表的是一个状态,存储的是以前分析的结果,由于不同的应用分析结果不同,针对wordcount程序来说,分析结果是Count,其类型也是Int。但在不同的应用中,S可以是任意的类型,依据实际需求而定。

根据分析可以得出如下代码:

……

val WordCounts = Pairs.updataStateByKey((values:Seq[Int], state:Option[Int]) => { //获取当前要计算的值 val currentCount = values.sum //获取以前状态中的值 val previousCount = state.getOrElse(0) //更新状态,返回 Some(currentCount + prevousCount) })

完整代码:

import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

object UpdateStateWordCount {
  def main(args: Array[String]) {
    // step 0: SparkContext
    val sparkConf = new SparkConf()
      .setAppName("LogAnalyzer Applicaiton") // name
      .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn

    // Create SparkContext
    val sc = new SparkContext(sparkConf)

    val ssc = new StreamingContext(sc, Seconds())

    // set checkpoint
    ssc.checkpoint("sparkstreaming/socketwc/")

    // Step 1: Recevier Data From Where
    val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", )


    // transformFunc: RDD[T] => RDD[U]
    // 仅仅针对DStream中的RDD来操作,在实际开发中,RDD的操作更加方便
    /**
     * WordCount
     *    . ? @ # $ !
     *    如果单词是上述标点符号,统计毫无意义,可以进行过滤
     */
    val filterRdd = sc.parallelize(List(".", "?", "@", "#", "!")).map((_, true))
    socketDStream.transform(rdd => {
      val tupleRdd = rdd.map((_, ))
      // join,filter
      tupleRdd.leftOuterJoin(filterRdd)
    })

    // Step 2: Process Data Base DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, ))
    /**
     * updateFunc: (Seq[V], Option[S]) => Option[S]
     *
     * DStream[(Key, Value)]
     * Seq[V]
     *    V: 代表的是 DStream中Value的类型,针对WordCount程序来说,V是Int
     * Option[S]
     *    Option\Some\None
     *    S: 代表的是状态State,存储的是以前分析的结果,针对WordCount程序来说,S是Count,Int
     *      S可以是任意类型,依据实际需求而定
     *
     *  回顾一下:
     *    reduceByKey()
     *    =
     *    reduce(Key, Values)
     *
     */
    val wordCounts = pairs.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => {
        //获取当前的要计算的值
        val currentCount = values.sum
        // 获取以前状态中的值
        val previousCount = state.getOrElse()

        // update state and return
        Some(currentCount + previousCount)
      }
    )

    val wcDStream = pairs.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse()))

    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate

    sc.stop
  }

}
           

测试:

打开nc 9999端口,运行程序:

两次在端口输入:

Spark基础全解析SPARK全解析

第一次输出:

Spark基础全解析SPARK全解析

第二次输出:

Spark基础全解析SPARK全解析

实现了累加统计,测试成功

注:在代码中,使用了transform方法,对其进行解析:

  • DStream的transform方法

transform方法在DStream中,其源码为:

/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream.
 */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  transform((r: RDD[T], t: Time) => cleanedF(r))
}
           

可以看出,它将DStream中的RDD进行单独单独操作,最终返回的还是DStream,所以如果进行wordcount过滤的话,将需要过滤的符号存储在RDD中,对RDD 进行join操作。匹配过滤掉这些字符。

具体代码:

/**
     * WordCount
     *    . ? @ # $ !
     *    如果单词是上述标点符号,统计毫无意义,可以进行过滤
     */
    val filterRdd = sc.parallelize(List(".", "?", "@", "#", "!")).map((_, true))
    socketDStream.transform(rdd => {
      val tupleRdd = rdd.map((_, ))
      // join,filter
      tupleRdd.leftOuterJoin(filterRdd).filter(tuple => {
val x1 = tuple_1
val x2 =tuple_2 //(i, option[boolean])
if (!x2._2.isEmpty){
 true
}else
{false
}

})
    })
           

解释:

filterRdd存储了这些符号,进行RDD之间的join需要将其映射成元组对,所以对其进行映射,

在transform中,对RDD进行操作,首先将其映射为tupleRdd(元组对),以此tupleRdd为准与filterRdd进行join。join完成之后还是元组对,所以对该元组对进行操作,首先判断tupleRdd中的值是否是这些符号之中的一个,join之后是两个元组对进行join,所以x2在这里也是一个元组对,它的类型是( 1 , option(boolean)),如果是这个符号,那么返回的是(1,true),否则返回的是空(1,none),所以x2._2如果是空,则为这些符号,需要过滤掉。其中filter(func) : 返回一个新的数据集,由经过func函数后返回值为true的原元素组成。

join(otherDataset, [numTasks]) :在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集

  • DStream的foreachRDD方法

    DStream中还有一个方法foreachRDD,它与transform一样是对RDD进行操作,但是它没有返回值,比如遇到需求将分析出来的结果存储在mysql中:

// foreachFunc: (RDD[T], Time) => Unit
dstream.foreachRDD(rdd => {
  // 将分析的数据存储到JDBC中,MySQL数据中
  val connection = createJDBCConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.putStateResult(record) // executed at the worker
  }
})
           

Spark Streaming的窗口函数

spark streaming提供了窗口操作,允许在某个大小的窗口中进行操作,常用于统计某个时间段内指标:

比如需求:对词频的统计,要求每次统计的数据是最近10s的数据

分析windows源码:

Spark基础全解析SPARK全解析

分析reduceByKeyAndWindow:

/**
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with Spark's default number of partitions.
 * @param reduceFunc associative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 */
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)] = ssc.withScope {
  reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
           

在源码中,可以看到需要传递三个参数,reduceFunc:RDD的操作,windowDuration:窗口的大小,即每次处理几个批次的数据,必须是接收数据时间间隔的整数倍(即 Seconds(5)的整数倍),slideDuration表示窗口的时间间隔,即每隔多少秒窗口执行一次。

在wordcount程序中只需修改:

val wordCounts = pairs.reduceByKey(_ + _)  这一行代码,修改为:
val wordCounts = pairs.reduceByKeyAndWindow((x:Int,y:Int)=x+y,Seconds(),Second())
           

Second(10)表示每次处理2 X5秒的数据,在这里是统计两个批次的数据,Second(4)表示每隔4s执行一次窗口。

喜欢我的文章请关注微信公众号DTSpider

Spark基础全解析SPARK全解析