sparkSQL_DataFrame
(1) Developing a SparkSQL program in IDEA to convert an RDD into a DataFrame
1. Add the pom dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
2. Example 1: converting an RDD into a DataFrame via reflection
- 1. Code
package cn.itcast.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

//TODO: convert an RDD into a DataFrame via reflection

//define a case class
case class Person(id: Int, name: String, age: Int)

object CaseClassSchema {
  def main(args: Array[String]): Unit = {
    //1. build a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("CaseClassSchema")
      .master("local[2]")
      .getOrCreate()

    //2. get the SparkContext
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("warn")

    //3. read the file
    val rdd1: RDD[Array[String]] = sc.textFile("F:\\AAAA_HM大数据\\00-课件\\18_spark\\spark\\day02\\person.txt").map(_.split(" "))

    //4. associate the RDD with the case class
    val personRDD: RDD[Person] = rdd1.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    //5. convert the RDD into a DataFrame
    //import the implicit conversions manually
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF

    //6. print the schema
    personDF.printSchema()

    //----------------DSL-style syntax---------------start
    personDF.show()  //6.1 shows 20 rows by default; strings longer than 20 characters are truncated to their first 20
    personDF.show(1) //6.2 show a single row
    //6.3 get the first row
    println(personDF.first())
    println(personDF.head())
    //6.4 select the name column
    personDF.select("name").show()
    personDF.select($"name").show()
    personDF.select(new Column("name")).show()
    personDF.select("name", "age", "id").show()
    //6.5 age + 1
    personDF.select($"age", $"age" + 1).show()
    //6.6 filter users older than 30
    personDF.filter($"age" > 30).show()
    println(personDF.filter($"age" > 30).count())
    //6.7 group by age and count
    personDF.groupBy("age").count().show()
    //the DSL is hard to memorize; in practice the SQL style is used more often

    //----------------SQL-style syntax---------------start
    //6.8 register the DataFrame as a table first
    personDF.createTempView("person")
    //6.9 run SQL
    spark.sql("select * from person").show()
    spark.sql("select count(*) from person").show()
    spark.sql("select * from person where age>30").show()
    spark.sql("select * from person order by age desc").show()
    //HiveQL ---> an SQL interpreter underneath translates HiveQL into MapReduce jobs
    //SparkSQL is similar: it translates the SQL into Spark code

    //7. shut down
    sc.stop()
    spark.stop()
  }
}
/*
00/00/00 00:58:06 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 33|
|  3|  wnagwu| 43|
|  4| zhaoliu| 22|
|  5|  zhouqi| 32|
|  6|    kobe| 11|
+---+--------+---+

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
+---+--------+---+
only showing top 1 row

[1,zhangsan,20]
[1,zhangsan,20]

+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wnagwu|
| zhaoliu|
|  zhouqi|
|    kobe|
+--------+

+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wnagwu|
| zhaoliu|
|  zhouqi|
|    kobe|
+--------+

+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wnagwu|
| zhaoliu|
|  zhouqi|
|    kobe|
+--------+

+--------+---+---+
|    name|age| id|
+--------+---+---+
|zhangsan| 20|  1|
|    lisi| 33|  2|
|  wnagwu| 43|  3|
| zhaoliu| 22|  4|
|  zhouqi| 32|  5|
|    kobe| 11|  6|
+--------+---+---+

+---+---------+
|age|(age + 1)|
+---+---------+
| 20|       21|
| 33|       34|
| 43|       44|
| 22|       23|
| 32|       33|
| 11|       12|
+---+---------+

+---+------+---+
| id|  name|age|
+---+------+---+
|  2|  lisi| 33|
|  3|wnagwu| 43|
|  5|zhouqi| 32|
+---+------+---+

3

+---+-----+
|age|count|
+---+-----+
| 22|    1|
| 20|    1|
| 43|    1|
| 32|    1|
| 33|    1|
| 11|    1|
+---+-----+

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 33|
|  3|  wnagwu| 43|
|  4| zhaoliu| 22|
|  5|  zhouqi| 32|
|  6|    kobe| 11|
+---+--------+---+

+--------+
|count(1)|
+--------+
|       6|
+--------+

+---+------+---+
| id|  name|age|
+---+------+---+
|  2|  lisi| 33|
|  3|wnagwu| 43|
|  5|zhouqi| 32|
+---+------+---+

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  3|  wnagwu| 43|
|  2|    lisi| 33|
|  5|  zhouqi| 32|
|  4| zhaoliu| 22|
|  1|zhangsan| 20|
|  6|    kobe| 11|
+---+--------+---+
*/
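As a side note (not from the original course code): an RDD of tuples can also be turned into a DataFrame, with the column names passed explicitly to toDF instead of being inferred from a case class. A minimal sketch, assuming the same space-separated person.txt layout; the file path is illustrative:

package cn.itcast.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}

//sketch: RDD of tuples -> DataFrame with explicit column names
object TupleToDF {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("TupleToDF")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    //assumed: the same "id name age" layout as person.txt above
    val tupleRDD = sc.textFile("person.txt")
      .map(_.split(" "))
      .map(x => (x(0).toInt, x(1), x(2).toInt))

    import spark.implicits._
    //name the columns explicitly instead of relying on case-class field names
    val personDF: DataFrame = tupleRDD.toDF("id", "name", "age")
    personDF.printSchema()
    personDF.show()

    spark.stop()
  }
}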
3. Example 2: converting an RDD into a DataFrame by specifying the schema with StructType
- 1. Code
package cn.itcast.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

//todo: convert an RDD into a DataFrame by specifying the schema with StructType
object SparksqlSchema {
  def main(args: Array[String]): Unit = {
    //1. build a SparkSession
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("SparksqlSchema")
      .master("local[2]")
      .getOrCreate()

    //2. get the SparkContext
    val sc: SparkContext = sparkSession.sparkContext
    sc.setLogLevel("warn")

    //3. read the file
    val rdd1: RDD[Array[String]] = sc.textFile("F:\\AAAA_HM大数据\\00-课件\\18_spark\\spark\\day02\\person.txt").map(_.split(" "))

    //4. associate rdd1 with Row objects
    val rowRDD: RDD[Row] = rdd1.map(x => Row(x(0).toInt, x(1), x(2).toInt))

    //5. specify the DataFrame schema; the declared field types must match the data types in the Row objects
    val schema = StructType(
      StructField("id", IntegerType, true) ::
      StructField("name", StringType, false) ::
      StructField("age", IntegerType, false) :: Nil)
    val dataFrame: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

    //6. print the schema
    dataFrame.printSchema()
    dataFrame.show()

    //register it as a table and use SQL
    dataFrame.createTempView("t_person")
    sparkSession.sql("select * from t_person where age>20").show()

    //7. shut down
    sc.stop()
    sparkSession.stop()
  }
}
/*
19/09/25 10:56:24 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 33|
|  3|  wnagwu| 43|
|  4| zhaoliu| 22|
|  5|  zhouqi| 32|
|  6|    kobe| 11|
+---+--------+---+

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|   lisi| 33|
|  3| wnagwu| 43|
|  4|zhaoliu| 22|
|  5| zhouqi| 32|
+---+-------+---+
*/
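For comparison with the DSL section of Example 1, the same age > 20 query can be written without registering a view. A small snippet meant to be dropped in at step 6 of the example above (it reuses that example's dataFrame and sparkSession; the $ syntax needs the implicits import):

//DSL equivalent of "select * from t_person where age>20"
import sparkSession.implicits._
dataFrame.filter($"age" > 20).show()
//or, without the implicits import:
dataFrame.filter(dataFrame("age") > 20).show()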
(2) Using SparkSQL to run HiveQL
1. Add the dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.2.0-cdh6.0.1</version>
</dependency>
2. Code
package cn.itcast.sparksql
import org.apache.spark.sql.SparkSession
/**
 * Note: from this example on, the dependencies are switched to the CDH versions.
 *
 * For some reason the data could not be read in;
 * the logic itself is correct.
 */
//todo: use SparkSQL to run HiveQL
object HiveSupport {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val sparkSession: SparkSession = SparkSession
      .builder()
      .appName("HiveSupport")
      .master("local[2]")
      .enableHiveSupport() //enable Hive support
      .getOrCreate()

    //2. run HiveQL through the SparkSession
    //2.1 create a Hive table first
    sparkSession.sql("create table user3(id string,name string,age int) row format delimited fields terminated by ','")
    //2.2 load data into the Hive table
    sparkSession.sql("load data local inpath './day01_wordcount/data/user.txt' into table user3")
    //2.3 query
    sparkSession.sql("select * from user3").show()

    //3. shut down
    sparkSession.stop()
  }
}
/*
Two folders get generated in the project directory
+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+
*/
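One possible cause of the empty result (an assumption, not verified here): when Spark runs locally without a hive-site.xml on the classpath, it falls back to an embedded metastore and a local spark-warehouse directory inside the project (which would explain the two generated folders, typically spark-warehouse and metastore_db), and the relative path in load data local inpath is resolved against the IDE's working directory. A minimal sketch of pinning both explicitly; the paths are placeholders:

//sketch only: assumed paths, adjust to the actual project layout
val sparkSession: SparkSession = SparkSession
  .builder()
  .appName("HiveSupport")
  .master("local[2]")
  //pin the warehouse location so the managed table's data lands where expected
  .config("spark.sql.warehouse.dir", "./spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()

//use an absolute path so the load does not depend on the working directory
sparkSession.sql("load data local inpath '/full/path/to/day01_wordcount/data/user.txt' into table user3")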
(3) Using SparkSQL with a relational database
1. Loading data from MySQL with SparkSQL
package cn.itcast.sparksql
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo: load data from MySQL into a DataFrame with SparkSQL
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("datafrommysql")
      .master("local[2]")
      .getOrCreate()

    //2. read data from the MySQL table
    //2.1 define the JDBC url
    val url = "jdbc:mysql://node03:3306/spark"
    //2.2 define the table name
    val table = "iplocation"
    //2.3 define the connection properties
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    val mysqlDF: DataFrame = spark.read.jdbc(url, table, properties)

    mysqlDF.printSchema()
    mysqlDF.show()

    //shut down
    spark.stop()
  }
}
/*
+----------+----------+-----------+
| longitude| latitude|total_count|
+----------+----------+-----------+
|108.948024|108.948024| 1824|
|116.405285|116.405285| 1535|
| 107.7601| 107.7601| 85|
| 106.27633| 106.27633| 36|
|106.504962|106.504962| 400|
| 107.39007| 107.39007| 47|
| 106.56347| 106.56347| 3|
| 106.57434| 106.57434| 177|
| 106.51107| 106.51107| 91|
|114.502461|114.502461| 383|
| 107.08166| 107.08166| 29|
|102.712251|102.712251| 126|
+----------+----------+-----------+
*/
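The same read can also be expressed with the option-based DataFrameReader API, which is convenient when extra JDBC options (driver class, fetch size, and so on) are needed. A sketch that reuses the connection values above and produces the same DataFrame:

//sketch: equivalent option-based JDBC read
val mysqlDF: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://node03:3306/spark")
  .option("dbtable", "iplocation")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver") //optional; normally inferred from the url
  .load()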
2. Writing results back to a MySQL table with SparkSQL
2.1 Running locally
package cn.itcast.sparksql
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo: write result data into a MySQL table with SparkSQL
object Data2Mysql {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("data2mysql")
      .master("local[2]")
      .getOrCreate()

    //2. read data from the MySQL table
    //2.1 define the JDBC url
    val url = "jdbc:mysql://node03:3306/spark"
    //2.2 define the table name
    val table = "iplocation" //table to read from
    //2.3 define the connection properties
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    val mysqlDF: DataFrame = spark.read.jdbc(url, table, properties)

    //3. register the DataFrame as a table
    mysqlDF.createTempView("iplocation")

    //4. run SQL through the SparkSession:
    //keep the records whose total_count per longitude/latitude is greater than 1000, then save them to MySQL
    val result: DataFrame = spark.sql("select * from iplocation where total_count >1000")

    //5. save the result to a MySQL table
    //mode: the save mode
    //overwrite: overwrite; the table is created first if it does not exist
    //append: append; the table is created first if it does not exist
    //ignore: do nothing if the table already exists
    //error: (default) fail if the table already exists
    result.write.mode("overwrite").jdbc(url, "itcast", properties)

    //6. shut down
    spark.stop()
  }
}
/*
The result is written to the itcast table in MySQL on node03
*/
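The string passed to mode(...) maps onto the SaveMode enum; using the enum directly avoids typos and is otherwise equivalent. A small sketch reusing result, url, and properties from the code above:

import org.apache.spark.sql.SaveMode

//equivalent to result.write.mode("overwrite")...
result.write.mode(SaveMode.Overwrite).jdbc(url, "itcast", properties)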
2.2 Running on the cluster
Code
package cn.itcast.sparksql
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo: write result data into a MySQL table with SparkSQL, running on the cluster
object Data2Mysql {
  def main(args: Array[String]): Unit = {
    //1. create the SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("data2mysql")
      // .master("local[2]") commented out for cluster mode
      .getOrCreate()

    //2. read data from the MySQL table
    //2.1 define the JDBC url
    val url = "jdbc:mysql://node03:3306/spark"
    //2.2 define the table name
    val table = "iplocation" //table to read from
    //2.3 define the connection properties
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    val mysqlDF: DataFrame = spark.read.jdbc(url, table, properties)

    //3. register the DataFrame as a table
    mysqlDF.createTempView("iplocation")

    //4. run SQL through the SparkSession:
    //keep the records whose total_count per longitude/latitude is greater than 1000, then save them to MySQL
    val result: DataFrame = spark.sql("select * from iplocation where total_count >1000")

    //5. save the result to a MySQL table
    //mode: the save mode
    //overwrite: overwrite; the table is created first if it does not exist
    //append: append; the table is created first if it does not exist
    //ignore: do nothing if the table already exists
    //error: (default) fail if the table already exists
    result.write.mode(args(0)).jdbc(url, args(1), properties) //save mode and target table now come from command-line arguments

    //6. shut down
    spark.stop()
  }
}
/*
The result is written to the itcast100 table in MySQL on node03
*/
Submitting the job
[root@node01 sparkdata]# spark-submit \
--master spark://node01:7077 \
--class cn.itcast.sparksql.Data2Mysql \
--executor-memory 1g --total-executor-cores 2 \
--jars /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
--driver-class-path /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
original-day01_wordcount-1.0-SNAPSHOT.jar \
append \
itcast100
//the trailing backslashes (line continuations) can be dropped if the command is entered on one line
//monitor the job at node01:8080 (the Spark standalone master web UI)
Appendix: the full pom.xml (with the version numbers updated)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark_class15</artifactId>
<groupId>cn.itcast</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>day01_wordcount</artifactId>
<packaging>jar</packaging>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<!-- define versions -->
<properties>
<hadoop.version>2.6.0-cdh5.14.0</hadoop.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.2.0-cdh6.0.1</spark.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<!-- Scala library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- MySQL driver dependency -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<!-- Hadoop client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>2.2.0-cdh6.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>2.2.0-cdh6.0.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Maven compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>