天天看點

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

點選檢視第一章 點選檢視第三章

第2章

環境準備

本章主要介紹Flink在使用前的環境安裝準備,包括必須依賴的環境以及相應的參數,首先從不同運作環境進行介紹,包括本地調試環境、Standalone叢集環境,以及在On Yarn環境上。另外介紹Flink自帶的Template模闆,如何通過該項目模闆本地運作代碼環境的直接生成,而不需要使用者進行配置進行大量的開發環境配置,節省了開發的時間成本。最後介紹Flink源碼編譯相關的事項,通過對源碼進行編譯,進而對整個Flink計算引擎有更深入的了解。

2.1 運作環境介紹

Flink執行環境主要分為本地環境和叢集環境,本地環境主要為了友善使用者編寫和調試代碼使用,而叢集環境則被用于正式環境中,可以借助Hadoop Yarn或Mesos等不同的資料總管部署自己的應用。

環境依賴

(1)JDK環境

Flink核心子產品均使用Java開發,是以運作環境需要依賴JDK,本書暫不詳細介紹JDK安裝過程,使用者可以根據官方教程自行安裝,其中包括Windows和Linux環境安裝,需要注意的是JDK版本需要保證在1.8以上。

(2)Scala環境

如果使用者選擇使用Scala作為Flink應用開發語言,則需要安裝Scala執行環境,Scala環境可以通過本地安裝Scala執行環境,也可以通過Maven依賴Scala-lib來引入。

(3)Maven編譯環境

Flink的源代碼目前僅支援通過Maven進行編譯,是以如果需要對源代碼進行編譯,或通過IDE開發Flink Application,則建議使用Maven作為項目工程編譯方式。Maven的具體安裝方法這裡不再贅述。

需要注意的是,Flink程式需要Maven的版本在3.0.4及以上,否則項目編譯可能會出問題,建議使用者根據要求進行環境的搭建。

(4)Hadoop環境

對于執行在Hadoop Yarn資料總管的Flink應用,則需要配置對應的Hadoop環境參數。目前Flink官方提供的版本支援hadoop2.4、2.6、2.7、2.8等主要版本,是以使用者可以在這些版本的Hadoop Yarn中直接運作自己的Flink應用,而不需要考慮相容性的問題。

2.2 Flink項目模闆

Flink為了對使用者使用Flink進行應用開發進行簡化,提供了相應的項目模闆來建立開發項目,使用者不需要自己引入相應的依賴庫,就能夠輕松搭建開發環境,前提是在JDK(1.8及以上)和Maven(3.0.4及以上)的環境已經安裝好且能正常執行。在Flink項目模闆中,Flink提供了分别基于Java和Scala實作的模闆,下面就兩套項目模闆分别進行介紹和應用。

2.2.1 基于Java實作的項目模闆

1. 建立項目

建立模闆項目的方式有兩種,一種方式是通過Maven archetype指令進行建立,另一種方式是通過Flink提供的Quickstart Shell腳本進行建立,具體執行個體說明如下。

□通過Maven Archetype進行建立:

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

通過以上Maven指令進行項目建立的過程中,指令會互動式地提示使用者對項目的groupId、artifactId、version、package等資訊進行定義,且部分選項具有預設值,使用者直接回車即可,如圖2-1所示。我們建立了執行個體項目成功之後,用戶端會提示使用者項目建立成功,且在目前路徑中具有相應建立的Maven項目。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

□通過quickstart腳本建立:

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

通過以上腳本可以比較簡單地建立項目,執行後項目會自動生成,但是項目的名稱和一些GAV資訊都是自動生成的,使用者不能進行互動式重新定義,其中的項目名稱為quickstart,gourpid為org.myorg.quickstart,version為0.1。這種方式對于Flink入門相對比較适合,其他有一定基礎的情況下,則不建議使用這種方式進行項目建立。

注意:在Maven 3.0以上的版本中,DarchetypeCatalog配置已經從指令行中移除,需要使用者在Maven Settings中進行配置,或者直接将該選項移除,否則可能造成不能生成Project的錯誤。

2. 檢查項目

對于使用quickstart curl指令建立的項目,我們可以看到的項目結構如代碼清單2-1所示,如果使用者使用Maven Archetype,則可以自己定義對應的artifactId等資訊。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

從上述項目結構可以看出,該項目已經是一個相對比較完善的Maven項目,其中建立出來對應的Java執行個體代碼,分别是BatchJob.java和Streaming.java兩個檔案,分别對應Flink批量接口DataSet的執行個體代碼和流式接口DataStream的執行個體代碼。在建立好上述項目後,建議使用者将項目導入到IDE進行後續開發,Flink官網推薦使用的是Intellij IDEA或者Eclipse進行項目開發,具體的開發環境配置可以參考下一節中的介紹。

