天天看點

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

要想熟練掌握一個大資料架構,僅僅是學習一些網絡上的樣例程式是遠遠不夠的,我們必須系統地了解它背後的設計和運作原理。

本文将以WordCount的案例為主線,主要介紹Flink的設計和運作原理。關于Flink WordCount程式可以參考我之前的文章:十分鐘搭建第一個Flink程式。

原創不易,轉載請注明出處。對大資料和機器學習感興趣的朋友可以加我的微信 aistevelu,互相交流學習。本文内容主要包括:

  • Flink的資料流圖,以及如何将資料流圖從邏輯視角轉化為實體執行圖;
  • Flink分布式架構;
  • Flink時間處理機制;
  • Flink狀态與檢查點機制;

閱讀完本章後,讀者可以對Flink的設計和運作原理有一個全面的認識。

1 Flink資料流圖簡介

1.1 Flink樣例程式

我們開始對資料流做處理,計算資料流中單詞出現的頻次。如果輸入資料流是“Hello Flink Hello World“,這個程式将統計出Hello的頻次為2,Flink和World的頻次為1。在大資料領域,詞頻統計(WordCount)程式就像是一個程式設計語言的HelloWorld程式,它展示了一個大資料引擎的基本規範。麻雀雖小,五髒俱全,從這個樣例中,我們可以一窺Flink設計和運作原理。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 1 Flink樣例程式示意圖

如圖 1所示,程式分為三大部分,第一部分讀取資料源(Source),第二部分對資料做轉換操作(Transformation),最後将轉換結果輸出到一個目的地(Sink)。 代碼中的函數被稱為算子(Operator),是Flink提供給程式員的接口,程式員需要通過這些算子對資料做操作。

我們可以把算子了解為1 + 2 運算中的加号,加号(+)是這個算子的一個符号表示,它表示對數字1和數字2做加法運算。同樣,在Flink或Spark這樣的大資料引擎中,算子對資料進行某種操作,程式員可以根據自己的需求調用合适的算子,完成所需計算任務。常用的算子有map、flatMap、keyBy、timeWindow等,它們分别對資料流執行不同類型的操作。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 2 WordCont程式的邏輯視圖

圖 2展示了WordCount程式中資料從不同算子間流動的情況。圖中,圓圈代表算子,圓圈間的箭頭代表資料流,資料流在Flink程式中經過不同算子的計算,最終生成為目标資料。按照算子對資料的操作内容,一般将算子分為Source算子、Transformation算子和Sink算子。Source算子讀取資料源中的資料,資料源可以是資料流、也可以存儲在檔案系統中的檔案。Transformation算子對資料進行必要的計算處理。Sink算子将處理結果輸出,資料一般被輸出到資料庫、檔案系統或下一個資料流程式。

我們先對這個樣例程式中各個算子做一個簡單的介紹,關于這些算子的具體使用方式将在後續文章中詳細說明。

  • map

map函數對資料流中每一條資料做一個操作,生成一條新的資料。本例中

map(word => (word, 1))

表示取輸入的每個單詞,用變量word表示,然後生成一個二進制對(word, 1),1是表示出現了一次。注意,map的一條輸入資料對應一條輸出資料。

  • flatMap

在解釋flatMap前,我們先對

split

函數做一個簡單介紹。

split(“\\s”)

函數以空白字元為分隔符,将文本切分成單詞清單。如果輸入為“Hello Flink“,那麼經過這個函數切分後,得到結果為[“Hello”,”Flink”]組成的單詞清單。

本例中

flatMap(line => line.split(“\\s”))

表示取出輸入的每一行文本,用變量line表示,将文本中以空格做切分,生成一個單詞清單,到這裡仍然清單,flatMap接着對清單打平,輸出單個單詞。

flatMap

先做

map

所做的操作,然後對輸出的各個清單打平,是以,

flatMap

的一條輸入資料可能有多條輸出。

  • keyBy

keyBy

根據某個Key做資料重分布,将所有資料中包含該Key的資料都發送到同一個分區上。本例中是将二進制組中第一項作為Key,即以單詞為Key,包含同樣單詞的二進制對都發送到同一分區上。

  • timeWindow

timeWindow

是時間視窗函數,以界定對多長時間之内的資料做統計。

  • sum

sum

為求和函數。

sum(1)

