天天看點

「大資料」大資料第四代 Flink 從概念到實戰

作者:架構思考

一、Flink 的引入

這幾年大資料的飛速發展,出現了很多熱門的開源社群,其中著名的有 Hadoop、Storm,以及後來的 Spark,他們都有着各自專注的應用場景。Spark 掀開了記憶體計算的先河,也以記憶體為賭注,赢得了記憶體計算的飛速發展。Spark 的火熱或多或少的掩蓋了其他分布式計算的系統身影。就像 Flink,也就在這個時候默默的發展着。

在國外一些社群,有很多人将大資料的計算引擎分成了 4 代,當然,也有很多人不會認同。我們先姑且這麼認為和讨論。

第1代——Hadoop MapReduce

首先第一代的計算引擎,無疑就是 Hadoop 承載的 MapReduce。它将計算分為兩個階段,分别為 Map 和 Reduce。對于上層應用來說,就不得不想方設法去拆分算法,甚至于不得不在上層應用實作多個 Job 的串聯,以完成一個完整的算法,例如疊代計算。

  • 批處理
  • Mapper、Reducer

第2代——DAG架構(Tez) + MapReduce

由于這樣的弊端,催生了支援 DAG 架構的産生。是以,支援 DAG 的架構被劃分為第二代計算引擎。如 Tez 以及更上層的 Oozie。這裡我們不去細究各種 DAG 實作之間的差別,不過對于當時的 Tez 和 Oozie 來說,大多還是批處理的任務。

  • 批處理
  • 1個Tez = MR(1) + MR(2) + ... + MR(n)
  • 相比MR效率有所提升
「大資料」大資料第四代 Flink 從概念到實戰

第3代——Spark

接下來就是以 Spark 為代表的第三代的計算引擎。第三代計算引擎的特點主要是 Job 内部的 DAG 支援(不跨越 Job),以及強調的實時計算。在這裡,很多人也會認為第三代計算引擎也能夠很好的運作批處理的 Job。

  • 批處理、流處理、SQL高層API支援
  • 自帶DAG
  • 記憶體疊代計算、性能較之前大幅提升

第4代——Flink

随着第三代計算引擎的出現,促進了上層應用快速發展,例如各種疊代計算的性能以及對流計算和 SQL 等的支援。Flink 的誕生就被歸在了第四代。這應該主要表現在 Flink 對流計算的支援,以及更一步的實時性上面。當然 Flink 也可以支援 Batch 的任務,以及 DAG 的運算。

  • 批處理、流處理、SQL高層API支援
  • 自帶DAG
  • 流式計算性能更高、可靠性更高

二、Flink 是什麼

2.1 Flink 誕生背景

Flink 起源于 Stratosphere 項目,Stratosphere 是在 2010~2014 年由地處柏林的大學和歐洲的一些其他的大學共同進行的研究項目

2014 年 4 月捐贈給了 Apache 軟體基金會

2014 年 12 月成為 Apache 軟體基金會的頂級項目。

2.2 LOGO 介紹

在德語中,Flink 一詞表示快速和靈巧,項目采用松鼠的彩色圖案作為 logo,Flink 的松鼠 logo 尾巴的顔色與 Apache 軟體基金會的 logo 顔色相呼應,也就是說,這是一隻 Apache 風格的松鼠。

「大資料」大資料第四代 Flink 從概念到實戰

2.3 官網位址:

https://flink.apache.org/

2.4 Flink 概述

「大資料」大資料第四代 Flink 從概念到實戰

Flink 首頁在其頂部展示了該項目的理念:“Apache Flink 是為分布式、高性能、随時可用以及準确的流處理應用程式打造的開源流處理架構”。

Flink 是一款分布式的計算引擎,它可以用來做流處理;也可以用來做批處理。

「大資料」大資料第四代 Flink 從概念到實戰

2.5 哪些公司在使用 Flink

「大資料」大資料第四代 Flink 從概念到實戰

三、Flink 成為富二代

「大資料」大資料第四代 Flink 從概念到實戰

随着人工智能時代的降臨,資料量的爆發,在典型的大資料的業務場景下資料業務最通用的做法是:選用批處理的技術處理全量資料,采用流式計算處理實時增量資料。

在絕大多數的業務場景之下,使用者的業務邏輯在批處理和流處理之中往往是相同的。但是,使用者用于批處理和流處理的兩套計算引擎是不同的。是以,使用者通常需要寫兩套代碼。

毫無疑問,這帶來了一些額外的負擔和成本。

阿裡巴巴的商品資料處理就經常需要面對增量和全量兩套不同的業務流程問題,是以阿裡就在想,我們能不能有一套統一的大資料引擎技術,使用者隻需要根據自己的業務邏輯開發一套代碼。這樣在各種不同的場景下,不管是全量資料還是增量資料,亦或者實時處理,一套方案即可全部支援,這就是阿裡選擇 Flink 的背景和初衷。

2015 年阿裡巴巴開始使用 Flink 并持續貢獻社群(阿裡内部還基于 Flink 做了一套 Blink),2019 年 1 月 8 日,阿裡巴巴以 9000 萬歐元(7億元人民币)收購了創業公司 Data Artisans。從此Flink開始了新一輪的乘風破浪!

四、入門實戰

接下來我們将從零開始,教您如何建構第一個 Flink 應用程式。

