随着Spark的流行,越來越多的同學在使用Spark編寫算法使用。集團Spark團隊推出了非常好用的Spark On ODPS 用戶端 https://yuque.antfin-inc.com/amr/alispark/vrl4tz 給大家使用,在編寫Spark的過程中也踩了一些坑,正好記錄一些Tips供大家參考,
讀入參數
- local模式傳遞參數
- 使用local模式送出的時候我們一般使用./bin/spark-submit 指令。對該指令打出Help後發現沒有關于參數傳遞的說明。不過local模式和Client模式都統一了參數的輸入方式,直接在指令後接上 參數=值 的這種鍵值對就可以被Spark代碼接收到。
- 比如這是一個Spark2.0 local模式啟動示例
./bin/spark-submit --class 入口函數 jar包 參數1=值1 參數2=值2 參數3=值3
- Spark 2.0讀入參數
- Spark中參數是作為main函數的args傳入的,但是拿到的都是參數=值的形式,可以用下面的函數将其處理成HashMap形式在後面的程式使用
public static final String delimiter = "="; /** * 轉化輸入參數 * @param args * @return */ public static Map<String, String> getMainArgs(String args[]) { System.out.println("============Parameter================"); Map<String,String> params = new HashMap<>(); for(String arg:args){ String ss [] = arg.trim().split(delimiter); params.put(ss[0],ss[1]); System.out.println(ss[0]+":"+ss[1]); } System.out.println("====================================="); return params; }
- Spark中參數是作為main函數的args傳入的,但是拿到的都是參數=值的形式,可以用下面的函數将其處理成HashMap形式在後面的程式使用
- PySpark讀入參數
- PySpark中參數也是作為指令的一部分傳入,但是在傳入的過程中可能會參入其他的内容,需要對其他的格式做一個區分。
# 處理輸入參數 arg_dict = {} for arg in sys.argv: argParam = arg.split('=', 1) if len(argParam) > 1: arg_dict[argParam[0]] = argParam[1]
- PySpark中參數也是作為指令的一部分傳入,但是在傳入的過程中可能會參入其他的内容,需要對其他的格式做一個區分。
讀寫ODPS
- Spark 2.0
- 使用Spark Session直接運作SQL語句
SparkSession sparkSession = SparkSession.builder().appName("spark sql test").getOrCreate(); sparkSession.sql("CREATE TABLE IF NOT EXISTS " + outputProjectName + "." + outputTableName+"(id STRING,content STRING,hash_value STRING)");
- 使用Spark Session直接運作SQL語句
- PySpark
-
spark = SparkSession.builder.appName("spark sql").getOrCreate() spark.sql("CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE + "(id STRING,content STRING)")
-