天天看点

SparkSQL(1)-SQL/DataFrame/Datasets(course58)

    • SparkSQL Datasets and DataFrames
    • Starting Point SparkSession
    • 创建DataFrame
    • 1 通过sparkSession创建DataSet Row
      • 2 其他API
        • printSchema
        • select
        • selectcolplus
        • filtergt
        • groupBycount
    • sql查询及Global Temporary View
      • 1 将DataFrame注册为临时视图
      • 2 将DataFrame注册为全局临时视图
    • 创建Dataset bean
      • 1 创建javabean并创建Dataset bean
      • 2 RDD与Dataset转换的重大意义

1. SparkSQL , Datasets and DataFrames

  1. sql执行sql查询,返回的结果为dataFrame/dataset .
  2. a

    DataFrame

    is represented by a

    Dataset

    of

    Rows

    .

    DataFrame

    就是

    Dataset

    。 —spark官网
  3. Dataset可理解为序列化的DataFrame.

2. Starting Point: SparkSession

Spark

的早期版本,

sparkContext

是进入

Spark

的切入点。我们都知道

RDD

Spark

中重要的

API

,然而它的创建和操作得使用

sparkContext

提供的

API

;对于RDD之外的其他东西,我们需要使用其他的

Context

比如对于

流处理来

说,我们得使用

StreamingContext

;对于

SQL

得使用

sqlContext

;而对于

hive

得使用

HiveContext

。然而DataSet和Dataframe提供的API逐渐称为新的标准API,我们需要一个切入点来构建它们,所以在 Spark 2.0中我们引入了一个新的切入点(entry point):

SparkSession

SparkSession

实质上是

SQLContext

HiveContext

的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession

内部封装了

sparkContext

,所以计算实际上是由

sparkContext

完成的。

3. 创建DataFrame

3.1 通过sparkSession创建DataSet< Row>

package cn.whbing.spark.SparkApps.sql;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/*
 * 实战dataset<row>
 */
public class DtaaFrameOps {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("spark SQL")
                .getOrCreate();

        //Dataset<Row> df = spark.read().json("hdfs://master-1a:9000/whbing/data/nvzhuang.json");
        Dataset<Row> df = spark.read().json("D:\\javaTools\\EclipseWork1\\taobaospider\\nvzhuang.json");

        df.show();
    }

}
           

数据源:(nvzhuang.json)

{"title": "韩版宽松彩色毛领面包服", "price": "698.00", "sales": "7120笔", "class2": "羽绒服女", "store": "原创主题旗舰店", "class1": "女装"}
{"title": "可自提", "price": "599.00", "sales": "7094笔", "class2": "羽绒服女", "store": "优衣库官方旗舰店", "class1": "女装"}
{"title": "森马韩版拉链连帽学生百搭羽绒服", "price": "299.90", "sales": "6996笔", "class2": "羽绒服女", "store": "森马官方旗舰店", "class1": "女装"}
...
           

结果:

17/12/29 13:58:59 INFO CodeGenerator: Code generated in 118.888707 ms
+------+------+-------+-----+--------+-----------------+
|class1|class2|  price|sales|   store|            title|
+------+------+-------+-----+--------+-----------------+
|    女装|  羽绒服女| 698.00|7120笔| 原创主题旗舰店|      韩版宽松彩色毛领面包服|
|    女装|  羽绒服女| 599.00|7094笔|优衣库官方旗舰店|              可自提|
|    女装|  羽绒服女| 299.90|6996笔| 森马官方旗舰店|  森马韩版拉链连帽学生百搭羽绒服|
|    女装|  羽绒服女| 568.00|6912笔|  精梳坊旗舰店|  中老年冬季中年加厚50岁羽绒服|
|    女装|  羽绒服女| 338.00|6895笔|      姗西|     反季清仓2017冬装新款|
|    女装|  羽绒服女| 228.00|6869笔|   初碧旗舰店|  韩版百搭羊羔毛拼接冬季羽绒棉服|
|    女装|  羽绒服女| 199.00|6856笔|   高梵旗舰店|        高梵 修身舒适轻薄|
|    女装|  羽绒服女| 213.00|6829笔|  玫兰妮旗舰店| 冬季韩版修身连帽百搭薄款羽绒棉服|
|    女装|  羽绒服女| 219.00|6819笔| 世纪香缤旗舰店|  连帽修身薄款大码清仓韩版羽绒服|
|    女装|  羽绒服女| 218.00|6816笔|  橡迪莎旗舰店|         chic冬装外套|
|    女装|  羽绒服女| 548.00|6752笔| 向内向外旗舰店|黑色韩版亮面宽松大码毛领加厚羽绒服|
|    女装|  羽绒服女| 158.00|6700笔|   筱芙旗舰店|   中老年人60-70岁羽绒棉服|
|    女装|  羽绒服女| 829.00|6698笔|太平鸟官方旗舰店|   太平鸟黑色茧型加厚连帽羽绒服|
|    女装|  羽绒服女| 698.00|6637笔| 香影官方旗舰店|  韩版时尚毛领刺绣丝绒修身羽绒服|
|    女装|  羽绒服女| 196.00|6630笔|  玫兰妮旗舰店|   冬季韩版修身连帽百搭羽绒棉袄|
|    女装|  羽绒服女|1699.00|6569笔|波司登官方旗舰店|   波司登运动毛领韩版加厚羽绒服|
|    女装|  羽绒服女| 288.00|6307笔| 欧娜名媛旗舰店|         冬季加厚棉袄棉衣|
|    女装|  羽绒服女| 168.00|6221笔|  薇诗琪旗舰店|    妈妈羽绒中老年人40岁棉袄|
|    女装|  羽绒服女| 148.00|6197笔| 欧娜名媛旗舰店|     宽松女士羽绒韩版冬季棉服|
|    女装|  羽绒服女| 329.00|6191笔| 呼啸衫庄旗舰店|     冬季韩版修身白鸭绒羽绒服|
+------+------+-------+-----+--------+-----------------+
only showing top 20 rows
           

3.2 其他API

printSchema()

//describe table;
        df.printSchema();
           

结果:

root
 |-- class1: string (nullable = true)
 |-- class2: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales: string (nullable = true)
 |-- store: string (nullable = true)
 |-- title: string (nullable = true)
           

select()

// select price from table;
        df.select("price").show();
           

结果:

+-------+
|  price|
+-------+
| 698.00|
| 599.00|
| 299.90|
| 568.00|
| 338.00|
| 228.00|
| 199.00|
| 213.00|
| 219.00|
| 218.00|
| 548.00|
| 158.00|
| 829.00|
| 698.00|
| 196.00|
|1699.00|
| 288.00|
| 168.00|
| 148.00|
| 329.00|
+-------+
only showing top  rows
           

select()/col()/plus()

// select store price- from table;
        df.select(col("store"), col("price").plus(-)).show();
           

结果:

+--------+---------------+
|   store|(price + -1000)|
+--------+---------------+
| 原创主题旗舰店|         -302.0|
|优衣库官方旗舰店|         -401.0|
| 森马官方旗舰店|         -700.1|
|  精梳坊旗舰店|         -432.0|
|      姗西|         -662.0|
|   初碧旗舰店|         -772.0|
|   高梵旗舰店|         -801.0|
|  玫兰妮旗舰店|         -787.0|
| 世纪香缤旗舰店|         -781.0|
|  橡迪莎旗舰店|         -782.0|
| 向内向外旗舰店|         -452.0|
|   筱芙旗舰店|         -842.0|
|太平鸟官方旗舰店|         -171.0|
| 香影官方旗舰店|         -302.0|
|  玫兰妮旗舰店|         -804.0|
|波司登官方旗舰店|          699.0|
| 欧娜名媛旗舰店|         -712.0|
|  薇诗琪旗舰店|         -832.0|
| 欧娜名媛旗舰店|         -852.0|
| 呼啸衫庄旗舰店|         -671.0|
+--------+---------------+
only showing top  rows
           

