天天看點

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

前言

Spark自從2014年1.2版本釋出以來,已成為大資料計算的通用元件。網上介紹Spark的資源也非常多,但是不利于使用者快速入門,是以本文主要通從使用者的角度來介紹Spark,讓使用者能快速的認識Spark,知道Spark是什麼、能做什麼、怎麼去做。

具體的概念可以參考spark社群的

相關文章

Spark是什麼

摘用官網的定義:

Spark是一個快速的、通用的分布式計算系統。

提供了進階API,如:Java、Scala、Python和R。

同時也支援進階工具,如:Spark SQL處理結構化資料、MLib處理機器學習、GraphX用于圖計算、Spark Streming用于流資料處理。

也就是說Spark提供了靈活的、豐富接口的大資料處理能力。下圖是Spark的子產品圖:

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

使用者使用的SQL、Streaming、MLib、GraphX接口最終都會轉換成Spark Core分布式運作。

目前使用者用的比較多的是SQL和Streaming,這裡先主要介紹下這兩個。

Spark SQL

Spark SQL是Spark提供的SQL接口,使用者使用Spark SQL可以像使用傳統資料庫一樣使用SQL。例如:建立表、删除表、查詢表、join表等。連接配接到Spark SQL後可以做如下操作(可參考:

如何連接配接Spark SQL

)。

# 在Spark中建立一個表:test_parquet,表的存儲檔案格式為:parquet
create table test_parquet(
    id int,
    name string,
    value double
) using parquet;           

此指令運作完畢後,Spark系統會在hdfs上建立一個名稱為test_parquet的目錄,例如/user/hive/warehouse/test_parquet/。

然後往Spark表中插入資料。

# 往Spark表:test_parquet插入資料
insert into test_parquet values(1001, 'name1001', 95.49);
insert into test_parquet values(1002, 'name1002', 73.25);
insert into test_parquet values(1003, 'name1003', 25.65);
insert into test_parquet values(1004, 'name1004', 23.39);
insert into test_parquet values(1005, 'name1005', 8.64);
insert into test_parquet values(1006, 'name1006', 52.60);
insert into test_parquet values(1007, 'name1007', 42.16);
insert into test_parquet values(1008, 'name1008', 85.39);
insert into test_parquet values(1009, 'name1009', 7.22);
insert into test_parquet values(1010, 'name1010', 10.43);           

插入資料的步驟運作後,Spark會在hdfs目錄:/user/hive/warehouse/test_parquet/中建立一些列字尾為.parquet的檔案,如下:

/user/hive/warehouse/test_parquet/part-00000-49fefdf2-2ef0-4b7d-9414-6ac52e0390cb-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-52106855-8dd8-4f3a-8746-3025cf4898ea-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-89d738e9-754b-44b0-abd3-f2dd91cd0389-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-bce8efef-13ef-42e2-a6c7-8611e21e931a-c000.snappy.parquet           

插入完成後開始查詢資料。

select * from test_parquet           

查詢資料的過程是Spark并行從hdfs系統上拉取每個Parquet,然後在Spark中并行計算。

這裡隻是簡單列舉了Spark 建立Parquet表的過程,Spark也可以支援讀取其他格式的表,例如對接資料庫等。需要了解可參考:

Spark Connectors

Spark SQL除了對使用者提供了SQL指令的接口,也提供了API接口。Datasets(DataFrames),例如使用API建立Parquet如下:

//spark 讀取json格式檔案,傳回一個DataFrames
val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames儲存為Parquet格式檔案。
peopleDF.write.parquet("people.parquet")

//讀取Parquet檔案,傳回DataFrames
val parquetFileDF = spark.read.parquet("people.parquet")

// DataFrames注冊成一個Parquet表
parquetFileDF.createOrReplaceTempView("parquetFile")
//使用SQL查詢Parquet表
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
//列印資料
namesDF.map(attributes => "Name: " + attributes(0)).show()           

Spark Streaming

Spark Streaming是流式處理系統,可以處理流式資料。下面用個例子說明Streaming的過程。

Spark Streaming可以對接Kafka。假如kafka産生的資料格式為:

id        values      time 
id001     98.2     1560414467
id002     99.2     1560414468
id001     87.2     1560414469           

