天天看點

JavaSpark | SparkSQL | 建立DataSet | UDF與UDAF | 開窗函數

文章目錄

    • 一、SparkSQL
      • 1.SparkSQL介紹
      • 2.Dataset 與 DataFrame概念解析
      • 3.SparkSQL 的資料源
      • 4.SparkSQL 底層架構
      • 5.謂詞下推(predicate Pushdown)
    • 二、建立DataSet的幾種方式
      • 1.讀取 json 格式的檔案建立 Dataset
      • 2.通過 json 格式的 RDD 建立 Dataset
      • 3.非 json 格式的 RDD 建立 Dataset
        • 3.1 反射
        • 3.2 動态建立 Schema
      • 4.讀取 parquet 檔案建立 Dataset
      • 5.通過Bean class來建立 Dataset
      • 6.讀取 JDBC 中的資料建立 Dataset(MySql 為例)
      • 7.讀取 Hive 中的資料加載成 Dataset
    • 三、序列化問題
    • 四、自定義函數 UDF 和 UDAF
      • 1.自定義函數 UDF
      • 2.自定義函數 UDAF
    • 5.開窗函數

一、SparkSQL

1.SparkSQL介紹

  Hive 是 Shark 的前身,Shark 是 SparkSQL 的前身,SparkSQL 産生的根本原因是其完全脫離了 Hive 的限制。

  • SparkSQL 支援查詢原生的 RDD。RDD 是 Spark 平台的核心概念, 是 Spark 能夠高效的處理大資料的各種場景的基礎。
  • 能夠在 scala 、Java中寫 SQL 語句。支援簡單的 SQL 文法檢查,能夠在Scala 中寫 Hive 語句通路 Hive 資料,并将結果取回作為 RDD 使用。
  • Spark on Hive: Hive 隻作為儲存角色,Spark 負責 sql 解析優化,執行。
  • Hive on Spark:Hive 即作為存儲又負責 sql 的解析優化,Spark 負責執行。
兩者資料源均為Hive表,底層人物均為Spark人物,關鍵差別在于一個是Hive去解析,一個是Spark去解析。

2.Dataset 與 DataFrame概念解析

JavaSpark | SparkSQL | 建立DataSet | UDF與UDAF | 開窗函數

  Dataset 也是一個分布式資料容器。與 RDD 類似,然而 Dataset 更像傳統資料庫的二維表格,除了資料以外,還掌握資料的結構資訊(中繼資料),即schema。同時,與 Hive 類似,Dataset 也支援嵌套資料類型(struct、array 和 map)。從 API 易用性的角度上 看, Dataset API 提供的是一套高層的關系操作,比函數式的 RDD API 要更加友好,門檻更低。Dataset 的底層封裝的是 RDD,當 RDD 的泛型是 Row 類型的時候,我們也可以稱它為 DataFrame。即

Dataset<Row> = DataFrame

3.SparkSQL 的資料源

  SparkSQL 的資料源可以是 JSON 類型的字元串,JDBC,Parquent,Hive,HDFS 等。

JavaSpark | SparkSQL | 建立DataSet | UDF與UDAF | 開窗函數

4.SparkSQL 底層架構

  首先拿到 sql 後解析一批未被解決的邏輯計劃,再經過分析得到分析後的邏輯計劃,再經過一批優化規則轉換成一批最佳優化的邏輯計劃,再經過 SparkPlanner 的政策轉化成一批實體計劃,随後經過消費模型轉換成一個個的 Spark 任務執行。

JavaSpark | SparkSQL | 建立DataSet | UDF與UDAF | 開窗函數

5.謂詞下推(predicate Pushdown)

JavaSpark | SparkSQL | 建立DataSet | UDF與UDAF | 開窗函數

二、建立DataSet的幾種方式

1.讀取 json 格式的檔案建立 Dataset

  • json 檔案中的 json 資料不能嵌套 json 格式資料。
  • Dataset 是一個一個 Row 類型的 RDD,

    ds.rdd()/ds.javaRdd()

  • 可以兩種方式讀取 json 格式的檔案。
  • df.show()預設顯示前 20 行資料。
  • Dataset 原生 API 可以操作 Dataset(不友善)。
  • 注冊成臨時表時,表中的列預設按 ascii 順序顯示列。
