天天看點

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

【注】該系列文章以及使用到安裝包/測試資料 可以在《傾情大奉送–Spark入門實戰系列》擷取

1 SparkSQL的發展曆程

1.1 Hive and Shark

SparkSQL的前身是Shark,給熟悉RDBMS但又不了解MapReduce的技術人員提供快速上手的工具,Hive應運而生,它是當時唯一運作在Hadoop上的SQL-on-Hadoop工具。但是MapReduce計算過程中大量的中間磁盤落地過程消耗了大量的I/O,降低的運作效率,為了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始産生,其中表現較為突出的是:

  • MapR的Drill
  • Cloudera的Impala
  • Shark

其中Shark是伯克利實驗室Spark生态環境的元件之一,它修改了下圖所示的右下角的記憶體管理、實體計劃、執行三個子產品,并使之能運作在Spark引擎上,進而使得SQL查詢的速度得到10-100倍的提升。

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

1.2 Shark和SparkSQL

但是,随着Spark的發展,對于野心勃勃的Spark團隊來說,Shark對于Hive的太多依賴(如采用Hive的文法解析器、查詢優化器等等),制約了Spark的One Stack Rule Them All的既定方針,制約了Spark各個元件的互相內建,是以提出了SparkSQL項目。SparkSQL抛棄原有Shark的代碼,汲取了Shark的一些優點,如記憶體列存儲(In-Memory Columnar Storage)、Hive相容性等,重新開發了SparkSQL代碼;由于擺脫了對Hive的依賴性,SparkSQL無論在資料相容、性能優化、元件擴充方面都得到了極大的友善,真可謂“退一步,海闊天空”。

  • 資料相容方面 不但相容Hive,還可以從RDD、parquet檔案、JSON檔案中擷取資料,未來版本甚至支援擷取RDBMS資料以及cassandra等NOSQL資料;
  • 性能優化方面 除了采取In-Memory Columnar Storage、byte-code generation等優化技術外、将會引進Cost Model對查詢進行動态評估、擷取最佳實體計劃等等;
  • 元件擴充方面 無論是SQL的文法解析器、分析器還是優化器都可以重新定義,進行擴充。

2014年6月1日Shark項目和SparkSQL項目的主持人Reynold Xin宣布:停止對Shark的開發,團隊将所有資源放SparkSQL項目上,至此,Shark的發展畫上了句話,但也是以發展出兩個直線:SparkSQL和Hive on Spark。

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

其中SparkSQL作為Spark生态的一員繼續發展,而不再受限于Hive,隻是相容Hive;而Hive on Spark是一個Hive的發展計劃,該計劃将Spark作為Hive的底層引擎之一,也就是說,Hive将不再受限于一個引擎,可以采用Map-Reduce、Tez、Spark等引擎。

1.3 SparkSQL的性能

Shark的出現,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

那麼,擺脫了Hive的限制,SparkSQL的性能又有怎麼樣的表現呢?雖然沒有Shark相對于Hive那樣矚目地性能提升,但也表現得非常優異:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

為什麼SparkSQL的性能會得到怎麼大的提升呢?主要SparkSQL在下面幾點做了優化:

A:記憶體列存儲(In-Memory Columnar Storage)

SparkSQL的表資料在記憶體中存儲不是采用原生态的JVM對象存儲方式,而是采用記憶體列存儲,如下圖所示。

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

該存儲方式無論在空間占用量和讀取吞吐率上都占有很大優勢。

對于原生态的JVM對象存儲方式,每個對象通常要增加12-16位元組的額外開銷,對于一個270MB的TPC-H lineitem table資料,使用這種方式讀入記憶體,要使用970MB左右的記憶體空間(通常是2~5倍于原生資料空間);另外,使用這種方式,每個資料記錄産生一個JVM對象,如果是大小為200B的資料記錄,32G的堆棧将産生1.6億個對象,這麼多的對象,對于GC來說,可能要消耗幾分鐘的時間來處理(JVM的垃圾收集時間與堆棧中的對象數量呈線性相關)。顯然這種記憶體存儲方式對于基于記憶體計算的Spark來說,很昂貴也負擔不起。

