天天看點

Spark 系列教程(2)運作模式介紹

Spark 運作模式

Apache Spark 是用于大規模資料處理的統一分析引擎,它提供了 Java、Scala、Python 和 R 語言的進階 API,以及一個支援通用的執行圖計算的優化引擎。

Spark Core 是 Spark 的核心子產品,負責任務排程、記憶體管理等功能。Spark Core 的實作依賴于 RDD(Resilient Distributed Datasets,彈性分布式資料集)的程式抽象概念。

在 Spark Core 的基礎上,Spark 提供了一系列面向不同應用需求的元件,包括使用 SQL 進行結構化資料處理的 Spark SQL、用于實時流處理的 Spark Streaming、用于機器學習的 MLlib 以及用于圖處理的 GraphX。

Spark 本身并沒有提供分布式檔案系統,因而 Spark 的資料存儲主要依賴于 HDFS,也可以使用 HBase 和 S3 等作為存儲層。

Spark 有多種運作模式:

  • 1.可以運作在一台機器上,稱為 Local(本地)運作模式。
  • 2.可以使用 Spark 自帶的資源排程系統,稱為 Standalone 模式。
  • 3.可以使用 Yarn、Mesos、Kubernetes 作為底層資源排程系統,稱為 Spark On Yarn、Spark On Mesos、Spark On K8S。
    Spark 系列教程(2)運作模式介紹
    Driver 是 Spark 中的主要程序,負責執行應用程式的 main() 方法,建立 SparkContext 對象,負責與 Spark 叢集進行互動,送出 Spark 作業,并将作業轉化為 Task(一個作業由多個 Task 任務組成),然後在各個 Executor 程序間對 Task 進行排程和監控。

根據應用程式送出方式的不同,Driver 在叢集中的位置也有所不同,應用程式送出方式主要有兩種:Client 和 Cluster,預設是 Client,可以在向 Spark 叢集送出應用程式時使用

--deploy-mode

參數指定送出方式。

Spark 系列教程(2)運作模式介紹

Local 模式

Local 模式的部署方式比較簡單,隻需下載下傳安裝包并解壓就可以使用了。具體可以參考上一章的

Spark 系列教程(1)Word Count

的介紹,本文就不再贅述了。

在 spark-shell 互動式界面執行一個簡單的計算,取出 0~99 之間的值。

❯ bin/spark-shell
21/10/07 11:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://chengzw:4040
Spark context available as 'sc' (master = local[*], app id = local-1633578611004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala>  range.collect()
res0: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)      

Standalone 模式

在 Spark Standalone 模式中,資源排程是由 Spark 自己實作的。 Spark Standalone 模式是 Master-Slaves 架構的叢集模式,和大部分的 Master-Slaves 結構的叢集一樣,存在着 Master 單點故障的問題。對于單點故障的問題,Spark 提供了兩種方案:

  • 基于檔案系統的單點恢複(Single-Node Recovery with Local File System),将 Application 和 Worker 的注冊資訊寫入檔案中,當 Master 當機時,可以重新啟動 Master 程序恢複工作。該方式隻适用于開發或測試環境。
  • 基于 Zookeeper 的 Standby Masters(Standby Masters with ZooKeeper)。ZooKeeper 提供了一個 Leader Election 機制,利用這個機制可以保證雖然叢集存在多個 Master,但是隻有一個是 Active 的,其他的都是 Standby。當 Active 的 Master 出現故障時,另外的一個 Standby Master 會被選舉出來,對于恢複期間正在運作的應用程式,由于 Application 在運作前已經向 Master 申請了資源,運作時 Driver 負責與 Executor 進行通信,管理整個 Application,是以 Master 的故障對 Application 的運作不會造成影響,但是會影響新的 Application 的送出。接下來将介紹 Spark Standalone 模式基于 Zookeeper 的 HA 高可用部署。
    Spark 系列教程(2)運作模式介紹

前提條件

Host 設定

編輯 /etc/hosts 檔案:

192.168.1.117 hadoop1
192.168.1.118 hadoop2
192.168.1.119 hadoop3      

拷貝到其他兩台機器上:

scp  /etc/hosts root@hadoop2:/etc/hosts
scp  /etc/hosts root@hadoop3:/etc/hosts      
配置免密登入

為了友善後續拷貝檔案以及執行腳本,配置 SSH 免密登入。在 hadoop1 上生成 RSA 非對稱密鑰對:

[root@hadoop1 hadoop]# ssh-keygen 
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa): 
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:wkMiPVpbBtjoZwBIpyvvluYtfQM9hQeHtgBFVfrwL1I root@hadoop1
The key's randomart image is:
+---[RSA 2048]----+
|+o.O+..o.        |
|. *.o.+..        |
| o..=o*=         |
|  o+oOo+o        |
|...o..+oE        |
|..  . o+ .       |
|  .o .... .      |
| .=.. o. .       |
| +o... .         |
+----[SHA256]-----+      

将公鑰拷貝到叢集中的其他機器:

[root@hadoop1 hadoop]# ssh-copy-id root@hadoop1
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop2
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop3      
安裝 Java

進入 [Oracle 官網] (https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html) 下載下傳并解壓 JDK 安裝包。設定環境變量,編輯 vim /etc/profile:

export JAVA_HOME=/software/jdk
export PATH=$PATH:$JAVA_HOME/bin      

Zookeeper 叢集部署

下載下傳并解壓安裝包
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -xzvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8 /software/zk      
編輯配置檔案

編輯 zk/conf/zoo.cfg 檔案:

#用于配置 Zookeeper 中最小時間機關的長度,機關是毫秒
tickTime=2000
#該參數用于配置 Leader 伺服器等待 Follower 啟動,并完成資料同步的時間
#乘上 tickTime 得到具體時間:10 * 2000 = 20000 毫秒
initLimit=10
#Leader 與 Follower 心跳檢測的逾時時間。
#乘上 tickTime 得到具體時間:5 * 2000 = 10000 毫秒
syncLimit=5
#資料存放目錄
dataDir=/software/zk/data
#用戶端連接配接端口
clientPort=2181
#Zookeeper叢集成員位址
#2888端口用于叢集間通信,leader會監聽此端口
#3888端口用于leader選舉
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888      

同步修改後的配置檔案到叢集的其他節點:

scp -r zk root@hadoop2:/software/
scp -r zk root@hadoop3:/software/      
辨別 Server ID
#在 hadoop1 節點上執行
echo 1 > /root/zookeeper-cluster/zk1/myid
#在 hadoop2 節點上執行
echo 2 > /root/zookeeper-cluster/zk2/myid
#在 hadoop3 節點上執行
echo 3 > /root/zookeeper-cluster/zk3/myid      
啟動 Zookeeper 叢集

分别在 3 台節點上執行以下指令啟動 Zookeeper:

zk/bin/zkServer.sh start      
檢視 Zookeeper 叢集狀态

分别在 3 台節點上檢視 Zookeeper 狀态,可以看到此時 hadoop2 節點為 Zookeeper 的 Master 節點。

hadoop1 節點:

[root@hadoop1 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower      

hadoop2 節點:

[root@hadoop2 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader      

hadoop3 節點:

[root@hadoop3 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower      

Spark Standalone 模式 HA 叢集部署

wget https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
tar -xzvf spark-3.1.2-bin-hadoop2.7.tgz 
mv spark-3.1.2-bin-hadoop2.7 /software/spark      
修改配置檔案

編輯 spark/conf/spark-env.sh 檔案,由于 Spark HA 使用 Zookeeper 來協調主從,是以需要指定 Zookeeper 的位址和 Spark 在 Zookeeper 中使用的目錄。

export JAVA_HOME=/software/jdk
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"      

編輯 spark/conf/slaves 檔案:

hadoop1
hadoop2
hadoop3      
scp -r spark root@hadoop2:/software/
scp -r spark root@hadoop3:/software/      
啟動 Spark 叢集

在 hadoop1 節點上啟動 Spark 叢集,執行 start-all.sh 腳本會在 hadoop1 節點上啟動 Master 程序,并且在 spark/conf/slaves 檔案中配置的所有節點上啟動 Worker 程序。

[root@hadoop1 software]# spark/sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
hadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.out
hadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.out
hadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out      

登入 hadoop2 節點,啟動第二個 Master(Standby Master)。

[root@hadoop2 software]# spark/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop2.out      
檢視各節點的程序

在各節點執行 jps 指令檢視啟動的 Java 程序。可以看到 Spark 的 Master 程序分别在 hadoop1 和 hadoop2 節點上運作,Worker 程序在所有節點上運作。QuorumPeerMain 是 Zookeeper 的程序。

[root@hadoop1 software]# jps
18528 Worker
18427 Master
23468 QuorumPeerMain
18940 Jps      
[root@hadoop2 software]# jps
27824 Worker
29954 Jps
23751 QuorumPeerMain
28135 Master      
[root@hadoop3 software]# jps
11696 Worker
12939 QuorumPeerMain
13021 Jps      
Zookeeper 檢視節點注冊狀态

可以看到此時 3 個 Spark 節點都注冊到 Zookeeper 上了,并且此時 192.168.1.117 hadoop1 這個節點是 Master。

[zk: localhost:2181(CONNECTED) 33] ls /spark/master_status
[worker_worker-20210821150002-192.168.1.117-42360, worker_worker-20210821150002-192.168.1.118-39584, worker_worker-20210821150002-192.168.1.119-42991]
[zk: localhost:2181(CONNECTED) 34] get /spark/master_status
192.168.1.117      
Spark HA 測試

浏覽器通路 http://hadoop1:8081 進入 Spark WebUI 界面,此時 hadoop1 節點 Master 的狀态為 ALIVE。

Spark 系列教程(2)運作模式介紹
Spark 系列教程(2)運作模式介紹
[root@hadoop1 software]# spark/bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077
21/08/21 18:00:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop1:4040
Spark context available as 'sc' (master = spark://hadoop1:7077,hadoop2:7077, app id = app-20210821180100-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.      

使用第三方資源排程系統

Spark 可以使用 Yarn、Mesos、Kubernetes 作為底層資源排程系統,目前 Mesos 使用的已經比較少了,本文将介紹 Spark 使用 Yarn 和 Kubernetes 作為排程系統的應用。

Spark On Yarn

Spark On Yarn 模式的搭建比較簡單,僅需要在 Yarn 叢集的一個節點上安裝 Spark 用戶端即可,該節點可以作為送出 Spark 應用程式到 Yarn 叢集的用戶端。Spark 本身的 Master 節點和 Worker 節點不需要啟動。前提是我們需要準備好 Yarn 叢集,關于 Yarn 叢集的安裝可以參考

Hadoop 分布式叢集安裝

使用此模式需要修改 Spark 的配置檔案 conf/spark-env.sh,添加 Hadoop 相關屬性,指定 Hadoop 配置檔案所在的目錄:

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop      

修改完畢後,即可運作 Spark 應用程式,例如運作 Spark 自帶的求圓周率的例子,并以 Spark On Yarn 的 Cluster 模式運作。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar      

在 Yarn 的 ResourceManager 對應的 WebUI 界面中可以檢視應用程式執行的詳細資訊。

Spark 系列教程(2)運作模式介紹
Spark 系列教程(2)運作模式介紹

Spark On K8S

目前基于 Kubernetes 的 Spark 的應用主要采用兩種方式運作:

  • 1.基于 Kubernetes 的 Operator 的 [Spark on K8S Operator] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator),是 Google Cloud Platform 為了支援 Spark 而開發的一種 Operator。個人比較推薦使用 Spark on K8S Operator 的方式送出作業。
  • 2.Spark 原生支援的 [Spark On K8S] (http://spark.apache.org/docs/3.0.0/running-on-kubernetes.html),是 Spark 社群為支援 Kubernetes 這種資源管理架構而引入的 Kubernetes Client 的實作。

Spark Operator 定義了兩個 CRD(Custom Resource Definitions,自定義資源定義)對象,SparkApplication 和 ScheduledSparkApplication。 這些 CRD 是 Spark 作業的抽象,使得在 Kubernetes 叢集中可以使用 YAML 來定義這些作業。另外還提供了 [sparkctl] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/sparkctl/README.md) 指令行工具友善我們操控 SparkApplication 和 ScheduledSparkApplication CRD 資源對象。

使用 Spark On K8S Operator 模式時,需要預先在 Kubernetes 叢集中部署 Spark Operator 容器,用于将 SparkApplication 和 ScheduledSparkApplication 這些 CRD 資源對象轉換為 Kubernetes 原生的資源對象,例如 Pod,Service 等等。

Spark 系列教程(2)運作模式介紹
Spark 系列教程(2)運作模式介紹
Spark 系列教程(2)運作模式介紹
Spark On K8S Operator(推薦)

使用 Spark On K8S Operator 模式時,需要預先在 Kubernetes 叢集中部署 Spark Operator。

部署 Spark Operator

添加 Spark On K8S Operator Helm 倉庫并下載下傳 Helm 資源檔案。

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm pull spark-operator/spark-operator --untar      

修改 values.yaml 檔案中有以下兩個地方需要修改:

  • 1.repository 鏡像倉庫位址,由于國内拉取 Spark 相關鏡像速度較慢,我已經提前下載下傳好鏡像并且上傳至阿裡雲鏡像倉庫中了,大家可以直接使用我的鏡像。
  • 2.sparkJobNamespace:設定 Spark 送出作業的命名空間,會為該命名空間建立一個 ServiceAccount 并賦予相應的權限,ServiceAccount 的名字為

    helm 項目名-spark

  • Spark 系列教程(2)運作模式介紹
  • 使用

    helm install

    指令安裝 Spark Operator,spark-job 命名空間是之後送出 Spark 作業時使用的。
kubectl create namespace spark-job
helm install my-spark spark-operator \
--namespace spark-operator --create-namespace      

确認 Spark Operator Pod 已經正常運作。

❯ kubectl get pod -n spark-operator
NAME                                       READY   STATUS    RESTARTS   AGE
my-spark-spark-operator-674cbc9d9c-8x22x   1/1     Running   0          5m24s      

檢視在 spark-job 命名空間建立的 ServiceAccount。

❯ kubectl get serviceaccounts -n spark-job
NAME             SECRETS   AGE
.....
my-spark-spark   1         2m33s      

運作 SparkApplications

SparkApplications 資源對象中通常使用的 Cluster 模式來送出作業。在 YAML 檔案中指定運作應由程式的 jar 包以及 main() 方法所在的類。

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-job
spec:
  type: Scala
  mode: cluster
  image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: my-spark-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1      

等待一會,檢視 SparkApplications 狀态,COMPLETED 表示已經執行完成該作業。

❯ kubectl get sparkapplications -n spark-job  spark-piNAME       STATUS      ATTEMPTS   START                  FINISH                 AGEspark-pi   COMPLETED   1          2021-10-04T13:13:27Z   2021-10-04T13:13:48Z   8h      

檢視在 spark-job 命名空間建立的 Pod 的日志,可以看到本次作業執行的詳情。

kubectl logs -n spark-job spark-pi-driver spark-kubernetes-driver      
Spark 系列教程(2)運作模式介紹

使用 Spark On K8S 模式送出作業時我們通常可以使用 spark-submit 或者 spark-shell 兩種指令行工具,其中 spark-submit 支援 Cluster 和 Client 兩種送出方式,而 spark-shell 隻支援 Client 一種送出方式。

Spark-Submit

Cluster 模式

使用 spark-submit 的 Cluster 模式送出作業時,由于我們的 Kubernetes 叢集的 API Server 是使用自簽名的證書進行 HTTPS 加密的,是以需要使用

spark.kubernetes.authenticate.submission.caCertFile

參數指定 Kubernetes 叢集的 CA 證書,讓 Spark 用戶端信任自簽名證書。注意這裡的 ServiceAccount 需要自行建立并且賦予以下權限,如果你是按照順序完成實驗的,那麼在前面 Spark On K8S Operator 中已經建立了該 ServiceAccount,可以跳過這一步。

❯ kubectl get rolebindings -n spark-job spark -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  annotations:
    meta.helm.sh/release-name: my-spark
    meta.helm.sh/release-namespace: spark-operator
  creationTimestamp: "2021-09-29T16:10:51Z"
  labels:
    app.kubernetes.io/instance: my-spark
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: spark-operator
    app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
    helm.sh/chart: spark-operator-1.1.6
  name: spark
  namespace: spark-job
  resourceVersion: "204712527"
  selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/rolebindings/spark
  uid: 225970e8-472d-4ea5-acb5-08630852f76c
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: spark-role
subjects:
- kind: ServiceAccount
  name: my-spark-spark
  namespace: spark-job
❯ kubectl get role -n spark-job  spark-role -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  annotations:
    meta.helm.sh/release-name: my-spark
    meta.helm.sh/release-namespace: spark-operator
  creationTimestamp: "2021-09-29T16:10:51Z"
  labels:
    app.kubernetes.io/instance: my-spark
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: spark-operator
    app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
    helm.sh/chart: spark-operator-1.1.6
  name: spark-role
  namespace: spark-job
  resourceVersion: "204712525"
  selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/roles/spark-role
  uid: 436afb3f-a304-4756-b64a-978d5836c3a2
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - '*'
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - '*'
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - '*'      

執行 spark-submit 指令向 Kubernetes 叢集送出作業。

bin/spark-submit \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
 local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar      

關于證書不受信任這裡也有個讨巧的方式,就是使用

kubectl proxy

指令将 API Server 的 HTTPS 轉化為 HTTP。

❯ kubectl proxy
Starting to serve on 127.0.0.1:8001      

然後通過 http://localhost:8001 和 API Server 進行互動,此時就無需指定 CA 證書了。

bin/spark-submit \
--master  k8s://http://localhost:8001 \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
 local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar      

通過檢視 Kubernetes 為本次 Spark 作業建立的 Pod 的日志,可以看到運作結果。

❯ kubectl logs -n spark-job  spark-pi-submit-fc7b507c4be84351-driver
......
Pi is roughly 3.140075700378502
......      

Client 模式

Client 模式無需指定 CA 證書,但是需要使用

spark.driver.host

spark.driver.port

指定送出作業的 Spark 用戶端所在機器的位址,端口号預設就是 7078。

bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--deploy-mode client \
--name spark-pi-submit-client \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078 \
/home/chengzw/spark-3.1.2-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.1.2.jar      

使用 Client 模式送出作業在終端就可以直接看到輸出結果了。

Spark 系列教程(2)運作模式介紹

Spark-Shell

spark-shell 隻支援 Client 方式,使用以下指令連接配接 Kubernetes API Server 并打開 spark-shell 互動式界面。

bin/spark-shell \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode client \
--name spark-shell \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078      
21/10/05 10:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://11.8.38.43:4040
Spark context available as 'sc' (master = k8s://https://11.16.0.153:6443, app id = spark-application-1633401878962).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()
res1: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)      
Spark History Server(可選)

部署 Spark History Server

在運作 Spark Application 的時候,Spark 會提供一個 WebUI 列出應用程式的運作時資訊,但是一旦該應用程式執行完畢後,将無法檢視應用程式執行的曆史記錄。Spark History Server 就是為了處理這種情況而誕生的,我們可以将 Spark 作業的日志送出到一個統一的地方,例如 HDFS,然後 Spark History Server 就可以通過讀取 HDFS 目錄中的檔案來重新渲染生成 WebUI 界面來展示應用程式執行的曆史資訊。

使用以下資源檔案部署一個 Spark History Server,并且通過 NodePort Service 的方式将服務暴露到叢集外部,叢集外部可以通過節點位址:NodePort 來通路 Spark History Server。前提是我們需要準備好 HDFS 叢集,關于 HDFS 叢集的安裝可以參考

apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-history-server
  namespace: spark-job
spec:
  selector:
    matchLabels:
      run: spark-history-server
  replicas: 1
  template:
    metadata:
      labels:
        run: spark-history-server
    spec:
      containers:
        - image:  "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
          name: spark-history-server
          args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.history.HistoryServer"]
          ports:
            - containerPort: 18080
              name: http
          env:
          - name: SPARK_HISTORY_OPTS
            value: "-Dspark.history.fs.logDirectory=hdfs://11.8.36.125:8020/spark-k8s"
---
apiVersion: v1
kind: Service
metadata:
  name: spark-history-server
  namespace: spark-job
spec:
  ports:
  - name: http
    nodePort: 30080
    port: 18080
    protocol: TCP
    targetPort: 18080
  selector:
     run: spark-history-server
  type: NodePort      

Spark On K8S Operator 使用 History Server

設定

spark.eventLog.enabled

參數值為 true 啟用記錄 Spark 日志,

spark.eventLog.dir

指定輸出日志的目錄為 HDFS 目錄。

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-job
spec:
  type: Scala
  mode: cluster
  image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "hdfs://11.8.36.125:8020/spark-k8s"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: my-spark-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1      

在叢集外通過節點位址:30080 通路 Spark History Server,可以在應用程式執行完畢後看到詳細的資訊。

Spark 系列教程(2)運作模式介紹
Spark 系列教程(2)運作模式介紹
bin/spark-submit \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs://11.8.36.125:8020/spark-k8s \
 local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar      

建構鏡像

上面的例子都是使用 Spark 官方自帶的程式來送出作業,如果我們想要自定義一個程式可以使用 Spark 官網提供的腳本來建構鏡像。

程式代碼

該項目使用 Maven 來管理依賴。

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
        <scope>compile</scope>
    </dependency>
</dependencies>      

程式代碼如下,使用 Java 編寫了一個 Word Count 程式。

package com.chengzw.wordcount;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @description WordCount 示例
 * @author chengzw
 * @since 2021/7/25 8:39 下午
 */
public class MyJavaWordCount {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        System.setProperty("spark.ui.showConsoleProgress","false");
        //建立配置對象
        //本地運作
        //SparkConf conf = new SparkConf().setAppName("MyJavaWordCount").setMaster("local");
        //在Spark上運作
        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");
        //建立SparkContext對象
        JavaSparkContext sc = new JavaSparkContext(conf);
        //讀取hdfs資料
        //在本地運作
        //JavaRDD<String> rdd1= sc.textFile("/tmp/data.txt");
        //在Spark上運作
        JavaRDD<String> rdd1= sc.textFile(args[0]);
        //分詞
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String input) throws Exception {
                return Arrays.asList(input.split(" ")).iterator();
            }
        });
        //單詞計數 word,1
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        //相同Key的值累加
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        //觸發計算
        List<Tuple2<String, Integer>> result = rdd4.collect();
        //列印
        for(Tuple2<String,Integer> r : result){
            System.out.println(r._1 + "\t" + r._2);
        }
        //釋放資源
        sc.stop();
    }
}      

打包

點選 mvn package 将程式打成 jar 包。

Spark 系列教程(2)運作模式介紹

建構并上傳鏡像

将 jar 包放到 Spark 安裝包的 examples/jars 目錄中,進入 Spark 目錄然後執行以下指令建構鏡像。

bin/docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t my-spark:1.0.0 build      

檢視建構好的鏡像。

❯ docker images | grep spark
registry.cn-hangzhou.aliyuncs.com/public-namespace/spark                v1.0.0       372341ae930d   12 minutes ago   529MB      

上傳鏡像。

./docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t v1.0.0 push      

使用自己建構的鏡像執行 Word Count 程式。

bin/spark-submit \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class com.chengzw.wordcount.MyJavaWordCount \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v1.0.0 \
 local:///opt/spark/examples/jars/spark-lab-1.0-SNAPSHOT.jar /etc/security/limits.conf      

檢視執行結果:

kubectl logs -n spark-job spark-pi-submit-37945f7c4f24e729-driver
#傳回結果
......
rss 2
space 2
priority 4
4 1
this 1
"soft" 1
max 14
cpu 1
memlock 1
apply 1
......      

參考資料

  • Spark 大資料分析實戰
  • [Spark Standalone Mode] (https://spark.apache.org/docs/0.9.0/spark-standalone.html#standby-masters-with-zookeeper)
  • [Spark:Master High Availability(HA)高可用配置的2種實作] (https://www.cnblogs.com/byrhuangqiang/p/3937654.html)
  • [【k8s系列1】spark on k8s 與 spark on k8s operator的對比] (https://segmentfault.com/a/1190000037503030)
  • [Running Spark on Kubernetes] (http://spark.apache.org/docs/3.0.0/running-on-kubernetes.html)
  • [spark-on-k8s-operator User Guide] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md)
  • [spark-on-k8s-operator Quick Start Guide] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md)
  • [Kubernetes Operator for Apache Spark Design] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/design.md#architecture)
  • [Setting up, Managing & Monitoring Spark on Kubernetes] (https://www.datamechanics.co/blog-post/setting-up-managing-monitoring-spark-on-kubernetes)
  • [Spark UI History server on Kubernetes?] (https://stackoverflow.com/questions/51798927/spark-ui-history-server-on-kubernetes)
  • [How To Manage And Monitor Apache Spark On Kubernetes - Part 1: Spark-Submit VS Kubernetes Operator] (https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-introduction-spark-submit-kubernetes-operator)
  • [How To Manage And Monitor Apache Spark On Kubernetes - Part 2: Deep Dive On Kubernetes Operator For Spark] (https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-deep-dive-kubernetes-operator-for-spark)
  • [Spark on K8S (Kubernetes Native)] (https://www.cnblogs.com/moonlight-lin/p/13296909.html)
  • [Getting Started with Spark on Kubernetes] (http://blog.brainlounge.de/memoryleaks/getting-started-with-spark-on-kubernetes/)
  • [運作支援kubernetes原生排程的Spark程式] (https://jimmysong.io/kubernetes-handbook/usecases/running-spark-with-kubernetes-native-scheduler.html)
  • [深入淺出了解 Spark 部署與工作原理] (https://zhuanlan.zhihu.com/p/99398378)
  • [Spark client mode 和 cluster mode 的差別] (https://baixin.ink/2018/04/28/spark-mode/)