4.1 開發環境準備

Flink 可以運作在 Linux, Max OS X, 或者是 Windows 上。為了開發 Flink 應用程式,在本地機器上需要有 Java 8.x 和 maven 環境。

如果有 Java 8 環境,運作下面的指令會輸出如下版本資訊:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)           

如果有 maven 環境,運作下面的指令會輸出如下版本資訊:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"           

另外我們推薦使用 IntelliJ IDEA (社群免費版已夠用)作為 Flink 應用程式的開發 IDE。Eclipse 雖然也可以,但是 Eclipse 在 Scala 和 Java 混合型項目下會有些已知問題,是以不太推薦 Eclipse。

4.2 建立 Maven 項目

我們将使用 Flink Maven Archetype 來建立我們的項目結構和一些初始的預設依賴。在你的工作目錄下,運作如下指令來建立項目:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false           

你可以編輯上面的 groupId, artifactId, package 成你喜歡的路徑。使用上面的參數,Maven 将自動為你建立如下所示的項目結構:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties           

我們的 pom.xml 檔案已經包含了所需的 Flink 依賴,并且在 src/main/java 下有幾個示例程式架構。接下來我們将開始編寫第一個 Flink 程式。

4.3 編寫 Flink 程式

啟動 IntelliJ IDEA,選擇 "Import Project"(導入項目),選擇 my-flink-project 根目錄下的 pom.xml。根據引導,完成項目導入。

在 src/main/java/myflink 下建立 SocketWindowWordCount.java 檔案:

package myflink;

public class SocketWindowWordCount {
  public static void main(String[] args) throws Exception {
  }
}           

現在這程式還很基礎,我們會一步步往裡面填代碼。注意下文中我們不會将 import 語句也寫出來,因為 IDE 會自動将他們添加上去。在本節末尾,我會将完整的代碼展示出來,如果你想跳過下面的步驟,可以直接将最後的完整代碼粘到編輯器中。

Flink 程式的第一步是建立一個 StreamExecutionEnvironment 。這是一個入口類,可以用來設定參數和建立資料源以及送出任務。是以讓我們把它添加到 main 函數中:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();           

下一步我們将建立一個從本地端口号 9000 的 socket 中讀取資料的資料源:

DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");           

這建立了一個字元串類型的 DataStream。DataStream 是 Flink 中做流處理的核心 API,上面定義了非常多常見的操作(如,過濾、轉換、聚合、視窗、關聯等)。在本示例中,我們感興趣的是每個單詞在特定時間視窗中出現的次數,比如說5秒視窗。為此,我們首先要将字元串資料解析成單詞和次數(使用Tuple2<String, Integer>表示),第一個字段是單詞,第二個字段是次數,次數初始值都設定成了1。我們實作了一個 flatmap 來做解析的工作,因為一行資料中可能有多個單詞。

DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });           

接着我們将資料流按照單詞字段(即0号索引字段)做分組,這裡可以簡單地使用 keyBy(int index) 方法,得到一個以單詞為 key 的Tuple2<String, Integer>資料流。然後我們可以在流上指定想要的視窗,并根據視窗中的資料計算結果。在我們的例子中,我們想要每5秒聚合一次單詞數,每個視窗都是從零開始統計的。

DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);           

第二個調用的 .timeWindow() 指定我們想要5秒的翻滾視窗(Tumble)。第三個調用為每個key每個視窗指定了sum聚合函數,在我們的例子中是按照次數字段(即1号索引字段)相加。得到的結果資料流,将每5秒輸出一次這5秒内每個單詞出現的次數。

最後一件事就是将資料流列印到控制台,并開始執行:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");           

最後的 env.execute 調用是啟動實際Flink作業所必需的。所有算子操作(例如建立源、聚合、列印)隻是建構了内部算子操作的圖形。隻有在execute()被調用時才會在送出到叢集上或本地計算機上執行。

下面是完整的代碼,部分代碼經過簡化(代碼在 GitHub 上也能通路到):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

    // 建立 execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 通過連接配接 socket 擷取輸入資料,這裡連接配接到本地9000端口,如果9000端口已被占用,請換一個端口
    DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

    // 解析資料,按 word 分組,開窗,聚合
    DataStream<Tuple2<String, Integer>> windowCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    // 将結果列印到控制台,注意這裡使用的是單線程列印,而非多線程
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
  }
}           

4.4 運作程式

要運作示例程式,首先我們在終端啟動 netcat 獲得輸入流:

nc -lk 9000           

如果是 Windows 平台,可以通過 https://nmap.org/ncat/ 安裝 ncat 然後運作:

ncat -lk 9000           

然後直接運作SocketWindowWordCount的 main 方法。

隻需要在 netcat 控制台輸入單詞,就能在 SocketWindowWordCount 的輸出控制台看到每個單詞的詞頻統計。如果想看到大于1的計數,請在5秒内反複鍵入相同的單詞。

「大資料」大資料第四代 Flink 從概念到實戰

Cheers !

文章來源:https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247483718&idx=1&sn=1850940a76195809988a2d4db8bb7f50&chksm=fd3b8f04ca4c0612d5f98c2b20b9eb5baeb174c026efb27afb1e9af7f49078cf1163b2fd9d7d&scene=21#wechat_redirect