天天看點

Spark權威指南(中文版)----第16章 開發Spark應用程式

在第15章中,您了解了Spark如何在叢集上運作代碼。現在,我們将向您展示開發一個獨立的Spark應用程式并将其部署到叢集上是多麼容易。我們将使用一個簡單的模闆來實作這一點,該模闆分享了一些關于如何建構應用程式的簡單技巧,包括設定建構工具和單元測試。這個模闆可以在本書的代碼存儲庫中找到。這個模闆實際上并不是必需的,因為從頭編寫應用程式并不困難,但是它很有幫助。讓我們從第一個應用程式開始。

16.1.   編寫Spark應用程式

Spark應用程式是兩種東西的組合:Spark叢集和代碼。在這種情況下,叢集将是本地模式,應用程式将是預定義的模式。讓我們浏覽一下每種語言中的應用程式。

16.1.1.   一個簡單的基于scala的應用程式

Scala是Spark的“原生”語言,自然是編寫應用程式的好方法。這和編寫Scala應用程式沒什麼不同。

提示              Scala看起來有些吓人,這取決于您的背景,但如果隻是為了更好地了解Spark,那麼還是值得學習的。此外,你不需要學習這門語言的所有細節;從基礎開始,您會發現在Scala中很快就可以提高生産力。使用Scala還将打開許多大門。通過一點實踐,通過Spark的代碼庫進行代碼級跟蹤并不困難。           

您可以使用兩個基于Java虛拟機(JVM)的建構工具sbt或Apache Maven建構應用程式。與任何建構工具一樣,它們都有各自的怪癖,但是從sbt開始可能是最簡單的。您可以在sbt網站上下載下傳、安裝和了解sbt。您也可以從Maven各自的網站上安裝Maven。

要為Scala應用程式配置sbt build,我們需要指定一個build.sbt檔案,用于管理包資訊。在build.sbt檔案中,有幾個關鍵的地方:

  • 項目中繼資料(包名稱、包版本控制資訊等)
  • 在哪裡解決依賴關系
  • 庫所需的依賴項

您可以指定更多的選項;但是,它們超出了本書的範圍(您可以在web和sbt文檔中找到相關資訊)。也有一些關于這個主題的書籍,可以作為一個有用的參考。下面是Scala build.sbt檔案的示例(以及我們在模闆中包含的檔案)。注意,我們必須指定Scala版本和Spark版本:

Spark權威指南(中文版)----第16章 開發Spark應用程式

現在我們已經定義了建構檔案,實際上可以開始向項目添加代碼了。我們将使用标準的Scala項目結構,你可以在sbt參考手冊中找到(這是與Maven項目相同的目錄結構):

Spark權威指南(中文版)----第16章 開發Spark應用程式

我們将源代碼放在Scala和Java目錄中。在本例中,我們将如下内容放入檔案中;這将初始化SparkSession,運作應用程式,然後退出:

Spark權威指南(中文版)----第16章 開發Spark應用程式

注意,我們定義了一個包括main方法的類,當使用spark-submit将其送出到叢集執行時,可以從指令行運作這個類。

現在我們已經設定好了我們的項目并向其添加了一些代碼,是時候建構它了。我們可以使用sbt assemble建構一個“超級JAR”或“胖JAR”,其中包含一個JAR中的所有依賴項。對于某些部署,這可能很簡單,但對于其他部署,這可能會導緻複雜性(尤其是依賴沖突)。一個輕量級的方法是運作sbt package,它将把所有依賴項收集到目标檔案夾中,但不會将所有依賴項打包到一個大JAR中。

運作程式

目标檔案夾包含我們可以用作spark-submit參數的JAR。在建構Scala包之後,您可以使用以下代碼在本地機器上進行spark-submit(此代碼片段利用别名建立$SPARK_HOME變量;你可以将$SPARK_HOME替換為包含你下載下傳的Spark版本的目錄):

