天天看點

Spark SQL and DataFrame Guide(1.4.1)——之DataFrames

Spark SQL是處理結構化資料的Spark子產品。它提供了DataFrames這種程式設計抽象,同時也可以作為分布式SQL查詢引擎使用。

DataFrames

DataFrame是一個帶有列名的分布式資料集合。等同于一張關系型資料庫中的表或者R/Python中的data frame,不過在底層做了很多優化;我們可以使用結構化資料檔案、Hive tables,外部資料庫或者RDDS來構造DataFrames。

1. 開始入口:

入口需要從SQLContext類或者它的子類開始,當然需要使用SparkContext建立SQLContext;這裡我們使用pyspark(已經自帶了SQLContext即sc):

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
           

還可以使用HiveContext,它可以提供比SQLContext更多的功能,例如可以使用更完整的HiveQL解析器寫查詢,使用Hive UDFs,從Hive表中讀取資料等。使用HiveContext并不需要安裝hive,Spark預設将HiveContext單獨打包避免對hive過多的依賴

2.建立DataFrames

使用JSON檔案建立:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()
           

注意:

這裡你可能需要将檔案存入HDFS(這裡的檔案在Spark安裝目錄中,1.4版本)

hadoop fs -mkdir examples/src/main/resources/
hadoop fs -put /appcom/spark/examples/src/main/resources/*         /user/hdpuser/examples/src/main/resources/
           

3.DataFrame操作

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + ).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > ).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1
           

4.使用程式設計運作SQL查詢

SQLContext可以使用程式設計運作SQL查詢并傳回DataFrame。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")
           

5.和RDD互動

将RDD轉換成DataFrames有兩種方法:

  • 利用反射來推斷包含特定類型對象的RDD的schema。這種方法會簡化代碼并且在你已經知道schema的時候非常适用。
  • 使用程式設計接口,構造一個schema并将其應用在已知的RDD上。

一、利用反射推斷Schema

Spark SQL能夠将含Row對象的RDD轉換成DataFrame,并推斷資料類型。通過将一個鍵值對(key/value)清單作為kwargs傳給Row類來構造Rows。key定義了表的列名,類型通過看第一列資料來推斷。(是以這裡RDD的第一列資料不能有缺失)未來版本中将會通過看更多資料來推斷資料類型,像現在對JSON檔案的處理一樣。

# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[], age=int(p[])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print teenName
           

二、程式設計指定Schema

通過程式設計指定Schema需要3步:

  1. 從原來的RDD建立一個元祖或清單的RDD。
  2. 用StructType 建立一個和步驟一中建立的RDD中元祖或清單的結構相比對的Schema。
  3. 通過SQLContext提供的createDataFrame方法将schema 應用到RDD上。
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[], p[].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print name
           

繼續閱讀