天天看點

實戰再更新:流批一體處理百億級别資料

實戰再更新:流批一體處理百億級别資料

編 輯:LCP

彭友們好,我是你的老朋友。最近業務方提出了新的需求,增加了些業務邏輯,同時資料量也成倍增加,而且還要求每日産出名額結果,之前出過一個解決方案,效果還不錯:

調整前資料量 調整後資料量
生産端 每日20億左右 每日500億左右
輸出端 減半,10億左右 每日50億+

考慮到投入産出比,該需求仍然采用來原來老的技術方案設計,隻是做了些優化手段。具體使用到的技術:Java,Kafka,MLSQL,Logstash,Ruby,Hive,ES,SparkSQL,Datax。

方案設計

「1.資料流向」

實戰再更新:流批一體處理百億級别資料

流程:

  1.業務方将資料推送至MQ,并将消息進行序列化處理

  2.通過流平台接入消費消息,并進行一部分邏輯處理,再次回轉到MQ中

  3.使用logstash消費消息,編寫ruby進行邏輯處理,将資料寫入hdfs

  4.數倉對hdfs檔案進行加載入表,進行模組化處理

  5.最後将名額結果寫入到業務方,以es存儲

相信彭友們對這部分的流程比較清晰,其實這就屬于數倉開發流程和分層處理。

基本流程梳理完成後,如果直接按照該種模式每日處理如此大的資料量,必然會占用大量的資源,對其他的排程任務産生影響。接下來就要結合實際的需求進行詳細的設計。

「2.技術方案」

實戰再更新:流批一體處理百億級别資料

流程:

  1.業務側通過protobuf序列化将資料推送至kafka

  2.通過MLSQL開發udf,消費kafka資料反序列化,并進行去重處理(「這裡設定的是每3秒一個批次,這裡做了一次批内去重」)并将資料再次寫回kafka中

  3.logstash端消費kafka資料,并編寫filter邏輯,這裡涉及到外部接口調用的邏輯,将調用結果儲存至系統環境變量中(「考慮資料接口更新頻率和資料量,這裡每小時調用一次,另存入環境變量其實就是一個緩存,之是以沒有存入檔案,是因為logstash每次處理event都要對檔案進行操作,效率較低」),然後進行擷取,最後otut端将資料寫入hdfs中

  4.資料寫入hdfs後,加載到hive表中,進行模組化開發

  5.最後使用datax将名額統計結果寫入到es中供業務方使用

「3.問題引入」

雖然上述的方案能夠實作實時消息上百億的資料量,但未考慮到數倉側的計算壓力,目前生産上每日跑批任務有6000+個任務,如果夜間去計算50億+的資料量,将會把所有的資源全部占用,導緻其他任務一直阻塞,最後會影響重要任務産出(「别問我怎麼知道的,因為這是踩過的坑」)。是以接下來需要對該部分任務進行優化,盡量做到占用最小的資源以最快的時間産出。

性能優化

「1.消費優化,其目标:即做到消費不延遲」

  消費端的優化仍然沿用上篇的優化手段,唯一不同的地方在于logstash端涉及到的緩存問題,剛才上面也提及過需要調用外部接口來過濾一部分資料,但接口不可能是一直調用的,否則會對接口造成壓力,結合接口更新和資料特性,這裡每小時調用一次即可,那麼問題來了,調用的結果存儲到哪裡呢?

首先不能存儲到外部系統,一方面增加了強依賴,另一方面對外部系統也會有壓力負載,是以需要落入本地,之前采用過落入檔案的方式,然後ruby中對檔案進行操作,但是效果不佳(「這裡涉及到頻繁的檔案打開關閉和同步問題」),後采用環境變量的方式進行存儲,每次處理event的時候讀取環境變量字段即可。具體使用方式如下:

if not ENV["app_list"].nil?

   event["arrays"] = ENV["app_list"].split(" ")

   event.cancel if event["arrays"].include? event["service_name"]

   # 這裡的ENV["app_list"]就是讀取的環境變量app_list字段

end

「2.批處理優化,其目标:占用最少的資源花最小的時間産出」

1.mapper和reduce數量調整

  對于批處理的優化無非是對mapreduce的優化,由于logstash寫入的hdfs檔案是可切分的,是以産生的mapper數跟塊的個數有關系,但由于mapred.min.split.size參數在服務端已經固定配置了(固定256M),如果大于該值則會直接報錯。

是以如果檔案越大,那麼切分的mapper數也就越多,申請的資源也就越多,是以對于占用小量資源的優化,從源頭上就不可行。

2.存儲優化

  由于logstash輸出的格式是textfile的,如果直接對該種格式進行處理,将會占用大量的網絡資源,是以需要進行格式轉換和壓縮存儲。

這裡優化的手段是采用orc存儲,snappy格式壓縮(「相對于lz4壓縮,snappy是屬于不可切分的,那麼這也對mapper數量進行了控制」)。

create table if not exists tableA(

  id string,

  field1 string,

  field2 string

)

partitioned by (date_id string,hour string)

stored as orc

tblproperties ("orc.compress" = "SNAPPY");

3.改用執行引擎

基于第一點的優化思路,受限于源頭的檔案切分和可調參數導緻無法控制資源,是以這裡需要借用于分而治之的思想,即将每日的處理調整為每小時處理(「這裡隻是過濾對于該次需求無用的資料,不做每小時聚合的操作,即業務場景需要對全天的資料進行聚合處理」)。

為了盡可能使用較少時間執行,需要将小時處理的任務執行引擎由hive調整為sparksql執行,同時調整了以下幾個參數

--開啟動态分區

set spark.sql.auto.repartition=true;

set spark.shuffle.service.enabled=true;

set hive.exec.dynamic.partition.mode=nonstrict;

set hive.exec.dynamic.partition=true;

--開啟sparksql自适應,即不采用固定的最小分區,避免産生小檔案

set spark.sql.adaptive.enabled=true;

set spark.sql.autoBroadcastJoinThreshold=209715200;

set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=1024000000;

效果産出

基于以上的幾種優化手段,目前已經解決了前面提到的占用資源過多和産出時間過長的問題。

資源使用(隊列最大資源使用量) 産出時間
優化前 1200Cores+4293189M+Fair Scheduler 14803秒
優化後 100Cores+92752M+Fair Scheduler 3470秒
實戰再更新:流批一體處理百億級别資料

yarn隊列資源配置

實戰再更新:流批一體處理百億級别資料

排程任務執行情況

實戰再更新:流批一體處理百億級别資料

最終要聚合統計的資料量

待解決問題

雖然已經解決掉了占用資源多,産出時間長的問題,但是穩定性也是亟需要解決的問題。由于logstash消費端部署的機器配置各有差異,是以在寫入hdfs的時候及其不穩定,也容易導緻延遲産生。

後續消費這塊邏輯可能會遷移至flink來改造實作,目前這塊仍在內建開發階段中,待完善後會再次分享給彭友們。另如果彭友們如果有更好的解決方案,歡迎一起溝通讨論。

​​

實戰再更新:流批一體處理百億級别資料

繼續閱讀