天天看點

flink學習筆記-各種Time

說明:本文為《Flink大資料項目實戰》學習筆記,想通過視訊系統學習Flink這個最火爆的大資料計算架構的同學,推薦學習課程:

Flink大資料項目實戰:http://t.cn/EJtKhaz

flink學習筆記-各種Time

從上圖可以看出Flink 中的Time大緻分為以下三類:

1.Event Time:Event 真正産生的時間,我們稱之為Event Time。

2.Ingestion Time:Event 事件被Source拿到,進入Flink處理引擎的時間,我們稱之為Ingestion Time。

3.Window Processing Time:Event事件被Flink 處理(比如做windows操作)時的時間,我們稱之為Window Processing Time。

4. Stateful Operations

flink學習筆記-各種Time

什麼是狀态?

state一般指一個具體的task/operator的狀态,比如目前處理那些資料,資料處理的進度等等。

Flink state操作狀态分為兩類:

1.Operator State

Operator State跟一個特定operator的一個并發執行個體綁定,整個operator隻對應一個state。

2.Keyed State

基于KeyedStream上的狀态。這個狀态是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。

Flink 每個操作狀态又分為兩類:

Keyed State和Operator State可以以兩種形式存在:原始狀态和托管狀态( Flink架構管理的狀态)。

1.原始狀态:比如一個字元串或者數組,它需要序列化,儲存到記憶體或磁盤,或者外部存儲中,這就是它的原始狀态。

2.托管狀态:比如資料放在Hash表中,或者放在HDFS中,或者放在rocksdb中,這種就是托管狀态。當需要處理資料的時候,從托管狀态中讀取出來,還原成原始狀态,甚至變量和集合,然後再進行處理。

5.Checkpoints(備份)

flink學習筆記-各種Time

什麼是checkpoint?

所謂checkpoint,就是在某一時刻,将所有task的狀态做一個快照(snapshot),然後存儲到State Backend(比如hdfs)。checkpoint擁有輕量級容錯機制,可以保證exactly-once 語義,用于内部失敗的恢複(比如當應用挂了,它可以自動恢複從上次的進度接着執行)。

checkpoint基本原理:通過往source 注入barrier(可以了解為特殊的Event),barrier作為checkpoint的标志,它會自動做checkpoint無需人工幹預。

6.Savepoint

savepoint是流處理過程中的狀态曆史版本,它具有可以replay的功能。用于外部恢複,當Flink應用重新開機和更新,它會做一個先做一個savepoint,下次應用啟動可以接着上次進度執行。

savepoint兩種觸發方式:

1.Cancel with savepoint

2.手動主動觸發

savepoint可以了解為是一種特殊的checkpoint,savepoint就是指向checkpoint的一個指針,需要手動觸發,而且不會過期,不會被覆寫,除非手動删除。正常情況下的線上環境是不需要設定savepoint的。除非對job或叢集做出重大改動的時候,需要進行測試運作。

(4)Flink Runtime

1. Flink運作時架構

1.1Flink架構

Flink 運作時架構主要包含幾個部分:Client、JobManager(master節點)和TaskManger(slave節點)。

flink學習筆記-各種Time

Client:Flink 作業在哪台機器上面送出,那麼目前機器稱之為Client。使用者開發的Program 代碼,它會建構出DataFlow graph,然後通過Client送出給JobManager。

JobManager:是主(master)節點,相當于YARN裡面的REsourceManager,生成環境中一般可以做HA 高可用。JobManager會将任務進行拆分,排程到TaskManager上面執行。

TaskManager:是從節點(slave),TaskManager才是真正實作task的部分。

Client送出作業到JobManager,就需要跟JobManager進行通信,它使用Akka架構或者庫進行通信,另外Client與JobManager進行資料互動,使用的是Netty架構。Akka通信基于Actor System,Client可以向JobManager發送指令,比如Submit job或者Cancel /update job。JobManager也可以回報資訊給Client,比如status updates,Statistics和results。

