天天看點

Spark 系列教程(1)Word Count

基本概要

Spark 是一種快速、通用、可擴充的大資料分析引擎,是基于記憶體計算的大資料并行計算架構。Spark 在 2009 年誕生于加州大學伯克利分校 AMP 實驗室,2010 年開源,2014 年 2月成為 Apache 頂級項目。

本文是 Spark 系列教程的第一篇,通過大資料領域中的 "Hello World" -- Word Count 示例帶領大家快速上手 Spark。Word Count 顧名思義就是對單詞進行計數,我們首先會對檔案中的單詞做統計計數,然後輸出出現次數最多的 3 個單詞。

前提條件

本文中會使用 spark-shell 來示範 Word Count 示例的執行過程。spark-shell 是送出 Spark 作業衆多方式中的一種,提供了互動式運作環境(REPL,Read-Evaluate-Print-Loop),在 spark-shell 上輸入代碼後就可以立即得到響應。spark-shell 在運作的時候,依賴于 Java 和 Scala 語言環境。是以,為了保證 spark-shell 的成功啟動,需要在本地預裝 Java 與 Scala。

本地安裝 Spark

下載下傳并解壓安裝包

從 [Spark 官網] (http://spark.apache.org/downloads.html) 下載下傳安裝包,選擇最新的預編譯版本即可,然後将安裝包解壓到本地電腦的任意目錄。

Spark 系列教程(1)Word Count
Spark 系列教程(1)Word Count

設定環境變量

為了在本地電腦的任意目錄下都可以直接運作 Spark 相關的指令,我們需要設定一下環境變量。我本地的 Mac 電腦使用的是 zsh 作為終端 shell,編輯 ~/.zshrc 檔案設定環境變量,如果是 bash 可以編輯 /etc/profile 檔案。

export SPARK_HOME=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin      

加載環境變量:

source ~/.zshrc      

在終端輸入

spark-shelll --version

指令,如果顯示以下内容,表示我們已經成功在本地安裝好了 Spark。

❯ spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_302
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.      

Spark 基本概念

在開始實驗之前,先介紹 3 個 Spark 中的概念,分别是 spark、sparkContext 和 RDD。

  • spark 和 sparkContext 分别是兩種不同的開發入口執行個體:
    • spark 是開發入口 SparkSession 執行個體(Instance),SparkSession 在 spark-shell 中會由系統自動建立。
    • sparkContext 是開發入口 SparkContext 執行個體。在 Spark 版本演進的過程中,從 2.0 版本開始,SparkSession 取代了 SparkContext,成為統一的開發入口。本文中使用 sparkContext 進行開發。
  • RDD 的全稱是 Resilient Distributed Dataset,意思是“彈性分布式資料集”。RDD 是 Spark 對于分布式資料的統一抽象,它定義了一系列分布式資料的基本屬性與處理方法。

實作 Word Count

Word Count 的整體執行過程示意圖如下,接下來按照讀取内容、分詞、分組計數、排序、取 Top3 出現次數的單詞這 5 個步驟對檔案中的單詞進行處理。

Spark 系列教程(1)Word Count

準備檔案

/Users/chengzhiwei/tmp/wordcount.txt 檔案中寫入以下内容:

Spark Hive Hadoop
Kubernetes Elasticsearch Spark
Doris Zookeeper Hadoop
Spark Hive Hudi Iceberg
Kafka Pulsar RocketMQ Hadoop Spark      

第 1 步:讀取檔案

首先,我們調用 SparkContext 的 textFile 方法,讀取源檔案,生成 RDD[String] 類型的 RDD,檔案中的每一行是數組中的一個元素。

//導包
import org.apache.spark.rdd.RDD
 
// 檔案路徑
val file: String = "/Users/chengzhiwei/tmp/wordcount.txt"
 
// 讀取檔案内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)      
Spark 系列教程(1)Word Count

第 2 步:分詞

“分詞”就是把“數組”的行元素打散為單詞。要實作這一點,我們可以調用 RDD 的 flatMap 方法來完成。flatMap 操作在邏輯上可以分成兩個步驟:映射和展平。

// 以行為機關做分詞
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))      

