本場視訊連結: EMR打造高效雲原生資料分析引擎 本場ppt材料: https://www.slidestalk.com/AliSpark/2019___0926_110365
基于開源體系打造雲上資料分析平台
客戶選擇開源方案的原因主要有以下幾點:
• 靈活多樣的業務場景:目前即便是一個小企業,其資料存儲也可能是多種多樣的,比如業務資料、日志資料和圖資料等,這種情況下,需要有一個高度定制化的系統來串聯不同的業務場景;
• 自己有專業的運維能力:開源系統有充足的人才儲備,豐富的網上資料與開源的強大後盾,可以確定公司業務的順利開展;
• 多種業務需求vs成本壓力:每種雲上産品有自己的使用場景,對于中小企業來說購買多種雲産品将會造成很大的成本壓力,而通過開源體系維護一套系統,在內建使用者業務中所需元件的同時,可以降低使用者的成本。

下圖是阿裡巴巴EMR系統的産品架構圖。使用者上雲的方式主要有兩種,一種是購買ECS資源自己搭建一套開源系統;另一種是直接選擇阿裡巴巴的EMR系統。第一種方式由于開源系統元件多,涉及到了Spark、Hive、Flink和TensorFlow等,從零搭建一套完整的大資料系統對于使用者來講非常複雜,尤其是成百上千的叢集規模也給運維造成了很大的挑戰。而使用EMR系統具有以下優點:
1) 阿裡雲EMR系統可以幫助使用者一鍵化自動部署、配置相關元件,開箱即用,同時還會根據使用者的機器類型進行參數的自動推薦調優。
2) 阿裡雲EMR系統與阿裡雲其他産品實作了打通,比如資料存放在OSS,EMR系統無需額外再做認證配置,便可以很友善地讀取OSS上的資料;
3) 阿裡雲EMR系統內建了很多自研插件,這些插件在其他産品中是沒有的;
4) 阿裡雲EMR系統的所有元件相容開源但優于開源,如Flink內建了阿裡雲自研的Blink和TensorFlow(PAI),這也是阿裡雲為社群做的一點貢獻,目的是為了讓使用者能用到阿裡雲内部的技術;
5) 阿裡雲EMR系統提供了全平台的作業診斷與告警元件APM來實作自動化運維,大大降低叢集運維的複雜性;
6) 阿裡雲EMR系統還與DataWorks對接,使用者可以以DataWorks為入口,傻瓜式地使用EMR系統。
EMR系統的目标主要有以下三個:
• 平台化:将EMR做成一個統一的雲上資料分析平台,幫助使用者打造全棧式的大資料解決方案,支援全系列VM容器化,提供企業級HAS和大資料APM;
• 技術社群&深度:持續深耕技術社群,打造大資料友好的雲 Native 存儲,同時将技術回饋給社群,為社群做貢獻;
• 生态:EMR系統将結合阿裡雲其他産品建構一個生态,接入Blink、PAI,內建OSS、OTS方案。
EMR-Jindo:雲原生高效資料分析引擎
下圖展示了TPC-DS的基準測試報告,可以發現在2019年3月份10TB的測試中,性能名額得分是182萬左右,成本是0.31 USD;而2019年十月份同樣的測試性能名額得分已經變成526萬,成本下降到0.53 CNY,也就是說經過半年左右性能提升了2.9倍,成本縮減到原來的四分之一。同時阿裡巴巴還成為了首個送出TPC-DS測試100TB測試報告的廠商。這些成績的背後是EMR-Jindo引擎的支援。
EMR-Jindo引擎架構主要分為兩部分:
• Jindo-Spark:EMR内部全面優化的Spark高效計算引擎,可以處理多種計算任務;
• Jindo-FS:自研的雲原生存儲引擎,相容開源HDFS的接口,兼顧性能與價格。
1) Jindo-Spark
Jindo-Spark高效計算引擎對Spark采取了一系列優化措施,比如Runtime Filter支援自适應的運作時資料裁剪;Enhanced Join Reorder來解決外連接配接重排等問題;TopK支援推理并下推 TopK 邏輯,幫助盡早地過濾資料;File Index支援檔案級别過濾和min/max/bloom/倒排等;自研開發了Relational Cache,實作使用一套引擎就可以将查詢從分鐘級提升為亞秒級;針對特定的場景推出Spark Transaction功能,為Spark引入Full ACID支援;實作了Smart Shuffle功能,從底層來減少sort-merge 次數,提升Shuffle的效率。
• Runtime Filter:
類似于Spark中的Dynamic Partition Pruning(DPP),但是其比DPP功能更強大。除了DPP能處理的分析表之外,Runtime Filter還可以處理非分析表。其基本原理是運作時動态裁剪資料,避免不必要的計算。比如,面對一個join查詢,無法通過value下推到存儲層而将資料過濾,邏輯推算的時候無法預知最後的資料量級。這種情況下如果是分析表,Runtime Filter首先會估計其中一個表中參與join操作的資料量,如果資料量較小,則提前進行資料篩選,再推送到另一側做資料過濾;而對于非分析表,會引入Filter,如BloomFilter獲得Min或Max的統計資訊,
根據這些統計資訊,将備選資料比較少的一側提取出來,推到另一側進行過濾。Runtime Filter的成本很小,隻需要在優化器中進行簡單評估,卻可以帶來顯著的性能提升。如下圖所示,Runtime Filter實作了35%左右的整體性能提升。該特性已經在Spark送出了PR(SPARK-27227)。
• Enhanced Join Recorder:
大家都知道,算子執行順序可能會極大地影響sql的執行效率,這種情況下優化的核心原則是改變算子的執行順序,盡早地過濾資料。
比如下圖左上角的例子中,如果最底層兩個表非常大的話,則這兩張表join的開銷會非常大,join後的大資料再去join小表,大資料一層一層地傳遞下去,就會影響整個流程的執行效率。此時,優化的思想是先将大表中一些無關的資料過濾掉,減少往下遊傳遞的資料量。針對該問題,Spark使用的是動态規劃算法,但其隻适用于表的數量比較少的情況,如果表的數量大于12,該算法就束手無策。面對表的數量比較多的情況,EMR提供了多表join的遺傳算法,其可以将原來的動态規劃算法的2n的複雜度降到線性的量級,能完成成百上千張表的join。
下圖右上角可以看到,Query64有18個表參與join,動态規劃算法優化時間就需要耗費1400秒,而多表join的遺傳算法僅需要20秒左右就可完成。Join Recorder另外一個重要的功能是外連接配接重排算法,大家都知道sql中外連接配接不能随意交換順序的,但這并不代表不能交換順序,比如A left join B, 然後再left join C,事實上在某種條件下其順序是可交換的。在Spark中,外連接配接的優化是直接被放棄掉,而EMR則根據現有研究找到了順序可交換的充分必要條件,實作了外連接配接重排算法(如下圖左下角所示),對外連接配接的執行效率有了質的提升(下圖右下角)
• Relational Cache:
Spark原本的Cache存在幾個局限點,其一Spark的Cache是session級别,如果發現某一個Query的片段使用比較頻繁,就會對為這個session建立一個cache,但是session結束後,cache就會消失;其二Spark的Cache是存儲在本機上,而不是分布式存儲,是以無法做到通用。在此基礎上,EMR平台實作了Relational Cache,對任意Spark表,視圖或者Dataset等關系型資料抽象的資料實體都建立cache, 類似于物化視圖(Materialized View),但是比物化視圖功能要豐富。Relational Cache的使用場景包括a)亞秒級響應MOLAP引擎;b)互動式BI,Dashboard;c)資料同步;d)資料預組織。
Relational Cache的建立過程如下,其文法與Spark sql常見的DDL類似。首先CACHE一個表或視圖,然後指定Relational Cache的更新政策(DEMAND或COMMIT)、是否用于後續優化、Cache資料的存儲方式以及Cache的視圖邏輯。Relational Cache支援cache任意Table、View,支援cache到記憶體、HDFS、OSS等任意資料源,JSON、ORC、Parquet等任意資料格式。
Relational Cache還支援對使用者輸入的sql的優化。原來的Spark sql Cache對于使用者輸入的sql優化非常僵硬死闆,使用者輸入的sql必須精确比對上Cache才能使用。而Relational Cache則完全不同,如果有a、b、c、d四個表join的cache,當又有a、b、e三個表join的情況下,a、b join的結果便可以從四個表join時生成的Cache資料中讀取。下圖中右側展示了Cache和沒有Cache的基準測試結果,可以看出Relational Cache可以保證測試的響應時間在亞秒級。
請參考
Spark Relational Cache實作亞秒級響應的互動式分析• Spark Transaction:有些使用者可能會使用Hive表,Hive表有事務支援,然而Spark在事務這一項上是不相容Hive的。是以,為了滿足使用者資料訂正/删除以及資料流導入的場景支援,EMR平台提供了Spark Transaction支援事務的ACID支援。
傳統的資料導入是分批的,比如一天一導入,而流資料導入場景下資料是實時寫入的原始資料,并未經過任何處理,是以會有delete和update的需求。Spark Transaction整體來講是一種鎖+MVCC的實作形式,MVCC與底層的存儲密不可分。大資料在Hive和Spark相容的情況下,都是檔案的形式存在目錄中,檔案的版本通過行來控制,寫入的每一行都會加上Meta Columns,如op、original_write-id、bucket id和row_id等,來辨別這是全表唯一的一行。當需要更新某一行的時候,并不會原地更新該行,而是将該行取出來,重寫後産生新的版本進行存儲。讀取的時候,多版本會進行合并後傳回給使用者。
###2) Jindo-FS
EMR早期推出了一種本地盤機型,使用這種機型來部署叢集類似于用本地叢集在雲下部署大資料發行版,價格較高;此外由于當時HDFS有中繼資料瓶頸,本地存儲的動态化伸縮面臨很大的挑戰。針對這方面的問題,解決的方案是計算與存儲分離,将資料存儲在OSS上,但是這種分離帶來的直接結果就是性能變差,因為OSS中繼資料操作耗時,讀取資料跨網絡,傳輸帶寬也會嚴重影響性能。
進而的解決方案是将資料從遠端拉取到計算側進行緩存,這也是Jindo-FS做的事情。Jindo-FS是類似于HDFS的系統,其架構也類似于HDFS的Master-Slave架構,分成Name Service 和Storage Service。它支援将某些通路頻率比較高的表可以放到RocksDB中進行多級緩存。Jindo-FS整體不同于HDFS的Master結點, Jindo-FS的“Master”(Name Service)是一個分布式叢集,使用raft 協定,提供入口服務;提供多Name Space支援;中繼資料以kv形式存放于高性能kv store 中;因為其本身不存儲資料,真實資料在OSS和OTS中,是以支援資料的彈性擴充和銷毀重建。
Jindo-FS底層的中繼資料管理會将資料拆成一系列的kv,通過遞增的id來逐層查詢。如/home/Hadoop/file1.txt需要讀三次OTS。下圖右側的測試結果說明Jindo-FS在中繼資料操作方面相對于OSS有較好的性能提升。
Jindo-FS使用Storage Service來進行底層存儲,在寫流程中Storage Service将要寫的檔案同時存儲到本地和OSS中,然後再傳回給使用者寫的結果,同時還會在叢集結點内進行多副本傳輸;而讀操作和HDFS類似,如果命中本地,則在本地存儲中讀取,否則要進行遠端讀取。Storage Service具備高性能、高可靠、高可用、彈性存儲等特性,為了支援高性能,Jindo-FS建立了資料流高速通道,同時還有一系列的政策,如減少記憶體拷貝次數等。
Jindo-FS中Name Service如何實作高可靠、如何進行熱點資料發現與緩存替換、塊存儲模式與緩存模式;以及Storage Service如何應對讀寫失敗、資料塊如何設計并存儲、如何實作高速資料通道等問題,請參見大資料生态專場《
雲上大資料的高效能資料庫的存儲方案》的分享。
相關文章:
JindoFS概述:雲原生的大資料計算存儲分離方案 JindoFS解析 - 雲上大資料高性能資料湖存儲方案阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區數個Spark技術同學每日線上答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!