天天看點

SparkSQL的入門實踐教程

摘要:Spark SQL是用于處理結構化資料的子產品。與Spark RDD不同的是,Spark SQL提供資料的結構資訊(源資料)和性能更好,可以通過SQL和DataSet API與Spark SQL進行互動。

本文分享自華為雲社群《【SparkSQL筆記】SparkSQL的入門實踐教程(一)》,作者:Copy工程師。

1.Spark SQL概述

Spark SQL是用于處理結構化資料的子產品。與Spark RDD不同的是,Spark SQL提供資料的結構資訊(源資料)和性能更好,可以通過SQL和DataSet API與Spark SQL進行互動。

2.Spark SQL程式設計入門

Spark SQL子產品的程式設計主入口點是SparkSession,SparkSession對象不僅為使用者提供了建立DataFrame對象、讀取外部資料源并轉化為DataFrame對象以及執行sql查詢的API,還負責記錄着使用者希望Spark應用如何在Spark叢集運作的控制、調優參數,是Spark SQL的上下文環境,是運作的基礎。

2.1 建立SparkSession

SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();      

master("local")和new SparkConf().setMaster("local")一個樣子,SparkSession包含了SparkContext,SqlContext等,是更強大的入口對象,也是更統一的入口。

appName("SparkSQLDemo1")設定任務名稱

config():設定配置屬性,并且有多個重載方法:

public synchronized SparkSession.Builder config(String key, String value)
public synchronized SparkSession.Builder config(String key, long value)
public synchronized SparkSession.Builder config(String key, double value)
public synchronized SparkSession.Builder config(String key, boolean value)
public synchronized SparkSession.Builder config(SparkConf conf)      

Spark 2.0中的SparkSession為Hive提供了強大的内置支援,包括使用HiveQL編寫查詢語句,通路Hive UDF以及從Hive表讀取資料的功能。若是僅以學習為目的去測試這些功能時,并不需要在叢集中特意安裝Hive即可在Spark本地模式下測試Hive支援。

2.2 建立DataFrame

SparkSession對象提供的API,可以從現有的RDD,Hive表或其他結構化資料源中建立DataFrame對象。

在這裡說明一下,DataSet是DataFrame的替代品,比DataFrame更強大。DataFrame等價于DataSet[Row]

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
    System.out.println(sparkSession.version());
    Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");
    //show方法是展示所有的資料,也可以show(int rownums) 展示前N條資料
    json.show();
    sparkSession.close();
}      

樣例資料:

{"id":1,"name":"小紅","age":"19","phone":"111"}
{"id":2,"name":"王明","age":"20","phone":"222"}
{"id":3,"name":"諸葛亮","age":"21","phone":"333"}
{"id":4,"name":"王茂","age":"23","phone":"444"}
{"id":5,"name":"三毛","age":"17","phone":"555"}
{"id":6,"name":"老張","age":"16","phone":"666"}      

日志列印:

INFO  2019-11-25 20:26 - org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator[main] - Code generated in 18.6628 ms
+---+---+----+-----+
|age| id|name|phone|
+---+---+----+-----+
| 19|  1|  小紅|  111|
| 20|  2|  王明|  222|
| 21|  3| 諸葛亮|  333|
| 23|  4|  王茂|  444|
| 17|  5|  三毛|  555|
| 16|  6|  老張|  666|
+---+---+----+-----+

 INFO  2019-11-25 20:26 - org.spark_project.jetty.server.ServerConnector[main] - Stopped Spark@23db87{HTTP/1.1}{0.0.0.0:4040}      

可以看到,已經解析了json資料檔案,并且還解析json中的字段名稱,解析成表的字段名稱,而且如果你的json的key值中有不一緻的,都會解析成字段名稱,隻不過沒有值的預設為null

例如:

SparkSQL的入門實踐教程
SparkSQL的入門實踐教程

看到沒有,所有的不同key值都有。

2.3 DataFrame基本操作

