天天看點

Flink基礎(二十三):FLINK基本題(二)

1 Flink是如何支援批流一體的?

Flink基礎(二十三):FLINK基本題(二)

本道面試題考察的其實就是一句話:Flink的開發者認為批處理是流處理的一種特殊情況。批處理是有限的流處理。Flink 使用一個引擎支援了DataSet API 和 DataStream API。

2 Flink是如何做到高效的資料交換的?

  在一個Flink Job中,資料需要在不同的task中進行交換,整個資料交換是有 TaskManager 負責的。TaskManager 的網絡元件首先從緩沖buffer中收集records,然後再發送。Records 并不是一個一個被發送的,二是積累一個批次再發送,batch 技術可以更加高效的利用網絡資源。

3 Flink是如何做容錯的?

  Flink 實作容錯主要靠強大的CheckPoint機制和State機制。Checkpoint 負責定時制作分布式快照、對程式中的狀态進行備份;State 用來存儲計算過程中的中間狀态。

4 Flink 分布式快照的原理是什麼?

Flink的分布式快照是根據Chandy-Lamport算法量身定做的。簡單來說就是持續建立分布式資料流及其狀态的一緻快照。

Flink基礎(二十三):FLINK基本題(二)

核心思想是在 input source 端插入 barrier,控制 barrier 的同步來實作 snapshot 的備份和 exactly-once 語義。

5 Flink是如何保證Exactly-once語義的?

端到端的exactly-once對sink要求比較高,具體實作主要有幂等寫入和事務性寫入兩種方式。幂等寫入的場景依賴于業務邏輯,更常見的是用事務性寫入。

而事務性寫入又有預寫日志(WAL)和兩階段送出(2PC)兩種方式。

兩階段送出

Flink通過實作兩階段送出和狀态儲存來實作端到端的一緻性語義。 分為以下幾個步驟:

  開始事務(beginTransaction)建立一個臨時檔案夾,來寫把資料寫入到這個檔案夾裡面

  預送出(preCommit)将記憶體中緩存的資料寫入檔案并關閉

  正式送出(commit)将之前寫完的臨時檔案放入目标目錄下。這代表着最終的資料會有一些延遲

  丢棄(abort)丢棄臨時檔案

若失敗發生在預送出成功後,正式送出前。可以根據狀态來送出預送出的資料,也可删除預送出的資料。

6 Flink 的 kafka 連接配接器有什麼特别的地方?

  Flink源碼中有一個獨立的connector子產品,所有的其他connector都依賴于此子產品,Flink 在1.9版本釋出的全新kafka連接配接器,摒棄了之前連接配接不同版本的kafka叢集需要依賴不同版本的connector這種做法,隻需要依賴一個connector即可。

7 說說 Flink的記憶體管理是如何做的?

      Flink 并不是将大量對象存在堆上,而是将對象都序列化到一個預配置設定的記憶體塊上,這個記憶體塊叫做 

MemorySegment

,它代表了一段固定長度的記憶體(預設大小為 32KB),也是 Flink 中最小的記憶體配置設定單元,并且提供了非常高效的讀寫方法。每條記錄都會以序列化的形式存儲在一個或多個

MemorySegment

中。

  如果需要處理的資料超出了記憶體限制,則會将部分資料存儲到硬碟上。Flink 為了直接操作二進制資料實作了自己的序列化架構。理論上Flink的記憶體管理分為三部分:

Flink基礎(二十三):FLINK基本題(二)

  Network Buffers:這個是在TaskManager啟動的時候配置設定的,這是一組用于緩存網絡資料的記憶體,每個塊是32K,預設配置設定2048個,可以通過“taskmanager.network.numberOfBuffers”修改

  Memory Manage pool:這是一個由 

MemoryManager

 管理的,由衆多

MemorySegment

組成的超大集合。Flink 中的算法(如 sort/shuffle/join)會向這個記憶體池申請 MemorySegment,将序列化後的資料存于其中,使用完後釋放回記憶體池。預設情況下,池子占了堆記憶體的 70% 的大小。大量的Memory Segment塊,用于運作時的算法(Sort/Join/Shuffle等),這部分啟動的時候就會配置設定。記憶體的配置設定支援預配置設定和lazy load,預設懶加載的方式。

  User Code,這部分是除了Memory Manager之外的記憶體用于User code和TaskManager本身的資料結構。

Flink使用堆外記憶體:

  • 啟動超大記憶體(上百GB)的JVM需要很長時間,GC停留時間也會很長(分鐘級)。使用堆外記憶體可以極大地減小堆記憶體(隻需要配置設定Remaining Heap),使得 TaskManager 擴充到上百GB記憶體不是問題。
  • 進行IO操作時,使用堆外記憶體可以zero-copy,使用堆内記憶體至少要複制一次。
  • 堆外記憶體在程序間是共享的。