3. 編譯項目

項目經過上述步驟建立後,可以使用Maven Command指令mvn clean package對項目進行編譯,編譯完成後在項目同級目錄會生成target/-.jar,則該可執行Jar包就可以通過Flink指令或者Web用戶端送出到叢集上執行。

注意: 通過Maven建立Java應用,使用者可以在Pom中指定Main Class,這樣送出執行過程中就具有預設的入口Main Class,否則需要使用者在執行的Flink App的Jar應用中指定Main Class。

4. 開發應用

在項目建立和檢測完成後,使用者可以選擇在模闆項目中的代碼上編寫應用,也可以定義Class調用DataSet API或DataStream API進行Flink應用的開發,然後通過編譯打包,上傳并送出到叢集上運作。具體應用的開發讀者可以參考後續章節。

2.2.2 基于Scala實作的項目模闆

Flink在開發接口中同樣提供了Scala的接口,使用者可以借助Scala高效簡潔的特性進行Flink App的開發。在建立項目的過程中,也可以像上述Java一樣建立Scala模闆項目,而在Scala項目中唯一的差別就是可以支援使用SBT進行項目的建立和編譯,以下執行個體,将從SBT和Maven兩種方式進行介紹。

(1)建立Maven項目

1)使用Maven archetype進行項目建立

代碼清單2-2是通過Maven archetype指令建立Flink Scala版本的模闆項目,其中項目相關的參數同建立Java項目一樣,需要通過互動式的方式進行輸入,使用者可以指定對應的項目名稱、groupid、artifactid以及version等資訊。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

執行完上述指令之後,會顯示如圖2-2所示的提示,表示項目建立成功,可以進行後續操作。同時可以在同級目錄中看到已經建立好的Scala項目模闆,其中包括了兩個Scala字尾的檔案。

2)使用quickstart curl腳本建立

如上節所述,在建立Scala項目模闆的過程中,也可以通過quickstart curl腳本進行建立,這種方式相對比較簡單,隻要執行以下指令即可:

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章
帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

執行上述指令後就能在路徑中看到相應的quickstart項目生成,其目錄結構和通過Maven archetype建立的一緻,隻是不支援修改項目的GAV資訊。

(2)建立SBT項目

在使用Scala接口開發Flink應用中,不僅可以使用Maven進行項目的編譯,也可以使用SBT(Simple Build Tools)進行項目的編譯和管理,其項目結構和Maven建立的項目結構有一定的差別。可以通過SBT指令或者quickstart腳本進行建立SBT項目,具體實作方式如下:

1)使用SBT指令建立項目

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

執行上述指令後,會在用戶端輸出建立成功的資訊,表示項目建立成功,同時在同級目錄中生成建立的項目,其中包含兩個Scala的執行個體代碼供使用者參考。

2)使用quickstart curl腳本建立項目

可以通過使用以下指令進行項目建立Scala項目:

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

注意: 如果項目編譯方式選擇SBT,則需要在環境中提前安裝SBT編譯器,同時版本需要在0.13.13以上,否則無法通過上述方式進行模闆項目的建立,具體的安裝教程可以參考SBT官方網站

https://www.scala-sbt.org/download.html

進行下載下傳和安裝。

對于使用Maven archetype建立的Scala項目模闆,其結構和Java類似,在項目中增加了Scala的檔案夾,且包含兩個Scala執行個體代碼,其中一個是實作DataSet接口的批量應用執行個體BatchJob,另外一個是實作DataStream接口的流式應用執行個體StreamingJob,如代碼清單2-3所示。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

1)使用Maven編譯

進入到項目路徑中,然後通過執行mvn clean package指令對項目進行編譯,編譯完成後産生target/-.jar。

2)使用Sbt編譯

進入到項目路徑中,然後通過使用sbt clean assembly對項目進行編譯,編譯完成後再産生target/scala_your-major-scala-version/project-name-assembly-0.1-SNAPSHOT.jar。

在項目建立和檢測完成後,使用者可以選擇在Scala項目模闆的代碼上編寫應用,也可以定義Class調用DataSet API或DataStream API進行Flink應用的開發,然後通過編譯打包,上傳并送出到叢集上運作。

2.3 Flink開發環境配置

我們可以選擇IntelliJ IDEA或者Eclipse作為Flink應用的開發IDE,但是由于Eclipse本身對Scala語言支援有限,是以Flink官方還是建議使用者能夠使用IntelliJ IDEA作為首選開發的IDE,以下将重點介紹使用IntelliJ IDEA進行開發環境的配置。