表示對二進制組中第二個元素求和,因為經過前面的

keyBy

,所有單詞都被發送到了同一個分區,是以,在這一個分區上,将單詞出現次數做加和,就得到出現的總次數。

對于詞頻統計這個案例,邏輯上來講無非是對資料流中的單詞做提取,然後使用一個Key-Value結構對單詞做詞頻計數,最後輸出結果即可,這樣的邏輯本可以用幾行代碼完成,改成這樣的算子形式,反而讓新人看着一頭霧水,為什麼一定要用算子的形式來寫程式呢?實際上,算子進化成目前這個形态,就像人類從石塊計數,到手指計數,到算盤計數,再到電腦計數這樣的進化過程一樣,盡管更低級的方式可以完成一定的計算任務,但是随着計算規模的增長,古老的計數方式存在着低效的弊端,無法完成更進階别和更大規模的計算需求。試想,如果我們不使用大資料引擎提供的算子,而是自己實作一套上述的計算邏輯,盡管我們可以快速完成目前的詞頻統計的任務,但是當面臨一個新計算任務時,我們需要再次重新編寫程式,完成一整套計算任務。我們自己編寫代碼的橫向擴充性可能很低,當輸入資料暴增時,我們需要做很大改動,以部署在更多機器上。

大資料引擎的算子對計算做了一些抽象,對于新人來說有一定學習成本,而一旦掌握這門技術,人們所能處理的資料規模将成倍增加。大資料引擎的算子出現,正是針對資料分布在多個分區的大資料場景需要一種統一的計算描述語言來對資料做計算而進化出的新計算形态。基于大資料引擎的算子,我們可以定義一個資料流的邏輯視圖,以此完成對大資料的計算。剩下那些資料交換、橫向擴充、故障恢複等問題全交由大資料引擎來解決。

1.2 從邏輯視圖到實體執行

在絕大多數的大資料處理場景下,一台機器節點無法處理所有資料,資料被切分到多台節點上。在大資料領域,當資料量大到超過單台機器處理能力時,就将一份資料切分到多個分區(Partition)上,每個分區分布在一台虛拟機或實體機上。

前一小節已經提到,大資料引擎的算子提供了程式設計接口,使用算子我們可以建構資料流的邏輯視圖。考慮到資料分布在多個節點的情況,邏輯視圖隻是一種抽象,需要将邏輯視圖轉化為實體執行圖,才能在分布式環境下執行。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 3 樣例程式實體執行示意圖

圖 3為1.1中的樣例程式的實體執行圖,這裡資料流分布在2個分區上。箭頭部分表示資料流分區,圓圈部分表示算子在分區上的算子子任務(Operator Subtask)。從邏輯視圖變為實體執行圖後,map算子在每個分區都有一個算子子任務,以處理該分區上的資料:map[1/2]算子子任務處理第一個資料流分區上的資料,map[2/2]算子子任務處理第二個資料流分區上的資料。keyBy算子會将資料按照某個key做資料重分布,在詞頻統計的例子中是以單詞為key,例如,輸入資料為“Hello Flink Hello World”,keyBy算子會将所有的”Hello”歸結到一個分區上。

算子子任務是實體執行的基本單元,算子子任務之間是互相獨立的,某個算子子任務有自己的線程,不同算子子任務可能分布在不同的節點上。後文在Flink的資源配置設定部分我們還會重點介紹算子子任務。

從圖 3中可以看到,除去Sink外的算子都被分成了2個算子子任務,這樣配置的并行度(Parallelism)為2,Sink算子的并行度為1。并行度是可以被設定的,實際應用中一般根據資料量的大小,計算資源的多少等多方面的因素來設定并行度。

1.3 資料交換政策

