天天看點

Storm中的可靠性

     我們知道storm有一個很重要的特性,那就是storm api能夠保證它的一個tuple能夠被完全處理,這一點尤為重要,其實storm中的可靠性是由spout和bolt元件共同完成的,下面就從spout和bolt兩個友善給大家介紹一下storm中的可靠性,最後會給出一個實作了可靠性的例子。

     在storm中,消息處理可靠性從spout開始的。storm為了保證資料能正确的被處理, 對于spout産生的每一個tuple,storm都能夠進行跟蹤,這裡面涉及到了ack/fail的處理, 如果一個tuple被處理成功,那麼spout便會調用其ack方法,如果失敗,則會調用fail方法。而topology中處理tuple的每一個bolt都會通過outputcollector來告知storm,目前bolt處理是否成功。

     我們知道spout必須能夠追蹤它發射的所有tuples或其子tuples,并且在這些tuples處理失敗時能夠重發。那麼spout如何追蹤tuple呢?storm是通過一個簡單的anchor機制來實作的(在下面的bolt可靠性中會講到)。

Storm中的可靠性

     如上圖所示,實線代表的是spout發射的根tuple,而虛線代表的就是來源于根tuple的子tuples。這個圖就是一個tupletree。在這個tree中,所有的bolt都會ack或fail一個tuple,如果tree中所有的bolt都ack了經過它的tuple,那麼spout的ack方法就會被調用,表示整個消息被處理完成。如果tree中的任何一個bolt fail一個tuple,或者整個處理過程逾時,則spout的fail方法便會被調用。

     另外一點, storm隻是通過ack/fail機制來告訴應用方bolt中間的處理情況, 對于成功/失敗該如何處理, 必須由應用自己來決定, 因為storm内部也沒有儲存失敗的具體資料, 但是也有辦法知道失敗記錄,因為spout的ack/fail方法會附帶一個msgid對象, 我們可以在最初發射tuple的時候将将msgid設定為tuple, 然後在ack/fail中對該tuple進行處理。這裡其實有個問題, 就是每個bolt執行完之後要顯式的調用ack/fail,否則會出現tuple不釋放導緻oom.

不知道storm在最初設計的時候,為什麼不将bolt的ack設定為預設調用。

     storm的ispout接口定義了三個與可靠性有關的方法:nexttuple,ack和fail。

     我們知道,當storm的spout發射一個tuple後,他便會調用nexttuple()方法,在這個過程中,保證可靠性處理的第一步就是為發射出的tuple配置設定一個唯一的id,并把這個id傳給emit()方法:

     為tuple配置設定一個唯一id的目的就是為了告訴storm,spout希望這個tuple産生的tuple tree在處理完成或失敗後告知它,如果tuple被處理成功,spout的ack()方法就會被調用,相反如果處理失敗,spout的fail()方法就會被調用,tuple的id也都會傳入這兩個方法中。

     需要注意的是,雖然spout有可靠性機制,但這個機制是否啟用由我們控制的。ibasicbolt在emit一個tuple後自動調用ack()方法,用來實作比較簡單的計算。如果是irichbolt的話,如果想要實作anchor,必須自己調用ack方法。

     bolt中的可靠性主要靠兩步來實作:

發射衍生tuple的同時anchor原tuple

對各個tuples做ack或fail處理     

     anchor一個tuple就意味着在輸入tuple和其衍生tuple之間建立了關聯,關聯之後的tuple便加入了tuple tree。我們可以通過如下方式anchor一個tuple:

     如果我們發射新tuple的時候不同時發射元tuple,那麼新發射的tuple不會參與到整個可靠性機制中,它們的fail不會引起root tuple的重發,我們成為unanchor:

     ack和fail一個tuple的操作方法: 

     上面講過了,ibasicbolt 實作類不關心ack/fail, spout的ack/fail完全由後面的bolt的ack/fail來決定. 其execute方法的basicoutputcollector參數也沒有提供ack/fail方法給你調用. 相當于忽略了該bolt的ack/fail行為。

     在 irichbolt實作類中, 如果outputcollector.emit(oldtuple,newtuple)這樣調用來發射tuple(anchoring), 那麼後面的bolt的ack/fail會影響spout ack/fail, 如果collector.emit(newtuple)這樣來發射tuple(在storm稱之為anchoring), 則相當于斷開了後面bolt的ack/fail對spout的影響.spout将立即根據目前bolt前面的ack/fail的情況來決定調用spout的ack/fail.

是以某個bolt後面的bolt的成功失敗對你來說不關心, 你可以直接通過這種方式來忽略.中間的某個bolt fail了, 不會影響後面的bolt執行, 但是會立即觸發spout的fail. 相當于短路了, 後面bolt雖然也執行了, 但是ack/fail對spout已經無意義了. 也就是說, 隻要bolt集合中的任何一個fail了, 會立即觸發spout的fail方法. 而ack方法需要所有的bolt調用為ack才能觸發. 是以ibasicbolt用來做filter或者簡單的計算比較合适。

    storm的可靠性是由spout和bolt共同決定的,storm利用了anchor機制來保證處理的可靠性。如果spout發射的一個tuple被完全處理,那麼spout的ack方法即會被調用,如果失敗,則其fail方法便會被調用。在bolt中,通過在emit(oldtuple,newtuple)的方式來anchor一個tuple,如果處理成功,則需要調用bolt的ack方法,如果失敗,則調用其fail方法。一個tuple及其子tuple共同構成了一個tupletree,當這個tree中所有tuple在指定時間内都完成時spout的ack才會被調用,但是當tree中任何一個tuple失敗時,spout的fail方法則會被調用。

     ibasicbolt類會自動調用ack/fail方法,而irichbolt則需要我們手動調用ack/fail方法。我們可以通過topology_message_timeout_secs參數來指定一個tuple的處理完成時間,若這個時間未被處理完成,則spout也會調用fail方法。

一個實作可靠性的spout:    

一個實作可靠性的bolt:

繼續閱讀