天天看點

算法市場Spark算法編寫Tips

随着Spark的流行,越來越多的同學在使用Spark編寫算法使用。集團Spark團隊推出了非常好用的Spark On ODPS 用戶端 https://yuque.antfin-inc.com/amr/alispark/vrl4tz 給大家使用,在編寫Spark的過程中也踩了一些坑,正好記錄一些Tips供大家參考,

讀入參數

  • local模式傳遞參數
    • 使用local模式送出的時候我們一般使用./bin/spark-submit 指令。對該指令打出Help後發現沒有關于參數傳遞的說明。不過local模式和Client模式都統一了參數的輸入方式,直接在指令後接上 參數=值 的這種鍵值對就可以被Spark代碼接收到。
    • 比如這是一個Spark2.0 local模式啟動示例
      ./bin/spark-submit --class 入口函數 jar包 參數1=值1 參數2=值2 參數3=值3           
  • Spark 2.0讀入參數
    • Spark中參數是作為main函數的args傳入的,但是拿到的都是參數=值的形式,可以用下面的函數将其處理成HashMap形式在後面的程式使用
      public static final String delimiter = "=";
      
      /**
       * 轉化輸入參數
       * @param args
       * @return
       */
      public static Map<String, String> getMainArgs(String args[]) {
        System.out.println("============Parameter================");
        Map<String,String> params = new HashMap<>();
        for(String arg:args){
            String ss [] = arg.trim().split(delimiter);
            params.put(ss[0],ss[1]);
            System.out.println(ss[0]+":"+ss[1]);
        }
        System.out.println("=====================================");
        return params;
      }           
  • PySpark讀入參數
    • PySpark中參數也是作為指令的一部分傳入,但是在傳入的過程中可能會參入其他的内容,需要對其他的格式做一個區分。
      # 處理輸入參數
      arg_dict = {}
      for arg in sys.argv:
        argParam = arg.split('=', 1)
        if len(argParam) > 1:
            arg_dict[argParam[0]] = argParam[1]           

讀寫ODPS

  • Spark 2.0
    • 使用Spark Session直接運作SQL語句
      SparkSession sparkSession = SparkSession.builder().appName("spark sql test").getOrCreate();
      sparkSession.sql("CREATE TABLE IF NOT EXISTS " + outputProjectName + "." + outputTableName+"(id STRING,content STRING,hash_value STRING)");           
  • PySpark
    • spark = SparkSession.builder.appName("spark sql").getOrCreate()
      spark.sql("CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE + "(id STRING,content STRING)")