圖 3中keyBy算子子任務将資料做了重新配置設定,即資料在不同分區上進行着資料交換,産生了資料流動的現象。無論是Hadoop、Spark還是Flink,當涉及資料分布在多個分區時,對資料的處理都會涉及到資料交換政策。在Flink中,資料交換政策包括圖 4中涉及到的四種政策:

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 4 Flink資料交換政策

  1. 前向傳播(Forward):前一個算子子任務将資料直接傳遞給後一個算子子任務,資料不存在跨分區的交換,也避免了因資料交換産生的各類開銷,圖 3中Source和和flatMap之間就是這樣的情形。
  2. 全局廣播(Broadcast):将某份資料發送到所有分區上,這種政策涉及到了資料拷貝和網絡通信,是以非常消耗資源。
  3. 基于Key的資料重分布:資料以(Key, Value)形式存在,該政策将所有資料做一次重新分布,并保證相同Key的資料被發送到同一個分區上。圖 3中keyBy算子将單詞作為Key,把某個單詞都發送到同一分區,以友善後續算子來統計這個單詞出現的頻次。
  4. 随機政策(Random):該政策将所有資料随機均勻地發送到多個分區上,以保證資料平均配置設定到不同分區上。該政策通常為了防止資料傾斜到某些分區,導緻部分分區資料稀疏,部分分區資料擁堵,甚至超過該分區上算子的處理能力。

2 Flink架構與核心元件

為了實作支援分布式運作,Flink跟其他大資料引擎一樣,采用了主從(Master-Worker)架構,運作時主要包括兩個元件:

• JobManager,又被稱為Master,是一個Flink應用的主節點。

• TaskManager,又被稱為Worker,執行計算任務的節點。

一個Flink應用一般含有至少一個JobManager,一個或多個TaskManager。

2.1 Flink作業執行過程

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 5 Flink作業送出流程

使用者編寫Flink程式并送出任務的具體流程為:

  1. 使用者編寫應用程式代碼,并通過Flink用戶端(Client)送出作業。程式一般為Java或Scala語言,調用Flink API,建構基于邏輯視角的資料流圖,代碼和相關配置檔案被編譯打包,并被送出到JobManager上,形成一個應用作業(Application)。
  2. JobManager接受到作業後,将邏輯視圖轉化成可并行的實體執行圖。
  3. JobManager将實體執行圖發送給各TaskManager。
  4. TaskManager接收到實體執行圖後,會初始化并開始執行被配置設定的任務。
  5. TaskManager在執行任務過程中可能會與其他TaskManager交換資料,會使用圖 4提到的一些資料交換政策。
  6. TaskManager将任務啟動、運作、性能名額、結束或終止等狀态資訊會回報給JobManager。
  7. 使用者可以使用Flink Web儀表盤來監控送出的作業。
深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 6 Flink主從架構架構圖

圖 6對Flink的各個元件描述得更為詳細,我們再對涉及到的各個元件進行更為詳細的介紹。

Client

當使用者送出一個Flink程式時,會首先建立一個用戶端(Client)。該Client會對使用者送出的Flink程式進行預處理,并把作業送出到Flink叢集中處理。Client需要從使用者送出的Flink程式配置中擷取JobManager的位址,并建立到JobManager的連接配接,将Flink作業送出給JobManager。Client會将使用者送出的Flink程式組裝一個JobGraph。

JobManager

JobManager是Flink的協調者,它負責接收Flink作業,排程任務。同時,JobManager還負責管理TaskManager,收集作業的狀态資訊,生成檢查點和故障恢複等問題。JobManager會将Client送出的JobGraph轉化為ExceutionGraph,ExecutionGraph是JobGraph的并行版本,但還不是最終的實體執行圖。

TaskManager

TaskManager是實際負責執行計算的節點,在其上執行實體執行圖。同時,TaskManager還要處理必要的資料緩存和交換。每個TaskManager負責管理其所在節點上的資源資訊,包括記憶體、磁盤、網絡,TaskManager啟動的時候會将資源的狀态向JobManager彙報。

2.2 再談邏輯視圖到實體執行圖

了解了Flink的分布式架構和核心元件,這裡我們從更細粒度上來分析1.2介紹的從邏輯視圖轉化為實體執行圖過程,該過程可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 實體執行圖。

  • StreamGraph:是根據使用者通過 DataStream API 編寫的代碼生成的最初的圖,用來表示程式的拓撲結構。在這張圖中,節點就是使用者調用的算子,邊表示資料流。
  • JobGraph:JobGraph是送出給 JobManager 的資料結構。StreamGraph經過優化後生成了 JobGraph,主要的優化為,将多個符合條件的節點連結在一起作為一個節點,這樣可以減少資料交換所需要的序列化、反序列化以及傳輸消耗。這個連結的過程叫做算子鍊,會在下一節簡單介紹。
  • ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是排程層最核心的資料結構。
  • 實體執行圖:JobManager 根據 ExecutionGraph 對作業進行排程後,在各個TaskManager 上部署任務形成的圖,實體執行并不是一個具體的資料結構。