Client送出給JobManager的是一個Job,然後JobManager将Job拆分成task,送出給TaskManager(worker)。JobManager與TaskManager也是基于Akka進行通信,JobManager發送指令,比如Deploy/Stop/Cancel Tasks或者觸發Checkpoint,反過來TaskManager也會跟JobManager通信傳回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之間的資料通過網絡進行傳輸,比如Data Stream做一些算子的操作,資料往往需要在TaskManager之間做資料傳輸。

1.2. TaskManger Slot

flink學習筆記-各種Time

TaskManager是程序,他下面運作的task(整個Flink應用是Job,Job可以拆分成很多個task)是線程,每個task/subtask(線程)下可運作一個或者多個operator,即OperatorChain。Task是class,抽象的,subtask是Object(類比學習),具體的。

一個TaskManager通過Slot(任務槽)來控制它上面可以接受多少個task,比如一個TaskManager劃分了3個Task Slot(僅限記憶體托管,目前CPU未做隔離),它隻能接受3個task。Slot均分TaskManager所托管的記憶體,比如一個TaskManager有6G記憶體,那麼每個Slot配置設定2G。

同一個TaskManager中的task共享TCP連接配接(通過多路複用)和心跳消息。它們還可以共享資料集和資料結構,進而減少每個任務的開銷。一個TaskManager有N個槽位隻能接受N個Task嗎?不是,後面會講共享槽位。

1.3. OperatorChain && Task

為了更高效地分布式執行,Flink會盡可能地将operator的subtask連結(chain)在一起形成task。以wordcount為例,解析不同視圖下的資料流,如下圖所示。

flink學習筆記-各種Time

資料流(邏輯視圖)

建立Source(并行度設定為1)讀取資料源,資料經過FlatMap(并行度設定為2)做轉換操作,然後資料經過Key Agg(并行度設定為2)做聚合操作,最後資料經過Sink(并行度設定為2)将資料輸出。

資料流(并行化視圖)

并行度為1的Source讀取資料源,然後FlatMap并行度為2讀取資料源進行轉化操作,然後資料經過Shuffle交給并行度為2的Key Agg進行聚合操作,然後并行度為2的Sink将資料輸出,未優化前的task總和為7。

資料流(優化後視圖)

并行度為1的Source讀取資料源,然後FlatMap并行度為2讀取資料源進行轉化操作,然後資料經過Shuffle交給Key Agg進行聚合操作,此時Key Agg和Sink操作合并為一個task(注意:将KeyAgg和Sink兩個operator進行了合并,因為這兩個合并後并不會改變整體的拓撲結構),它們一起的并行度為2,資料經過Key Agg和Sink之後将資料輸出,優化後的task總和為5.

1.4. OperatorChain的優點群組成條件

OperatorChain的優點

1.減少線程切換

2.減少序列化與反序列化

3.減少資料在緩沖區的交換

4.減少延遲并且提高吞吐能力

OperatorChain 組成條件

1.沒有禁用Chain

2.上下遊算子并行度一緻 。

3.下遊算子的入度為1(也就是說下遊節點沒有來自其他節點的輸入)。

4.上下遊算子在同一個slot group(後面緊跟着就會講如何通過slot group先配置設定到同一個solt,然後才能chain) 。

5.下遊節點的 chain 政策為 ALWAYS(可以與上下遊連結,map、flatmap、filter等預設是ALWAYS)。

6.上遊節點的 chain 政策為 ALWAYS 或 HEAD(隻能與下遊連結,不能與上遊連結,Source預設是HEAD)。

7.上下遊算子之間沒有資料shuffle (資料分區方式是 forward)。

1.5. 程式設計改變OperatorChain行為

Operator chain的行為可以通過程式設計API中進行指定,可以通過在DataStream的operator後面(如someStream.map(..))調用startNewChain()來訓示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。可以調用disableChaining()來訓示該operator不參與chaining(不會與前後的operator chain一起)。可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。可以設定Slot group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通過調整并行度,來調整Operator chain。

2. Slot配置設定與共享

2.1共享Slot

flink學習筆記-各種Time

預設情況下,Flink 允許subtasks共享slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。

允許slot共享有以下兩點好處:

1.Flink叢集需要的任務槽與作業中使用的最高并行度正好相同(前提,保持預設SlotSharingGroup)。也就是說我們不需要再去計算一個程式總共會起多少個task了。