package com.shsxt.java_Test.sql.dataset;


import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CreateDSFromJosonFile {

    public static void main(String[] args) {

        SparkSession sparkSession = SparkSession
                .builder()
                .appName("jsonfile")
                .master("local")
                .getOrCreate();


        /**
         * Dataset的底層是一個一個的RDD
         * 注意:
         *      當 Dataset裡的泛型是Row時,我們又可以稱之為dataframe
         *
         * 以下兩種方式都可以讀取json格式的檔案
         *
         */

        Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
//		Dataset ds = sparkSession.read().json("data/json");
        ds.show();


        /**
         * 顯示DataSet中的内容,預設顯示前20行。如果現實多行要指定多少行show(行數)
         * 注意:當有多個列時,顯示的列先後順序是按列的ascii碼先後顯示。
         */
        ds.show(100);


        /**
         *DataSet轉換成RDD
         */
//        JavaRDD<Row> javaRDD = ds.javaRDD();

        /**
         * 樹形的形式顯示schema資訊
         */
        ds.printSchema();


        /**
         * dataset自帶的API 操作dataset
         */
		//select name from table
		ds.select("name").show();
		//select name ,age+10 as addage from table
		ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();
		//select name ,age from table where age>19
		ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();
		//select age,count(*) from table group by age
		ds.groupBy(ds.col("age")).count().show();

        /**
         * 将dataset注冊成臨時的一張表,這張表相當于臨時注冊到記憶體中,邏是輯上的表,不會霧化到磁盤
         */

        ds.createOrReplaceTempView("jtable");
        //ds.registerTempTable("jtable");

        Dataset<Row> sql = sparkSession.sql("select age,count(*) as gg from jtable group by age");
        sql.show();
//
//        Dataset<Row> sql2 = sparkSession.sql("select name,age from jtable");
//        sql2.show();
        sparkSession.stop();
    }
}
           
+----+--------+
| age|    name|
+----+--------+
|  18|zhangsan|
|null|    lisi|
|  18|  wangwu|
|  28|  laoliu|
|  20|zhangsan|
|null|    lisi|
|  18|  wangwu|
|  28|  laoliu|
|  28|zhangsan|
|null|    lisi|
|  18|  wangwu|
+----+--------+

+----+--------+
| age|    name|
+----+--------+
|  18|zhangsan|
|null|    lisi|
|  18|  wangwu|
|  28|  laoliu|
|  20|zhangsan|
|null|    lisi|
|  18|  wangwu|
|  28|  laoliu|
|  28|zhangsan|
|null|    lisi|
|  18|  wangwu|
+----+--------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
|  laoliu|
|zhangsan|
|    lisi|
|  wangwu|
|  laoliu|
|zhangsan|
|    lisi|
|  wangwu|
+--------+

+--------+------+
|    name|addage|
+--------+------+
|zhangsan|    28|
|    lisi|  null|
|  wangwu|    28|
|  laoliu|    38|
|zhangsan|    30|
|    lisi|  null|
|  wangwu|    28|
|  laoliu|    38|
|zhangsan|    38|
|    lisi|  null|
|  wangwu|    28|
+--------+------+

+--------+---+
|    name|age|
+--------+---+
|  laoliu| 28|
|zhangsan| 20|
|  laoliu| 28|
|zhangsan| 28|
+--------+---+

+----+---+
| age| gg|
+----+---+
|null|  3|
|  28|  3|
|  18|  4|
|  20|  1|
+----+---+
           

2.通過 json 格式的 RDD 建立 Dataset

package com.shsxt.java_Test.sql.dataset;


import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;


import java.util.Arrays;

/**
 * 讀取json格式的RDD建立DF
 * @author root
 *
 */
public class CreateDSFromJsonRDD {
	public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("jsonrdd")
                .master("local")
                .getOrCreate();

        /**
         * 注意:
         *   1.由于是java版,故通過javaSparkcontext.parallelize來建立json格式的JavaRDD
         *   是以我們通過sparkcontext來建立javaSparkcontext
         *   2.如果是scala版本,直接通過sparkcontext.parallelize來建立,就無需建立javaSparkcontext
         */