可以看到,Flink在資料流圖上可謂煞費苦心,僅各類圖就有四種之多。對于新人來說,可以不用太關心這些非常細節的底層實作,隻需要了解以下幾個核心概念:

  • Flink采用主從架構,JobManager起着管理協調作用,TaskManager負責實體執行,在執行過程中會發生一些資料交換、生命周期管理等事情。
  • 使用者調用Flink API,構造邏輯視圖,Flink會對邏輯視圖優化,并轉化為實體執行圖,最後被執行的是實體執行圖。

2.3 任務、算子子任務與算子鍊

在分布式運作的過程中,Flink将一些算子子任務(Subtask)連結在一起,組成算子鍊(Operator Chain),連結後将以任務(Task)的形式被TaskManager排程執行。使用算子鍊是一個非常有效的優化,它可以有效降低算子子任務之間的傳輸開銷。連結之後形成的Task是TaskManager中的一個線程。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 7 任務、子任務與算子鍊

例如,資料從Source算子前向傳播到 flatMap算子,再由flatMap算子前向傳播到map算子,中間沒有發生跨分區的資料交換,是以,我們完全可以将Source、flatMap和map幾個Operator Subtask組合在一起,形成一個Task。keyBy算子發生了資料重分布,資料會跨越分區,是以map和keyBy無法被連結到一起。同樣,我們也不能把sum和Sink連結到一起。

預設情況下,Flink會盡量将更多的Subtask連結在一起,但一個Subtask有超過一個輸入或發生資料交換時,連結就無法建立。盡管将算子連結到一起會降低一些傳輸開銷,但是也有一些情況并不需要太多連結。比如,有時候我們需要将一個非常長的算子鍊拆開,這樣我們就可以将原來集中在一個線程中的計算拆分到多個線程中來并行計算。Flink手動配置是否對某些算子啟用算子鍊。

2.4 任務槽位與計算資源

任務槽位的概念

根據前文的介紹,我們已經了解到TaskManager負責具體的任務執行。TaskManager是一個Java虛拟機程序,在TaskManager中可以并行運作多個Task。在程式執行之前,經過優化,部分Subtask被連結在一起,組成一個Task。每個Task是一個線程,需要TaskManager為其配置設定相應的資源,TaskManager使用任務槽位(Task Slot)給任務配置設定資源,簡稱槽位(Slot)。

在解釋任務槽位的概念前,我們先回顧一下程序與線程的概念。在作業系統層面,程序(Process)是進行資源配置設定和排程的一個獨立機關,線程(Thread)是是CPU排程的基本機關。比如,我們常用的Office Word軟體,在啟動後就占用作業系統的一個程序。Windows上可以使用任務管理器來檢視目前活躍程序,Linux上可以使用top指令來檢視。線程是程序的一個子集,一個線程一般專注于處理一些特定任務,不獨立擁有系統資源,隻擁有一些運作中必要的資源,如程式計數器。一個程序至少有一個線程,也可以有多個線程。多線程場景下,每個線程都處理一小個任務,多個線程以高并發的方式同時處理多個小任務,可以提高處理能力。

回到Flink的槽位配置設定機制上,一個TaskManager是一個程序,TaskManager可以管理一至多個Task,每個Task是一個線程,占用一個槽位。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 8 Task Slot與Task Manager

假設我們給WordCount程式配置設定兩個TaskManager,每個TaskManager又配置設定3個槽位,是以總共是6個槽位。結合圖 7中對這個作業的并行度設定,整個作業被劃分為5個Task,使用5個線程,這5個線程可以按照圖 8所示的方式配置設定到6個槽位中。

每個槽位的資源是整個TaskManager資源的子集,比如這裡的TaskManager下有3個槽位,每個槽位占用TaskManager所管理的1/3的記憶體,在第一個槽位内運作的任務不會與在第二個槽位内運作的任務互相争搶記憶體資源。注意,在配置設定資源時,Flink并沒有将CPU資源明确配置設定給各個槽位。