對于記憶體列存儲來說,将所有原生資料類型的列采用原生數組來存儲,将Hive支援的複雜資料類型(如array、map等)先序化後并接成一個位元組數組來存儲。這樣,每個列建立一個JVM對象,進而導緻可以快速的GC和緊湊的資料存儲;額外的,還可以使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)降低記憶體開銷;更有趣的是,對于分析查詢中頻繁使用的聚合特定列,性能會得到很大的提高,原因就是這些列的資料放在一起,更容易讀入記憶體進行計算。

B:位元組碼生成技術(bytecode generation,即CG)

在資料庫查詢中有一個昂貴的操作是查詢語句中的表達式,主要是由于JVM的記憶體模型引起的。比如如下一個查詢:

在這個查詢裡,如果采用通用的SQL文法途徑去處理,會先生成一個表達式樹(有兩個節點的Add樹,參考後面章節),在實體處理這個表達式樹的時候,将會如圖所示的7個步驟:

1.調用虛函數Add.eval(),需要确認Add兩邊的資料類型

2.調用虛函數a.eval(),需要确認a的資料類型

3.确定a的資料類型是Int,裝箱

4.調用虛函數b.eval(),需要确認b的資料類型

5.确定b的資料類型是Int,裝箱

6.調用Int類型的Add

7.傳回裝箱後的計算結果

其中多次涉及到虛函數的調用,虛函數的調用會打斷CPU的正常流水線處理,減緩執行。

Spark1.1.0在catalyst子產品的expressions增加了codegen子產品,如果使用動态位元組碼生成技術(配置spark.sql.codegen參數),SparkSQL在執行實體計劃的時候,對比對的表達式采用特定的代碼,動态編譯,然後運作。如上例子,比對到Add方法:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

然後,通過調用,最終調用:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

最終實作效果類似如下僞代碼:

val a: Int = inputRow.getInt()
val b: Int = inputRow.getInt()
val result: Int = a + b
resultRow.setInt(, result)
           

對于Spark1.1.0,對SQL表達式都作了CG優化,具體可以參看codegen子產品。CG優化的實作主要還是依靠scala2.10的運作時放射機制(runtime reflection)。對于SQL查詢的CG優化,可以簡單地用下圖來表示:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

C:Scala代碼優化

另外,SparkSQL在使用Scala編寫代碼的時候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對于使用者來說,還是使用統一的接口,沒受到使用上的困難。下圖是一個Scala代碼優化的示意圖:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

2 SparkSQL運作架構

類似于關系型資料庫,SparkSQL也是語句也是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)組成,分别對應sql查詢過程中的Result、Data Source、Operation,也就是說SQL語句按Result–>Data Source–>Operation的次序來描述的。

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

當執行SparkSQL語句的順序為:

1.對讀入的SQL語句進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECT、FROM、WHERE),哪些是表達式、哪些是Projection、哪些是Data Source等,進而判斷SQL語句是否規範;

2.将SQL語句和資料庫的資料字典(列、表、視圖等等)進行綁定(Bind),如果相關的Projection、Data Source等都是存在的話,就表示這個SQL語句是可以執行的;

3.一般的資料庫會提供幾個執行計劃,這些計劃一般都有運作統計資料,資料庫會在這些計劃中選擇一個最優計劃(Optimize);

4.計劃執行(Execute),按Operation–>Data Source–>Result的次序來進行的,在執行過程有時候甚至不需要讀取實體表就可以傳回結果,比如重新運作剛運作過的SQL語句,可能直接從資料庫的緩沖池中擷取傳回結果。

2.1 Tree和Rule

SparkSQL對SQL語句的處理和關系型資料庫對SQL語句的處理采用了類似的方法,首先會将SQL語句進行解析(Parse),然後形成一個Tree,在後續的如綁定、優化等處理過程都是對Tree的操作,而操作的方法是采用Rule,通過模式比對,對不同類型的節點采用不同的操作。在整個sql語句的處理過程中,Tree和Rule互相配合,完成了解析、綁定(在SparkSQL中稱為Analysis)、優化、實體計劃等過程,最終生成可以執行的實體計劃。