        SparkContext sc = sparkSession.sparkContext();

        JavaSparkContext jsc = new JavaSparkContext(sc);

        JavaRDD<String> nameRDD = jsc.parallelize(Arrays.asList(
					"{'name':'zhangsan','age':\"18\"}",
					"{\"name\":\"lisi\",\"age\":\"19\"}",
					"{\"name\":\"wangwu\",\"age\":\"20\"}"
				));

		JavaRDD<String> scoreRDD = jsc.parallelize(Arrays.asList(
				"{\"name\":\"zhangsan\",\"score\":\"100\"}",
				"{\"name\":\"lisi\",\"score\":\"200\"}",
				"{\"name\":\"wangwu\",\"score\":\"300\"}"
				));

		Dataset<Row> nameds = sparkSession.read().json(nameRDD);
        Dataset<Row> scoreds = sparkSession.read().json(scoreRDD);


		//注冊成臨時表使用
        nameds.createOrReplaceTempView("nameTable");
        scoreds.createOrReplaceTempView("scoreTable");

        Dataset<Row> result =
                sparkSession.sql("select nameTable.name,nameTable.age,scoreTable.score "
							+ "from nameTable join scoreTable "
							+ "on nameTable.name = scoreTable.name");
		result.show();
		sc.stop();
	}
}
           
+--------+---+-----+
|    name|age|score|
+--------+---+-----+
|  wangwu| 20|  300|
|zhangsan| 18|  100|
|    lisi| 19|  200|
+--------+---+-----+
           

3.非 json 格式的 RDD 建立 Dataset

3.1 反射

  通過反射的方式将非

json

格式的

RDD

轉換成

Dataset

。實際上就是先将資料轉換成自定義類對象,變成

JavaRDD

,在底層通過反射的方式解析

Person.class

獲得

Person

的所有

schema

資訊(field),結合

RDD

本身,就生成了

Dataset

  • 自定義類要可序列化
  • 自定義類的通路級别是

    Public

  • RDD

    轉成

    Dataset

    後會根據映射将字段按

    Assci

    碼排序
  • Dataset

    轉換成

    RDD

    時擷取字段兩種方式,一種是

    ds.getInt(0)

    下标擷取(不推薦使用),另一種是

    ds.getAs(“列名”)

    擷取(推薦使用)

Person.java

package com.shsxt.java_Test.sql.dataset;

import java.io.Serializable;

public class Person implements Serializable {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private String id ;
	private String name;
	private Integer age;
	
	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public Integer getAge() {
		return age;
	}

	public void setAge(Integer age) {
		this.age = age;
	}

	@Override
	public String toString() {
		return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
	}
	
}
           
package com.shsxt.java_Test.sql.dataset;



import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

/**
 * 通過反射的方式将非json格式的RDD轉換成Dataset
 * 注意:這種方式不推薦使用
 *
 * @author root
 */
