天天看點

storm簡介

伴随着資訊科技日新月異的發展,資訊呈現出爆發式的膨脹,人們擷取資訊的途徑也更加多樣、更加便捷,同時對于資訊的時效性要求也越來越高。舉個搜尋場景中的例子,當一個賣家釋出了一條寶貝資訊時,他希望的當然是這個寶貝馬上就可以被賣家搜尋出來、點選、購買啦,相反,如果這個寶貝要等到第二天或者更久才可以被搜出來,估計這個大哥就要罵娘了。再舉一個推薦的例子,如果使用者昨天在淘寶上買了一雙襪子,今天想買一副泳鏡去遊泳,但是卻發現系統在不遺餘力地給他推薦襪子、鞋子,根本對他今天尋找泳鏡的行為視而不見,估計這哥們心裡就會想推薦你妹呀。其實稍微了解點背景知識的碼農們都知道,這是因為背景系統做的是每天一次的全量處理,而且大多是在夜深人靜之時做的,那麼你今天白天做的事情當然要明天才能反映出來啦。

實作一個實時計算系統

全量資料處理使用的大多是鼎鼎大名的hadoop或者hive,作為一個批處理系統,hadoop以其吞吐量大、自動容錯等優點,在海量資料處理上得到了廣泛的使用。但是,hadoop不擅長實時計算,因為它天然就是為批處理而生的,這也是業界一緻的共識。否則最近這兩年也不會有s4,storm,puma這些實時計算系統如雨後春筍般冒出來啦。先抛開s4,storm,puma這些系統不談,我們首先來看一下,如果讓我們自己設計一個實時計算系統,我們要解決哪些問題。

低延遲。都說了是實時計算系統了,延遲是一定要低的。

高性能。性能不高就是浪費機器,浪費機器是要受批評的哦。

分布式。系統都是為應用場景而生的,如果你的應用場景、你的資料和計算單機就能搞定,那麼不用考慮這些複雜的問題了。我們所說的是單機搞不定的情況。

可擴充。伴随着業務的發展,我們的資料量、計算量可能會越來越大,是以希望這個系統是可擴充的。

容錯。這是分布式系統中通用問題。一個節點挂了不能影響我的應用。

好,如果僅僅需要解決這5個問題,可能會有無數種方案,而且各有千秋,随便舉一種方案,使用消息隊列+分布在各個機器上的工作程序就ok啦。我們再繼續往下看。

容易在上面開發應用程式。親,你設計的系統需要應用程式開發人員考慮各個處理元件的分布、消息的傳遞嗎?如果是,那有點麻煩啊,開發人員可能會用不好,也不會想去用。

消息不丢失。使用者釋出的一個寶貝消息不能在實時處理的時候給丢了,對吧?更嚴格一點,如果是一個精确資料統計的應用,那麼它處理的消息要不多不少才行。這個要求有點高哦。

消息嚴格有序。有些消息之間是有強相關性的,比如同一個寶貝的更新和删除操作消息,如果處理時搞亂順序完全是不一樣的效果了。

不知道大家對這些問題是否都有了自己的答案,下面讓我們帶着這些問題,一起來看一看storm的解決方案吧。

Storm是什麼

如果隻用一句話來描述storm的話,可能會是這樣:分布式實時計算系統。按照storm作者的說法,storm對于實時計算的意義類似于hadoop對于批處理的意義。我們都知道,根據google mapreduce來實作的hadoop為我們提供了map, reduce原語,使我們的批處理程式變得非常地簡單和優美。同樣,storm也為實時計算提供了一些簡單優美的原語。我們會在第三節中詳細介紹。

我們來看一下storm的适用場景。

流資料處理。Storm可以用來處理源源不斷流進來的消息,處理之後将結果寫入到某個存儲中去。

分布式rpc。由于storm的處理元件是分布式的,而且處理延遲極低,是以可以作為一個通用的分布式rpc架構來使用。當然,其實我們的搜尋引擎本身也是一個分布式rpc系統。

說了半天,好像都是很玄乎的東西,下面我們開始具體講解storm的基本概念和它内部的一些實作原理吧。

