編 輯:LCP
彭友們好,我是你的老朋友。最近業務方提出了新的需求,增加了些業務邏輯,同時資料量也成倍增加,而且還要求每日産出名額結果,之前出過一個解決方案,效果還不錯:
調整前資料量 | 調整後資料量 | |
生産端 | 每日20億左右 | 每日500億左右 |
輸出端 | 減半,10億左右 | 每日50億+ |
考慮到投入産出比,該需求仍然采用來原來老的技術方案設計,隻是做了些優化手段。具體使用到的技術:Java,Kafka,MLSQL,Logstash,Ruby,Hive,ES,SparkSQL,Datax。
方案設計
「1.資料流向」
流程:
1.業務方将資料推送至MQ,并将消息進行序列化處理
2.通過流平台接入消費消息,并進行一部分邏輯處理,再次回轉到MQ中
3.使用logstash消費消息,編寫ruby進行邏輯處理,将資料寫入hdfs
4.數倉對hdfs檔案進行加載入表,進行模組化處理
5.最後将名額結果寫入到業務方,以es存儲
相信彭友們對這部分的流程比較清晰,其實這就屬于數倉開發流程和分層處理。
基本流程梳理完成後,如果直接按照該種模式每日處理如此大的資料量,必然會占用大量的資源,對其他的排程任務産生影響。接下來就要結合實際的需求進行詳細的設計。
「2.技術方案」
流程:
1.業務側通過protobuf序列化将資料推送至kafka
2.通過MLSQL開發udf,消費kafka資料反序列化,并進行去重處理(「這裡設定的是每3秒一個批次,這裡做了一次批内去重」)并将資料再次寫回kafka中
3.logstash端消費kafka資料,并編寫filter邏輯,這裡涉及到外部接口調用的邏輯,将調用結果儲存至系統環境變量中(「考慮資料接口更新頻率和資料量,這裡每小時調用一次,另存入環境變量其實就是一個緩存,之是以沒有存入檔案,是因為logstash每次處理event都要對檔案進行操作,效率較低」),然後進行擷取,最後otut端将資料寫入hdfs中
4.資料寫入hdfs後,加載到hive表中,進行模組化開發
5.最後使用datax将名額統計結果寫入到es中供業務方使用
「3.問題引入」
雖然上述的方案能夠實作實時消息上百億的資料量,但未考慮到數倉側的計算壓力,目前生産上每日跑批任務有6000+個任務,如果夜間去計算50億+的資料量,将會把所有的資源全部占用,導緻其他任務一直阻塞,最後會影響重要任務産出(「别問我怎麼知道的,因為這是踩過的坑」)。是以接下來需要對該部分任務進行優化,盡量做到占用最小的資源以最快的時間産出。
性能優化
「1.消費優化,其目标:即做到消費不延遲」
消費端的優化仍然沿用上篇的優化手段,唯一不同的地方在于logstash端涉及到的緩存問題,剛才上面也提及過需要調用外部接口來過濾一部分資料,但接口不可能是一直調用的,否則會對接口造成壓力,結合接口更新和資料特性,這裡每小時調用一次即可,那麼問題來了,調用的結果存儲到哪裡呢?
首先不能存儲到外部系統,一方面增加了強依賴,另一方面對外部系統也會有壓力負載,是以需要落入本地,之前采用過落入檔案的方式,然後ruby中對檔案進行操作,但是效果不佳(「這裡涉及到頻繁的檔案打開關閉和同步問題」),後采用環境變量的方式進行存儲,每次處理event的時候讀取環境變量字段即可。具體使用方式如下:
if not ENV["app_list"].nil?
event["arrays"] = ENV["app_list"].split(" ")
event.cancel if event["arrays"].include? event["service_name"]
# 這裡的ENV["app_list"]就是讀取的環境變量app_list字段
end
「2.批處理優化,其目标:占用最少的資源花最小的時間産出」
1.mapper和reduce數量調整
對于批處理的優化無非是對mapreduce的優化,由于logstash寫入的hdfs檔案是可切分的,是以産生的mapper數跟塊的個數有關系,但由于mapred.min.split.size參數在服務端已經固定配置了(固定256M),如果大于該值則會直接報錯。
是以如果檔案越大,那麼切分的mapper數也就越多,申請的資源也就越多,是以對于占用小量資源的優化,從源頭上就不可行。
2.存儲優化
由于logstash輸出的格式是textfile的,如果直接對該種格式進行處理,将會占用大量的網絡資源,是以需要進行格式轉換和壓縮存儲。
這裡優化的手段是采用orc存儲,snappy格式壓縮(「相對于lz4壓縮,snappy是屬于不可切分的,那麼這也對mapper數量進行了控制」)。
create table if not exists tableA(
id string,
field1 string,
field2 string
)
partitioned by (date_id string,hour string)
stored as orc
tblproperties ("orc.compress" = "SNAPPY");
3.改用執行引擎
基于第一點的優化思路,受限于源頭的檔案切分和可調參數導緻無法控制資源,是以這裡需要借用于分而治之的思想,即将每日的處理調整為每小時處理(「這裡隻是過濾對于該次需求無用的資料,不做每小時聚合的操作,即業務場景需要對全天的資料進行聚合處理」)。
為了盡可能使用較少時間執行,需要将小時處理的任務執行引擎由hive調整為sparksql執行,同時調整了以下幾個參數
--開啟動态分區
set spark.sql.auto.repartition=true;
set spark.shuffle.service.enabled=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
--開啟sparksql自适應,即不采用固定的最小分區,避免産生小檔案
set spark.sql.adaptive.enabled=true;
set spark.sql.autoBroadcastJoinThreshold=209715200;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=1024000000;
效果産出
基于以上的幾種優化手段,目前已經解決了前面提到的占用資源過多和産出時間過長的問題。
資源使用(隊列最大資源使用量) | 産出時間 | |
優化前 | 1200Cores+4293189M+Fair Scheduler | 14803秒 |
優化後 | 100Cores+92752M+Fair Scheduler | 3470秒 |
yarn隊列資源配置
排程任務執行情況
最終要聚合統計的資料量
待解決問題
雖然已經解決掉了占用資源多,産出時間長的問題,但是穩定性也是亟需要解決的問題。由于logstash消費端部署的機器配置各有差異,是以在寫入hdfs的時候及其不穩定,也容易導緻延遲産生。
後續消費這塊邏輯可能會遷移至flink來改造實作,目前這塊仍在內建開發階段中,待完善後會再次分享給彭友們。另如果彭友們如果有更好的解決方案,歡迎一起溝通讨論。