Flink概述
Flink是Apache的一個頂級項目,Apache Flink 是一個開源的分布式流處理和批處理系統。Flink 的核心是在資料流上提供資料分發、通信、具備容錯的分布式計算。同時,Flink 在流處理引擎上建構了批處理引擎,原生支援了疊代計算、記憶體管理和程式優化。
現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們所提供的SLA(Service-Level-Aggreement)是完全不相同的:流處理一般需要支援低延遲、Exactly-once保證,而批處理需要支援高吞吐、高效處理。
Flink從另一個視角看待流處理和批處理,将二者統一起來:Flink是完全支援流處理,也就是說作為流處理看待時輸入資料流是×××的;批處理被作為一種特殊的流處理,隻是它的輸入資料流被定義為有界的。
Flink流處理特性:
- 支援高吞吐、低延遲、高性能的流處理
- 支援帶有事件時間的視窗(Window)操作
- 支援有狀态計算的Exactly-once語義
- 支援高度靈活的視窗(Window)操作,支援基于time、count、session,以及data-driven的視窗操作
- 支援具有Backpressure功能的持續流模型
- 支援基于輕量級分布式快照(Snapshot)實作的容錯
- 一個運作時同時支援Batch on Streaming處理和Streaming處理
- Flink在JVM内部實作了自己的記憶體管理
- 支援疊代計算
- 支援程式自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存
Flink架構圖:
Flink以層級式系統形式元件其軟體棧,不同層的棧建立在其下層基礎上,并且各層接受程式不同層的抽象形式。
在最基本的層面上,一個Flink應用程式是由以下幾部分組成:
- Data source: 資料源,将資料輸入到Flink中
- Transformations: 處理資料
- Data sink: 将處理後的資料傳輸到某個地方
如下圖:
目前Flink支援如下架構:
- Apache Kafka (sink/source)
- Elasticsearch 1.x / 2.x / 5.x (sink)
- HDFS (sink)
- RabbitMQ (sink/source)
- Amazon Kinesis Streams (sink/source)
- Twitter (source)
- Apache NiFi (sink/source)
- Apache Cassandra (sink)
- Redis, Flume, and ActiveMQ (via Apache Bahir) (sink)
Flink官網位址如下:
http://flink.apache.org/
部分内容參考自如下文章:
https://blog.csdn.net/jdoouddm7i/article/details/62039337
使用Flink完成wordcount統計
Flink下載下傳位址:
http://flink.apache.org/downloads.html
Flink快速開始文檔位址:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/setup_quickstart.html
注:安裝Flink之前系統中需要安裝有jdk1.7以上版本的環境
我這裡下載下傳的是2.6版本的Flink:
[root@study-01 ~]# cd /usr/local/src/
[root@study-01 /usr/local/src]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop26-scala_2.11.tgz
[root@study-01 /usr/local/src]# tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz -C /usr/local
[root@study-01 /usr/local/src]# cd ../flink-1.4.2/
[root@study-01 /usr/local/flink-1.4.2]# ls
bin conf examples lib LICENSE log NOTICE opt README.txt resources tools
[root@study-01 /usr/local/flink-1.4.2]#
啟動Flink:
[root@study-01 /usr/local/flink-1.4.2]# ./bin/start-local.sh
[root@study-01 /usr/local/flink-1.4.2]# jps
6576 Jps
6131 JobManager
6499 TaskManager
[root@study-01 /usr/local/flink-1.4.2]#
啟動成功之後就可以通路主機ip的8081端口,進入到Flink的web頁面:
我們現在就可以開始實作wordcount案例了,我這裡有一個檔案,内容如下:
[root@study-01 /usr/local/flink-1.4.2]# cat /data/hello.txt
hadoop welcome
hadoop hdfs mapreduce
hadoop hdfs
hello hadoop
spark vs mapreduce
[root@study-01 /usr/local/flink-1.4.2]#
執行如下指令,實作wordcount案例,如果學習過Hadoop會發現這個指令和Hadoop上使用MapReduce實作wordcount案例是類似的:
[root@study-01 /usr/local/flink-1.4.2]# ./bin/flink run ./examples/batch/WordCount.jar --input file:///data/hello.txt --output file:///data/tmp/flink_wordcount_out
執行完成後,可以到web頁面上,檢視任務的執行資訊:
檢視輸出結果:
[root@study-01 /usr/local/flink-1.4.2]# cat /data/tmp/flink_wordcount_out
hadoop 4
hdfs 2
hello 1
mapreduce 2
spark 1
vs 1
welcome 1
[root@study-01 /usr/local/flink-1.4.2]#
Beam概述
Google的新老三駕馬車:
- 老的三駕馬車:GFS、MapReduce、BigTable
- 新的三駕馬車:Dremel、Pregel、Caffeine
我們都知道,Hadoop生态圈内的幾個架構都源于Google老的三駕馬車,而一些新的架構實作也是部分源于Google新的三駕馬車的概念。是以現在市面上的大資料相關架構很多,架構多就會導緻程式設計規範多、處理模式不一緻,而我們希望有一個工具能夠統一這些程式設計模型,是以,Beam就誕生了。
Apache Beam是 Apache 軟體基金會于2017年1 月 10 日對外宣布的開源平台。Beam 為建立複雜資料平行處理管道,提供了一個可移動(相容性好)的 API 層。這層 API 的核心概念基于 Beam 模型(以前被稱為 Dataflow 模型),并在每個 Beam 引擎上不同程度得執行。
背景:
2016 年 2 月份,谷歌及其合作夥伴向 Apache 捐贈了一大批代碼,創立了孵化中的 Beam 項目( 最初叫 Apache Dataflow)。這些代碼中的大部分來自于谷歌 Cloud Dataflow SDK——開發者用來寫流處理和批處理管道(pipelines)的庫,可在任何支援的執行引擎上運作。當時,支援的主要引擎是谷歌 Cloud Dataflow,附帶對 Apache Spark 和 開發中的 Apache Flink 支援。如今,它正式開放之時,已經有五個官方支援的引擎。除去已經提到的三個,還包括 Beam 模型和 Apache Apex。
Beam特點:
- 統一了資料批處理(batch)和流處理(stream)程式設計範式,
- 能在任何執行引擎上運作。
- 它不僅為模型設計、更為執行一系列資料導向的工作流提供了統一的模型。這些工作流包括資料處理、吸收和整合。
Beam的官方網站:
https://beam.apache.org/
将WordCount的Beam程式以多種不同Runner運作
Beam Java的快速開始文檔:
https://beam.apache.org/get-started/quickstart-java/
安裝Beam的前置也是需要系統具備jdk1.7以上版本的環境,以及Maven環境。
使用如下指令下載下傳Beam以及wordcount案例代碼:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.4.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
進入下載下傳後的目錄進行檢視:
[root@study-01 /usr/local/src]# cd word-count-beam/
[root@study-01 /usr/local/src/word-count-beam]# tree
.
├── pom.xml
└── src
├── main
│ └── java
│ └── org
│ └── apache
│ └── beam
│ └── examples
│ ├── common
│ │ ├── ExampleBigQueryTableOptions.java
│ │ ├── ExampleOptions.java
│ │ ├── ExamplePubsubTopicAndSubscriptionOptions.java
│ │ ├── ExamplePubsubTopicOptions.java
│ │ ├── ExampleUtils.java
│ │ └── WriteOneFilePerWindow.java
│ ├── complete
│ │ └── game
│ │ ├── GameStats.java
│ │ ├── HourlyTeamScore.java
│ │ ├── injector
│ │ │ ├── Injector.java
│ │ │ ├── InjectorUtils.java
│ │ │ └── RetryHttpInitializerWrapper.java
│ │ ├── LeaderBoard.java
│ │ ├── StatefulTeamScore.java
│ │ ├── UserScore.java
│ │ └── utils
│ │ ├── GameConstants.java
│ │ ├── WriteToBigQuery.java
│ │ ├── WriteToText.java
│ │ └── WriteWindowedToBigQuery.java
│ ├── DebuggingWordCount.java
│ ├── MinimalWordCount.java
│ ├── WindowedWordCount.java
│ └── WordCount.java
└── test
└── java
└── org
└── apache
└── beam
└── examples
├── complete
│ └── game
│ ├── GameStatsTest.java
│ ├── HourlyTeamScoreTest.java
│ ├── LeaderBoardTest.java
│ ├── StatefulTeamScoreTest.java
│ └── UserScoreTest.java
├── DebuggingWordCountTest.java
├── MinimalWordCountTest.java
└── WordCountTest.java
20 directories, 31 files
[root@study-01 /usr/local/src/word-count-beam]#
預設情況下,beam的runner是Direct,下面就用Direct來運作wordcount案例,指令如下:
[root@study-01 /usr/local/src/word-count-beam]# ls
pom.xml src target
[root@study-01 /usr/local/src/word-count-beam]#
[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/data/hello.txt --output=counts" -Pdirect-runner
運作的結果會存放在目前的目錄下:
[root@study-01 /usr/local/src/word-count-beam]# ls
counts-00000-of-00003 counts-00001-of-00003 counts-00002-of-00003 pom.xml src target
[root@study-01 /usr/local/src/word-count-beam]# more counts* # 檢視結果檔案
::::::::::::::
counts-00000-of-00003
::::::::::::::
welcome: 1
spark: 1
::::::::::::::
counts-00001-of-00003
::::::::::::::
hdfs: 2
hadoop: 4
mapreduce: 2
::::::::::::::
counts-00002-of-00003
::::::::::::::
hello: 1
vs: 1
[root@study-01 /usr/local/src/word-count-beam]#
如果需要指定其他的runner則可以使用--runner參數進行指定,例如我要指定runner為Flink,則修改指令如下即可:
[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=FlinkRunner --inputFile=/data/hello.txt --output=counts" -Pflink-runner
删除之前生成的檔案及目錄,我們來使用Spark的方式進行運作。使用Spark的話,也隻是修改--runner以及-Pspark參數即可:
[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=/data/hello.txt --output=counts" -Pspark-runner
[root@study-01 /usr/local/src/word-count-beam]# ls
counts-00000-of-00003 counts-00001-of-00003 counts-00002-of-00003 pom.xml src target
[root@study-01 /usr/local/src/word-count-beam]#
[root@study-01 /usr/local/src/word-count-beam]# more counts*
::::::::::::::
counts-00000-of-00003
::::::::::::::
spark: 1
::::::::::::::
counts-00001-of-00003
::::::::::::::
welcome: 1
hello: 1
mapreduce: 2
::::::::::::::
counts-00002-of-00003
::::::::::::::
vs: 1
hdfs: 2
hadoop: 4
[root@study-01 /usr/local/src/word-count-beam]#