Flink允許使用者設定TaskManager中槽位的數目,這樣使用者就可以确定以怎樣的粒度将任務做互相隔離。如果每個TaskManager隻包含一個槽位,那麼運作在該槽位内的任務将獨享JVM。如果TaskManager包含多個槽位,那麼多個槽位内的任務可以共享JVM資源,比如共享TCP連接配接、心跳資訊、部分資料結構等。如無特殊需要,可以将槽位數目設定為TaskManager下可用的CPU核心數,那麼平均下來,每個槽位都能獲得至少一個CPU核心。

槽位共享

圖 8中展示了任務的一種資源配置設定方式,預設情況下, Flink還提供了一種槽位共享(Slot Sharing)的優化機制,進一步優化資料傳輸開銷,充分利用計算資源。将圖 8中的任務做槽位共享優化後,結果如圖 9所示。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 9 槽位共享示意圖

開啟槽位共享後,Flink允許将獨占一個槽位的任務與同一個作業中的其他任務共享槽位。于是可以将一個作業從開頭到結尾的所有Subtask都放置在一個槽位中,如圖 9中最左側的資料流,這樣槽位内的資料交換成本更低。而且,對于一個資料流圖來說,Source、map等算子的計算量相對不大,window算子的計算量比較大,計算量較大的Subtask與計算量較小的Subtask互相互補,可以騰出更多的槽位,配置設定給更多Task,這樣可以更好地利用資源。而不開啟槽位共享,計算量小的Source、map算子子任務獨占了槽位,造成一定的資源浪費。

并行度與槽位數目

圖 3中提到了并行度,在WordCount的例子中,除去Sink算子的并行度為1外,其他算子的并行度均為2,也就是說在并行度為2的情況下,每個算子隻能拆分為2個Subtask。圖 8中的方式共占用5個槽位,支援槽位共享後,圖 9隻占用2個槽位,這裡故意将剩下的幾個槽位置空,隻是為了示範需要,如果這個作業的資料量非常大,占用的資料分區很多,其實完全可以通過增加并行度,将這些槽位填充,為更多的并行任務提供資源。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 10 并行度與槽位數目

為了充分利用空槽位,占滿圖 9中多餘的4個槽位,我們可以把除Sink外的其他算子的并行度都設定為6。圖 2‑10展示了将并行度增加後,資源配置設定情況。

并行度和槽位數目的概念可能容易讓人混淆,這裡再次闡明一下。使用者使用Flink提供的API算子可以建構一個邏輯視圖,需要将任務并行才能被實體執行。整個作業将被切分為多個執行個體,每個執行個體處理整個作業輸入資料的一部分。如果輸入資料過大,增大并行度可以增加更多的執行個體,加快資料處理速度。可見,并行度是Flink對任務并行切分的一種描述。槽位數目是在資源設定時,對單個TaskManager的資源切分粒度。并行度、槽位數目和TaskManager數可大緻按照公式 2‑1來計算。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

公式 1 并行度、TaskManager數與Task Slot數關系

其中,ceil為上限函數,表示對除法結果向上取整。關于并行度、槽位數目等配置,将在後續文章中詳細說明。

2.5 Flink API結構

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 11 Flink API抽象

我們之前讨論的WordCount例子中,一直使用的是Flink提供的DataStream API,即在資料流上的操作。除了DataStream API,Flink給程式設計人員不同層次API,主要有三層:

  1. Flink最底層提供的是有狀态的流式計算引擎,流(Stream)、狀态(State)和時間(Time)等流式計算概念都在這一層得到了實作。
  2. 一般情況下,應用程式不會使用上述底層接口,而是使用Flink提供的核心API:針對有界和無界資料流的DataStream API和針對有界資料集的DataSet API。使用者可以使用這兩個API進行常用的資料處理:轉換(Transformation)、連接配接(Join)、聚合(Aggregation)、視窗(Window)以及對狀态(State)的操作。這一層有點像Spark提供的RDD級别的接口。
  3. Table API和SQL是更進階别的抽象。在這一層,資料被轉換成了關系型資料庫式的表格,每個表格擁有一個表模式(Schema),使用者可以像操作表格那樣操作流式資料,例如可以使用針對結構化資料的select、join、group-by等操作。如果使用者熟悉SQL語句,那麼可以很快上手Flink的Table API和SQL。很多公司的資料流非常依賴SQL,Flink SQL降低了從其他架構遷移至Flink的成本。

我們将在後續文章中介紹DataStream API、Table API和SQL。

2.6 Flink元件棧

