任務排程原理
用戶端不是運作時和程式執行的一部分
但它用于準備并發送dataflow(JobGraph)給Master(JobManager)
然後用戶端斷開連接配接或維持連接配接以等待接受計算結果
當Flink叢集啟動後 首先會啟動一個JobManager和一個或多個TaskManager去執行
然後TaskManager将心跳和統計資訊彙報給JobManager
TaskManager之間以流的形式進行資料傳輸
以上三者均為獨立的JVM程序
- Client
送出Job的用戶端
可以是運作在任何機器上 (與JobManager環境連通即可)
送出Job後 Clinet可以結束程序(Streaming的任務)
也可以不結束并等待結果傳回
- JobManager
主要負責排程Job并協調Task做checkpoint
職責很像Storm的Nimbus
從Clinet處接受Job和Jar包等資源後 會生成優化後的執行計劃
并以Task的單元排程到各個TaskManager去執行
- TaskManager
TaskManager啟動的時候就設定好了槽位數(Slot)
每個Slot能啟動一個Task 。Task為線程
從JobManager處接受需要部署的Task
部署啟動後 與自己上遊建立Netty連接配接 接受資料并處理
TaskManager與Slots
- Flink中的每一個worker(TaskManager)都是一個JVM程序
它可能會在獨立的線程上執行一個或多個subtask
- 控制一個worker能接受多少個task ,worker通過task slot來進行控制
一個worker至少有一個task slot
- 每個task slot表示TaskManager擁有資源的一個固定大小的子集
假如一個TaksManager有三個slot 那麼它會将其管理的記憶體分成三份分給三個slot
- 資源slot化意味着一個subtask将不需要跟來自其他job的subtask競争被管理的記憶體 取而代之的是它将擁有一定數量的記憶體儲備
需要注意的是 這裡不會涉及到CPU的隔離 slot目前僅僅用來隔離task受管理的記憶體
- 通過調整task slot的數量 允許使用者定義subtask之間如何互相隔離
如果一個taskManager一個slot 那将意味着每個task group運作在獨立的JVM中 (該JVM可能通過一個特定的容器啟動的)
而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM
而同一個JVM程序中的task将共享TCP連接配接(基于多路複用)和心跳消息
它們也可能共享資料集和資料結構 是以這減少了每個task的負載
子任務共享slot
- 預設情況下 Flink允許子任務共享slot
即使它們是不同任務的子任務(前提是它們來自同一個job)
這樣的結果是 一個slot可以儲存業務的整個管道
- Task slot是靜态概念 是指TaskManager具有并發執行能力
可以通過taskmanager.numberOfTaskSlots進行配置
- 并行度 parallelism是動态概念 即每個TaskManager運作程式時實際使用的并發能力
可以通過參數 parallelism.default進行配置
假設一共有3個TaskManager 每個TaskManager中配置設定3個TaskSlot
也就是說每個TaskManager可以接收3個task
一共9個TaskSlot 如果設定parralelism.default=1即運作程式預設的并行度為1
9個TaskSlot隻用了一個 有8個空閑
是以需要設定合适的并行度才能提高效率
程式和資料流
- 所有的Flink由三部分組成 Source、Transformation、Sink
Source負責讀取資料源
Transformation利用各種算子進行處理加工
Sink負責輸出
在運作時 Flink上運作的程式會被映射成 "邏輯資料流" dataflows 它包含了這三部分
每一個dataflow以一個或多個sources開始
以一個或多個sinks結束
- dataflow類似于任意的有向無環圖(DAG)
在大部分情況下 程式的轉換算法(transformations)跟dataflow中的算子(operator)是一一對應關系
但有時候一個transformations可能對應多個operator
執行圖(ExecutionGraph)
由Flink程式直接映射成資料流圖 StreamGraph 也被成為邏輯流圖 因為它們表示的是計算邏輯的進階視圖
為了執行一個流處理程式 Flink需要将邏輯流圖轉換為實體資料流圖(也叫執行圖)詳細說明程式的執行方式
- Flink執行圖可以分為四層
StreamGraph -> JobGraph -> ExecutionGraph -> 實體執行圖
- StreamGraph
是根據使用者通過Stream API編寫的代碼生成的最初的圖 用來表示程式的拓撲結構
- JobGraph
StreamGraph經過優化後生成了JobGraph 送出給JobManager的資料結構
主要優化為:
将多個符合條件的節點 chain在一起作為一個節點 這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗
- ExecutionGraph
JobManager根據JobGraph生成了ExecutionGraph
ExecutionGraph是JobGraph的并行化版本 是排程層最核心的資料結構
- 實體執行圖
JobManager根據ExecutionGraph對Job進行排程後
在各個TaskManager上部署Task後形成的“圖”
并不是一個具體的資料結構
并行度(Parallelism)
Flink程式的執行具有并行、分布式特性
一個流包含了一個或多個分區(stream partition)
而每一個算子(operator)可以包含一個或多個子任務(operator subtask)
這些子任務在不同的線程、不同實體機或不同的容器中不依賴的執行
- 并行資料流
一個特定算子的子任務(subtask)的個數被稱為并行度
一般情況下 一個流程式的并行度 可以認為其所有算子中最大的并行度
一個程式中 不同的算子可能具有不同的并行度
Stream在算子之間傳輸資料的形式可以是 one-to-one(forwarding)的模式也可以是redistributing的模式 具體是哪一種形式 取決于算子的種類
one-to-one:
stream(比如在source和map operator之間) 維護着分區以及元素的順序 那意味着map算子的子任務看到的元素的個數以及順序跟source算子的子任務生産的元素的個數、順序相同 map、filter、flatMap等算子都是one-to-one的對應關系
Redistributing:
stream(map()跟keyBy/window之間或者keyBy/window跟slink之間)的分區會發生改變
每一個算子的子任務依據所選擇的transformation發送資料到不同的目标任務
例如:
keyBy()基于hashCode重分區
broadcast和rebalance會随機重新分區
這些算子都會引起redistribute過程
該過程就類似于spark中的shuffle
類似于spark的窄依賴、寬依賴
任務鍊
a 、
Flink采用一種成為任務鍊的優化技術
可以在指定條件下減少本地通信開銷
為了滿足任務鍊的要求
必須将兩個或多個算子設為相同的并行度
并通過本地轉發的方式連接配接(local forward)
b、
相同并行度one to one操作 Flink這樣相連的算子連結在一起形成一個task 原來的算子成為裡面的subtask
c、
并行度相同、并且是One-to-One操作 兩個條件缺一不可
d、
将算子連結成task是非常有效的優化
它能減少線程之間的切換和基于緩存區的資料交換
在減少時延的同時提升吞吐量
連結的行為可以在程式設計API中進行指定
上面是純理論 下面實踐下 才能對理論了解的更加透徹
Flink 流處理API
Environment
- getExecutionEnvironment
建立一個執行環境 表示目前執行程式的上下文
如果程式是獨立調用的 則此方法傳回本地執行環境
如果從指令行調用程式以送出到叢集 則此方法傳回叢集的執行環境
- 代碼
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
或
val env = StreamExecutionEnvironment.getExecutionEnvironment
a 傳回本地執行環境 需要在調用時指定預設的并行度
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
b 傳回叢集環境 将jar包送出到遠端伺服器 需要在調用時指定 JobManager的IP和端口号 并指定要在叢集中運作的jar包
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",
6123,"YOURPATH//wordcount.jar")
- 并行度
如果沒有設定并行度 會以flink-conf.yaml中配置為準 預設是1
Source
- 從集合讀取資料
// 定義樣例類,傳感器 id ,時間戳,溫度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env
.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.80018327300259),
SensorReading("sensor_6", 1547718201, 15.402984393403084),
SensorReading("sensor_7", 1547718202, 6.720945201171228),
SensorReading("sensor_10", 1547718205, 38.101067604893444)
))
stream1.print("stream1:").setParallelism(1)
env.execute()
}
}
- 從檔案讀取資料
val stream2 = env.readTextFile("YOUR_FILE_PATH")
- 以kafka消息隊列的資料作為來源
需要引入kafka連接配接器的依賴 pom:
<dependency>
<groupId>org.apache.flink</ groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactI d>
<version>1.7.2</version>
</dependency>
代碼:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new
SimpleStringSchema(), properties))
- 先測試下kafka發送消息 從zk中消費消息是否可以
cd /opt/kafka/kafka_2.10-0.8.2.1/bin
生産消息
./kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic test
消費消息
./kafka-console-consumer.sh --zookeeper 192.168.84.128:2181 --topic test --from-beginning
demo程式是用的kafka版本是 kafka-0.11_2.11
目前虛拟機上安裝的版本是2.10-0.8.2.1 是以為了跑demo程式 是以安裝下kafka-0.11_2.11版本
安裝過程大資料處理工具Kafka、Zk、Spark
- 安裝包下載下傳路徑
https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.12-0.11.0.1.tgz
- 唯一的差別
/opt/kafka/版本号/config/server.properties 這個配置檔案
配置zk叢集的配置項名稱
kafka_2.10-0.8.2.1: zookeeper.contact
kafka_2.12-0.11.0.1: zookeeper.connect
- 生成一條消息
./kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic test
- 消費這條消息
./kafka-console-consumer.sh --zookeeper 192.168.84.128:2181 --topic test --from-beginning
- 通過Flink Kafka來消費
代碼
https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/com/xdl/apitest/SourceTest.scala