本節書摘來自華章計算機《storm企業級應用:實戰、運維和調優》一書中的第1章,第1.1節,作者:馬延輝 陳書美 雷葆華著, 更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。
所謂實時流計算,就是近幾年由于資料得到廣泛應用之後,在資料持久性模組化不滿足現狀的情況下,急需資料流的瞬時模組化或者計算處理。這種實時計算的應用執行個體有金融服務、網絡監控、電信資料管理、web應用、生産制造、傳感檢測,等等。在這種資料流模型中,單獨的資料單元可能是相關的元組(tuple),如網絡測量、呼叫記錄、網頁通路等産生的資料。但是,這些資料以大量、快速、時變(可能是不可預知)的資料流持續到達,由此産生了一些基礎性的新的研究問題——實時計算。實時計算的一個重要方向就是實時流計算。
1.1.1 實時流計算背景
資料的價值随着時間的流逝而降低,是以事件出現後必須盡快對它們進行處理,最好事件出現時便立刻對其進行處理,發生一個事件進行一次處理,而不是緩存起來成一批處理。例如商用搜尋引擎,像google、bing和yahoo!等,通常在使用者查詢響應中提供結構化的web結果,同時也插入基于流量的點選付費模式的文本廣告。為了在頁面上的最佳位置展現最相關的廣告,通過一些算法來動态估算給定上下文中一個廣告被點選的可能性。上下文可能包括使用者偏好、地理位置、曆史查詢、曆史點選等資訊。一個主搜尋引擎可能每秒鐘處理成千上萬次查詢,每個頁面都可能會包含多個廣告。為了及時處理使用者回報,需要一個低延遲、可擴充、高可靠的處理引擎。
對于這些實時性要求很高的應用,若把持續到達的資料簡單地放到傳統資料庫管理系統(dbms)中,并在其中進行操作,是不切實際的。傳統的dbms并不是為快速連續地存放單獨的資料單元而設計的,而且也不支援“持續處理”,而“持續處理”是資料流應用的典型特征。另外,現在人們都認識到,“近似性”和“自适應性”是對資料流進行快速查詢和其他處理(如資料分析和資料采集)的關鍵要素,而傳統dbms的主要目标恰恰與之相反:通過穩定的查詢設計,得到精确的答案。
另外一些方案是采用mapreduce來處理實時資料流。但是,盡管mapreduce做了實時性改進,也很難穩定地滿足應用需求。這是因為hadoop mapreduce架構為批處理做了高度優化,典型的是通過排程批量任務來操作靜态資料,任務不是常駐服務,資料也不是實時流入;而資料流計算的典型範式之一是不确定資料速率的事件流流入系統,系統處理能力必須與事件流量比對。
1.1.2 實時計算應用場景
網際網路領域的實時流計算一般都是針對海量資料進行的,除了非實時計算的需求(如計算結果準确)以外,實時計算最重要的一個需求是能夠實時響應計算結果,一般要求為秒級。個人了解,網際網路行業的實時計算可以分為以下兩種應用場景。
1)資料源是實時的、不間斷的,要求對使用者的響應時間也是實時的。
主要用于網際網路流式資料處理。所謂流式資料,是指将資料看作資料流的形式來處理。資料流則是在時間分布和數量上無限的一系列資料記錄的集合體;資料記錄是資料流的最小組成單元。例如,對于大型網站,活躍的流式資料非常常見,這些資料包括網站的通路pv/uv、使用者通路的内容、搜尋的内容等。實時的資料計算和分析可以動态實時地重新整理使用者通路資料,展示網站實時流量的變化情況,分析每天各小時的流量和使用者分布情況,這對于大型網站來說具有重要的實際意義。
2)資料量大且無法或沒必要預算,但要求對使用者的響應時間是實時的。
主要用于特定場合下的資料分析處理。當資料量很大,同時發現無法窮舉所有可能條件的查詢組合或者大量窮舉出來的條件組合無用時,實時計算就可以發揮作用,将計算過程推遲到查詢階段進行,但需要為使用者提供實時響應。
1.1.3 實時計算處理流程
網際網路上海量資料(一般為日志流)的實時計算過程可以劃分為3個階段:資料的産生與收集階段、傳輸與分析處理階段、存儲對對外提供服務階段,如圖1-1所示。下面分别進行簡單介紹。

