天天看點

Spark工程開發常用函數與方法(Scala語言)

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.sql.{SaveMode, DataFrame}

import scala.collection.mutable.ArrayBuffer

import main.asiainfo.coc.tools.Configure

import org.apache.spark.sql.hive.HiveContext

import java.sql.DriverManager

import java.sql.Connection

 1 連接配接前台資料源 查詢前台MYSQL中的資料

val DIM_COC_INDEX_INFO_DDL = s"""
CREATE TEMPORARY TABLE DIM_COC_INDEX_INFO
USING org.apache.spark.sql.jdbc
OPTIONS (
url '${mySQLUrl}',
dbtable 'DIM_COC_INDEX_INFO'
)""".stripMargin

sqlContext.sql(DIM_COC_INDEX_INFO_DDL)
val DIM_COC_INDEX_INFO = sql("SELECT * FROM DIM_COC_INDEX_INFO").cache()
      

  

2   在A表中篩選出 B表中擷取的TARGET_TABLE_CODE 然後再按照DATA_SRC_CODE排序,查詢出源表的集合

val sources = DIM_COC_INDEX_INFO.filter("TARGET_TABLE_CODE ='"+TARGET_TABLE_CODE+"'")
        .select("DATA_SRC_CODE").groupBy("DATA_SRC_CODE").agg(DIM_COC_INDEX_INFO("DATA_SRC_CODE")).collect
      

3 将表進行關聯

resultIndexTableDF = resultIndexTableDF.join(SOURCE_TABLE,ALL_USERS.col(ALL_USER_JOIN_COLUMN_NAME) === SOURCE_TABLE.col(SOURCE_TABLE_JOIN_COLUMN_NAME),"left_outer")
resultIndexTableDF.dtypes.foreach(println)
      

4 根據條件篩選

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner")
      .join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner")
      .join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner")
      .join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner")
      .filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === CI_APPROVE_STATUS_SUCCESS_CODE
      and (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2)
      and (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull  //TODO   trim.length>0
      )
      and CI_MDA_SYS_TABLE("UPDATE_CYCLE") === TABLE_DATA_CYCLE
      ).cache()
      

5 根據某字段對表進行排序

val labelTargetTables = labels.groupBy("CI_MDA_SYS_TABLE.TABLE_ID","CI_MDA_SYS_TABLE.TABLE_NAME").agg(labels("CI_MDA_SYS_TABLE.TABLE_ID"),labels("CI_MDA_SYS_TABLE.TABLE_NAME")).collect
      

6 建立parquet格式的表 可使用schema.生成到指定的schema.

sqlContext.sql("create table "+labelTargetTableName+" stored as parquet as select * from default."+labelTargetTableNameJson)
      

7 儲存資料格式,可以指定生成的格式

resultLabelTable.saveAsTable(tableName = labelTargetTableName, source="parquet", mode=SaveMode.Overwrite)
      

8 根據篩選查詢出相應資料,由于cache方法并不屬于action操作,接下來的操作需要這一步所執行的資料資訊,是以這裡使用collect方法,再執行周遊方法

val r0000Labels = labelInThisTargetTable.filter("COUNT_RULES_CODE = 'R_00000'").select("CI_LABEL_INFO.LABEL_ID","COLUMN_NAME").collect
for(r0000Label <- r0000Labels){
   ........
}