天天看點

Apache Kyuubi 在B站大資料場景下的應用實踐

作者:閃念基因

本期作者

賈冬冬-哔哩哔哩資深開發工程師

陳昱康-哔哩哔哩技術專家

01 背景介紹

近幾年随着B站業務高速發展,資料量不斷增加,離線計算叢集規模從最初的兩百台發展到目前近萬台,從單機房發展到多機房架構。在離線計算引擎上目前我們主要使用Spark、Presto、Hive。架構圖如下所示,我們的BI、ADHOC以及DQC服務都是通過自研的Dispatcher路由服務來實作統一SQL排程,Dispatcher會結合查詢SQL文法特征、讀HDFS量以及目前引擎的負載情況,動态地選擇目前最佳計算引擎執行任務。如果使用者SQL失敗了會做引擎自動降級,降低使用者使用門檻;其中對于Spark查詢早期我們都是走STS,但是STS本身有很多性能和可用性上的問題,是以我們引入了Kyuubi,通過Kyuubi提供的多租戶、多引擎代理以及完全相容Hive Thrift協定能力,實作各個部門Adhoc任務的資源隔離和權限驗證。

Query查詢情況

目前在Adhoc查詢場景下,Spark sql占比接近一半,依賴Kyuubi對于Scala文法的支援,目前已經有部分進階使用者使用scala文法送出語句執行,并且可以在SQL和Scala模式做自由切換,這大大豐富了adhoc的使用場景。

Apache Kyuubi 在B站大資料場景下的應用實踐

02 Kyuubi應用

Kyuubi 是網易團隊貢獻給 Apache 社群的開源項目。Kyuubi 主要應用在大資料領域場景,包括大資料離線計算、adhoc、BI等方向。Kyuubi 是一個分布式、支援多使用者、相容 JDBC 或 ODBC 的大資料處理服務。

為目前熱門的計算引擎(例如Spark、Presto或Flink等)提供SQL等查詢服務。

我們選擇Kyuubi的原因:

1. 完全相容Hive thrift 協定,符合B站已有的技術選型。

2. 高可用和資源隔離,對于大規模的生産環境必不可少。

3. 靈活可擴充,基于kyuubi可以做更多适配性開發。

4. 支援多引擎代理,為未來統一計算入口打下基礎。

5. 社群高品質實作以及社群活躍。

Kyuubi 的架構可以分成三個部分:

1. 用戶端:使用者使用jdbc或者restful協定來送出作業擷取結果。

2. kyuubi server:接收、管理和排程與用戶端建立的Kyuubi Session,Kyuubi Session最終被路由到實際的引擎執行。

3. kyuubi engine:接受處理 kyuubi server發送過來的任務,不同engine有着不同的實作方式。

Apache Kyuubi 在B站大資料場景下的應用實踐

03 基于Kyuubi的改進

Kyuubi已經在B站生産環境穩定運作一年以上,目前所有的Adhoc查詢都通過kyuubi來接入大資料計算引擎。 在這一年中我們經曆了兩次大版本的演進過程,從最初kyuubi 1.3到kyuubi 1.4版本,再從kyuubi 1.4更新kyuubi 1.6版本。與之前的STS相比,kyuubi在穩定性和查詢性能方面有着更好的表現。在此演進過程中,我們結合B站業務以及kyuubi功能特點,對kyuubi進行部分改造。

3.1 增加QUEUE模式

Kyuubi Engine原生提供了CONNECTION、USER、GROUP和SERVER多種隔離級别。在B站大資料計算資源容量按照部門劃分,不同部門在Yarn上對應不同的隊列。我們基于GROUP模式進行了改造,實作Queue級别的資源隔離和權限控制。

使用者資訊和隊列的映射由上層工具平台統一配置和管理,Kyuubi隻需關心上遊Dispatcher送出過來user和queue資訊,進行排程并分發到對應隊列的spark engine上進行計算。目前我們有20+個adhoc隊列,每個隊列都對應一個或者多個Engine執行個體(Engine pool)。

Apache Kyuubi 在B站大資料場景下的應用實踐

3.2 在QUEUE模式下支援多租戶

