前言
Spark自從2014年1.2版本釋出以來,已成為大資料計算的通用元件。網上介紹Spark的資源也非常多,但是不利于使用者快速入門,是以本文主要通從使用者的角度來介紹Spark,讓使用者能快速的認識Spark,知道Spark是什麼、能做什麼、怎麼去做。
具體的概念可以參考spark社群的
相關文章Spark是什麼
摘用官網的定義:
Spark是一個快速的、通用的分布式計算系統。
提供了進階API,如:Java、Scala、Python和R。
同時也支援進階工具,如:Spark SQL處理結構化資料、MLib處理機器學習、GraphX用于圖計算、Spark Streming用于流資料處理。
也就是說Spark提供了靈活的、豐富接口的大資料處理能力。下圖是Spark的子產品圖:
使用者使用的SQL、Streaming、MLib、GraphX接口最終都會轉換成Spark Core分布式運作。
目前使用者用的比較多的是SQL和Streaming,這裡先主要介紹下這兩個。
Spark SQL
Spark SQL是Spark提供的SQL接口,使用者使用Spark SQL可以像使用傳統資料庫一樣使用SQL。例如:建立表、删除表、查詢表、join表等。連接配接到Spark SQL後可以做如下操作(可參考:
如何連接配接Spark SQL)。
# 在Spark中建立一個表:test_parquet,表的存儲檔案格式為:parquet
create table test_parquet(
id int,
name string,
value double
) using parquet;
此指令運作完畢後,Spark系統會在hdfs上建立一個名稱為test_parquet的目錄,例如/user/hive/warehouse/test_parquet/。
然後往Spark表中插入資料。
# 往Spark表:test_parquet插入資料
insert into test_parquet values(1001, 'name1001', 95.49);
insert into test_parquet values(1002, 'name1002', 73.25);
insert into test_parquet values(1003, 'name1003', 25.65);
insert into test_parquet values(1004, 'name1004', 23.39);
insert into test_parquet values(1005, 'name1005', 8.64);
insert into test_parquet values(1006, 'name1006', 52.60);
insert into test_parquet values(1007, 'name1007', 42.16);
insert into test_parquet values(1008, 'name1008', 85.39);
insert into test_parquet values(1009, 'name1009', 7.22);
insert into test_parquet values(1010, 'name1010', 10.43);
插入資料的步驟運作後,Spark會在hdfs目錄:/user/hive/warehouse/test_parquet/中建立一些列字尾為.parquet的檔案,如下:
/user/hive/warehouse/test_parquet/part-00000-49fefdf2-2ef0-4b7d-9414-6ac52e0390cb-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-52106855-8dd8-4f3a-8746-3025cf4898ea-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-89d738e9-754b-44b0-abd3-f2dd91cd0389-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-bce8efef-13ef-42e2-a6c7-8611e21e931a-c000.snappy.parquet
插入完成後開始查詢資料。
select * from test_parquet
查詢資料的過程是Spark并行從hdfs系統上拉取每個Parquet,然後在Spark中并行計算。
這裡隻是簡單列舉了Spark 建立Parquet表的過程,Spark也可以支援讀取其他格式的表,例如對接資料庫等。需要了解可參考:
Spark Connectors。
Spark SQL除了對使用者提供了SQL指令的接口,也提供了API接口。Datasets(DataFrames),例如使用API建立Parquet如下:
//spark 讀取json格式檔案,傳回一個DataFrames
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames儲存為Parquet格式檔案。
peopleDF.write.parquet("people.parquet")
//讀取Parquet檔案,傳回DataFrames
val parquetFileDF = spark.read.parquet("people.parquet")
// DataFrames注冊成一個Parquet表
parquetFileDF.createOrReplaceTempView("parquetFile")
//使用SQL查詢Parquet表
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
//列印資料
namesDF.map(attributes => "Name: " + attributes(0)).show()
Spark Streaming
Spark Streaming是流式處理系統,可以處理流式資料。下面用個例子說明Streaming的過程。
Spark Streaming可以對接Kafka。假如kafka産生的資料格式為:
id values time
id001 98.2 1560414467
id002 99.2 1560414468
id001 87.2 1560414469
現在業務需要每分鐘從Kafka讀取一批資料,對數進行資訊補齊,因為kafka拿到的資料隻有id資訊,使用者想補齊name資訊。
假如具有id、name資訊的表存儲在Phoenix中。
這樣就可以通過Spark Streaming來完成這些業務訴求。在Spark的業務處理邏輯中拿到kafka的資料後,使用id關聯Phoenix表拿到name資訊,然再寫入到其他資料庫。
例如此業務的Spark Streaming的業務邏輯代碼如下:
//scala代碼樣例
val words = messages.transform { rdd =>
rdd.map {line =>
println(s"==0== words = $line")
//逗号分隔
val words = line.value().split(",")
words
}
}.foreachRDD { lineArray =>
lineArray.foreachPartition { dataPartition =>
val connectionPool = Phoenix5xConnectionPool(queryServerAddress)
val phoenixConn = connectionPool.borrowObject()
val statment = phoenixConn.createStatement()
var i = 0
while (dataPartition.hasNext) {
val kv = dataPartition.next()
//關聯id、擷取name資訊。
val rs = statment.executeQuery(s"select name from $phoenixTableName where id = ${kv(0)}")
val name = rs.getString(1)
//把結果寫入到資料庫
statment.execute(s"upsert into $resultTable values('${kv(0)}','${kv(1)}', ${kv(2)}, '$name')")
i = i + 1
if (i % batchSize == 0) {
phoenixConn.commit()
}
}
phoenixConn.commit()
connectionPool.returnObject(phoenixConn)
}
}
Spark Streming對接Kafka可參考:
Spark對接kafkak快速入門。Spark Streming對接Phoenix代碼可參考:
SparkStreming,
SparkSQLSpark适合做什麼
先看下Spark在目前常用的BigData業務架構中的位置。
下圖是常用的BigData 大資料元件Spark+HBase+Cassandra+ES(Solor),這些元件組合可覆寫BigData 95%以上的業務場景。
圖中資料BigData分4個層次,由上到下分别為:
業務系統層:一般是直接面向使用者的業務系統。
計算層:Spark的分布式計算。
資料庫層:HBase+Cassandra資料庫提供實時查詢的能力。
存儲層:HDFS或者OSS。
這裡主要介紹下計算層Spark。
Spark計算層會把資料從資料庫、列式存儲(數倉)中拉去到Spark中進行分布式計算。我們把Spark打開看下是如何分布式計算的。先看下Spark運作時候的部署結構。
由上圖可以看到Spark部署時分布式的,有一個Driver,有N個Executor。業務系統對接Driver,Driver把計算邏輯發送到每個Executor運作,Executor運作結果再傳回。
是以當Spark拉取資料庫、數倉資料時會并行拉取到每個Executor做并行運算。
例如Spark SQL中查詢表的例子,以及Spark Streming的中處理批資料的例子,Spark運算時是每個Executor并處理資料的,Executor處理資料的邏輯是由使用者編碼控制的,例如使用者寫的SQL語句,調用API寫的業務代碼等。
那麼Spark适合什麼樣的計算呢?
下圖列出了Spark 和HBase資料庫各自适用的場景(摘自
HBase和Spark對比):
對比項目 | Phoenix(HBase) | Spark |
---|---|---|
SQL複雜度 | 簡單查詢, 必須命中索引 且 命中後 傳回的資料較少,如果是join,則join任意一則傳回的資料量在10w以下,且另一側必須命中索引。 為了保障叢集穩定性,一些複雜的sql及耗時的sql會被平台拒絕運作。 | 全部支援執行完成,支援Spark 映射到Phoenix,做到Spark在簡單SQL查詢能到Phoenix同樣的性能,不過Spark定位為 分析的場景,與Phoenix 純TP有本質的差別 |
叢集 | HBase共享一個叢集,本質是HBase提供的SQL | Spark需要單獨購買的叢集,Spark叢集運算不影響其它資料庫 |
并發 | 單機 1w-5w左右 | Spark最高不超過100 |
延遲 | 延遲在ms級别,一些命中較多的資料的sql會到 秒 | 一般延遲在300ms以上,大部分sql需要秒,分鐘,甚至小時 |
資料Update | Phoenix支援 | Spark不支援 |
支援業務 | 線上業務 | 離線業務 或者 準線上業務 |
舉個例子說明下上面每一項對應的場景,如下圖:
此圖描述的是使用者登入手機淘寶,淘寶根據使用者的ID資訊在淘寶首頁推薦商品這樣的一個流程。我們看下每個流程中哪些場景适合Phoenix、Spark。
1、 擷取使用者的推薦清單。
使用者登入後,手機淘寶要根據使用者的ID從“使用者推薦商品清單 user_reco_list”這個表中擷取資訊。SQL語句可能是這樣的:
select * from user_reco_list where user_id = 'user0001' and time='2019-06-22'
這個SQL的特點如下:
- 簡單:隻有select *,沒有join、group by。
- 有關鍵字過濾:user_id = 'user0001'。
- 傳回的結果集少(大概傳回幾十行)。
- user_reco_list表資料量很大(百億級别)
- 并發量很大,可能同時會有上萬個使用者同時登入。
- 低延遲時間:使用者一登入要立刻顯示推薦。
類似這種特點的業務查詢就适合使用線上資料Phoenix。
2、 統計使用者的浏覽記錄。
“使用者推薦商品清單 user_reco_list”中資料是怎麼來的呢?是從使用者的浏覽記錄、購買記錄、加入購物車記錄等資訊統計而來的。背景任務每天淩晨從使用者的記錄中進行大量的統計分析,然後把結果寫入“使用者推薦商品清單 user_reco_list”。SQL語句可能是這的:
select sum(click) as counts, user_id, reco_id from user_scan_list group by user_id, reco_id where times>= '2018-06-30' and times<'2018-12-30'
- 統計分析:有sum、group by。
- 查詢時間範圍大:times的時間範圍要半年,即掃描的資料量大。
- 傳回的結果集大,可能傳回百萬級。
- user_scan_list表資料量很大,百億級别、千億級等。
- 并發量小、每天淩晨計算一次。
- 高時延:計算結果可能要分鐘級别、甚至小時級别。
類似這種特點的業務查詢就适合使用離線數倉:Spark 列存(Parquet)。
通過上面的例子大概可以認識到哪些場景适合Spark,哪些适合Phoenix。Spark和Phoenix互相配合,解決大資料的問題。
Spark如何建數倉
那Spark如何建數倉呢?本質就是把資料導入到Spark,使用Spark的列式存儲檔案格式(例如parquet)存儲資料,使用Spark完成批處理、離線分析的業務。
例如在Spark建立一個以天為分區的明細表:
#建立parquet格式的分區表
create table test_parquet_pt(
id int,
name string,
value double,
dt string
) using parquet
partitioned by(dt);
#導入資料到Spark表:test_parquet_pt。
#例如從Phoenix表中增量導入1天的資料量。
insert into test_parquet_pt select * from phoenix_table where dt>='2019-01-02' and dt<'2019-01-03';
#分析Spark表
select avg(value) from test_parquet_pt group by dt;
上面隻是個簡單的執行個體,下面舉例幾個實際的業務場景。
先看下面的一個典型的業務場景。
上圖是一個典型的複雜分析及查詢系統。資料流程由圖可見:
- 資料由APP、傳感器、商業系統等客戶的業務系統産生發送到Kafka系統。
- Spark Streming 對接kafka周期讀取資料入庫到線上資料庫HBase/Phoenix,使用者的營運系統實時查詢線上資料庫。
- HBase/Phoenix資料庫周期同步到Spark數倉做離線計算。離線計算的結果寫回HBase/Phoenix或者其他業務資料庫。
上面是一個常用的方案。Spark建立數倉也有客戶對數倉進行分層,例如下圖:
客戶把數倉分為四層:操作資料、公共明細、公共彙總、應用資料。每一層的資料由上一層彙聚、統計計算得來,每一層的資料應用于不同的業務場景。此場景的明細可參考:
HBase+Spark建構遊戲大資料平台資料歸檔到Spark可參考X-Pack Spark提供的:
批量歸檔小結
本文隻是對Spark做個入門的介紹,更詳細的資料可參考:
Spark社群資料。X-Pack Spark請參考:
X-Pack Spark分析引擎