
Event Time 最好的學習方式,是輸出
本打算使用CSDN記錄學習筆記的,但CSDN的編輯與釋出着實麻煩,一點都不簡約,實在不能好好玩耍。今天刷知乎,受知乎邀請,獲得【創作者中心】公測權,想起來前面那句名言,是以打開知乎寫文章的頁面,極簡風格,很适合創作。首次在知乎寫文章,就寫技術類文章,似乎會被人說成碼農,無趣的程式員。其實不然,個人愛好廣泛,會研究大資料、機器學習、資料分析、經濟學、金融學、管理學,偶爾會思考下哲學、心理學、文學、曆史、天文實體。思維不能受局限,創新創造。
貌似扯遠了,接下來就正題吧。
Flink是一種由Java和Scala編寫的分布式資料流處理架構。與其他分布式架構不同的是,其将流處理和批處理統一起來,流處理的資料是無界的,批處理的資料是有界的,一種特殊的流處理。
1. 架構
https://blog.csdn.net/yangyin007/article/details/82382779
Flink程式是由Stream和Transformation這兩個基本塊建構組成的,其中Stream是中間結果資料,Transformation是一個操作,它對一個或者多個Stream輸入進行處理,輸出一個或多個Stream。
1.1 運作層運作層以JobGraph形式接收程式。JobGraph為一個一般化的并行資料流圖,它擁有任意數量的Task來接收和産生data stream。一個Stream可以被分成多個Stream分區,一個Operator可以被分成多個Operator Subtask,每一個Operator Subtask是在不同的線程中獨立執行的,可能在不同機器或容器上執行。
https://flink.sojb.cn/concepts/programming-model.html
Stream可以One-to-One和Redistribution模式在兩個Operator之間傳輸資料
- One-to-One,(如Source和map算子之間)儲存資料元的分區和排序。這意味着map()中看到的資料流中記錄的順序,與Source中看到的記錄順序是一緻的。
- Redistribution,Streams在map()和keyBy/window之間,以及keyBy/window和Sink之間改變Stream的分區。每個Operator subtask根據所選的轉換向不同的目标subtasks發送資料。例如keyBy()
Flink分布式執行環境中,會将多個Operator Subtask串起來組成一個Operator Chain,每個Operator Chain會在TaskManager上獨立的線程中執行。
1.2 視窗聚合操作(例如計數、和)在Stream上的工作方式上與批處理不同。例如,不可能計算Stream中的所有元素,因為Stream通常是無邊界的。是以,Stream上的聚合由視窗限定範圍,例如“過去5分鐘内的計數或最後100個元素的總和”。
Flink中的時間有幾種:
- Event Time 建立事件的時間,通常由事件中的時間戳描述,例如由生産傳感器或生産服務附加。Flink通過時間戳配置設定器通路Event Time
- Ingestion time 事件在Source Operator處進入Flink dataflow的時間
- Processing Time 每個執行基于時間的操作的Operator的本地時間。
雖然dataflow中的許多操作一次隻檢視一個單獨事件(例如一個事件解析器),但是有些操作可以跨多個事件記錄資訊(例如視窗Operators)。這些操作稱為有狀态的。
有狀态操作的狀态維護可以認為是一個embedded key/value 存儲。嚴格分區和分布的狀态與Streams一起被Stateful Operators讀取。是以,隻有在keyBy()函數之後的鍵控制流上才能通路key/value狀态,并且隻能通路與目前事件的鍵關聯的值。将流的鍵與狀态對齊可以確定所有狀态更新都是本地操作,進而確定一緻性Tasks and Operator ChainTasks and Operator Chain,而不需要事務開銷。這種對齊還允許Flink重新配置設定狀态并透明地調整Stream分區。
占位,先不寫,這個有點耗時。。。
1.6 任務和 Operator 連結對于分布式執行,Flink将operator subtasks連結到任務中。每個任務由一個線程執行。将operators連結到任務中是一種有用的優化:它減少了線程到線程切換和緩沖的開銷,增加了總體的吞吐量,同時降低了延遲。
2. 叢集排程
2.1 Flink運作時由兩種類型的程序組成:JobManagers(也稱為masters)協調分布式執行。負責安排任務、協調檢查點、協調故障恢複等。
至少有一個Job Manager。一個高可用性的設定将有多個Job Managers,其中一個是上司,其他的總是備用的。
TaskManagers(也稱為workers)執行資料流的任務(或者更具體說,subtasks),并緩沖和交換資料流。至少有一個TaskManager
JobManagers和TaskManagers可以通過多種方式啟動:直接在機器上作為獨立叢集啟動,或者在容器中啟動,或者由像YARN或Mesos之類的資源架構管理。TaskManagers連接配接到JobManagers,宣布自己可用,并配置設定工作。
client不是運作時和runtime和程式執行的一部分,而是用于準備和向JobManager發送資料流。然後,client可以斷開連接配接,或者保持連接配接以接收進度報告。client可以作為觸發執行Java/Scala程式的一部分運作,也可以在指令行進行中運作。
在JobManager端,會接收到Client送出的JobGraph形式的Flink Job,JobManager會将一個JobGraph轉換映射為一個ExecutionGraph,ExecutionGraph是JobGraph的并行表示,也就是實際JobManager排程一個Job在TaskManager上運作的邏輯視圖。
資源的配置設定與使用例子:
- 左上圖:有2個TaskManager,每個TaskManager有3個Task Slot
- 左下圖:一個Flink JobGraph,邏輯上包含了1個data source、1個MapFunction、1個ReduceFunction。
- 左下圖:使用者送出的Flink Job對各個Operator進行的配置——data douorce的并行度設定為4,MapFunction的并行度也為4,ReduceFunction的并行度為3,在JobManager端對應于ExecutionGraph
- 右上圖:TaskManager1上,有2個并行的ExecutionVertex組成的DAG圖,它們各占用一個Task Slot
- 右下圖:TaskManager2上,有2個并行的ExecutionVertex組成的DAG圖,它們各占用一個Task Slot
- 在2個TaskManager上運作的4個Execution是并行執行的
普通疊代、增益疊代。占位,先不寫。。。。
2.4 監控流處理系統中,當下遊Operator處理速度跟不上的情況,如果下遊Operator能夠将自己處理狀态傳播給上遊Operator,使得上遊Operator處理速度慢下來就會緩解上述問題,比如通過告警的方式通知現有流處理系統存在的問題。
Flink web界面上提供了對運作Job的Backpressure行為的監控,它通過使用Sampling線程對正在運作的Task進行堆棧跟蹤采樣來實作。
預設情況下,JobManager會每間隔50ms觸發對一個Job的每個Task依次進行100次堆棧跟蹤調用,過計算得到一個比值,例如,radio=0.01,表示100次中僅有1次方法調用阻塞。Flink目前定義了如下Backpressure狀态:
- OK: 0 <= Ratio <= 0.10
- LOW: 0.10 < Ratio <= 0.5
- HIGH: 0.5 < Ratio <= 1
3. 上層庫
- Table :Flink的Table API實作了使用類SQL進行流和批處理。https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html
- CEP:支援在流中發現複雜的事件模式,快速篩選使用者感興趣的資料。https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html#next-steps
- Gelly:圖計算API,提供了簡化開發和建構圖計算分析應用的接口。https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/index.html
- FlinkML:FlinkML是Flink提供的機器學習庫,提供了可擴充的機器學習算法、簡潔的API和工具簡化機器學習系統的開發。https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/index.html
4. 安裝配置
下載下傳,https://flink.apache.org/downloads.html#apache-flink-164
移動到hadoop使用者,
mv flink-1.6.4-bin-hadoop28-scala_2.11.tgz /home/hadoop
解壓:
tar -zxvf flink-1.6.4-bin-hadoop28-scala_2.11.tgz
Flink有多種部署模式,Local本地部署、Standalone Cluster叢集部署、Flink ON YARN。這裡使用Local本地部署,用于開發調試。其實很簡單,直接在bin目錄下執行
bin$ ./start-cluster.sh
jps指令後出現StandaloneSessionClusterEntrypoint程序,打開位址http://localhost:8081,即可進入Flink web管理頁面
至于Standalone Cluster叢集模式,則是需要修改配置檔案。Flink ON YARN是在生産環境使用的,比較重型,做開發與調試不是很适合,需要另外啟動Hadoop叢集YARN任務,浪費資源。
5. IDEA開發配置
這裡使用maven配置IDEA,為了maven更新下載下傳快些,先配置下國内的源,比如阿裡雲。
添加~/.m2/settings.xml 檔案,加入如下聲明:
<?xml version="1.0" encoding="UTF-8"?>
配置阿裡雲後更新就飛起來了。然後是pom.xml添加項目jar包依賴和一些聲明。
。。。balabala,配置是個阻擋入門的幫手。。。
然後開始寫代碼了。
6. 建立第一個WordCount程式
public
整個程式很簡單,統計10秒内stream時間視窗上的單詞數。
StreamExecutionEnvironment是一個任務的入口類,可以用于設定參數和建立資料源以及送出任務。
StreamExecutionEnvironment
下一步,建立一個從本地端口号9003的socket中讀取資料的資料源,"n"換行符是指資料每行每行的讀取,Flink的時間視窗是讀取指定時間視窗内的資料,并進行處理。
DataStream
這裡建立了一個字元串類型的DataStream。在這段代碼中,我們計算每個單詞在特定時間視窗中出現的次數,比如10秒視窗。是以,首先應該分割字元串解析成Tuple2<單詞,次數>的形式表示。實作了一個flatmap:
DataStream
接着将生成的Stream按照單詞字段做分組,keyBy(0)即是以Tuple的第一個元素為key,然後取10秒視窗的資料流進行聚合。.timeWindow()指定需要的滾動視窗時間。.sum聚合函數以視窗的每個key計算聚合,參數1指按照次數字段相加,得到的結果資料流,将每5秒輸出一次這5秒内每個單詞的出現次數。
DataStream
最後将資料流列印到控制台,并執行execute調用送出作業到叢集上或本地計算機運作。
windowCounts
這時,使用netcat指令在終端,擷取輸入流:
9003
在終端輸入單詞,按回車,資料才進入SocketWindowWordCount程式運作計算,因為定義flink讀取資料以換行符為每次讀取資料的間隔。