1. sparkR的簡介
SparkR是一個R語言包,它提供了輕量級的方式使得可以在R語言中使用Apache Spark。在Spark 1.4中,SparkR實作了分布式的data frame,支援類似查詢、過濾以及聚合的操作(類似于R中的data frames:dplyr),但是這個可以操作大規模的資料集。
2. 使用spark的兩種方式
1.在sparkR的shell中互動式使用
sparkR
2.在R腳本中使用
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "spark://10.137",sparkEnvir = list(spark.driver.memory="3g"))
3. 純R語言和SparkR
當資料量很大時,純R速度就比較慢,無法用到大資料分布式性能,SparkR可以。
注意比較python,pyspark,SparkR等
4. SparkR DataFrame的基本使用
DataFrame是資料組織成一個帶有列名稱的分布式資料集。在概念上和關系型資料庫中的表類似,或者和R語言中的data frame類似,但是這個提供了很多的優化措施。構造DataFrame的方式有很多:可以通過結構化檔案中構造;可以通過Hive中的表構造;可以通過外部資料庫構造或者是通過現有R的data.frame構造等等。
1.從SparkContext和SQLContext開始
SparkContext是SparkR的切入點,它使得你的R程式和Spark叢集互通。你可以通過sparkR.init來建構SparkContext,然後可以傳入類似于應用程式名稱的選項給它。如果想使用DataFrames,我們得建立SQLContext,這個可以通過SparkContext來構造。如果你使用SparkR shell, SQLContext 和SparkContext會自動地建構好。
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
2.建立DataFrame
如果有SQLContext執行個體,那麼應用程式就可以通過本地的R data frame(或者是Hive表;或者是其他資料源)來建立DataFrames。下面将詳細地介紹。
(1)通過本地data.frame構造
最簡單地建立DataFrames是将R的data frame轉換成SparkR DataFrames,我們可以通過createDataFrame來建立,并傳入本地R的data.frame以此來建立SparkR DataFrames,下面例子就是這種方法:
user=data.frame(name=c('zhangsan','lisi','wangwu','zhaoliu'),age=c(21,23,20,27))
df <- createDataFrame(sqlContext, user)
(2)通過Data Sources構造
通過DataFrame接口,SparkR支援操作多種資料源,本節将介紹如何通過Data Sources提供的方法來加載和儲存資料。你可以閱讀Spark SQL程式設計指南來了解更多的options選項.
Data Sources中建立DataFrames的一般方法是使用read.df,這個方法需要傳入SQLContext,需要加載的檔案路徑以及資料源的類型。SparkR内置支援讀取JSON和Parquet檔案,而且通過Spark Packages你可以讀取很多類型的資料,比如CSV和Avro檔案。
下面是介紹如何JSON檔案,注意,這裡使用的檔案不是典型的JSON檔案。每行檔案必須包含一個分隔符、自包含有效的JSON對象:
people <- read.df(sqlContext, "/wmf/people.json", "json")
head(people)
# SparkR 能自動從Json檔案推斷schema
printSchema(people)
Data sources API還可以将DataFrames儲存成多種的檔案格式,比如我們可以通過write.df将上面的DataFrame儲存成Parquet檔案:
write.df(people, path="people.parquet", source="parquet", mode="overwrite")
(3)通過Hive tables構造
我們也可以通過Hive表來建立SparkR DataFrames,為了達到這個目的,我們需要建立HiveContext,因為我們可以通過它來通路Hive MetaStore中的表。注意,Spark内置就對Hive提供了支援。
hiveContext <- sparkRHive.init(sc)
sql="能在bdcmagic上運作的sql語句"
results<-sql(hiveContext, sql)
head(results)
3.DataFrame的相關操作
(1)選擇行和列
#建立一個資料框
user=data.frame(name=c('zhangsan','lisi','wangwu','zhaoliu'),age=c(21,23,20,27))
df <- createDataFrame(sqlContext, user)
#獲得資料框的一個基本資訊
df
#選擇某一列
head(select(df,df$name)) #或者直接使用資料框的列名來選擇head(select(df,name))
(2)Grouping和Aggregation
#n操作符其實就是count的意思
head(summarize(groupBy(df, df$sex), count = n(df$sex)))
#資料框的排序
sex_counts=summarize(groupBy(df, df$sex), count = n(df$sex))
head(arrange(sex_counts, desc(sex_counts$count)))
(3)列上面的操作
SparkR提供了大量的函數用于直接對列進行資料處理的操作。
#為資料框增加一列
df$second_age=df$age+10
head(df)
(4)在資料框上使用SQL查詢
#建立一個資料框
...
#将資料框注冊成表
registerTempTable(df, "people")
#運作sql語句
sql(hiveContext,"sql語句,eg:select * from people")
#過濾,選擇滿足條件的行
head(filter(df, df$age < 23))