天天看點

《從0到1學習Flink》—— Flink Data transformation(轉換)

《從0到1學習Flink》—— Flink Data transformation(轉換)

前言

在第一篇介紹 Flink 的文章 《《從0到1學習Flink》—— Apache Flink 介紹》 中就說過 Flink 程式的結構

《從0到1學習Flink》—— Flink Data transformation(轉換)

Flink 應用程式結構就是如上圖所示:

1、Source: 資料源,Flink 在流處理和批處理上的 source 大概有 4 類:基于本地集合的 source、基于檔案的 source、基于網絡套接字的 source、自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當然你也可以定義自己的 source。

2、Transformation:資料轉換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将資料轉換計算成你想要的資料。

3、Sink:接收器,Flink 将轉換計算後的資料發送的地點 ,你可能需要存儲下來,Flink 常見的 Sink 大概有如下幾類:寫入檔案、列印出來、寫入 socket 、自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 sink。

在上四篇文章介紹了 Source 和 Sink:

1、《從0到1學習Flink》—— Data Source 介紹

2、《從0到1學習Flink》—— 如何自定義 Data Source ?

3、《從0到1學習Flink》—— Data Sink 介紹

4、《從0到1學習Flink》—— 如何自定義 Data Sink ?

那麼這篇文章我們就來看下 Flink Data Transformation 吧,資料轉換操作還是蠻多的,需要好好講講!

Transformation

Map

這是最簡單的轉換之一,其中輸入是一個資料流,輸出的也是一個資料流:

還是拿上一篇文章的案例來将資料進行 map 轉換操作:

SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() {
    @Override
    public Student map(Student value) throws Exception {
        Student s1 = new Student();
        s1.id = value.id;
        s1.name = value.name;
        s1.password = value.password;
        s1.age = value.age + 5;
        return s1;
    }
});
map.print();
           

将每個人的年齡都增加 5 歲,其他不變。

《從0到1學習Flink》—— Flink Data transformation(轉換)

FlatMap

FlatMap 采用一條記錄并輸出零個,一個或多個記錄。

SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() {
    @Override
    public void flatMap(Student value, Collector<Student> out) throws Exception {
        if (value.id % 2 == 0) {
            out.collect(value);
        }
    }
});
flatMap.print();
           

這裡将 id 為偶數的聚集出來。

《從0到1學習Flink》—— Flink Data transformation(轉換)

Filter

Filter 函數根據條件判斷出結果。

SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() {
    @Override
    public boolean filter(Student value) throws Exception {
        if (value.id > 95) {
            return true;
        }
        return false;
    }
});
filter.print();
           

這裡将 id 大于 95 的過濾出來,然後列印出來。

《從0到1學習Flink》—— Flink Data transformation(轉換)

KeyBy

KeyBy 在邏輯上是基于 key 對流進行分區。在内部,它使用 hash 函數對流進行分區。它傳回 KeyedDataStream 資料流。

KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
});
keyBy.print();
           

上面對 student 的 age 做 KeyBy 操作分區

Reduce

Reduce 傳回單個的結果值,并且 reduce 操作每處理一個元素總是建立一個新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可實作。

SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
}).reduce(new ReduceFunction<Student>() {
    @Override
    public Student reduce(Student value1, Student value2) throws Exception {
        Student student1 = new Student();
        student1.name = value1.name + value2.name;
        student1.id = (value1.id + value2.id) / 2;
        student1.password = value1.password + value2.password;
        student1.age = (value1.age + value2.age) / 2;
        return student1;
    }
});
reduce.print();
           

上面先将資料流進行 keyby 操作,因為執行 reduce 操作隻能是 KeyedStream,然後将 student 對象的 age 做了一個求平均值的操作。

Fold

Fold 通過将最後一個檔案夾流與目前記錄組合來推出 KeyedStream。 它會發回資料流。

KeyedStream.fold("1", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String accumulator, Integer value) throws Exception {
        return accumulator + "=" + value;
    }
})
           

Aggregations

DataStream API 支援各種聚合,例如 min,max,sum 等。 這些函數可以應用于 KeyedStream 以獲得 Aggregations 聚合。