DataFrame為我們提供了靈活、強大且底層自帶優化的API,例如select、where、orderBy、groupBy、limit、union這樣的算子操作,DataFrame提供這一系列算子對開發者來說非常熟悉,而DataFrame正是将SQL select語句的各個組成部分封裝為同名API,用以幫助程式員通過select、where、orderBy等DataFrame API靈活地組合實作sql一樣的邏輯表達。是以,DataFrame程式設計僅需像SQL那樣簡單地對計算條件、計算需求、最終所需結果進行聲明式的描述即可,而不需要像RDD程式設計那樣一步步地對資料集進行原始操作。

DataFrame API的使用執行個體(以上面的json資料為例):

  1. 以樹形格式輸出DataSet對象的結構資訊
Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");
// 展示DataSet結構資訊
json.printSchema();      
root
 |-- age: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)      

表的結構資訊已經按照樹形結構出來了,是根據json檔案的value值判定的。

2. 通過DataSet的Select()方法查詢資料集中一列或者多列

SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");
// 定義字段
Column id = new Column("id");
Column name = new Column("name");
// 查詢單個字段
json.select("id").show();
// 查詢多個字段
json.select(id,name).show();
// 關閉saprkSesison 這裡的close和stop是一個樣 2.1.X開始用close 2.0.X使用的stop
sparkSession.close();      

Select 還有一個select(String col, String... cols)第一個參數不是很明白,後面再補充吧。還有這裡有個對象Column是非常重要的,以後所有的java開發sparkSQL都會用到這個對象,這個對象就是我們在資料庫中用到的字段,并且該對象有豐富的方法對字段做操作。

3. 組合使用DataSet對象的select(),where(),orderBy()方法查找id大于3的同學的id,姓名,年齡以及電話,按id的降序排列

這裡為了更能直覺的展示結果,我修改了json檔案,添加了幾行資料:

{"id":1,"name":"小紅","age":"19","phone":"111"}
{"id":2,"name":"王明","age":"20","phone":"222"}
{"id":3,"name":"諸葛亮","age":"21","phone":"333"}
{"id":4,"name":"王茂","age":"23","phone":"444"}
{"id":5,"name":"三毛","age":"17","phone":"555"}
{"id":6,"name":"老張","age":"16","phone":"666"}
{"id":7,"name":"張好","age":"16","phone":"777"}
{"id":8,"name":"王流","age":"16","phone":"888"}      

where條件查詢有兩種形式:

public Dataset<T> where(Column condition)
public Dataset<T> where(String conditionExpr)      

第一種是通過Column對象操作條件查詢,第二種是通過直接寫條件查詢

// 條件查詢
// id > 3
Column id = new Column("id").gt(3);
// age = 16
Column name = new Column("age").equalTo("16");
// id > 3 and age =16
Column select = id.and(name);
// 直接書寫條件
json.select("id","name","age","phone").where("id > 3 and age = 16").orderBy(new Column("id").desc()).show();
// 通過多個where生成 id > 3 and age =16
json.select("id","name","age","phone").where(id).where(name).orderBy(new Column("id").desc()).show();
// 通過Column操作轉換得到 id > 3 and age =16
json.select("id","name","age","phone").where(select).orderBy(new Column("id").desc()).show();      

這三個的寫法是一樣的,結果也是一樣的。

SparkSQL的入門實踐教程

4.使用DataSet對象提供的groupBy()方法進而學生年齡分布

// 分組查詢
// 單個字段分組查詢
json.groupBy(new Column("age")).count().show();
// 多個字段分組查詢
ArrayStack<Column> stack = new ArrayStack<>();
stack.push(new Column("id"));
stack.push(new Column("age"));
json.groupBy(stack).count().show();
// 多個字段分組查詢
json.groupBy(new Column("id"),new Column("age")).count().show();      

group()分組有很多形式:

SparkSQL的入門實踐教程

至于你想怎麼寫,隻要正确就可以。

SparkSQL的入門實踐教程
SparkSQL的入門實踐教程

如果你想對字段操作,比如我們經常會這樣寫sqlselect age+1 from student,呢麼sparkSQL完全可以實作,隻需要這樣既可:new Column("age").plus(1) 這就代表着age + 1

