Flink1.7.2 sql 批處理示例
源碼
概述
- 本文為Flink sql Dataset 示例
- 主要操作包括:Scan / Select,as (table),as (column),limit,Where / Filter,between and (where),Sum,min,max,avg,
- (group by ),group by having,distinct,INNER JOIN,left join,right join,full outer join,union,unionAll,INTERSECT
in,EXCEPT,insert into
SELECT
Scan / Select
- 功能描述: 查詢一個表中的所有資料
- scala 程式
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
tableEnv.sqlQuery(s"select name,age FROM user1")
.first(100).print()
/**
* 輸出結果
*
* 小明,15
* 小王,45
* 小李,25
* 小慧,35
*/
}
}
- 輸出結果
小明,15
小王,45
小李,25
小慧,35
as (table)
- 功能描述: 給表名取别稱
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
tableEnv.sqlQuery(s"select t1.name,t1.age FROM user1 as t1")
.first(100).print()
/**
* 輸出結果
*
* 小明,15
* 小王,45
* 小李,25
* 小慧,35
*/
}
}
小明,15
小王,45
小李,25
小慧,35
as (column)
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.scan
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
tableEnv.sqlQuery(s"select name a,age as b FROM user1 ")
.first(100).print()
/**
* 輸出結果
*
* 小明,15
* 小王,45
* 小李,25
* 小慧,35
*/
}
}
小明,15
小王,45
小李,25
小慧,35
limit
- 功能描述:查詢一個表的資料,隻傳回指定的前幾行(争對并行度而言,是以并行度不一樣,結果不一樣)
package com.opensourceteams.mo`dule.bigdata.flink.example.sql.dataset.operations.limit
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
/**
* 先排序,按age的降序排序,輸出前100位結果,注意是按同一個并行度中的資料進行排序,也就是同一個分區
*/
tableEnv.sqlQuery(s"select name,age FROM user1 ORDER BY age desc LIMIT 100 ")
.first(100).print()
/**
* 輸出結果 并行度設定為2
*
* 小明,15
* 小王,45
* 小慧,35
* 小李,25
*/
/**
* 輸出結果 并行度設定為1
*
* 小王,45
* 小慧,35
* 小李,25
* 小明,15
*/
}
}
小明,15
小王,45
小慧,35
小李,25
Where / Filter
- 功能描述:列加條件過濾表中的資料
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.where
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
tableEnv.sqlQuery(s"select name,age,sex FROM user1 where sex = '女'")
.first(100).print()
/**
* 輸出結果
*
* 小李,25,女
* 小慧,35,女
*/
}
}
小李,25,女
小慧,35,女
between and (where)
- 功能描述: 過濾列中的資料, 開始資料 <= data <= 結束資料
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.whereBetweenAnd
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
tableEnv.sqlQuery(s"select name,age,sex FROM user1 where age between 20 and 35")
.first(100).print()
/**
* 結果
*
* 小李,25,女
* 小慧,35,女
*/
}
}
小李,25,女
小慧,35,女
Sum
- 功能描述: 求和所有資料
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.sum
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)
//彙總所有資料
tableEnv.sqlQuery(s"select sum(salary) FROM user1")
.first(100).print()
/**
* 輸出結果
*
* 6800
*/
}
}
6800
max
- 功能描述: 求最大值
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.max
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)
//彙總所有資料
tableEnv.sqlQuery(s"select max(salary) FROM user1 ")
.first(100).print()
/**
* 輸出結果
*
* 4000
*/
}
}
4000
min
- 功能描述: 求最小值
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.min
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)
tableEnv.sqlQuery(s"select min(salary) FROM user1 ")
.first(100).print()
/**
* 輸出結果
*
* 500
*/
}
}
500
sum (group by )
- 功能描述: 按性别分組求和
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)
//彙總所有資料
tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex")
.first(100).print()
/**
* 輸出結果
*
* 女,1300
* 男,5500
*/
}
}
女,1300
男,5500
group by having
- 功能描述:
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.group_having
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男",1500),("小王",45,"男",4000),("小李",25,"女",800),("小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex,'salary)
//分組統計,having是分組條件查詢
tableEnv.sqlQuery(s"select sex,sum(salary) FROM user1 group by sex having sum(salary) >1500")
.first(100).print()
/**
* 輸出結果
*
*
*/
}
}
男,5500
distinct
- 功能描述: 去重一列或多列
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.aggregations.distinct
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",15,"male"),("a",45,"female"),("d",25,"male"),("c",35,"female"))
val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
/**
* 對資料去重
*/
tableEnv.sqlQuery("select distinct name FROM user1 ")
.first(100).print()
/**
* 輸出結果
*
* a
* c
* d
*/
}
}
a
c
d
join
INNER JOIN
- 功能描述: 連接配接兩個表,按指定的列,兩列都存在值才輸出
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.innerJoin
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSetGrade = env.fromElements((1,"國文",100),(2,"數學",80),(1,"外語",50) )
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)
//内連接配接,兩個表
// tableEnv.sqlQuery("select * FROM `user` INNER JOIN grade on `user`.id = grade.userId ")
tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` INNER JOIN grade on `user`.id = grade.userId ")
.first(100).print()
/**
* 輸出結果
* 2,小王,45,男,4000,數學,80
* 1,小明,15,男,1500,國文,100
* 1,小明,15,男,1500,外語,50
*/
}
}
2,小王,45,男,4000,數學,80
1,小明,15,男,1500,國文,100
1,小明,15,男,1500,外語,50
left join
- 功能描述:連接配接兩個表,按指定的列,左表中存在值就一定輸出,右表如果不存在,就顯示為空
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.leftJoin
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSetGrade = env.fromElements((1,"國文",100),(2,"數學",80),(1,"外語",50) )
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)
//左連接配接,拿左邊的表中的每一行資料,去關聯右邊的資料,如果有相同的比對資料,就都比對出來,如果沒有,就比對一條,不過右邊的資料為空
tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` LEFT JOIN grade on `user`.id = grade.userId ")
.first(100).print()
/**
* 輸出結果
*
* 1,小明,15,男,1500,國文,100
* 1,小明,15,男,1500,外語,50
* 2,小王,45,男,4000,數學,80
* 4,小慧,35,女,500,null,null
* 3,小李,25,女,800,null,null
*
*
*/
}
}
1,小明,15,男,1500,國文,100
1,小明,15,男,1500,外語,50
2,小王,45,男,4000,數學,80
4,小慧,35,女,500,null,null
3,小李,25,女,800,null,null
right join
- 功能描述:連接配接兩個表,按指定的列,右表中存在值就一定輸出,左表如果不存在,就顯示為空
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.rightJoin
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSetGrade = env.fromElements((1,"國文",100),(2,"數學",80),(1,"外語",50),(10,"外語",90) )
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)
//左連接配接,拿左邊的表中的每一行資料,去關聯右邊的資料,如果有相同的比對資料,就都比對出來,如果沒有,就比對一條,不過右邊的資料為空
tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` RIGHT JOIN grade on `user`.id = grade.userId ")
.first(100).print()
/**
* 輸出結果
*
* 1,小明,15,男,1500,外語,50
* 1,小明,15,男,1500,國文,100
* 2,小王,45,男,4000,數學,80
* null,null,null,null,null,外語,90
*
*
*/
}
}
1,小明,15,男,1500,外語,50
1,小明,15,男,1500,國文,100
2,小王,45,男,4000,數學,80
null,null,null,null,null,外語,90
full outer join
- 功能描述: 連接配接兩個表,按指定的列,隻要有一表中存在值就一定輸出,另一表如果不存在就顯示為空
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.join.fullOuterJoin
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSetGrade = env.fromElements((1,"國文",100),(2,"數學",80),(1,"外語",50),(10,"外語",90) )
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("grade",dataSetGrade,'userId,'name,'fraction)
//左,右,全比對所有資料
tableEnv.sqlQuery("select `user`.*,grade.name,grade.fraction FROM `user` FULL OUTER JOIN grade on `user`.id = grade.userId ")
.first(100).print()
/**
* 輸出結果
*
*
* 3,小李,25,女,800,null,null
* 1,小明,15,男,1500,外語,50
* 1,小明,15,男,1500,國文,100
* 2,小王,45,男,4000,數學,80
* 4,小慧,35,女,500,null,null
* null,null,null,null,null,外語,90
*
*
*
*/
}
}
3,小李,25,女,800,null,null
1,小明,15,男,1500,外語,50
1,小明,15,男,1500,國文,100
2,小王,45,男,4000,數學,80
4,小慧,35,女,500,null,null
null,null,null,null,null,外語,90
Set Operations
union
- 功能描述: 連接配接兩個表中的資料,會去重
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.union
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)
/**
* union 連接配接兩個表,會去重
*/
tableEnv.sqlQuery(
"select * from ("
+"select t1.* FROM `user` as t1 ) " +
+ " UNION "
+ " ( select t2.* FROM t2 )"
)
.first(100).print()
/**
* 輸出結果
*
* 30,小李,25,女,800
* 40,小慧,35,女,500
* 2,小王,45,男,4000
* 4,小慧,35,女,500
* 3,小李,25,女,800
* 1,小明,15,男,1500
*
*/
}
}
30,小李,25,女,800
40,小慧,35,女,500
2,小王,45,男,4000
4,小慧,35,女,500
3,小李,25,女,800
1,小明,15,男,1500
unionAll
- 功能描述: 連接配接兩表中的資料,不會去重
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.unionAll
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)
/**
* union 連接配接兩個表,不會去重
*/
tableEnv.sqlQuery(
"select * from ("
+"select t1.* FROM `user` as t1 ) " +
+ " UNION ALL "
+ " ( select t2.* FROM t2 )"
)
.first(100).print()
/**
* 輸出結果
*
* 1,小明,15,男,1500
* 2,小王,45,男,4000
* 3,小李,25,女,800
* 4,小慧,35,女,500
* 1,小明,15,男,1500
* 2,小王,45,男,4000
* 30,小李,25,女,800
* 40,小慧,35,女,500
*
*/
}
}
1,小明,15,男,1500
2,小王,45,男,4000
3,小李,25,女,800
4,小慧,35,女,500
1,小明,15,男,1500
2,小王,45,男,4000
30,小李,25,女,800
40,小慧,35,女,500
INTERSECT
- 功能描述: INTERSECT 連接配接兩個表,找相同的資料(相交的資料,重疊的資料)
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.intersect
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)
/**
* INTERSECT 連接配接兩個表,找相同的資料(相交的資料,重疊的資料)
*/
tableEnv.sqlQuery(
"select * from ("
+"select t1.* FROM `user` as t1 ) " +
+ " INTERSECT "
+ " ( select t2.* FROM t2 )"
)
.first(100).print()
/**
* 輸出結果
*
* 1,小明,15,男,1500
* 2,小王,45,男,4000
*
*/
}
}
1,小明,15,男,1500
2,小王,45,男,4000
in
- 功能描述: 子查詢
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.in
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)
/**
* in ,子查詢
*/
tableEnv.sqlQuery(
"select t1.* FROM `user` t1 where t1.id in " +
" (select t2.id from t2) "
)
.first(100).print()
/**
* 輸出結果
*
* 1,小明,15,男,1500
* 2,小王,45,男,4000
*
*/
}
}
1,小明,15,男,1500
2,小王,45,男,4000
EXCEPT
- 功能描述: EXCEPT 連接配接兩個表,找不相同的資料(不相交的資料,不重疊的資料)
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.setOperations.except
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(3,"小李",25,"女",800),(4,"小慧",35,"女",500))
val dataSet2 = env.fromElements((1,"小明",15,"男",1500),(2,"小王",45,"男",4000),(30,"小李",25,"女",800),(40,"小慧",35,"女",500))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user",dataSet,'id,'name,'age,'sex,'salary)
tableEnv.registerDataSet("t2",dataSet2,'id,'name,'age,'sex,'salary)
/**
* EXCEPT 連接配接兩個表,找不相同的資料(不相交的資料,不重疊的資料)
*/
tableEnv.sqlQuery(
"select * from ("
+"select t1.* FROM `user` as t1 ) " +
+ " EXCEPT "
+ " ( select t2.* FROM t2 )"
)
.first(100).print()
/**
* 輸出結果
*
* 3,小李,25,女,800
* 4,小慧,35,女,500
*
*/
}
}
3,小李,25,女,800
4,小慧,35,女,500
DML
insert into
- 功能描述:将一個表中的資料(source),插入到 csv檔案中(sink)
- scala程式
package com.opensourceteams.module.bigdata.flink.example.sql.dataset.operations.insert
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation
object Run {
def main(args: Array[String]): Unit = {
//得到批環境
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("小明",15,"男"),("小王",45,"男"),("小李",25,"女"),("小慧",35,"女"))
//得到Table環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊table
tableEnv.registerDataSet("user1",dataSet,'name,'age,'sex)
// create a TableSink
val csvSink = new CsvTableSink("sink-data/csv/a.csv",",",1,WriteMode.OVERWRITE);
val fieldNames = Array("name", "age", "sex")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.STRING)
tableEnv.registerTableSink("t2",fieldNames,fieldTypes,csvSink)
tableEnv.sqlUpdate(s" insert into t2 select name,age,sex FROM user1 ")
env.execute()
/**
* 輸出結果
* a.csv
*
* 小明,15,男
* 小王,45,男
* 小李,25,女
* 小慧,35,女
*/
}
}
- 輸出資料 a.csv
小明,15,男
小王,45,男
小李,25,女
小慧,35,女