kyuubi server端由超級使用者Hive啟動,在spark場景下driver和executor共享同一個的使用者名。不同的使用者送出不同的sql, driver端和executor端無法區分目前的任務是由誰送出的,在資料安全、資源申請和權限通路控制方面都存在着問題。

針對該問題,我們對以下幾個方面進行了改造。

3.2.1 kyuubi server端

1. kyuubi server以hive principal身份啟動。

2. Dispatcher以username proxyUser身份送出SQL。

3.2.2 spark engine端

1. Driver和Executor以hive身份啟動。

2. Driver以username proxyUser身份送出SQL。

3. Executor啟動Task線程需要以username proxyUser身份執行Task。

4. 同時需要保證所有的公共線程池,綁定的UGI資訊正确。如ORC Split線程池上,當Orc檔案達到一定數量會啟用線程池進行split計算,線程池是全局共享,永久綁定的是第一次觸發調用的使用者UGI資訊,會導緻使用者UGI資訊錯亂。

Apache Kyuubi 在B站大資料場景下的應用實踐

3.3 kyuubi engine UI 展示功能

在日常使用中我們發現 kyuubi 1.3 Engine UI頁面展示不夠友好。不同的使用者執行不同的SQL無法區分的開,session 、job、stage、task無法關聯的起來。

導緻排查定位使用者問題比較困難,我們借鑒STS拓展了kyuubi Engine UI頁面。我們對以下幾個方面進行了改造。

1. 自定義kyuubi Listener監聽Spark Job、Stage、Task相關事件以及SparkSQL相關事件:SessionCreate、SessionClose、executionStart、executionRunning、executionEnd等

2. Engine執行SQL相關操作時,綁定并發送相關SQL Event,構造SQL相關狀态事件,将采集的Event進行狀态分析、彙總以及存儲。

3. 自定義Kyuubi Page進行Session以及SQL相關狀态實時展示。

Session Statistics資訊展示

Apache Kyuubi 在B站大資料場景下的應用實踐

SQL Statistics資訊展示

Apache Kyuubi 在B站大資料場景下的應用實踐

3.4 kyuubi支援配置中心加載Engine參數

為了解決隊列之間計算資源需求的差異性,如任務量大的隊列需要更多計算資源(Memory、Cores),任務量小的隊列需要少量資源,每個隊列需求的差異,我們将所有隊列的Engine相關資源參數統一到配置中心管理。每個隊列第一次啟動Engine前,将查詢自己所屬隊列的參數并追加到啟動指令中,進行參數的覆寫。

Apache Kyuubi 在B站大資料場景下的應用實踐

3.5 Engine執行任務的

進度顯示與消耗資源上報功能

任務在執行過程中,使用者最關心的就是自己任務的進度以及健康狀況,平台比較關心的是任務所消耗的計算資源成本。我們在Engine端,基于事件采集user、session、job、stage資訊并進行存儲,啟動定時任務将收集的user、session、job、stage資訊進行關聯并進行資源消耗成本計算,并将結果注入對應operation log中, 回傳給前端日志展示。

任務進度資訊展示

Apache Kyuubi 在B站大資料場景下的應用實踐

查詢消耗資源上報展示

Apache Kyuubi 在B站大資料場景下的應用實踐

04 Kyuubi穩定性建設

4.1 大結果集溢寫到磁盤

在adhoc 場景中使用者通常會拉取大量結果到 driver 中,同一時間大量的使用者同時拉取結果集,會造成大量的記憶體消耗,導緻spark engine記憶體緊張,driver性能下降問題,直接影響着使用者的查詢體驗,為此專門優化了driver fetch result 的過程,在擷取結果時會實時監測driver記憶體使用情況,當driver記憶體使用量超過門檻值後會先将拉取到的結果直接寫出到本地磁盤檔案中,在使用者請求結果時再從檔案中分批讀出傳回,增加driver的穩定性。

Apache Kyuubi 在B站大資料場景下的應用實踐

4.2 單個 SQL 的task并發數 、

執行時間和 task 數量的限制