2.1.1 Tree

  • Tree的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
  • Logical Plans、Expressions、Physical Operators都可以使用Tree表示
  • Tree的具體操作是通過TreeNode來實作的
  • SparkSQL定義了catalyst.trees的日志,通過這個日志可以形象的表示出樹的結構
  • TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)進行操作
  • 有了TreeNode,通過Tree中各個TreeNode之間的關系,可以對Tree進行周遊操作,如使用transformDown、transformUp将Rule應用到給定的樹段,然後用結果替代舊的樹段;也可以使用transformChildrenDown、transformChildrenUp對一個給定的節點進行操作,通過疊代将Rule應用到該節點以及子節點。
  • TreeNode可以細分成三種類型的Node:
  • UnaryNode 一進制節點,即隻有一個子節點。如Limit、Filter操作
  • BinaryNode 二進制節點,即有左右子節點的二叉節點。如Jion、Union操作
  • LeafNode 葉子節點,沒有子節點的節點。主要使用者指令類操作,如SetCommand

2.1.2 Rule

  • Rule的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
  • Rule在SparkSQL的Analyzer、Optimizer、SparkPlan等各個元件中都有應用到
  • Rule是一個抽象類,具體的Rule實作是通過RuleExecutor完成
  • Rule通過定義batch和batchs,可以簡便的、子產品化地對Tree進行transform操作
  • Rule通過定義Once和FixedPoint,可以對Tree進行一次操作或多次操作(如對某些Tree進行多次疊代操作的時候,達到FixedPoint次數疊代或達到前後兩次的樹結構沒變化才停止操作,具體參看RuleExecutor.apply)

2.2 sqlContext和hiveContext的運作過程

SparkSQL有兩個分支,sqlContext和hiveContext,sqlContext現在隻支援SQL文法解析器(SQL-92文法);hiveContext現在支援SQL文法解析器和hivesql文法解析器,預設為hiveSQL文法解析器,使用者可以通過配置切換成SQL文法解析器,來運作hiveSQL不支援的文法,

2.2.1 sqlContext的運作過程

sqlContext總的一個過程如下圖所示:

1.SQL語句經過SqlParse解析成UnresolvedLogicalPlan;

2.使用analyzer結合資料資料字典(catalog)進行綁定,生成resolvedLogicalPlan;

3.使用optimizer對resolvedLogicalPlan進行優化,生成optimizedLogicalPlan;

4.使用SparkPlan将LogicalPlan轉換成PhysicalPlan;

5使用prepareForExecution()将PhysicalPlan轉換成可執行實體計劃;

6.使用execute()執行可執行實體計劃;

7.生成SchemaRDD。

在整個運作過程中涉及到多個SparkSQL的元件,如SqlParse、analyzer、optimizer、SparkPlan等等

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

2.2.2 hiveContext的運作過程

hiveContext總的一個過程如下圖所示:

1.SQL語句經過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個解析過程中對hiveql語句使用getAst()擷取AST樹,然後再進行解析;

2.使用analyzer結合資料hive源資料Metastore(新的catalog)進行綁定,生成resolved LogicalPlan;

3.使用optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan,優化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進行預處理;

4.使用hivePlanner将LogicalPlan轉換成PhysicalPlan;

5.使用prepareForExecution()将PhysicalPlan轉換成可執行實體計劃;

6.使用execute()執行可執行實體計劃;

7.執行後,使用map(_.copy)将結果導入SchemaRDD。

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

2.3 catalyst優化器

SparkSQL1.1總體上由四個子產品組成:core、catalyst、hive、hive-Thriftserver:

  • core處理資料的輸入輸出,從不同的資料源擷取資料(RDD、Parquet、json等),将查詢結果輸出成schemaRDD;
  • catalyst處理查詢語句的整個處理過程,包括解析、綁定、優化、實體計劃等,說其是優化器,還不如說是查詢引擎;
  • hive對hive資料的處理
  • hive-ThriftServer提供CLI和JDBC/ODBC接口

在這四個子產品中,catalyst處于最核心的部分,其性能優劣将影響整體的性能。由于發展時間尚短,還有很多不足的地方,但其插件式的設計,為未來的發展留下了很大的空間。下面是catalyst的一個設計圖:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