Spark權威指南(中文版)----第16章 開發Spark應用程式
16.1.2.   編寫Python應用程式

編寫PySpark應用程式實際上與編寫普通的Python應用程式或包沒有什麼不同。它特别類似于編寫指令行應用程式。Spark沒有建構概念,隻有Python腳本,是以要運作應用程式,隻需對叢集執行腳本。

為了促進代碼重用,通常将多個Python檔案打包到Spark代碼的egg或ZIP檔案中。要包含這些檔案,您可以使用spark-submit的——py-files參數來添加.py、.zip或.egg檔案,以便與應用程式一起分發。

當運作代碼時,用Python建立一個等價于“Scala/Java main類”的類。将某個腳本指定為建構SparkSession的可執行腳本。這是我們傳遞給spark-submit的主要參數:

Spark權威指南(中文版)----第16章 開發Spark應用程式

當您這樣做時,您将獲得一個SparkSession,您可以将它傳遞給您的應用程式。最佳實踐是在運作時傳遞這個變量,而不是在每個Python類中執行個體化它。

在Python中開發時,一個有用的技巧是使用pip将PySpark指定為依賴項。您可以通過運作指令pip install pyspark來實作這一點。這允許您以可能使用其他Python包的方式使用它。這也使得許多編輯器中的代碼完成非常有用。這是Spark 2.2中全新的一個版本,是以可能需要一兩個版本才能完全投入生産,但是Python在Spark社群中非常流行,它肯定是Spark未來的基石。

編寫完代碼之後,就可以送出代碼執行了。(我們正在執行與項目模闆中相同的代碼。)您隻需要調用spark-submit與該資訊:

Spark權威指南(中文版)----第16章 開發Spark應用程式
16.1.3.   編寫Java應用程式

編寫Java Spark應用程式與編寫Scala應用程式是一樣的。核心差異涉及到如何指定依賴項。

本例假設您正在使用Maven指定依賴項。在本例中,您将使用以下格式。在Maven中,您必須添加Spark Packages存儲庫,以便從這些位置擷取依賴項:

Spark權威指南(中文版)----第16章 開發Spark應用程式

當然,您遵循與Scala項目版本相同的目錄結構(因為它們都符合Maven規範)。然後,我們隻需遵循相關的Java示例來實際建構和執行代碼。現在,我們可以建立一個簡單的例子,指定一個main類,讓我們執行(更多關于這個在本章末尾):

Spark權威指南(中文版)----第16章 開發Spark應用程式

然後,我們使用mvn package對它進行打包(需要安裝Maven)。

這個操作将與運作Scala應用程式(或者Python應用程式)完全相同。簡單地使用spark-submit:

Spark權威指南(中文版)----第16章 開發Spark應用程式

16.2.    測試Spark應用程式

現在您已經知道編寫和運作一個Spark應用程式需要什麼,是以讓我們轉到一個不那麼令人興奮但仍然非常重要的主題:測試。測試Spark應用程式依賴于幾個關鍵原則和政策,在編寫應用程式時應該牢記這些原則和政策。

16.2.1.   政策原則

測試資料pipelines和Spark應用程式與實際編寫它們一樣重要。這是因為您希望確定它們對未來的資料、邏輯和輸出方面的更改具有彈性。在本節中,我們将首先讨論您可能希望在典型的Spark應用程式中測試什麼,然後讨論如何組織代碼以便進行簡單的測試。

輸入資料的彈性

對不同類型的輸入資料保持彈性對于如何編寫資料管道非常重要。資料将會改變,因為業務需求将會改變。是以,您的Spark應用程式和管道應該至少對輸入資料中的某種程度的更改具有彈性,或者確定以一種優雅而有彈性的方式處理這些故障。在大多數情況下,這意味着要聰明地編寫測試來處理不同輸入的邊緣情況。

業務邏輯彈性和演化

