<pre>package com.fosun.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class DataFrameJson {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Json").setMaster("local"));
		SQLContext sqlContext = new SQLContext(sc);
		// Register a custom UDF
//		sqlContext.udf().register("customF", new UDF1<String, String>() {
//
//			private static final long serialVersionUID = 1L;
//
//			@Override
//			public String call(String t1) throws Exception {
//				return "cast" + t1;
//			}
//
//		}, DataTypes.StringType);
		// Read a file as JSON
//		DataFrame df = sqlContext.read().json("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.json");
		// Register the DataFrame as a temporary table
//		df.registerTempTable("people");
		// Select columns from the table and cast age to int
//		df = df.select(df.col("name"), df.col("age").cast(DataTypes.IntegerType));
		// Use the custom UDF
//		DataFrame df2 = sqlContext.sql("select customF(name) from people");
		// Deduplicate
//		df2 = df2.distinct();
		// Show the DataFrame
//		df2.show();
		// Filtering works like a SQL WHERE clause; col and apply are equivalent ways to fetch a column
//		df = df.filter(df.col("age").$eq$eq$eq(15).and(df.col("name").$eq$eq$eq("lisi")));
//		df = df.filter(df.col("age").$greater(14).$amp$amp(df.col("age").$less(16)));
//		df = df.filter(df.apply("age").$greater(14).$amp$amp(df.apply("age").$less(16))); // df.apply(column) == df.col(column)
//		df = df.filter("age = 15");
//		df = df.filter("name = 'lisi'");
//		df = df.filter("age = 15 && name = 'lisi'"); // error: && is not valid here; use AND, e.g. "age = 15 and name = 'lisi'"
//		df = df.where("age = 15").where("name = 'lisi'");
//		df.filter(df.col("age").gt(13)).show();
		// Add a column derived from another column
//		df = df.withColumn("subAge", df.col("age").$plus(10));
		// Add a constant (literal) column
//		df = df.withColumn("newColumn", functions.lit(11));
		// Rename a column
//		df = df.withColumnRenamed("subAge", "newAge");
//		df = df.groupBy(df.col("name")).count(); // group by name, which adds a count column
		// agg applies aggregate functions; since the Java API lacks functional-programming support,
		// use the static methods in org.apache.spark.sql.functions
//		df = df.groupBy("name").agg(functions.avg(df.col("age")).alias("avg")); // and give the result an alias
//		df = df.sort(df.col("name").asc(), df.col("age").desc()); // sort
		// Save the DataFrame as a parquet file
//		df.write().parquet("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.parquet");
		// Read the parquet file back
//		DataFrame df2 = sqlContext.read().parquet("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.parquet");
		// Print the DataFrame's schema, similar to a schema in SQL
//		df2.printSchema();
//		df.show();
//		df.printSchema();
		// Check the unix timestamp: Java timestamps are in ms, but Spark SQL's unix_timestamp() returns seconds
//		DataFrame df = sqlContext.sql("select unix_timestamp() as ts"); // unit is seconds
//		df.show(); // 1492397384
		// Convert a unix timestamp to a formatted date string
//		DataFrame df = sqlContext.sql("select from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as datetime"); // unit is seconds
//		df.show(); // 2017-04-17 10:54:36
		sc.stop();
	}
}
</pre>
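Every transformation above is commented out, so the class as written only starts and stops a context. The sketch below strings a few of those steps into a pipeline that actually runs: read the JSON, cast age to int, filter, then average per name. The class name DataFrameJsonMinimal is made up for illustration; it reuses the same people.json path.
<pre>package com.fosun.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;

public class DataFrameJsonMinimal {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("JsonMinimal").setMaster("local"));
		SQLContext sqlContext = new SQLContext(sc);
		// read the sample file; every field comes in as a string
		DataFrame df = sqlContext.read().json("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.json");
		// cast age to int so comparisons and averages work numerically
		df = df.select(df.col("name"), df.col("age").cast(DataTypes.IntegerType));
		// keep ages above 13, then compute the average age per name
		df.filter(df.col("age").gt(13))
			.groupBy("name")
			.agg(functions.avg("age").alias("avg"))
			.show();
		sc.stop();
	}
}
</pre>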
The project's pom.xml file:
<pre><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.fosun</groupId>
	<artifactId>sparkdemo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>sparkdemo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
</pre>
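The pom pins Spark 1.6.2 built against Scala 2.10 and already declares JUnit 3.8.1 in test scope, so the UDF logic from the main class can be unit-tested without starting a Spark context. A minimal sketch; the class name CustomFTest is made up for illustration (JUnit 3.x tests extend TestCase and use testXxx method names):
<pre>package com.fosun.sparkdemo;

import junit.framework.TestCase;
import org.apache.spark.sql.api.java.UDF1;

public class CustomFTest extends TestCase {

	// same logic as the customF UDF registered in DataFrameJson
	private final UDF1<String, String> customF = new UDF1<String, String>() {
		private static final long serialVersionUID = 1L;

		@Override
		public String call(String t1) throws Exception {
			return "cast" + t1;
		}
	};

	public void testCustomF() throws Exception {
		assertEquals("castlisi", customF.call("lisi"));
	}
}
</pre>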
The JSON file used in the project:
<pre>[{"name":"zhangsan","age":"12"},{"name":"lisi","age":"15"},{"name":"lisi","age":"18"}]</pre>
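Note that the ages are quoted strings in this JSON, which is why the main class casts the age column to IntegerType before comparing or averaging. A quick way to confirm the inferred schema (a sketch; PeopleSchemaCheck is a hypothetical name, and the expected output is shown in comments):
<pre>package com.fosun.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class PeopleSchemaCheck {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SchemaCheck").setMaster("local"));
		SQLContext sqlContext = new SQLContext(sc);
		DataFrame df = sqlContext.read().json("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.json");
		// both fields are inferred as strings because every value in the file is quoted:
		// root
		//  |-- age: string (nullable = true)
		//  |-- name: string (nullable = true)
		df.printSchema();
		sc.stop();
	}
}
</pre>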