在生産過程中,我們經常性的遇到單個大作業直接占用了整個Engine的全部計算資源,導緻短作業長時間得不到計算資源,一直 pending的情況,為了解決這種問題我們對以下幾個方面進行優化。

  • Task并發數方面:預設情況下Task排程時隻要有資源就會全部排程配置設定出去,後續SQL過來就面臨着完全無資源可用的情況,我們對單個SQL參與排程的task數進行了限制,具體的限制數随着可用資源大小進行動态調整。
  • 單個 SQL 執行時間方面:上層Dispatcher和下層Engine都做了逾時限制,規定adhoc任務超過1小時,就會将該任務kill掉。
  • 單個Stage task數量方面:同時我們也對單個stage的task數進行限制,一個stage最大允許30W個task。

4.3 單次 table scan 的檔案數和大小的限制

為保障kyuubi的穩定性,我們對查詢資料量過大的SQL進行限制。通過自定義外部optimization rule(TableScanLimit)來達到目的。TableScanLimit比對LocalLimit,收集子節點project、filter。比對葉子結點HiveTableRelation和HadoopFsRelation。即比對Hive表和DataSource表的Logical relation,針對不同的表采取不同的計算方式。

1. HiveTableRelation:

  • 非分區表, 通過table meta 拿到表的totalSize、numFiles、numRows值。
  • 分區表,判斷是否有下推下來的分區。若有,則拿對應分區的資料 totalSize、numFiles、numRows。若沒有,則拿全表的資料。

2. HadoopFsRelation:判斷partitionFilter是否存在動态filter

  • 不存在,則通過partitionFilter得到需要掃描的分區
  • 存在,則對partitionFilter掃描出來的分區進一步過濾得到最終需要掃描的分區
Apache Kyuubi 在B站大資料場景下的應用實踐

擷取到SQL查詢的dataSize、numFiles、numRows後, 還需要根據表存儲類型、不同字段的類型、是否存在limit、在根據下推來的project、filter 得出最終需要掃描的列,估算出需要table scan size,如果table scan size超過制定門檻值則拒絕查詢并告知原因。

4.4 危險join condition發現&Join膨脹率的限制

4.4.1 危險join condition發現

為保障kyuubi的穩定性,我們也對影響Engine性能的SQL進行限制。使用者在寫sql時可能并不了解spark對于join的底層實作,可能會導緻程式運作的非常慢甚至OOM,這個時候如果可以為使用者提供哪些join condition可能是導緻engine運作慢的原因,并提醒使用者改進和友善定位問題,甚至可以拒絕這些危險的query送出。

在選擇 join 方式的時候如果是等值 join 則按照 BHJ,SHJ,SMJ 的順序選擇,如果還沒有選擇join type則判定為 Cartesian Join,如果 join 類型是InnerType的就使用 Cartesian Join,Cartesian Join會産生笛卡爾積比較慢,如果不是 InnerType,則使用 BNLJ,在判斷 BHJ 時,表的大小就超過了broadcast 門檻值,是以将表broadcast出去可能會對driver記憶體造成壓力,性能比較差甚至可能會 OOM,是以将這兩種 join 類型定義為危險 join。

如果是非等值 join 則隻能使用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 時選不出 build side 說明兩個表的大小都超過了broadcast門檻值,則使用Cartesian Join,如果Join Type不是 InnerType 則隻能使用 BNLJ,是以Join政策中選擇Cartesian Join和第二次選擇 BNLJ 時為危險 join。

Apache Kyuubi 在B站大資料場景下的應用實踐

4.4.2 Join膨脹率的限制

在shareState 中的 statusScheduler 用于收集 Execution 的狀态和名額,這其中的名額就是按照 nodes 彙總了各個 task 彙報上來的 metrics,我們啟動了一個 join檢測的線程定時的監控 Join 節點的 "number of output rows"及Join 的2個父節點的 "number of output rows" 算出該 Join 節點的膨脹率。

Apache Kyuubi 在B站大資料場景下的應用實踐

Join 節點的膨脹檢測:

05 kyuubi 新應用場景

5.1 大查詢connection&scala模式的使用

5.1.1 connection模式的使用