管道中的業務邏輯和輸入資料都可能發生更改。更重要的是,您希望確定從原始資料中推導出來的是您實際認為自己在推導的東西。這意味着您将需要對實際資料進行健壯的邏輯測試,以確定您實際上得到了您想要的結果。這裡需要注意的一件事是,嘗試編寫一組“Spark單元測試”,隻測試Spark的功能。你可能不想這樣做;相反,您希望測試您的業務邏輯,并確定您設定的複雜業務管道實際上正在做您認為它應該做的事情。

輸出的彈性和原子性

假設您已經為輸入資料結構中的更改做好了準備,并且您的業務邏輯經過了良好的測試,現在您需要確定您的輸出結構是您所期望的。這意味着您需要優雅地處理輸出模式解析。通常情況下,資料不會被簡單地轉儲到某個位置,再也不會被讀取—您的大多數Spark管道可能正在為其他Spark管道提供資料。因為這個原因你要確定下遊消費者了解的“狀态”,可能意味着它更新的頻率,以及資料是否“完整的”(例如,沒有後期資料),或者不會有任何最後一分鐘修正資料。

前面提到的所有問題都是建構資料管道時應該考慮的原則(實際上,無論是否使用Spark)。這種戰略思維對于為您想要建構的系統打下基礎非常重要。

16.2.2.   戰術

雖然戰略思維很重要,但是讓我們更詳細地讨論一些可以使應用程式易于測試的政策。最高價值的方法是通過使用适當的單元測試來驗證您的業務邏輯是正确的,并確定您對不斷變化的輸入資料具有彈性,或者已經對其進行了結構化,以便将來模式演化不會失去作用。如何做到這一點,很大程度上取決于作為開發人員的您,因為這将根據您的業務領域和領域專長而有所不同。

管理SparkSessions

使用單元測試架構(如JUnit或ScalaTest)測試Spark代碼相對容易,因為Spark具有本地模式——隻需建立一個本地模式SparkSession作為測試工具的一部分來運作它。然而,要使此工作正常,您應該在管理代碼中的Spark時盡可能多地執行依賴項注入。也就是說,隻初始化SparkSession一次,并在運作時将其傳遞給相關的函數和類,以便在測試期間友善地進行替換。這使得在單元測試中使用一個虛拟的SparkSession測試每個單獨的函數更加容易。

使用哪個Spark API ?

Spark提供了多種api的選擇,從SQL到DataFrames和Datasets,每一種api都可能對應用程式的可維護性和可測試性産生不同的影響。坦白地說,正确的API取決于您的團隊及其需求:一些團隊和項目将需要不那麼嚴格的SQL和DataFrame API來提高開發速度,而其他團隊則希望使用類型安全的資料集或RDDs。

通常,我們建議對每個函數的輸入和輸出類型進行文檔化和測試,而不管使用哪種API。類型安全API自動為您的函數強制執行一個最小的契約,這使得其他代碼很容易在此基礎上進行建構。如果您的團隊更喜歡使用DataFrames或SQL,那麼請花一些時間記錄和測試每個函數傳回什麼,以及它接受什麼類型的輸入,以避免以後出現意外,就像在任何動态類型的程式設計語言中一樣。雖然較低層的RDD API也是靜态類型的,但我們建議隻在需要資料集中不存在的底層特性(比如分區)時才使用它,這應該不是很常見;Dataset API允許更多的性能優化,并且将來可能提供更多的性能優化。

對于應用程式使用哪種程式設計語言也有類似的考慮:對于每個團隊當然沒有正确的答案,但是根據您的需要,每種語言将提供不同的好處。我們一般建議使用靜态類型語言,像Scala和Java為更大的應用程式或者那些你希望能夠進入低級代碼完全控制性能,但Python和R可能是更好的在其他情況下,示例中,如果您需要使用一些其他的庫。Spark代碼應該很容易在每種語言的标準單元測試架構中進行測試。

連接配接到單元測試架構