filter()/gt()

//select * from table where price > ;
        df.filter(col("price").gt(500)).show();
           

结果:

+------+------+-------+-----+--------+--------------------+
|class1|class2|  price|sales|   store|               title|
+------+------+-------+-----+--------+--------------------+
|    女装|  羽绒服女| 698.00|7120笔| 原创主题旗舰店|         韩版宽松彩色毛领面包服|
|    女装|  羽绒服女| 599.00|7094笔|优衣库官方旗舰店|                 可自提|
|    女装|  羽绒服女| 568.00|6912笔|  精梳坊旗舰店|     中老年冬季中年加厚50岁羽绒服|
|    女装|  羽绒服女| 548.00|6752笔| 向内向外旗舰店|   黑色韩版亮面宽松大码毛领加厚羽绒服|
|    女装|  羽绒服女| 829.00|6698笔|太平鸟官方旗舰店|      太平鸟黑色茧型加厚连帽羽绒服|
|    女装|  羽绒服女| 698.00|6637笔| 香影官方旗舰店|     韩版时尚毛领刺绣丝绒修身羽绒服|
|    女装|  羽绒服女|1699.00|6569笔|波司登官方旗舰店|      波司登运动毛领韩版加厚羽绒服|
|    女装|  羽绒服女| 779.00|6053笔|太平鸟官方旗舰店|    太平鸟加厚白鸭绒插肩袖浅绿羽绒服|
|    女装|  羽绒服女| 599.00|5982笔| 雅鹿官方旗舰店|     雅鹿韩版时尚大码毛领鸭绒羽绒服|
|    女装|  羽绒服女| 989.00|5980笔|波司登官方旗舰店|波司登2017新款迪士尼系列米奇时...|
|    女装|  羽绒服女| 599.00|5646笔|     杭依阁|       韩版加厚修身狐狸毛领羽绒服|
|    女装|  羽绒服女| 628.00|5550笔|     墨概念|    韩版貉子毛领加厚金丝绒连帽羽绒服|
|    女装|  羽绒服女| 899.50|5449笔|苏醒的乐园旗舰店|       冬季韩版薄款修身毛领羽绒服|
|    女装|  羽绒服女| 598.00|5443笔|      搜藏|       时尚韩版毛领个性口袋羽绒服|
|    女装|  羽绒服女| 669.00|5239笔|   高梵旗舰店|     高梵毛领修身显瘦韩版保暖羽绒服|
|    女装|  羽绒服女| 569.00|5104笔| 乐町官方旗舰店|         乐町2017秋冬季新款|
|    女装|  羽绒服女| 566.00|5033笔|     恒源祥|    恒源祥女士加厚保暖修身显瘦羽绒服|
+------+------+-------+-----+--------+--------------------+
           

groupBy()/count()

//count items by class2
        //select count() from table group by class2;
        df.groupBy("class2").count().show();
           

结果:

+------+-----+
|class2|count|
+------+-----+
|  羽绒服女|   60|
+------+-----+
           

4. sql查询及Global Temporary View

4.1 将DataFrame注册为临时视图

package cn.whbing.spark.SparkApps.sql;


import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

/*
 * 实战dataset<row>
 */
public class DtaaFrameOps {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("spark SQL")
                .getOrCreate();

        //Dataset<Row> df = spark.read().json("hdfs://master-1a:9000/whbing/data/nvzhuang.json");
        Dataset<Row> df = spark.read().json("D:\\javaTools\\EclipseWork1\\taobaospider\\nvzhuang.json");

