背景
SQL,Structured Query Language:結構化查詢語言,作為一個通用、流行的查詢語言,不僅僅是在傳統的資料庫,在大資料領域也變得越來越流行,hive、spark、kafka、flink等大資料元件都支援sql的查詢,使用sql可以讓一些不懂這些元件原理的人,輕松的來操作,大大的降低了使用的門檻,今天我們先來簡單的講講在flink的流進行中如何使用sql.
執行個體講解
構造StreamTableEnvironment對象
在flink的流進行中,要使用sql,需要首先構造一個StreamTableEnvironment對象,方法比較簡單。
sql中用到的catalog、table、function等都需要注冊到StreamTableEnvironment才能使用。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
複制
注冊table
接下來要将相應的表的資訊注冊到StreamTableEnvironment對象中,有以下幾種方式可以選擇.
以下的代碼是基于flink 1.10.0版本進行講解的,各個版本略有不同。
使用Tuple
//使用flink的二進制組,這個時候需要自定義字段名稱
Tuple2<String,Integer> tuple2 = Tuple2.of("jack", 10);
//構造一個Tuple的DataStream
DataStream<Tuple2<String,Integer>> tupleStream = env.fromElements(tuple2);
// 注冊到StreamTableEnvironment,并且指定對應的字段名
tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");
//執行一個sql 查詢. 然後傳回一個table對象
Table table = tableEnv.sqlQuery("select name,age from usersTuple");
// 将table對象轉成flink的DataStream,以便後續操作,我們這裡将其輸出
tableEnv.toAppendStream(table, Row.class).print();
複制
具體的詳盡的内容請參考代碼中的注釋.
使用Row
flink中提供的元組Tuple是有限制的,最多到Tuple25,是以如果我們有更多的字段,可以選擇使用flink中的Row對象.
//使用Row
Row row = new Row(2);
row.setField(0, "zhangsan");
row.setField(1, 20);
DataStream<Row> rowDataStream = env.fromElements(row);
tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");
Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");
tableEnv.toAppendStream(tableRow, Row.class).print();
複制
使用java的Pojo類
首先定一個pojo類
public static class User{
private String name;
private int age;
public String getName(){
return name;
}
public void setName(String name){
this.name = name;
}
public int getAge(){
return age;
}
public void setAge(int age){
this.age = age;
}
}
複制
定義這個pojo類是要符合flink的序列化規則,是有一定要求的,具體的可以參考【1】:
- 該類是public類型并且沒有非靜态内部類
- 該類擁有公有的無參構造器
- 類(以及所有超類)中的所有非靜态、非 transient 字段都是公有的(非 final 的);或者遵循 Java bean 規則,字段是private的,但是具有public類型的 getter 和 setter 方法
User user = new User();
user.setName("Tom");
user.setAge(20);
DataStream<User> userDataStream = env.fromElements(user);
tableEnv.createTemporaryView("usersPojo", userDataStream);
Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");
tableEnv.toAppendStream(tablePojo, Row.class).print();
複制
如果使用的是java pojo類型的DataStream,就不用聲明字段名稱了,flink會自動解析pojo類中的字段名稱和類型來作為table的字段和類型。
使用外部存儲
//連接配接外部系統,比如檔案,kafka等
Schema schema = new Schema()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT());
tableEnv.connect(new FileSystem().path("...."))
.withFormat(new Csv())
.withSchema(schema)
.createTemporaryTable("usersFile");
Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");
tableEnv.toAppendStream(tableFile, Row.class).print();
複制
使用外部存儲的時候需要指定以下對象:
- tableEnv.connect(ConnectorDescriptor ...) 指定連接配接符,目前flink支援Elasticsearch、hbase、kafka、filesystem這幾類
- withFormat(FormatDescriptor format) 這個就是指定我們從上述資料源讀取的資料的格式,比如json、csv、parquet等等
- .withSchema(Schema schema) 給我們的table定義一個schema,也就是字段的名稱和類型,用于sql查詢
- .createTemporaryTable("usersFile") 給表起一個名字,并且注冊到StreamTableEnvironment中
其實還有一些其他的注冊方法,但是已經标記為過期了,我們這裡就不講解了。
參考資料:
[1].https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
完整代碼請參考
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/sql/SqlFirst.java