天天看点

第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作

内容:

    1.RDD与DataFrame转换的重大意义

    2.使用Java实战RDD与DataFrame转换

    3.使用Scala实战RDD与dataFrame转换

一.  RDD与DataFrame转换的重大意义

    1.在Spark中RDD可以直接转换成DataFrame。SparkCore的核心是RDD,所有的调度都是基于RDD完成的,对RDD的操作都可以转换成基于DataFrame使用SparkSQL来操作。RDD可能接上数据库,接上NoSQL,其他文件系统等各种数据来源,然后将数据转换为DataFrame,极大简化了大数据的开发,原来写Scala\Java,现在只需要写SparkSQL。

    2.同时对DataFrame的操作又可以转换成RDD,基于DataFrame对数据进行SQL或机器学习等操作后又可以转换为RDD,这对于保存数据、格式化非常方便。

    3.RDD变DataFrame有两种方式:

        a)通过反射,推断RDD元素中的元数据。

        RDD中的数据本身是没有元数据的,例如一个Person的信息里有id/name/age,RDD的Record不知道id/name/age这些信息,但如果变成DataFrame的话,DataFrame必须知道这些信息。最简单的就是通过反射知道在RDD和DataFrame转换时拥有这些元数据信息。

            在Scala中就是Case Class映射。写一个Case Class,描述RDD中不同列的元数据是什么。Scala:case class映射。

            在Java中就是通过JavaBean。Java:Bean(但不能支持嵌套的JavaBean,也不能有List/Map等复杂的数据结构。只能用简单的数据类型:String/Int等。Scala就没有这些限制)

         注意:使用反射的前提,已经知道元数据信息了(静态的)。但有些场景下只有在运行时才能知道元数据信息(动态的)

        b)创建DataFrame时事先不知道元数据信息,只能在运行时动态构建元数据。然后再把这些元数据信息应用于RDD上。这种情况是比较常见的情况,即动态获取Schema。

二、使用Java实战RDD与DataFrame转换

1.准备数据:person.txt

1,Spark,7
2,Hadoop,11
3,Flink,5
           

2.编写Person.class

package SparkSQL;

import java.io.Serializable;

/**
 * FileName: Person
 * Author:   hadoop
 * Email:    [email protected]
 * Date:     18-10-28 下午4:27
 * Description:
 */
public class Person implements Serializable {

        private int id;
        private String name;
        private int age;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        @Override
        public String toString() {
            return "Person{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
        }

}
           

3.编写RDDToDataFrameByReflection.class

package SparkSQL;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.List;

/**
 * FileName: RDDToDataFrameByReflection
 * Author:   hadoop
 * Email:    [email protected]
 * Date:     18-10-28 下午3:27
 * Description:使用反射的方式将RDD转换成为DataFrame
 */
public class RDDToDataFrameByReflection {
    public static void main(String[] args){
        //创建SparkConf用于读取系统信息并设置运用程序的名称
        SparkConf conf  = new SparkConf().setAppName("RDDToDataFrameByReflection").setMaster("spark://Master:7077");
        //创建JavaSparkContext对象实例作为整个Driver的核心基石
        JavaSparkContext sc = new JavaSparkContext(conf);
        //设置输出log的等级,可以设置INFO,WARN,ERROR
        sc.setLogLevel("ERROR");
        //创建SQLContext上下文对象,用于SqL的分析
        SQLContext sqlContext = new SQLContext(sc);
        //创建RDD,读取textFile
        JavaRDD<String> lines = sc.textFile("/hadoop/dataframe/input");
        /**
         * 将读入的RDD数据转化为Person类型的DataFrame
         */
        JavaRDD<Person> person = lines.map(new Function<String, Person>() {
            @Override
            public Person call(String line) throws Exception {
                String[] splited = line.split(",");
                Person p = new Person();
                p.setId(Integer.valueOf(splited[0].trim()));
                p.setName(splited[1]);
                p.setAge(Integer.valueOf(splited[2].trim()));
                return p;
            }
        });
        /**
         * reateDataFrame方法来自于sqlContext,有两个参数,第一个是RDD,这里就是lines.map之后的person
         * 这个RDD里的类型是Person,即每条记录都是Person,Person其实是有id,name,age的,
         * JavaRDD本身并不知道id,name,age信息,所以要创建DataFrame,DataFrame需要知道id,name,age信息,
         * DataFrame怎么知道的呢?这里用createDataFrame时传入两个参数,第一个的RDD本身,第二个参数是
         * 对RDD中每条数据的元数据的描述,这里就是java bean class,即person.class
         * 实际上工作原理是:person.class传入时本身会用反射的方式创建DataFrame,
         * 在底层通过反射的方式获得Person的所有fields,结合RDD本身,就生成了DataFrame
         */
       Dataset ds =  sqlContext.createDataFrame(person,Person.class);
        //将DataFrame变成一个TempTable
       ds.registerTempTable("person");
        //在内存中就会生成一个persons的表,在这张临时表上就可以写SQL语句了
       Dataset bigDatas = sqlContext.sql("select * from person where age >= 6");
        //转过来就可以把查询后的结果变成 RDD,返回的是JavaRDD<Row>
       JavaRDD<Row> bigdataRDD = bigDatas.javaRDD();
        //再对RDD进行map操作。元素是一行一行的数据(SQL的Row),结果是Person,再次还原成Person。
        //这里返回的是具体的每条RDD的元素。
      JavaRDD<Person> result =  bigdataRDD.map(new Function<Row, Person>() {
           @Override
           public Person call(Row row) throws Exception {
               Person p = new Person();
               /**
                * 转化为DataFrame后,dataFrame对数据字段进行了结构优化,
                * 对字段进行了排序,所以使用下面的方式是不能按正确数据顺序访问数据的
                *         p.setId(row.getInt(0));
                *         p.setName(row.getString(1));
                *         p.setAge(row.getInt(2));
                */

               p.setId(row.getInt(1));
               p.setName(row.getString(2));
               p.setAge(row.getInt(0));
               return p;
           }
       });
     List<Person> personList =  result.collect();
     for (Person p : personList){
         System.out.println(p);
     }

    }
}

           

4.运行结果:

第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作

继续阅读