        //将dataframe注册为临时视图
        df.createOrReplaceTempView("nvzhuang");
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM nvzhuang");
        sqlDF.show();
        //结果
        /*
+------+------+-------+-----+--------+-----------------+
|class1|class2|  price|sales|   store|            title|
+------+------+-------+-----+--------+-----------------+
|    女装|  羽绒服女| 698.00|7120笔| 原创主题旗舰店|      韩版宽松彩色毛领面包服|
|    女装|  羽绒服女| 599.00|7094笔|优衣库官方旗舰店|              可自提|
|    女装|  羽绒服女| 299.90|6996笔| 森马官方旗舰店|  森马韩版拉链连帽学生百搭羽绒服|
|    女装|  羽绒服女| 568.00|6912笔|  精梳坊旗舰店|  中老年冬季中年加厚50岁羽绒服|
|    女装|  羽绒服女| 338.00|6895笔|      姗西|     反季清仓2017冬装新款|
|    女装|  羽绒服女| 228.00|6869笔|   初碧旗舰店|  韩版百搭羊羔毛拼接冬季羽绒棉服|
|    女装|  羽绒服女| 199.00|6856笔|   高梵旗舰店|        高梵 修身舒适轻薄|
|    女装|  羽绒服女| 213.00|6829笔|  玫兰妮旗舰店| 冬季韩版修身连帽百搭薄款羽绒棉服|
|    女装|  羽绒服女| 219.00|6819笔| 世纪香缤旗舰店|  连帽修身薄款大码清仓韩版羽绒服|
|    女装|  羽绒服女| 218.00|6816笔|  橡迪莎旗舰店|         chic冬装外套|
|    女装|  羽绒服女| 548.00|6752笔| 向内向外旗舰店|黑色韩版亮面宽松大码毛领加厚羽绒服|
|    女装|  羽绒服女| 158.00|6700笔|   筱芙旗舰店|   中老年人60-70岁羽绒棉服|
|    女装|  羽绒服女| 829.00|6698笔|太平鸟官方旗舰店|   太平鸟黑色茧型加厚连帽羽绒服|
|    女装|  羽绒服女| 698.00|6637笔| 香影官方旗舰店|  韩版时尚毛领刺绣丝绒修身羽绒服|
|    女装|  羽绒服女| 196.00|6630笔|  玫兰妮旗舰店|   冬季韩版修身连帽百搭羽绒棉袄|
|    女装|  羽绒服女|1699.00|6569笔|波司登官方旗舰店|   波司登运动毛领韩版加厚羽绒服|
|    女装|  羽绒服女| 288.00|6307笔| 欧娜名媛旗舰店|         冬季加厚棉袄棉衣|
|    女装|  羽绒服女| 168.00|6221笔|  薇诗琪旗舰店|    妈妈羽绒中老年人40岁棉袄|
|    女装|  羽绒服女| 148.00|6197笔| 欧娜名媛旗舰店|     宽松女士羽绒韩版冬季棉服|
|    女装|  羽绒服女| 329.00|6191笔| 呼啸衫庄旗舰店|     冬季韩版修身白鸭绒羽绒服|
+------+------+-------+-----+--------+-----------------+
only showing top 20 rows
    */
    }

}       
           

4.2 将DataFrame注册为全局临时视图

Temporary views

in

Spark SQL

are

session-scoped

and will disappear if the session that creates it terminates. If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view. Global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer it, e.g.

SELECT * FROM global_temp.view1

.

//将dataframe注册为全局临时视图
        df.createGlobalTempView("nvzhuang1");
        spark.sql("SELECT * FROM global_temp.nvzhuang1").show();
        //spark.newSession().sql("SELECT * FROM global_temp.nvzhuang1").show();
        //结果