(1)資料實時采集
需求:功能上保證可以完整地收集到所有日志資料,為實時應用提供實時資料;響應時間上要保證明時性、低延遲(在1s左右);配置簡單,部署容易;系統穩定可靠等。
目前,網際網路企業的海量資料采集工具有facebook開源的scribe、linkedin開源的kafka、cloudera開源的flume,淘寶開源的timetunnel、hadoop的chukwa等,它們均可以滿足每秒數百mb的日志資料采集和傳輸需求。
(2)資料實時計算
傳統的資料操作,首先将資料采集并存儲在dbms中,然後通過查詢和dbms進行互動,得到使用者想要的答案。在整個過程中,使用者是主動的,而dbms系統是被動的,過程操作如圖1-2所示。
但是,對于現在大量存在的實時資料,如股票交易的資料,這類資料實時性強,資料量大,沒有止境,傳統的架構并不合适。流計算就是專門針對這種資料類型準備的。在流資料不斷變化的運動過程中實時地進行分析,捕捉到可能對使用者有用的資訊,并把結果發送出去。在整個過程中,資料分析處理系統是主動的,而使用者卻處于被動接收的狀态,處理流程如圖1-3所示。
需求:适應流式資料、不間斷查詢;系統穩定可靠、可擴充性好、可維護性好等。
有關計算的一些注意點:分布式計算、并行計算(節點間的并行、節點内的并行)、熱點資料的緩存政策、服務端計算。
(3)實時查詢服務
全記憶體:直接提供資料讀取服務,定期轉存到磁盤或資料庫進行持久化。
半記憶體:使用redis、memcache、mongodb、berkeleydb等記憶體資料庫提供資料實時查詢服務,由這些系統進行持久化操作。
全磁盤:使用hbase等以分布式檔案系統(hdfs)為基礎的nosql資料庫,對于keyvalue記憶體引擎,關鍵是設計好key的分布。
1.1.4 實時計算架構
最近這幾年随着實時計算的流行,相繼出現了以下實時計算的架構。
1.?ibm的streambase
streambase是ibm開發的一款商業流式計算系統,在金融行業和政府部門使用,其本身是商業應用軟體,但提供了開發版。相對于付費使用的企業版,開發版的功能更少,但這并不妨礙我們從外部使用api接口來對streambase本身進行分析。
streambase使用java開發,ide是基于eclipse進行二次開發,功能非常強大。streambase也提供了相當多的operator、functor以及其他元件來幫助建構應用程式。使用者隻需要通過ide拖拉控件,然後關聯,設定好傳輸的schema并且設定控件計算過程,就可以編譯出一個高效處理的流式應用程式。同時,streambase還提供了類sql來描述計算過程。streambase的架構如圖1-4所示。
streambase server是節點上啟動的管理程序,它負責管理節點上container的執行個體,每個container通過adapter獲得輸入,交給應用邏輯計算,然後通過adapter輸出。各個container互相連接配接,形成一個計算流圖。
adapter負責與異構輸入或輸出互動,源或目的地可能包括csv檔案、jdbc、jms、simulation(streambase提供的流産生模拟器)或使用者定制。
每個streambase server上面都會有一個system container,主要是産生系統監控資訊的流式資料。
ha container用于容錯恢複,可以看出它實際包含兩個部分:heartbeat和ha events,其中heartbeat也是tuple在container之間傳輸。在ha方案下,ha container監控primary server的活動情況,然後将這些資訊轉換成為ha events交給streambase monitor來處理。
monitor就是從system container和ha container中擷取資料并進行處理。streambase認為ha問題應該通過cep方式處理,也就是說出現問題的部件肯定會反映在system container和ha container的輸出流上面,monitor如果通過複雜事件處理這些tuples就能夠檢測到機器故障等問題,并做出相應處理。
2.?yahoo的s4
yahoo! s4(simple scalable streaming system)是一個通用的、分布式的、可擴充的、分區容錯的、可插拔的流式系統。基于s4架構,開發者可以容易地開發面向持續流資料處理的應用。s4的最新版本是v0.6.0,是apache孵化項目,其設計特點有以下幾個方面。
actor計算模型:為了能在普通機型構成的叢集上進行分布式處理,并且在叢集内部不使用共享記憶體,s4架構采用了actor模式,這種模式提供了封裝和位址透明語義,是以在允許應用大規模并發的同時,提供了簡單的程式設計接口。s4系統通過處理單元(processing elements,pes)進行計算,消息在處理單元間以資料事件的形式傳送,pe消費事件,發出一個或多個可能被其他pe處理的事件,或者直接釋出結果。每個pe的狀态對于其他pe不可見,pe之間唯一的互動模式就是發出事件和消費事件。
對等叢集架構:s4采用對等架構,叢集中的所有處理節點都是等同的,沒有中心控制節點,這使得叢集的擴充性很好,處理節點的總數理論上無上限;同時,s4沒有單點容錯的問題。
可插拔體系架構:s4系統使用java語言開發,采用了極富層次的子產品化程式設計,每個通用功能點都盡量抽象出來作為通用子產品,而且盡可能地讓各子產品實作可定制化。
支援部分容錯:基于zookeeper服務的叢集管理層會自動路由事件從失效節點到其他節點。除非顯式儲存到持久性存儲,否則節點故障時,節點上處理事件的狀态會丢失。
s4的重要應用場景是預估點選通過率(ctr)。ctr是廣告點選數除以展現數得到的比率,擁有足夠曆史的展現和點選資料後,ctr是使用者點選廣告可能性的一個很好的估算,精确的來源點選對于個性化和搜尋排名來說都價值無限。據s4的開發者稱,線上流量上的實驗顯示基于s4系統的新ctr計算架構可以在不影響收入的前提下将ctr值提高3%,這主要是通過快速檢測低品質的廣告并把它們過濾出去而獲得的收益。s4系統提供的低延遲處理能夠使得商務廣告部門獲益,但是潛在的風險也不能忽視,那就是事件流的速率快到一定程度後,s4可能無法處理,會導緻事件的丢失,如圖1-5
所示。
3.?twitter的storm
twitter的storm:storm是一個分布式的、容錯的實時計算系統。storm的用途:可用于處理消息和更新資料庫(流處理),在資料流上進行持續查詢,以流的形式傳回結果到用戶端(持續計算),并行化一個類似實時查詢的熱點查詢(分布式的rpc)。
storm為分布式實時計算提供了一組通用原語,可被用于“流處理”中,實時處理消息并更新資料庫。這是管理隊列及工作者叢集的另一種方式。storm也可用于“連續計算”(continuous computation),對資料流做連續查詢,在計算時将結果以流的形式輸出給使用者。它還用于“分布式rpc”,以并行的方式運作昂貴的運算。
storm的主要特點如下:
簡單的程式設計模型。類似于mapreduce降低了并行批處理複雜性,storm降低了進行實時處理的複雜性。
可以使用各種程式設計語言。可以在storm上使用各種程式設計語言。預設支援clojure、java、ruby和python。要增加對其他語言的支援,隻需實作一個簡單的storm通信協定即可。
容錯性。storm會管理工作程序和節點的故障。
水準擴充。計算是在多個線程、程序和伺服器之間并行進行的。
可靠的消息處理。storm保證每個消息至少能得到一次完整處理。當任務失敗時,它會負責從消息源重試消息。
快速。系統的設計保證了消息能得到快速的處理,使用zeromq作為其底層消息隊列。
本地模式。storm有一個“本地模式”,可以在處理過程中完全模拟storm叢集。這可以使使用者快速進行開發和單元測試。
4.?twitter的rainbird
rainbird是一款分布式實時統計系統,可以用于實時資料的統計。
1)統計網站中每一個頁面,域名的點選次數。
2)内部系統的運作監控(統計被監控伺服器的運作狀态)。
3)記錄最大值和最小值。
rainbird建構在cassandra上,使用scala編寫,依賴于zookeeper、scribe和thrift。每秒可以寫入10萬個事件,而且都帶有層次結構,或者進行各種查詢,延遲小于100ms。目前twitter已經在promoted tweets、微網誌中的連結、短位址、每個使用者的微網誌互動等生産環境使用了rainbird。其主要元件的功能如下。
zookeeper:是hadoop子項目中的一款分布式協調系統,用于控制分布式系統中各個元件的一緻性。
cassandra:是nosql中一款非常出色的産品,集合了dynamo和bigtable特性的分布式存儲系統,用于存儲需要統計的資料,并提供用戶端查詢統計資料(需要使用分布式counter更新檔cassandra-1072)。
scribe:是facebook開源的一款分布式日志收集系統,用于在系統中将各個需要統計的資料源收集到cassandra中。
thrift:是facebook開源的一款跨語言c/s網絡通信架構,開發人員基于該架構可以輕松地開發c/s應用。
5.?facebook的puma
puma是facebook的資料流處理系統,早期的處理系統如圖1-6所示,即二代puma。ptail将資料以流的方式傳遞給puma 2,puma 2每秒需要處理百萬級的消息,處理多為aggregation方式的操作,遵循時間序列,涉及的複雜aggregation操作諸如獨立訪次、最頻繁事件,等等。
對于每條消息,puma 2發送“increment”操作到hbase。考慮到自動負載均衡、自動容錯和寫入吞吐等因素,puma選擇hbase而不是mysql作為其存儲引擎。puma 2的伺服器都是對等的,即同時可能有多個puma 2伺服器向hbase中修改同一行資料。是以,facebook為hbase增加了新的功能,支援一條increment操作修改同行資料的多列。
puma 2的架構非常簡單并且易于維護,其涉及的狀态僅僅是ptail的checkpoint,即上遊資料位置周期性地存儲在hbase中。由于是對稱結構,叢集擴容和機器故障的處理都非常友善。不過,puma 2的缺點也很突出,首先,hbase的increment操作是非常昂貴的,因為它涉及讀和寫,而hbase的随機讀效率比較差;另外,複雜aggregation操作也不好支援,需要在hbase上寫很多使用者代碼;再者,puma 2在故障時會産生少量重複資料,因為hbase的increment和ptail的checkpoint并不是一個原子操作。
但值得一提的是,puma并沒有開源出來,使用者可以了解和借鑒其實作原理。
6.?阿裡的jstorm
jstorm是一個alibaba開源的分布式實時計算引擎,可以認為是twitter storm的java版本,使用者按照指定的接口實作一個任務,然後将這個任務遞交給jstorm系統,jstorm會啟動背景服務程序7×24小時運作,一旦某個worker發生故障,排程器立即配置設定一個新的worker替換這個失效的worker。
jstorm處理資料的方式是基于消息的流水線處理,是以特别适合無狀态計算,也就是計算單元依賴的資料全部可以在接受的消息中找到,并且最好一個資料流不依賴另外一個資料流。是以,jstorm适用于下面的場景:
日志分析。從日志中分析出特定的資料,并将結果存入外部存儲器,如資料庫。
管道系統。将資料從一個系統傳輸到另外一個系統,如将資料庫同步到hadoop。
消息轉化器。将接收到的消息按照某種格式轉化,存儲到另外一個系統,如消息中間件中。
統計分析器。從日志或消息中提煉出某個字段,然後進行count或sum計算,最後将統計值存入外部存儲器。
但是,jstorm的活躍度并不高,截至本章書寫時,整個jstorm項目共送出過36次,并且隻有1個committer,相比twitter storm,不管是活躍度,還是認可度都還不是一個數量級的産品。
7.?其他實時計算系統
(1)hstreaming
hstreaming是建立在hadoop上的可擴充的、可持續的資料分析系統。它可以分析、可視化并處理大量連續資料,如一個金融交易系統實時展示資料圖。hstreaming由jana uhlig與volkmar uhlig聯合創立,該公司沒有提供相關産品的開源版本,從官網資訊來看,隻提供相關的解決方案。
hstreaming公司嘗試為hadoop環境添加一個實時的元件,當資料送出到系統,在存儲到磁盤之前會進行資料處理,類似開源的storm和kafka。目前hstreaming已經建立了一個完整的系統,該系統能夠利用實時的引擎來處理視訊、伺服器、傳感器以及其他機器上生成的資料流,而且完全相容hadoop作為一個歸檔和批量處理系統。
(2)esper
esper是espertech公司使用java開發的事件流處理(event stream processing,esp)和複雜事件處理(complex event processing,cep)引擎。cep是一種實時事件處理并從大量事件資料流中挖掘複雜模式的技術。esp是一種從大量事件資料流中過濾、分析有意義的事件,并能夠實時取得這些有意義的資訊的技術。該引擎可應用于網絡入侵探測、sla監測、rfid讀取、航空運輸調控、金融(風險管理、欺詐探測)等領域。esper可以用在股票系統、風險監控系統等實時性要求比較高的系統中。
(3)borealis
borealis是由brandeis university、brown university和mit合作開發的一個分布式流式系統,由之前的流式系統aurora、medusa演化而來,是學術研究的一個産品,2008年已經停止維護。
borealis具有豐富的論文、完整的使用者/開發者文檔,系統是用c++實作的,運作于x86-based linux平台。系統是開源的,同時使用了較多的第三方開源元件,包括用于查詢語言翻譯的antlr、c++的網絡程式設計架構庫nmstl等。
borealis系統的流式模型和其他流式系統基本一緻:接受多元的資料流和輸出流,為了容錯,采用确定性計算,對于容錯性要求高的系統,會對輸入流使用算子進行定序。
8.?架構對比
實時資料流計算是近年來分布式、并行計算領域研究和實踐的重點,無論是工業界,還是學術界,都誕生了多個具有代表性的資料流計算系統,用于解決實際生産問題和進行學術研究。不同的系統滿足不同應用的需求,系統并無好壞之分,關鍵在于服務的對象是誰。圖1-7從開發語言、高可用機制、支援精确恢複、主從架構、資源使用率、恢複時間、支援狀态持久化及支援去重等幾個方面比較了典型的3個資料流計算系統puma、storm和s4。因為streambase是廠商發行商用版本,hstreaming隻提供解決方案,而jstorm和storm非常相似,是以這幾種産品并沒有羅列在圖1-7中。
可以看到,為了高效開發,兩個系統使用java,另一種系統使用函數式程式設計語言clojure;高可用方案,有兩個系統使用primary standby方式,系統恢複時間可控,但系統複雜度增加,資源使用率也較低,因為需要一些機器來當備機;而storm選擇了更簡單可行的上遊回放方式,資源使用率高,就是恢複時間可能稍長些;puma和s4都支援狀态持久化,但s4目前不支援資料去重,未來可能會實作;三個系統都做不到精确恢複,即恢複後的執行結果和無故障發生時保持一緻,因為即使是primary standby方式,也隻是定期checkpoint,并沒有跟蹤每條消息的執行。商用的streambase支援精确恢複,這主要應用于金融領域。