了解Flink的主從架構以及API結構後,我們可以将Flink的核心元件分層來剖析。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 12 Flink元件棧

部署層

大資料引擎首先需要部署在實體機或虛拟機上。Flink支援多種部署方式,可以部署在單機、叢集,以及雲上。

運作時層

運作時(Runtime)層為Flink各類計算提供了實作。這一層做了前面章節中提到的将資料流圖轉化為實體執行圖、資源配置設定以及分布式排程與執行等大部分工作。

API層

API層主要實作了面向資料流的流處理DataStream API和面向資料集的批處理DataSet API。在這兩個API之上,Flink還提供了更豐富的工具:

  • 面向資料流處理的:CEP(Complex Event Process,複雜事件處理)、基于類SQL的Table API和SQL
  • 面向資料集批處理的:FlinkML(機器學習計算庫)、Gelly(圖計算庫)

3 Flink時間處理機制

3.1 時間視窗

在批處理場景下,資料已經是按照某個時間次元分批次地存儲了。一些公司經常将使用者行為日志按天存儲在一個檔案目錄下,另外一些開放資料集都會說明資料采集的時間始末。是以,對于批處理任務,處理一個資料集,其實就是對該資料集對應的時間視窗内的資料進行處理。在流計算場景下,資料以源源不斷的流的形式存在,資料一直在産生,沒有始末。我們要對資料進行處理時,往往需要明确一個時間視窗,比如,資料在“每秒”、“每小時”、“每天”的次元下的一些特性。一般有如下幾種定義時間視窗的方式。

滾動視窗

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 13 固定資料數目的滾動視窗

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 14 固定時間間隔的滾動視窗

滾動視窗(Tumbling Window)模式下視窗之間互不重疊,且視窗長度是固定的,長度可以是資料的條數,也可以是時間間隔。圖 13是固定長度為4的滾動視窗,圖 14是固定長度為10分鐘的滾動視窗。定長滾動視窗是經常用到的一種視窗模式。在本文最開始的WordCount例子中,我們使用的是定長為5秒的滾動視窗。

滑動視窗

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 15 滑動視窗

滑動視窗(Sliding Window)也是一種視窗長度定長的模式。與滾動視窗不同,滑動視窗模式下視窗和視窗之間有滑動間隔(Slide)。再以WordCount為例,我們要統計10分鐘内的詞頻,并且每隔1分鐘統計一次,就需要使用滑動視窗。

會話視窗

會話(Session)是一個使用者互動概念,常常出現在網際網路應用上,一般指使用者在某APP或某網站上短期内産生的一系列行為。比如,使用者在手機淘寶上短時間有大量的搜尋和點選的行為,這系列行為事件組成了一個Session,接着可能因為一些其他因素,使用者暫停了與APP的互動,過一會使用者又傳回APP,經過一系列搜尋、點選、與客服溝通,最終下單。Session視窗的長度并不固定,是以不能用上面兩種形式的視窗來模組化。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 16 會話視窗

Session沒有固定長度,那如何将資料劃分到不同的視窗呢?Flink提供了Session Gap的概念。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 17 session gap示意圖

我們繼續以使用者在手機淘寶上的行為為例,現在有3個使用者,每個使用者産生了不同的行為,果兩個行為資料的時間戳小于session gap,則被劃歸到同一個視窗中,圖 17中user2的window4,如兩個行為資料的時間戳大于了session gap,則被劃歸到兩個不同的視窗中,user2的window1和window2之間的時間間隔大于最小的session gap,資料被劃歸為了兩個視窗。

我們将在後續文章詳細介紹以上幾種視窗的使用方法。

3.2 Flink三種時間語義

如果我們要定義基于時間的視窗,那麼首先要定義時間。在程式中,時間一般基于Unix時間戳,即以1970-01-01-00:00:00.000為起始點。時間戳毫秒精度是時間距離該起點的毫秒總數,時間戳微秒精度是事件距離該起點的微秒總數。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 18 三種時間語義

之前文章中我們提到了流處理的時間語義問題,在Flink中一般有三種時間概念,如圖 18所示。

  • 事件時間(Event Time)是事件實際發生的時間,通常是事件發生時嵌入到事件上的時間,比如某個傳感器在生成資料時,會将時間戳打入這個資料上。
  • 接收時間(Ingestion Time)是事件進入Flink的時間,确切的說,是該事件進入Source算子時,Source算子的目前時間。
  • 處理時間(Processing Time)是各個時間算子處理該事件的目前時間。一般情況下,處理時間會比攝入時間更晚一些。

