原文位址:https://github.com/nathanmarz/storm/wiki/Trident-state
-----------------------------
Trident在讀寫有狀态的資料源方面是有着一流的抽象封裝的。狀态即可以保留在topology的内部,比如說記憶體和HDFS,也可以放到外部存儲當中,比如說Memcached或者Cassandra。這些都是使用同一套Trident API。
Trident以一種容錯的方式來管理狀态以至于當你在更新狀态的時候你不需要去考慮錯誤以及重試的情況。這種保證每個消息被處理有且隻有一次的原理會讓你更放心的使用Trident的topology。
在進行狀态更新時,會有不同的容錯級别。在外面一起來讨論這點之前,讓我們先通過一個例子來說明一下如果想要坐到有且隻有一次處理的必要的技巧。假定你在做一個關于某stream的計數聚合器,你想要把運作中的計數存放到一個資料庫中。如果你在資料庫中存了一個值表示這個計數,每次你處理一個tuple之後,就将資料庫存儲的計數加一。
當錯誤發生,truple會被重播。這就帶來了一個問題:當狀态更新的時候,你完全不知道你是不是在之前已經成功處理過這個tuple。也許你之前從來沒處理過這個tuple,這樣的話你就應該把count加一。另外一種可能就是你之前是成功處理過這個tuple的,但是這個在其他的步驟處理這個tuple的時候失敗了,在這種情況下,我們就不應該将count加一。再或者,你接受到過這個tuple,但是上次處理這個tuple的時候在更新資料庫的時候失敗了,這種情況你就應該去更新資料庫。
如果隻是簡單的存計數到資料庫的話,你是完全不知道這個tuple之前是否已經被處理過了的。是以你需要更多的資訊來做正确的決定。Trident提供了下面的語義來實作有且隻有一次被處理的目标。
- Tuples 是被分成小的集合被批量處理的 (see the tutorial)
- 每一批tuples被給定一個唯一ID作為事務ID (txid). 當這一批tuple被重播時, txid不變.
- 批與批之間的狀态更新時嚴格順序的。比如說第三批tuple的狀态的更新必須要等到第二批tuple的狀态更新成功之後才可以進行.
有了這些定義,你的狀态實作可以檢測到目前這批tuple是否以前處理過,并根據不同的情況進行不同的處理。你需要才去的行動取決于你的輸入spout。有三種不同類型的可以容錯的spout: 非事務的,事務的,以及不透明事務的spout。對應的,也有3種容錯的狀态:非事務的,事務的,以及不透明事務的狀态。讓我們一起來看看每一種spout類型能夠支援什麼樣的容錯類型。
Transactional spouts
記住,Trident是以小批量(batch)的形式在處理tuple,并且每一批都會配置設定一個唯一的transaction id。 不同的spout會根據他們可以給予不同的批量tuple的guarantee的能力有不同的屬性。一個transactional spout會有如下這些屬性:
1. 有着同樣txid的batch一定是一樣的。當重播一個txid對應的batch時,一定會重播和之前對應txid的batch中同樣的tuples。
2. 各個batch之間是沒有交集的。每個tuple隻能屬于一個batch
3. 每一個tuple都屬于一個batch,無一例外
這是一類非常容易了解的spout, tuple 流被劃分為固定的batch并且永不改變。trident-kafka 有一個 transactional spout 的實作.
你也許會問:為什麼我們不總是使用transactional spout?這很容易了解。一個原因是并不是所有的地方都需要容錯的。舉例來說,TransactionalTridentKafkaSpout 工作的方式是給定一個txid的batch所包含的一個屬于一個topic的來自于所有Kafka partition的tuple序列。一旦這個batch被發出,在任何時候如果這個batch被重新發出時,它必須包含原來所有的tuple以滿足 transactional spout的語義。現在我們假定一個batch被TransactionalTridentKafkaSpout所發出,這個batch沒有被成功處理,并且同時kafka的一個節點也down掉了。你就無法像之前一樣重播一個完全一樣的batch(因為kakfa的節點down掉,該topic的一部分partition可能會無法使用),整個處理會被中斷。
這也就是"opaque transactional" spouts(不透明事務spout)存在的原因- 他們對于丢失源節點這種情況是容錯的,仍然能夠幫你達到有且隻有一次處理的語義。後面會對這種spout有所介紹。
(當然,在Kafka開啟replication功能時,transactional spout也是可以做到容錯的)
在外面來讨論"opaque transactional" spout之前,我們先來看看你應該怎樣設計一個State來實作transactional spout的有且隻有一次執行的語義。這個State的類型是"transactional state" 并且它利用了任何一個txid總是對應同樣的tuple序列這個語義。
假如說你有一個用來計算單詞出現次數的topology,你想要将單詞的出現次數以key/value對的形式存儲到資料庫中。key就是單詞,value就是這個這個單詞出現的次數。你已經看到隻是存儲一個數量是不足以知道你是否已經處理過一個batch的。你可以通過将value和txid一起存儲到資料庫中。這樣的話,當更新這個count之前,你可以先去比較資料庫中存儲的txid和現在要存儲的txid。如果一樣,就跳過什麼都不做,因為這個value之前已經被處理過了。如果不一樣,就執行存儲。這個邏輯可以工作的前提就是txid永不改變,并且Trident保證狀态的更新是在batch之間嚴格順序進行的。
考慮下面這個例子的運作邏輯, 假定你在處理一個txid為3的包含下面tuple的batch:
[java] view plain copy
- ["man"]
- ["man"]
- ["dog"]
假定資料庫中目前儲存了下面這樣的key/value 對:
[java] view plain copy
- man => [count=3, txid=1]
- dog => [count=4, txid=3]
- apple => [count=10, txid=2]
單詞“man”對應的txid是1. 因為目前的txid是3,你可以确定你還沒有為這個batch中的tuple更新過這個單詞的數量。是以你可以放心的給count加2并更新txid為3. 與此同時,單詞“dog”的txid和目前的txid是相同的,是以你可以跳過這次更新。此時資料庫中的資料如下:
[java] view plain copy
- man => [count=5, txid=3]
- dog => [count=4, txid=3]
- apple => [count=10, txid=2]
接下來我們一起再來看看 opaque transactional spout已經怎樣去為這種spout設計相應的state。
Opaque transactional spouts
正如之前說過的,opaque transactional spout并不能確定一個txid所對應的batch的一緻性。一個opaque transactional spout有如下屬性:
- 每個tuple隻在一個batch中被成功處理。然而,一個tuple在一個batch中被處理失敗後,有可能會在另外的一個batch中被成功處理
OpaqueTridentKafkaSpout 是一個擁有這種屬性的spout,并且它是容錯的,即使Kafak的節點丢失。當OpaqueTridentKafkaSpout 發送一個batch的時候, 它會從上個batch成功結束發送的位置開始發送一個tuple序列。這就確定了永遠沒有任何一個tuple會被跳過或者被放在多個batch中被多次成功處理的情況.
使用opaque transactional spout,再使用和transactional spout相同的處理方式:判斷資料庫中存放的txid和目前txid去做對比已經不好用了。這是因為在state的更新過程之間,batch可能已經變了。
你隻能在資料庫中存儲更多的資訊。除了value和txid,你還需要存儲之前的數值在資料庫中。讓我們還是用上面的例子來說明這個邏輯。假定你目前batch中的對應count是“2”, 并且我們需要進行一次狀态更新。而目前資料庫中存儲的資訊如下:
[java] view plain copy
- { value = 4,
- prevValue = 1,
- txid = 2
- }
如果你目前的txid是3, 和資料庫中的txid不同。那麼就将value中的值設定到prevValue中,根據你目前的count增加value的值并更新txid。更新後的資料庫資訊如下:
[java] view plain copy
- { value = 6,
- prevValue = 4,
- txid = 3
- }
現在外面再假定你的目前txid是2,和資料庫中存放的txid相同。這就說明資料庫裡面value中的值包含了之前一個和目前txid相同的batch的更新。但是上一個batch和目前這個batch可能已經完全不同了,以至于我們需要無視它。在這種情況下,你需要在prevValue的基礎上加上目前count的值并将結果存放到value中去。資料庫中的資訊如下所示:
[java] view plain copy
- { value = 3,
- prevValue = 1,
- txid = 2
- }
因為Trident保證了batch之間的強順序性,是以這種方法是有效的。一旦Trident去處理一個新的batch,它就不會重新回到之前的任何一個batch。并且由于 opaque transactional spout確定在各個batch之間是沒有共同成員的,每個tuple隻會在一個batch中被成功處理,你可以安全的在之前的值上進心更新。
Non-transactional spouts
Non-transactional spout(非事務spout)不確定每個batch中的tuple的規則。是以他可能是最多被處理一次的,如果tuple被處理失敗就不重發的話。同時他也可能會是至少處理一次的,如果tuple在不同的batch中被多次成功處理的時候。無論怎樣,這種spout是不可能實作有且隻有一次被成功處理的語義的。
Summary of spout and state types
這個圖展示了哪些spout和state的組合能夠實作有且隻有一次被成功處理的語義:

Opaque transactional state有着最為強大的容錯性。但是這是以存儲更多的資訊作為代價的。Transactional states 需要存儲較少的狀态資訊,但是僅能和 transactional spouts協同工作. Finally, non-transactional state所需要存儲的資訊最少,但是卻不能實作有且隻有一次被成功處理的語義。
State和Spout類型的選擇其實是一種在容錯性和存儲消耗之間的權衡,你的應用的需要會決定那種組合更适合你。
State APIs
你已經看到一些錯綜複雜的方法來實作有且隻有一次被執行的語義。Trident這樣做的好處把所有容錯想過的邏輯都放在了State裡面。 作為一個使用者,你并不需要自己去處理複雜的txid,存儲多餘的資訊到資料庫中,或者是任何其他類似的事情。你隻需要寫如下這樣簡單的code:
[java] view plain copy
- TridentTopology topology = new TridentTopology();
- TridentState wordCounts =
- topology.newStream("spout1", spout)
- .each(new Fields("sentence"), new Split(), new Fields("word"))
- .groupBy(new Fields("word"))
- .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
- .parallelismHint(6);
所有管理 opaque transactional state所需的邏輯都在MemcachedState.opaque方法的調用中被涵蓋了,除此之外,資料庫的更新會自動以batch的形式來進行以避免多次通路資料庫。
State的基本接口隻包含下面兩個方法:
[java] view plain copy
- public interface State {
- void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream
- void commit(Long txid);
- }
當一個State更新開始時,以及當一個State更新結束時你都會被告知,并且會告訴你該次的txid。Trident并沒有對你的state的工作方式有任何的假定。
假定你自己搭了一套資料庫來存儲使用者位置資訊,并且你想要在Trident中去通路這個資料。你的state的實作應該有使用者資訊的set、get方法
[java] view plain copy
- public class LocationDB implements State {
- public void beginCommit(Long txid) {
- }
- public void commit(Long txid) {
- }
- public void setLocation(long userId, String location) {
- // code to access database and set location
- }
- public String getLocation(long userId) {
- // code to get location from database
- }
- }
然後你還需要提供給Trident一個StateFactory來在Trident的task中建立你的State對象。LocationDB 的 StateFactory可能會如下所示:
[java] view plain copy
- public class LocationDBFactory implements StateFactory {
- public State makeState(Map conf, int partitionIndex, int numPartitions) {
- return new LocationDB();
- }
- }
Trident提供了一個QueryFunction接口用來實作Trident中在一個source state上查詢的功能。同時還提供了一個StateUpdater來實作Trident中更新source state的功能。比如說,讓我們寫一個查詢位址的操作,這個操作會查詢LocationDB來找到使用者的位址。讓我們以怎樣在topology中實作該功能開始,假定這個topology會接受一個使用者id作為輸入資料流。
[java] view plain copy
- TridentTopology topology = new TridentTopology();
- TridentState locations = topology.newStaticState(new LocationDBFactory());
- topology.newStream("myspout", spout)
- .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
接下來讓我們一起來看看QueryLocation 的實作應該是什麼樣的:
[java] view plain copy
- public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
- public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
- List<String> ret = new ArrayList();
- for(TridentTuple input: inputs) {
- ret.add(state.getLocation(input.getLong(0)));
- }
- return ret;
- }
- public void execute(TridentTuple tuple, String location, TridentCollector collector) {
- collector.emit(new Values(location));
- }
- }
QueryFunction的執行分為兩部分。首先Trident收集了一個batch的read操作并把他們統一交給batchRetrieve。在這個例子中,batchRetrieve會接受到多個使用者id。batchRetrieve應該返還一個和輸入tuple數量相同的result序列。result序列中的第一個元素對應着第一個輸入tuple的結果,result序列中的第二個元素對應着第二個輸入tuple的結果,以此類推。
你可以看到,這段代碼并沒有想Trident那樣很好的利用batch的優勢,而是為每個輸入tuple去查詢了一次LocationDB。是以一種更好的操作LocationDB方式應該是這樣的:
[java] view plain copy
- public class LocationDB implements State {
- public void beginCommit(Long txid) {
- }
- public void commit(Long txid) {
- }
- public void setLocationsBulk(List<Long> userIds, List<String> locations) {
- // set locations in bulk
- }
- public List<String> bulkGetLocations(List<Long> userIds) {
- // get locations in bulk
- }
- }
接下來,你可以這樣改寫上面的QueryLocation:
[java] view plain copy
- public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
- public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
- List<Long> userIds = new ArrayList<Long>();
- for(TridentTuple input: inputs) {
- userIds.add(input.getLong(0));
- }
- return state.bulkGetLocations(userIds);
- }
- public void execute(TridentTuple tuple, String location, TridentCollector collector) {
- collector.emit(new Values(location));
- }
- }
通過有效減少通路資料庫的次數,這段代碼比上一個實作會高效的多。
如何你要更新State,你需要使用StateUpdater接口。下面是一個StateUpdater的例子用來将新的位址資訊更新到LocationDB當中。
[java] view plain copy
- public class LocationUpdater extends BaseStateUpdater<LocationDB> {
- public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
- List<Long> ids = new ArrayList<Long>();
- List<String> locations = new ArrayList<String>();
- for(TridentTuple t: tuples) {
- ids.add(t.getLong(0));
- locations.add(t.getString(1));
- }
- state.setLocationsBulk(ids, locations);
- }
- }
下面列出了你應該如何在Trident topology中使用上面聲明的LocationUpdater:
[java] view plain copy
- TridentTopology topology = new TridentTopology();
- TridentState locations =
- topology.newStream("locations", locationsSpout)
- .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
partitionPersist 操作會更新一個State。其内部是将 State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操作。
在這段代碼中,隻是簡單的從輸入的tuple中提取處userid和對應的location,并一起更新到State中。
partitionPersist 會傳回一個TridentState對象來表示被這個Trident topoloy更新過的location db。 然後你就可以使用這個state在topology的任何地方進行查詢操作了。
同時,你也可以看到我們傳了一個TridentCollector給StateUpdaters。 emit到這個collector的tuple就會去往一個新的stream。在這個例子中,我們并沒有去往一個新的stream的需要,但是如果你在做一些事情,比如說更新資料庫中的某個count,你可以emit更新的count到這個新的stream。然後你可以通過調用TridentState#newValuesStream方法來通路這個新的stream來進行其他的處理。
persistentAggregate
Trident有另外一種更新State的方法叫做persistentAggregate。 你在之前的word count例子中應該已經見過了,如下:
[java] view plain copy
- TridentTopology topology = new TridentTopology();
- TridentState wordCounts =
- topology.newStream("spout1", spout)
- .each(new Fields("sentence"), new Split(), new Fields("word"))
- .groupBy(new Fields("word"))
- .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎麼去使用一個Trident 聚合器來更新State。在這個例子當中,因為這是一個group好的stream,Trident會期待你提供的state是實作了MapState接口的。用來進行group的字段會以key的形式存在于State當中,聚合後的結果會以value的形式存儲在State當中。MapState接口看上去如下所示:
[java] view plain copy
- public interface MapState<T> extends State {
- List<T> multiGet(List<List<Object>> keys);
- List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
- void multiPut(List<List<Object>> keys, List<T> vals);
- }
當你在一個未經過group的stream上面進行聚合的話,Trident會期待你的state實作 Snapshottable接口:
[java] view plain copy
- public interface Snapshottable<T> extends State {
- T get();
- T update(ValueUpdater updater);
- void set(T o);
- }
MemoryMapState 和 MemcachedState 都實作了上面的2個接口。
Implementing Map States
在Trident中實作MapState是非常簡單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 類實作了所有相關的邏輯,包括容錯的邏輯。你隻需要将一個IBackingMap 的實作提供給這些類就可以了。IBackingMap接口看上去如下所示:
[java] view plain copy
- public interface IBackingMap<T> {
- List<T> multiGet(List<List<Object>> keys);
- void multiPut(List<List<Object>> keys, List<T> vals);
- }
OpaqueMap's會用OpaqueValue的value來調用multiPut方法,TransactionalMap's會提供TransactionalValue中的value,而NonTransactionalMaps隻是簡單的把從Topology擷取的object傳遞給multiPut。
Trident還提供了一種CachedMap類來進行自動的LRU cache。
另外,Trident 提供了 SnapshottableMap 類将一個MapState 轉換成一個 Snapshottable 對象.
大家可以看看 MemcachedState的實作,進而學習一下怎樣将這些工具組合在一起形成一個高性能的MapState實作。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語義的。