adhoc大任務和複雜的SQL會導緻kyuubi engine在一定時間内性能下降,嚴重影響了其他正常的adhoc任務的執行效率。我們在adhoc前端開放了大查詢模式,讓這些複雜、查詢量大的任務走kyuubi connection模式。在kyuubi connection模式下一個使用者任務單獨享有自己申請的資源,獨立的Driver,任務的大小快慢都由自身的SQL特征決定,不會影響到其他使用者的SQL任務,同時我們也會适當放開前面一些限制條件。

connection 模式在B站的使用場景:

  1. table scan判定該adhoc任務為大任務,執行時間超過1個小時。
  2. 複雜的SQL任務, 該任務存在笛卡爾積或Join膨脹超過門檻值。
  3. 單個SQL單個stage的task數超過30W。
  4. 使用者自行選擇connection模式。
Apache Kyuubi 在B站大資料場景下的應用實踐

5.1.2 scala模式的使用

SQL模式可以解決大資料80%的業務問題,SQL模式加上Scala模式程式設計可以解決99%的業務問題;SQL是一種非常使用者友好的語言,使用者不用了解Spark内部的原理,就可以使用SQL進行複雜的資料處理,但是它也有一定的局限性。

SQL模式不夠靈活,無法以dataset以及rdd兩種方式進行資料處理操作。無法處理更加複雜的業務,特别是非資料處理相關的需求。另一方面,使用者執行scala code項目時必須打包項目并送出到計算叢集,如果code出錯了就需來回打包上傳,非常的耗時。

Scala模式可以直接送出code,類似Spark互動式Shell,簡化流程。針對這些問題, 我們将SQL模式、Scala模式的優點結合起來,兩者進行混合程式設計,這樣基本上可以解決資料分析場景下大部分的case。

Apache Kyuubi 在B站大資料場景下的應用實踐

5.2 Presto on spark

Presto為了保證叢集的穩定性,每個Query的最大記憶體進行了限制,超過配置記憶體的Query會被Presto oom kill掉。部分ETL任務會出現随着業務增長,資料量增大,占用記憶體也會增多,當超過門檻值後,流程就會出現失敗。

為了解決這個問題,prestodb社群開發了一個presto on spark的項目,通過将query送出到Spark來解決query的記憶體占用過大導緻的擴張性問題,但是社群方案對于已經存在的查詢并不是很友好,使用者的送出方式有presto-cli、pyhive等方式,而要使用Presto on spark項目,則必須通過spark-submit方式将query送出到yarn。

為了讓使用者無感覺的執行presto on spark查詢,我們在presto gateway上做了一些改造,同時借助kyuubi restulful的接口,和service + engine的排程能力,在kyuubi内開發了Presto-Spark Engine,該engine能夠比較友好的來送出查詢到Yarn。

Apache Kyuubi 在B站大資料場景下的應用實踐

主要實作細節如下:

1. presto gateway将query的執行曆史進行儲存,包括query的資源使用情況、報錯資訊等。

2. presto gateway請求HBO服務,判斷目前query是否需要通過presto on spark送出查詢。

3. presto gateway通過zk擷取可用的kyuubi server清單,随機選擇一台,通過http向kyuubi open一個session。

4. presto gateway根據擷取到的sessionHandle資訊,再送出語句。

5. kyuubi server接收到query後,會啟動一個獨立的Presto-Spark Engine,建構啟動指令,執行指令送出spark-submit 到yarn。

6. Presto gateway根據傳回的OperatorHandle資訊, 通過http不斷擷取operation status。

7. 作業成功,則通過fetch result請求将結果擷取并傳回給用戶端。

06 kyuubi部署方式

6.1 Kyuubi server接入K8S

整合 Engine on yarn label的實踐

生産實踐中遇到的問題:

1. 目前kyuubi server/engine部署在混部叢集上,環境複雜,各元件環境互相依賴、釋出過程中難免會存在環境不一緻、誤操作等問題,進而導緻服務運作出錯。

2. 資源管理問題。最初engine使用的是client模式,不同的隊列的engine driver使用的都是大記憶體50g-100g不等 ,同時AM、NM 、DN、kyuubi server都共享着同一台實體機器上的資源,當AM啟動過多, 占滿整個機器的資源,導緻機器記憶體不足,engine無法啟動。