上面的執行個體中很好地展示了通過靈活組合使用DataSet提供的API可以實作SQL一樣清晰簡明的邏輯表達,如果采用RDD程式設計,首先RDD對JSON這種檔案格式并不敏感,會像讀取文本檔案一樣按行讀取JSON檔案,轉化為RDD[String],而不會像DataSet那樣自動解析JSON格式資料并且自動推斷出結構資訊(Schema),是以我們必須在程式中首先執行個體化一個JSON解析器用于解析JSON字元串得到真實資料組成的數組,實際是将RDD[String]轉化為由一行行記錄着多個共有字段數值的數組組成的RDD[Array[String]],進而使用map、filter、takeOrdered、distinct、union等RDD算子操作進行具體一步步地資料操作來實作業務邏輯。

相比之下,我們看出有時候同樣的資料量,同樣的分析需求,用RDD程式設計實作不僅代碼量更大,而且會極有可能因為程式員不良操作加重叢集的開銷,而采用DataFrame API組合程式設計有時僅需一行代碼即可實作複雜的分析需求。

2.4 執行SQL查詢

SparkSession為使用者提供了直接執行sql語句的SparkSession.sql(String sqlText)方法,sql語句可直接作為字元串傳入sql()方法中,sql查詢所得到結果依然為DataFrame對象。在Spark SQL子產品上直接執行sql語句的查詢需要首先将标志着結構化資料源的DataSet對象注冊成臨時表,進而在sql語句中對該臨時表進行查詢操作,具體步驟如下例所示:

SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");
// 注冊臨時表
json.createOrReplaceTempView("student");
// sql查詢 用select * from student 也可以
Dataset<Row> sql = sparkSession.sql("select id,name,age,phone from student");
sql.show();
// 關閉saprkSesison 這裡的close和stop是一個樣 2.1.X開始用close 2.0.X使用的stop
sparkSession.close();      

結果顯示:

SparkSQL的入門實踐教程

由上述操作,可看出DataSet是Spark SQL核心的資料抽象,讀取的資料源需要轉化成DataSet對象,才能利用DataSet各種API進行豐富操作,也可将DataSet注冊成臨時表,進而直接執行SQL查詢,而DataFrame上的操作之後傳回的也是DataFrame對象。

另外,因為本小結所講述的是如何通過SparkSession提供的SQL接口直接進行SQL查詢,而關于具體完成業務需求所需的SQL語句如何來編寫,大家可以直接百度查詢相關SQL教程進行學習。Spark SQL的SQL接口全面支援SQL的select标準文法,包括SELECT DISTINCT、from子句、where子句、order by字句、group by子句、having子句、join子句,還有典型的SQL函數,例如avg()、count()、max()、min()等,除此之外,Spark SQL在此基礎上還提供了大量功能強大的可用函數,可嵌入sql語句中使用,有聚合類函數、時間控制類函數、數學統計類函數、字元串列控制類函數等,感興趣或有這方面分析需求的讀者具體可檢視官方文檔http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql

2.5 全局臨時表

全局臨時表(globe temporary view )于臨時表(temporary view)是相對的,全局臨時表的作用範圍是某個Spark應用程式内所有會話(SparkSession),它會持續存在,在所有會話中共享,直到該Spark應用程式終止

是以,在同一個應用中,在不同的session中都需要用到一張臨時表,呢麼該臨時表可以注冊為全局臨時表,避免多餘I/O,提高系統執行效率,當然如果某個臨時表隻在整個應用中的某個session中使用,僅需要注冊為局部臨時表,避免不必要的記憶體存儲全局臨時表

注意,全局臨時表與系統保留的資料庫global_temp相關聯,引用時需要使用global_temp辨別。

執行個體:

SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");
// 注冊臨時表
json.createOrReplaceTempView("student");
// sql查詢
Dataset<Row> sql = sparkSession.sql("select * from student");
sql.show();
// 注冊為全局臨時表
try {
    json.createGlobalTempView("student_glob");
} catch (AnalysisException e) {
    e.printStackTrace();
}
// 目前session查詢全局臨時表
Dataset<Row> sqlGlob = sparkSession.sql("select * from global_temp.student_glob");
sqlGlob.show();