現在業務需要每分鐘從Kafka讀取一批資料,對數進行資訊補齊,因為kafka拿到的資料隻有id資訊,使用者想補齊name資訊。

假如具有id、name資訊的表存儲在Phoenix中。

這樣就可以通過Spark Streaming來完成這些業務訴求。在Spark的業務處理邏輯中拿到kafka的資料後,使用id關聯Phoenix表拿到name資訊,然再寫入到其他資料庫。

例如此業務的Spark Streaming的業務邏輯代碼如下:

//scala代碼樣例
val words = messages.transform { rdd =>
      rdd.map {line =>
        println(s"==0== words = $line")
        //逗号分隔
        val words = line.value().split(",")
        words
      }
    }.foreachRDD { lineArray =>
      lineArray.foreachPartition { dataPartition =>
        val connectionPool = Phoenix5xConnectionPool(queryServerAddress)
        val phoenixConn = connectionPool.borrowObject()
        val statment = phoenixConn.createStatement()
        var i = 0
        while (dataPartition.hasNext) {
          val kv = dataPartition.next()
          //關聯id、擷取name資訊。
          val rs = statment.executeQuery(s"select name from $phoenixTableName where id = ${kv(0)}")
          val name = rs.getString(1)
          //把結果寫入到資料庫
          statment.execute(s"upsert into $resultTable values('${kv(0)}','${kv(1)}', ${kv(2)}, '$name')")
          i = i + 1
          if (i % batchSize == 0) {
            phoenixConn.commit()
          }
        }
        phoenixConn.commit()
        connectionPool.returnObject(phoenixConn)
      }
    }           

Spark Streming對接Kafka可參考:

Spark對接kafkak快速入門

。Spark Streming對接Phoenix代碼可參考:

SparkStreming

SparkSQL

Spark适合做什麼

先看下Spark在目前常用的BigData業務架構中的位置。

下圖是常用的BigData 大資料元件Spark+HBase+Cassandra+ES(Solor),這些元件組合可覆寫BigData 95%以上的業務場景。

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

圖中資料BigData分4個層次,由上到下分别為:

業務系統層:一般是直接面向使用者的業務系統。

計算層:Spark的分布式計算。

資料庫層:HBase+Cassandra資料庫提供實時查詢的能力。

存儲層:HDFS或者OSS。

這裡主要介紹下計算層Spark。

Spark計算層會把資料從資料庫、列式存儲(數倉)中拉去到Spark中進行分布式計算。我們把Spark打開看下是如何分布式計算的。先看下Spark運作時候的部署結構。

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

由上圖可以看到Spark部署時分布式的,有一個Driver,有N個Executor。業務系統對接Driver,Driver把計算邏輯發送到每個Executor運作,Executor運作結果再傳回。

是以當Spark拉取資料庫、數倉資料時會并行拉取到每個Executor做并行運算。

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

例如Spark SQL中查詢表的例子,以及Spark Streming的中處理批資料的例子,Spark運算時是每個Executor并處理資料的,Executor處理資料的邏輯是由使用者編碼控制的,例如使用者寫的SQL語句,調用API寫的業務代碼等。

那麼Spark适合什麼樣的計算呢?

下圖列出了Spark 和HBase資料庫各自适用的場景(摘自

HBase和Spark對比

):

對比項目             Phoenix(HBase) Spark
SQL複雜度 簡單查詢, 必須命中索引 且 命中後 傳回的資料較少,如果是join,則join任意一則傳回的資料量在10w以下,且另一側必須命中索引。 為了保障叢集穩定性,一些複雜的sql及耗時的sql會被平台拒絕運作。 全部支援執行完成,支援Spark 映射到Phoenix,做到Spark在簡單SQL查詢能到Phoenix同樣的性能,不過Spark定位為 分析的場景,與Phoenix 純TP有本質的差別
叢集 HBase共享一個叢集,本質是HBase提供的SQL Spark需要單獨購買的叢集,Spark叢集運算不影響其它資料庫
并發 單機 1w-5w左右 Spark最高不超過100
延遲 延遲在ms級别,一些命中較多的資料的sql會到 秒 一般延遲在300ms以上,大部分sql需要秒,分鐘,甚至小時
資料Update Phoenix支援 Spark不支援
支援業務 線上業務 離線業務 或者 準線上業務

