DataStream API主要可為分為三個部分,DataSource子產品、Transformation子產品以及DataSink子產品。
DataSource子產品
内置DataSource:檔案資料源
讀取類型(WatchType): 其中WatchType共分為兩種模式——PROCESS_CONTINUOUSLY和PROCESS_ONCE模式。在PROCESS_CONTINUOUSLY模式下,一旦檢測到檔案内容發生變化,Flink會将該檔案全部内容加載到Flink系統中進行處理。而在PROCESS_ONCE模式下,當檔案内容發生變化時,隻會将變化的資料讀取至Flink中,在這種情況下資料隻會被讀取和處理一次
可繼承RichSourceFunction實作自定義資料源
Transformation子產品
所有DataStream的轉換操作可分為三類類型:
- 單Single-DataStream
- Multi-DaataStream
- 實體分區
其中Single-DataStream操作定義了對單個DataStream資料集元素的處理邏輯,Multi-DataStream操作定義了對多個DataStream資料集元素的處理邏
Single-DataStream操作:
Map [DataStream->DataStream]
FlatMap [DataStream->DataStream]:
該算子主要應用處理輸入一個元素産生一個或者多個元素的計算場景,比較常見的是在經典例子WordCount中,将每一行的文本資料切割,生成單詞序列對于輸入DataStream[String]通過FlatMap函數進行處理,字元串數字按逗号切割,然後形成新的整數資料集。
Filter [DataStream->DataStream]
KeyBy [DataStream->KeyedStream]:
以下兩種資料類型将不能使用KeyBy方法對資料集進行重分區:
使用者使用POJOs類型資料,但是POJOs類中沒有複寫hashCode()方法,而是依賴于Object.hasCode();
任何資料類型的數組結構。
Reduce [KeyedStream->DataStream]:主要目的是将輸入的KeyedStream通過傳入的使用者自定義的ReduceFunction滾動地進行資料聚合處理,其中定義的ReduceFunciton必須滿足運算結合律和交換律
Aggregations[KeyedStream->DataStream]: Aggregations是DataStream接口提供的聚合算子,根據指定的字段進行聚合操作,滾動地産生一系列資料聚合結果。其實是将Reduce算子中的函數進行了封裝,封裝的聚合操作有sum、min、minBy、max、maxBy等,這樣就不需要使用者自己定義Reduce函數
Multi-DataStream操作
Union[DataStream ->DataStream]
Connect, CoMap,CoFlatMap[DataStream ->DataStream]:Connect算子主要是為了合并兩種或者多種不同資料類型的資料集,合并後會保留原來資料集的資料類型
Split [DataStream->SplitStream]:Split算子是将一個DataStream資料集按照條件進行拆分,形成兩個資料集的過程,也是union算子的逆向實作。每個接入的資料都會被路由到一個或者多個輸出資料集中。如圖4-6所示,将輸入資料集根據顔色切分成兩個資料集
Select [SplitStream ->DataStream]: split函數本身隻是對輸入資料集進行标記,并沒有将資料集真正的實作切分,是以需要借助Select函數根據标記将資料切分成不同的資料集
Iterate[DataStream->IterativeStream->DataStream]: Iterate算子适合于疊代計算場景
實體分區(Physical Partitioning):
- 随機分區(Random Partitioning): [DataStream ->DataStream]
- Roundrobin Partitioning: [DataStream ->DataStream]
- Rescaling Partitioning: [DataStream ->DataStream]
- 廣播操作(Broadcasting): [DataStream ->DataStream]
- 自定義分區(Custom Partitioning): [DataStream ->DataStream]
DataSink子產品
在流式計算架構 Flink 中,可以通過 Sink 進行存儲操作。官方給出更推薦的說法是連接配接器 Connector, 第三方中間件作為連接配接器,既可以當成資料源,也能當成目的地,取決于實作的接口(SourceFunction/SinkFunction)
官方支援的連接配接器:
- pache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
PrintSinkFunction:在日常開發中常使用,通過控制台輸出結果進行驗證資料是否跟自己預期的一緻
自定義 SinkFunction:除了官方支援的 Connector 外,還提供了途徑,讓我們擴充存儲方式,通過 addSink() 方法,添加自定義的 SinkFunction
自定義Sink實作:
- open 擷取資料庫連結和初始化 SQL
- close 時釋放連結
- 每次落庫具體操作在 invoke 方法中。
Code demo:
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
private PreparedStatement ps;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = MyDruidUtils.getConnection();
String sql = "insert into student(name, age, address) values (?, ?, ?);";
ps = connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(List<Student> value, Context context) throws Exception {
for (Student student : value) {
ps.setString(1, student.getName());
ps.setInt(2, student.getAge());
ps.setString(3, student.getAddress());
ps.addBatch();
}
int[] count = ps.executeBatch();
}
}
總結
DataStream API主要分為三個部分組成:DataSource子產品、Transformation子產品以及DataSink子產品,分别代表資料流處理的不同階段,可以根據實際需要自定義Source和Sink子產品。Transformation子產品進行實際邏輯處理,Flink提供了相關的算子來進行資料的處理。
參考
- https://cloud.tencent.com/developer/article/1559885
- http://www.justdojava.com/2019/11/21/flink_learn_datasink/