要對代碼進行單元測試,我們建議使用語言中的标準架構(例如JUnit或ScalaTest),并設定測試工具來為每個測試建立和清理SparkSession。不同的架構提供了不同的機制來實作這一點,例如“before”和“after”方法。我們在本章的應用程式模闆中包含了一些單元測試代碼示例。

連接配接到資料源

盡可能地,您應該確定您的測試代碼不連接配接到生産資料源,這樣,如果這些資料源發生更改,開發人員就可以輕松地單獨運作它。實作這一點的一個簡單方法是讓所有業務邏輯函數都以DataFrames或資料集作為輸入,而不是直接連接配接到各個源;畢竟,無論資料源是什麼,後續代碼都将以相同的方式工作。如果您在Spark中使用結構化api,實作這一點的另一種方法是命名表:您可以簡單地注冊一些虛拟資料集(例如,從小文本檔案或記憶體對象加載)作為各種表名,然後從那裡開始。

16.3.  開發過程

使用Spark應用程式的開發過程類似于您可能已經使用過的開發工作流。首先,您可能要維護一個劃痕空間,比如互動式筆記本或其他類似的東西,然後在建構關鍵元件和算法時,将它們移動到更持久的位置,比如庫或包。筆記本體驗是我們經常推薦的體驗之一(我們也經常用它來寫這本書),因為它在實驗中很簡單。還有一些工具,比如Databricks,允許您将筆記本作為生産應用程式運作。

在本地機器上運作時,spark-shell及其各種特定于語言的實作可能是開發應用程式的最佳方法。在大多數情況下,shell用于互動式應用程式,而Spark -submit用于Spark叢集上的生産應用程式。您可以使用shell以互動方式運作Spark,就像我們在本書開頭介紹的那樣。這是運作PySpark、Spark SQL和SparkR的模式。在bin檔案夾中,當您下載下傳Spark時,您将找到啟動這些shell的各種方法。隻需運作spark-shell(對于Scala)、spark-sql、pyspark和sparkR。

在您完成應用程式并建立要運作的包或腳本之後,spark-submit将成為您向叢集送出此作業的最好朋友。

16.4.  運作程式

運作Spark應用程式的最常見方法是通過Spark -submit。在本章前面,我們向您展示了如何運作spark-submit;您隻需指定選項、應用程式JAR或腳本以及相關參數:

Spark權威指南(中文版)----第16章 開發Spark應用程式

在使用Spark -submit送出Spark作業時,始終可以指定是在客戶機模式還是在叢集模式下運作。但是,您應該幾乎總是傾向于在叢集模式下運作(或者在叢集本身的客戶機模式下),以減少執行者和驅動程式之間的延遲。

送出applciations時,在.jar中傳遞一個.py檔案,并将Python .zip、.egg或.py添加到搜尋路徑中,其中包含—py檔案。

為了便于參考,表16-1列出了所有可用的spark-submit選項,包括一些叢集管理器特有的選項。要自己列舉所有這些選項,請運作spark-submit with——help。

 還有一些特定于部署的配置(參見表16-2)。

16.4.1.   應用程式運作的例子

在本章之前,我們已經介紹了一些本地模式的應用程式示例,但是值得一看的是我們如何使用前面提到的一些選項。Spark還在下載下傳Spark時包含的examples目錄中包含幾個示例和示範應用程式。如果你糾結于如何使用某些參數,你可以先在本地機器上試試,然後使用SparkPi類作為主類:

Spark權威指南(中文版)----第16章 開發Spark應用程式

下面的代碼片段對Python也做了同樣的操作。您可以從Spark目錄運作它,這将允許您向獨立叢集管理器送出一個Python應用程式(都在一個腳本中)。您還可以設定與前一個示例相同的執行器限制:

Spark權威指南(中文版)----第16章 開發Spark應用程式

16.5.  配置應用程式