public class CreateDSFromRDDWithReflect {
    public static void main(String[] args) {
        /**
         * 注意:
         * 1.自定義類要實作序列化接口
         * 2.自定義類通路級别必須是Public
         * 3.RDD轉成Dataset會把自定義類中字段的名稱按assci碼排序
         */

        SparkSession sparkSession = SparkSession
                .builder()
                .appName("reflect")
                .master("local")
                .getOrCreate();

        SparkContext sc = sparkSession.sparkContext();
        JavaSparkContext jsc = new JavaSparkContext(sc);
        JavaRDD<String> lineRDD = jsc.textFile("data/person.txt");
        
        JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
            @Override
            public Person call(String line) throws Exception {
                Person p = new Person();

                p.setId(line.split(",")[0]);
                p.setName(line.split(",")[1]);
                p.setAge(Integer.valueOf(line.split(",")[2]));
                return p;
            }
        });
        /**
         * 傳入進去Person.class的時候,sparkSession是通過反射的方式建立Dataset
         * 在底層通過反射的方式獲得Person的所有schema資訊(field),結合RDD本身,就生成了Dataset
         */
        Dataset<Row> dataFrame = sparkSession.createDataFrame(personRDD, Person.class);

        dataFrame.show();
        dataFrame.printSchema();
        dataFrame.registerTempTable("person");
        Dataset sql = sparkSession.sql("select  name,id,age from person where id = 2");
        sql.show();
        

        /**
         * 将Dataset轉成JavaRDD
         * 注意:
         * 1.可以使用row.getInt(0),row.getString(1)...通過下标擷取傳回Row類型的資料,但是要注意列順序問題---不常用
         * 2.可以使用row.getAs("列名")來擷取對應的列值。
         *
         //		 */
		JavaRDD<Row> javaRDD = dataFrame.javaRDD();
		JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
			@Override
			public Person call(Row row) throws Exception {
				Person p = new Person();

//				p.setId(row.getString(1));
//				p.setName(row.getString(2));
//				p.setAge(row.getInt(0));

                p.setId(row.getAs("id"));
				p.setName(row.getAs("name"));
				p.setAge(row.getAs("age"));

				return p;
			}
		});

		map.foreach(x-> System.out.println(x));

        sparkSession.stop();
    }
}
           
+---+---+--------+
|age| id|    name|
+---+---+--------+
| 18|  1|zhangsan|
| 19|  2|    lisi|
| 20|  3|  wangwu|
+---+---+--------+

root
 |-- age: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+----+---+---+
|name| id|age|
+----+---+---+
|lisi|  2| 19|
+----+---+---+

Person [id=1, name=zhangsan, age=18]
Person [id=2, name=lisi, age=19]
Person [id=3, name=wangwu, age=20]
           

3.2 動态建立 Schema

  動态建立 Schema 将非 json 格式的 RDD 轉換成 Dataset。

/**
 * 
 */
package com.shsxt.java_Test.sql.dataset;


import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

/**
 * 動态建立Schema将非json格式RDD轉換成Dataset
 * @author root
 *
 */
public class CreateDSFromRDDWithStruct {
	public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("struct")
                .master("local")
                .getOrCreate();

        SparkContext sc = sparkSession.sparkContext();

        JavaSparkContext jsc = new JavaSparkContext(sc);
		JavaRDD<String> lineRDD = jsc.textFile("data/person.txt");
		/**
		 * 轉換成Row類型的RDD
		 */
		final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
			@Override
			public Row call(String s) throws Exception {

				return RowFactory.create(
						s.split(",")[0],
						s.split(",")[1],
						Integer.valueOf(s.split(",")[2])
					);
			}
		});
		/**
		 * 動态建構DataFrame中的中繼資料,一般來說這裡的字段可以來源自字元串,也可以來源于外部資料庫
		 */
		
		List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("id", DataTypes.StringType, true),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true)
        );


		StructType schema = DataTypes.createStructType(asList);

		Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
		df.printSchema();
		df.show();


		sc.stop();
		
	}
}
           
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 18|
|  2|    lisi| 19|
|  3|  wangwu| 20|
+---+--------+---+
           

4.讀取 parquet 檔案建立 Dataset

  讀取與儲存二進制格式–parquet 檔案。

  • 可以将

    Dataset

    存儲成

    parquet

    檔案。儲存成

    parquet

    檔案的方式有兩種:
    df.write().mode(SaveMode.Overwrite)format("parquet").save("./sparksql/parquet");
    df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
               
  • SaveMode

    指定檔案儲存時的模式。
    • Overwrite

      :覆寫
    • Append

      :追加
    • ErrorIfExists

      :如果存在就報錯
    • Ignore

      :如果存在就忽略
package com.shsxt.java_Test.sql.dataset;


import org.apache.spark.sql.*;


public class CreateDFFromParquet {
	public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("parquet")
                .master("local")
                .getOrCreate();

        Dataset<Row>  df = sparkSession.read().json("data/json");

//		sparkSession.read().format("json").load("./spark/json");
		df.show();
		/**
		 * 将Dataset儲存成parquet檔案,
		 * SaveMode指定存儲檔案時的儲存模式:
		 * 		Overwrite:覆寫
		 * 		Append:追加
		 * 		ErrorIfExists:如果存在就報錯
		 * 		Ignore:如果存在就忽略
		 * 儲存成parquet檔案有以下兩種方式:
		 */

