天天看點

X-Pack Spark 通路OSS

簡介

對象存儲服務(Object Storage Service,OSS)

是一種海量、安全、低成本、高可靠的雲存儲服務,适合存放任意類型的檔案。容量和處理能力彈性擴充,多種存儲類型供選擇,全面優化存儲成本。

本文主要介紹通過Spark操作OSS資料的常見方式,代碼以Scala為例。本文的代碼可以通過“

資料工作台

”送出。

前置條件

  1. OSS已經建立bucket,假設名稱為:test_spark
  2. 已建立具備讀寫OSS bucket:test_spark權限的使用者。假設使用者名為test_oss,通路OSS的AccessKeyID和AccessKeySecret分别為:accessId,accessKey。
  3. OSS的路徑格式為:oss://${AccessKeyID}:${AccessKeySecret}@${bucketName}.${endPoint}/${ossKeyPath}。例如:oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv

使用Spark讀寫OSS檔案樣例

假設有如下内容的文本資料已經存在OSS中,路徑為:oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv ,内容為:

101, name_101, 0.52
102, name_102, 0.78
103, name_103, 0.76
104, name_104, 0.78
105, name_105, 0.02
106, name_106, 0.29
107, name_107, 0.63
108, name_108, 0.20
109, name_109, 0.07
110, name_110, 0.33           

通過Spark讀取檔案,常用兩種方法

一、 使用DataFrame 讀取,執行個體代碼如下:

val conf = new SparkConf().setAppName("spark sql test")
val sparkSession = SparkSession
      .builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
val ossCsvPath = s"oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv"
//讀取test.csv并生産DataFrame
val fileDF = sparkSession.read.csv(ossCsvPath)
//列印fileDF内容
fileDF.show()
//也可以把fileDF 注冊是Spark表
fileDF.createOrReplaceTempView(“test_table")
sparkSession.sql("select * from test_table").show()               

二、 建立Spark Sql表指向test.csv,執行個體代碼如下:

val sql =
      s"""create table test_table(
         |      id          int,
         |      name        string,
         |      value       float
         |      ) row format delimited fields terminated by ','
         |      location 'oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/'
         |      """.stripMargin
//建立spark 表
sparkSession.sql(sql)
//查詢表資料
sparkSession.sql("select * from test_table").show()           

通過Spark寫檔案,常用DataFrame寫檔案。

示例代碼如下:

val conf = new SparkConf().setAppName("spark sql test")
val sparkSession = SparkSession
      .builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
val ossCsvPath = s"oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv"
//讀取test.csv并生産DataFrame
val fileDF = sparkSession.read.csv(ossCsvPath)
//列印fileDF内容
fileDF.show()
val writeOssParquetPath = "oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/parquet-table/"
//寫parquet格式檔案
fileDF.write.parquet(writeOssParquetPath)
val writeCsvParquetPath = "oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/csv-table/"
//寫csv格式檔案
fileDF.write.csv(writeCsvParquetPath)           

小結

本文給出Spark操作OSS資料的基本用法,更多用法會陸續推出。

繼續閱讀