天天看點

Flink原理簡介和使用

任務排程原理

Flink原理簡介和使用
用戶端不是運作時和程式執行的一部分 

但它用于準備并發送dataflow(JobGraph)給Master(JobManager)

然後用戶端斷開連接配接或維持連接配接以等待接受計算結果

當Flink叢集啟動後 首先會啟動一個JobManager和一個或多個TaskManager去執行

然後TaskManager将心跳和統計資訊彙報給JobManager

TaskManager之間以流的形式進行資料傳輸 

以上三者均為獨立的JVM程序      
  • Client
送出Job的用戶端

可以是運作在任何機器上 (與JobManager環境連通即可)

送出Job後 Clinet可以結束程序(Streaming的任務)

也可以不結束并等待結果傳回      
  • JobManager
主要負責排程Job并協調Task做checkpoint 

職責很像Storm的Nimbus

從Clinet處接受Job和Jar包等資源後 會生成優化後的執行計劃

并以Task的單元排程到各個TaskManager去執行
      
  • TaskManager
TaskManager啟動的時候就設定好了槽位數(Slot) 

每個Slot能啟動一個Task 。Task為線程

從JobManager處接受需要部署的Task 

部署啟動後 與自己上遊建立Netty連接配接 接受資料并處理      

TaskManager與Slots

Flink原理簡介和使用
  • Flink中的每一個worker(TaskManager)都是一個JVM程序
它可能會在獨立的線程上執行一個或多個subtask      
  • 控制一個worker能接受多少個task ,worker通過task slot來進行控制
一個worker至少有一個task slot      
  • 每個task slot表示TaskManager擁有資源的一個固定大小的子集
假如一個TaksManager有三個slot 那麼它會将其管理的記憶體分成三份分給三個slot      
  • 資源slot化意味着一個subtask将不需要跟來自其他job的subtask競争被管理的記憶體 取而代之的是它将擁有一定數量的記憶體儲備
需要注意的是 這裡不會涉及到CPU的隔離 slot目前僅僅用來隔離task受管理的記憶體      
  • 通過調整task slot的數量 允許使用者定義subtask之間如何互相隔離
如果一個taskManager一個slot 那将意味着每個task group運作在獨立的JVM中 (該JVM可能通過一個特定的容器啟動的)

而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM

而同一個JVM程序中的task将共享TCP連接配接(基于多路複用)和心跳消息

它們也可能共享資料集和資料結構 是以這減少了每個task的負載      

子任務共享slot

Flink原理簡介和使用
  • 預設情況下 Flink允許子任務共享slot
即使它們是不同任務的子任務(前提是它們來自同一個job)

這樣的結果是 一個slot可以儲存業務的整個管道       
  • Task slot是靜态概念 是指TaskManager具有并發執行能力
可以通過taskmanager.numberOfTaskSlots進行配置      
  • 并行度 parallelism是動态概念 即每個TaskManager運作程式時實際使用的并發能力
可以通過參數 parallelism.default進行配置      
假設一共有3個TaskManager 每個TaskManager中配置設定3個TaskSlot

也就是說每個TaskManager可以接收3個task

一共9個TaskSlot 如果設定parralelism.default=1即運作程式預設的并行度為1

9個TaskSlot隻用了一個 有8個空閑

是以需要設定合适的并行度才能提高效率      
Flink原理簡介和使用

程式和資料流

Flink原理簡介和使用
  • 所有的Flink由三部分組成 Source、Transformation、Sink
Source負責讀取資料源
Transformation利用各種算子進行處理加工
Sink負責輸出

在運作時 Flink上運作的程式會被映射成 "邏輯資料流" dataflows 它包含了這三部分

每一個dataflow以一個或多個sources開始
以一個或多個sinks結束      
  • dataflow類似于任意的有向無環圖(DAG)
在大部分情況下 程式的轉換算法(transformations)跟dataflow中的算子(operator)是一一對應關系
但有時候一個transformations可能對應多個operator      
Flink原理簡介和使用

執行圖(ExecutionGraph)

由Flink程式直接映射成資料流圖 StreamGraph 也被成為邏輯流圖 因為它們表示的是計算邏輯的進階視圖 

為了執行一個流處理程式 Flink需要将邏輯流圖轉換為實體資料流圖(也叫執行圖)詳細說明程式的執行方式      
  • Flink執行圖可以分為四層
StreamGraph -> JobGraph -> ExecutionGraph -> 實體執行圖      
  • StreamGraph
是根據使用者通過Stream API編寫的代碼生成的最初的圖 用來表示程式的拓撲結構      
  • JobGraph
StreamGraph經過優化後生成了JobGraph 送出給JobManager的資料結構

主要優化為:

将多個符合條件的節點 chain在一起作為一個節點  這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗      
  • ExecutionGraph
JobManager根據JobGraph生成了ExecutionGraph

ExecutionGraph是JobGraph的并行化版本 是排程層最核心的資料結構

      
  • 實體執行圖
JobManager根據ExecutionGraph對Job進行排程後

在各個TaskManager上部署Task後形成的“圖”

并不是一個具體的資料結構      
Flink原理簡介和使用

并行度(Parallelism)

Flink程式的執行具有并行、分布式特性 

一個流包含了一個或多個分區(stream partition)