Spark包含許多不同的配置,其中一些我們在第15章中已經介紹過。根據您希望實作的目标,有許多不同的配置。本節将詳細介紹這些内容。大多數情況下,這些資訊都是供參考的,可能隻值得略讀,除非您正在尋找特定的内容。大多數配置可分為以下幾類:

Spark權威指南(中文版)----第16章 開發Spark應用程式

Spark提供了三個位置來配置

  • Spark屬性控制大多數應用程式參數,可以通過使用SparkConf對象來設定
  • Java系統屬性
  • 寫死的配置檔案

 您可以使用幾個模闆,您可以在Spark home檔案夾的根目錄中找到/conf目錄。您可以将這些屬性設定為應用程式中的寫死變量,或者在運作時指定它們。您可以使用環境變量在每個節點上通過conf/spark-env.sh腳本設定每台機器的設定,例如IP位址。最後,您可以通過log4j.properties配置日志記錄。

16.5.1.   SparkConf

SparkConf管理所有應用程式配置。您可以通過import語句建立一個,如下面的示例所示。建立之後,SparkConf對于特定的Spark應用程式是不可變的:

Spark權威指南(中文版)----第16章 開發Spark應用程式

您可以使用SparkConf配置具有Spark屬性的單個Spark應用程式。這些Spark屬性控制Spark應用程式的運作方式和叢集的配置方式。下面的示例将本地叢集配置為有兩個線程,并指定在Spark UI中顯示的應用程式名稱。

您可以在運作時配置它們,正如您在本章前面通過指令行參數看到的那樣。這有助于啟動一個Spark Shell,它将自動為您包含一個基本的Spark應用程式;例如:

Spark權威指南(中文版)----第16章 開發Spark應用程式

值得注意的是,在設定基于時間期限的屬性時,應該使用以下格式:

Spark權威指南(中文版)----第16章 開發Spark應用程式
16.5.2.   應用程式屬性

應用程式屬性是您從Spark -submit或建立Spark應用程式時設定的屬性。它們定義了基本的應用程式中繼資料以及一些執行特性。表16-3給出了目前應用程式屬性的清單。

Spark權威指南(中文版)----第16章 開發Spark應用程式

通過應用程式web UI(Driver程式端口4040)的”Environment”頁籤,檢視所有的配置資訊,可以確定正确設定了這些值。隻有通過spark-defaults、SparkConf或指令行顯式設定的值才會出現在頁籤中。對于所有其他配置屬性,可以假設使用預設值。

16.5.3.   運作環境屬性配置