舉個例子說明下上面每一項對應的場景,如下圖:

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

此圖描述的是使用者登入手機淘寶,淘寶根據使用者的ID資訊在淘寶首頁推薦商品這樣的一個流程。我們看下每個流程中哪些場景适合Phoenix、Spark。

1、 擷取使用者的推薦清單。

使用者登入後,手機淘寶要根據使用者的ID從“使用者推薦商品清單 user_reco_list”這個表中擷取資訊。SQL語句可能是這樣的:

select * from user_reco_list where user_id = 'user0001' and time='2019-06-22'           

這個SQL的特點如下:

  • 簡單:隻有select *,沒有join、group by。
  • 有關鍵字過濾:user_id = 'user0001'。
  • 傳回的結果集少(大概傳回幾十行)。
  • user_reco_list表資料量很大(百億級别)
  • 并發量很大,可能同時會有上萬個使用者同時登入。
  • 低延遲時間:使用者一登入要立刻顯示推薦。

類似這種特點的業務查詢就适合使用線上資料Phoenix。

2、 統計使用者的浏覽記錄。

“使用者推薦商品清單 user_reco_list”中資料是怎麼來的呢?是從使用者的浏覽記錄、購買記錄、加入購物車記錄等資訊統計而來的。背景任務每天淩晨從使用者的記錄中進行大量的統計分析,然後把結果寫入“使用者推薦商品清單 user_reco_list”。SQL語句可能是這的:

select sum(click) as counts, user_id, reco_id from user_scan_list group by user_id, reco_id where times>= '2018-06-30' and times<'2018-12-30'            
  • 統計分析:有sum、group by。
  • 查詢時間範圍大:times的時間範圍要半年,即掃描的資料量大。
  • 傳回的結果集大,可能傳回百萬級。
  • user_scan_list表資料量很大,百億級别、千億級等。
  • 并發量小、每天淩晨計算一次。
  • 高時延:計算結果可能要分鐘級别、甚至小時級别。

類似這種特點的業務查詢就适合使用離線數倉:Spark 列存(Parquet)。

通過上面的例子大概可以認識到哪些場景适合Spark,哪些适合Phoenix。Spark和Phoenix互相配合,解決大資料的問題。

Spark如何建數倉

那Spark如何建數倉呢?本質就是把資料導入到Spark,使用Spark的列式存儲檔案格式(例如parquet)存儲資料,使用Spark完成批處理、離線分析的業務。

例如在Spark建立一個以天為分區的明細表:

#建立parquet格式的分區表
create table test_parquet_pt(
    id int,
    name string,
    value double,
    dt string
) using parquet
partitioned by(dt);

#導入資料到Spark表:test_parquet_pt。
#例如從Phoenix表中增量導入1天的資料量。
insert into test_parquet_pt select * from phoenix_table where dt>='2019-01-02' and dt<'2019-01-03';

#分析Spark表
select avg(value) from test_parquet_pt group by dt;           

上面隻是個簡單的執行個體,下面舉例幾個實際的業務場景。

先看下面的一個典型的業務場景。

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

上圖是一個典型的複雜分析及查詢系統。資料流程由圖可見:

  1. 資料由APP、傳感器、商業系統等客戶的業務系統産生發送到Kafka系統。
  2. Spark Streming 對接kafka周期讀取資料入庫到線上資料庫HBase/Phoenix,使用者的營運系統實時查詢線上資料庫。
  3. HBase/Phoenix資料庫周期同步到Spark數倉做離線計算。離線計算的結果寫回HBase/Phoenix或者其他業務資料庫。

上面是一個常用的方案。Spark建立數倉也有客戶對數倉進行分層,例如下圖:

Spark入門介紹前言Spark是什麼Spark适合做什麼小結

客戶把數倉分為四層:操作資料、公共明細、公共彙總、應用資料。每一層的資料由上一層彙聚、統計計算得來,每一層的資料應用于不同的業務場景。此場景的明細可參考:

HBase+Spark建構遊戲大資料平台

資料歸檔到Spark可參考X-Pack Spark提供的:

批量歸檔

小結

本文隻是對Spark做個入門的介紹,更詳細的資料可參考:

Spark社群資料

。X-Pack Spark請參考:

X-Pack Spark分析引擎

繼續閱讀