而每一個算子(operator)可以包含一個或多個子任務(operator subtask)

這些子任務在不同的線程、不同實體機或不同的容器中不依賴的執行      
  • 并行資料流
Flink原理簡介和使用
一個特定算子的子任務(subtask)的個數被稱為并行度

一般情況下 一個流程式的并行度 可以認為其所有算子中最大的并行度      
Flink原理簡介和使用
一個程式中 不同的算子可能具有不同的并行度

Stream在算子之間傳輸資料的形式可以是 one-to-one(forwarding)的模式也可以是redistributing的模式 具體是哪一種形式 取決于算子的種類

one-to-one:

stream(比如在source和map operator之間) 維護着分區以及元素的順序 那意味着map算子的子任務看到的元素的個數以及順序跟source算子的子任務生産的元素的個數、順序相同 map、filter、flatMap等算子都是one-to-one的對應關系


Redistributing:

stream(map()跟keyBy/window之間或者keyBy/window跟slink之間)的分區會發生改變

每一個算子的子任務依據所選擇的transformation發送資料到不同的目标任務

例如:

keyBy()基于hashCode重分區 
broadcast和rebalance會随機重新分區
這些算子都會引起redistribute過程
該過程就類似于spark中的shuffle
類似于spark的窄依賴、寬依賴      

任務鍊

Flink原理簡介和使用
a 、

Flink采用一種成為任務鍊的優化技術

可以在指定條件下減少本地通信開銷

為了滿足任務鍊的要求 

必須将兩個或多個算子設為相同的并行度

并通過本地轉發的方式連接配接(local forward)


b、

相同并行度one to one操作 Flink這樣相連的算子連結在一起形成一個task 原來的算子成為裡面的subtask

c、

并行度相同、并且是One-to-One操作 兩個條件缺一不可


d、

将算子連結成task是非常有效的優化 

它能減少線程之間的切換和基于緩存區的資料交換

在減少時延的同時提升吞吐量 

連結的行為可以在程式設計API中進行指定       

上面是純理論 下面實踐下 才能對理論了解的更加透徹

Flink 流處理API

Environment

Flink原理簡介和使用
  • getExecutionEnvironment
建立一個執行環境 表示目前執行程式的上下文

如果程式是獨立調用的 則此方法傳回本地執行環境

如果從指令行調用程式以送出到叢集 則此方法傳回叢集的執行環境      
  • 代碼
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
或
val env = StreamExecutionEnvironment.getExecutionEnvironment


a 傳回本地執行環境 需要在調用時指定預設的并行度

val env = StreamExecutionEnvironment.createLocalEnvironment(1)


b 傳回叢集環境 将jar包送出到遠端伺服器 需要在調用時指定 JobManager的IP和端口号 并指定要在叢集中運作的jar包

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",
6123,"YOURPATH//wordcount.jar")
      
  • 并行度
如果沒有設定并行度 會以flink-conf.yaml中配置為準 預設是1      
Flink原理簡介和使用

Source

  • 從集合讀取資料
//  定義樣例類,傳感器 id ,時間戳,溫度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream1 = env
          .fromCollection(List(
            SensorReading("sensor_1", 1547718199, 35.80018327300259),
            SensorReading("sensor_6", 1547718201, 15.402984393403084),
            SensorReading("sensor_7", 1547718202, 6.720945201171228),
            SensorReading("sensor_10", 1547718205, 38.101067604893444)
          ))
        stream1.print("stream1:").setParallelism(1)
        env.execute()
     }
}      
  • 從檔案讀取資料
val stream2 = env.readTextFile("YOUR_FILE_PATH")      
  • 以kafka消息隊列的資料作為來源
需要引入kafka連接配接器的依賴 pom:

<dependency>
 <groupId>org.apache.flink</ groupId>
 <artifactId>flink-connector-kafka-0.11_2.11</artifactI   d>
 <version>1.7.2</version>
</dependency>
   
   
代碼:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new
SimpleStringSchema(), properties))      
  • 先測試下kafka發送消息 從zk中消費消息是否可以
cd /opt/kafka/kafka_2.10-0.8.2.1/bin


生産消息

./kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic test

消費消息

./kafka-console-consumer.sh --zookeeper 192.168.84.128:2181 --topic test --from-beginning      
Flink原理簡介和使用
Flink原理簡介和使用

demo程式是用的kafka版本是 kafka-0.11_2.11

目前虛拟機上安裝的版本是2.10-0.8.2.1 是以為了跑demo程式 是以安裝下kafka-0.11_2.11版本

安裝過程​​大資料處理工具Kafka、Zk、Spark​​

  • 安裝包下載下傳路徑
https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz      
  • 唯一的差別
/opt/kafka/版本号/config/server.properties 這個配置檔案

配置zk叢集的配置項名稱

kafka_2.10-0.8.2.1: zookeeper.contact

kafka_2.12-0.11.0.1: zookeeper.connect      
  • 生成一條消息
./kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic test      
Flink原理簡介和使用
  • 消費這條消息
./kafka-console-consumer.sh --zookeeper 192.168.84.128:2181 --topic test --from-beginning      
Flink原理簡介和使用
  • 通過Flink Kafka來消費

​代碼​

https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/com/xdl/apitest/SourceTest.scala      

繼續閱讀