前言
Data Sources 是什麼呢?就字面意思其實就可以知道:資料來源。
Flink 做為一款流式計算架構,它可用來做批處理,即處理靜态的資料集、曆史的資料集;也可以用來做流處理,即實時的處理些實時資料流,實時的産生資料流結果,隻要資料源源不斷的過來,Flink 就能夠一直計算下去,這個 Data Sources 就是資料的來源地。
Flink 中你可以使用
StreamExecutionEnvironment.addSource(sourceFunction)
來為你的程式添加資料來源。
Flink 已經提供了若幹實作好了的 source functions,當然你也可以通過實作 SourceFunction 來自定義非并行的 source 或者實作 ParallelSourceFunction 接口或者擴充 RichParallelSourceFunction 來自定義并行的 source。
StreamExecutionEnvironment
中可以使用以下幾個已實作的 stream sources: ![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsATOfd3bkFGazxCMx8VesATMfhHLlN3XnxCMwEzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CMmJGZmVTZ5EWOzEGNiZjNwMGZmJTYjFjMmFmZyEDM08CXxIzLcZDMxIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjL4M3Lc9CX6MHc0RHaiojIsJye.png)
-
基于集合
1、fromCollection(Collection) - 從 Java 的 Java.util.Collection 建立資料流。集合中的所有元素類型必須相同。
2、fromCollection(Iterator, Class) - 從一個疊代器中建立資料流。Class 指定了該疊代器傳回元素的類型。
3、fromElements(T …) - 從給定的對象序列中建立資料流。所有對象類型必須相同。
4、fromParallelCollection(SplittableIterator, Class) - 從一個疊代器中建立并行資料流。Class 指定了該疊代器傳回元素的類型。
5、generateSequence(from, to) - 建立一個生成指定區間範圍内的數字序列的并行資料流。
eg3:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
new Event(1, "barfoo", 1.0),
new Event(2, "start", 2.0),
new Event(3, "foobar", 3.0),
...
);
-
基于檔案
1、readTextFile(path) - 讀取文本檔案,即符合 TextInputFormat 規範的檔案,并将其作為字元串傳回。
2、readFile(fileInputFormat, path) - 根據指定的檔案輸入格式讀取檔案(一次)。
3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法内部調用的方法。它根據給定的 fileInputFormat 和讀取路徑讀取檔案。根據提供的 watchType,這個 source 可以定期(每隔 interval 毫秒)監測給定路徑的新資料(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應檔案的資料并退出(FileProcessingMode.PROCESS_ONCE)。你可以通過 pathFilter 進一步排除掉需要處理的檔案。
實作:
在具體實作上,Flink 把檔案讀取過程分為兩個子任務,即目錄監控和資料讀取。每個子任務都由單獨的實體實作。目錄監控由單個非并行(并行度為1)的任務執行,而資料讀取由并行運作的多個任務執行。後者的并行性等于作業的并行性。單個目錄監控任務的作用是掃描目錄(根據 watchType 定期掃描或僅掃描一次),查找要處理的檔案并把檔案分割成切分片(splits),然後将這些切分片配置設定給下遊 reader。reader 負責讀取資料。每個切分片隻能由一個 reader 讀取,但一個 reader 可以逐個讀取多個切分片。
重要注意:
如果 watchType 設定為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當檔案被修改時,其内容将被重新處理。這會打破“exactly-once”語義,因為在檔案末尾附加資料将導緻其所有内容被重新處理。
如果 watchType 設定為 FileProcessingMode.PROCESS_ONCE,則 source 僅掃描路徑一次然後退出,而不等待 reader 完成檔案内容的讀取。當然 reader 會繼續閱讀,直到讀取所有的檔案内容。關閉 source 後就不會再有檢查點。這可能導緻節點故障後的恢複速度較慢,因為該作業将從最後一個檢查點恢複讀取。
eg1:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
eg3:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
-
基于 Socket:
socketTextStream(String hostname, int port) - 從 socket 讀取。元素可以用分隔符切分.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999) // 監聽 localhost 的 9999 端口過來的資料
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
-
自定義:
addSource - 添加一個新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011<>(…)) 以從 Apache Kafka 讀取資料.
說下上面幾種的特點吧:
1、基于集合:有界資料集,更偏向于本地測試用
2、基于檔案:适合監聽檔案修改并讀取其内容
3、基于 Socket:監聽主機的 host port,從 Socket 中擷取資料
4、自定義 addSource:大多數的場景資料都是無界的,會源源不斷的過來。比如去消費 Kafka 某個 topic 上的資料,這時候就需要用到這個 addSource,可能因為用的比較多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 這個基礎類,它是 Flink Kafka 消費的最根本的類。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<KafkaEvent> input = env
.addSource(
new FlinkKafkaConsumer011<>(
parameterTool.getRequired("input-topic"), //從參數中擷取傳進來的 topic
new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));
Flink 目前支援如下圖裡面常見的 Source:
如果你想自己自定義自己的 Source 呢?
那麼你就需要去了解一下 SourceFunction 接口了,它是所有 stream source 的根接口,它繼承自一個标記接口(空接口)Function。