Processing Time是最簡單的時間概念,隻需要算子擷取目前運作機器的系統時間,不需要考慮其他任何因素,是以使用Processing Time作為時間,可以獲得最好的性能和最低的延遲。但Processing Time并不能代表事件實際發生的時間,從事件實際發生到算子處理的過程有大量的不确定性,以Processing Time來計算,很可能導緻事件的處理是亂序的,産生不可複現的結果。

Event Time可以保證事件順序的可靠性,是以可以得到一緻的、可複現的結果。Event Time雖然準确,但也有其弊端:我們無法預知某個時間下,是否所有資料均已到達,是以需要使用水位線機制處理延遲資料。

3.3 水位線

之前文章已經提到,水位線(Watermark)機制假設在某個時間點上,不會有比這個時間點更晚的上報資料。Watermark常被作為一個時間視窗的結束時間。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 19 一個帶有Watermark的資料流

Flink中的Watermark是被系統插入到資料流的特殊資料。Watermark的時間戳單調遞增,且與事件時間戳相關。如上圖的資料流所示,方塊是事件,三角形是該事件對應的時間戳,圓圈為Watermark。當Flink接受到時間戳值為5的Watermark時,系統假設時間戳小于5的事件均已到達,後續到達的小于5的事件均為延遲資料。Flink處理到最新的Watermark,會開啟這個時間視窗的計算,把這個Watermark之前的資料納入進此次計算,延遲資料則不能被納入進來,是以使用Watermark會導緻微小的誤差。

生成Watermark

流資料中的事件時間戳與Watermark高度相關,事件時間戳的抽取和Watermark的生成也基本是同時進行的,抽取的過程會遇到下面兩種情況:

  1. 資料流中已經包含了事件時間戳和Watermark。
  2. 使用抽取算子生成事件時間戳和Watermark,這也是實際應用中更為常見的場景。因為後續的計算都依賴時間,Watermark抽取算子最好在資料接入後馬上調用。具體而言,Watermark抽取算子包含兩個函數:第一個函數從資料流的事件中抽取時間戳,并将時間戳指派到事件的中繼資料上,第二個函數生成Watermark。

Flink有兩種方式來生成Watermark:

  1. 周期性(Periodic)生成Watermark:Flink每隔一定時間間隔,定期調用Watermark生成函數。這種方式下,Watermark的生成與時間有周期性的關系。
  2. 斷點式(Punctuated)生成Watermark:資料流中某些帶有特殊标記的資料自帶了Watermark資訊,Flink監控資料流中的每個事件,當接收到帶有特殊标記資料時,會觸發Watermark的生成。這種方式下,Watermark的生成與時間無關,與何時接收到特殊标記資料有關。

無論是以上那種方式,Flink都會生成Watermark并插入到資料流中。一旦時間戳和Watermark生成後,後續的算子将以Event Time的時間語義來處理這個資料流。Flink把時間處理部分的代碼都做了封裝,會在内部處理各類時間問題,使用者不需要擔心延遲資料等任何時間相關問題。使用者隻需要在資料接入的一開始生成時間戳和Watermark,Flink會負責剩下的事情。

延遲資料

Flink有一些機制專門收集和處理延遲資料。遲到事件在Watermark之後到達,一般處理的方式有三種:

  1. 将遲到事件作為錯誤事件直接丢棄
  2. 将遲到事件收集起來另外再處理
  3. 重新觸發計算

對于第二種方式,使用者可以使用Flink提供的“Side Output”機制,将遲到事件放入一個單獨的資料流,以便再對其單獨處理。

對于第三種方式,使用者可以使用Flink提供的“Allowed Lateness”機制,設定一個允許的最大遲到時長,原定的時間視窗關閉後,Flink仍然會儲存該視窗的狀态,直至超過遲到時長,遲到的事件加上原來的事件一起重新被計算。

我們将在後續文章中詳細介紹Event Time的使用、Watermark生成、延遲資料處理等技術細節。

4 Flink的狀态和檢查點

4.1 狀态