8 說說 Flink的序列化如何做的?

  • 序列化與反序列化可以了解為編碼與解碼的過程。序列化以後的資料希望占用比較小的空間,而且資料能夠被正确地反序列化出來。為了能正确反序列化,序列化時僅存儲二進制資料本身肯定不夠,需要增加一些輔助的描述資訊。此處可以采用不同的政策,因而産生了很多不同的序列化方法。Java本身自帶的序列化和反序列化的功能,但是輔助資訊占用空間比較大,在序列化對象時記錄了過多的類資訊。
  • Flink實作了自己的序列化架構,Flink處理的資料流通常是一種類型,是以可以隻儲存一份對象Schema資訊,節省存儲空間。又因為對象類型固定,是以可以通過偏移量存取。
  • Java支援任意Java或Scala類型,類型資訊由 

    TypeInformation

     類表示,TypeInformation 支援以下幾種類型:
    • BasicTypeInfo

      : 任意Java 基本類型或 String 類型。
    • BasicArrayTypeInfo

      : 任意Java基本類型數組或 String 數組。
    • WritableTypeInfo

      : 任意 Hadoop Writable 接口的實作類。
    • TupleTypeInfo

      : 任意的 Flink Tuple 類型(支援Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的Java Tuple實作。
    • CaseClassTypeInfo

      : 任意的 Scala CaseClass(包括 Scala tuples)。
    • PojoTypeInfo

      : 任意的 POJO (Java or Scala),例如,Java對象的所有成員變量,要麼是 public 修飾符定義,要麼有 getter/setter 方法。
    • GenericTypeInfo

      : 任意無法比對之前幾種類型的類。
  • 針對前六種類型資料集,Flink皆可以自動生成對應的TypeSerializer,能非常高效地對資料集進行序列化和反序列化。對于最後一種資料類型,Flink會使用Kryo進行序列化和反序列化。每個TypeInformation中,都包含了serializer,類型會自動通過serializer進行序列化,然後用Java Unsafe接口寫入MemorySegments。如下圖展示 一個内嵌型的Tuple3<integer,double,person> 對象的序列化過程:
Flink基礎(二十三):FLINK基本題(二)

操縱二進制資料:

  • Flink 提供了如 group、sort、join 等操作,這些操作都需要通路海量資料。以sort為例。
  • 首先,Flink 會從 MemoryManager 中申請一批 MemorySegment,用來存放排序的資料。
Flink基礎(二十三):FLINK基本題(二)
  • 這些記憶體會分為兩部分,一個區域是用來存放所有對象完整的二進制資料。另一個區域用來存放指向完整二進制資料的指針以及定長的序列化後的key(key+pointer)。将實際的資料和point+key分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的資料也不用移動其他key和pointer。第二,這樣做是緩存友好的,因為key都是連續存儲在記憶體中的,可以增加cache命中。 排序會先比較 key 大小,這樣就可以直接用二進制的 key 比較而不需要反序列化出整個對象。通路排序後的資料,可以沿着排好序的key+pointer順序通路,通過 pointer 找到對應的真實資料。

9 Flink是如何處理反壓的?

  Flink 内部是基于 producer-consumer 模型來進行消息傳遞的,Flink的反壓設計也是基于這個模型。Flink 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。下遊消費者消費變慢,上遊就會受到阻塞。

10 Flink的反壓和Strom有哪些不同?

  Storm 是通過監控 Bolt 中的接收隊列負載情況,如果超過高水位值就會将反壓資訊寫到 Zookeeper ,Zookeeper 上的 watch 會通知該拓撲的所有 Worker 都進入反壓狀态,最後 Spout 停止發送 tuple。Flink中的反壓使用了高效有界的分布式阻塞隊列,下遊消費變慢會導緻發送端阻塞。二者最大的差別是Flink是逐級反壓,而Storm是直接從源頭降速。

11 Operator Chains(算子鍊)這個概念你了解嗎?

  為了更高效地分布式執行,Flink會盡可能地将operator的subtask連結(chain)在一起形成task。每個task在一個線程中執行。

  将operators連結成task是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少資料在緩沖區的交換,減少了延遲的同時提高整體的吞吐量。這就是我們所說的算子鍊。

12 Flink什麼情況下才會把Operator chain在一起形成算子鍊?

兩個operator chain在一起的的條件:

1 上下遊的并行度一緻

2 下遊節點的入度為1 (也就是說下遊節點沒有來自其他節點的輸入)

3 上下遊節點都在同一個 slot group 中(下面會解釋 slot group)

4 下遊節點的 chain 政策為 ALWAYS(可以與上下遊連結,map、flatmap、filter等預設是ALWAYS)

5 上遊節點的 chain 政策為 ALWAYS 或 HEAD(隻能與下遊連結,不能與上遊連結,Source預設是HEAD)

6 兩個節點間資料分區方式是 forward(參考了解資料流的分區)

7 使用者沒有禁用 chain

13 Flink中的Window出現了資料傾斜,你有什麼解決辦法?

  window産生資料傾斜指的是資料在不同的視窗内堆積的資料量相差過多。本質上産生這種情況的原因是資料源頭發送的資料量速度不同導緻的。

  出現這種情況一般通過兩種方式來解決:

    1. 在資料進入視窗前做預聚合

    2. 重新設計視窗聚合的key

14 Flink中在使用聚合函數 GroupBy、Distinct、KeyBy 等函數時出現資料熱點該如何解決?

資料傾斜和資料熱點是所有大資料架構繞不過去的問題。處理這類問題主要從3個方面入手:

  在業務上規避這類問題

    例如一個假設訂單場景,北京和上海兩個城市訂單量增長幾十倍,其餘城市的資料量不變。這時候我們在進行聚合的時候,北京和上海就會出現資料堆積,我們可以單獨資料北京和上海的資料。

  Key的設計上

    把熱key進行拆分,比如上個例子中的北京和上海,可以把北京和上海按照地區進行拆分聚合。

  參數設定

    Flink 1.9.0 SQL(Blink Planner) 性能優化中一項重要的改進就是更新了微批模型,即 MiniBatch。原理是緩存一定的資料後再觸發處理,以減少對State的通路,進而提升吞吐和減少資料的輸出量。

15 Flink任務延遲高,想解決這個問題,你會如何入手?

  在Flink的背景任務管理中,我們可以看到Flink的哪個算子和task出現了反壓。最主要的手段是資源調優和算子調優。

  資源調優即是對作業中的Operator的并發數(parallelism)、CPU(core)、堆記憶體(heap_memory)等參數進行調優。

16 消費kafka資料的時候,如何處理髒資料?