
spark_note07_sparkSQL_2 sparkSQL_DataFrame: (1) converting an RDD into a DataFrame with a Spark SQL program developed in IDEA; (2) using Spark SQL to run Hive SQL; (3) using Spark SQL with relational databases

sparkSQL_DataFrame

(1) Converting an RDD into a DataFrame with a Spark SQL program developed in IDEA

1. Add the pom dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
           

2. Example 1: converting an RDD into a DataFrame via reflection
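The example reads a space-separated text file person.txt with one person per line (id, name, age). Judging from the output shown after the code, the input file is assumed to look like this:

    1 zhangsan 20
    2 lisi 33
    3 wnagwu 43
    4 zhaoliu 22
    5 zhouqi 32
    6 kobe 11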

  • 1. Code
    package cn.itcast.sparksql
    
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    //TODO: convert an RDD to a DataFrame via reflection
    
    //define a case class
    case class Person(id:Int,name:String,age:Int)
    //
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
        //1. build the SparkSession
        val spark: SparkSession = SparkSession.builder()
                                              .appName("CaseClassSchema")
                                              .master("local[2]")
                                              .getOrCreate()
    
        //2. get the SparkContext
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("warn")
    
        //3. read the data file
        val rdd1: RDD[Array[String]] = sc.textFile("F:\\AAAA_HM大数据\\00-课件\\18_spark\\spark\\day02\\person.txt").map(_.split(" "))
    
        //4. map the RDD onto the case class
        val personRDD: RDD[Person] = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
        //5. convert the RDD to a DataFrame
          //import the implicit conversions manually
        import spark.implicits._
        val personDF: DataFrame = personRDD.toDF
    
        //6. print the schema
        personDF.printSchema()
          //----------------DSL-style syntax---------------start
        personDF.show() //6.1 show() displays 20 rows by default; string values longer than 20 characters are truncated to the first 20
        personDF.show(1) //6.2 show a single row
    
            //6.3 get the first row
        println(personDF.first())
        println(personDF.head())
    
            //6.4 select the name column
        personDF.select("name").show()
        personDF.select($"name").show()
        personDF.select(new Column("name")).show()
        personDF.select("name","age","id").show()
    
            //6.5 add 1 to age
        personDF.select($"age",$"age"+1).show()
    
            //6.6 filter users older than 30
        personDF.filter($"age">30).show()
        println(personDF.filter($"age">30).count())
    
            //6.7 group by age and count
        personDF.groupBy("age").count().show()
    
    
        //the DSL above is hard to remember anyway; in practice the SQL-style syntax is what gets used most
        //----------------SQL-style syntax---------------start
            //6.8 first register the data as a table
        personDF.createTempView("person")
            //6.9 run various SQL statements
        spark.sql("select * from person").show()
        spark.sql("select count(*) from person").show()
        spark.sql("select * from person where age>30").show()
        spark.sql("select * from person order by age desc").show()
    
            //HiveSQL ---> under the hood an SQL parser translates HiveSQL into MapReduce jobs
            //Spark SQL is similar: it translates the SQL into Spark code
    
    
        //7. shut down
        sc.stop()
        spark.stop()
    
      }
    
    }
    
    /*
    00/00/00 00:58:06 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 33|
    |  3|  wnagwu| 43|
    |  4| zhaoliu| 22|
    |  5|  zhouqi| 32|
    |  6|    kobe| 11|
    +---+--------+---+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    +---+--------+---+
    only showing top 1 row
    
    [1,zhangsan,20]
    [1,zhangsan,20]
    +--------+
    |    name|
    +--------+
    |zhangsan|
    |    lisi|
    |  wnagwu|
    | zhaoliu|
    |  zhouqi|
    |    kobe|
    +--------+
    
    +--------+
    |    name|
    +--------+
    |zhangsan|
    |    lisi|
    |  wnagwu|
    | zhaoliu|
    |  zhouqi|
    |    kobe|
    +--------+
    
    +--------+
    |    name|
    +--------+
    |zhangsan|
    |    lisi|
    |  wnagwu|
    | zhaoliu|
    |  zhouqi|
    |    kobe|
    +--------+
    
    +--------+---+---+
    |    name|age| id|
    +--------+---+---+
    |zhangsan| 20|  1|
    |    lisi| 33|  2|
    |  wnagwu| 43|  3|
    | zhaoliu| 22|  4|
    |  zhouqi| 32|  5|
    |    kobe| 11|  6|
    +--------+---+---+
    
    +---+---------+
    |age|(age + 1)|
    +---+---------+
    | 20|       21|
    | 33|       34|
    | 43|       44|
    | 22|       23|
    | 32|       33|
    | 11|       12|
    +---+---------+
    
    +---+------+---+
    | id|  name|age|
    +---+------+---+
    |  2|  lisi| 33|
    |  3|wnagwu| 43|
    |  5|zhouqi| 32|
    +---+------+---+
    
    3
    +---+-----+
    |age|count|
    +---+-----+
    | 22|    1|
    | 20|    1|
    | 43|    1|
    | 32|    1|
    | 33|    1|
    | 11|    1|
    +---+-----+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 33|
    |  3|  wnagwu| 43|
    |  4| zhaoliu| 22|
    |  5|  zhouqi| 32|
    |  6|    kobe| 11|
    +---+--------+---+
    
    +--------+
    |count(1)|
    +--------+
    |       6|
    +--------+
    
    +---+------+---+
    | id|  name|age|
    +---+------+---+
    |  2|  lisi| 33|
    |  3|wnagwu| 43|
    |  5|zhouqi| 32|
    +---+------+---+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  3|  wnagwu| 43|
    |  2|    lisi| 33|
    |  5|  zhouqi| 32|
    |  4| zhaoliu| 22|
    |  1|zhangsan| 20|
    |  6|    kobe| 11|
    +---+--------+---+
    
     */
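As a side note, the same RDD can also be turned into a DataFrame without a case class by mapping each line to a tuple and naming the columns in toDF. A minimal sketch, assuming the same spark session and rdd1 as in the code above:

    //alternative: RDD of tuples -> DataFrame with explicit column names (not part of the original example)
    import spark.implicits._
    val tupleDF: DataFrame = rdd1.map(x => (x(0).toInt, x(1), x(2).toInt))
                                 .toDF("id", "name", "age")
    tupleDF.printSchema()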
    
               

3. Example 2: converting an RDD into a DataFrame by specifying the schema with StructType

  • 1. Code
    package cn.itcast.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    //todo: convert an RDD to a DataFrame by specifying the schema with StructType
    object SparksqlSchema {
      def main(args: Array[String]): Unit = {
        //1. build the SparkSession
        val sparkSession: SparkSession = SparkSession.builder()
          .appName("SparksqlSchema")
          .master("local[2]")
          .getOrCreate()
    
        //2. get the SparkContext
        val sc: SparkContext = sparkSession.sparkContext
        sc.setLogLevel("warn")
    
        //3. read the data file
        val rdd1: RDD[Array[String]] = sc.textFile("F:\\AAAA_HM大数据\\00-课件\\18_spark\\spark\\day02\\person.txt").map(_.split(" "))
    
        //4. wrap each record of rdd1 in a Row object
        val rowRDD: RDD[Row] = rdd1.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
    
        //5. specify the DataFrame schema; the field types declared here must match the data types inside the Row objects
        val schema =
            StructType(
                StructField("id", IntegerType, true) ::
                StructField("name", StringType, false) ::
                StructField("age", IntegerType, false) :: Nil)
    
        val dataFrame: DataFrame = sparkSession.createDataFrame(rowRDD,schema)
    
        //6. print the schema
        dataFrame.printSchema()
        dataFrame.show()
    
        //register it as a table and query it with SQL
        dataFrame.createTempView("t_person")
        sparkSession.sql("select * from t_person where age>20").show()
    
        //7. shut down
        sc.stop()
        sparkSession.stop()
    
      }
    
    }
    
    
    /*
    19/09/25 10:56:24 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    root
     |-- id: integer (nullable = true)
     |-- name: string (nullable = false)
     |-- age: integer (nullable = false)
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 33|
    |  3|  wnagwu| 43|
    |  4| zhaoliu| 22|
    |  5|  zhouqi| 32|
    |  6|    kobe| 11|
    +---+--------+---+
    
    +---+-------+---+
    | id|   name|age|
    +---+-------+---+
    |  2|   lisi| 33|
    |  3| wnagwu| 43|
    |  4|zhaoliu| 22|
    |  5| zhouqi| 32|
    +---+-------+---+
    
     */
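The same schema can also be built with StructType's add method instead of the :: Nil list, which some find easier to read; a small sketch equivalent to the schema above (same field names, types and nullability):

    //equivalent schema definition using StructType.add (not in the original example)
    val schema2: StructType = new StructType()
      .add("id", IntegerType, nullable = true)
      .add("name", StringType, nullable = false)
      .add("age", IntegerType, nullable = false)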
               

(2) Using Spark SQL to run Hive SQL

1. Add the dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.2.0-cdh6.0.1</version>
</dependency>

           

2. Code

package cn.itcast.sparksql

import org.apache.spark.sql.SparkSession

/**
 * Note: from this example onwards the dependencies were switched to the CDH versions.
 *
 * For some reason the data could not be loaded here,
 * although the logic itself is correct.
 */

//todo: use Spark SQL to run Hive SQL
object HiveSupport {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val sparkSession: SparkSession= SparkSession
      .builder()
      .appName("HiveSupport")
      .master("local[2]")
      .enableHiveSupport() //enable Hive support
      .getOrCreate()

    //2. run Hive SQL through the SparkSession
        //2.1 first create a Hive table
    sparkSession.sql("create table user3(id string,name string,age int) row format delimited fields terminated by ','")
        //2.2 load data into the Hive table
    sparkSession.sql("load data local inpath './day01_wordcount/data/user.txt' into table user3")
        //2.3 query
    sparkSession.sql("select * from user3").show()

    //3. shut down
    sparkSession.stop()

  }

}

/*
Two folders are generated in the project directory.
+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+
 */
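A guess at why no rows come back (an assumption, not verified in this note): when the program runs locally without a hive-site.xml on the classpath, Spark falls back to an embedded Derby metastore and a local spark-warehouse directory, so both the table location and the load path are resolved relative to the working directory of that run. If that is the issue, the warehouse directory can be pinned explicitly when building the session, for example:

    //sketch: fix the local warehouse location (the path here is illustrative)
    val sparkSession: SparkSession = SparkSession
      .builder()
      .appName("HiveSupport")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "./spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()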
           

(3) Using Spark SQL with relational databases

1. Loading data from MySQL with Spark SQL

package cn.itcast.sparksql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

//todo: load data from MySQL with Spark SQL and build a DataFrame
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("datafrommysql")
      .master("local[2]")
      .getOrCreate()

    //2. read data from the MySQL table
        //2.1 define the JDBC url
    val url="jdbc:mysql://node03:3306/spark"
        //2.2 define the table name
    val table="iplocation"
        //2.3 define the connection properties
    val properties=new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123456")

    val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties)

    mysqlDF.printSchema()
    mysqlDF.show()


    //shut down
    spark.stop()

  }

}

/*
+----------+----------+-----------+
| longitude|  latitude|total_count|
+----------+----------+-----------+
|108.948024|108.948024|       1824|
|116.405285|116.405285|       1535|
|  107.7601|  107.7601|         85|
| 106.27633| 106.27633|         36|
|106.504962|106.504962|        400|
| 107.39007| 107.39007|         47|
| 106.56347| 106.56347|          3|
| 106.57434| 106.57434|        177|
| 106.51107| 106.51107|         91|
|114.502461|114.502461|        383|
| 107.08166| 107.08166|         29|
|102.712251|102.712251|        126|
+----------+----------+-----------+
 */
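The same read can also be written with the generic options API of DataFrameReader, which makes it easy to pass the JDBC driver class explicitly; a sketch that should be equivalent to spark.read.jdbc above (the driver class name assumes the mysql-connector-java 5.x driver used in this project):

    //equivalent JDBC read via options (not in the original example)
    val mysqlDF2: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://node03:3306/spark")
      .option("dbtable", "iplocation")
      .option("user", "root")
      .option("password", "123456")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()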
           

2. Writing results back to a MySQL table with Spark SQL

2.1 Running locally

package cn.itcast.sparksql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

//todo: write the result data into a MySQL table with Spark SQL
object Data2Mysql {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("data2mysql")
      .master("local[2]")
      .getOrCreate()

    //2. read data from the MySQL table
        //2.1 define the JDBC url
    val url="jdbc:mysql://node03:3306/spark"
        //2.2 define the table name
    val table="iplocation" //source table to read
        //2.3 define the connection properties
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123456")

    val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties)

    //3. register the DataFrame as a table
    mysqlDF.createTempView("iplocation")

    //4. run SQL through the SparkSession
        //we want the records whose population total per longitude/latitude is greater than 1000, saved to a MySQL table
    val result: DataFrame = spark.sql("select * from iplocation where total_count >1000")


    //5. save the result to a MySQL table
        //mode: the save mode used for the insert
              //overwrite: overwrite; if the table does not exist it is created for us
              //append: append; if the table does not exist it is created for us
              //ignore: do nothing if the table already exists
              //error: (default) fail if the table already exists
    result.write.mode("overwrite").jdbc(url,"itcast",properties)

    //6. shut down
    spark.stop()

  }

}

/*
The result ends up in the itcast table of MySQL on node03.
 */
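Instead of a mode string, the write can use the org.apache.spark.sql.SaveMode enum, which catches typos in the mode name at compile time; a one-line sketch of the same call:

    //same write with the SaveMode enum instead of the "overwrite" string
    import org.apache.spark.sql.SaveMode
    result.write.mode(SaveMode.Overwrite).jdbc(url, "itcast", properties)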

           

2.2 Running on the cluster

Code

package cn.itcast.sparksql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

//todo: write the result data into a MySQL table with Spark SQL, running on the cluster
object Data2Mysql {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("data2mysql")
//      .master("local[2]") //commented out for cluster submission
      .getOrCreate()

    //2. read data from the MySQL table
        //2.1 define the JDBC url
    val url="jdbc:mysql://node03:3306/spark"
        //2.2 define the table name
    val table="iplocation" //source table to read
        //2.3 define the connection properties
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123456")

    val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties)

    //3. register the DataFrame as a table
    mysqlDF.createTempView("iplocation")

    //4. run SQL through the SparkSession
        //we want the records whose population total per longitude/latitude is greater than 1000, saved to a MySQL table
    val result: DataFrame = spark.sql("select * from iplocation where total_count >1000")


    //5. save the result to a MySQL table
        //mode: the save mode used for the insert
              //overwrite: overwrite; if the table does not exist it is created for us
              //append: append; if the table does not exist it is created for us
              //ignore: do nothing if the table already exists
              //error: (default) fail if the table already exists
    result.write.mode(args(0)).jdbc(url,args(1),properties) //save mode and target table now come from the command-line arguments

    //6. shut down
    spark.stop()

  }

}

/*
The result ends up in the itcast100 table of MySQL on node03.
 */

           

Submitting the job

[[email protected] sparkdata]# spark-submit \
--master spark://node01:7077 \
--class cn.itcast.sparksql.Data2Mysql \
--executor-memory 1g --total-executor-cores 2 \
--jars /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
--driver-class-path /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar  \
original-day01_wordcount-1.0-SNAPSHOT.jar \
append \
itcast100

//the backslashes (line continuations) can be removed if the command is run on a single line
//node01:8080
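The job now expects two command-line arguments: args(0) is the save mode (append in the command above) and args(1) is the target table (itcast100 above). A small optional guard at the top of main, not part of the original code, makes a missing argument fail fast:

    //hypothetical argument check, not in the original example
    if (args.length < 2) {
      System.err.println("Usage: Data2Mysql <saveMode> <targetTable>")
      System.exit(1)
    }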

           

Appendix: the full pom.xml (with the version numbers changed)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spark_class15</artifactId>
        <groupId>cn.itcast</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>day01_wordcount</artifactId>

    <packaging>jar</packaging>




    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <!-- define version numbers -->
    <properties>
        <hadoop.version>2.6.0-cdh5.14.0</hadoop.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.2.0-cdh6.0.1</spark.version>
        <scala.version>2.11.8</scala.version>
    </properties>
    <dependencies>
        <!--Scala library-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!--Spark core dependency-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--MySQL driver dependency-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.18</version>
        </dependency>
        <!--Hadoop client dependency-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>2.2.0-cdh6.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>2.2.0-cdh6.0.1</version>
            <scope>compile</scope>
        </dependency>

    </dependencies>


    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!--configure the Maven compiler plugin-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
           