2.更容易獲得更充分的資源利用。如果沒有slot共享,那麼非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,将task的2個并行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均配置設定到重的subtasks。

2.2共享Slot執行個體

flink學習筆記-各種Time

将 WordCount 的并行度從之前的2個增加到6個(Source并行度仍為1),并開啟slot共享(所有operator都在default共享組),将得到如上圖所示的slot分布圖。

首先,我們不用去計算這個job會其多少個task,總之該任務最終會占用6個slots(最高并行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地配置設定到各個 TaskManager。

2.3 SlotSharingGroup(soft)

SlotSharingGroup是Flink中用來實作slot共享的類,它盡可能地讓subtasks共享一個slot。

保證同一個group的并行度相同的sub-tasks 共享同一個slots。算子的預設group為default(即預設一個job下的subtask都可以共享一個slot)

為了防止不合理的共享,使用者也能通過API來強制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filter的slot共享組為group1。怎麼确定一個未做SlotSharingGroup設定算子的SlotSharingGroup什麼呢(根據上遊算子的group 和自身是否設定group共同确定)。适當設定可以減少每個slot運作的線程數,進而整體上減少機器的負載。

flink學習筆記-各種Time

2.4 CoLocationGroup(強制)

CoLocationGroup可以保證所有的并行度相同的sub-tasks運作在同一個slot,主要用于疊代流(訓練機器學習模型)。

3. Slot & parallelism的關系

3.1 Slots && parallelism

flink學習筆記-各種Time

如上圖所示,有兩個TaskManager,每個TaskManager有3個槽位。假設source操作并行度為3,map操作的并行度為4,sink的并行度為4,所需的task slots數與job中task的最高并行度一緻,最高并行度為4,那麼使用的Slot也為4。

3.2如何計算Slot

如何計算一個應用需要多少slot?

flink學習筆記-各種Time

如果不設定SlotSharingGroup,那麼需要的Slot數為應用的最大并行度數。如果設定了SlotSharingGroup,那麼需要的Slot數為所有SlotSharingGroup中的最大并行度之和。比如已經強制指定了map的slot共享組為test,那麼map和map下遊的組為test,map的上遊source的組為預設的default,此時default組中最大并行度為10,test組中最大并行度為20,那麼需要的Slot=10+20=30。

4.Flink部署模式

4.1 Local 本地部署

Flink 可以運作在 Linux、Mac OS X 和 Windows 上。本地模式的安裝唯一需要的隻是 Java 1.7.x或更高版本,本地運作會啟動Single JVM,主要用于測試調試代碼。

4.2 Standalone Cluster叢集部署

軟體需求

1.安裝Java1.8或者更高版本

2.叢集各個節點需要ssh免密登入

Flink Standalone 運作流程前面已經講過,這裡就不在贅叙。

4.3Flink ON YARN

flink學習筆記-各種Time

Flink ON YARN工作流程如下所示:

首先送出job給YARN,就需要有一個Flink YARN Client。

第一步:Client将Flink 應用jar包和配置檔案上傳到HDFS。

第二步:Client向REsourceManager注冊resources和請求APPMaster  Container

第三步:REsourceManager就會給某一個Worker節點配置設定一個Container來啟動APPMaster,JobManager會在APPMaster中啟動。

第四步:APPMaster為Flink的TaskManagers配置設定容器并啟動TaskManager,TaskManager内部會劃分很多個Slot,它會自動從HDFS下載下傳jar檔案和修改後的配置,然後運作相應的task。TaskManager也會與APPMaster中的JobManager進行互動,維持心跳等。

5.Flink Standalone叢集部署

安裝Flink之前需要提前安裝好JDK,這裡我們安裝的是JDK1.8版本。

flink學習筆記-各種Time

5.1下載下傳

可以到官網:https://archive.apache.org/dist/flink/ 将Flink1.6.2版本下載下傳到本地。

flink學習筆記-各種Time

5.2解壓

将下載下傳的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節點

flink學習筆記-各種Time

使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz指令解壓flink安裝包

flink學習筆記-各種Time

友善後期flink多版本的使用,可以建立flink軟連接配接

ln -s flink-1.6.2 flink

flink學習筆記-各種Time

5.3配置環境變量

vi ~/.bashrc

export FLINK_HOME=/home/hadoop/app/flink

export PATH=$FLINK_HOME/bin:$PATH

使配置檔案生效

source ~/.bashrc

檢視flink版本

flink -v

5.4修改配置檔案

1.修改flink-conf.yaml配置檔案

vi flink-conf.yaml

#JobManager位址

jobmanager.rpc.address: cdh01

#槽位配置為3

taskmanager.numberOfTaskSlots: 3

#設定并行度為3

parallelism.default: 3

2.修改masters配置

vi masters

cdh01:8081

3.修改slaves配置

vi slaves

cdh01

cdh02

cdh03

5.5主節點安裝目錄同步到從節點

通過deploy.sh腳本将flink安裝目錄同步到其他節點。

deploy.sh flink-1.6.2 /home/hadoop/app/ slave

在從節點分别建立flink軟連接配接

ln -s flink-1.6.2 flink

5.6啟動服務

進入flink bin目錄執行啟動叢集腳本start-cluster.sh

bin/start-cluster.sh

flink學習筆記-各種Time

通過web檢視flink叢集,檢視相關叢集資訊。

http://cdh01:8081

5.7測試運作

檢視官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

1.啟動nc服務

nc -l 9000

2.送出flink作業

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

3.輸入測試資料

5.7測試運作

檢視官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

1.啟動nc服務

nc -l 9000

2.送出flink作業

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

3.輸入測試資料

flink學習筆記-各種Time

4.檢視運作結果

在TaskManager界面檢視Flink運作結果

flink學習筆記-各種Time

(5)Flink開發環境搭建

1. 建立Flink項目及依賴管理

1.1建立Flink項目

官網建立Flink項目有兩種方式:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html

方式一:

mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=1.6.2

方式二

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.2

這裡我們仍然使用第一種方式建立Flink項目。

打開終端,切換到對應的目錄,通過maven建立flink項目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java  -DarchetypeVersion=1.6.2

flink學習筆記-各種Time

項目建構過程中需要輸入groupId,artifactId,version和package

flink學習筆記-各種Time

Flink項目建立成功

flink學習筆記-各種Time

打開IDEA工具,點選open。

flink學習筆記-各種Time

選擇剛剛建立的flink項目

flink學習筆記-各種Time

Flink項目已經成功導入IDEA開發工具

flink學習筆記-各種Time

通過maven打包測試運作

mvn clean package

flink學習筆記-各種Time

重新整理target目錄可以看到剛剛打包的flink項目

flink學習筆記-各種Time

1.2. Flink依賴

Core Dependencies(核心依賴):

1.核心依賴打包在flink-dist*.jar裡

2.包含coordination, networking, checkpoints, failover, APIs, operations (such as windowing), resource management等必須的依賴

注意:核心依賴不會随着應用打包(<scope>provided</scope>)

3.核心依賴項盡可能小,并避免依賴項沖突

Pom檔案中添加核心依賴

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>1.6.2</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_2.11</artifactId>

<version>1.6.2</version>

<scope>provided</scope>

</dependency>

注意:不會随着應用打包。

User Application Dependencies(應用依賴):

connectors, formats, or libraries(CEP, SQL, ML)、

注意:應用依賴會随着應用打包(scope保持預設值就好)

Pom檔案中添加應用依賴

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.10_2.11</artifactId>

<version>1.6.2</version>

</dependency>

注意:應用依賴按需選擇,會随着應用打包,可以通過Maven Shade插件進行打包。

1.3. 關于Scala版本

Scala各版本之間是不相容的(你基于Scala2.12開發Flink應用就不能依賴Scala2.11的依賴包)。

隻使用Java的開發人員可以選擇任何Scala版本,Scala開發人員需要選擇與他們的應用程式的Scala版本比對的Scala版本。

1.4. Hadoop依賴

不要把Hadoop依賴直接添加到Flink application,而是:

export HADOOP_CLASSPATH=`hadoop classpath`

Flink元件啟動時會使用該環境變量的

特殊情況:如果在Flink application中需要用到Hadoop的input-/output format,隻需引入Hadoop相容包即可(Hadoop compatibility wrappers)

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-hadoop-compatibility_2.11</artifactId>

<version>1.6.2</version>

</dependency>

1.5 Flink項目打包

Flink 可以使用maven-shade-plugin對Flink maven項目進行打包,具體打包指令為mvn clean 

package。

2. 自己編譯Flink

2.1安裝maven

1.下載下傳

到maven官網下載下傳安裝包,這裡我們可以選擇使用apache-maven-3.3.9-bin.tar.gz。

2.解壓

将apache-maven-3.3.9-bin.tar.gz安裝包上傳至主節點的,然後使用tar指令進行解壓

tar -zxvf apache-maven-3.3.9-bin.tar.gz

3.建立軟連接配接

ln -s apache-maven-3.3.9 maven

4.配置環境變量

vi ~/.bashrc

export MAVEN_HOME=/home/hadoop/app/maven

export PATH=$MAVEN_HOME/bin:$PATH

5.生效環境變量

source ~/.bashrc

6.檢視maven版本

mvn –version

7. settings.xml配置阿裡鏡像

添加阿裡鏡像

<mirror>

<id>nexus-osc</id>

<mirrorOf>*</mirrorOf>

<name>Nexus osc</name>

<url>http://maven.aliyun.com/nexus/content/repositories/central</url>

</mirror>

2.2安裝jdk

編譯flink要求jdk8或者以上版本,這裡已經提前安裝好jdk1.8,具體安裝配置不再贅叙,檢視版本如下:

[hadoop@cdh01 conf]$ java -version

java version "1.8.0_51"

Java(TM) SE Runtime Environment (build 1.8.0_51-b16)

Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)