// 建立新的SparkSeesion 查詢全局臨時表
Dataset<Row> newSqlGlob = sparkSession.newSession().sql("select * from global_temp.student_glob");
newSqlGlob.show();

// 關閉saprkSesison 這裡的close和stop是一個樣 2.1.X開始用close 2.0.X使用的stop
sparkSession.close();      

顯示的結果都是一樣的。

2.6 DataSet實作WordCount

Dataset[T]中對象的序列化并不使用Java标準序列化或Kryo,而是使用專門的編碼器對對象進行序列化以便通過網絡進行處理或傳輸。雖然編碼器和标準序列化都負責将對象轉換為位元組,但編碼器是根據Dataset[T]的元素類型(T)動态生成,并且允許Spark無須将位元組反序列化回對象的情況下即可執行許多操作(如過濾、排序和散列),是以避免了不必要的反序列化導緻的資源浪費,更加高效。

SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
Dataset<String> stringDataset = sparkSession.read().textFile("D:\\sparksqlfile\\jsondata\\word.txt");
// 分割每行的字元串
Dataset<String> dataset = stringDataset.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        String[] split = s.split("\t", -1);
        return Arrays.asList(split).iterator();
    }
},Encoders.STRING());
// 根據key值分組
KeyValueGroupedDataset<String, String> groupByKey = dataset.groupByKey(new MapFunction<String, String>() {
    @Override
    public String call(String value) throws Exception {
        return value.toLowerCase();
    }
},Encoders.STRING());
// 求和展示資料
groupByKey.count().show();

// 關閉saprkSesison 這裡的close和stop是一個樣 2.1.X開始用close 2.0.X使用的stop
sparkSession.close();      
SparkSQL的入門實踐教程

中繼資料:

im    runnig    man
you    are    yes    haha
yes    you    wein    im    niu
you    are    yes    haha
you    are    yes    haha      

2.7 将RDDs轉化為DataFrame

除了調用SparkSesion.read().json/csv/orc/parquet/jdbc方法從各種外部結構化資料源建立DataFrame對象外,Spark SQL還支援将已有的RDD轉化為DataSet對象,但是需要注意的是,并不是由任意類型對象組成的RDD均可轉化為DataSet對象,隻有當組成RDD[T]的每一個T對象内部具有公有且鮮明的字段結構時,才能隐式或顯式地總結出建立DataSet對象所必要的結構資訊(Schema)進行轉化,進而在DataSet上調用RDD所不具備的強大豐富的API,或執行簡潔的SQL查詢。

Spark SQL支援将現有RDDs轉換為DataSet的兩種不同方法,其實也就是隐式推斷或者顯式指定DataSet對象的Schema。

執行個體資料:

王明    13    17865321121    南京市天龍寺小區一棟    五年級一班
劉紅    14    15643213452    南京市天龍寺小區二棟    五年級一班
張三    15    15678941247    南京市天龍寺小區三棟    五年級二班
諸葛劉芳    14    14578654123    南京市天龍寺小區一棟    五年級一班      

1.使用反射機制(Reflection)推理出schema結構資訊

第一種将RDDs轉化為DataFrame的方法是使用SparkSQL内部反射機制自動推斷包含特定類型對象的RDD的schema(RDD的結構資訊)進行隐士轉化。采用這種方式轉化為DataSet對象,往往是因為被轉化的RDD[T]所包含的T對象本身就是具有典型一維表嚴格的字段結構的對象,是以SparkSQL很容易就可以自動推斷出合理的Schema。這種基于反射機制隐式地建立DataSet的方法往往僅需要簡潔的代碼即可完成轉化,并且運作效果良好。

SparkSQL的Scala接口支援自動包含樣例類(case class)對象的RDD轉換為DataSet對象。在樣例類的聲明中已預先定義了表的結構資訊,内部通過反射機制即可讀取樣例類的參數的名稱,類型,轉化為DataSet對象的Schema。樣例類不僅可以包含Int,Double,String,這樣的簡單資料類型,也可以嵌套或包含複雜類型,例如Seq或Arrays。

執行個體:将學生樣例對象的RDD隐式轉換為DataSet對象

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
    // 讀取檔案轉成JavaRDD
    JavaRDD<String> stringRDD = sparkSession.sparkContext().textFile("D:\\sparksqlfile\\jsondata\\student.txt",1).toJavaRDD();
    // JavaRDD<String> 轉為 JavaRDD<Person>
    JavaRDD<Person> personRDD = stringRDD.map(new Function<String, Person>() {
        @Override
        public Person call(String v1) {
            String[] split = v1.split("\t", -1);
            return new Person(split[0],Integer.valueOf(split[1]),split[2],split[3],split[4]);
        }
    });
    // RDD 轉換為 DataSet
    Dataset<Row> personDataSet = sparkSession.createDataFrame(personRDD, Person.class);
    personDataSet.show();
    // 注冊臨時表
    personDataSet.createOrReplaceTempView("person");
    // 查詢臨時表
    Dataset<Row> selectDataSet = sparkSession.sql("select * from person where age between 14 and 15");
    selectDataSet.show();
    // 周遊DataSet 通過下标擷取 name的Ds
    Dataset<String> nameDs = selectDataSet.map(new MapFunction<Row, String>() {
        @Override
        public String call(Row value) {
            // Row 的 字段排序是按照字典排序的 是以 第四個才是name字段
            return "name:"+value.getString(3);
        }
    }, Encoders.STRING());
    nameDs.show();

    // Row通過指定字段名擷取字段值 傳回Object對象
    Dataset<String> nameDs2 = selectDataSet.map(new MapFunction<Row, String>() {
        @Override
        public String call(Row value) {
            return "name:"+value.getAs("name");
        }
    }, Encoders.STRING());
    nameDs2.show();

    // 關閉saprkSesison 這裡的close和stop是一個樣 2.1.X開始用close 2.0.X使用的stop
    sparkSession.close();
}      

日志截圖:

SparkSQL的入門實踐教程

2.開發者指定Schema

RDD轉化為Dataset的第二種方法是通過程式設計接口,允許先建立一個schema,然後将其應用到現有的RDD[Row],較前一種方法由樣例類或基本資料類型(Int,String)對象組成的RDD通過sparkSession.createDataFrame直接隐式轉換為iDataset不同,不僅需要根據需求以及資料結構建構schema,而且需要将RDD[T]轉化為Row對象組成的RDD[Row],這樣方法雖然代碼多了一些,但也提供了更高的自由度和靈活性。

當case類不能提前定義時(例如:資料集結構資訊已經包含在每一行,一個文本資料集的字段對不同使用者來說需要被解析成不同的字段名),這時就可以通過以下三部完成Dataset的轉換:

(1):根據需求從源RDD轉化為RDD of Rows

(2):建立由符合在步驟1中建立的RDD中的Rows結構的StructType表示的模式。

(3):通過SparkSession提供的createDataFrame方法将模式應用于行的RDD。

由此可見,将RDD轉化為Dataset的實質就是,賦予RDD内部包含特定類型對象的結構資訊,使Dataset掌握更豐富的結構與資訊(可以了解為傳統資料庫的表頭,表頭包含個字段名稱,類型等資訊),如此一來,便更好地說明Dateset支援sql查詢了。

SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
// 讀取檔案轉成JavaRDD
JavaRDD<String> peopleRDD = sparkSession.sparkContext().textFile("D:\\sparksqlfile\\jsondata\\student.txt",1).toJavaRDD();

String[] schemaString = {"name", "age"};
// 建立自定義schema
List<StructField> fields = new ArrayList<>();
for (String s : schemaString) {
    fields.add(DataTypes.createStructField(s,DataTypes.StringType,true));
}
StructType schema = DataTypes.createStructType(fields);
// JavaRDD<String> 轉為JavaRDD<Row>行記錄
JavaRDD<Row> rowRdd = peopleRDD.map(new Function<String, Row>() {
    @Override
    public Row call(String v1) {
        String[] split = v1.split("\t", -1);
        return RowFactory.create(split[0],split[1]);
    }
});
// JavaRDD轉DataSet
Dataset<Row> personDataset = sparkSession.createDataFrame(rowRdd, schema);
personDataset.show();      

2.8 使用者自定義函數