		df.write().mode(SaveMode.Overwrite).format("parquet").save("data/parquet");
//		df.write().mode(SaveMode.Overwrite).parquet("data/parquet");

        /**
		 * 加載parquet檔案成Dataset
		 * 加載parquet檔案有以下兩種方式:	
		 */

        Dataset load = sparkSession.read().format("parquet").load("data/parquet");
		load = sparkSession.read().parquet("data/parquet");
		load.show();
        sparkSession.stop();
	}
	
}
           
+----+--------+
| age|    name|
+----+--------+
|  18|zhangsan|
|null|    lisi|
|  18|  wangwu|
|  28|  laoliu|
|  20|zhangsan|
|null|    lisi|
|  18|  wangwu|
|  28|  laoliu|
|  28|zhangsan|
|null|    lisi|
|  18|  wangwu|
+----+--------+
           

5.通過Bean class來建立 Dataset

/**
 *
 */
package com.shsxt.java_Test.sql.dataset;

import org.apache.spark.sql.*;
import java.util.Arrays;
import java.util.List;

/**
 * 通過Bean class來建立 Dataset
 *
 * @author root
 */
public class CreateDSFromBeanClass {

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("beanclass")
                .master("local")
                .getOrCreate();


        Person person = new Person();
        person.setId("1");
        person.setAge(18);
        person.setName("zs");

        Person person2 = new Person();
        person2.setId("2");
        person2.setAge(20);
        person2.setName("ls");

        List<Person> people = Arrays.asList(person, person2);
        Encoder<Person> personEncoder = Encoders.bean(Person.class);

        Dataset<Person> dataset = sparkSession.createDataset(people, personEncoder);

        dataset.printSchema();

        dataset.show();

        dataset.registerTempTable("person");

        Dataset<Row> result = sparkSession.sql("select name , id  from person");

        result.show();

        sparkSession.stop();

    }
}
           
root
 |-- age: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+---+---+----+
|age| id|name|
+---+---+----+
| 18|  1|  zs|
| 20|  2|  ls|
+---+---+----+

+----+---+
|name| id|
+----+---+
|  zs|  1|
|  ls|  2|
+----+---+
           

6.讀取 JDBC 中的資料建立 Dataset(MySql 為例)

  從MYSQL中讀取建立 Dataset與寫入。

  • 第一種方式讀取MySql資料庫表,加載為DataFrame
  • 第二種方式讀取MySql資料表加載為Dataset
package com.shsxt.java_Test.sql.dataset;

import org.apache.spark.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


public class CreateDSFromMysql {
    public static void main(String[] args) {

        SparkSession sparkSession = SparkSession
                .builder()
                .appName("mysql")
                .master("local")
                .getOrCreate();


        /**
         * 第一種方式讀取MySql資料庫表,加載為Dataset
         */
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "root");
        options.put("dbtable", "person");

        Dataset<Row> person = sparkSession.read().format("jdbc").options(options).load();

        person.show();

        person.createOrReplaceTempView("person");

//        person.registerTempTable("person");
        /**
         * 第二種方式讀取MySql資料表加載為Dataset
         */

        DataFrameReader reader = sparkSession.read().format("jdbc");
        reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");  //MYSQL位址
        reader.option("driver", "com.mysql.jdbc.Driver");   //MYSQL驅動包
        reader.option("user", "root");  //使用者名
        reader.option("password", "root");  //使用者密碼
        reader.option("dbtable", "score");  //表

        Dataset<Row> score = reader.load();
        score.show();
        score.createOrReplaceTempView("score");
//        score.registerTempTable("score");

        Dataset<Row>  result =
                sparkSession.sql("select person.id,person.name,person.age,score.score "
                        + "from person,score "
                        + "where person.name = score.name  and score.score> 82");
        result.show();
//
        result.registerTempTable("result");

        Dataset<Row>  df = sparkSession.sql("select id,name,age,score from result where age > 18");
        df.show();