雖然不太常見,但有時可能還需要配置應用程式的運作時環境。由于空間限制,我們不能在這裡包含整個配置集。請參考Spark文檔中有關運作時環境的相關配置表(http://spark.apache.org/docs/latest/configuration.html#runtime-environment)。這些屬性允許您為Driver程式和Executor程式配置額外的類路徑和python路徑、python相關員配置以及其他日志記錄配置屬性。

16.5.4.   執行時屬性配置

這些配置是您需要配置的最相關的配置之一,因為它們為您提供了對程式實際執行的更細粒度控制。由于空間限制,我們不能在這裡包含整個配置集。請參考Spark文檔中有關執行行為的相關配置表(http://spark.apache.org/docs/latest/configuration.html#execution-behavior)。最常見的更改配置是spark.executor.cores(控制可用cores的數量)和spark.files.maxPartitionBytes(讀取檔案時的最大分區大小)。

16.5.5.   配置記憶體管理

有時您可能需要手動管理記憶體選項來嘗試優化應用程式。其中許多配置項與最終使用者無關,因為它們涉及許多遺留概念或在Spark 2中因自動記憶體管理而被排除的細粒度控制配置項。由于空間限制,我們不能在這裡包含整個配置集。請參閱Spark文檔中有關記憶體管理的相關配置表(http://spark.apache.org/docs/latest/configuration.html#memory-management)。

16.5.6.   配置shuffle行為

我們已經強調過,由于Spark作業的通信開銷很高,是以shuffle可能會成為瓶頸。是以,有許多低層配置用于控制shuffle行為。由于空間限制,我們不能在這裡包含整個配置集。請參考Spark文檔中有關Shuffle行為的相關配置表(http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior)。

16.5.7.   環境變量

您可以通過環境變量配置某些Spark設定,這些環境變量來自安裝Spark的目錄中的conf/ Spark-env.sh腳本。在Standalone模式和Mesos模式中,該檔案可以提供特定于機器的資訊,比如主機名。當運作本地Spark應用程式或送出腳本時,它也會被引用。

注意,預設情況下,安裝Spark時不存在conf/ Spark-env.sh檔案,可以通過拷貝conf/spark-env.sh.template并重命名獲得。

以下變量可以在spark-env.sh中設定:

JAVA-HOME安裝Java的位置(如果它不在預設路徑上)。PYSPARK_PYTHONPython二進制可執行檔案,用于Driver程式和Worker程式中的PySpark(如果可用,預設為python2.7;否則,python)。如果spark.pyspark.python屬性設定了,則spark.pyspark.python屬性優先級高于此環境變量。PYSPARK_DRIVER_PYTHONPython二進制可執行檔案,僅用于Driver程式中的PySpark(預設為PYSPARK_PYTHON)。屬性spark.pyspark.driver.python如果設定了,則它優先。SPARKR_DRIVER_R用于SparkR shell的二進制可執行檔案(預設為R)。屬性spark.r.shell.command在設定時優先。SPARK_LOCAL_IP要綁定到的機器IP位址。SPARK_PUBLIC_DNSSpark應用程式将要通知的其他機器的主機名。除了列出的變量之外,還有設定Spark獨立叢集腳本的選項,比如在每台機器上使用的核心數量和最大記憶體。因為spark-env.sh是一個shell腳本,是以可以通過程式設計設定其中一些;例如,您可以通過查找特定網絡接口的IP來計算SPARK_LOCAL_IP。

注意              在cluster模式下對Yarn運作Spark時,需要使用Spark .YARN.appmasterenv設定環境變量。在conf/spark-default .conf檔案中設定[EnvironmentVariableName]屬性。在spark-env.sh中設定的環境變量不會在cluster模式下反映在Yarn Application Master中。有關更多資訊,請參見Yarn相關的Spark屬性。           
16.5.8.   應用程式中的Job排程

在給定的Spark應用程式中,如果從單獨的線程送出多個并行作業,則可以同時運作它們。在本節中,我們所說的job指的是一個Spark action和任何需要運作以計算該action的任務。Spark的排程程式是完全線程安全的,并且支援此用例來啟用服務于多個請求的應用程式(例如,針對多個使用者的查詢)。

預設情況下,Spark的排程程式以FIFO方式運作作業。如果隊列頭部的作業不需要使用整個叢集,則稍後的作業可以立即開始運作,但是如果隊列頭部的作業很大,則稍後的作業可能會顯著延遲。

還可以配置作業之間的公平共享。在公平共享下,Spark以循環方式在作業之間配置設定任務,以便所有作業獲得大緻相等的叢集資源共享。這意味着在長作業運作時送出的短作業可以立即開始接收資源,并且仍然可以在不等待長作業完成的情況下獲得良好的響應時間。這種模式最适合多使用者設定。

要啟用公平排程程式,設定spark.scheduler.mode為Fair,可以在SparkContext中設定。

Fair排程程式還支援将作業分組到池中,并為每個池設定不同的排程選項或權重。這可以為更重要的作業建立高優先級池,或者将每個使用者的作業分組在一起,并給使用者平等的共享,而不管他們有多少并發作業,而不是給作業平等的共享。該方法模仿Hadoop Fair排程程式。

在不進行任何幹預的情況下,新送出的作業将進入預設池,可以通過設定spark.scheduler.pool屬性來設定作業池。這是這樣做的(假設sc是您的SparkContext:

16.6.  結束語