天天看點

flink-DataStream APIstream執行環境DataStream轉換Flink程式本地運作參考連結

flink-DataStream API

  • stream執行環境
  • DataStream轉換
    • RichFunction
    • Operators
  • Flink程式
  • 本地運作
  • 參考連結

stream執行環境

每個 Flink 應用都需要有執行環境,在該示例中為 env。流式應用需要用到 StreamExecutionEnvironment。

DataStream API 将你的應用建構為一個 job graph,并附加到 StreamExecutionEnvironment 。當調用 env.execute() 時此 graph 就被打包并發送到 JobManager 上,後者對作業并行處理并将其子任務分發給 Task Manager 來執行。每個作業的并行子任務将在 task slot 中執行。

如果沒有調用 execute(),應用就不會運作。

此分布式運作時取決于你的應用是否是可序列化的。它還要求所有依賴對叢集中的每個節點均可用。

flink-DataStream APIstream執行環境DataStream轉換Flink程式本地運作參考連結

DataStream轉換

DataStream 是 Flink 流處理 API 中最核心的資料結構。它代表了一個運行在多個分區上的并行流。一 個 DataStream 可以從 StreamExecutionEnvironment 通過env.addSource(SourceFunction) 獲得。 DataStream 上的轉換操作都是逐條的,比如 map(),flatMap(),filter() 。

下圖展示了Flink 中目前支援的主要幾種流的類型,以及它們之間的轉換關系。

flink-DataStream APIstream執行環境DataStream轉換Flink程式本地運作參考連結

RichFunction

RichFunction中有非常有用的四個方法:open,close,getRuntimeContext 和 setRuntimecontext 這些功能在參數化函數、建立和确定本地狀态、擷取廣播變量、擷取運行時資訊(例如累加器和計數器)和疊代資訊時非常有幫助。

import java.util.Properties

import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFlatMapFunction, RuntimeContext}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaRichFlatMapFunction(topic: String,properites: Properties) extends RichFlatMapFunction[String, Collector[Int]]{

  var producer: KafkaProducer[String, String] = null

  override def open(parameters: Configuration): Unit = {
    // 建立kafka生産者
    producer = new KafkaProducer[String, String](properites)
  }

  override def close(): Unit = {
    // 關閉kafka生産者
    producer.close()
  }

  override def getRuntimeContext: RuntimeContext = super.getRuntimeContext

  override def setRuntimeContext(t: RuntimeContext): Unit = super.setRuntimeContext(t)

  override def getIterationRuntimeContext: IterationRuntimeContext = super.getIterationRuntimeContext


  override def flatMap(value: String, out: Collector[Collector[Int]]): Unit = {
    //使用RuntimeContext得到子線程ID,比如可以用于多線程寫檔案
    println(getRuntimeContext.getIndexOfThisSubtask)
    //發送資料到kafka
    producer.send(new ProducerRecord[String, String](topic, value))
  }
}
           

Operators

1.map / flatmap

  • 含義:資料映射(1進1出和1進n出)
  • 轉換關系:DataStream → DataStream

2.filter

  • 含義:資料篩選(滿足條件event的被篩選出來進行後續處理),根據FliterFunction傳回的布爾值來判斷是否 保留元素,true為保留,false則丢棄
  • 轉換關系:DataStream → DataStream

3.keyBy

  • 含義: 根據指定的key進行分組(邏輯上把DataStream分成若幹不相交的分區,key一樣的event會 被劃分到相同的partition,内部采用hash分區來實作)
  • 轉換關系: DataStream → KeyedStream
  • 限制: 可能會出現資料傾斜,可根據實際情況結合物理分區來解決
KeyedStream
  • KeyedStream用來表示根據指定的key進行分組的資料流。
  • 一個KeyedStream可以通過調用DataStream.keyBy()來獲得。
  • 在KeyedStream上進行任何transformation都将轉變回DataStream。
  • 在實作中,KeyedStream會把key的資訊傳入到算子的函數中。
  • 每個event隻能通路所屬key的狀态,其上的聚合函數可以方便地操作和儲存對應key的狀态

4.reduce / fold

  • 分組之後當然要對分組之後的資料也就是KeyedStream進行各種聚合操作啦
  • KeyedStream → DataStream
  • 對于KeyedStream的聚合操作都是滾動的(rolling,在前面的狀态基礎上繼續聚合),千萬不要理解為批處理 時的聚合操作(DataSet,其實也是滾動聚合,隻不過他隻把最後的結果給了我們)

5.connect / union

  • connect之後生成ConnectedStreams,會對兩個流的資料應用不同的處理方法,并且雙流之間可以共享狀态 (比如計數)。
  • union 合并多個流,新的流包含所有流的資料。
  • union是DataStream → DataStream
  • connect隻能連接配接兩個流,而union可以連接配接多于兩個流
  • connect連接配接的兩個流類型可以不一緻,而union連接配接的流的類型必須一緻

6.coMap / CoFlatMap

  • 跟map and flatMap類似,隻不過作用在ConnectedStreams上
  • ConnectedStreams → DataStream