        /**
         * 将Dataset結果儲存到Mysql中
         */

        Properties properties = new Properties();
        properties.setProperty("user", "root");
        properties.setProperty("password", "root");
        /**
         * SaveMode:
         * Overwrite:覆寫
         * Append:追加
         * ErrorIfExists:如果存在就報錯
         * Ignore:如果存在就忽略
         *
         */

        result.write().mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties);
        System.out.println("----Finish----");
        sparkSession.stop();
    }
}
           

7.讀取 Hive 中的資料加載成 Dataset

package com.shsxt.scala_Test.sql.dataset

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}


object CreateDFFromHive {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local")
            .appName("hive")
            //開啟Hive支援,代碼運作時等同于hive用戶端,需要連接配接到hive的中繼資料服務
            .enableHiveSupport()
            .getOrCreate()

        val sc: SparkContext = spark.sparkContext
        //HiveSQL
        spark.sql("use spark")
        spark.sql("drop table if exists student_infos")
        spark.sql("create table if not exists student_infos (name string,age int) row format  delimited fields terminated by '\t'")
        spark.sql("load data local inpath './data/student_infos' into table student_infos")

        spark.sql("drop table if exists student_scores")
        spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
        spark.sql("load data local inpath './data/student_scores' into table student_scores")

        val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
        spark.sql("drop table if exists good_student_infos2")

        /**
          * 将結果寫入到hive表中
          */
        df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos2")

        sc.stop()
    }
}
           

三、序列化問題

  序列化是生成對象的一種方式。

  1. 反序列化時

    serializable

    版本号不一緻時會導緻不能反序列化。

    簡單來說,

    Java

    的序列化機制是通過在運作時判斷類的

    serialVersionUID

    來驗證版本一緻性的。在進行反序列化時,

    JVM

    會把傳來的位元組流中的

    serialVersionUID

    與本地相應實體(類)的

    serialVersionUID

    進行比較,如果相同就認為是一緻的, 可以進行反序列化,否則就會出現序列化版本不一緻的異常。當實作

    java.io.Serializable

    接口的實體(類)沒有顯式地定義一個名為

    serialVersionUID

    ,類型為

    long

    的變量時,

    Java

    序列化機制會根據編譯的

    class

    自動生成一個

    serialVersionUID

    作序列化版本比較用,這種情況下,隻有同一次編譯生成的

    class

    才會生成相同的

    serialVersionUID

    。如果我們不希望通過編譯來強制劃分軟體版本,即實作序列化接口的實體能夠相容先前版本,未作更改的類,就需要顯式地定義一個名為

    serialVersionUID

    ,類型為

    long

    的變量,不修改這個變量值的序列化實體都可以互相進行串行化和反串行化。
  2. 子類中實作了

    serializable

    接口,父類中沒有實作,父類中的變量是不能被序列化,序列化後父類中的變量會得到null。
    • 注意:父類實作

      serializable

      接口,子類沒有實作

      serializable

      接口時,子類可以正常序列化(應用:将一些不需要序列化的屬性值抽取出來放到父類(未實作序列化接口),子類實作序列化接口)
  3. 被關鍵字

    transient

    修飾的變量不能被序列化。
  4. 靜态變量不能被序列化,屬于類,不屬于方法和對象,是以不能被序列化。
    • 靜态變量的值是在jvm中,能擷取到不是因為反序列化。

四、自定義函數 UDF 和 UDAF

1.自定義函數 UDF

package com.shsxt.java_Test.sql.udf_udaf;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;

import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
 * UDF 使用者自定義函數
 * @author root
 *
 */
public class UDF {
	public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("udf")
                .master("local")
                .getOrCreate();

        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
		JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu"));

		JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Row call(String s) throws Exception {
				return RowFactory.create(s);
			}
		});
		
		/**
		 * 動态建立Schema方式加載DF
		 */
		List<StructField> fields = new ArrayList<StructField>();
		fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
		StructType schema = DataTypes.createStructType(fields);
		
		Dataset<Row> df = sparkSession.createDataFrame(rowRDD,schema);
		
		df.registerTempTable("user");

		/**
		 * 根據UDF函數參數的個數來決定是實作哪一個UDF  UDF1,UDF2。。。。UDF1xxx
		 */
		sparkSession.udf().register("StrLen",new UDF2<String, Integer, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(String t1, Integer t2) throws Exception {
				return t1.length() + t2;
			}
		} ,DataTypes.IntegerType );

		sparkSession.sql("select name ,StrLen(name,100) as length from user").show();
