天天看點

spark-streming 中調用spark-sql時過程遇到的問題

在spark-streming 中調用spark-sql時過程遇到的問題

使用版本:spark-2.1.0

JDK1.8

1. spark-sql中對limit 的查詢結果使用sum() 聚合操作不生效

如下sql會報出 top10_sts 存在異常。

SELECT 
  SUM(mtime_show_times) AS top10_sts 
FROM
  tb_movie_bo_pt_params 
ORDER BY mtime_persion_times DESC 
LIMIT 10 
           

改成如下sql邏輯正常執行

SELECT 
  SUM(mtime_show_times) AS top10_sts 
FROM
  (SELECT 
    * 
  FROM
    tb_moive_bo_pt_params 
  ORDER BY mtime_persion_times DESC 
  LIMIT 10) a 
           

2. spark-sql中使用union 連接配接兩個表;再将union結果進行過濾 != 操作不生效問題。

SELECT 
  'ALL_MOVIE' AS movie_id,
  SUM(no_sale) AS persion_times,
  COUNT(1) AS show_times 
FROM
  tb_bo_real_time 
WHERE biz_date = '#{var_date}' 
UNION
SELECT 
  'ALL_MOVIE2' AS movie_id,
  '2017-12-31' AS persion_times,
  '1123' AS show_times 
           

基于union 的結果生成的臨時表 temp_tb;

執行如下操作得不到預期的結果:

SELECT 
 *
FROM
  temp_tb 
WHERE movie_id != 'ALL_MOVIE' 
           

這 可能是spark的bug,經過調試後發現,使用 union關鍵字之後就會出現該問題。

測試發現有2種解決辦法:

  • 将uinon的兩部分分别使用sql計算,之後在使用RDD的union操作,将兩個資料集合合并起來。
val movie_summary_realtime_Df = sparkSession.sql(config.getProperty("test_union_sql").replace("#{var_date}", biz_date))
    movie_summary_realtime_Df.collect().foreach(println)
    println("-----------movie_summary_realtime_all_Df------------------")
    val movie_summary_realtime_all_Df = sparkSession.sql(config.getProperty("test_union_sql_all").replace("#{var_date}", biz_date))
    movie_summary_realtime_all_Df.collect().foreach(println)
    println("-----------union_Df------------------")

    val unioDf = movie_summary_realtime_Df.union(movie_summary_realtime_all_Df)
    unioDf.collect().foreach(println)
    unioDf.createOrReplaceTempView("tb_bo_movie_summary_realtime")

    println("-----------test filter------------------")
    val test_DF = sparkSession.sql("SELECT movie_id FROM tb_bo_movie_summary_realtime WHERE movie_id != 'ALL_MOVIE'")
    test_DF.collect().foreach(println)
           
  • 将相關依賴表cache後,再進行sql操作。
val movie_summary_realtime_Df = sparkSession.sql(config.getProperty("tb_bo_movie_summary_realtime").replace("#{var_date}", biz_date))
    movie_summary_realtime_Df.cache()
           

3. spark 記憶體快照的更新

def updateSeatMapState(moviesKey: String, seatMap: Option[JSONObject], state: State[JSONObject]) = {
    var newValue:JSONObject = seatMap match {
      case None => { val temp =state.get();temp;}
      case _ => { state.update(seatMap.get);seatMap.get; }
    }
    val output = (moviesKey, newValue)
    output
  }
           
  • 目前值Option[JSONObject] 有可能為none,state.update(none) 會有空指針異常,造成程式退出。
  • 目前值Option[JSONObject] 為none時,有兩種情況。一種是業務确實為空;另一種是目前key已經過期了。

    不管那種情況,都不需要更新state的值。

  • 如果一個state的可以過期了,再調用state.update()就會報出一個更新過期Key的異常,後程式退出。
  • 狀态值如果不設定過期,就會一直存在,系統長時間運作性能會越來越差,并出現記憶體溢出,而異常退出。
  • 更新方法需要傳回值。
java.lang.IllegalArgumentException: requirement failed: Cannot update the state that is timing out
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.streaming.StateImpl.update(State.scala:156)
        at com.mtime.bigdata.bo.RealTimeBoxOfficeCluster$.updateSeatMapState(RealTimeBoxOfficeCluster.scala:110)
        at com.mtime.bigdata.bo.RealTimeBoxOfficeCluster$$anonfun$6.apply(RealTimeBoxOfficeCluster.scala:72)
        at com.mtime.bigdata.bo.RealTimeBoxOfficeCluster$$anonfun$6.apply(RealTimeBoxOfficeCluster.scala:72)
        at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
        at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
        at 
           

作者:

丹江湖畔養蜂子的趙大爹

出處:http://www.cnblogs.com/honeybee/

關于作者:丹江湖畔養蜂子的趙大爹

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連結