天天看点

再探spark之二

在Spark2.X.X后,想要在Spark-shell中运行这个命令,你需要使用spark.sqlContext.sql()的形式。

spark的cache缓存其中的方法 (保存在内存中)

 .cache()  //进行缓存

.unpresist(true) //对资源进行释放

spark的checkpoint机制(保存在hdfs中)(checkpoint和cache都属于transformation 需要action才能执行)

sc.setCheckpointDir("hdfs://hadoop01:9000/ck2018523")

val rdd = sc.textFile("hdfs://hadoop01:9000/itcast")

rdd.checkpoint

rdd.count    //这里会执行两次,一个是本身的计算,一个是额外的checkpoint写到hdfs

 val rdd2=rdd.map(_.split("\t")).map(x =>(x(1), 1)).reduceByKey(_+_)

rdd2.cache  //如果在checkpoint前面新加一个cache,会提高很快的效率,而不需要重新启动一个额外的任务

rdd2.checkpoint

rdd2.collect

spark的提交流程如下图

1、driver端向master端通信,然后master端通知并分配任务给各个worker端

     worker端启动excutor剩下的就没master什么事了,主要是worker和driver之间的通信

2、sc的产生标志着driver和master端之间开始通信

3、下面途中RDD objects 和DAGschedule都是在driver端完成的

再探spark之二

1.1.1. 窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

总结:窄依赖我们形象的比喻为独生子女

1.1.2. 宽依赖

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

总结:窄依赖我们形象的比喻为超生

stage 的划分是根据宽依赖,宽依赖大多伴随着shuffle所以不能在一条流水线(pipeline)上

SparkSQL

第一种dataframe创建的方式

package com.wxa.spark.four

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object SQLDemowxa {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SQLDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    System.setProperty("user.name","root")  //这步设置登陆名
    val personRdd=sc.textFile("hdfs://hadoop01:9000/person.txt").map(line=>{
      val field =line.split(",")
      person(field(0).toLong,field(1),field(2).toInt)
    })
    import sqlContext.implicits._
    val personDF = personRdd.toDF() //转为dataframe
    personDF.show()         //这是DSL风格的方式模仿R语言 如 res1.select("id","name").show 有点像sql但不是SQL方式

//    personDF.registerTempTable("person")  //转为用sql的方式,注册成sql表,将其转化成表方便sqlcontext用sql来进行查询
//    sqlContext.sql("select * from person where age>=25 order by age asc limit 2").show()
    sc.stop
  }

}

case class  person(id:Long,name:String,age:Int)
           

将程序提交上去运行 spark-submit --class com.wxa.spark.four.SQLDemowxa  --master spark://hadoop01:7077 scalaMaven-1.0.jar 

以json形式写到hdfs上面  res1.select("id","name").write.json("hdfs://hadoop01:9000/json")

在Spark-Sell下启用SQL报错:error: not found: value sqlContext解决方案

在Spark2.X.X后,想要在Spark-shell中运行这个命令,你需要使用spark.sqlContext.sql()的形式。

将json数据直接读取进来,直接变成dataframe

val df=spark.sqlContext.load("hdfs://hadoop01:9000/json","json")

parquet文件类型

 res1.select("id","name").save("hdfs://hadoop01:9000/out000")保存在hdfs上面会产生parquet这类的文件

df上面的一些方法

再探spark之二

第二种构建dataframe的方法(通过StructType)

object SpecifyingSchema {
  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SQL-2")
    //SQLContext要依赖SparkContext
    val sc = new SparkContext(conf)
    //创建SQLContext
    val sqlContext = new SQLContext(sc)
    //从指定的地址创建RDD
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    //通过StructType直接指定每个字段的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //将RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //将schema信息应用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //注册表
    personDataFrame.registerTempTable("t_person")
    //执行SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    //将结果以JSON的方式存储到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()
  }
}
           

继续阅读