2.3下載下傳源碼

登入github:https://github.com/apache/flink,擷取flink下載下傳位址:https://github.com/apache/flink.git

打開Flink主節點終端,進入/home/hadoop/opensource目錄,通過git clone下載下傳flink源碼:

git clone https://github.com/apache/flink.git

錯誤1:如果Linux沒有安裝git,會報如下錯誤:

bash: git: command not found

解決:git安裝步驟如下所示:

1.安裝編譯git時需要的包(注意需要在root使用者下安裝)

yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel

yum install  gcc perl-ExtUtils-MakeMaker

2.删除已有的git

yum remove git

3.下載下傳git源碼

先安裝wget

yum -y install wget

使用wget下載下傳git源碼

wget https://www.kernel.org/pub/software/scm/git/git-2.0.5.tar.gz

解壓git

tar xzf git-2.0.5.tar.gz

編譯安裝git

cd git-2.0.5

make prefix=/usr/local/git all

sudo make prefix=/usr/local/git install

echo "export PATH=$PATH:/usr/local/git/bin" >> ~/.bashrc

source ~/.bashrc

檢視git版本

git –version

錯誤2:git clone https://github.com/apache/flink.git

Cloning into \'flink\'...

fatal: unable to access \'https://github.com/apache/flink.git/\': SSL connect error