​ 除了利用Dataset豐富的内置函數變成外,還可以自己程式設計滿足特定分析需求的使用者自定義函數(UDF)并加以使用,SparkSQL中主要支援建立使用者自定義無類型聚合函數和使用者自定義強類型聚合函數

1.使用者自定義無類型聚合函數

使用者自定義的無類型聚合函數必須繼承UserDefinedAggregateFunction抽象類,進而重寫父類中的抽象成員變量和成員方法。其實重寫父類抽象成員變量,方法的過程即是實作使用者自定義函數的輸入,輸出規範以及計算邏輯的過程。

執行個體:求取平均值的函數

UDF函數代碼:

public class MyAverage extends UserDefinedAggregateFunction {

    private StructType inputSchema;
    private StructType bufferSchema;

    public MyAverage() {
        ArrayList<StructField> inputFields = new ArrayList<>();
        inputFields.add(DataTypes.createStructField("inputColumn",DataTypes.LongType,true));
        inputSchema = DataTypes.createStructType(inputFields);

        ArrayList<StructField> bufferFields = new ArrayList<>();
        bufferFields.add(DataTypes.createStructField("sum",DataTypes.LongType,true));
        bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType,true));
        bufferSchema = DataTypes.createStructType(bufferFields);

    }
    // Data types of input arguments of this aggregate function
    // 聚合函數輸入參數的資料類型(其實是該函數所作用的Dataset指定列的資料類型)
    @Override
    public StructType inputSchema() {
        return inputSchema;

    }
    // Data types of values in the aggregation buffer
    // 聚合函數的緩沖器結構,傳回之前定義了用于記錄累加值和累加數的字段結構
    @Override
    public StructType bufferSchema() {
        return bufferSchema;
    }
    // The data type of the returned value
    // 聚合函數傳回值的資料類型
    @Override
    public DataType dataType() {
        return DataTypes.DoubleType;
    }
    // Whether this function always returns the same output on the identical input
    // 此函數是否始終在相同輸入上傳回相同輸出
    @Override
    public boolean deterministic() {
        return true;
    }
    // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
    // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
    // the opportunity to update its values. Note that arrays and maps inside the buffer are still
    // immutable.
    // 初始化給定的buffer聚合緩沖器
    // buffer 聚合緩沖器其本身是一個Row對象,是以可以調用其标準方法通路buffer内的元素,例如在索引處檢索一個值
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0,0L);
        buffer.update(1,0L);
    }
    // Updates the given aggregation buffer `buffer` with new input data from `input`
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        if (!input.isNullAt(0)){
            long updatedSum = buffer.getLong(0) + input.getLong(0);
            long updatedCount = buffer.getLong(1)+1;
            buffer.update(0,updatedSum);
            buffer.update(1,updatedCount);
        }
    }
    // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
        long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
        buffer1.update(0,mergedSum);
        buffer1.update(1,mergedCount);
    }
    @Override
    public Object evaluate(Row buffer) {
        return ((double)buffer.getLong(0))/buffer.getLong(1);
    }
}      

運作代碼:

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().master("local").appName("XXXXXXXXXX").config("spark.testing.memory", 471859200).getOrCreate();
    // 讀取檔案
    Dataset<Row> df = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student5.json");
    // 注冊自定義函數
    sparkSession.udf().register("myAverage",new MyAverage());
    // 顯示原始資料
    df.createOrReplaceTempView("student");
    df.show();
    // 使用自定義UDF求平均值
    Dataset<Row> result = sparkSession.sql("SELECT myAverage(age) as average_salary FROM student");
    result.show();
}      
SparkSQL的入門實踐教程

2.使用者自定義強類型聚合函數

使用者自定義強類型聚合函數需繼承Aggregator抽象類,同樣需要重寫父類抽象方法(reduce,merge,finish)以實作自定義聚合函數的計算邏輯。使用者定義的強類型聚合函數相比于前一種UDF,内部與特定資料集的資料類型緊密結合,增強了緊密型,安全性,但降低了适用性。

執行個體:求使用者平均值的強類型聚合函數

資料實體類:

// 定義Employee樣例類型規範聚合函數輸入資料的資料類型
public class Employee implements Serializable {
    private String name;
    private long age;
    private String sex;
    private String institute;
    private String phone;
    public Employee() {
    }

    public Employee(String name, long age, String sex, String institute, String phone) {
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.institute = institute;
        this.phone = phone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getAge() {
        return age;
    }

    public void setAge(long age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getInstitute() {
        return institute;
    }

    public void setInstitute(String institute) {
        this.institute = institute;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", sex='" + sex + '\'' +
                ", institute='" + institute + '\'' +
                ", phone='" + phone + '\'' +
                '}';
    }
}      

定義聚合函數緩沖器:

// 定義Average樣例類規範buffer聚合緩沖器的資料類型
public class Average implements Serializable {
    private long sum;
    private long count;

    public Average() {
    }

    public Average(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    public long getSum() {
        return sum;
    }

    public void setSum(long sum) {
        this.sum = sum;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "Average{" +
                "sum=" + sum +
                ", count=" + count +
                '}';
    }
}      

UDF代碼:

// 使用者自定義的強類型聚合函數必須繼承Aggregator抽象類,注意需要傳入聚合函數輸入資料,buffer緩沖器以及傳回的結果的泛型參數
public class MyAverage2 extends Aggregator<Employee,Average,Double> {
    // A zero value for this aggregation. Should satisfy the property that any b + zero = b
    // 定義聚合的零值,應該滿足任何b + zero = b
    @Override
    public Average zero() {
        return new Average(0L, 0L);
    }

    // Combine two values to produce a new value. For performance, the function may modify `buffer`
    // and return it instead of constructing a new object
    // 定義作為Average對象的buffer聚合緩沖器如何處理每一條輸入資料(Employee對象)的聚合邏輯,
    // 與上例的求取平均值的無類型聚合函數的update方法一樣,每一次調用reduce都會更新buffer聚合函數的緩沖器
    // 并将更新後的buffer作為傳回值
    @Override
    public Average reduce(Average buffer, Employee employee) {
        long newSum = buffer.getSum() + employee.getAge();
        long newCount = buffer.getCount() + 1;
        buffer.setSum(newSum);
        buffer.setCount(newCount);
        return buffer;
    }

    // Merge two intermediate values
    // 與上例的求取平均值的無類型聚合函數的merge方法實作的邏輯相同
    @Override
    public Average merge(Average b1, Average b2) {
        long mergeSum = b1.getSum() + b2.getSum();
        long mergeCount = b1.getCount() + b2.getCount();
        b1.setSum(mergeSum);
        b1.setCount(mergeCount);
        return b1;
    }

    // Transform the output of the reduction
    // 定義輸出結果的邏輯,reduction表示buffer聚合緩沖器經過多次reduce,merge之後的最終聚合結果
    // 仍為Average對象記錄着所有資料的累加,累加次數
    @Override
    public Double finish(Average reduction) {
        System.out.println("////////////////"+((double) reduction.getSum()) / reduction.getCount());
        return ((double)reduction.getSum())/reduction.getCount();
    }

    // Transform the output of the reduction
    // 指定中間值的編碼器類型
    @Override
    public Encoder<Average> bufferEncoder() {
        return Encoders.bean(Average.class);
    }

    // Specifies the Encoder for the final output value type
    // 指定最終輸出的編碼器類型
    @Override
    public Encoder<Double> outputEncoder() {
        return Encoders.DOUBLE();
    }
}      
public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().master("local").appName("XXXXXXXXXX").config("spark.testing.memory", 471859200).getOrCreate();
    Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
    // 讀取檔案
    Dataset<Employee> employeeDataset = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student5.json").as(employeeEncoder);
    employeeDataset.show();

    // 将函數轉換為'TypedColumn' 并給他一個名字
    MyAverage2 myAverage2 = new MyAverage2();
    TypedColumn<Employee, Double> average_salary = myAverage2.toColumn().name("average_salary");
    // 使用自定義強類型UDF求平均值
    Dataset<Double> result = employeeDataset.select(average_salary);
    result.show();
}      

日志:

SparkSQL的入門實踐教程

點選關注,第一時間了解華為雲新鮮技術~