在之前的文章中我們已經提到了狀态的概念:流式大資料處理引擎會根據流入資料持續更新狀态資料。狀态可以是目前所處理事件的位置偏移(Offset)、一個時間視窗内的某種輸入資料、或與具體作業有關的自定義變量。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 20 資料流與狀态示意圖

對于WordCount的例子來說,已經處理了一個”Hello”單詞,并且正在處理一個”Hello”,對于Source算子來說,目前資料的位置偏移為3,所有已處理的資料中,單詞”Hello”的出現次數為2。這個作業的狀态包括目前處理的位置偏移、已處理過的單詞出現次數等變量資訊。

4.2 檢查點

一緻性檢查點

在一個有狀态的流處理作業中,為保證高吞吐和低延遲,Flink的每個Task需要高效讀寫狀态資料,Task會在本地的TaskManager中存儲狀态資料。然而,由于大資料系統一般運作在多台機器上,可能會遇到程序被殺、機器當機、網絡抖動等問題,一旦出現當機等問題,該機器上的狀态以及相應的計算會丢失,是以需要一種恢複機制來應對這些潛在問題。

Flink使用一緻性檢查點(Consistent Checkpoint)技術來做故障恢複。檢查點機制一般是定期将狀态資料生成快照(Snapshot),持久化存儲起來,一旦發生意外,Flink主動重新開機應用,并從最近的快照中恢複,再繼續處理新流入資料。一緻性檢查點技術可以将分布在多台節點上的所有狀态都記錄下來,并提供了Exactly-Once的投遞保障,其背後是使用了Chandy-Lamport算法,将本地的狀态資料儲存到一個存儲空間上,故障發生後及時恢複最近的快照資料。我們将在後續文章中詳細介紹一緻性檢查點的算法原理。

狀态後端

Task在本地記憶體中儲存一份狀态資料,但在分布式系統中,某個Task在任意時間點都可能發生故障,是以Task上的本地狀态資料可以被認為是脆弱的。Flink定期将本地的狀态資料持久化儲存到一個存儲空間上。使用者可以選擇以怎樣的方式來儲存這些狀态資料,這種機制被稱為狀态後端(State Backend)。Flink提供了三種狀态後端:記憶體、檔案系統和RocksDB。

記憶體肯定是讀寫性能最優的方式,單個節點的記憶體有限,是以這種狀态後端會對狀态資料的大小有限制。相比記憶體,本地磁盤的速度更慢,其所能承擔的資料量更大,RocksDB 就是一種基于本地磁盤的狀态後端。此外,Flink還允許将資料存儲到分布式檔案系統,如Hadoop的HDFS和AWS的S3上,分布式檔案系統的資料存儲能力非常大,足以應付海量資料的存儲需求。我們将在後續文章中詳細介紹三種狀态後端的使用方法。

Savepoint

在容錯上,除了Checkpoint,Flink還提供了Savepoint機制。從名稱和實作上,這兩個機制都極其相似,甚至Savepoint會使用Checkpoint的資料,但實際上,這兩個機制的定位不同。

深入淺出:10行Flink WordCount程式背後的萬字深度解析,讀懂Flink原理和架構

圖 21 Checkpoint和Savepoint

Checkpoint是Flink定期觸發并自動執行的故障恢複機制,以應對各種意外情況,其設計初衷主要是針對容錯和故障恢複。Savepoint會使用Checkpoint生成的快照資料,但與Checkpoint不同點在于,Savepoint需要程式設計人員手動介入,用來恢複暫停作業。相比而言,Checkpoint是自動執行,Savepoint是手動管理。

當我們想要手動處理之前已經處理過的資料,就可以使用Savepoint,是以Savepoint經常被用來調試程式:

  • 我們可以給同一份作業設定不同的并行度,來找到最佳的并行度設定
  • 我們想測試一個新功能或修複一個已知的bug,并用新的程式邏輯處理原來的資料
  • 進行一些A/B實驗,使用相同的資料源測試程式的不同版本
  • 因為狀态可以被持久化存儲到分布式檔案系統上,我們甚至可以将同樣一份應用程式從一個叢集遷移到另一個叢集,隻需保證不同的叢集都可以通路這個檔案系統

Checkpoint 和 Savepoint 是Flink提供的兩個相似的功能,它們滿足了不同的需求,以確定一緻性、容錯性,滿足了作業更新、BUG 修複、遷移、A/B測試等不同需求。

繼續閱讀