2.3.1 下載下傳IntelliJ IDEA IDE

使用者可以通過IntelliJ IDEA官方位址下載下傳安裝程式,根據作業系統選擇相應的程式包進行安裝。安裝方式和安裝包請參考

https://www.jetbrains.com/idea/download/

2.3.2 安裝Scala Plugins

對于已經安裝好的IntelliJ IDEA預設是不支援Scala開發環境的,如果使用者選擇使用Scala作為開發語言,則需要安裝Scala插件進行支援。以下說明在IDEA中進行Scala插件的安裝:

□打開IDEA IDE後,在IntelliJ IDEA菜單欄中選擇 Preferences選項,然後選擇Plugins子選項,最後在頁面中選擇Browser Repositories,在搜尋框中輸入Scala進行檢索;

□在檢索出來的選項清單中選擇和安裝Scala插件,如圖2-3所示;

□點選安裝後重新開機IDE,Scala程式設計環境即可生效。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

2.3.3 導入Flink應用代碼

開發環境配置完畢之後,下面就可以将2.3.2節中建立好的項目導入到IDE中,具體步驟如下所示:

□啟動IntelliJ IDEA,選擇File→Open,在檔案選擇框中選擇建立好的項目(quickstart),點選确定,IDEA将自動進行項目的導入;

□如果項目中提示沒有SDK,可以選擇File→Project Structure,在SDKS選項中選擇安裝好的JDK路徑,添加Scala SDK路徑,如果系統沒有安裝Scala SDK,也可以通過Maven Dependence将scala-lib引入;

□項目正常導入之後,程式狀态顯示正常,可以通過mvn clean package或者直接通過IDE中自帶的工具Maven編譯方式對項目進行編譯;

完成了在IntelliJ IDEA中導入Flink項目,接下來使用者就可以開發Flink應用了。

2.3.4 項目配置

對于通過項目模闆生成的項目,項目中的主要參數配置已被初始化,是以無須額外進行配置,如果使用者通過手工進行項目的建立,則需要建立Flink項目并進行相應的基礎配置,包括Maven Dependences、Scala的Version等配置資訊。

1. Flink基礎依賴庫

對于Java版本,需要在項目的pom.xml檔案中配置如代碼清單2-4所示的依賴庫,其中flink-java和flink-streaming-java分别是批量計算DataSet API和流式計算DataStream API的依賴庫,{flink.version}是官方的釋出的版本号,使用者可根據自身需要進行選擇,本書中所有的執行個體代碼都是基于Flink 1.7版本開發。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

建立Scala版本Flink項目依賴庫配置如下,和Java相比需要指定scala的版本資訊,目前官方建議的是使用Scala 2.11,如果需要使用特定版本的Scala,則要将源碼下載下傳進行指定Scala版本編譯,否則Scala各大版本之間相容性較弱會導緻應用程式在實際環境中無法運作的問題。Flink 基于Scala語言項目依賴配置庫如代碼清單2-5所示:

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

另外在上述Maven Dependences配置中,核心的依賴庫配置的Scope為provided,主要目的是在編譯階段能夠将依賴的Flink基礎庫排除在項目之外,當使用者送出應用到Flink叢集的時候,就避免因為引入Flink基礎庫而導緻Jar包太大或類沖突等問題。而對于Scope配置成provided的項目可能出現本地IDE中無法運作的問題,可以在Maven中通過配置Profile的方式,動态指定編譯部署包的scope為provided,本地運作過程中的scope為compile,進而解決本地和叢集環境編譯部署的問題。

注意:由于Flink在最新版本中已經不再支援scala 2.10的版本,建議讀者使用scala 2.11,同時Flink将在未來的新版本中逐漸支援Scala 2.12。

2. Flink Connector和Lib依賴庫

除了上述Flink項目中應用開發必須依賴的基礎庫之外,如果使用者需要添加其他依賴,例如Flink中內建的Connector,或者其他第三方依賴庫,需要在項目中添加相應的Maven Dependences,并将這些Dependence的Scope需要配置成compile。

如果項目中需要引入Hadoop相關依賴包,和基礎庫一樣,在打包編譯的時候将Scope注明為provided,因為Flink叢集中已經将Hadoop依賴包添加在叢集的環境中,使用者不需要再将相應的Jar包打入應用中,否則容易造成Jar包沖突。

注意:對于有些常用的依賴庫,為了不必每次都要上傳依賴包到叢集上,使用者可以将依賴的包可以直接上傳到Flink安裝部署路徑中的lib目錄中,這樣在叢集啟動的時候就能夠将依賴庫加載到叢集的ClassPath中,無須每次在送出任務的時候上傳依賴的Jar包。

2.4 運作Scala REPL

