文章目錄
-
- 一、SparkSQL
-
- 1.SparkSQL介紹
- 2.Dataset 與 DataFrame概念解析
- 3.SparkSQL 的資料源
- 4.SparkSQL 底層架構
- 5.謂詞下推(predicate Pushdown)
- 二、建立DataSet的幾種方式
-
- 1.讀取 json 格式的檔案建立 Dataset
- 2.通過 json 格式的 RDD 建立 Dataset
- 3.非 json 格式的 RDD 建立 Dataset
-
- 3.1 反射
- 3.2 動态建立 Schema
- 4.讀取 parquet 檔案建立 Dataset
- 5.通過Bean class來建立 Dataset
- 6.讀取 JDBC 中的資料建立 Dataset(MySql 為例)
- 7.讀取 Hive 中的資料加載成 Dataset
- 三、序列化問題
- 四、自定義函數 UDF 和 UDAF
-
- 1.自定義函數 UDF
- 2.自定義函數 UDAF
- 5.開窗函數
一、SparkSQL
1.SparkSQL介紹
Hive 是 Shark 的前身,Shark 是 SparkSQL 的前身,SparkSQL 産生的根本原因是其完全脫離了 Hive 的限制。
- SparkSQL 支援查詢原生的 RDD。RDD 是 Spark 平台的核心概念, 是 Spark 能夠高效的處理大資料的各種場景的基礎。
- 能夠在 scala 、Java中寫 SQL 語句。支援簡單的 SQL 文法檢查,能夠在Scala 中寫 Hive 語句通路 Hive 資料,并将結果取回作為 RDD 使用。
兩者資料源均為Hive表,底層人物均為Spark人物,關鍵差別在于一個是Hive去解析,一個是Spark去解析。
- Spark on Hive: Hive 隻作為儲存角色,Spark 負責 sql 解析優化,執行。
- Hive on Spark:Hive 即作為存儲又負責 sql 的解析優化,Spark 負責執行。
2.Dataset 與 DataFrame概念解析
Dataset 也是一個分布式資料容器。與 RDD 類似,然而 Dataset 更像傳統資料庫的二維表格,除了資料以外,還掌握資料的結構資訊(中繼資料),即schema。同時,與 Hive 類似,Dataset 也支援嵌套資料類型(struct、array 和 map)。從 API 易用性的角度上 看, Dataset API 提供的是一套高層的關系操作,比函數式的 RDD API 要更加友好,門檻更低。Dataset 的底層封裝的是 RDD,當 RDD 的泛型是 Row 類型的時候,我們也可以稱它為 DataFrame。即
Dataset<Row> = DataFrame
3.SparkSQL 的資料源
SparkSQL 的資料源可以是 JSON 類型的字元串,JDBC,Parquent,Hive,HDFS 等。
4.SparkSQL 底層架構
首先拿到 sql 後解析一批未被解決的邏輯計劃,再經過分析得到分析後的邏輯計劃,再經過一批優化規則轉換成一批最佳優化的邏輯計劃,再經過 SparkPlanner 的政策轉化成一批實體計劃,随後經過消費模型轉換成一個個的 Spark 任務執行。
5.謂詞下推(predicate Pushdown)
二、建立DataSet的幾種方式
1.讀取 json 格式的檔案建立 Dataset
- json 檔案中的 json 資料不能嵌套 json 格式資料。
- Dataset 是一個一個 Row 類型的 RDD,
。ds.rdd()/ds.javaRdd()
- 可以兩種方式讀取 json 格式的檔案。
- df.show()預設顯示前 20 行資料。
- Dataset 原生 API 可以操作 Dataset(不友善)。
- 注冊成臨時表時,表中的列預設按 ascii 順序顯示列。
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class CreateDSFromJosonFile {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("jsonfile")
.master("local")
.getOrCreate();
/**
* Dataset的底層是一個一個的RDD
* 注意:
* 當 Dataset裡的泛型是Row時,我們又可以稱之為dataframe
*
* 以下兩種方式都可以讀取json格式的檔案
*
*/
Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
// Dataset ds = sparkSession.read().json("data/json");
ds.show();
/**
* 顯示DataSet中的内容,預設顯示前20行。如果現實多行要指定多少行show(行數)
* 注意:當有多個列時,顯示的列先後順序是按列的ascii碼先後顯示。
*/
ds.show(100);
/**
*DataSet轉換成RDD
*/
// JavaRDD<Row> javaRDD = ds.javaRDD();
/**
* 樹形的形式顯示schema資訊
*/
ds.printSchema();
/**
* dataset自帶的API 操作dataset
*/
//select name from table
ds.select("name").show();
//select name ,age+10 as addage from table
ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();
//select name ,age from table where age>19
ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();
//select age,count(*) from table group by age
ds.groupBy(ds.col("age")).count().show();
/**
* 将dataset注冊成臨時的一張表,這張表相當于臨時注冊到記憶體中,邏是輯上的表,不會霧化到磁盤
*/
ds.createOrReplaceTempView("jtable");
//ds.registerTempTable("jtable");
Dataset<Row> sql = sparkSession.sql("select age,count(*) as gg from jtable group by age");
sql.show();
//
// Dataset<Row> sql2 = sparkSession.sql("select name,age from jtable");
// sql2.show();
sparkSession.stop();
}
}
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+--------+
| name|
+--------+
|zhangsan|
| lisi|
| wangwu|
| laoliu|
|zhangsan|
| lisi|
| wangwu|
| laoliu|
|zhangsan|
| lisi|
| wangwu|
+--------+
+--------+------+
| name|addage|
+--------+------+
|zhangsan| 28|
| lisi| null|
| wangwu| 28|
| laoliu| 38|
|zhangsan| 30|
| lisi| null|
| wangwu| 28|
| laoliu| 38|
|zhangsan| 38|
| lisi| null|
| wangwu| 28|
+--------+------+
+--------+---+
| name|age|
+--------+---+
| laoliu| 28|
|zhangsan| 20|
| laoliu| 28|
|zhangsan| 28|
+--------+---+
+----+---+
| age| gg|
+----+---+
|null| 3|
| 28| 3|
| 18| 4|
| 20| 1|
+----+---+
2.通過 json 格式的 RDD 建立 Dataset
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import java.util.Arrays;
/**
* 讀取json格式的RDD建立DF
* @author root
*
*/
public class CreateDSFromJsonRDD {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("jsonrdd")
.master("local")
.getOrCreate();
/**
* 注意:
* 1.由于是java版,故通過javaSparkcontext.parallelize來建立json格式的JavaRDD
* 是以我們通過sparkcontext來建立javaSparkcontext
* 2.如果是scala版本,直接通過sparkcontext.parallelize來建立,就無需建立javaSparkcontext
*/
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> nameRDD = jsc.parallelize(Arrays.asList(
"{'name':'zhangsan','age':\"18\"}",
"{\"name\":\"lisi\",\"age\":\"19\"}",
"{\"name\":\"wangwu\",\"age\":\"20\"}"
));
JavaRDD<String> scoreRDD = jsc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));
Dataset<Row> nameds = sparkSession.read().json(nameRDD);
Dataset<Row> scoreds = sparkSession.read().json(scoreRDD);
//注冊成臨時表使用
nameds.createOrReplaceTempView("nameTable");
scoreds.createOrReplaceTempView("scoreTable");
Dataset<Row> result =
sparkSession.sql("select nameTable.name,nameTable.age,scoreTable.score "
+ "from nameTable join scoreTable "
+ "on nameTable.name = scoreTable.name");
result.show();
sc.stop();
}
}
+--------+---+-----+
| name|age|score|
+--------+---+-----+
| wangwu| 20| 300|
|zhangsan| 18| 100|
| lisi| 19| 200|
+--------+---+-----+
3.非 json 格式的 RDD 建立 Dataset
3.1 反射
通過反射的方式将非
json
格式的
RDD
轉換成
Dataset
。實際上就是先将資料轉換成自定義類對象,變成
JavaRDD
,在底層通過反射的方式解析
Person.class
獲得
Person
的所有
schema
資訊(field),結合
RDD
本身,就生成了
Dataset
。
- 自定義類要可序列化
- 自定義類的通路級别是
Public
-
轉成RDD
後會根據映射将字段按Dataset
碼排序Assci
- 将
轉換成Dataset
時擷取字段兩種方式,一種是RDD
下标擷取(不推薦使用),另一種是ds.getInt(0)
擷取(推薦使用)ds.getAs(“列名”)
Person.java
:
package com.shsxt.java_Test.sql.dataset;
import java.io.Serializable;
public class Person implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String id ;
private String name;
private Integer age;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
}
}
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 通過反射的方式将非json格式的RDD轉換成Dataset
* 注意:這種方式不推薦使用
*
* @author root
*/
public class CreateDSFromRDDWithReflect {
public static void main(String[] args) {
/**
* 注意:
* 1.自定義類要實作序列化接口
* 2.自定義類通路級别必須是Public
* 3.RDD轉成Dataset會把自定義類中字段的名稱按assci碼排序
*/
SparkSession sparkSession = SparkSession
.builder()
.appName("reflect")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("data/person.txt");
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
@Override
public Person call(String line) throws Exception {
Person p = new Person();
p.setId(line.split(",")[0]);
p.setName(line.split(",")[1]);
p.setAge(Integer.valueOf(line.split(",")[2]));
return p;
}
});
/**
* 傳入進去Person.class的時候,sparkSession是通過反射的方式建立Dataset
* 在底層通過反射的方式獲得Person的所有schema資訊(field),結合RDD本身,就生成了Dataset
*/
Dataset<Row> dataFrame = sparkSession.createDataFrame(personRDD, Person.class);
dataFrame.show();
dataFrame.printSchema();
dataFrame.registerTempTable("person");
Dataset sql = sparkSession.sql("select name,id,age from person where id = 2");
sql.show();
/**
* 将Dataset轉成JavaRDD
* 注意:
* 1.可以使用row.getInt(0),row.getString(1)...通過下标擷取傳回Row類型的資料,但是要注意列順序問題---不常用
* 2.可以使用row.getAs("列名")來擷取對應的列值。
*
// */
JavaRDD<Row> javaRDD = dataFrame.javaRDD();
JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
@Override
public Person call(Row row) throws Exception {
Person p = new Person();
// p.setId(row.getString(1));
// p.setName(row.getString(2));
// p.setAge(row.getInt(0));
p.setId(row.getAs("id"));
p.setName(row.getAs("name"));
p.setAge(row.getAs("age"));
return p;
}
});
map.foreach(x-> System.out.println(x));
sparkSession.stop();
}
}
+---+---+--------+
|age| id| name|
+---+---+--------+
| 18| 1|zhangsan|
| 19| 2| lisi|
| 20| 3| wangwu|
+---+---+--------+
root
|-- age: integer (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
+----+---+---+
|name| id|age|
+----+---+---+
|lisi| 2| 19|
+----+---+---+
Person [id=1, name=zhangsan, age=18]
Person [id=2, name=lisi, age=19]
Person [id=3, name=wangwu, age=20]
3.2 動态建立 Schema
動态建立 Schema 将非 json 格式的 RDD 轉換成 Dataset。
/**
*
*/
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
/**
* 動态建立Schema将非json格式RDD轉換成Dataset
* @author root
*
*/
public class CreateDSFromRDDWithStruct {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("struct")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("data/person.txt");
/**
* 轉換成Row類型的RDD
*/
final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
@Override
public Row call(String s) throws Exception {
return RowFactory.create(
s.split(",")[0],
s.split(",")[1],
Integer.valueOf(s.split(",")[2])
);
}
});
/**
* 動态建構DataFrame中的中繼資料,一般來說這裡的字段可以來源自字元串,也可以來源于外部資料庫
*/
List<StructField> asList = Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
df.printSchema();
df.show();
sc.stop();
}
}
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan| 18|
| 2| lisi| 19|
| 3| wangwu| 20|
+---+--------+---+
4.讀取 parquet 檔案建立 Dataset
讀取與儲存二進制格式–parquet 檔案。
- 可以将
存儲成Dataset
檔案。儲存成parquet
檔案的方式有兩種:parquet
df.write().mode(SaveMode.Overwrite)format("parquet").save("./sparksql/parquet"); df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
-
指定檔案儲存時的模式。SaveMode
-
:覆寫Overwrite
-
:追加Append
-
:如果存在就報錯ErrorIfExists
-
:如果存在就忽略Ignore
-
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.*;
public class CreateDFFromParquet {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("parquet")
.master("local")
.getOrCreate();
Dataset<Row> df = sparkSession.read().json("data/json");
// sparkSession.read().format("json").load("./spark/json");
df.show();
/**
* 将Dataset儲存成parquet檔案,
* SaveMode指定存儲檔案時的儲存模式:
* Overwrite:覆寫
* Append:追加
* ErrorIfExists:如果存在就報錯
* Ignore:如果存在就忽略
* 儲存成parquet檔案有以下兩種方式:
*/
df.write().mode(SaveMode.Overwrite).format("parquet").save("data/parquet");
// df.write().mode(SaveMode.Overwrite).parquet("data/parquet");
/**
* 加載parquet檔案成Dataset
* 加載parquet檔案有以下兩種方式:
*/
Dataset load = sparkSession.read().format("parquet").load("data/parquet");
load = sparkSession.read().parquet("data/parquet");
load.show();
sparkSession.stop();
}
}
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
5.通過Bean class來建立 Dataset
/**
*
*/
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.*;
import java.util.Arrays;
import java.util.List;
/**
* 通過Bean class來建立 Dataset
*
* @author root
*/
public class CreateDSFromBeanClass {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("beanclass")
.master("local")
.getOrCreate();
Person person = new Person();
person.setId("1");
person.setAge(18);
person.setName("zs");
Person person2 = new Person();
person2.setId("2");
person2.setAge(20);
person2.setName("ls");
List<Person> people = Arrays.asList(person, person2);
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> dataset = sparkSession.createDataset(people, personEncoder);
dataset.printSchema();
dataset.show();
dataset.registerTempTable("person");
Dataset<Row> result = sparkSession.sql("select name , id from person");
result.show();
sparkSession.stop();
}
}
root
|-- age: integer (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
+---+---+----+
|age| id|name|
+---+---+----+
| 18| 1| zs|
| 20| 2| ls|
+---+---+----+
+----+---+
|name| id|
+----+---+
| zs| 1|
| ls| 2|
+----+---+
6.讀取 JDBC 中的資料建立 Dataset(MySql 為例)
從MYSQL中讀取建立 Dataset與寫入。
- 第一種方式讀取MySql資料庫表,加載為DataFrame
- 第二種方式讀取MySql資料表加載為Dataset
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CreateDSFromMysql {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("mysql")
.master("local")
.getOrCreate();
/**
* 第一種方式讀取MySql資料庫表,加載為Dataset
*/
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "root");
options.put("dbtable", "person");
Dataset<Row> person = sparkSession.read().format("jdbc").options(options).load();
person.show();
person.createOrReplaceTempView("person");
// person.registerTempTable("person");
/**
* 第二種方式讀取MySql資料表加載為Dataset
*/
DataFrameReader reader = sparkSession.read().format("jdbc");
reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark"); //MYSQL位址
reader.option("driver", "com.mysql.jdbc.Driver"); //MYSQL驅動包
reader.option("user", "root"); //使用者名
reader.option("password", "root"); //使用者密碼
reader.option("dbtable", "score"); //表
Dataset<Row> score = reader.load();
score.show();
score.createOrReplaceTempView("score");
// score.registerTempTable("score");
Dataset<Row> result =
sparkSession.sql("select person.id,person.name,person.age,score.score "
+ "from person,score "
+ "where person.name = score.name and score.score> 82");
result.show();
//
result.registerTempTable("result");
Dataset<Row> df = sparkSession.sql("select id,name,age,score from result where age > 18");
df.show();
/**
* 将Dataset結果儲存到Mysql中
*/
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "root");
/**
* SaveMode:
* Overwrite:覆寫
* Append:追加
* ErrorIfExists:如果存在就報錯
* Ignore:如果存在就忽略
*
*/
result.write().mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties);
System.out.println("----Finish----");
sparkSession.stop();
}
}
7.讀取 Hive 中的資料加載成 Dataset
package com.shsxt.scala_Test.sql.dataset
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object CreateDFFromHive {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("hive")
//開啟Hive支援,代碼運作時等同于hive用戶端,需要連接配接到hive的中繼資料服務
.enableHiveSupport()
.getOrCreate()
val sc: SparkContext = spark.sparkContext
//HiveSQL
spark.sql("use spark")
spark.sql("drop table if exists student_infos")
spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath './data/student_infos' into table student_infos")
spark.sql("drop table if exists student_scores")
spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath './data/student_scores' into table student_scores")
val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
spark.sql("drop table if exists good_student_infos2")
/**
* 将結果寫入到hive表中
*/
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos2")
sc.stop()
}
}
三、序列化問題
序列化是生成對象的一種方式。
- 反序列化時
serializable
版本号不一緻時會導緻不能反序列化。
簡單來說,
的序列化機制是通過在運作時判斷類的Java
來驗證版本一緻性的。在進行反序列化時,serialVersionUID
會把傳來的位元組流中的JVM
與本地相應實體(類)的serialVersionUID
進行比較,如果相同就認為是一緻的, 可以進行反序列化,否則就會出現序列化版本不一緻的異常。當實作serialVersionUID
接口的實體(類)沒有顯式地定義一個名為java.io.Serializable
,類型為serialVersionUID
的變量時,long
序列化機制會根據編譯的Java
自動生成一個class
作序列化版本比較用,這種情況下,隻有同一次編譯生成的serialVersionUID
才會生成相同的class
。如果我們不希望通過編譯來強制劃分軟體版本,即實作序列化接口的實體能夠相容先前版本,未作更改的類,就需要顯式地定義一個名為serialVersionUID
,類型為serialVersionUID
的變量,不修改這個變量值的序列化實體都可以互相進行串行化和反串行化。long
- 子類中實作了
接口,父類中沒有實作,父類中的變量是不能被序列化,序列化後父類中的變量會得到null。serializable
- 注意:父類實作
接口,子類沒有實作serializable
接口時,子類可以正常序列化(應用:将一些不需要序列化的屬性值抽取出來放到父類(未實作序列化接口),子類實作序列化接口)serializable
- 注意:父類實作
- 被關鍵字
修飾的變量不能被序列化。transient
- 靜态變量不能被序列化,屬于類,不屬于方法和對象,是以不能被序列化。
- 靜态變量的值是在jvm中,能擷取到不是因為反序列化。
四、自定義函數 UDF 和 UDAF
1.自定義函數 UDF
package com.shsxt.java_Test.sql.udf_udaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* UDF 使用者自定義函數
* @author root
*
*/
public class UDF {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("udf")
.master("local")
.getOrCreate();
JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
/**
* 動态建立Schema方式加載DF
*/
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD,schema);
df.registerTempTable("user");
/**
* 根據UDF函數參數的個數來決定是實作哪一個UDF UDF1,UDF2。。。。UDF1xxx
*/
sparkSession.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1, Integer t2) throws Exception {
return t1.length() + t2;
}
} ,DataTypes.IntegerType );
sparkSession.sql("select name ,StrLen(name,100) as length from user").show();
//
sparkSession.stop();
}
}
+--------+------+
| name|length|
+--------+------+
|zhangsan| 108|
| lisi| 104|
| wangwu| 106|
+--------+------+
2.自定義函數 UDAF
實作
UDAF
函數,如果要自定義類,要實作
UserDefinedAggregateFunction
類。
package com.shsxt.java_Test.sql.udf_udaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* UDAF 使用者自定義聚合函數
*/
public class UDAF {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("udaf")
.master("local")
.getOrCreate();
JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<String> parallelize = sc.parallelize(
Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi","zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi"),2);
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
* 注冊一個UDAF函數,實作統計相同值得個數
* 注意:這裡可以自定義一個類繼承UserDefinedAggregateFunction類也是可以的
* 資料:
* zhangsan
* zhangsan
* lisi
* lisi
*
* select count(*) from user group by name
*/
sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {
private static final long serialVersionUID = 1L;
/**
* 初始化一個内部的自己定義的值,在Aggregate之前每組資料的初始化結果
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0);
System.out.println("init ....." + buffer.get(0));
}
/**
* 更新 可以認為一個一個地将組内的字段值傳遞進來 實作拼接的邏輯
* buffer.getInt(0)擷取的是上一次聚合後的值
* 相當于map端的combiner,combiner就是對每一個map task的處理結果進行一次小聚合
* 大聚和發生在reduce端.
* 這裡即是:在進行聚合的時候,每當有新的值進來,對分組後的聚合如何進行計算
*/
@Override
public void update(MutableAggregationBuffer buffer, Row arg1) {
buffer.update(0, buffer.getInt(0) + 1);
System.out.println("update.....buffer" + buffer.toString() + " | row" + arg1.toString() );
}
/**
* 合并 update操作,可能是針對一個分組内的部分資料,在某個節點上發生的 但是可能一個分組内的資料,會分布在多個節點上處理
* 此時就要用merge操作,将各個節點上分布式拼接好的串,合并起來
* buffer1.getInt(0) : 大聚合的時候 上一次聚合後的值
* buffer2.getInt(0) : 這次計算傳入進來的update的結果
* 這裡即是:最後在分布式節點完成後需要進行全局級别的Merge操作
*/
public void merge(MutableAggregationBuffer buffer1, Row arg1) {
// 2 3 4 5 6 7
// 0 + 2 = 2
// 2 + 3 = 5
// 5 + 4 = 9
buffer1.update(0, buffer1.getInt(0) + arg1.getInt(0));
System.out.println("merge.....buffer : " + buffer1.toString() + "| row" + arg1.toString() );
}
/**
* 在進行聚合操作的時候所要處理的資料的結果的類型
*/
@Override
public StructType bufferSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer", DataTypes.IntegerType, true)));
}
/**
* 最後傳回一個和DataType的類型要一緻的類型,傳回UDAF最後的計算結果
*/
@Override
public Object evaluate(Row row) {
return row.getInt(0);
}
/**
* 指定UDAF函數計算後傳回的結果類型
*/
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}
/**
* 指定輸入字段的字段及類型
*/
@Override
public StructType inputSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
}
/**
* 確定一緻性 一般用true,用以标記針對給定的一組輸入,UDAF是否總是生成相同的結果。
*/
@Override
public boolean deterministic() {
return true;
}
});
sparkSession.sql("select name ,StringCount(name) as number from user group by name").show();
sc.stop();
}
}
+--------+------+
| name|number|
+--------+------+
| wangwu| 2|
|zhangsan| 6|
| lisi| 4|
+--------+------+
5.開窗函數
SQL函數
-
: 開窗函數是按照某個字段分組,然後取另一字段的前幾個的值,相當于分組取row_number()
topN
- 開窗函數格式:
package com.shsxt.java_Test.sql.windowfun;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
/**
* 動态建立Schema将非json格式RDD轉換成Dataset
* @author root
*
*/
public class CreateDSFromRDDWithStruct {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("struct")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("data/sales.txt");
/**
* 轉換成Row類型的RDD
*/
final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
@Override
public Row call(String s) throws Exception {
return RowFactory.create(
Integer.valueOf(s.split("\t")[0]),
s.split("\t")[1],
Integer.valueOf(s.split("\t")[2])
);
}
});
/**
* 動态建構DataFrame中的中繼資料,一般來說這裡的字段可以來源自字元串,也可以來源于外部資料庫
*/
List<StructField> asList = Arrays.asList(
DataTypes.createStructField("riqi", DataTypes.IntegerType, true),
DataTypes.createStructField("leibie", DataTypes.StringType, true),
DataTypes.createStructField("jine", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
df.printSchema();
df.show();
df.createOrReplaceTempView("sales");
/**
* 開窗函數格式:
* 【 row_number() over (partition by XXX order by XXX) as rank】
* 注意:rank 從1開始
*/
/**
* 以類别分組,按每種類别金額降序排序,顯示 【日期,種類,金額】 結果,如:
*
* 1 A 100
* 2 B 200
* 3 A 300
* 4 B 400
* 5 A 500
* 6 B 600
* 排序後:
* 5 A 500 --rank 1
* 3 A 300 --rank 2
* 1 A 100 --rank 3
* 6 B 600 --rank 1
* 4 B 400 --rank 2
* 2 B 200 --rank 3
*
* 2018 A 400 1
* 2017 A 500 2
* 2016 A 550 3
*
*
* 2016 A 550 1
* 2017 A 500 2
* 2018 A 400 3
*
*/
Dataset<Row> result = sparkSession.sql("select riqi,leibie,jine,rank "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show(100);
sc.stop();
}
}
+----+------+----+----+
|riqi|leibie|jine|rank|
+----+------+----+----+
| 6| F| 96| 1|
| 9| F| 87| 2|
| 9| F| 84| 3|
| 7| E| 97| 1|
| 4| E| 94| 2|
| 9| E| 90| 3|
| 8| B| 98| 1|
| 9| B| 82| 2|
| 7| B| 67| 3|
| 3| D| 93| 1|
| 8| D| 79| 2|
| 8| D| 76| 3|
| 5| C| 95| 1|
| 9| C| 86| 2|
| 9| C| 81| 3|
| 9| A| 99| 1|
| 2| A| 92| 2|
| 9| A| 88| 3|
| 1| G| 91| 1|
| 9| G| 89| 2|
| 8| G| 75| 3|
+----+------+----+----+