7.split / select / SideOutput

  • split
  • DataStream → SplitStream
    • 按照指定标準将指定的DataStream拆分成多個流用SplitStream來表示
  • select
    • SplitStream → DataStream
    • 跟split搭配使用,從SplitStream中選擇一個或多個流

8.實體分區

  • rebalance
  • 含義:再平衡,用來減輕資料傾斜
  • 轉換關系: DataStream → DataStream
  • 使用場景:處理資料傾斜,比如某個kafka的partition的資料比較多
  • rescale
  • 原理:通過輪詢排程将元素從上遊的task一個子集發送到下遊task的一個子集
  • 轉換關系:DataStream → DataStream
  • 使用場景:資料傳輸都在一個TaskManager内,不需要通過網絡。
  • partitioner
  • 轉換關系:DataStream → DataStream
  • 使用場景:自定義資料處理負載
  • 實作方法:
    • 實作org.apache.flink.api.common.functions.Partitioner接口
    • 覆寫partition方法
    • 設計算法傳回partitionId

Flink程式

Flink程式由幾個基本子產品組成:

  • 擷取執行環境
  • 加載/建立初始資料
  • 指定資料轉換
  • 資料接收
  • 觸發程式執行

1.執行環境

StreamExecutionEnvironment是所有Flink程式的基礎。可以使用StreamExecutionEnvironment上的這些靜态方法獲得:

  • getExecutionEnvironment()
  • createLocalEnvironment()
  • createRemoteEnvironment(String host, int port, String… jarFiles)

通常,隻需要使用 getExecutionEnvironment() , 因為這将根據上下文做正确的事,如果你執行程式在IDE或普通Java程式将建立一個本地環境,将執行程式在本地機器上。如果您從您的程式建立了一個JAR檔案,并通過指令行調用它,那麼Flink叢集管理器将執行您的主方法,getExecutionEnvironment()将傳回一個在叢集上執行您的程式的執行環境。

2.加載/建立初始資料

flink的資料源的來源很豐富,檔案,hadoop,kafka等都可以作為資料的來源。flink提供的操作如下:

  • readTextFile(path)- TextInputFormat逐行讀取文本檔案,即符合規範的檔案,并将它們作為字元串傳回。
  • readFile(fileInputFormat, path) - 按指定的檔案輸入格式指定讀取(一次)檔案。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) -

    這是前兩個内部調用的方法。它path根據給定的内容讀取檔案fileInputFormat。根據提供的内容watchType,此源可以定期監視(每intervalms)新資料(FileProcessingMode.PROCESS_CONTINUOUSLY)的路徑,或者處理目前在路徑中的資料并退出(FileProcessingMode.PROCESS_ONCE)。使用該pathFilter,使用者可以進一步排除正在處理的檔案。

  • socketTextStream - 從套接字讀取。元素可以用分隔符分隔
  • fromCollection(Collection) - 從Java Java.util.Collection建立資料流。集合中的所有元素必須屬于同一類型
  • fromCollection(Iterator, Class) - 從疊代器建立資料流。該類指定疊代器傳回的元素的資料類型。
  • fromElements(T …) - 從給定的對象序列建立資料流。所有對象必須屬于同一類型
  • fromParallelCollection(SplittableIterator, Class) - 并行地從疊代器建立資料流。該類指定疊代器傳回的元素的資料類型。
  • generateSequence(from, to) - 并行生成給定間隔中的數字序列
  • addSource 自定義資料來源,例如kafka的資料來源就需要調用次方法,addSource(new FlinkKafkaConsumer08<>(…));

3.指定資料轉換

參考 Datastream轉換

4.資料接收

資料接收器可以從資料源中,也可以到資料源中,即源的操作也可以當做資料的接收器,用于存儲,從流入flink的資料也可以流入到kafka中,

  • print(); 使用者資料的列印
  • writeAsText(String path) 資料輸入到執行檔案中
  • addSource 自定義資料接收器

5.觸發程式執行

觸發執行程式調用 execute()上StreamExecutionEnvironment。根據執行的類型,ExecutionEnvironment将在本地計算機上觸發執行或送出程式以在群集上執行。

該execute()方法傳回一個JobExecutionResult,包含執行時間和累加器結果。

程式的執行并不是從main方法開始,而是任務從調用execute開發,而前面的資料源,資料轉化,都在不同的線程中執行,而後調用execute執行。

本地運作

LocalStreamEnvironment在建立Flink系統的同一個JVM程序中啟動Flink系統。如果從IDE啟動LocalEnvironment,則可以在代碼中設定斷點,并輕松地調試程式。

建立和使用LocalEnvironment如下:

val env = StreamExecutionEnvironment.createLocalEnvironment
val LocalSources = env.addSource(/* some source */);
//開始執行
env.execute();
           

參考連結

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/

https://www.cnblogs.com/xiexiandong/category/1748467.html

https://blog.csdn.net/springk/article/details/109383292

https://blog.csdn.net/weixin_30613433/article/details/99507272