天天看點

Spark SQL and DataFrame GuideOverviewDataFramesData SourcesPerformance TuningDistributed SQL EngineMigration Guide(遷移指南)Data Types

Overview

Spark SQL是一個處理結構化資料的Spark子產品。它提供了一個稱為DataFrames的程式抽象,并且可以作為分布式SQL查詢引擎。

DataFrames

一個DataFrame是一個組織成命名列的分布式資料集合。它從概念上等同于一個關系型資料庫或者一個R/Python的資料架構,并且内部有更好的優化。DataFrames可以從廣泛的資料來源來建構,例如:結構化資料檔案,Hive的tables,外部資料集,或存在的RDDs。

DataFrame API可以使用Scala, Java, Python。

此頁中所有例子中使用的Spark分布中的樣例資料,都可以使用

spark-shell

pyspark

shell運作。

Starting Point:

SQLContext

Spark SQL中所有函數的入口點是

SQLContext

class,或者它的子類。為建立一個基本SQLContext,你隻需要一個SparkContext。

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
           

除了基本的

SQLContext

,你可以建立

HiveContext

-提供一個由基本SQLContext提供的函數的超集。額外的特性包括使用更徹底的HiveQL parser編寫查詢的能力,通路Hive UDFs,從Hive tables讀取資料的能力。要使用

HiveContext

,你不需要一個存在的Hive系統,而且

SQLContext

所有可用資料源依然可用。

HiveContext

隻是單獨的封裝,為了避免把Hive的所有依賴包含到Spark的預設構造中。如果這些依賴對你的應用不是問題(不會引起問題),那麼Spark 1.3版本中推薦使用

HiveContext

。未來的版本會關注把

SQLContext

等價于

HiveContext

習慣的解析查詢-指定SQL變量,也是可以被選擇的,隻要使用

spark.sql.dialect

選項。這個參數可以使用

SQLContext

setConf

方法、或在SQL中使用設定指令

key=value

。一個

SQLContext

,唯一可用的方言是”sql”-使用Spark SQL提供的簡單SQL解析。

HiveContext

中,預設的是”hiveql”,雖然”sql”也可用。由于HiveQL解析器更加完善了,它是更被推薦的。

Creating DataFrames

通過一個

SQLContext

,可以從一個existing RDD,Hive table,data source建立DataFrames。

下面例子,基于一個JSON檔案建立一個DataFrame:

val sc: SparkContext // An existiong SparkContext.
val SqlContext = new org.apache.Spark.sql.SQLContext(sc)

val df = sqlContext.jsonFile("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
           

DataFrame Operations

DataFrames為結構化資料處理提供領域特點語言(DSL),用Scala, Java, Python。

這裡我收錄一些使用DataFrame處理結構化資料的基本例子:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.jsonFile("examples/src/main/resources/people.json")

// Show the content
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format.
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select("name", df("age") + ).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("name") > ).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
           

Running SQL Queries Programmatically

SQLContext

的sql方法,可以使應用運作SQL查詢程式并傳回一個

DataFrame

結果。

val sqlContext = ... // An existing SQLContext.
val df = sqlContext.sql("SELECT * FROM table")
           

Interoperating with RDDs

Spark SQL支援兩種不同的方法轉化存在的RDDs成DataFrame。第一種方法使用反射推斷包含指定的對象類型的RDD schema。基于這種方式的反射導緻更簡潔的代碼,并且當你編寫Spark應用時,一旦你已經了解了這個schema,工作會運作的良好。

建立* DataFrame 第二種方法通過一個程式設計接口-允許你構造一個schema,然後應用它到一個存在的RDD。雖然這個方法更繁瑣,但是當運作前列和類型不明确的情況下,它也允許你構造 DataFrame *。

Inferring the Schema Using Reflection

Spark SQL的Scala接口支援自動轉換一個包含樣例類的RDD成DataFrame。這個樣例類定義了table的schema。樣例類的參數名字,使用反射來讀取,并且會變成列的名字。樣例類也可以是嵌入的或包含像Sequences或Arrays的複雜類型。這個RDD可以被隐式轉化成* DataFrame *,然後注冊成table。Tables可以在随後的SQL語句中使用。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
//       you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a tables.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(), p().tirm.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrame and support all the normal RDD operations.
// The columns of a row in the results can be accessed by ordinal.
teenagers.map(t => "Name: " + t()).collect().foreach(println)
           

Programmatically Specifying the Schema

當不能提前定義樣例類(例如,記錄的結構以字元串編碼,或一個文本資料集-不同使用者用不同的解析和不同字段設計),以程式設計方式建立一個* DataFrame *需要三步。

1. 從原始RDD建立一個* Rows * RDD

2. 建立一個

StructType

-(比對步驟1中RDD中

Rows

的結構),代表的schema

3. 通過

SQLContext

提供的

createDate

方法應用schema到

Rows

RDD

例如:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Spark SQL data types and Row.
import org.apache.spark.sql._

// Generate the schema based on the string of schema
val schema =
    StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD(people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(), p().tirm))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results: DataFrame = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrame and spport all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t()).collect().foreach(println)
           

Data Sources

通過

DataFrame

的接口Spark SQL支援各種資料源操作。* DataFrame 能像普通RDDs一樣操作,并且也可以被注冊成臨時表。把 DataFrame *注冊成表,允許你運作SQL查詢它的資料。本章描述使用Spark Data Source加載和儲存資料的正常方法,然後進入用于内建資料源的特殊選擇。

Generic Load/Save Functions

最簡單的形式,預設的資料源(預設

parquet

-除非使用

spark.sql.sources.default

設定)會被所有操作使用。

val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")
           

Manually Specifying Options

你也可以手動的指定資料源,連同你想傳遞給資料源的額外選項。資料源由它們的完全限定名稱(即

org.apache.spark.sql.parquet

)指定,對于内建資料源你也可以使用短名(json, parquet, jdbc)。使用這種文法,任意類型的* DataFrame *可以被轉化成其它類型。

val df = sqlContext.load("people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")
           

Save Modes

存儲操作可以可選的接受一個* SaveMode ,指定怎麼操作一個存在的資料,如果存在的話。重點是認識到這些 Save Mode 能操作鎖,而且不是原子性的。是以,* 對于多個寫入者試圖寫入同一位置這種情況,不是安全的。 **

此外,當支援

overwrite

,寫入新資料之前,資料會被删除。

Scala/Java Python Meaning
SaveMode.ErrorIfExists (default) “error” (default) 當存儲一個DataFrame到資料源時,如果資料已經存在,會抛出異常
SaveMode.Append “append” 當存儲一個DataFrame到資料源時,如果資料/表已經存在,DataFrame的内容會附加到存在的資料
SaveMode.Ignore “ignore” Ignore模式意味當存儲一個DataFrame到資料源時,如果資料已經存在,這個存儲操作不會存儲DataFrame内容,而且不會改變存在的資料。類似于SQL的

CREATE TABLE IF NOT EXISTS

Saving to Persistent Tables

當使用

HiveContext

工作時,使用

saveAsTable

指令* DataFrame 也能以持久化表的形式被存儲。不同于

registerTempTable

指令,

saveAsTable

指令會實體化 DataFrame 的内容,然後在 HiveMetastore 中建立一個資料的指針。在你的Spark程式重新開機後,持久化表仍然會存在,隻要你維持連接配接同一進制存儲。持久化表的 DataFrame *可以調用

SQLContext

table

方法,with 表名。

預設的,

saveAsTable

會建立一個”managed table”,意味着資料的位置會被元存儲控制。當表被删除時,Managed tables也會自動的删除它的資料。

Parquet Files

Parquet是一個被許多資料存儲系統支援的* 列格式 *存儲。Spark SQL同時提供讀取和寫入* Parquet *檔案(自動儲存原始資料的schema)的支援。

Loading Data Programmatically

使用上面例子的資料:

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the pervious example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t()).collect().foreach(println)
           

Partition discovery

系統中表分區是常用的最佳化方法,像Hive。在分區的表種,資料通常存儲在不同的目錄中,每個分區目錄的路徑中編碼分區的列值。Parquet資料源

現在可以探索并且自動推斷的分區的資訊。例如,我們可以存儲人口資料成分區表,使用下面的目錄結構, 2個額外的列、

gender

country

as 分區列:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
           

傳遞

path/to.table

SQLContext.parquetFile

SQLContext.load

,Spark SQL會自動從路徑中提取分區資訊。現在,傳回的* DataFrame *的schema會變成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
           

注意,分區列的資料類型是自動推斷的。目前,支援數值資料類型和字元串類型。

Schema merging

類似于ProtocolBuffer,Avro、Thrift,Parquet也支援schema演化。使用者可以以一個簡單的schema開始,然後如果需要逐漸添加更多的列到這個schema。用這種方法,使用者最後會有多種多樣的不同但互動相容的schemas的Parquet檔案。這個Parquet資料源現在可以自動偵測這個case,然後合并所有檔案的schemas。

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame. stored into a partition directory.
val df1 = sparkContext.makeRDD( to ).map(i => (i, i * )).toDF("single", "double")
df1.saveAsParquetFile("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column.
val df2 = sparkContext.makeRDD( to ).map(i => (i, i * )).toDF("single", "triple")
df2.saveAsParquetFile("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partioning column appeared in the partition directory paths.

// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
           

Configuration

Parquet的配置可以使用

SQLContext

setConf

方法或者使用SQL運作

SET key=value

指令。

Property Name Default Meaning
spark.sql.parquet.binaryAsString false 另一些Parquet産品系統,特殊的Impala和老版本Spark SQL中的,不會區分二進制資料和字元串當寫入Parquet schema的時候。這個标志告訴Spark SQL以字元串解釋二進制資料,來提供這些系統的相容性
spark.sql.parquet.int96AsTimestamp true 另一些Parquet産品系統,特殊的Impala,存儲* Timestamp 成INT96。Spark也會以INT96來存儲 Timestamp *,因為我們需要避免納秒字段的精度丢失
spark.sql.parquet.cacheMetadata true 開啟Parquet schema中繼資料的緩存。可以提升查詢靜态資料的速度
spark.sql.parquet.compression.codec gzip 當寫入Parquet檔案時,設定壓縮編碼器使用。接受的值包括:uncompressed, snappy, gzip, lzo
spark.sql.parquet.filterPushdown false 開啟Parquet過濾器後進先出優化。預設這個特性是關閉的,因為在Paruet 1.6.0rc3 (PARQUET-136)有已知的bug。然而,如果你的表不包含任何空的字元串或二進制列,開啟這個特性還是安全的
spark.sql.hive.convertMetastoreParquet true 當設定成false,Spark SQL會對Parquet表使用Hive SerDe替代内建的支援

JSON Datasets

Spark SQL可以自動推斷JSON資料集的schema,然後加載它成為一個* DataFrame *。可以使用

SQLContext

中兩個方法中的一個來完成這個轉換:

-

jsonFile

- 從JSON檔案的目錄加載資料,每個檔案的行都是一個JSON對象

-

jsonRDD

- 從一個存在的RDD加載資料,RDD的每個元素都是一個包含JSON對象的字元串

注意,提供的檔案-* jsonFile *不是一個典型的JSON檔案。每行都必須包含一個分割、獨立的有效JSON對象。是以,一個正規多行JSON檔案通常都會失敗。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can ne either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
// Create a DataFrame from the file(s) pointed to by path
val people = sqlContext.jsonFile(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
    """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
           

Hive Tables

Spark SQL也支援讀取和寫入存儲在Apache Hive資料。然而,因為Hive有大量的依賴,它沒有被植入Spark預設組裝。通過添加

-Phive

-Phive-thriftserver

标志到Spark’s bulid,就可以啟用Hive支援。這個指令建立一個包含Hive的新的組裝jar。注意,Hive組裝jar必須出現在所有工作節點,因為他們會需要通路Hive序列化庫和反序列化庫,為了通路存儲在Hive的資料。

配置Hive,

conf/.

中的

hive-site.xml

檔案。

當使用Hive時,必須建構一個

HiveContext

-繼承自

SQLContext

,然後添加對使用HiveQL編寫查詢和查找元存儲中的表的支援。如果使用者沒有存在的Hive部署,也可以建立

HiveContex

。如果沒有使用* hive-site.xml *配置,Context會在目前目錄自動建立

metastore_db

warehouse

// sc ia an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
           

JDBC To Other Databases

Spark SQL也包含一個資料源-可以使用JDBC從其它資料源讀取資料。這個功能優先使用JdbcRDD。這是因為以* DataFrame *形式傳回結果,而且它們可以容易的在Spark SQL中處理或與其它資料源聯合。這個JDBC資料源更容易的從Java或Python中使用,因為它不需要使用者提供ClassTag。(注意,這點不同于Spark SQL JDBC server,允許其它應用使用Spark SQL執行查詢。)

第一步,你需要添加你的資料源的JDBC driver到spark classpath。例如,從Spark shell連接配接postgres,你需要執行下面指令:

使用這個資料源的API,可以以* DataFrame *或Spark SQl臨時表的形式加載遠端資料庫的表。下面是支援的可選項:

Property Name Meaning
url 連接配接的JDBC URL
dbtable 将要讀取的JDBC表。注意,有效的SQL查詢子句’FROM’是可用的。例如,取代一個完全的表,你也可以在括号中使用子查詢
driver JDBC driver的class name需要連接配接這個URL。在執行JDBC指令,允許這個驅動注冊它自己和JDBC子系統之前,這個class會加載到master和workers
partitionColumn, lowerBound, upperBound, numPartitions 如果這些選項其中一個被指定了,那麼所有的都必須被指定。它們描述了當從多個節點并行讀取時,怎樣分區表。

partitionColumn

必須是表中的一個數值列,這是一個問題
val jdbcDF = sqlContext.load("jdbc", Map(
        "url" -> "jdbc:postgresql:dbserver",
        "dbtable" -> "schema.tablename")
    )
           

Troubleshooting

  • JDBC driver class 對于* client session 上的原始 class loader 和所有的 executors 來說,必須是可見的。這是因為Java的 DriverManager class 做安全檢查,當一個打開連接配接時,檢查的結果忽略了所有對于原始 class loader 不可見的 drivers 。一個友善的方法是更改所有節點的 compute_classpath.sh ,添加你的 driver *JARs
  • 一些資料庫,例如H2,轉化所有名字為大寫字母。在Spark SQL中,對于這些名字,你需要使用大寫字母

Performance Tuning

對不同的工作負載可以使用* 記憶體緩存資料 ,或 開啟一些實驗性的選項 *來提升性能。

Caching Data In Memory

Spark SQL可以通過調用

sqlContext.cacheTable("tableName")

dataFrame.cache()

來利用記憶體列格式緩存表。是以,Spark SQL隻需要掃描列,然後自動的優化壓縮來減少記憶體的使用和GC壓力。可以調用

sqlContext.uncacheTable("tableName")

從記憶體中移除表。

記憶體緩存配置 使用

SQLContext

setConf

方法或使用SQL運作

SET key=value

指令。

column column column
spark.sql.inMemoryColumnarStorage.compressed true 當設定為* true *時,Spark SQL會自動的基于資料統計對每一列選擇壓縮編碼
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列緩存的批量大小。大一些的批量大小可以提供記憶體使用率和壓縮率,但是有OOMs(Out of Memory)的風險

Other Configuration Options

下面的選項也可以被用來優化查詢性能。這些選項在未來版本可能被棄用,因為更多的優化會自動執行。

Property Name Default Meaning
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置表的位元組最大值,然後當執行結合操作時廣播到所有節點。設定值為 -1,取消廣播。注意,近期統計隻支援Hive Metastore表,使用指令

ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan

來執行
spark.sql.codegen false 當為true時,運作時會為一個指定的查詢中的表達式求值動态生成代碼。對一些包含複雜表達式的查詢,這個選項可以導緻重大的提升。然而,對一些簡單的查詢會緩慢查詢執行
spark.sql.shuffle.partitions 200 設定分區使用的數量,當結合或聚合資料的時候

Distributed SQL Engine

Spark SQL也能以分布式查詢引擎的方式執行,隻要使用它的JDBC/ODBC或指令行的接口。用這種方式,終端使用者或應用可以與Spark SQL目錄進行交換-隻用執行SQL查詢,不需要寫任何代碼。

Running the Thrift JDBC/ODBC server

這裡應用的Thrift JDBC/ODBC server對應Hive 0.13中的HiveServer2。你可以使用Spark或Hive 0.13的腳本測試JDBC server。

在Spark目錄中運作下面的目錄,啟動JDBC/ODBC server:

這個腳本接受所有

bin/spark-sumit

指令行選項,附加

--hiveconf

選項指定Hive屬性。你可以運作

./sbin/start-thriftserver.sh --help

擷取完整的可選項清單。預設的,server listen是localhost:10000。你可以重寫這個自變量通過環境變量,像:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...
           

或系統屬性:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...
           

現在。你可以使用beeline測試Thrift JDBC/ODBC server:

./bin/beeline
           

beeline連接配接 JDBC/ODBC server:

Beeline要求一個username和password。在非安全模式中,簡單的輸入username和空password。安全模式中,請按beeline documentation中的指令。

conf/

中的

hive-site.xml

可以配置Hive。

你也可以使用Hive的beeline腳本。

Thrift JDBC server也支援使用HTTP發送thrift RPC消息。使用下面的設定啟用HTTP模式在系統屬性或在

conf/

hive-site.xml

檔案:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
           

為測試,使用beeline以HTTP模式連接配接JDBC/ODBC server:

Running the Spark SQL CLI

Spark SQL CLI是一個友善的工具,用來以本地模式運作Hive metastore service和從指令行輸入執行查詢。注意,Spark SQL CLI不能與Thrift JDBC server通信。

在Spark目錄中運作下面指令,啟動Spark SQL CLI:

conf/

中的

hive-site.xml

可以配置Hive。你可以運作

./bin/spark-sql --help

擷取完整的可選項清單。

Migration Guide(遷移指南)

Upgrading from Spark SQL 1.0-1.2 to 1.3

Spark 1.30中,我們從Spark SQL中移除”Alpha”标簽,作為其中一部厘清理了可用APIs。從Spark 1.3之前,Spark SQL會提供1.x系列其它版本的二進制相容性。這個相容性保證了排除明确标志為不穩定APIs(即,DeveloperAPI或實驗性的)。

Rename of SchemaRDD to DataFrame

Unification of the Java and Scala APIs

Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)

Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)

UDF Registration Moved to sqlContext.udf (Java & Scala)

Python DataTypes No Longer Singletons

Migration Guide for Shark User

Scheduling

Reducer number

Caching

Compatibility with Apache Hive

Deploying in Existing Hive Warehouses

Supported Hive Features

Unsupported Hive Functionality

Data Types

Spark SQL和* DataFrame *支援下面的資料類型:

  • Numeric類型:
    • ByteType

      : 1-byte 帶符号整數;-128 - 127
    • ShortType

      : 2-byte 帶符号整數; -32768 - 32767
    • IntegerType

      : 4-byte 帶符号整數; -2147483648 - 2147483647.
    • LongType

      : 8-byte 帶符号整數 -9223372036854775808 - 9223372036854775807
    • FloatType

      : 4-byte 單精度浮點數
    • DoubleType

      : 8-byte 雙精度浮點數
    • DecimalType

      : 任意精度帶符号小數。内部使用

      java.math.BigDecimal

      支援。
  • String類型:
    • StringType

  • Binary類型:
    • BinaryType

  • Boolean類型:
    • BooleanType

  • Datetime類型:
    • TimestampType

    • DateType

  • Complex類型:
    • ArrayType(elementType, containsNull)

    • MapType

    • StructType(fields)

      • StructField(name, dataType, nullable)

        Spark SQL的所有資料類型都在包

        org.apache.spark.sql.types

        中。你可以使用下面來通路它們:
import org.apache.spark.sql.types._
           
Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull]).Note: The default value of containsNull is true.
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull]).Note: The default value of valueContainsNull is true.
StructType org.apache.spark.sql.Row StructType(fields).Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed
StructField The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType) StructField(name, dataType, nullable)

繼續閱讀