一、使用反射的方式
這種方式是使用反射的方式,用反射去推倒出來RDD裡面的schema,也就是根據包括case class資料的RDD轉換成DataFrame,這個方式簡單,在已知schema的時候非常友善
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("TopNStatJob")
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
.master("local[2]").getOrCreate()
// val accessRdd = spark.sparkContext.textFile("D://item//mukewang//data//1.txt")
val accessRdd = transfer(spark,"D://item//mukewang//data//province.txt")
inferReflection(accessRdd,spark)
}
def inferReflection(rdd :RDD[String],spark:SparkSession): Unit = {
// val rdd1 = rdd.map(line =>{
// val splits = line.split("\t")
//
// })
//注意:需要導入隐式轉換
import spark.implicits._
val rddDF = rdd.map(_.split("\t")).map(line => region(line(0),line(1))).toDF()
rddDF.show()
}
case class region(code: String, name: String)
二、使用程式設計方式動态轉
首先建構一個RDD,再建立由一個 StructType 表示的模式,構造schema用到了兩個類StructType和StructFile,然後通過 createDataFrame 方法應用模式
def program(rdd :RDD[String],spark:SparkSession):Unit={
// rdd.take(10).foreach(println)
val rddDF = rdd.map(_.split("\t")).map(line => Row(line(0),line(1)))
val structType = StructType(Array(StructField("code",StringType,true),
StructField("name",StringType,true)
))
val infoDF = spark.createDataFrame(rddDF,structType)
infoDF.printSchema()
infoDF.show()
}
三、txt檔案中文亂碼
下面是一個讀取txt檔案中文亂碼的解決方法:
def transfer(sc:SparkSession,path:String):RDD[String]={
sc.sparkContext.hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],1)
.map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
}
在使用時直接調用即可:
val accessRdd = transfer(spark,"D://item//mukewang//data//province.txt")