解決:

更新 nss 版本:yum update nss

2.4切換對應flink版本

使用如下指令檢視flink版本分支

git tag

切換到flink對應版本(這裡我們使用flink1.6.2)

git checkout release-1.6.2

2.5編譯flink

進入flink 源碼根目錄:/home/hadoop/opensource/flink,通過maven編譯flink

mvn clean install -DskipTests -Dhadoop.version=2.6.0

報錯:

[INFO] BUILD FAILURE

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 06:58 min

[INFO] Finished at: 2019-01-18T22:11:54-05:00

[INFO] Final Memory: 106M/454M

[INFO] ------------------------------------------------------------------------

[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve dependencies for project org.apache.flink:flink-mapr-fs:jar:1.6.2: Could not find artifact com.mapr.hadoop:maprfs:jar:5.2.1-mapr in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]

[ERROR]

[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR]

[ERROR] For more information about the errors and possible solutions, please read the following articles:

[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

[ERROR]

[ERROR] After correcting the problems, you can resume the build with the command

[ERROR]   mvn <goals> -rf :flink-mapr-fs

報錯缺失flink-mapr-fs,需要手動下載下傳安裝。

解決:

1.下載下傳maprfs jar包

通過手動下載下傳maprfs-5.2.1-mapr.jar包,下載下傳位址位址:https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/hadoop/maprfs/5.2.1-mapr/

2.上傳至主節點

将下載下傳的maprfs-5.2.1-mapr.jar包上傳至主節點的/home/hadoop/downloads目錄下。

3.手動安裝

手動安裝缺少的包到本地倉庫

mvn install:install-file -DgroupId=com.mapr.hadoop -DartifactId=maprfs -Dversion=5.2.1-mapr -Dpackaging=jar  -Dfile=/home/hadoop/downloads/maprfs-5.2.1-mapr.jar

4.繼續編譯

使用maven繼續編譯flink(可以排除剛剛已經安裝的包)

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

報錯:

[INFO] BUILD FAILURE

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 05:51 min

[INFO] Finished at: 2019-01-18T22:39:20-05:00

[INFO] Final Memory: 108M/480M

[INFO] ------------------------------------------------------------------------

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-mapr-fs: Compilation failure: Compilation failure:

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44] package org.apache.hadoop.fs does not exist

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45] cannot find symbol

