攜程大資料平台負責人張翼分享攜程的實時大資料平台的疊代,按照時間線介紹采用的技術以及踩過的坑。攜程最初基于穩定和成熟度選擇了Storm+Kafka,解決了資料共享、資源控制、監控告警、依賴管理等問題之後基本上覆寫了攜程所有的技術團隊。今年的兩個新嘗試是Streaming CQL(華為開源)和JStorm(阿裡開源),意在提升開發效率、性能和處理消息擁塞能力,目前已有三分之一的Storm應用已經遷到JStorm 2.1上。
今天給大家分享的是攜程在實時資料平台的一些實踐,按照時間順序來分享我們是怎麼一步一步建構起這個實時資料平台的,目前有一些什麼新的嘗試,未來的方向是怎麼樣的,希望對需要建構實時資料平台的公司和同學有所借鑒。
一、為什麼要做實時資料平台
首先先介紹一下背景,為什麼我們要做這個資料平台?其實了解攜程的業務的話,就會知道攜程的業務部門是非常多的,除了酒店和機票兩大業務之外,有近20個SBU和公共部門,他們的業務形态差異較大,變化也快,原來那種Batch形式的資料處理方式已經很難滿足各個業務資料擷取和分析的需要,他們需要更為實時地分析和處理資料。
其實在這個統一的實時平台之前,各個部門自己也做一些實時資料分析的應用,但是其中存在很多的問題:
- 技術選型五花八門,消息隊列有用ActiveMQ的,有用RabbitMQ的,也有用Kafka的,分析平台有用Storm的,有用Spark-streaming的,也有自己寫程式處理的;由于業務部門技術力量參差不齊,并且他們的主要精力還是放在業務需求的實作上,是以這些實時資料應用的穩定性往往難以保證。
- 缺少周邊設施,比如說像報警、監控這些東西。
- 資料和資訊的共享不順暢,如果度假要使用酒店的實時資料,兩者分析處理的系統不同就會很難弄。是以在這樣前提下,就需要打造一個統一的實時資料平台。
二、需要怎樣的實時資料平台
這個統一的資料平台需要滿足4個需求:
- 首先是穩定性,穩定性是任何平台和系統的生命線;
- 其次是完整的配套設施,包括測試環境,上線、監控和報警;
- 再次是友善資訊共享,資訊共享有兩個層面的含義,1、是資料的共享;2、是應用場景也可以共享,比如說一個部門會受到另一個部門的一個實時分析場景的啟發,在自己的業務領域内也可以做一些類似的應用;
- 最後服務響應的及時性,使用者在開發、測試、上線及維護整個過程中都會遇到各種各樣的問題,都需要得到及時的幫助和支援。
三、如何實作
在明确了這些需求之後我們就開始建構這個平台,當然第一步面臨的肯定是一個技術選型的問題。消息隊列這邊Kafka已經成為了一個既定的事實标準;但是在實時處理平台的選擇上還是有蠻多候選的系統,如Linkedin的Samza、Apache的S4,最主流的當然是Storm和Spark-streaming啦。
出于穩定和成熟度的考量,當時我們最後是選擇了Storm作為實時平台。如果現在讓我重新再來看的話,我覺得Spark-streaming和Storm都是可以的,因為這兩個平台現在都已經比較成熟了。

