In Spark there are two ways to convert an RDD into a DataFrame. One is based on a class that wraps the data (reflection), the other uses a programmatic schema (StructType). Both are introduced below.
Conversion via a class that wraps the data (the reflection approach)
Java version
This approach is implemented, under the hood, through class reflection. To wrap the data we first need a wrapper class, i.e. a JavaBean. We then build a JavaRDD of that bean and call SQLContext.createDataFrame, passing in the RDD together with the bean's .class object, which yields a Dataset (note that in Java this Dataset<Row> is what we mean by DataFrame). A demonstration follows.
The data we use (students.txt):
1,leo,17
2,marry,17
3,jack,18
4,tom,19
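The Student JavaBean referenced in the code below is not shown in the original post; here is a minimal sketch, assuming only the three fields present in the data file (the bean should be public and Serializable so Spark can serialize it, and it needs getters/setters plus a toString for printing):

import java.io.Serializable;

public class Student implements Serializable {
    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Student{id=" + id + ", name=" + name + ", age=" + age + "}";
    }
}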
The Java code:
package test.scala.com.zhuliu7.sparksqlnext_java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class RDD2DataFrameReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("dfuse").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> originRDD = sc.textFile("E:\\testdata\\students.txt");
        JavaRDD<Student> stuRDD = originRDD.map(new Function<String, Student>() {
            @Override
            public Student call(String s) throws Exception {
                String[] strs = s.split(",");
                Student stu = new Student();
                stu.setId(Integer.valueOf(strs[0]));
                stu.setName(strs[1]);
                stu.setAge(Integer.valueOf(strs[2]));
                return stu;
            }
        });
        // Create the DataFrame via reflection; the bean may only contain simple types. This line is the core step.
        Dataset<Row> ds = sqlContext.createDataFrame(stuRDD, Student.class);
        ds.show();
        ds.registerTempTable("t_stu");
        Dataset<Row> nds = sqlContext.sql("select * from t_stu where age <= 18");
        // Wrap the query result back into an RDD
        JavaRDD<Row> finalRDD = nds.javaRDD();
        // Map each Row back to a Student. A DataFrame created via reflection orders its
        // columns alphabetically (age, id, name), which is why the indexes below look out of order.
        JavaRDD<Student> fRDD = finalRDD.map(new Function<Row, Student>() {
            @Override
            public Student call(Row row) throws Exception {
                Student stu = new Student();
                stu.setAge(row.getInt(0));
                stu.setId(row.getInt(1));
                stu.setName(row.getString(2));
                return stu;
            }
        });
        fRDD.foreach(new VoidFunction<Student>() {
            @Override
            public void call(Student student) throws Exception {
                System.out.println(student);
            }
        });
    }
}
As the code above shows, to turn a DataFrame back into the original JavaRDD<Student> you only need to convert the DataFrame into a JavaRDD<Row> and then apply one more map to rebuild the Student objects.
Scala version
The idea is the same as in the Java version, except that the JavaBean is replaced by a case class. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object RDD2DataFrameReflection {
  // The case class must be declared at the member level (outside main) for toDF() to work at runtime
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DFUse").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // must come after sqlContext is created

    val preRDD = sc.textFile("E:\\testdata\\students.txt")
      .map(_.split(","))
      .map(arr => Student(arr(0).toInt, arr(1), arr(2).toInt))
    // toDF() compiles thanks to import sqlContext.implicits._, and it runs correctly
    // because the case class is declared at the member level rather than inside main
    val df = preRDD.toDF()
    df.show()

    // Next, query the DataFrame and turn the result back into an RDD
    df.registerTempTable("t_stu")
    val fdf = sqlContext.sql("select * from t_stu where age > 17")
    val tranRDD = fdf.rdd // the elements of this RDD are all Rows
    // Two equivalent ways to rebuild Student from a Row (toDF keeps the case class field order: id, name, age)
    val finalRDD = tranRDD.map(row => Student(row(0).toString.toInt, row(1).toString, row(2).toString.toInt))
    val fRDD = tranRDD.map(row => Student(row.getInt(0), row.getString(1), row.getInt(2)))
    fRDD.foreach(println)
  }
}
Converting via the StructType schema
Java version
First build a JavaRDD<Row>; note that the data types of the elements inside each Row must correspond one to one with the types in the StructType we are about to define. Then construct the StructType, and finally use the SQLContext together with the JavaRDD<Row> and the StructType to create the DataFrame. The code is as follows:
package test.scala.com.zhuliu7.sparksqlnext_java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class Programme2DF {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("dfuse").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> originRDD = sc.textFile("E:\\testdata\\students.txt");
        JavaRDD<Row> rowJavaRDD = originRDD.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {
                String[] strs = s.split(",");
                return RowFactory.create(
                        Integer.valueOf(strs[0]),
                        strs[1],
                        Integer.valueOf(strs[2])
                );
            }
        });
        // Define the StructType schema; the field types must match the Row elements above
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType type = DataTypes.createStructType(fields);
        Dataset<Row> ds = sqlContext.createDataFrame(rowJavaRDD, type);
        ds.show();
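        // Optional extra (a sketch, not part of the original code): a DataFrame built
        // from a StructType can be queried with SQL just like the reflection-based one.
        ds.registerTempTable("t_stu");
        Dataset<Row> adults = sqlContext.sql("select * from t_stu where age >= 18");
        adults.show();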
    }
}
Scala version
The idea is similar to the Java version: build the RDD of Rows and the StructType, then use them to construct the DataFrame.
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
object Programme2DF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DFUse").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // must come after sqlContext is created (not strictly needed in this example)

    val rowRDD = sc.textFile("E:\\testdata\\students.txt")
      .map(_.split(","))
      .map(arr => Row(arr(0).toInt, arr(1), arr(2).toInt))
    // Build the StructType; the field types must match the Row elements above
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    val df = sqlContext.createDataFrame(rowRDD, structType)
    df.show()
  }
}
Finally, many thanks to teacher Shi Shan!