天天看點

初學Spark

什麼是Spark?

Spark是一個分布式計算引擎,2009年誕生于UC伯克利的AMPLab,2010年開源并于2013年成為Apache頂級項目。

Spark具有如下特點:

1.快速

  • DAG架構

    Spark采用的是DAG架構,DAG是在MapReduce架構基礎上演化而來。

對于一些複雜的資料處理,比如有多個Reduce Stage,MapReduce架構中一個Reduce前面必須要有一個Map(Map-Reduce-Map-Reduce...),不能多個Reduce級聯處理,這樣會導緻處理過程中會增加很多備援的Map階段,即使Map不做任何資料處理(讀取HDFS資料直接輸出),但是這個過程耗費了很多時間和資源。

DAG架構可以任意的組合Map/Reduce的算子(如Map-Reduce-Reduce),更加靈活更快速。

如Tez(Tez也是DAG)文檔裡面有例子說明,詳見

https://cwiki.apache.org/confluence/display/Hive/Hive+on+Tez,

其中以一個TPC-DS的例子進行了說明。

  • MapReduce是多程序模型,雖然可以更細粒度控制task占用的資源,但是JVM啟動會消耗更多的時間,Spark則采用的是多線程模型,task啟動快,不同的task可以共享記憶體;
  • Spark可以對RDD資料集進行cache,對疊代計算很友好更快速
  • Spark的性能優化項目Tungsten Project( https://www.slideshare.net/databricks/spark-performance-whats-next) ,對計算過程中的記憶體管理/CPU緩存友好等方面進行了很多優化。如WholeStageCodeGen,對火山模型(Volcano Model)進行了優化,減少了函數調用等。

2.易用

  • 支援SQL/Scala/Java/Python/R語言
  • 算子豐富

    使用者可以将算子進行組合完成資料處理,如wordcount ,隻需要寫幾行代碼,相對于MapReduce實作Map和Reduce要簡單很多。

val rdd = spark.sparkContext.textFile("/README.md")
val counts = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.saveAsTextFile("/results")           
  • 互動式

    可以通過SQL/Scala/Python/R的shell進行互動式的使用

如:

[root@emr-header-1 ~]# spark-shell
 scala>spark.sql("create table t(a string)")
 
 [root@emr-header-1 ~]# pyspark
 >>> textFile = spark.read.text("README.md")
 
  [root@emr-header-1 ~]# spark-sql
 > select * from t;
           
  • 接口更統一

    Spark 2.0中StructStreaming/MLlib等接口基本統一到DataSet/DataFrame,API簡單,使得程式設計更容易,而SparkSQL/SparkCore子產品的優化,可以立即展現到上層子產品(Streaming/MLlib等)。

3.通用

Spark包含SparkSQL/StructStreaming/MLlib/GraphX,能夠處理各種大資料處理需求,如ETL離線處理、流式計算、機器學習、圖計算等,隻需要Spark就能應對大資料進行中的大部分場景。

4.融合

  • 多種部署方式

    不僅可以獨立部署standalone模式,也可以運作在Yarn/Mesos等資源排程架構之上

  • 多資料源接入

    可以讀寫HBase/HDFS/Cassandra/OSS/S3/Hive/Alluxio等DataSource,如:

    // 初始化SparkSession
    val spark = SparkSession.builder
          .master("local[2]") //local模式
          .appName("test")
          .enableHiveSupport() //使用Hive的中繼資料管理
          .getOrCreate()
    val df1 = spark.read.parquet(basePath) 
    val df2 = spark.read.text("oss://bucket/path/xxx")
    val df3 = spark.sql("select * from t")           

Spark技術棧

初學Spark

1. 資料源

Spark支援對接各種資料源,如HDFS/OSS/HBase/MySQL/Kafka等。

DataFrame封裝了一些資料源接入,比如json/csv/mysql等,使用者可以直接通過調用相關api去讀寫這些資料源檔案;

DataFrame還提供了DataSource接入的擴充api,使用者可以根據api将自己的DataSource接入Spark;

使用者也可以将資料源封裝成RDD來使用;網站

https://spark-packages.org/

上有很多第三方實作的資料源可以直接拿來使用。

EMR團隊也實作了很多對接阿裡雲産品的SDK供大家使用(

https://github.com/aliyun/aliyun-emapreduce-sdk)

社群目前在做DataSourceV2的重構(

https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#)

2. 資源排程

Spark可以通過YARN/Mesos進行資源管理。如将Spark作業送出到YARN的某個隊列中,通過控制隊列的配置設定達到對Spark作業的資源限制管理等。

3.Spark引擎

Spark是一個大資料處理的工具包,一套引擎裡面可以做ETL/流計算/圖計算等。

SparkCore是Spark引擎的最底層,它的任何改動/優化都會影響到上層子產品。它以RDD為核心,将外層資料源抽象成RDD資料集,然後通過一些算子(

transformation

)對RDD進行轉換操作(如map/filter等)生成新的RDD,最終通過算子(

action

)真正的送出執行擷取所需資料結果。

初學Spark
初學Spark

如上圖所示,将HDFS檔案抽象成RDD1資料集,然後通過map/filter算子對RDD1進行轉換處理,分别得到了新的RDD2/RDD3,最後通過saveAsTextFile這個

action

類型的算子真正觸發作業的送出運作,将結果寫到HDFS中。

上圖隻是一個簡單的SparkCore中以RDD為核心的資料處理流程。RDD提供了很多操作算子,使用者可以利用這些算子進行組合來處理更複雜的資料處理邏輯,如groupBy/reduce等等。

SparkCore對RDD資料處理過程,包含很多子產品,比如Stage/Task的排程, Shuffle, 記憶體管理, 排序等等,以後再詳細介紹。

下面是一張大概的内部執行流程圖,圖中相關概念可以去Spark官網檢視(如RDD/transformation和action兩種類型的算子/寬依賴/窄依賴等)。

初學Spark

後續

1.對SparkCore中的排程/shuffle/記憶體管理等詳細介紹

2.上層元件SparkSQL/StructStreaming/MLlib/GraphX/SparkR的介紹

歡迎指正交流

初學Spark

繼續閱讀