天天看點

【小白視角】大資料基礎實踐(七) Spark的基本操作

目錄

1. Spark概述

1.1 背景

1.2 特點

1.3 使用趨勢

2. Spark生态系統

2.1 Spark與Hadoop的對比。

2.2 Job

2.3 容錯率

2.4 通用性

2.5 實際應用

2.6 Spark生态系統元件的應用場景

2.7 Spark元件

2.7.1 Spark Core

2.7.2 Spark SQL

2.7.3 Spark Streaming

2.7.4 MLlib

2.7.5 Graphx

2.7.6 Cluster Managers

3. Spark運作架構

3.1 基本概念

3.2 架構設計

3.3 Spark 運作基本流程

3.4 Spark 運作原理

3.4.1 RDDs

3.4.2 RDD運作原理

3.4.3 Scala

4. SparkSQL

5. Spark程式設計實踐

5.1 程式設計環境

5.2 實驗步驟:

5.2.1 Spark環境配置

5.2.2 spark shell中編寫Scala代碼實作:

5.2.3 編寫Scala獨立應用程式:

最後

基于記憶體計算的大資料并行計算架構,可用于建構大型的、低延遲的資料分析應用程式。

Apache軟體基金會最重要的三大分布式計算系統開源項目之一(Hadoop、Spark、Storm)

運作速度快

Spark擁有DAG執行引擎,支援在記憶體中對資料進行疊代計算。官方提供的資料表明,如果資料由磁盤讀取,速度是Hadoop MapReduce的10倍以上,如果資料從記憶體中讀取,速度可以高達100多倍。

易用性好

Spark不僅支援Scala編寫應用程式,而且支援Java和Python等語言進行編寫,特别是Scala是一種高效、可拓展的語言,能夠用簡潔的代碼處理較為複雜的處理工作。

通用性強

随處運作

【小白視角】大資料基礎實踐(七) Spark的基本操作
谷歌的大資料分析應用使用趨勢
【小白視角】大資料基礎實踐(七) Spark的基本操作

Hadoop:

一個MapReduce程式就是一個Job,一個Job裡有一個或多個Task,區分為Map Task和Reduce Task

Spark:

Job概念與Hadoop不同,在它之上還有Application,一個Application和一個SparkContext相關聯,每個Application可以有一個或多個Job并行或串行運作;Job由Action觸發Job裡又包含多個Stage,Stage是以Shuffle進行劃分的,每個Stage包含了由多個Task組成的Task Set。

【小白視角】大資料基礎實踐(七) Spark的基本操作

Spark容錯性比Hadoop更好:

Spark引進了彈性分布式資料集RDD的抽象,這些集合是彈性的,如果資料集一部分丢失,則可以根據“血統”(即允許基于資料衍生過程)對他們進行重建。

另外在RDD計算時,可以通過CheckPoint來實作容錯,CheckPoint有兩種方式:

CheckPoint Data 和Logging The Updates,使用者可以控制采用哪種方式來實作容錯。

Spark通用性也比Hadoop更好:

Hadoop隻提供了Map和Reduce兩種操作;

Spark提供了資料集操作類型很多種,大緻分為Transformation和Action兩大類:

Transformation 包 括 Map 、 Filter 、 FlatMap 、 Sample 、 GroupByKey 、ReduceByKey 、 Union 、 Join 、 Cogroup 、 MapValues 、 Sort 、 Count 和PartionBy等多種操作類型。

Action包括Collect、Reduce、Lookup和Save等操作。

另外各個處理節點之間的通信模型不再像Hadoop隻有Shuffle一種模式,使用者可以命名、物化,控制中間結果的存儲、分區等

在實際應用中,大資料處理主要包括以下三個類型:

複雜的批量資料處理:通常時間跨度在數十分鐘到數小時之間

基于曆史資料的互動式查詢:通常時間跨度在數十秒到數分鐘之間

基于實時資料流的資料處理:通常時間跨度在數百毫秒到數秒之間

目前對以上三種場景需求都有比較成熟的處理架構,

第一種情況可以用Hadoop的MapReduce來進行批量海量資料處理,

第二種情況可以Impala進行互動式查詢,

對于第三中情況可以用Storm分布式處理架構處理實時流式資料。

成本問題:

以上三者都是比較獨立,各自一套維護成本比較高,會帶來一些問題:

不同場景之間輸入輸出資料無法做到無縫共享,通常需要進行資料格式的轉換

不同的軟體需要不同的開發和維護團隊,帶來了較高的使用成本

比較難以對同一個叢集中的各個系統進行統一的資源協調和配置設定

而Spark的出現能夠一站式平台滿意以上需求

【小白視角】大資料基礎實踐(七) Spark的基本操作

包含Spark的基本内容,包含任務排程,記憶體管理,容錯機制等。

Spark Core内部定義了RDDs(彈性分布式資料集)。RDDs代表橫跨很多工作節點的資料集合,RDDs可以被并行的處理。

Spark Core提供了很多APIs來建立和操作這些集合RDDs

Spark處理結構化資料的庫。它支援通過SQL查詢資料。就像HQL(Hive SQL)一樣,并且支援很多資料源,像Hive表、JSON等。

Shark是一種較老的基于Spark的SQL項目,它是基于Hive修改的,它現在已經被Spark-SQL代替了。

實時資料流處理元件,類似Storm

Spark Streaming提供了API來操作實時流資料。

Spark 有一個包含通用機器學習功能的包,就是MLlib(machine learning lib)

MLlib 包含了分類,聚類,回歸,協同過濾算法,還包括子產品評估和資料導入。

它還提供了一些低級的機器學習原語,包括通用梯度下降優化算法。

除此之外,還支援叢集上的橫向擴充。

是處理圖的庫,并進行圖的并行計算。就像Spark Streaming和Spark SQL一樣,Graphx也繼承了Spark RDD API,同時允許建立有向圖。

Graphx提供了各種圖的操作,例如subgraph和mapVertices,也包含了常用的圖算法,例如PangeRank等。

Cluster Managers就是叢集管理。Sparkl能夠運作在很多cluster managers上面,包括Hadoop YARN,Apache Mesos和Spark自帶的單獨排程器。

如果你有了Hadoop Yarn或是Mesos叢集,那麼Spark對這些叢集管理工具的支援,使Spark應用程式能夠在這些叢集上面運作。

RDD:是ResillientResillient Distributed DatasetDistributed Dataset(彈性分布式資料集)的簡稱,是分布式記憶體的一個抽象概念, 提供了一種高度受限的共享記憶體模型

DAG:是Directed Acyclic Graph(有向無環圖)的簡稱,反映 RDD 之間的依賴關系

Executor:是運作在工作節點 WorkerNode)的一個程序,負責運作 Task

Application:使用者編寫的 Spark 應用程式

Task:運作在 Executor 上的工作單元

Job:一個 Job 包含多個 RDD 及作用于相應 RDD上的各種操作

Stage:是 Job 的基本排程機關,一個 Job 會分為多組 Task ,每組 Task 被稱為 Stage或者也被稱為 TaskSet ,代表了一組關聯的、互相之間沒有 Shuffle 依賴關系的任務組成的任務集

Spark運作架構包括叢集資料總管(Cluster Manager)、運作作業任務的工作節點(Worker Node)、每個應用的任務控制節點(Driver)和每個工作節點上負責具體任務的執行程序(Executor)

資料總管可以自帶或Mesos或YARN

與Hadoop MapReduce計算架構相比,Spark所采用的Executor有兩個優點:

一是利用多線程來執行具體的任務,減少任務的啟動開銷

二是Executor中有一個BlockManager存儲子產品,會将記憶體和磁盤共同作為儲存設備,有效減少IO開銷

【小白視角】大資料基礎實踐(七) Spark的基本操作

一個 Application 由一個 Driver 和若幹個 Job 構成,一個 Job 由多個 Stage 構成,一個Stage 由多個沒有 Shuffle 關系的 Task 組成

當執行一個 Application 時, Driver 會向叢集管理器申請資源,啟動 Executor ,并向 Executor 發送應用程式代碼和檔案,然後在 Executor 上執行 Task ,運作結束後執行結果會傳回給 Driver 或者寫到 HDFS 或者其他資料庫中

【小白視角】大資料基礎實踐(七) Spark的基本操作

首先為應用建構起基本的運作環境,即由 Driver 建立一個 SparkContext ,進行資源的申請、任務的配置設定和監控

資料總管為 Executor 配置設定資源,并啟動 Executor 程序

SparkContext 根據 RDD 的依賴關系建構 DAG 圖, DAG 圖送出給 DAGScheduler 解析成 Stage ,然後把一個個TaskSet 送出給底層排程器,Task Scheduler處理; Executor 向 SparkContext 申請 Task,Task Scheduler ,将 Task 發放給 Executor 運作,并提供應用程式代碼

Task 在 Executor 上運作,把執行結果,回報給 Task Scheduler ,然後回報給 DAG Scheduler ,運作完畢後寫入資料并釋放所有資源

【小白視角】大資料基礎實踐(七) Spark的基本操作

特點:

每個Application 都有自己專屬的Executor程序,并且該程序在Application運作期間一直駐留。Executor程序以多線程的方式運作Task

Spark運作過程與資料總管無關,隻要能夠擷取Executor程序并保持通信即可

Task采用了資料本地性和推測執行等優化機制

Resilient Distributed Datasets (彈性分布式資料集 RDDs

RDDs 是 Spark 的分發資料和計算的基礎抽象類,是Spark的核心概念;

在 Spark 中,所有的計算都是通過RDDs的建立、轉換、操作完成的。

RDDs 具有lineage graph(血統關系圖)

一個 RDD 就是一個不可改變的分布式集合對象,内部有許多partitions 組成,每個partition 都包括一部分資料,這些 partitions 可以在叢集的不同節點上計算;

Partition 是 Spark 中的并行處理單元。

RDD 提供了一組豐富的操作以支援常見的資料運算,分為“動作 Action和“轉換 Transformation兩種類型

RDD 提供的轉換接口都非常簡單,都是類似map 、 filter、groupBy 、join等粗粒度的資料轉換操作,而不是針對某個資料項的細粒度修改 (不适合網頁爬蟲

表面上 RDD 的功能很受限、不夠強大 實際上 RDD 已經被實踐證明可以高效地表達,許多架構的程式設計模型 比如MapReduce、 SQL、 Pregel

Spark 用 Scala 語言實作了 RDD 的 API ,程式員可以通過調用 API 實作對 RDD 的各種操作

RDD典型的執行順序如下:

RDD讀入外部資料源進行建立

RDD經過一系列的轉換(Transformation)操作,每一次都會産生不同的RDD,供給下一個轉換操作使用

最後一個RDD經過“動作”操作進行轉換,并輸出到外部資料源

這一系列處理稱為一個Lineage(血緣關系),即DAG拓撲排序的結果

RDD的 transformations 和 actions

點選這裡

RDD 運作過程:

建立 RDD 對象;

SparkContext 負責計算 RDD 之間的依賴關系,建構 DAG

DAGScheduler 負責把 DAG 圖分解成多個 Stage ,每個 Stage 中包含了多個Task ,每個 Task 會被 TaskScheduler 分發給各個 WorkerNode 上的 Executor 去執行。

Scala 是一門現代的多範式程式設計語言,運作于 Java 平台( JVM Java 虛拟機),并相容現有的 Java 程式

Scala 的特性:

Scala 具備強大的并發性,支援函數式程式設計,可以更好地支援分布式系統

Scala 文法簡潔,能提供優雅的API

Scala 相容Java ,運作速度快,且能融合到 Hadoop 生态圈中

Scala 是 Spark 的主要程式設計語言,但 Spark還支援 Java 、 Python 、R 作為程式設計語言

Scala 的優勢是提供了 REPL Read Eval Print Loop,互動式解釋器 ),提高程式開發效率

Spark SQL在Hive相容層面僅依賴HiveQL解析、Hive中繼資料,也就是說,從HQL被解析成抽象文法樹(AST)起,就全部由Spark SQL接管了。Spark SQL執行計劃生成和優化都由Catalyst(函數式關系查詢優化架構)負責

【小白視角】大資料基礎實踐(七) Spark的基本操作

Spark SQL增加了SchemaRDD(即帶有Schema資訊的RDD),使使用者可以在Spark SQL中執行SQL語句,資料既可以來自RDD,也可以是Hive、HDFS、Cassandra等外部資料源,還可以是JSON格式的資料

Spark SQL目前支援Scala、Java、Python三種語言,支援SQL-92規範

【小白視角】大資料基礎實踐(七) Spark的基本操作

作業系統:Linux(建議Ubuntu18.04或Ubuntu16.04);

Hadoop版本:3.1.3或2.7.1;

JDK版本:1.8;

Hadoop僞分布式配置

Spark 2.4.8或自編譯版本

Scala 2.11.8或2.8.0

檢測java環境和hadoop環境。

【小白視角】大資料基礎實踐(七) Spark的基本操作
安裝包下載下傳
【小白視角】大資料基礎實踐(七) Spark的基本操作
Scala: https://www.scala-lang.org/download/all.html http://spark.apache.org/downloads.html

關于 Spark 官網下載下傳頁面中 Choose a package type 幾個選項說明:

Source Code:spark 源碼,需要編譯才能使用,可以自由設定編譯選項;

Pre-build with user-provide Hadoop:屬于 Hadoop free 版本,用應用到任意 Hadoop 版本;

Pre-build for Hadoop 2.7、Pre-build for Hadoop 2.6:分别基于 Hadoop2.7、2.6 的預先編譯版本,需要與本機安裝的 Hadoop 版本對應使用;

Pre-build with Scala 2.12 and user provided Apache Hadoop:預先編譯的版本,包含了 Scala2.12,可應用于任意 Hadoop 版本。

安裝scala

解壓安裝包(sudo tar -zxvf scala-2.11.8.tgz -C /usr/local/),并更改 scala

【小白視角】大資料基礎實踐(七) Spark的基本操作
所屬使用者和使用者組為目前使用者及所在組。
【小白視角】大資料基礎實踐(七) Spark的基本操作

配置環境變量:添加S C A L A H O M E 變 量 為 s c a l a 解 壓 路 徑 , 并 在 SCALA_HOME 變量為 scala 解壓路徑,并在SCALA

H

【小白視角】大資料基礎實踐(七) Spark的基本操作
OME變量為scala解壓路徑,并在PATH 變量添加相應 的 bin 目錄。
【小白視角】大資料基礎實踐(七) Spark的基本操作
使得環境生效
【小白視角】大資料基礎實踐(七) Spark的基本操作
檢視是否安裝成功
【小白視角】大資料基礎實踐(七) Spark的基本操作

已經成功了!

安裝spark

解壓安裝包(sudo tar -zxvf spark-2.4.8-bin-without-hadoop.tgz -C /usr/local/),更改所屬使用者及使用者組,并将目錄重命名為 spark-2.4.8,友善後續配置:

【小白視角】大資料基礎實踐(七) Spark的基本操作
更改所屬使用者及使用者組
【小白視角】大資料基礎實踐(七) Spark的基本操作
并将目錄重命名為 spark-2.4.8
【小白視角】大資料基礎實踐(七) Spark的基本操作

配置環境變量,添加 SPARK_HOME 變量,并在 PATH 變量中添加相應的 bin 目錄。

export SPARK_HOME=/usr/local/spark-2.4.8

export PATH=P A T H : PATH:PATH:SPARK_HOME/bin

【小白視角】大資料基礎實踐(七) Spark的基本操作
Spark 配置檔案配置:
【小白視角】大資料基礎實踐(七) Spark的基本操作
将 spark-env.sh.template 檔案複制為 spark-env.sh 檔案:
【小白視角】大資料基礎實踐(七) Spark的基本操作
并配置内容如下:
【小白視角】大資料基礎實踐(七) Spark的基本操作
啟動 spark:啟動 spark 之前要先啟動 HDFS
【小白視角】大資料基礎實踐(七) Spark的基本操作
啟動之後網頁通路 Master:8080 可以檢視目前 Spark workers 狀态。
【小白視角】大資料基礎實踐(七) Spark的基本操作
Spark-shell 進入spark shell
【小白視角】大資料基礎實踐(七) Spark的基本操作
會有這種錯誤
【小白視角】大資料基礎實踐(七) Spark的基本操作
但不需要慌張!不影響使用 scala 使用,如果要解決,可以通過添加系統環境變量。export TERM=xterm-color
【小白視角】大資料基礎實踐(七) Spark的基本操作
就不會有了
【小白視角】大資料基礎實踐(七) Spark的基本操作

1.5 舉個例子

通 過 spark-submit 指令運作 spark 自 帶 實 例 , spark 自 帶 實 例 都 在

SPARK_HOME/examples/jars/spark-examples_2.11-2.4.8.jar 中提供:

spark-submit --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.4.8.jar      

注:在運作SparkPi執行個體時會輸出很多運作日志,可以通過加 grep 指令進行過濾,顯示關心的資訊:

【小白視角】大資料基礎實踐(七) Spark的基本操作

(1)分别從本地檔案、HDFS上的檔案以及Spark Context的parallelized()方法生成分别生成RDD_1、RDD_2、RDD_3,要求本地檔案格式為每行多個單詞,以空格隔開;HDFS上的文本為每行1個單詞,即單詞以換行符隔開,每個RDD中都要包含1個或多個你的學号或者姓名拼音;

1.1 本地建立in.txt

【小白視角】大資料基礎實踐(七) Spark的基本操作

寫入内容

【小白視角】大資料基礎實踐(七) Spark的基本操作

上傳到spark

【小白視角】大資料基礎實踐(七) Spark的基本操作

1.2 本地建立檔案in0.txt

【小白視角】大資料基礎實踐(七) Spark的基本操作

寫入資料

【小白視角】大資料基礎實踐(七) Spark的基本操作

上傳到hdfs中

【小白視角】大資料基礎實踐(七) Spark的基本操作

檢查是否上傳成功

【小白視角】大資料基礎實踐(七) Spark的基本操作

1.3 spark建立檔案

【小白視角】大資料基礎實踐(七) Spark的基本操作

建立成功!

(2) 輸出RDD_1的第一行、RDD_2的所有内容、RDD_3的最大值;

2.1RDD_1的第一行

【小白視角】大資料基礎實踐(七) Spark的基本操作

2.2 RDD_2的所有内容

【小白視角】大資料基礎實踐(七) Spark的基本操作

2.3 RDD_3的最大值

【小白視角】大資料基礎實踐(七) Spark的基本操作

(3) 統計 RDD_1 中“姓名拼音”、“學号”兩個單詞出現的次數;

【小白視角】大資料基礎實踐(七) Spark的基本操作

結果:

zqc 有6個

031904102 有 4個

(4) 對去重後的 RDD_1再去掉RDD_2中的内容;

【小白視角】大資料基礎實踐(七) Spark的基本操作

(5) 将上述結果與RDD_3合并,并将RDD_3分别寫入本地檔案系統和HDFS檔案系統;

【小白視角】大資料基礎實踐(七) Spark的基本操作

檢視是否成功放入

【小白視角】大資料基礎實踐(七) Spark的基本操作
【小白視角】大資料基礎實踐(七) Spark的基本操作

(6)編寫scala代碼實作寫入任意内容到HDFS中,檔案路徑自定義,檔案以”學号-姓名拼音.txt”命名。

先建立一個檔案

【小白視角】大資料基礎實踐(七) Spark的基本操作
【小白視角】大資料基礎實踐(七) Spark的基本操作

在HDFS上檢視

【小白視角】大資料基礎實踐(七) Spark的基本操作

使用 Scala 語言編寫的 Spark 程式,需要使用 sbt 進行編譯打包。Spark 中沒有自帶sbt,需要單獨安裝。可以到 官網 下載下傳 sbt 安裝檔案,最新版即可

【小白視角】大資料基礎實踐(七) Spark的基本操作
【小白視角】大資料基礎實踐(七) Spark的基本操作

下載下傳好

【小白視角】大資料基礎實踐(七) Spark的基本操作

建立一個目錄

【小白視角】大資料基礎實踐(七) Spark的基本操作

這裡我們把 sbt 安裝到“/usr/local/sbt”目錄下,執行如下指令:

【小白視角】大資料基礎實踐(七) Spark的基本操作

把 bin 目錄下的 sbt-launch.jar 複制到 sbt 安裝目錄下

【小白視角】大資料基礎實踐(七) Spark的基本操作

建立一個檔案然後将下列内容寫下去

#!/bin/bash 
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M" 
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"      
【小白視角】大資料基礎實踐(七) Spark的基本操作

儲存後,還需要為該 Shell 腳本檔案增加可執行權限:

然後,可以使用指令 sbt sbtVersion 檢視 sbt 版本資訊:

完成了,是有一點點慢!

【小白視角】大資料基礎實踐(七) Spark的基本操作

(1) 實作wordcount功能,并将結果寫入本地檔案;

在本地建立目錄

【小白視角】大資料基礎實踐(七) Spark的基本操作

建立這個檔案。

【小白視角】大資料基礎實踐(七) Spark的基本操作

寫入資料。

【小白視角】大資料基礎實踐(七) Spark的基本操作

檢查目錄結構

【小白視角】大資料基礎實踐(七) Spark的基本操作

(2)分别使用sbt打包上述程式;

【小白視角】大資料基礎實踐(七) Spark的基本操作

(3)通過spark-submit執行生成的jar。

【小白視角】大資料基礎實踐(七) Spark的基本操作

編寫Scala獨立應用程式:

實作生成任意RDD,并将結果寫入檔案;      
【小白視角】大資料基礎實踐(七) Spark的基本操作

重命名并設定權限組

【小白視角】大資料基礎實踐(七) Spark的基本操作

在終端中執行如下指令建立一個檔案夾 spark_zqc_maven_scala 作為應用程式根,目錄:

【小白視角】大資料基礎實踐(七) Spark的基本操作

寫入下面内容

【小白視角】大資料基礎實踐(七) Spark的基本操作

(2) 分别使用maven打包上述程式;

該 程 序 依 賴 Spark Java API, 因 此 我 們 需 要 通 過 Maven 進 行 編 譯 打 包 。 在./spark_zqc_maven_scala 目錄中建立檔案 pom.xml,然後,在 pom.xml 檔案中 添加如下内容,用來聲明該獨立應用程式的資訊以及與 Spark 的依賴關系:

【小白視角】大資料基礎實踐(七) Spark的基本操作

為了保證 Maven 能夠正常運作,先執行如下指令檢查整個應用程式的檔案結構,

【小白視角】大資料基礎實踐(七) Spark的基本操作

接下來,我們可以通過如下代碼将整個應用程式打包成 JAR 包(注意:計算機需要保持連接配接網絡的狀态,而且首次運作打包指令時,Maven 會自動下載下傳依賴包,需要消耗幾分鐘的時間):

【小白視角】大資料基礎實踐(七) Spark的基本操作

(3) 通過spark-submit執行生成的jar。

【小白視角】大資料基礎實踐(七) Spark的基本操作

小生凡一,期待你的關注。

【小白視角】大資料基礎實踐(七) Spark的基本操作