天天看點

Spark實戰(九)sparkRDD轉為DataFrame的兩種方式一、使用反射的方式二、使用程式設計方式動态轉三、txt檔案中文亂碼

一、使用反射的方式

   這種方式是使用反射的方式,用反射去推倒出來RDD裡面的schema,也就是根據包括case class資料的RDD轉換成DataFrame,這個方式簡單,在已知schema的時候非常友善

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().appName("TopNStatJob")
  .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
  .master("local[2]").getOrCreate()

//    val accessRdd = spark.sparkContext.textFile("D://item//mukewang//data//1.txt")
    val accessRdd = transfer(spark,"D://item//mukewang//data//province.txt")

inferReflection(accessRdd,spark)

 }
	def inferReflection(rdd :RDD[String],spark:SparkSession): Unit = {

//    val  rdd1 = rdd.map(line =>{
//      val splits = line.split("\t")
//
//    })
    //注意:需要導入隐式轉換
    import spark.implicits._
    val rddDF = rdd.map(_.split("\t")).map(line => region(line(0),line(1))).toDF()
    rddDF.show()

  }
 case class region(code: String, name: String)
           

二、使用程式設計方式動态轉

   首先建構一個RDD,再建立由一個 StructType 表示的模式,構造schema用到了兩個類StructType和StructFile,然後通過 createDataFrame 方法應用模式

def program(rdd :RDD[String],spark:SparkSession):Unit={

//    rdd.take(10).foreach(println)
    val rddDF = rdd.map(_.split("\t")).map(line => Row(line(0),line(1)))
    val structType = StructType(Array(StructField("code",StringType,true),
      StructField("name",StringType,true)
    ))

    val infoDF = spark.createDataFrame(rddDF,structType)
    infoDF.printSchema()
    infoDF.show()
  }
           

三、txt檔案中文亂碼

   下面是一個讀取txt檔案中文亂碼的解決方法:

def transfer(sc:SparkSession,path:String):RDD[String]={
    sc.sparkContext.hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],1)
      .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
  }
           

   在使用時直接調用即可:

val accessRdd = transfer(spark,"D://item//mukewang//data//province.txt")
           

繼續閱讀