說明:本文為《Flink大資料項目實戰》學習筆記,想通過視訊系統學習Flink這個最火爆的大資料計算架構的同學,推薦學習課程:
Flink大資料項目實戰:http://t.cn/EJtKhaz

從上圖可以看出Flink 中的Time大緻分為以下三類:
1.Event Time:Event 真正産生的時間,我們稱之為Event Time。
2.Ingestion Time:Event 事件被Source拿到,進入Flink處理引擎的時間,我們稱之為Ingestion Time。
3.Window Processing Time:Event事件被Flink 處理(比如做windows操作)時的時間,我們稱之為Window Processing Time。
4. Stateful Operations
什麼是狀态?
state一般指一個具體的task/operator的狀态,比如目前處理那些資料,資料處理的進度等等。
Flink state操作狀态分為兩類:
1.Operator State
Operator State跟一個特定operator的一個并發執行個體綁定,整個operator隻對應一個state。
2.Keyed State
基于KeyedStream上的狀态。這個狀态是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。
Flink 每個操作狀态又分為兩類:
Keyed State和Operator State可以以兩種形式存在:原始狀态和托管狀态( Flink架構管理的狀态)。
1.原始狀态:比如一個字元串或者數組,它需要序列化,儲存到記憶體或磁盤,或者外部存儲中,這就是它的原始狀态。
2.托管狀态:比如資料放在Hash表中,或者放在HDFS中,或者放在rocksdb中,這種就是托管狀态。當需要處理資料的時候,從托管狀态中讀取出來,還原成原始狀态,甚至變量和集合,然後再進行處理。
5.Checkpoints(備份)
什麼是checkpoint?
所謂checkpoint,就是在某一時刻,将所有task的狀态做一個快照(snapshot),然後存儲到State Backend(比如hdfs)。checkpoint擁有輕量級容錯機制,可以保證exactly-once 語義,用于内部失敗的恢複(比如當應用挂了,它可以自動恢複從上次的進度接着執行)。
checkpoint基本原理:通過往source 注入barrier(可以了解為特殊的Event),barrier作為checkpoint的标志,它會自動做checkpoint無需人工幹預。
6.Savepoint
savepoint是流處理過程中的狀态曆史版本,它具有可以replay的功能。用于外部恢複,當Flink應用重新開機和更新,它會做一個先做一個savepoint,下次應用啟動可以接着上次進度執行。
savepoint兩種觸發方式:
1.Cancel with savepoint
2.手動主動觸發
savepoint可以了解為是一種特殊的checkpoint,savepoint就是指向checkpoint的一個指針,需要手動觸發,而且不會過期,不會被覆寫,除非手動删除。正常情況下的線上環境是不需要設定savepoint的。除非對job或叢集做出重大改動的時候,需要進行測試運作。
(4)Flink Runtime
1. Flink運作時架構
1.1Flink架構
Flink 運作時架構主要包含幾個部分:Client、JobManager(master節點)和TaskManger(slave節點)。
Client:Flink 作業在哪台機器上面送出,那麼目前機器稱之為Client。使用者開發的Program 代碼,它會建構出DataFlow graph,然後通過Client送出給JobManager。
JobManager:是主(master)節點,相當于YARN裡面的REsourceManager,生成環境中一般可以做HA 高可用。JobManager會将任務進行拆分,排程到TaskManager上面執行。
TaskManager:是從節點(slave),TaskManager才是真正實作task的部分。
Client送出作業到JobManager,就需要跟JobManager進行通信,它使用Akka架構或者庫進行通信,另外Client與JobManager進行資料互動,使用的是Netty架構。Akka通信基于Actor System,Client可以向JobManager發送指令,比如Submit job或者Cancel /update job。JobManager也可以回報資訊給Client,比如status updates,Statistics和results。
Client送出給JobManager的是一個Job,然後JobManager将Job拆分成task,送出給TaskManager(worker)。JobManager與TaskManager也是基于Akka進行通信,JobManager發送指令,比如Deploy/Stop/Cancel Tasks或者觸發Checkpoint,反過來TaskManager也會跟JobManager通信傳回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之間的資料通過網絡進行傳輸,比如Data Stream做一些算子的操作,資料往往需要在TaskManager之間做資料傳輸。
1.2. TaskManger Slot
TaskManager是程序,他下面運作的task(整個Flink應用是Job,Job可以拆分成很多個task)是線程,每個task/subtask(線程)下可運作一個或者多個operator,即OperatorChain。Task是class,抽象的,subtask是Object(類比學習),具體的。
一個TaskManager通過Slot(任務槽)來控制它上面可以接受多少個task,比如一個TaskManager劃分了3個Task Slot(僅限記憶體托管,目前CPU未做隔離),它隻能接受3個task。Slot均分TaskManager所托管的記憶體,比如一個TaskManager有6G記憶體,那麼每個Slot配置設定2G。
同一個TaskManager中的task共享TCP連接配接(通過多路複用)和心跳消息。它們還可以共享資料集和資料結構,進而減少每個任務的開銷。一個TaskManager有N個槽位隻能接受N個Task嗎?不是,後面會講共享槽位。
1.3. OperatorChain && Task
為了更高效地分布式執行,Flink會盡可能地将operator的subtask連結(chain)在一起形成task。以wordcount為例,解析不同視圖下的資料流,如下圖所示。
資料流(邏輯視圖)
建立Source(并行度設定為1)讀取資料源,資料經過FlatMap(并行度設定為2)做轉換操作,然後資料經過Key Agg(并行度設定為2)做聚合操作,最後資料經過Sink(并行度設定為2)将資料輸出。
資料流(并行化視圖)
并行度為1的Source讀取資料源,然後FlatMap并行度為2讀取資料源進行轉化操作,然後資料經過Shuffle交給并行度為2的Key Agg進行聚合操作,然後并行度為2的Sink将資料輸出,未優化前的task總和為7。
資料流(優化後視圖)
并行度為1的Source讀取資料源,然後FlatMap并行度為2讀取資料源進行轉化操作,然後資料經過Shuffle交給Key Agg進行聚合操作,此時Key Agg和Sink操作合并為一個task(注意:将KeyAgg和Sink兩個operator進行了合并,因為這兩個合并後并不會改變整體的拓撲結構),它們一起的并行度為2,資料經過Key Agg和Sink之後将資料輸出,優化後的task總和為5.
1.4. OperatorChain的優點群組成條件
OperatorChain的優點
1.減少線程切換
2.減少序列化與反序列化
3.減少資料在緩沖區的交換
4.減少延遲并且提高吞吐能力
OperatorChain 組成條件
1.沒有禁用Chain
2.上下遊算子并行度一緻 。
3.下遊算子的入度為1(也就是說下遊節點沒有來自其他節點的輸入)。
4.上下遊算子在同一個slot group(後面緊跟着就會講如何通過slot group先配置設定到同一個solt,然後才能chain) 。
5.下遊節點的 chain 政策為 ALWAYS(可以與上下遊連結,map、flatmap、filter等預設是ALWAYS)。
6.上遊節點的 chain 政策為 ALWAYS 或 HEAD(隻能與下遊連結,不能與上遊連結,Source預設是HEAD)。
7.上下遊算子之間沒有資料shuffle (資料分區方式是 forward)。
1.5. 程式設計改變OperatorChain行為
Operator chain的行為可以通過程式設計API中進行指定,可以通過在DataStream的operator後面(如someStream.map(..))調用startNewChain()來訓示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。可以調用disableChaining()來訓示該operator不參與chaining(不會與前後的operator chain一起)。可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。可以設定Slot group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通過調整并行度,來調整Operator chain。
2. Slot配置設定與共享
2.1共享Slot
預設情況下,Flink 允許subtasks共享slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。
允許slot共享有以下兩點好處:
1.Flink叢集需要的任務槽與作業中使用的最高并行度正好相同(前提,保持預設SlotSharingGroup)。也就是說我們不需要再去計算一個程式總共會起多少個task了。
2.更容易獲得更充分的資源利用。如果沒有slot共享,那麼非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,将task的2個并行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均配置設定到重的subtasks。
2.2共享Slot執行個體
将 WordCount 的并行度從之前的2個增加到6個(Source并行度仍為1),并開啟slot共享(所有operator都在default共享組),将得到如上圖所示的slot分布圖。
首先,我們不用去計算這個job會其多少個task,總之該任務最終會占用6個slots(最高并行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地配置設定到各個 TaskManager。
2.3 SlotSharingGroup(soft)
SlotSharingGroup是Flink中用來實作slot共享的類,它盡可能地讓subtasks共享一個slot。
保證同一個group的并行度相同的sub-tasks 共享同一個slots。算子的預設group為default(即預設一個job下的subtask都可以共享一個slot)
為了防止不合理的共享,使用者也能通過API來強制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filter的slot共享組為group1。怎麼确定一個未做SlotSharingGroup設定算子的SlotSharingGroup什麼呢(根據上遊算子的group 和自身是否設定group共同确定)。适當設定可以減少每個slot運作的線程數,進而整體上減少機器的負載。
2.4 CoLocationGroup(強制)
CoLocationGroup可以保證所有的并行度相同的sub-tasks運作在同一個slot,主要用于疊代流(訓練機器學習模型)。
3. Slot & parallelism的關系
3.1 Slots && parallelism
如上圖所示,有兩個TaskManager,每個TaskManager有3個槽位。假設source操作并行度為3,map操作的并行度為4,sink的并行度為4,所需的task slots數與job中task的最高并行度一緻,最高并行度為4,那麼使用的Slot也為4。
3.2如何計算Slot
如何計算一個應用需要多少slot?
如果不設定SlotSharingGroup,那麼需要的Slot數為應用的最大并行度數。如果設定了SlotSharingGroup,那麼需要的Slot數為所有SlotSharingGroup中的最大并行度之和。比如已經強制指定了map的slot共享組為test,那麼map和map下遊的組為test,map的上遊source的組為預設的default,此時default組中最大并行度為10,test組中最大并行度為20,那麼需要的Slot=10+20=30。
4.Flink部署模式
4.1 Local 本地部署
Flink 可以運作在 Linux、Mac OS X 和 Windows 上。本地模式的安裝唯一需要的隻是 Java 1.7.x或更高版本,本地運作會啟動Single JVM,主要用于測試調試代碼。
4.2 Standalone Cluster叢集部署
軟體需求
1.安裝Java1.8或者更高版本
2.叢集各個節點需要ssh免密登入
Flink Standalone 運作流程前面已經講過,這裡就不在贅叙。
4.3Flink ON YARN
Flink ON YARN工作流程如下所示:
首先送出job給YARN,就需要有一個Flink YARN Client。
第一步:Client将Flink 應用jar包和配置檔案上傳到HDFS。
第二步:Client向REsourceManager注冊resources和請求APPMaster Container
第三步:REsourceManager就會給某一個Worker節點配置設定一個Container來啟動APPMaster,JobManager會在APPMaster中啟動。
第四步:APPMaster為Flink的TaskManagers配置設定容器并啟動TaskManager,TaskManager内部會劃分很多個Slot,它會自動從HDFS下載下傳jar檔案和修改後的配置,然後運作相應的task。TaskManager也會與APPMaster中的JobManager進行互動,維持心跳等。
5.Flink Standalone叢集部署
安裝Flink之前需要提前安裝好JDK,這裡我們安裝的是JDK1.8版本。
5.1下載下傳
可以到官網:https://archive.apache.org/dist/flink/ 将Flink1.6.2版本下載下傳到本地。
5.2解壓
将下載下傳的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節點
使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz指令解壓flink安裝包
友善後期flink多版本的使用,可以建立flink軟連接配接
ln -s flink-1.6.2 flink
5.3配置環境變量
vi ~/.bashrc
export FLINK_HOME=/home/hadoop/app/flink
export PATH=$FLINK_HOME/bin:$PATH
使配置檔案生效
source ~/.bashrc
檢視flink版本
flink -v
5.4修改配置檔案
1.修改flink-conf.yaml配置檔案
vi flink-conf.yaml
#JobManager位址
jobmanager.rpc.address: cdh01
#槽位配置為3
taskmanager.numberOfTaskSlots: 3
#設定并行度為3
parallelism.default: 3
2.修改masters配置
vi masters
cdh01:8081
3.修改slaves配置
vi slaves
cdh01
cdh02
cdh03
5.5主節點安裝目錄同步到從節點
通過deploy.sh腳本将flink安裝目錄同步到其他節點。
deploy.sh flink-1.6.2 /home/hadoop/app/ slave
在從節點分别建立flink軟連接配接
ln -s flink-1.6.2 flink
5.6啟動服務
進入flink bin目錄執行啟動叢集腳本start-cluster.sh
bin/start-cluster.sh
通過web檢視flink叢集,檢視相關叢集資訊。
http://cdh01:8081
5.7測試運作
檢視官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/
1.啟動nc服務
nc -l 9000
2.送出flink作業
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
3.輸入測試資料
5.7測試運作
檢視官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/
1.啟動nc服務
nc -l 9000
2.送出flink作業
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
3.輸入測試資料
4.檢視運作結果
在TaskManager界面檢視Flink運作結果
(5)Flink開發環境搭建
1. 建立Flink項目及依賴管理
1.1建立Flink項目
官網建立Flink項目有兩種方式:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html
方式一:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.2
方式二
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.2
這裡我們仍然使用第一種方式建立Flink項目。
打開終端,切換到對應的目錄,通過maven建立flink項目
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.6.2
項目建構過程中需要輸入groupId,artifactId,version和package
Flink項目建立成功
打開IDEA工具,點選open。
選擇剛剛建立的flink項目
Flink項目已經成功導入IDEA開發工具
通過maven打包測試運作
mvn clean package
重新整理target目錄可以看到剛剛打包的flink項目
1.2. Flink依賴
Core Dependencies(核心依賴):
1.核心依賴打包在flink-dist*.jar裡
2.包含coordination, networking, checkpoints, failover, APIs, operations (such as windowing), resource management等必須的依賴
注意:核心依賴不會随着應用打包(<scope>provided</scope>)
3.核心依賴項盡可能小,并避免依賴項沖突
Pom檔案中添加核心依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.2</version>
<scope>provided</scope>
</dependency>
注意:不會随着應用打包。
User Application Dependencies(應用依賴):
connectors, formats, or libraries(CEP, SQL, ML)、
注意:應用依賴會随着應用打包(scope保持預設值就好)
Pom檔案中添加應用依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.6.2</version>
</dependency>
注意:應用依賴按需選擇,會随着應用打包,可以通過Maven Shade插件進行打包。
1.3. 關于Scala版本
Scala各版本之間是不相容的(你基于Scala2.12開發Flink應用就不能依賴Scala2.11的依賴包)。
隻使用Java的開發人員可以選擇任何Scala版本,Scala開發人員需要選擇與他們的應用程式的Scala版本比對的Scala版本。
1.4. Hadoop依賴
不要把Hadoop依賴直接添加到Flink application,而是:
export HADOOP_CLASSPATH=`hadoop classpath`
Flink元件啟動時會使用該環境變量的
特殊情況:如果在Flink application中需要用到Hadoop的input-/output format,隻需引入Hadoop相容包即可(Hadoop compatibility wrappers)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.6.2</version>
</dependency>
1.5 Flink項目打包
Flink 可以使用maven-shade-plugin對Flink maven項目進行打包,具體打包指令為mvn clean
package。
2. 自己編譯Flink
2.1安裝maven
1.下載下傳
到maven官網下載下傳安裝包,這裡我們可以選擇使用apache-maven-3.3.9-bin.tar.gz。
2.解壓
将apache-maven-3.3.9-bin.tar.gz安裝包上傳至主節點的,然後使用tar指令進行解壓
tar -zxvf apache-maven-3.3.9-bin.tar.gz
3.建立軟連接配接
ln -s apache-maven-3.3.9 maven
4.配置環境變量
vi ~/.bashrc
export MAVEN_HOME=/home/hadoop/app/maven
export PATH=$MAVEN_HOME/bin:$PATH
5.生效環境變量
source ~/.bashrc
6.檢視maven版本
mvn –version
7. settings.xml配置阿裡鏡像
添加阿裡鏡像
<mirror>
<id>nexus-osc</id>
<mirrorOf>*</mirrorOf>
<name>Nexus osc</name>
<url>http://maven.aliyun.com/nexus/content/repositories/central</url>
</mirror>
2.2安裝jdk
編譯flink要求jdk8或者以上版本,這裡已經提前安裝好jdk1.8,具體安裝配置不再贅叙,檢視版本如下:
[hadoop@cdh01 conf]$ java -version
java version "1.8.0_51"
Java(TM) SE Runtime Environment (build 1.8.0_51-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)
2.3下載下傳源碼
登入github:https://github.com/apache/flink,擷取flink下載下傳位址:https://github.com/apache/flink.git
打開Flink主節點終端,進入/home/hadoop/opensource目錄,通過git clone下載下傳flink源碼:
git clone https://github.com/apache/flink.git
錯誤1:如果Linux沒有安裝git,會報如下錯誤:
bash: git: command not found
解決:git安裝步驟如下所示:
1.安裝編譯git時需要的包(注意需要在root使用者下安裝)
yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel
yum install gcc perl-ExtUtils-MakeMaker
2.删除已有的git
yum remove git
3.下載下傳git源碼
先安裝wget
yum -y install wget
使用wget下載下傳git源碼
wget https://www.kernel.org/pub/software/scm/git/git-2.0.5.tar.gz
解壓git
tar xzf git-2.0.5.tar.gz
編譯安裝git
cd git-2.0.5
make prefix=/usr/local/git all
sudo make prefix=/usr/local/git install
echo "export PATH=$PATH:/usr/local/git/bin" >> ~/.bashrc
source ~/.bashrc
檢視git版本
git –version
錯誤2:git clone https://github.com/apache/flink.git
Cloning into \'flink\'...
fatal: unable to access \'https://github.com/apache/flink.git/\': SSL connect error
解決:
更新 nss 版本:yum update nss
2.4切換對應flink版本
使用如下指令檢視flink版本分支
git tag
切換到flink對應版本(這裡我們使用flink1.6.2)
git checkout release-1.6.2
2.5編譯flink
進入flink 源碼根目錄:/home/hadoop/opensource/flink,通過maven編譯flink
mvn clean install -DskipTests -Dhadoop.version=2.6.0
報錯:
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 06:58 min
[INFO] Finished at: 2019-01-18T22:11:54-05:00
[INFO] Final Memory: 106M/454M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve dependencies for project org.apache.flink:flink-mapr-fs:jar:1.6.2: Could not find artifact com.mapr.hadoop:maprfs:jar:5.2.1-mapr in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :flink-mapr-fs
報錯缺失flink-mapr-fs,需要手動下載下傳安裝。
解決:
1.下載下傳maprfs jar包
通過手動下載下傳maprfs-5.2.1-mapr.jar包,下載下傳位址位址:https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/hadoop/maprfs/5.2.1-mapr/
2.上傳至主節點
将下載下傳的maprfs-5.2.1-mapr.jar包上傳至主節點的/home/hadoop/downloads目錄下。
3.手動安裝
手動安裝缺少的包到本地倉庫
mvn install:install-file -DgroupId=com.mapr.hadoop -DartifactId=maprfs -Dversion=5.2.1-mapr -Dpackaging=jar -Dfile=/home/hadoop/downloads/maprfs-5.2.1-mapr.jar
4.繼續編譯
使用maven繼續編譯flink(可以排除剛剛已經安裝的包)
mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3 -rf :flink-mapr-fs
報錯:
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:51 min
[INFO] Finished at: 2019-01-18T22:39:20-05:00
[INFO] Final Memory: 108M/480M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-mapr-fs: Compilation failure: Compilation failure:
[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44] package org.apache.hadoop.fs does not exist
[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45] cannot find symbol
[ERROR] symbol: class Configuration
[ERROR] location: package org.apache.hadoop.conf
[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/
runtime/fs/maprfs/MapRFileSystem.java:[73,93] cannot find symbol
[ERROR] symbol: class Configuration
缺失org.apache.hadoop.fs包,報錯找不到。
解決:
flink-mapr-fs子產品的pom檔案中添加如下依賴:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
繼續往後編譯:
mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3 -rf :flink-mapr-fs
又報錯:
[ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.6.2: Could not find artifact io.confluent:kafka-schema-registry-client:jar:3.3.1 in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]
[ERROR]
報錯缺少kafka-schema-registry-client-3.3.1.jar 包
解決:
手動下載下傳kafka-schema-registry-client-3.3.1.jar包,下載下傳位址如下:
http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar
将下載下傳的kafka-schema-registry-client-3.3.1.jar上傳至主節點的目錄下/home/hadoop/downloads
手動安裝缺少的kafka-schema-registry-client-3.3.1.jar包
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=/home/hadoop/downloads/kafka-schema-registry-client-3.3.1.jar
繼續往後編譯
mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3 -rf :flink-mapr-fs
(6)Flink API 通用基本概念
1. 繼續侃Flink程式設計基本套路
1.1 DataSet and DataStream
DataSet and DataStream表示Flink app中的分布式資料集。它們包含重複的、不可變資料集。DataSet有界資料集,用在Flink批處理。DataStream可以是無界,用在Flink流處理。它們可以從資料源建立,也可以通過各種轉換操作建立。
1.2共同的程式設計套路
DataSet and DataStream 這裡以WordCount為例,共同的程式設計套路如下所示:
1.擷取執行環境(execution environment)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.加載/建立初始資料集
// 讀取輸入資料
DataStream<String> text;
if (params.has("input")) {
// 讀取text檔案
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// 讀取預設測試資料集
text = env.fromElements(WordCountData.WORDS);
}
3.對資料集進行各種轉換操作(生成新的資料集)
// 切分每行單詞
text.flatMap(new Tokenizer())
//對每個單詞分組統計詞頻數
.keyBy(0).sum(1);
4.指定将計算的結果放到何處去
// 輸出統計結果
if (params.has("output")) {
//寫入檔案位址
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
//資料列印控制台
counts.print();
}
5.觸發APP執行
// 執行flink 程式
env.execute("Streaming WordCount");
1.3惰性計算
Flink APP都是延遲執行的,隻有當execute()被顯示調用時才會真正執行,本地執行還是在叢集上執行取決于執行環境的類型。好處:使用者可以根據業務建構複雜的應用,Flink可以整體進優化并生成執行計劃。
2. 指定鍵(Specifying Keys)
2.1誰需要指定鍵
哪些操作需要指定key呢?常見的操作如join, coGroup, keyBy, groupBy,Reduce, GroupReduce, Aggregate, Windows等。
Flink程式設計模型的key是虛拟的,不需要你建立鍵值對,可以在具體算子通過參數指定,如下代碼所示:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
2.2為Tuple定義鍵
Tuple定義鍵的方式有很多種,接下來我們一起看幾個示例:
按照指定屬性分組
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:此時表示使用Tuple3三元組的第一個成員作為keyBy
按照組合鍵進行分組
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
注意:此時表示使用Tuple3三元組的前兩個元素一起作為keyBy
特殊情況:嵌套Tuple
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:這裡使用KeyBy(0)指定鍵,系統将會使用整個Tuple2作為鍵(整型和浮點型的)。如果想使用Tuple2内部字段作為鍵,你可以使用字段來表示鍵,這種方法會在後面闡述。
2.3使用字段表達式定義鍵
基于字元串的字段表達式可以用來引用嵌套字段(例如Tuple,POJO)
public class WC {
public String word;
public User user;
public int count;
}
public class User{
public int age;
public String zip;
}
示例:通過word字段進行分組
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
文法:
1.直接使用字段名選擇POJO字段
例如 user 表示 一個POJO的user字段
2.Tuple通過offset來選擇
"_1"和"5"分别代表第一和第六個Scala Tuple字段
“f0” and “f5”分别代表第一和第六個Java Tuple字段
3.選擇POJO和Tuples的嵌套屬性
user.zip
在scala裡你可以"_2.user.zip"或"user._4.1.zip”
在java裡你可以“2.user.zip”或者" user.f0.1.zip ”
4.使用通配符表達式選擇所有屬性,java為“*”,scala為 "_"。不是POJO或者Tuple的類型也适用。
2.4字段表達式執行個體-Java
以下定義兩個Java類:
public static class WC {
public ComplexNestedClass complex;
private int count;
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
我們一起看看如下key字段如何了解:
1."count": wc 類的count字段
2."complex":遞歸的選取ComplexNestedClass的所有字段
3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;
4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。
2.5字段表達式執行個體-Scala
以下定義兩個Scala類:
"_1"和"5"分别代表第一和第六個Scala Tuple字段
“f0” and “f5”分别代表第一和第六個Java Tuple字段
3.選擇POJO和Tuples的嵌套屬性
user.zip
在scala裡你可以"_2.user.zip"或"user._4.1.zip”
在java裡你可以“2.user.zip”或者" user.f0.1.zip ”
4.使用通配符表達式選擇所有屬性,java為“*”,scala為 "_"。不是POJO或者Tuple的類型也适用。
2.4字段表達式執行個體-Java
以下定義兩個Java類:
public static class WC {
public ComplexNestedClass complex;
private int count;
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
我們一起看看如下key字段如何了解:
1."count": wc 類的count字段
2."complex":遞歸的選取ComplexNestedClass的所有字段
3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;
4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。
2.5字段表達式執行個體-Scala
以下定義兩個Scala類: