天天看點

SparkSQL中DataFrame常用API

[java]  view plain  copy

  1. <pre code_snippet_id="2337139" snippet_file_name="blog_20170417_1_1032614" name="code" class="java">package com.fosun.sparkdemo;  
  2. import org.apache.spark.SparkConf;  
  3. import org.apache.spark.api.java.JavaSparkContext;  
  4. import org.apache.spark.sql.DataFrame;  
  5. import org.apache.spark.sql.SQLContext;  
  6. import org.apache.spark.sql.functions;  
  7. import org.apache.spark.sql.api.java.UDF1;  
  8. import org.apache.spark.sql.types.DataTypes;  
  9. public class DataFrameJson {  
  10.     public static void main(String[] args) {  
  11.         JavaSparkContext sc =new JavaSparkContext((new SparkConf()).setAppName("Json").setMaster("local"));  
  12.         SQLContext sqlContext = new SQLContext(sc);  
  13.         //自定義UDF函數  
  14. //      sqlContext.udf().register("customF", new UDF1<String,String>(){  
  15. //  
  16. //          private static final long serialVersionUID = 1L;  
  17. //  
  18. //          @Override  
  19. //          public String call(String t1) throws Exception {  
  20. //              return "cast"+t1;  
  21. //          }  
  22. //            
  23. //      }, DataTypes.StringType);  
  24.         //使用json讀取檔案  
  25. //      DataFrame df = sqlContext.read().json("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.json");  
  26.         //将DataFrame注冊為一張表  
  27. //      df.registerTempTable("people");  
  28. //      //查詢表中的字段,并将age轉化為int型  
  29. //      df = df.select(df.col("name"),df.col("age").cast(DataTypes.IntegerType));  
  30.         //使用自定義函數  
  31. //      DataFrame df2 = sqlContext.sql("select customF(name) from people");  
  32.         //去重  
  33. //      df2 = df2.distinct();  
  34.         //展示DataFrame  
  35. //      df2.show();  
  36.         //過濾,類似于sql中的過濾條件與where相同,col和apply相同都是擷取列  
  37. //      df = df.filter(df.col("age").$eq$eq$eq(15).and(df.col("name").$eq$eq$eq("lisi")));  
  38. //      df = df.filter(df.col("age").$greater(14).$amp$amp(df.col("age").$less(16)));  
  39. //      df = df.filter(df.apply("age").$greater(14).$amp$amp(df.apply("age").$less(16)));//df.apply(column)==df.col(column)  
  40. //      df = df.filter("age = 15");  
  41. //      df = df.filter("name = 'lisi'");  
  42. //      df = df.filter("age = 15 && name = 'lisi'");//error 不能使用 &&  
  43. //      df = df.where("age = 15").where("name = 'lisi'");  
  44. //      df.filter(df.col("age").gt(13)).show();  
  45.         //新增一列,并用其他列指派  
  46. //      df = df.withColumn("subAge", df.col("age").$plus(10));//添加一列  
  47.         //新增一列,常數值列  
  48. //      df = df.withColumn("newColumn", functions.lit(11));//添加一列  
  49.         //給列名重命名  
  50. //      df = df.withColumnRenamed("subAge","newAge");  
  51. //      df = df.groupBy(df.col("name")).count();//添加新的字段count,然後以name來分組  
  52.         //agg使用聚合函數,由于java版缺乏函數式程式設計的支援,故使用org.apache.spark.sql.functions中的常量方法  
  53. //      df = df.groupBy("name").agg(functions.avg(df.col("age")).alias("avg"));//并且取個别名  
  54. //      df = df.sort(df.col("name").asc(),df.col("age").desc());//排序  
  55.         //将DataFrame儲存為parquet檔案  
  56. //      df.write().parquet("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.parquet");  
  57. //      //讀取parquet檔案  
  58. //      DataFrame df2 = sqlContext.read().parquet("D:/workspace/work2/sparkdemo/src/main/java/com/fosun/sparkdemo/resource/people.parquet");  
  59.         //展示DataFrame的schema,和SQL中的Schema類似  
  60. //      df2.printSchema();  
  61. //      df.show();  
  62. //      df.printSchema();  
  63.         //檢視unix時間戳,java中unix_timestamp是ms,sparkSQL中是ms  
  64. //      DataFrame df = sqlContext.sql("select unix_timestamp() as ts"); //機關是s  
  65. //      df.show();//1492397384  
  66.         //有關unix時間戳的格式轉化  
  67. //      DataFrame df = sqlContext.sql("select from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as datetime"); //機關是s  
  68. //      df.show();//2017-04-17 10:54:36  
  69.         sc.stop();  
  70.     }  
  71. }  
  72. </pre><br>  
  73. <br>  
  74. <pre></pre>  
  75. <pre code_snippet_id="2337139" snippet_file_name="blog_20170417_1_1032614" name="code" class="html"></pre><pre code_snippet_id="2337139" snippet_file_name="blog_20170417_1_1032614" name="code" class="html">項目的pom.xml檔案</pre><pre code_snippet_id="2337139" snippet_file_name="blog_20170417_1_1032614" name="code" class="html"><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  76.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  77.   <modelVersion>4.0.0</modelVersion>  
  78.   <groupId>com.fosun</groupId>  
  79.   <artifactId>sparkdemo</artifactId>  
  80.   <version>0.0.1-SNAPSHOT</version>  
  81.   <packaging>jar</packaging>  
  82.   <name>sparkdemo</name>  
  83.   <url>http://maven.apache.org</url>  
  84.   <properties>  
  85.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  86.   </properties>  
  87.   <dependencies>  
  88.   <dependency>  
  89.         <groupId>org.apache.spark</groupId>  
  90.         <artifactId>spark-core_2.10</artifactId>  
  91.         <version>1.6.2</version>  
  92.     </dependency>  
  93.     <dependency>  
  94.         <groupId>org.apache.spark</groupId>  
  95.         <artifactId>spark-sql_2.10</artifactId>  
  96.         <version>1.6.2</version>  
  97.     </dependency>  
  98.     <dependency>  
  99.       <groupId>junit</groupId>  
  100.       <artifactId>junit</artifactId>  
  101.       <version>3.8.1</version>  
  102.       <scope>test</scope>  
  103.     </dependency>  
  104.   </dependencies>  
  105.   <build>  
  106.     <plugins>  
  107.         <plugin>  
  108.             <groupId>org.apache.maven.plugins</groupId>  
  109.             <artifactId>maven-compiler-plugin</artifactId>  
  110.             <version>3.5.1</version>  
  111.             <configuration>  
  112.                 <source>1.8</source>  
  113.                 <target>1.8</target>  
  114.                 <encoding>UTF-8</encoding>  
  115.             </configuration>  
  116.         </plugin>  
  117.     </plugins>  
  118. </build>  
  119. </project>  
  120. </pre>  
  121. <p>項目中使用到的json檔案</p>  
  122. <p></p>  
  123. <pre code_snippet_id="2337139" snippet_file_name="blog_20170417_2_2047153" name="code" class="plain">[{"name":"zhangsan","age":"12"},{"name":"lisi","age":"15"},{"name":"lisi","age":"18"}]</pre><br>  
  124. <br>  
  125. <p></p>  

繼續閱讀