天天看點

記一次Spark讀取多個kudu表的優化經曆

程式原邏輯是從一張kudu表讀資料進行後續統計,

需求是修改為從另外兩張不同的kudu表讀資料,合并兩張表的資料進行後續統計。

于是修改的邏輯簡化後是如下兩個步驟:

1、從兩張kudu表讀資料,并且分别注冊成臨時表

import org.apache.kudu.spark.kudu._
val kudu_table1 = spark.read.options(
                  Map("kudu.master" -> kuduMaster,
                      "kudu.table" -> Table1)
                     ).kudu
                .select("col1", "col2", "col3")
                .where(s"col3= substr(${Time},1,8)")
                .distinct()
kudu_table1.createOrReplaceTempView("kudu_table1")

val kudu_table2 = spark.read.options(
                  Map("kudu.master" -> kuduMaster,
                      "kudu.table" -> Table2)
                     ).kudu
                .select("col1", "col2", "col3")
                .where(s"col3= substr(${Time},1,8)")
                .distinct()
kudu_table2.createOrReplaceTempView("kudu_table2")
           

2、使用sparkSQL,合并兩張表進行統計,通過phoenix寫入Hbase

val DF1 = spark.sql("""
      |with union_data as (
      |  select col1,
      |         col2,
      |         col3
      |  from kudu_table1
      |  union all
      |  select col1,
      |         col2,
      |         col3
      |  from kudu_table2
      |)
      |select ... 
      |  from ... 
      | where ... 
      | group by ... 
      |	grouping sets(...)
      |	 )""".stripMargin
     ).persist()
PhoenixJdbcUtil.saveToPhoenix(DF1 , phoenixTable, phoenixZk)
           

測試後發現,運作時間極慢,原本運作一個批次隻需要十幾秒,現在修改後需要八九分鐘。

後優化如下:

1、從兩張kudu表讀資料,對兩個DataFrame先進行DF的union,union後注冊成一個臨時表

import org.apache.kudu.spark.kudu._
val kudu_table1 = spark.read.options(
                  Map("kudu.master" -> kuduMaster,
                      "kudu.table" -> Table1)
                     ).kudu
                .select("col1", "col2", "col3")
                .where(s"col3= substr(${Time},1,8)")
                .distinct()

val kudu_table2 = spark.read.options(
                  Map("kudu.master" -> kuduMaster,
                      "kudu.table" -> Table2)
                     ).kudu
                .select("col1", "col2", "col3")
                .where(s"col3= substr(${Time},1,8)")
                .distinct()

//主要修改點:對DF進行union,并去重以減少後續sparkSQL計算的資料量,注冊成一張臨時表
val union = kudu_table1.union(kudu_table2).distinct()
union.createOrReplaceTempView("kudu_table")
           

2、使用sparkSQL,使用上述的一張臨時表kudu_table直接進行統計,通過phoenix寫入Hbase

val DF1 = spark.sql("""
      |select ... 
      |  from kudu_table
      | where ... 
      | group by ... 
      |	grouping sets(...)
      |	 )""".stripMargin
     ).persist()
PhoenixJdbcUtil.saveToPhoenix(DF1 , phoenixTable, phoenixZk)
           

經測試,優化後的程式運作一個批次隻需1分鐘左右,運作時間減少好幾倍。

具體原理不明(有知道原理的小夥伴可以在下面評論一下,嘿嘿)。但應該是SparkSQL由于要解析SQL,效率上不如dataFrame操作效率高。

2019.10.30補充:

運作慢有可能是sql的問題,之前where條件中使用到了從concat(day,min) = '...',經測試,此做法計算極慢。現在是where day='...' and min='...',該方式效率提高很多。

繼續閱讀