其中虛線部分是以後版本要實作的功能,實線部分是已經實作的功能。從上圖看,catalyst主要的實作元件有:

  • sqlParse,完成sql語句的文法解析功能,目前隻提供了一個簡單的sql解析器;
  • Analyzer,主要完成綁定工作,将不同來源的Unresolved LogicalPlan和資料中繼資料(如hive metastore、Schema catalog)進行綁定,生成resolved LogicalPlan;
  • optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan;
  • Planner将LogicalPlan轉換成PhysicalPlan;
  • CostModel,主要根據過去的性能統計資料,選擇最佳的實體執行計劃

    這些元件的基本實作方法:

  • 先将sql語句通過解析生成Tree,然後在不同階段使用不同的Rule應用到Tree上,通過轉換完成各個元件的功能。
  • Analyzer使用Analysis Rules,配合資料中繼資料(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉換成resolved LogicalPlan;
  • optimizer使用Optimization Rules,對resolved LogicalPlan進行合并、列裁剪、過濾器下推等優化作業而轉換成optimized LogicalPlan;
  • Planner使用Planning Strategies,對optimized LogicalPlan

3 SparkSQL CLI

CLI(Command-Line Interface,指令行界面)是指可在使用者提示符下鍵入可執行指令的界面,它通常不支援滑鼠,使用者通過鍵盤輸入指令,計算機接收到指令後予以執行。Spark CLI指的是使用指令界面直接輸入SQL指令,然後發送到Spark叢集進行執行,在界面中顯示運作過程和最終的結果。

Spark1.1相較于Spark1.0最大的差别就在于Spark1.1增加了Spark SQL CLI和ThriftServer,使得Hive使用者還有用慣了指令行的RDBMS資料庫管理者較容易地上手,真正意義上進入了SQL時代。

**【注】**Spark CLI和Spark Thrift Server實驗環境為第二課《Spark編譯與部署(下)–Spark編譯安裝》所搭建

3.1 運作環境說明

3.1.1 硬軟體環境

  • 主機作業系統:Windows 64位,雙核4線程,主頻2.2G,10G記憶體
  • 虛拟軟體:VMware® Workstation 9.0.0 build-812388
  • 虛拟機作業系統:CentOS 64位,單核
  • 虛拟機運作環境:
    • JDK:1.7.0_55 64位
    • Hadoop:2.2.0(需要編譯為64位)
    • Scala:2.11.4
    • Spark:1.1.0(需要編譯)
    • Hive:0.13.1

3.1.2 機器網絡環境

叢集包含三個節點,節點之間可以免密碼SSH通路,節點IP位址和主機名分布如下:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

3.2 配置并啟動

3.2.1 建立并配置hive-site.xml

在運作Spark SQL CLI中需要使用到Hive Metastore,故需要在Spark中添加其uris。具體方法是在SPARK_HOME/conf目錄下建立hive-site.xml檔案,然後在該配置檔案中,添加hive.metastore.uris屬性,具體如下:

<configuration>  
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hadoop1:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>
</configuration>
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

3.2.2 啟動Hive

在使用Spark SQL CLI之前需要啟動Hive Metastore(如果資料存放在HDFS檔案系統,還需要啟動Hadoop的HDFS),使用如下指令可以使Hive Metastore啟動後運作在背景,可以通過jobs查詢:

$nohup hive --service metastore > metastore.log >& &
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

3.2.3 啟動Spark叢集和Spark SQL CLI

通過如下指令啟動Spark叢集和Spark SQL CLI:

$cd /app/hadoop/spark-
$sbin/start-all.sh
$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g
           

在叢集監控頁面可以看到啟動了SparkSQL應用程式:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

這時就可以使用HQL語句對Hive資料進行查詢,另外可以使用COMMAND,如使用set進行設定參數:預設情況下,SparkSQL Shuffle的時候是200個partition,可以使用如下指令修改該參數:

運作同一個查詢語句,參數改變後,Task(partition)的數量就由200變成了20。

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

3.2.4 指令參數

通過bin/spark-sql –help可以檢視CLI指令參數:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

其中[options] 是CLI啟動一個SparkSQL應用程式的參數,如果不設定–master的話,将在啟動spark-sql的機器以local方式運作,隻能通過http://機器名:4040進行監控;這部分參數,可以參照Spark1.0.0 應用程式部署工具spark-submit 的參數。

[cli option]是CLI的參數,通過這些參數CLI可以直接運作SQL檔案、進入指令行運作SQL指令等等,類似以前的Shark的用法。需要注意的是CLI不是使用JDBC連接配接,是以不能連接配接到ThriftServer;但可以配置conf/hive-site.xml連接配接到Hive的Metastore,然後對Hive資料進行查詢。

3.3 實戰Spark SQL CLI

3.3.1 擷取訂單每年的銷售單數、銷售總額

第一步 設定任務個數,在這裡修改為20個

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第二步 運作SQL語句

spark-sql>use hive;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第三步 檢視運作結果

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

3.3.2 計算所有訂單每年的總金額

第一步 執行SQL語句

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第二步 執行結果

使用CLI執行結果如下:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

3.3.3 計算所有訂單每年最大金額訂單的銷售額

第一步 執行SQL語句

spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第二步 執行結果

使用CLI執行結果如下:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4 Spark Thrift Server

ThriftServer是一個JDBC/ODBC接口,使用者可以通過JDBC/ODBC連接配接ThriftServer來通路SparkSQL的資料。ThriftServer在啟動的時候,會啟動了一個SparkSQL的應用程式,而通過JDBC/ODBC連接配接進來的用戶端共同分享這個SparkSQL應用程式的資源,也就是說不同的使用者之間可以共享資料;ThriftServer啟動時還開啟一個偵聽器,等待JDBC用戶端的連接配接和送出查詢。是以,在配置ThriftServer的時候,至少要配置ThriftServer的主機名和端口,如果要使用Hive資料的話,還要提供Hive Metastore的uris。

【注】Spark CLI和Spark Thrift Server實驗環境為第二課《Spark編譯與部署(下)–Spark編譯安裝》所搭建

4.1 配置并啟動

4.1.1 建立并配置hive-site.xml

第一步 建立hive-site.xml配置檔案

在$SPARK_HOME/conf目錄下修改hive-site.xml配置檔案(如果在Spark SQL CLI中已經添加,可以省略):

$cd /app/hadoop/spark-/conf
$sudo vi hive-site.xml
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第二步 修改配置檔案

設定hadoop1為Metastore伺服器,hadoop2為Thrift Server伺服器,配置内容如下:

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hadoop1:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>

  <property>
    <name>hive.server2.thrift.min.worker.threads</name>
    <value>5</value>
    <description>Minimum number of Thrift worker threads</description>
  </property>

  <property>
    <name>hive.server2.thrift.max.worker.threads</name>
    <value>500</value>
    <description>Maximum number of Thrift worker threads</description>
  </property>

  <property>
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
    <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>
  </property>

  <property>
    <name>hive.server2.thrift.bind.host</name>
    <value>hadoop2</value>
    <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>
  </property>
</configuration>
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4.1.2 啟動Hive

在hadoop1節點中,在背景啟動Hive Metastore(如果資料存放在HDFS檔案系統,還需要啟動Hadoop的HDFS):

$nohup hive --service metastore > metastore.log >& & 
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4.1.3 啟動Spark叢集和Thrift Server

在hadoop1節點啟動Spark叢集

$cd /app/hadoop/spark-/sbin 
$./start-all.sh
在hadoop2節點上進入SPARK_HOME/sbin目錄,使用如下指令啟動Thrift Server
$cd /app/hadoop/spark-/sbin 
$./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

注意:Thrift Server需要按照配置在hadoop2啟動!

在叢集監控頁面可以看到啟動了SparkSQL應用程式:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4.1.4 指令參數

使用sbin/start-thriftserver.sh –help可以檢視ThriftServer的指令參數:

$sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options] 
    Thrift server options: Use value for given property
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

其中[options] 是Thrift Server啟動一個SparkSQL應用程式的參數,如果不設定–master的話,将在啟動Thrift Server的機器以local方式運作,隻能通過http://機器名:4040進行監控;這部分參數,可以參照Spark1.0.0 應用程式部署工具spark-submit 的參數。在叢集中提供Thrift Server的話,一定要配置master、executor-memory等參數。

[thrift server options]是Thrift Server的參數,可以使用-dproperty=value的格式來定義;在實際應用上,因為參數比較多,通常使用conf/hive-site.xml配置。

4.2 實戰Thrift Server

4.2.1 遠端用戶端連接配接

可以在任意節點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接配接ThriftServer,因為沒有采用權限管理,是以使用者名用運作bin/beeline的使用者hadoop,密碼為空:

$cd /app/hadoop/spark-/bin
$./beeline
beeline>!connect jdbc:hive2://hadoop2:10000
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4.2.2 基本操作

第一步 顯示hive資料庫所有表

beeline>show database;
beeline>use hive;
beeline>show tables; 
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第二步 建立表testThrift

beeline>create table testThrift(field1 String , field2 Int);
beeline>show tables;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第三步 把tbStockDetail表中金額大于3000插入到testThrift表中

beeline>insert into table testThrift select ordernumber,amount from tbStockDetail  where amount>;
beeline>select * from testThrift;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第四步 重新建立testThrift表中,把年度最大訂單插入該表中

beeline>drop table testThrift;
beeline>create table testThrift (field1 String , field2 Int);
beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;
beeline>select * from testThrift;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4.2.3 計算所有訂單每年的訂單數

第一步 執行SQL語句

spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
           

第二步 執行結果

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

Stage監控頁面:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

檢視Details for Stage 28

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4.2.4 計算所有訂單月銷售額前十名

第一步 執行SQL語句

spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit ;
           

第二步 執行結果

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

Stage監控頁面:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

在其第一個Task中,從本地讀入資料

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

在後面的Task是從記憶體中擷取資料

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

4.2.5 緩存表資料

第一步 緩存資料

beeline>cache table tbStock;
beeline>select count(*) from tbStock;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第二步 運作4.2.4中的“計算所有訂單月銷售額前十名”

beeline>select count(*) from tbStock;
           
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

本次計算劃給11.233秒,檢視webUI,資料已經緩存,緩存率為100%:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第三步 在另外節點再次運作

在hadoop3節點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接配接ThriftServer,然後直接運作對tbStock計數(注意沒有進行資料庫的切換):

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

用時0.343秒,再檢視webUI中的stage:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

Locality Level是PROCESS,顯然是使用了緩存表。

從上可以看出,ThriftServer可以連接配接多個JDBC/ODBC用戶端,并互相之間可以共享資料。順便提一句,ThriftServer啟動後處于監聽狀态,使用者可以使用ctrl+c退出ThriftServer;而beeline的退出使用!q指令。

4.2.6 在IDEA中JDBC通路

有了ThriftServer,開發人員可以非常友善的使用JDBC/ODBC來通路SparkSQL。下面是一個scala代碼,查詢表tbStockDetail,傳回amount>3000的單據号和交易金額:

第一步 在IDEA建立class6包和類JDBCofSparkSQL

參見《Spark程式設計模型(下)–IDEA搭建及實戰》在IDEA中建立class6包并建立類JDBCofSparkSQL。該類中查詢tbStockDetail金額大于3000的訂單:

import java.sql.DriverManager

object JDBCofSparkSQL {
  def main(args: Array[String]) {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")
    try {
      val statement = conn.createStatement
val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail  where amount>3000")
      while (rs.next) {
        val ordernumber = rs.getString("ordernumber")
        val amount = rs.getString("amount")
        println("ordernumber = %s, amount = %s".format(ordernumber, amount))
      }
    } catch {
      case e: Exception => e.printStackTrace
    }
    conn.close
  }
}
           

第二步 檢視運作結果

在IDEA中可以觀察到,在運作日志視窗中沒有運作過程的日志,隻顯示查詢結果

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

第三步 檢視監控結果

從Spark監控界面中觀察到,該Job有一個編号為6的Stage,該Stage有2個Task,分别運作在hadoop1和hadoop2節點,擷取資料為NODE_LOCAL方式。

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介
Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

在hadoop2中觀察Thrift Server運作日志如下:

Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

繼續閱讀