/*
+------+------+-------+-----+--------+-----------------+
|class1|class2|  price|sales|   store|            title|
+------+------+-------+-----+--------+-----------------+
|    女装|  羽绒服女| 698.00|7120笔| 原创主题旗舰店|      韩版宽松彩色毛领面包服|
|    女装|  羽绒服女| 599.00|7094笔|优衣库官方旗舰店|              可自提|
|    女装|  羽绒服女| 299.90|6996笔| 森马官方旗舰店|  森马韩版拉链连帽学生百搭羽绒服|
|    女装|  羽绒服女| 568.00|6912笔|  精梳坊旗舰店|  中老年冬季中年加厚50岁羽绒服|
|    女装|  羽绒服女| 338.00|6895笔|      姗西|     反季清仓2017冬装新款|
|    女装|  羽绒服女| 228.00|6869笔|   初碧旗舰店|  韩版百搭羊羔毛拼接冬季羽绒棉服|
|    女装|  羽绒服女| 199.00|6856笔|   高梵旗舰店|        高梵 修身舒适轻薄|
|    女装|  羽绒服女| 213.00|6829笔|  玫兰妮旗舰店| 冬季韩版修身连帽百搭薄款羽绒棉服|
|    女装|  羽绒服女| 219.00|6819笔| 世纪香缤旗舰店|  连帽修身薄款大码清仓韩版羽绒服|
|    女装|  羽绒服女| 218.00|6816笔|  橡迪莎旗舰店|         chic冬装外套|
|    女装|  羽绒服女| 548.00|6752笔| 向内向外旗舰店|黑色韩版亮面宽松大码毛领加厚羽绒服|
|    女装|  羽绒服女| 158.00|6700笔|   筱芙旗舰店|   中老年人60-70岁羽绒棉服|
|    女装|  羽绒服女| 829.00|6698笔|太平鸟官方旗舰店|   太平鸟黑色茧型加厚连帽羽绒服|
|    女装|  羽绒服女| 698.00|6637笔| 香影官方旗舰店|  韩版时尚毛领刺绣丝绒修身羽绒服|
|    女装|  羽绒服女| 196.00|6630笔|  玫兰妮旗舰店|   冬季韩版修身连帽百搭羽绒棉袄|
|    女装|  羽绒服女|1699.00|6569笔|波司登官方旗舰店|   波司登运动毛领韩版加厚羽绒服|
|    女装|  羽绒服女| 288.00|6307笔| 欧娜名媛旗舰店|         冬季加厚棉袄棉衣|
|    女装|  羽绒服女| 168.00|6221笔|  薇诗琪旗舰店|    妈妈羽绒中老年人40岁棉袄|
|    女装|  羽绒服女| 148.00|6197笔| 欧娜名媛旗舰店|     宽松女士羽绒韩版冬季棉服|
|    女装|  羽绒服女| 329.00|6191笔| 呼啸衫庄旗舰店|     冬季韩版修身白鸭绒羽绒服|
+------+------+-------+-----+--------+-----------------+
only showing top 20 rows
*/
           

5. 创建Dataset< bean>

5.1 创建javabean并创建Dataset< bean>

package cn.whbing.spark.SparkApps.sql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;


public class DatasetOps {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("spark SQL")
                .getOrCreate();