[ERROR] symbol:   class Configuration

[ERROR] location: package org.apache.hadoop.conf

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/

runtime/fs/maprfs/MapRFileSystem.java:[73,93] cannot find symbol

[ERROR] symbol:   class Configuration

缺失org.apache.hadoop.fs包,報錯找不到。

解決:

flink-mapr-fs子產品的pom檔案中添加如下依賴:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>${hadoop.version}</version>

</dependency>

繼續往後編譯:

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

又報錯:

[ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.6.2: Could not find artifact io.confluent:kafka-schema-registry-client:jar:3.3.1 in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]

[ERROR]

報錯缺少kafka-schema-registry-client-3.3.1.jar 包

解決:

手動下載下傳kafka-schema-registry-client-3.3.1.jar包,下載下傳位址如下:

http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar

将下載下傳的kafka-schema-registry-client-3.3.1.jar上傳至主節點的目錄下/home/hadoop/downloads

手動安裝缺少的kafka-schema-registry-client-3.3.1.jar包

mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar  -Dfile=/home/hadoop/downloads/kafka-schema-registry-client-3.3.1.jar

繼續往後編譯

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

flink學習筆記-各種Time

(6)Flink API 通用基本概念

1. 繼續侃Flink程式設計基本套路

1.1 DataSet and DataStream

DataSet and DataStream表示Flink app中的分布式資料集。它們包含重複的、不可變資料集。DataSet有界資料集,用在Flink批處理。DataStream可以是無界,用在Flink流處理。它們可以從資料源建立,也可以通過各種轉換操作建立。

1.2共同的程式設計套路

DataSet and DataStream 這裡以WordCount為例,共同的程式設計套路如下所示:

1.擷取執行環境(execution environment)

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2.加載/建立初始資料集

// 讀取輸入資料

DataStream<String> text;

if (params.has("input")) {

// 讀取text檔案

text = env.readTextFile(params.get("input"));

} else {

System.out.println("Executing WordCount example with default input data set.");

System.out.println("Use --input to specify file input.");

// 讀取預設測試資料集

text = env.fromElements(WordCountData.WORDS);

}

3.對資料集進行各種轉換操作(生成新的資料集)

// 切分每行單詞

text.flatMap(new Tokenizer())

//對每個單詞分組統計詞頻數

.keyBy(0).sum(1);

4.指定将計算的結果放到何處去

// 輸出統計結果

if (params.has("output")) {

//寫入檔案位址

counts.writeAsText(params.get("output"));

} else {

System.out.println("Printing result to stdout. Use --output to specify output path.");

//資料列印控制台

counts.print();

}

5.觸發APP執行

// 執行flink 程式

env.execute("Streaming WordCount");

1.3惰性計算