KeyedStream.sum(0) 
KeyedStream.sum("key") 
KeyedStream.min(0) 
KeyedStream.min("key") 
KeyedStream.max(0) 
KeyedStream.max("key") 
KeyedStream.minBy(0) 
KeyedStream.minBy("key") 
KeyedStream.maxBy(0) 
KeyedStream.maxBy("key")
           

max 和 maxBy 之間的差別在于 max 傳回流中的最大值,但 maxBy 傳回具有最大值的鍵, min 和 minBy 同理。

Window

Window 函數允許按時間或其他條件對現有 KeyedStream 進行分組。 以下是以 10 秒的時間視窗聚合:

Flink 定義資料片段以便(可能)處理無限資料流。 這些切片稱為視窗。 此切片有助于通過應用轉換處理資料塊。 要對流進行視窗化,我們需要配置設定一個可以進行分發的鍵和一個描述要對視窗化流執行哪些轉換的函數

要将流切片到視窗,我們可以使用 Flink 自帶的視窗配置設定器。 我們有選項,如 tumbling windows, sliding windows, global 和 session windows。 Flink 還允許您通過擴充 WindowAssginer 類來編寫自定義視窗配置設定器。 這裡先預留下篇文章來講解這些不同的 windows 是如何工作的。

WindowAll

windowAll 函數允許對正常資料流進行分組。 通常,這是非并行資料轉換,因為它在非分區資料流上運作。

與正常資料流功能類似,我們也有視窗資料流功能。 唯一的差別是它們處理視窗資料流。 是以視窗縮小就像 Reduce 函數一樣,Window fold 就像 Fold 函數一樣,并且還有聚合。

Union

Union 函數将兩個或多個資料流結合在一起。 這樣就可以并行地組合資料流。 如果我們将一個流與自身組合,那麼它會輸出每個記錄兩次。

Window join

我們可以通過一些 key 将同一個 window 的兩個資料流 join 起來。

inputStream.join(inputStream1)
           .where(0).equalTo(1)
           .window(Time.seconds(5))     
           .apply (new JoinFunction () {...});
           

以上示例是在 5 秒的視窗中連接配接兩個流,其中第一個流的第一個屬性的連接配接條件等于另一個流的第二個屬性。

Split

此功能根據條件将流拆分為兩個或多個流。 當您獲得混合流并且您可能希望單獨處理每個資料流時,可以使用此方法。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>(); 
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});
           

Select

此功能允許您從拆分流中選擇特定流。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even"); 
DataStream<Integer> odd = split.select("odd"); 
DataStream<Integer> all = split.select("even","odd");
           

Project

Project 函數允許您從事件流中選擇屬性子集,并僅将所選元素發送到下一個處理流。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...] 
DataStream<Tuple2<String, String>> out = in.project(3,2);
           

上述函數從給定記錄中選擇屬性号 2 和 3。 以下是示例輸入和輸出記錄:

(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)
           

最後

本文主要介紹了 Flink Data 的常用轉換方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。并用了點簡單的 demo 介紹了如何使用,具體在項目中該如何将資料流轉換成我們想要的格式,還需要根據實際情況對待。

關注我

轉載請務必注明原創位址為:http://www.54tianzhisheng.cn/2018/11/04/Flink-Data-transformation/

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公衆号了。你可以加我的微信:zhisheng_tian,然後回複關鍵字:Flink 即可無條件擷取到。

《從0到1學習Flink》—— Flink Data transformation(轉換)

相關文章

1、《從0到1學習Flink》—— Apache Flink 介紹

2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并建構運作簡單程式入門

3、《從0到1學習Flink》—— Flink 配置檔案詳解

4、《從0到1學習Flink》—— Data Source 介紹

5、《從0到1學習Flink》—— 如何自定義 Data Source ?

6、《從0到1學習Flink》—— Data Sink 介紹

7、《從0到1學習Flink》—— 如何自定義 Data Sink ?

8、《從0到1學習Flink》—— Flink Data transformation(轉換)