架構圖如上,比較簡單,就是從一些業務的伺服器上去收集這個日志,或者是一些業務資料,然後實時地寫入Kafka裡面,Storm作業從Kafka讀取資料,進行計算,把計算結果吐到各個業務線依賴的外部存儲中。
那我們僅僅建構這些就夠了嗎?當然是遠遠不夠的,因為這樣僅僅是一些運維的東西,你隻是把一個系統的各個子產品搭建起來。前面提到的平台的兩個最關鍵的需求:資料共享和平台整體的穩定性很難得到保證,我們需要做系統治理來滿足這兩個平台的關鍵需求。
資料共享
首先說說資料共享的問題,我們通常認為資料共享的前提是指使用者要清晰地知道使用資料源的那個業務含義和其中資料的Schema,使用者在一個集中的地方能夠非常簡單地看到這些資訊;我們解決的方式是使用Avro的方式定義資料的Schema,并将這些資訊放在一個統一的Portal站點上;資料的生産者建立Topic,然後上傳Avro格式的Schema,系統會根據Avro的Schema生成Java類和相應的JAR,并把JAR加入Maven倉庫;對于資料的使用者來說,他隻需要在項目中直接加入依賴即可。
此外,我們封裝了Storm的API,幫使用者實作了反序列化的過程,示例代碼如下,使用者隻要繼承一個類,然後制定消息對應的類,系統能夠自動完成消息的反序列化,你在process方法中拿到的就是已經反序列化好的對象,對使用者非常友善。
資源控制
其次我們來說說資源控制,這個是保證平台穩定性的基礎,我們知道Storm其實在資源隔離方面做得并不是太好,是以我們需要對使用者的Storm作業的并發做一些控制。我們的做法還是封裝Storm的接口,将原來設定topology和executor并發的方法去掉,而把這些設定挪到Portal中。下面是示例的代碼:
另外,我們前面已經提到過了,我們做了一個統一的Portal友善使用者管理,使用者可以檢視Topic相關資訊,也可以用來管理自己的Storm作業,配置,啟動,Rebalance,監控等一系列功能都能夠在上面完成。
在完成了這些功能之後,我們就開始初期業務的接入了,初期業務我們隻接了兩個資料源,這兩個資料源的流量都比較大,就是一個是UBT(攜程的使用者行為資料),另一個是Pprobe的資料(應用流量日志),那基本上是攜程用行為的通路日志。主要應用集中在實時的資料分析和資料報表上。
在平台搭建的初期階段,我們有一些經驗和大家分享一下:
- 最重要的設計和規劃都需要提前做好,因為如果越晚調整的話其實付出的成本會越大的;
- 集中力量實作了核心功能;
- 盡早的接入業務,在核心功能完成并且穩定下來的前提下,越早接入業務越好,一個系統隻有真正被使用起來,才能不斷進化;
- 接入的業務一定要有一定的量,因為我們最開始接入就是整個攜程的整個UBT,就是使用者行為的這個資料,這樣才能比較快的幫助整個平台穩定下來。因為你平台剛剛建設起來肯定是有各種各樣的問題的,就是通過大流量的驗證之後,一個是幫平台穩定下來,修複各種各樣的BUG,第二個是說會幫我們積累技術上和運維上的經驗。
完善“外圍設施”
在這個之後我們就做了一系列工作來完善這個平台的“外圍設施”:
- 把Storm的日志導入到ES裡面,通過Kanban展示出來;原生的Storm日志檢視起來不友善,也沒有搜尋的功能,資料導入ES後可以通過圖示的形式展現出來,也有全文搜尋的功能,排錯時非常友善。
- metrics相關的一些完善;除了Storm本身Build in的metrics之外我們還增加了一些通用的埋點,如從消息到達Kafka到它開始被消費所花的時間等;另外我們還是實作了自定義的MetricsConsumer,它會把所有的metrics資訊實時地寫到攜程自己研發的看闆系統Dashboard和Graphite中,在Graphite中的資訊會被用作告警。
- 建立了完善的告警系統,告警基于輸出到Graphite的metrics資料,使用者可以配置自己的告警規則并設定告警的優先級,對于高優先級的告警,系統會使用TTS的功能自動撥打聯系人的電話,低優先級的告警則是發送郵件;預設情況下,我們會幫使用者添加Failed數量和消費堵塞的預設的告警。
攜程實時大資料平台演進:1/3 Storm應用已遷到JStorm - 提供了适配攜程Message Queue的通用的Spout和寫入Redis,HBASE,DB的通用的Bolt,簡化使用者的開發工作。
- 在依賴管理上也想了一些方法,友善API的更新;在muise-core(我們封裝的Storm API項目)的2.0版本,我們重新整理了相關的API接口,之後的版本盡量保證接口向下相容,然後推動所有業務都更新一遍,之後我們把muise-core的jar包作為标準的Jar包之一放到每台supervisor的storm安裝目錄的lib檔案夾下,在之後的更新中,如果是強制更新,就聯系使用者,逐個重新開機Topology,如果這次更新不需要強制推廣,等到使用者下次重新開機Topology時,這個更新就會生效。
在做完這些工作之後,我們就開始大規模的業務接入了,其實目前基本上覆寫了攜程的所有的技術團隊,應用的類型也比初期要豐富很多。下面給大家簡單介紹一下,在攜程的一些實時應用,主要分為下面四類:
- 實時資料報表;
- 實時的業務監控;
- 基于使用者實時行為的營銷;
- 風控和安全的應用。
第一個展示的是攜程這邊的網站資料監控平台cDataPortal,攜程會對每個網頁通路的性能做一些很詳細的監控,然後會通過各種圖表展示出來。
第二個應用是攜程在AB Testing的應用,其實大家知道AB Testing隻有在經過比較長的一段時間,才能得到結果,需要達到一定的量之後才會在統計上有顯著性;那它哪裡需要實時計算呢?實時計算主要在這邊起到一個監控和告警的作用:當AB Testing上線之後,使用者需要一系列的實時名額來觀察分流的效果,來确定它配置是否正确;另外需要檢視對于訂單的影響,如果對訂單産生了較大的影響,需要能夠及時發現和停止。
第三個應用是和個性化推薦相關,推薦其實更多的是結合使用者的曆史偏好和實時偏好來給大家推薦一些場景。這邊實時偏好的收集其實就是通過這個實時平台來做的。比較相似的應用有根據使用者實時的通路行為推送一些比較感興趣的攻略,團隊遊會根據使用者的實時通路,然後給使用者推送一些優惠券之類的。
四、那些曾經踩過的坑
在說完了實時資料平台在攜程的應用,讓我們簡單來聊聊這個過程中我們的一些經驗。
技術坑
首先是技術上的,先講一下我們遇到的坑吧。
我們使用的Storm版本是0.9.4,我們遇到了兩個Storm本身的BUG,當然這兩個BUG是比較偶發性的,大家可以看一下,如果遇到相應的問題的話,可以參考一下:
-
STORM-763:Nimbus已經将worker配置設定到其他的節點,但是其他worker的netty用戶端不連接配接新的worker;
應急處理:Kill掉這個worker的程序或是重新開機相關的作業。
- STORM-643:當failed list不為空時,并且一些offset已經超出了Range範圍,KafkaUtils會不斷重複地去取相關的message。
使用坑
另外就是在使用者使用過程中的一些問題,比如說如果可能,我們一般會推薦使用者使用localOrShuffleGrouping,在使用它時,上下遊的Bolt數要比對,否則會出現下遊的大多數Bolt沒有收到資料的情況,另外就是使用者要保證Bolt中的成員變量都要是可序列化的,否則在叢集上運作時就會報錯。
團隊坑
然後就是關于支援和團隊的經驗。
- 在大量接入前其告警和監控設施是必須的,這兩個系統是大量接入的前提,否則難以在遇到非常問題時及時發現或是快速定位解決。
- 清晰的說明、指南和Q&A能夠節約很多支援的時間。使用者在開發之前,你隻要提供這個文檔給他看,然後有問題再來咨詢。
- 要把握一個接入節奏,因為我們整個平台的開發人員比較少,也就三個到四個同學,雖然已經全員客服了去應對各個BU的各種各樣的問題,但是如果同時接入太多項目的話還會忙不過來。
- 支援還有重要的一點就是“授人以漁”,在支援的時候給他們講得很細吧,讓他們了解Kafka和Storm的基本知識,這樣的話有一些簡單問題他們可以内部消化,不用所有的問題都來找你的團隊支援。
五、新的探索
前面講的是我們基本上去年的工作,今年我們在兩個方向上做了一些新的嘗試:Streaming CQL和JStorm,和大家分享下這兩個方面的進展。
Streaming CQL
Streaming CQL是華為開源的一個實時流處理的SQL引擎,它的原理就是把SQL直接轉化成為Storm的Topology,然後送出到Storm叢集中。它的文法和标準的SQL很接近,隻是增加了一些視窗函數來應對實時處理的場景。
下面我通過一個簡單的例子給大家展示一個簡單的例子,給大家有個直覺的感受。我的例子是:
- 從kafka中讀取資料,類型為ubt_action;
- 取出其中的page,type,action,category等字段然後每五秒鐘按照page, type字段做一次聚合;
- 最後把結果寫到console中。
如果需要用Storm實作的話,一般你需要實作4個類和一個main方法;使用Streaming CQL的話你隻需要定義輸入的Stream和輸出的Stream,使用一句SQL就能實作業務邏輯,非常簡單和清晰。
那我們在華為開源的基礎上也做了一些工作:
- 增加Redis,HBASE,HIVE(小表,加載記憶體)作為Data Source;
- 增加HBASE,MySQL / SQL Server,Redis作為資料輸出的Sink;
- 修正MultiInsert語句解析錯誤,并回報到社群;
- 為where語句增加了In的功能;
- 支援從攜程的消息隊列Hermes中讀取資料,
Streaming CQL最大的優勢就是能夠使不會寫Java的BI的同僚,非常友善地實作一些邏輯簡單的實時報表和應用,比如下面說到的一個度假的例子基本上70行左右就完成了,原來開發和測試的時間要一周左右,現在一天就可以完整,提高了他們的開發效率。
【案例】度假BU需要實時地統計每個使用者通路“自由行”、“跟團遊”、“半自助遊”産品的占比,進而進一步豐富使用者畫像的資料:
- 資料流:UBT的資料;
- Data Source:使用Hive中的product的次元表;
- 輸出:Hbase。
JStorm
今年我們嘗試的第二個方向就是JStorm,Storm的核心使用Clojure編寫,這給後續深入的研究和維護帶來了一定的困難,而Jstorm是阿裡開源的項目,它完全相容Storm的程式設計模型,核心全部使用Java來編寫,這就友善了後續的研究和深入地調研;阿裡的JStorm團隊非常Open,也非常專業化,我們一起合作解決了一些在使用上遇到的問題;除了核心使用Java編寫這個優勢之外,JStorm對比Storm在性能上也有一定的優勢,此外它還提供了資源隔離和類似于Heron之類的反壓力機制,是以能夠更好的處理消息擁塞的這種情況。
我們現在基本上已經把三分之一的Storm應用已經遷到JStorm上了,我們使用的版本是2.1。在使用過程中有一些經驗跟大家分享一下:
- 我們在與kafka內建中遇到的一些問題,這些在新版本中已經修複了:
- 在JStorm中,Spout的實作有兩種不同的方式:Multi Thread(nextTuple,ack & fail方法在不同的程序中調用)和Single Thread,原生的Storm的Kafka Spout需要使用Single Thread的方式運作;
- 修複了Single Thread模式的1個問題(新版本已經修複)。
- JStorm的metrics機制和Storm的機制完全不相容,是以相關的代碼都需要重寫,主要包括适配了Kafka Spout和我們Storm的API中的Metrics和使用MetricsUploader的功能實作了資料寫入Dashboard和Graphite的功能這兩點,此外我們結合了兩者的API提供了一個統一的接口,能相容兩個環境,友善使用者記錄自定義的metrics。
以上就是我要分享的内容,在結尾處,我簡單總結一下我們的整體架構:
底層是消息隊列和實時處理系統的開源架構,也包括攜程的一些監控和運維的工具,第二層就是API和服務,而最上面通過Portal的形式講所有的功能提供給使用者。
六、未來方向
在分享的最後,我來和大家聊聊實時資料平台未來的發展方向,主要有兩個:
- 繼續推動平台整體向JStorm遷移,當然我們也會調研下剛剛開源的Twitter的Heron,與JStorm做一個對比;
- 對于dataflow模型的調研和落地,去年Google發表了dataflow相關的論文(強烈建議大家讀讀論文或是相應的介紹文章),它是新一代實時處理的模型,能在保證明時性的同時又能保證資料的正确性,目前開源的實作有兩個:Spark 2.0中Structured Streaming和Apache的另一個開源項目BEAM,BEAM實作了Google Dataflow的API,并且在Spark和Flink上實作了相應的Executor。
下半年我們還會做一些調研和探索性的嘗試,并尋找合适的落地場景。
本文作者為攜程大資料平台負責人張翼。張翼浙江大學碩士畢業,2015年初加入攜程,主導了攜程實時資料計算平台的建設,以及攜程大資料平台整合和平台技術的演進。進入網際網路行業近10年,從事大資料平台和架構的工作超過6年。
本文将同步釋出與攜程技術中心微信公号ctriptech。
責編:周建丁([email protected])
CCAI 2016中國人工智能大會将于8月26-27日在京舉行,AAAI主席,國内外衆多院士,MIT、微軟、大疆、百度、微信、滴滴專家領銜全球技術領袖和産業先鋒打造國内人工智能前沿平台,8個重磅主題報告,4大專題論壇,1000+高品質參會嘉賓,探讨人機互動、機器學習、模式識别及産業實戰。大會門票已經剩餘不多,與大牛對話,火速搶票!