Flink APP都是延遲執行的,隻有當execute()被顯示調用時才會真正執行,本地執行還是在叢集上執行取決于執行環境的類型。好處:使用者可以根據業務建構複雜的應用,Flink可以整體進優化并生成執行計劃。

2. 指定鍵(Specifying Keys)

2.1誰需要指定鍵

哪些操作需要指定key呢?常見的操作如join, coGroup, keyBy, groupBy,Reduce, GroupReduce, Aggregate, Windows等。

Flink程式設計模型的key是虛拟的,不需要你建立鍵值對,可以在具體算子通過參數指定,如下代碼所示:

DataSet<...> input = // [...]

DataSet<...> reduced = input

.groupBy(/*define key here*/)

.reduceGroup(/*do something*/);

2.2為Tuple定義鍵

Tuple定義鍵的方式有很多種,接下來我們一起看幾個示例:

按照指定屬性分組

DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

注意:此時表示使用Tuple3三元組的第一個成員作為keyBy

按照組合鍵進行分組

DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

注意:此時表示使用Tuple3三元組的前兩個元素一起作為keyBy

特殊情況:嵌套Tuple

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]

KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

注意:這裡使用KeyBy(0)指定鍵,系統将會使用整個Tuple2作為鍵(整型和浮點型的)。如果想使用Tuple2内部字段作為鍵,你可以使用字段來表示鍵,這種方法會在後面闡述。

2.3使用字段表達式定義鍵

基于字元串的字段表達式可以用來引用嵌套字段(例如Tuple,POJO)

public class WC {

    public String word;

public User user;

    public int count;

}

public class User{

public int age;

public String zip;

}

示例:通過word字段進行分組

DataStream<WC> words = // [...]

DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

文法:

1.直接使用字段名選擇POJO字段

 例如 user 表示 一個POJO的user字段

2.Tuple通過offset來選擇

"_1"和"5"分别代表第一和第六個Scala Tuple字段

“f0” and “f5”分别代表第一和第六個Java Tuple字段

3.選擇POJO和Tuples的嵌套屬性

user.zip

在scala裡你可以"_2.user.zip"或"user._4.1.zip”

在java裡你可以“2.user.zip”或者" user.f0.1.zip ”

4.使用通配符表達式選擇所有屬性,java為“*”,scala為 "_"。不是POJO或者Tuple的類型也适用。

2.4字段表達式執行個體-Java

以下定義兩個Java類:

public static class WC {

     public ComplexNestedClass complex;

     private int count;

     public int getCount() {

           return count;

      }

      public void setCount(int c) {

           this.count = c;

      }

}

public static class ComplexNestedClass {

      public Integer someNumber;

      public float someFloat;

      public Tuple3<Long, Long, String> word;

      public IntWritable hadoopCitizen;

}

我們一起看看如下key字段如何了解:

1."count": wc 類的count字段

2."complex":遞歸的選取ComplexNestedClass的所有字段

3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;

4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。

2.5字段表達式執行個體-Scala

以下定義兩個Scala類:

"_1"和"5"分别代表第一和第六個Scala Tuple字段

“f0” and “f5”分别代表第一和第六個Java Tuple字段

3.選擇POJO和Tuples的嵌套屬性

user.zip

在scala裡你可以"_2.user.zip"或"user._4.1.zip”

在java裡你可以“2.user.zip”或者" user.f0.1.zip ”

4.使用通配符表達式選擇所有屬性,java為“*”,scala為 "_"。不是POJO或者Tuple的類型也适用。

2.4字段表達式執行個體-Java

以下定義兩個Java類:

public static class WC {

     public ComplexNestedClass complex;

     private int count;

     public int getCount() {

           return count;

      }

      public void setCount(int c) {

           this.count = c;

      }

}

public static class ComplexNestedClass {

      public Integer someNumber;

      public float someFloat;

      public Tuple3<Long, Long, String> word;

      public IntWritable hadoopCitizen;

}

我們一起看看如下key字段如何了解:

1."count": wc 類的count字段

2."complex":遞歸的選取ComplexNestedClass的所有字段

3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;

4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。

2.5字段表達式執行個體-Scala

以下定義兩個Scala類:

flink學習筆記-各種Time