文章目錄
-
- 安全
- 使用Spark Shell的互動分析
-
- 基礎
- 有關Dataset的更多操作
- 緩存
- 獨立的應用程式
- 從入門到放棄?
本教程提供了使用Spark的快速介紹。我們将首先通過Spark的互動式shell(用Python或Scala)介紹API,然後展示如何用Java、Scala和Python編寫應用程式。
想要按照本指南學習,首先需要從Spark網站下載下傳Spark的打包版本。因為我們不使用HDFS,是以您下載下傳可以為任何版本的Hadoop包。
注意,在Spark 2.0之前,Spark的主要程式設計接口是彈性分布式資料集(RDD)。Spark 2.0之後,RDD被Dataset所取代,Dataset與RDD一樣是強類型的,但是在底層有更豐富的優化。RDD接口仍受支援,您可以在 RDD程式設計指南 中獲得更詳細的參考。但是,我們強烈建議您切換到使用Dataset,它比RDD有更好的性能。有關Dataset的更多資訊,請參閱 SQL程式設計指南。
注:實際操作前,請確定已經安裝好了spark
安全
預設情況下,Spark中的安全性處于關閉狀态。這意味着您在預設情況下很容易受到攻擊。是以,運作Spark前請閱讀Spark安全指南。
使用Spark Shell的互動分析
基礎
Spark的shell為學習API提供了一個簡單的方法,同時它是一個互動分析資料的強大工具。它在Scala(在Java VM上運作,是以能很好的使用現有Java庫)或Python中都有。在Spark安裝目錄中運作以下指令可以啟動它:
./bin/spark-shell
Spark中最主要的概念是稱為 Dataset 的分布式資料集。Dataset可以從Hadoop讀入來建立(如HDFS檔案),也可以通過其他Dataset轉換來建立。(在啟動的Spark Shell中)如下是通過Spark源目錄中的README文本檔案建立一個新的Dataset:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
可以通過調用某些 操作(action) 直接擷取Dataset的值,也可以将其 轉換(transfor) 為新的Dataset。更多詳細資訊,請參閱API文檔。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
接下來的操作是把這個Dataset轉換成一個新的Dataset。通過調用filter傳回一個新的Dataset,其中資料的是原檔案内容的一個子集。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
可以把轉換和操作連在一起調用:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
有關Dataset的更多操作
Dataset的操作和轉換可用于更複雜的計算。假設我們想找到最多單詞對應的行:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
代碼首先将文本行映射到一個整數值,隐性建立了一個新的Dataset。對該Dataset調用reduce以查找最大的字數。map和reduce的參數是Scala函數文本(閉包),可以使用任何語言特性或Scala/Java庫。例如,我們可以很容易地調用在别處聲明的函數。我們将使用Math.max()函數使此代碼更易于了解:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一種常見的資料流模式是MapReduce,它伴随Hadoop流行起來。Spark可以輕松實作MapReduce流:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
在這裡,我們調用flatMap将行Dataset轉換為詞Dataset,然後組合使用groupByKey和count将檔案中的每個詞統計為(String,Long)的Dataset 。要将資料合并到目前shell中,可以調用collect:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
緩存
Spark還支援将資料集加載到叢集的記憶體中緩存。當重複通路資料時(例如查詢小的“熱”資料集或運作PageRank等疊代算法時),這非常有用。作為一個簡單的示例,讓我們将linesWithSpark緩存:
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
使用Spark來浏覽和緩存一個100行的文本檔案似乎很愚蠢。有趣的是,這些相同的函數可以用于非常大的資料集,即使它們是橫跨幾十個或數百個節點。您還可以将bin/spark_shell連接配接到叢集,以互動方式執行此操作,詳情可參考RDD程式設計指南 。
獨立的應用程式
假設我們希望使用Spark API編寫一個獨立的應用程式,接下來将介紹Scala(使用sbt)、Java(使用Maven)和Python(pip)的簡單應用程式。
我們将在Scala中建立一個非常簡單的Spark應用程式,名為SimpleApp.Scala:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
注:應用程式應該定義main()方法,而不是擴充scala.App。scala.App的子類在這裡可能無法正常工作。
這個程式是計算Spark 的README 檔案中包含“a”的行數和包含“b”的行數(*注:您需要用安裝SPARK的位置替換YOUR_SPARK_HOME *)。與前面使用Spark shell初始化自己的SparkSession的示例不同,我們将SparkSession初始化作為程式的一部分。
我們調用SparkSession.builder來構造一個SparkSession,然後設定應用程式名,最後調用getOrCreate來擷取SparkSession執行個體。
我們的應用程式依賴于Spark API,是以我們還将包含一個sbt配置檔案build.sbt,它将引入Spark依賴。此檔案還添加了Spark依賴的存儲庫:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
為了讓sbt正常工作,我們需要修改SimpleApp.scala和build.sbt位置,将目錄結構調整成典型結構。目錄調整好後,我們就可以建立一個包含應用程式代碼的JAR包,然後使用spark-submit腳本運作我們的程式。(注:請確定安裝了sbt)
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
從入門到放棄?
恭喜您成功運作了第一個Spark應用程式!
- 有關API的深入概述,請從RDD程式設計指南 和SQL程式設計指南 開始。
- 要在叢集上運作應用程式,請轉到部署概述。
- 最後,Spark在examples目錄中包含了一些示例(Scala 、Java、Python、R)。您可以按如下方式運作它們:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
英語困難戶可以等我更新,whatever,建議看原文檔。