和Spark Shell一樣,Flink也提供了一套互動式解釋器(Scala-Shell),使用者能夠在用戶端指令行互動式程式設計,執行結果直接互動式地顯示在用戶端控制台上,不需要每次進行編譯打包在叢集環境中運作,目前該功能隻支援使用Scala語言進行程式開發。另外需要注意的是在開發或者調試程式的過程中可以使用這種方式,但在正式的環境中則不建議使用。

2.4.1 環境支援

使用者可以選擇在不同的環境中啟動Scala Shell,目前支援Local、Remote Cluster 和Yarn Cluster模式,具體指令可以參考以下說明:

□通過start-scala-shell.sh啟動本地環境;

bin/start-scala-shell.sh local

□可以啟動遠端叢集環境,指定遠端Flink叢集的hostname和端口号;

bin/start-scala-shell.sh remote

□啟動Yarn叢集環境,環境中需要含有hadoop用戶端配置檔案;

bin/start-scala-shell.sh yarn -n 2

2.4.2 運作程式

啟動Scala Shell互動式解釋器後,就可以進行Flink流式應用或批量應用的開發。需要注意的是,Flink已經在啟動的執行環境中初始化好了相應的Environment,分别使用“benv”和“senv”擷取批量計算環境和流式計算環境,然後使用對應環境中的API開發Flink應用。以下代碼執行個體分别是用批量和流式實作WordCount的應用,讀者可以直接在啟動Flink Scala Shell用戶端後執行并輸出結果。

□通過Scala-Shell運作批量計算程式,調用benv完成對單詞數量的統計。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

□通過Scala-Shell運作流式計算,調用senv完成對單詞數量的統計。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

注意:使用者在使用互動式解釋器方式進行應用開發的過程中,流式作業和批量作業中的一些操作(例如寫入檔案)并不會立即執行,而是需要使用者在程式的最後執行env.execute(“appname”)指令,這樣整個程式才能觸發運作。

2.5 Flink源碼編譯

對于想深入了解Flink源碼結構和實作原理的讀者,可以按照本節的内容進行Flink源碼編譯環境的搭建,完成Flink源碼的編譯,具體操作步驟如下所示。

Flink源碼可以從官方 Git Repository上通過git clone指令下載下傳:

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

讀者也可以通過官方鏡像庫手動下載下傳,下載下傳位址為

https://archive.apache.org/dist/flink/

使用者根據需要選擇需要編譯的版本号,下載下傳代碼放置在本地路徑中,然後通過如下Maven指令進行編譯,需要注意的是,Flink源碼編譯依賴于JDK和Maven的環境,且JDK必須在1.8版本以上,Maven必須在3.0版本以上,否則會導緻編譯出錯。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

(1)Hadoop版本指定

Flink鏡像庫中已經編譯好的安裝包通常對應的是Hadoop的主流版本,如果使用者需要指定Hadoop版本編譯安裝包,可以在編譯過程中使用-Dhadoop.version參數指定Hadoop版本,目前Flink支援的Hadoop版本需要在2.4以上。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

如果使用者使用的是供應商提供的Hadoop平台,如Cloudera的CDH等,則需要根據供應商的系統版本來編譯Flink,可以指定-Pvendor-repos參數來激活類似于Cloudera的Maven Repositories,然後在編譯過程中下載下傳依賴對應版本的庫。

帶你讀《Flink原理、實戰與性能優化》之二:環境準備第2章

(2)Scala版本指定

Flink中提供了基于Scala語言的開發接口,包括DataStream API、DataSet API、SQL等,需要在代碼編譯過程中指定Scala的版本,因為Scala版本相容性相對較弱,是以不同的版本之間的差異相對較大。目前Flink最近的版本基本已經支援Scala-2.11,不再支援Scala-2.10的版本,在Flink 1.7開始支援Scala-2.12版本,社群則建議使用者使用Scala-2.11或者Scala-2.12的Scala環境。

2.6 本章小結

本章介紹了在使用Flink應用程式開發之前必備的環境要求,分别對基礎環境依賴例如JDK、Maven等進行說明。通過借助于Flink提供的項目模闆建立不同程式設計語言的Flink 應用項目。在2.3節中介紹了如何建構Flink開發環境,以及如何選擇合适的IDE和項目配置。在2.4節中介紹了Flink互動式程式設計用戶端Scala REPL,通過使用互動式用戶端編寫和執行Flink批量和流式應用代碼。2.5節介紹了Flink源碼編譯和打包,編譯中指定不同的Hadoop版本以及Scala版本等。在第3章将重點介紹Flink程式設計模型,其中包括Flink程式基本構成,以及Flink所支援的資料類型等。