什麼是Spark?
Spark是一個分布式計算引擎,2009年誕生于UC伯克利的AMPLab,2010年開源并于2013年成為Apache頂級項目。
名稱 | 位址 |
---|---|
官方網站 | https://spark.apache.org/ |
git位址 | https://github.com/apache/spark |
Spark具有如下特點:
1.快速
-
DAG架構
Spark采用的是DAG架構,DAG是在MapReduce架構基礎上演化而來。
對于一些複雜的資料處理,比如有多個Reduce Stage,MapReduce架構中一個Reduce前面必須要有一個Map(Map-Reduce-Map-Reduce...),不能多個Reduce級聯處理,這樣會導緻處理過程中會增加很多備援的Map階段,即使Map不做任何資料處理(讀取HDFS資料直接輸出),但是這個過程耗費了很多時間和資源。
DAG架構可以任意的組合Map/Reduce的算子(如Map-Reduce-Reduce),更加靈活更快速。
如Tez(Tez也是DAG)文檔裡面有例子說明,詳見
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Tez,其中以一個TPC-DS的例子進行了說明。
- MapReduce是多程序模型,雖然可以更細粒度控制task占用的資源,但是JVM啟動會消耗更多的時間,Spark則采用的是多線程模型,task啟動快,不同的task可以共享記憶體;
- Spark可以對RDD資料集進行cache,對疊代計算很友好更快速
- Spark的性能優化項目Tungsten Project( https://www.slideshare.net/databricks/spark-performance-whats-next) ,對計算過程中的記憶體管理/CPU緩存友好等方面進行了很多優化。如WholeStageCodeGen,對火山模型(Volcano Model)進行了優化,減少了函數調用等。
2.易用
- 支援SQL/Scala/Java/Python/R語言
-
算子豐富
使用者可以将算子進行組合完成資料處理,如wordcount ,隻需要寫幾行代碼,相對于MapReduce實作Map和Reduce要簡單很多。
val rdd = spark.sparkContext.textFile("/README.md")
val counts = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.saveAsTextFile("/results")
-
互動式
可以通過SQL/Scala/Python/R的shell進行互動式的使用
如:
[root@emr-header-1 ~]# spark-shell
scala>spark.sql("create table t(a string)")
[root@emr-header-1 ~]# pyspark
>>> textFile = spark.read.text("README.md")
[root@emr-header-1 ~]# spark-sql
> select * from t;
-
接口更統一
Spark 2.0中StructStreaming/MLlib等接口基本統一到DataSet/DataFrame,API簡單,使得程式設計更容易,而SparkSQL/SparkCore子產品的優化,可以立即展現到上層子產品(Streaming/MLlib等)。
3.通用
Spark包含SparkSQL/StructStreaming/MLlib/GraphX,能夠處理各種大資料處理需求,如ETL離線處理、流式計算、機器學習、圖計算等,隻需要Spark就能應對大資料進行中的大部分場景。
4.融合
-
多種部署方式
不僅可以獨立部署standalone模式,也可以運作在Yarn/Mesos等資源排程架構之上
-
多資料源接入
可以讀寫HBase/HDFS/Cassandra/OSS/S3/Hive/Alluxio等DataSource,如:
// 初始化SparkSession val spark = SparkSession.builder .master("local[2]") //local模式 .appName("test") .enableHiveSupport() //使用Hive的中繼資料管理 .getOrCreate() val df1 = spark.read.parquet(basePath) val df2 = spark.read.text("oss://bucket/path/xxx") val df3 = spark.sql("select * from t")
Spark技術棧
1. 資料源
Spark支援對接各種資料源,如HDFS/OSS/HBase/MySQL/Kafka等。
DataFrame封裝了一些資料源接入,比如json/csv/mysql等,使用者可以直接通過調用相關api去讀寫這些資料源檔案;
DataFrame還提供了DataSource接入的擴充api,使用者可以根據api将自己的DataSource接入Spark;
使用者也可以将資料源封裝成RDD來使用;網站
https://spark-packages.org/上有很多第三方實作的資料源可以直接拿來使用。
EMR團隊也實作了很多對接阿裡雲産品的SDK供大家使用(
https://github.com/aliyun/aliyun-emapreduce-sdk)社群目前在做DataSourceV2的重構(
https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#)2. 資源排程
Spark可以通過YARN/Mesos進行資源管理。如将Spark作業送出到YARN的某個隊列中,通過控制隊列的配置設定達到對Spark作業的資源限制管理等。
3.Spark引擎
Spark是一個大資料處理的工具包,一套引擎裡面可以做ETL/流計算/圖計算等。
SparkCore是Spark引擎的最底層,它的任何改動/優化都會影響到上層子產品。它以RDD為核心,将外層資料源抽象成RDD資料集,然後通過一些算子(
transformation
)對RDD進行轉換操作(如map/filter等)生成新的RDD,最終通過算子(
action
)真正的送出執行擷取所需資料結果。
如上圖所示,将HDFS檔案抽象成RDD1資料集,然後通過map/filter算子對RDD1進行轉換處理,分别得到了新的RDD2/RDD3,最後通過saveAsTextFile這個
action
類型的算子真正觸發作業的送出運作,将結果寫到HDFS中。
上圖隻是一個簡單的SparkCore中以RDD為核心的資料處理流程。RDD提供了很多操作算子,使用者可以利用這些算子進行組合來處理更複雜的資料處理邏輯,如groupBy/reduce等等。
SparkCore對RDD資料處理過程,包含很多子產品,比如Stage/Task的排程, Shuffle, 記憶體管理, 排序等等,以後再詳細介紹。
下面是一張大概的内部執行流程圖,圖中相關概念可以去Spark官網檢視(如RDD/transformation和action兩種類型的算子/寬依賴/窄依賴等)。
後續
1.對SparkCore中的排程/shuffle/記憶體管理等詳細介紹
2.上層元件SparkSQL/StructStreaming/MLlib/GraphX/SparkR的介紹
歡迎指正交流