//
        sparkSession.stop();
		
	}
}
           
+--------+------+
|    name|length|
+--------+------+
|zhangsan|   108|
|    lisi|   104|
|  wangwu|   106|
+--------+------+
           

2.自定義函數 UDAF

  實作

UDAF

函數,如果要自定義類,要實作

UserDefinedAggregateFunction

類。

package com.shsxt.java_Test.sql.udf_udaf;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;

import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * UDAF 使用者自定義聚合函數
 */
public class UDAF {
    public static void main(String[] args) {

        SparkSession sparkSession = SparkSession
                .builder()
                .appName("udaf")
                .master("local")
                .getOrCreate();

        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());

        JavaRDD<String> parallelize = sc.parallelize(
                Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi","zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi"),2);

        JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String s) throws Exception {

                return RowFactory.create(s);
            }
        });

        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
        df.registerTempTable("user");

        /**
         * 注冊一個UDAF函數,實作統計相同值得個數
         * 注意:這裡可以自定義一個類繼承UserDefinedAggregateFunction類也是可以的
         * 資料:
         *     zhangsan
         *     zhangsan
         *     lisi
         *     lisi
         *
         *     select count(*)  from user group by name
         */

        sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {

            private static final long serialVersionUID = 1L;

            /**
             * 初始化一個内部的自己定義的值,在Aggregate之前每組資料的初始化結果
             */
            @Override
            public void initialize(MutableAggregationBuffer buffer) {

                buffer.update(0, 0);

                System.out.println("init ....." + buffer.get(0));

            }

            /**
             * 更新 可以認為一個一個地将組内的字段值傳遞進來 實作拼接的邏輯
             * buffer.getInt(0)擷取的是上一次聚合後的值
             * 相當于map端的combiner,combiner就是對每一個map task的處理結果進行一次小聚合
             * 大聚和發生在reduce端.
             * 這裡即是:在進行聚合的時候,每當有新的值進來,對分組後的聚合如何進行計算
             */
            @Override
            public void update(MutableAggregationBuffer buffer, Row arg1) {

                buffer.update(0, buffer.getInt(0) + 1);

                System.out.println("update.....buffer" + buffer.toString() + " | row" + arg1.toString() );
            }

            /**
             * 合并 update操作,可能是針對一個分組内的部分資料,在某個節點上發生的 但是可能一個分組内的資料,會分布在多個節點上處理
             * 此時就要用merge操作,将各個節點上分布式拼接好的串,合并起來
             * buffer1.getInt(0) : 大聚合的時候 上一次聚合後的值
             * buffer2.getInt(0) : 這次計算傳入進來的update的結果
             * 這裡即是:最後在分布式節點完成後需要進行全局級别的Merge操作
             */

            public void merge(MutableAggregationBuffer buffer1, Row arg1) {
                // 2 3  4  5  6  7
                // 0 + 2 = 2
                // 2 + 3 = 5
                // 5 + 4  = 9

                buffer1.update(0, buffer1.getInt(0) + arg1.getInt(0));

                System.out.println("merge.....buffer : " + buffer1.toString() + "| row" + arg1.toString() );
            }

            /**
             * 在進行聚合操作的時候所要處理的資料的結果的類型
             */
            @Override
            public StructType bufferSchema() {
                return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer", DataTypes.IntegerType, true)));
            }

            /**
             * 最後傳回一個和DataType的類型要一緻的類型,傳回UDAF最後的計算結果
             */
            @Override
            public Object evaluate(Row row) {
                return row.getInt(0);
            }

            /**
             * 指定UDAF函數計算後傳回的結果類型
             */
            @Override
            public DataType dataType() {
                return DataTypes.IntegerType;
            }

            /**
             * 指定輸入字段的字段及類型
             */
            @Override
            public StructType inputSchema() {
                return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
            }

            /**
             * 確定一緻性 一般用true,用以标記針對給定的一組輸入,UDAF是否總是生成相同的結果。
             */
            @Override
            public boolean deterministic() {
                return true;
            }

        });

        sparkSession.sql("select name ,StringCount(name) as number from user group by name").show();

        sc.stop();
    }
}
           
