1 離線排程系統
在整個大資料體系中,在原始資料被采集之後,需要使用各種邏輯進行整合和計算之後才能輸出實際有效的資料,才能最終用于商業目的,實作大資料的價值。在整個處理流程中,無論是抽取、轉換、裝載(ETL)的這些過程,還是資料使用者分析處理過程,都是需要包含衆多的處理任務,而且這些任務都不是孤立的,而是存在互相依賴和限制關系的。如何高效的排程和管理這些任務是非常關鍵的,影響到各個流程中的資料的及時性和準确性。在這個過程中任務的高效管理和排程是非常關鍵的,會影響到各個流程中的資料的及時性和準确性。
圖1:資料ETL過程
一個最簡單的任務排程系統莫過于Linux系統自帶的crontab,使用簡單,運作穩定。
圖2:linux的cron定時任務
在項目剛起步時使用crontab無可厚非,随着排程任務的增多,互相之間又有着依賴,crontab就遠遠滿足不了開發的需求了。這些任務的形态各種各樣,任務之間也存在多種多樣的依賴關系。一個任務的執行需要一系列的前置任務的完成。比如一個上遊任務A完成特定邏輯之後,而下遊的任務B則依賴任務A輸出的資料結果才能産生自己的資料和結果。是以為了保證資料的準确性和可靠性,就必須根據這些任務之間的依賴關系從上遊到下遊有序的執行。怎麼樣讓大量的任務準确的完成排程而不出現問題,甚至在任務排程執行中出現錯誤的情況下,任務能夠完成自我恢複甚至執行錯誤告警與完整的日志查詢。大資料離線任務排程系統就是要發揮這樣的作用。
圖3:一個簡單的任務依賴圖
排程系統的核心功能主要就是如下三點:
組織和管理任務流程,定時排程和執行任務,處理任務間依賴關系。
對于一個完善的離線排程系統,需要有以下核心功能:
1)作為大資料體系中的一個指揮中心,負責根據時間,依賴,任務優先級,資源等條件排程任務;
2)需要能處理任務的多種依賴關系,包括時間依賴,任務上下遊依賴,自身依賴等;
3)資料量巨大,任務種類繁多,需要執行多種任務類型,如MapReduce,hive,spark以及shell,python等;
4)需要有一個完善的監控系統監控整個排程和執行的過程,保障任務排程和執行的整個鍊條,過程中出現異常情況能及時發送告警通知。
我們的OFLOW系統就是為了實作以上需求的。
2 OFLOW系統在OPPO的應用
OFLOW目前提供的核心功能主要以下幾點:
1)高效準時的任務排程;
2)靈活的排程政策:時間,上下遊依賴,任務自身依賴;
3)多種任務類型:資料內建、Hive、Python、Java、MapReduce、Spark、SparkSQL、Sqoop、機器學習任務等;
4)業務間隔離,任務程序間隔離;
5)高可用,可擴充;
6)任務配置:參數,失敗重試(次數,間隔),失敗和逾時告警,不同級别告警,任務回調;
7)豐富全面的操作頁面,任務的開發、運維、監控等操作圖形化頁面化;
8)權限管理;
9)實時檢視任務狀态和分析日志,并進行停止、重跑、補錄等各種運維操作;
10)任務曆史資料分析;
11)腳本開發,測試,釋出流程;
12)告警監控:多種異常情況的狀态監控,靈活配置;
13)核心任務重點監控,保障準點率;
14)支援API接入。
目前OFLOW在我司已經承擔了非常多的任務的排程。
OFLOW現有國内,新加坡,印度,歐盟和北美5大叢集,歐盟和北美叢集最近不久上線的,任務暫時還沒上量。目前主力叢集是國内,新加坡和印度。
目前使用者可以通過以下幾種方式接入到OFLOW:
1)oflow的webserver;
2)南天門平台,其中的資料研發 - 離線任務子產品,資料內建任務子產品,離線腳本開發子產品。後端的任務排程和執行全部也是在oflow系統上;
3)oflow還支援通過api的方式接入,目前也已經有多個業務通過api的方式使用oflow系統;
圖4:oflow服務接入方式
3 OFLOW系統的設計和演進
根據前面的資訊,可以看到整個離線排程系統最核心的是兩個元件,一個的排程引擎,一個是執行引擎。
排程引擎根據任務屬性(周期,延遲,依賴關系等)排程任務,根據任務優先級,隊列和資源情況分發到不同的執行節點;
執行引擎擷取滿足執行條件的任務,執行任務,同時輸出任務執行過程中的日志,并監控任務執行過程。
在目前市面上常見的離線排程系統中,airflow可以說是其中的佼佼者,經過了多年的發展,功能已經非常完善,在開源社群也非常活躍。
Airflow于2014年10月由Airbnb的Maxime Beauchemin開始;
2015年6月宣布正式加入Airbnb Github;
2016年3月加入了Apache Software Foundation的孵化計劃;
目前的更新疊代版本已經到了1-10版本;2-1版本。
我們oppo的離線排程系統是在airflow 1.8版本上引入進來的。
下面是幾個在airflow系統中的概念,在其它的離線排程系統中也有類似的概念。
1)DAG:即有向無環圖(Directed Acyclic Graph),将所有需要運作的tasks按照依賴關系組織起來,描述的是所有tasks執行的依賴關系。在airflow中,DAG由一個可執行的python腳本來定義。
2)Operators:可以了解為一個任務模闆,描述了DAG中一個具體的task要做的事情。airflow内置了很多operators,如BashOperator 用來執行bash 指令,PythonOperator 調用任意Python 函數,EmailOperator 用于發送郵件,HTTPOperator 用于發送HTTP請求, SqlOperator 用于執行SQL指令…同時,使用者可以自定義Operator,這給使用者提供了極大的便利性。他的作用就像java中的class檔案。
3)Sensor是一類特殊的Operator,是被特定條件觸發的,比如ExteralTaskSensor, TimeSensor, TimeDeltaSensor。
4)Tasks:Task 是 Operator的一個執行個體,也就是DAGs中的一個node, 當使用者執行個體化一個operator,即用一些參數特例化一個operator,就生成了一個task。
5)DagRun:當dag檔案被airflow識别并排程時,運作中的DAG即為dagRun。在webUi界面可以看到。
6)Task Instance:task的一次運作。即運作起來的task,task instance 有自己的狀态,包括“running”, “success”, “failed”, “skipped”, “up for retry”等。
在airflow中,定義dag和dag中的任務是通過一個python檔案實作的,這就是一個例子。
這個py檔案是來定義dag的,雖然它也開源直接運作,但單獨運作并沒有什麼效果,隻是檢測python文法是否正确。他也不執行具體的工作,隻是描述任務之間的依賴關系,以及排程的時間間隔等要求。
這個需要在任務排程和執行時進行解析,才能按照設定邏輯排程,按照使用者設定的執行步驟運作。
圖5&圖6:原生airflow定義task和dag的方式
這樣一個python檔案就對資料開發人員提出了比較高的要求,需要平台的使用者對python編碼很熟練才行。
下圖是airflow的整體架構設計,其中airflow home dags用于存儲定義dag和task的python檔案,webserver用于提供web服務,展示dag視圖,執行個體,日志等非常多的資訊,airflow的web頁面也是很完善的。scheduler是排程節點,執行dag解析,任務排程的工作;worker節點是執行節點,可以有很多組,可以監聽不同的隊列,作用是執行scheduler排程起來的任務。
圖7:airflow架構
我們oppo的離線排程系統oflow就是在開源airflow的基礎上開發的。
開發中解決的幾個比較核心的問題是:
1)将dag和task的定義從python檔案修改為web配置資料庫存儲,同時dag的解析也是從解析python檔案修改為了從資料庫查詢并解析。
2)另外一個就是和公司的大資料開發平台結合,實作開發,測試和釋出流程,友善使用者的開發,測試驗證和釋出流。
3)另外還添加了很多的監控告警,用來比較全面的監控任務排程和執行的整個流程;
如下是我們OFLOW平台的整個架構:
圖8:oflow架構設計
1)webserver用來提供web服務,友善使用者進行dag,task的配置以及非常多的資訊查詢;
2)scheduler是排程節點,負責任務的排程,通過解析dag和task,進行一系列的邏輯判斷任務是否滿足排程條件;
3)worker是執行節點,負責任務執行個體的執行;
4)api server是我們後來開發中新增的一個元件,用來解耦worker和我們資料庫的操作,後續也承擔的其它的一些功能;
5)使用mysql存儲dag,task,task_instance等所有的中繼資料資訊;
6)使用celery做消息隊列,broker使用的是redis;同時redis也充當了緩存的作用;
7)oflow也同時接入雲監控負責發送告警資訊,使用ocs用于存儲日志和使用者腳本檔案;
8)同時oflow也接入了診斷平台,這個是最新接入的,協助使用者對異常的oflow任務進行診斷;
如下這個圖顯示了整個任務排程和執行的整個流程:
圖9:oflow中的任務執行個體狀态流轉
目前OFLOW也有了比較全面的監控:
圖10:oflow的監控告警
以上就是OFLOW的整體架構,任務排程和執行整個流程。
目前OFLOW的整個服務也存在一些問題:
1)任務排程間隔問題:
根據前面的任務排程的流程,我們可以看到,oflow任務的排程是通過scheduler周期掃描解析dag和task的。這種方式就會造成任務上下遊之間會有一定時間的延遲。比如A任務完成後,直接下遊任務B并不能馬上被排程執行,需要等待scheduler下次掃描時掃到改任務才能被觸發。如果任務的依賴深度比較深,上下遊鍊條很長,每兩個任務間有一定間隔,整體的間隔時間就會比較久。尤其是在淩晨任務排程高峰這樣的時間點。
2)服務高可用問題:
原生的oflow不支援高可用。目前我們的方案是準備一個備節點,在檢測到scheduler異常時,可以拉起備用節點。
3)業務增長造成的排程壓力問題:
目前oflow每日的任務量非常多,而且也在快速增長,oflow的排程壓力也是越來越高,目前的方案的對scheduler進行橫向擴充,讓不同的scheduler排程不同的dag;
4)排程峰谷的成本問題:
離線排程任務的一個很明顯的特征就是存在任務的高峰和低谷。oflow的天級别和小時級别的排程任務是最多的,這樣就會造成在每天的淩晨時間是任務排程的大高峰,在每小時的前一段時間是排程的小高峰,而其它時間段則是低谷。高峰狀态任務會出現隊列擁堵情況,而低谷時間,機器是處于比較空閑的狀态。如何更有效的利用系統資源,也是值得我們後續思考和優化的點。
4 全新的離線排程系統OFLOW 2.0
下面再向大家介紹一下,近期已經上線試用的OFLOW 2.0的産品特殊和架構設計。
我們oflow 2.0平台想解決的問題有以下幾點:
1)任務實時觸發,降低上下遊任務之間的延遲;
2)不再以dag去組織和排程任務。以dag為排程次元,就會存在跨周期依賴的問題。實際中會有很多任務需要依賴其它dag的任務,比如一個天級别的任務需要依賴另一個小時級别的dag的某個任務在24個周期要全部完成。目前oflow的解決方案是通過一個跨dag依賴任務ExternalTaskSensor去實作的。這個無論是在任務配置上,還是在對概念的了解上,都存在一些問題;
3)另外就是希望能簡化配置,oflow的dag和task的功能比較強大,但是配置也非常多,使用者完成一個dag,一個task的配置需要了解很多概念,輸入很多資訊。這樣好處是比較靈活,但是缺點就是很不友善。我們2.0就希望能夠簡化配置,隐藏一些不必要的概念和配置;
4)同時還希望能更使使用者在任務開發,測試和釋出等一系列流程更加便捷;
5)2.0的各個元件能在高可用和可擴充性上更加便捷簡單。
oflow 2.0系統就通過以和1.0差别很大的設計實作這些需求:
1)任務實時觸發;
2)以為業務流程方式組織任務,而非dag,不再需要跨dag依賴的概念;
3)各個元件的可擴充性;
4)系統的标準化:簡化了很多任務的配置,操作門檻更低。任務執行環境标準化,減少環境問題,降低運維方面的成本。
oflow 2.0的整體架構設計如下:
oflow 2.0目前是沒有供使用者使用的前端頁面,是通過南天門2.0的離線子產品調用oflwo 1.0的api server。是以你們在使用oflow 2.0的離線子產品時,後端的資料存儲,任務觸發,排程,執行等一系列流程都是在oflow 2.0的平台上實作的。
圖11:oflow 2.0的架構設計
1)首先的這個元件就是api server。除了南天門調用之外,oflow 2.0内部的worker執行節點也和api server有很多互動;apiserver主要實作的是和2.0資料庫的互動,業務流程,任務,執行個體等各項操作,以及上遊任務觸發等内在邏輯;
2)Trigger元件的功能比較純粹,就是負責掃描任務進行觸發;
3)scheduler排程節點負責任務的排程解析,通過時間輪,任務依賴資訊管理,任務優先級和隊列等一系列的服務和管理來分析和排程任務;
4)worker節點和1.0的邏輯比較接近,負責任務的實際執行過程,支援了包括shell, python, sparkSQL和資料內建任務這四種大的類型的任務,同時也支援使用者對開發的腳本進行測試,任務執行日志的處理,支援對正在執行的任務進行停止操作,同時還有任務執行結束後的回調邏輯;
5)Monitor元件一方面是負責監控内部各個元件,其它各個元件在啟動後都會向monitor進行注冊,後續一旦節點出問題,monitor可以對在該節點上排程和執行的任務進行處理。monitor同時還負責處理任務執行過程中的各種告警資訊和一些通知性資訊的發送;
其中還有兩個消息隊列,
1)一個是Schedule MQ,負責接收滿足部分排程條件可以開始排程的任務并轉交給scheduler去處理;
2)另一個是Task MQ,負責接收滿足所有依賴條件,可以執行的任務,worker端從隊列中擷取任務并消費。
除了這些開發的元件之外,oflow 2.0也用到了一些通用的産品,包括MySQL, Redis,以及對象存儲存在,雲監控系統,以及調用了公司IT系統的一些api。
這張圖展示了OFLOW的任務排程和執行的整個流程:
圖12:oflow 2.0任務執行個體的排程和執行流程
其中排程開始入口有兩個,一個是trigger, 一個是webserver。
trigger負責提前5分鐘掃描即将要執行的任務,掃描出來之後放入到schedule mq中;
webserver負責多個觸發邏輯,一方面是使用者手動觸發的任務重跑和補錄操作,另一個是上遊某個任務完成後,将其直接下遊擷取出來,放入到schedule mq;
這些消息在schedule mq中會被scheduler消費,schedule會分析任務執行個體的所有依賴關系,包括時間依賴,上下遊依賴,自身依賴等資訊。如果任務的各種依賴條件都滿足,則會被放到task mq中被worker消費;不滿足時間依賴的任務會被放入到時間輪中,等達到相應時間刻度後會自動觸發;不滿足執行條件的任務的所有依賴資訊儲存在redis中,等後續時間到達,或者依賴的上遊任務完成,會不斷更新該執行個體的依賴資訊,直到所有依賴條件滿足。滿足依賴條件的任務,schedule也會分析任務所屬的項目以及任務優先級等配置資訊,将任務放入到task mq中的不同的消息隊列中;
worker會從task mq中消費任務。拿到任務後,通過擷取的任務的詳細資訊,然後執行。判斷任務執行結果,如果執行成功,則會通知到api server, api server除了更新執行個體狀态等資訊外,還會同時查詢該任務的直接下遊,将其直接下遊放入到schedule mq中;如果任務失敗,則會根據是否還有重試次數決定是否要重試,如果沒有重試次數則會認定任務失敗,通知到api server, api serer更新執行個體狀态。
目前OFLOW 2.0已經完成了所有的設計,開發和測試環境,應經過了一段時間的内測和壓力測試等環節。最近也已經開放試用了。歡迎大家試用2.0系統,并在試用過程中給與回報和建議。
目前使用者如果想使用我們的OFLOW 2.0系統的話,可以登入南天門2.0平台上試用。
5 結語
以上就是我跟大家分享的OFLOW的一些資訊。
在此我也展望一下我們後續OFLOW平台的發展:
1)OFLOW 1.0的排程性能問題。由于2.0和1.0系統的變化較大,後續OFLOW 1.0和2.0平台會在一段較長的時間内共存,是以對1.0系統的排程性能我們也需要不斷去優化,以應對高速增長的任務量;
一方面是想辦法縮短任務間的排程間隔,以提升任務執行效率;
另一方面是希望能探索更便捷有效的擴充方式,應對排程任務量的增加。
圖13:oflow的任務增長趨勢
2)互動體驗上
頁面互動的友好性上進行完善;添加一系列的批量任務操作和運維方面的功能;同時還希望以dag或者task等次元展示曆史統計資訊,以供使用者參考;另外就是針對任務操作審計,任務的監控系統進行優化;
3)成本優化
另外一個就是前面提到的成本優化,下圖反映的是一天中24個小時的任務并發執行情況,任務存在非常明顯的高峰和低谷。
圖14:oflow每日各時間段内的執行個體數量
後續考慮想辦法對任務錯峰執行,比如在計費模式上去鼓勵使用者将時效性要求不高的任務放在任務低谷進行執行;另外一個就是希望探索一下資源的動态擴縮容來實作成本優化。
4)另外還希望後續OFLOW不單單起到一個任務排程的作用,希望後續能和後端的大資料叢集有更多的互動;
5)還有一點就是希望對監控進行進一步的完善。其中比較關鍵的一個是核心任務的鍊路的識别和監控。
就是不但要能監控到核心任務,還能将該核心任務的所有上遊邏輯監控到,鍊路中的某個環節一旦異常,能夠很快的告警出來;另外一點是使用者收到告警時的處理,很多使用者收到任務告警後不清楚如何處理,後續oflow會想辦法引導使用者處理。