Storm的基本概念

首先我們通過一個 storm 和hadoop的對比來了解storm中的基本概念。

Hadoop

Storm

系統角色

JobTracker

Nimbus

TaskTracker

Supervisor

Child

Worker

應用名稱

Job

Topology

元件接口

Mapper/Reducer

Spout/Bolt

表3-1

接下來我們再來具體看一下這些概念。

Nimbus:負責資源配置設定和任務排程。

Supervisor:負責接受nimbus配置設定的任務,啟動和停止屬于自己管理的worker程序。

Worker:運作具體處理元件邏輯的程序。

Task:worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之後,task不再與實體線程對應,同一個spout/bolt的task可能會共享一個實體線程,該線程稱為executor。

下面這個圖描述了以上幾個角色之間的關系。

<a href="http://www.searchtb.com/wp-content/uploads/2012/08/deploy0.jpg"></a>

圖3-1

Topology:storm中運作的一個實時應用程式,因為各個元件間的消息流動形成邏輯上的一個拓撲結構。

Spout:在一個topology中産生源資料流的元件。通常情況下spout會從外部資料源中讀取資料,然後轉換為topology内部的源資料。Spout是一個主動的角色,其接口中有個nextTuple()函數,storm架構會不停地調用此函數,使用者隻要在其中生成源資料即可。

Bolt:在一個topology中接受資料然後執行處理的元件。Bolt可以執行過濾、函數操作、合并、寫資料庫等任何操作。Bolt是一個被動的角色,其接口中有個execute(Tuple input)函數,在接受到消息後會調用此函數,使用者可以在其中執行自己想要的操作。

Tuple:一次消息傳遞的基本單元。本來應該是一個key-value的map,但是由于各個元件間傳遞的tuple的字段名稱已經事先定義好,是以tuple中隻要按序填入各個value就行了,是以就是一個value list.

Stream:源源不斷傳遞的tuple就組成了stream。

10.  stream grouping:即消息的partition方法。Storm中提供若幹種實用的grouping方式,包括shuffle, fields hash, all, global, none, direct和localOrShuffle等

相比于s4, puma等其他實時計算系統,storm最大的亮點在于其記錄級容錯和能夠保證消息精确處理的事務功能。下面就重點來看一下這兩個亮點的實作原理。

Storm記錄級容錯的基本原理

首先來看一下什麼叫做記錄級容錯?storm允許使用者在spout中發射一個新的源tuple時為其指定一個message id, 這個message id可以是任意的object對象。多個源tuple可以共用一個message id,表示這多個源 tuple對使用者來說是同一個消息單元。storm中記錄級容錯的意思是說,storm會告知使用者每一個消息單元是否在指定時間内被完全處理了。那什麼叫做完全處理呢,就是該message id綁定的源tuple及由該源tuple後續生成的tuple經過了topology中每一個應該到達的bolt的處理。舉個例子。在圖4-1中,在spout由message

1綁定的tuple1和tuple2經過了bolt1和bolt2的處理生成兩個新的tuple,并最終都流向了bolt3。當這個過程完成處理完時,稱message 1被完全處理了。

<a href="http://www.searchtb.com/wp-content/uploads/2012/08/message10.jpg"></a>

圖4-1

在storm的topology中有一個系統級元件,叫做acker。這個acker的任務就是追蹤從spout中流出來的每一個message id綁定的若幹tuple的處理路徑,如果在使用者設定的最大逾時時間内這些tuple沒有被完全處理,那麼acker就會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛才的描述中,我們提到了”記錄tuple的處理路徑”,如果曾經嘗試過這麼做的同學可以仔細地思考一下這件事的複雜程度。但是storm中卻是使用了一種非常巧妙的方法做到了。在說明這個方法之前,我們來複習一個數學定理。

A xor A = 0.

A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。

storm中使用的巧妙方法就是基于這個定理。具體過程是這樣的:在spout中系統會為使用者指定的message id生成一個對應的64位整數,作為一個root id。root id會傳遞給acker及後續的bolt作為該消息單元的唯一辨別。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple之後,會告知acker自己發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完之後,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker隻需要對這些id做一個簡單的異或運算,就能判斷出該root