首先使用空格作為分隔符,将 lineRDD 中的行元素轉換為單詞,分割之後,每個行元素就都變成了單詞數組,元素類型也從 String 變成了 Array[String],像這樣以元素為機關進行轉換的操作,統一稱作“映射”。

Spark 系列教程(1)Word Count

第 3 步:分組計數

在 RDD 的開發架構下,聚合類操作,如計數、求和、求均值,需要依賴鍵值對(key value pair)類型的資料元素。是以,在調用聚合算子做分組計數之前,我們要先把 RDD 元素轉換為(key,value)的形式,也就是把 RDD[String] 映射成 RDD[(String, Int)]。

使用 map 方法将 word 映射成 (word,1) 的形式,所有的 value 的值都設定為 1,對于同一個的單詞,在後續的計數運算中,我們隻要對 value 做累加即可。

// 把RDD元素轉換為(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1))      
Spark 系列教程(1)Word Count
Spark 系列教程(1)Word Count

第 4 步:排序

現在得到的 wordCounts RDD 中 key 是單詞,value 是這個單詞出現的次數,我們最終要取 Top3 出現次數的單詞,首先要根據單詞出現的次數進行逆序排序。

先交換 wordCounts RDD 中的 key 和 value 中的位置,友善下一步排序。

// 交換 key 和 value 的位置
val exchangeRDD: RDD[(Int, String)] = wordCounts.map{case (k,v)=>(v,k)}      
Spark 系列教程(1)Word Count
Spark 系列教程(1)Word Count
Spark 系列教程(1)Word Count
//導包
import org.apache.spark.rdd.RDD
 
//第 1 步:讀取檔案
// 檔案路徑
val file: String = "/Users/chengzhiwei/tmp/wordcount.txt"
 
// 讀取檔案内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file) 
//第 2 步:分詞
// 以行為機關做分詞
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 第 3 步:分組計數
// 把RDD元素轉換為(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1)) 
// 按照單詞做分組計數
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y) 
//第 4 步:排序
// 交換 key 和 value 的位置
val exchangeRDD: RDD[(Int, String)] = wordCounts.map{case (k,v)=>(v,k)}
// 根據單詞出現的次數逆序排序
val sortRDD: RDD[(Int, String)] = exchangeRDD.sortByKey(false)
// 第 5 步:取 Top3 出現次數的單詞
sortRDD.take(3)      

輸出結果如下,可以看到 Top3 出現次數的單詞分别是 Spark,Hadoop,Hive。到此為止,我們成功實作了 Word Count 的功能。

Array[(Int, String)] = Array((4,Spark), (3,Hadoop), (2,Hive))      

簡化寫法

上面實作 Word Count 的代碼看起來稍稍有些複雜,我們可以使用鍊式調用的寫法将上面的代碼簡化成一行代碼,通過

.

的方式調用 RDD 中的方法,傳回結果是新的 RDD,可以繼續用

.

調用新 RDD 中的方法。

//讀取檔案
//sc 表示 sparkContext 執行個體
sc.textFile("/Users/chengzhiwei/tmp/wordcount.txt").
//根據空格分詞
flatMap(line => line.split(" ")).
//分組,統一把 value 設定為 1
map(word => (word,1)).
//對相同 key 的 value 進行累加
reduceByKey((k,v) => (k+v)).
//把(key,value)對調,目的是按照計數來排序,(Spark,4) => (4,Spark)
map{case (k,v)=>(v,k)}.
//降序排序
sortByKey(false).
//取前 3
take(3)      

Scala 語言為了讓函數字面量更加精簡,還可以使用下劃線

_

作為占位符,用來表示一個或多個參數。我們用來表示的參數必須滿足隻在函數字面量中出現一次。是以上面的寫法可以進一步簡化為以下代碼:

//讀取檔案
sc.textFile("/Users/chengzhiwei/tmp/wordcount.txt").
//根據空格分詞
flatMap(_.split(" ")).
//分組,統一把 value 設定為 1
map((_,1)).
//對相同 key 的 value 進行累加
reduceByKey(_+_).
//把(key,value)對調,目的是按照計數來排序,(Spark,4) => (4,Spark)
map{case (k,v)=>(v,k)}.
//降序排序
sortByKey(false).
//取前 3
take(3)