        // Encoders are created for Java beans
        Encoder<Product> propductEncoder = Encoders.bean(Product.class);
        String path = "D:\\javaTools\\EclipseWork1\\taobaospider\\nvzhuang.json";
        Dataset<Product> productDS = spark.read().json(path).as(propductEncoder);
        productDS.show();
/*
+------+------+-------+-----+--------+-----------------+
|class1|class2|  price|sales|   store|            title|
+------+------+-------+-----+--------+-----------------+
|    女装|  羽绒服女| 698.00|7120笔| 原创主题旗舰店|      韩版宽松彩色毛领面包服|
|    女装|  羽绒服女| 599.00|7094笔|优衣库官方旗舰店|              可自提|
|    女装|  羽绒服女| 299.90|6996笔| 森马官方旗舰店|  森马韩版拉链连帽学生百搭羽绒服|
|    女装|  羽绒服女| 568.00|6912笔|  精梳坊旗舰店|  中老年冬季中年加厚50岁羽绒服|
|    女装|  羽绒服女| 338.00|6895笔|      姗西|     反季清仓2017冬装新款|
|    女装|  羽绒服女| 228.00|6869笔|   初碧旗舰店|  韩版百搭羊羔毛拼接冬季羽绒棉服|
|    女装|  羽绒服女| 199.00|6856笔|   高梵旗舰店|        高梵 修身舒适轻薄|
|    女装|  羽绒服女| 213.00|6829笔|  玫兰妮旗舰店| 冬季韩版修身连帽百搭薄款羽绒棉服|
|    女装|  羽绒服女| 219.00|6819笔| 世纪香缤旗舰店|  连帽修身薄款大码清仓韩版羽绒服|
|    女装|  羽绒服女| 218.00|6816笔|  橡迪莎旗舰店|         chic冬装外套|
|    女装|  羽绒服女| 548.00|6752笔| 向内向外旗舰店|黑色韩版亮面宽松大码毛领加厚羽绒服|
|    女装|  羽绒服女| 158.00|6700笔|   筱芙旗舰店|   中老年人60-70岁羽绒棉服|
|    女装|  羽绒服女| 829.00|6698笔|太平鸟官方旗舰店|   太平鸟黑色茧型加厚连帽羽绒服|
|    女装|  羽绒服女| 698.00|6637笔| 香影官方旗舰店|  韩版时尚毛领刺绣丝绒修身羽绒服|
|    女装|  羽绒服女| 196.00|6630笔|  玫兰妮旗舰店|   冬季韩版修身连帽百搭羽绒棉袄|
|    女装|  羽绒服女|1699.00|6569笔|波司登官方旗舰店|   波司登运动毛领韩版加厚羽绒服|
|    女装|  羽绒服女| 288.00|6307笔| 欧娜名媛旗舰店|         冬季加厚棉袄棉衣|
|    女装|  羽绒服女| 168.00|6221笔|  薇诗琪旗舰店|    妈妈羽绒中老年人40岁棉袄|
|    女装|  羽绒服女| 148.00|6197笔| 欧娜名媛旗舰店|     宽松女士羽绒韩版冬季棉服|
|    女装|  羽绒服女| 329.00|6191笔| 呼啸衫庄旗舰店|     冬季韩版修身白鸭绒羽绒服|
+------+------+-------+-----+--------+-----------------+
only showing top 20 rows
*/
    }

    public static class Product{
        private String class1;
        private String class2;
        private String price;
        private String sales;
        private String store;
        private String title;
        public String getClass1() {
            return class1;
        }
        public void setClass1(String class1) {
            this.class1 = class1;
        }
        public String getClass2() {
            return class2;
        }
        public void setClass2(String class2) {
            this.class2 = class2;
        }
        public String getPrice() {
            return price;
        }
        public void setPrice(String price) {
            this.price = price;
        }
        public String getSales() {
            return sales;
        }
        public void setSales(String sales) {
            this.sales = sales;
        }
        public String getStore() {
            return store;
        }
        public void setStore(String store) {
            this.store = store;
        }
        public String getTitle() {
            return title;
        }
        public void setTitle(String title) {
            this.title = title;
        }       
    }
}
           

5.2 RDD与Dataset转换的重大意义

在Spark中RDD可以直接转换成DataFrame。SparkCore的核心是RDD,所有的调度都是基于RDD完成的,对RDD的操作都可以转换成基于DataFrame使用SparkSQL来操作。RDD可能接上数据库,接上NoSQL,其他文件系统等各种数据来源,然后将数据转换为DataFrame,极大简化了大数据的开发,原来写Scala\Java,现在只需要写SparkSQL。

同时对DataFrame的操作又可以转换成RDD,基于DataFrame对数据进行SQL或机器学习等操作后又可以转换为RDD,这对于保存数据、格式化非常方便。

继续阅读