id對應的消息單元是否處理完成了。下面通過一個圖示來說明這個過程。

<a href="http://www.searchtb.com/wp-content/uploads/2012/08/ack10.jpg"></a>

圖4-1 spout中綁定message 1生成了兩個源tuple,id分别是0010和1011.

<a href="http://www.searchtb.com/wp-content/uploads/2012/08/ack20.jpg"></a>

圖4-2 bolt1處理tuple 0010時生成了一個新的tuple,id為0110.

<a href="http://www.searchtb.com/wp-content/uploads/2012/08/ack30.jpg"></a>

圖4-3 bolt2處理tuple 1011時生成了一個新的tuple,id為0111.

<a href="http://www.searchtb.com/wp-content/uploads/2012/08/ack40.jpg"></a>

圖4-4 bolt3中接收到tuple 0110和tuple 0111,沒有生成新的tuple.

可能有些細心的同學會發現,容錯過程存在一個可能出錯的地方,那就是,如果生成的tuple id并不是完全各異的,acker可能會在消息單元完全處理完成之前就錯誤的計算為0。這個錯誤在理論上的确是存在的,但是在實際中其機率是極低極低的,完全可以忽略。

Storm的事務拓撲

事務拓撲(transactional topology)是storm0.7引入的特性,在最近釋出的0.8版本中已經被封裝為Trident,提供了更加便利和直覺的接口。因為篇幅所限,在此對事務拓撲做一個簡單的介紹。

事務拓撲的目的是為了滿足對消息處理有着極其嚴格要求的場景,例如實時計算某個使用者的成交筆數,要求結果完全精确,不能多也不能少。Storm的事務拓撲是完全基于它底層的spout/bolt/acker原語實作的,通過一層巧妙的封裝得出一個優雅的實作。個人覺得這也是storm最大的魅力之一。

事務拓撲簡單來說就是将消息分為一個個的批(batch),同一批内的消息以及批與批之間的消息可以并行處理,另一方面,使用者可以設定某些bolt為committer,storm可以保證committer的finishBatch()操作是按嚴格不降序的順序執行的。使用者可以利用這個特性通過簡單的程式設計技巧實作消息處理的精确。

Storm在淘寶

由于storm的核心是clojure編寫的(不過大部分的拓展工作都是java編寫的),為我們了解它的實作帶來了一定的困難,好在大部分情況下storm都比較穩定,當然我們也在盡力熟悉clojure的世界。我們在使用storm時通常都是選擇java語言開發應用程式。

在淘寶,storm被廣泛用來進行實時日志處理,出現在實時統計、實時風控、實時推薦等場景中。一般來說,我們從類kafka的metaQ或者基于hbase的timetunnel中讀取實時日志消息,經過一系列處理,最終将處理結果寫入到一個分布式存儲中,提供給應用程式通路。我們每天的實時消息量從幾百萬到幾十億不等,資料總量達到TB級。對于我們來說,storm往往會配合分布式存儲服務一起使用。在我們正在進行的個性化搜尋實時分析項目中,就使用了timetunnel + hbase + storm + ups的架構,每天處理幾十億的使用者日志資訊,從使用者行為發生到完成分析延遲在秒級。

Storm的未來

Storm0.7系列的版本已經在各大公司得到了廣泛使用,最近釋出的0.8版本中引入了State,使得其從一個純計算架構演變成了一個包含存儲和計算的實時計算新利器,還有剛才提到的Trident,提供更加友好的接口,同時可定制scheduler的特性也為其針對不同的應用場景做優化提供了更便利的手段,也有人已經在基于storm的實時ql(query language)上邁出了腳本。在服務化方面,storm一直在朝着融入mesos架構的方向努力。同時,storm也在實作細節上不斷地優化,使用很多優秀的開源産品,包括kryo,

Disruptor, curator等等。可以想象,當storm發展到1.0版本時,一定是一款無比傑出的産品,讓我們拭目以待,當然,最好還是參與到其中去吧,同學們。

參考文獻

繼續閱讀