針對于該問題,我們研發了一套基于Queue模式資源配置設定排程實作:每個kyuubi server 和 spark engine在znode上都記錄着目前資源使用情況。每個kyuubi server znode資訊:目前kyuubi注冊SparkEngine數量、目前kyuubi server注冊SparkEngine執行個體、kyuubi server記憶體總大小以及目前kyuubi server剩餘記憶體總大小等。

每個engine znode資訊:所屬kyuubi server IP/端口、目前SparkEngine記憶體、目前SparkEngine所屬隊列等 。每次Spark engine的啟動/退出,都會擷取該隊列的目錄鎖,然後對其所屬的kyuubi server進行資源更新操作。kyuubi server如果當機,在啟動時,周遊擷取所有engine在znode的資訊,進行資源和狀态的快速恢複。

3. 針對資源管理功能也存在着一些問題: 資源碎片化問題、新功能的拓展不友好以及維護成本大。Engine使用的是client模式,過多大記憶體的AM會占用用戶端的過多計算資源,導緻engine水準拓展受限。

針對以上提出的問題,我們做了對應的解決方案:

1. kyuubi server接入k8s

我們指定了一批機器作為kyuubi server在k8s上排程資源池,實作kyuubi server環境、資源的隔離。實作了kyuubi server快速部署、提高kyuubi server水準擴充能力,降低了運維成本。

Apache Kyuubi 在B站大資料場景下的應用實踐
Apache Kyuubi 在B站大資料場景下的應用實踐

2. Engine on yarn label

我們将kyuubi engine資源管理交給yarn,由yarn負責engine的配置設定和排程。我們采用了cluster模式以防engine在水準拓展時受到資源限制。采用cluster模式後,我們遇到了新的問題:在queue模式下engine driver使用的都是50g-100g不等的大記憶體,但是由于yarn叢集的配置限制,能夠申請的最大Container資源量為<28G, 10vCore>。為了在cluster模式的情況下保證Driver能夠擷取到足夠的資源,我們改造了yarn以适應此類場景。我們将需求拆分為以下三項:

  • 将kyuubi Driver放置于獨立的Node Label中,該Node Label中的伺服器由kyuubi driver獨立使用;
  • kyuubi Executor仍然放置在Default Label的各對應隊列的adhoc葉子隊列内,承接adhoc任務處理工作;
  • Driver申請的資源需要大于MaxAllocation,即上文所述的<28G, 10vCore>。希望能夠根據Node Label動态設定Queue級别的MaxAllocation,使得kyuubi Driver能夠獲得較大資源量。

首先,我們在yarn上建立了kyuubi_label,并在label内與Default Label映射建立kyuubi隊列,以供所有的Driver統一送出在kyuubi隊列上。并通過“spark.yarn.am.nodeLabelExpression=kyuubi_label”指定Driver送出至kyuubi_label,通過“spark.yarn.executor.nodeLabelExpression= ”指定Executor送出至default label,實作如下的效果:

Apache Kyuubi 在B站大資料場景下的應用實踐

其次,我們将yarn的資源最大值由原先的“叢集”級别管控下放至“隊列+Label”級别管控,通過調整"queue name + kyuubi_label"的Conf,我們能夠将Driver的Container資源量最大值提高至<200G, 72vCore>,且保證其他Container的最大值仍為<28G, 10vCore>。同樣申請50G的Driver,在default叢集中會出現失敗提示:

Apache Kyuubi 在B站大資料場景下的應用實踐

而在kyuubi_lable的同隊列下則能夠成功運作, 這樣我們既借助了yarn的資源管控能力,又保證了kyuubi driver獲得的資源量。

07 未來規劃

1. 小的ETL任務接入kyuubi,減少ETL任務資源申請時間

2. Kyuubi Engine(Spark和Flink)雲原生,接入K8S統一排程

3. Spark jar 任務也統一接入Kyuubi

作者:賈冬冬&陳昱康

來源:微信公衆号:哔哩哔哩技術

出處:https://mp.weixin.qq.com/s/ojhRzLhrGTyU0H4DjJOFDA