+--------+------+
|    name|number|
+--------+------+
|  wangwu|     2|
|zhangsan|     6|
|    lisi|     4|
+--------+------+
           

5.開窗函數

  SQL函數

  • row_number()

    : 開窗函數是按照某個字段分組,然後取另一字段的前幾個的值,相當于分組取

    topN

  • 開窗函數格式:
package com.shsxt.java_Test.sql.windowfun;



import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

/**
 * 動态建立Schema将非json格式RDD轉換成Dataset
 * @author root
 *
 */
public class CreateDSFromRDDWithStruct {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("struct")
                .master("local")
                .getOrCreate();

        SparkContext sc = sparkSession.sparkContext();

        JavaSparkContext jsc = new JavaSparkContext(sc);
        JavaRDD<String> lineRDD = jsc.textFile("data/sales.txt");
        /**
         * 轉換成Row類型的RDD
         */
        final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {

                return RowFactory.create(
                        Integer.valueOf(s.split("\t")[0]),
                        s.split("\t")[1],
                        Integer.valueOf(s.split("\t")[2])
                );
            }
        });
        /**
         * 動态建構DataFrame中的中繼資料,一般來說這裡的字段可以來源自字元串,也可以來源于外部資料庫
         */

        List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("riqi", DataTypes.IntegerType, true),
                DataTypes.createStructField("leibie", DataTypes.StringType, true),
                DataTypes.createStructField("jine", DataTypes.IntegerType, true)
        );


        StructType schema = DataTypes.createStructType(asList);

        Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
        df.printSchema();
        df.show();

        df.createOrReplaceTempView("sales");

        /**
         * 開窗函數格式:
         * 【 row_number() over (partition by XXX order by XXX) as rank】
         * 注意:rank 從1開始
         */
        /**
         * 以類别分組,按每種類别金額降序排序,顯示 【日期,種類,金額】 結果,如:
         *
         * 1 A 100
         * 2 B 200
         * 3 A 300
         * 4 B 400
         * 5 A 500
         * 6 B 600
         * 排序後:
         * 5 A 500  --rank 1
         * 3 A 300  --rank 2 
         * 1 A 100  --rank 3
         * 6 B 600  --rank 1
         * 4 B 400	--rank 2
         * 2 B 200  --rank 3
         *
         * 2018 A 400     1
         * 2017 A 500     2
         * 2016 A 550     3
         *
         *
         * 2016 A 550     1
         * 2017 A 500     2
         * 2018 A 400     3
         *
         */
        Dataset<Row> result = sparkSession.sql("select riqi,leibie,jine,rank "
                + "from ("
                + "select riqi,leibie,jine,"
                + "row_number() over (partition by leibie order by jine desc) rank "
                + "from sales) t "
                + "where t.rank<=3");
        result.show(100);

        sc.stop();

    }
}
           
+----+------+----+----+
|riqi|leibie|jine|rank|
+----+------+----+----+
|   6|     F|  96|   1|
|   9|     F|  87|   2|
|   9|     F|  84|   3|
|   7|     E|  97|   1|
|   4|     E|  94|   2|
|   9|     E|  90|   3|
|   8|     B|  98|   1|
|   9|     B|  82|   2|
|   7|     B|  67|   3|
|   3|     D|  93|   1|
|   8|     D|  79|   2|
|   8|     D|  76|   3|
|   5|     C|  95|   1|
|   9|     C|  86|   2|
|   9|     C|  81|   3|
|   9|     A|  99|   1|
|   2|     A|  92|   2|
|   9|     A|  88|   3|
|   1|     G|  91|   1|
|   9|     G|  89|   2|
|   8|     G|  75|   3|
+----+------+----+----+

           

繼續閱讀