天天看點

storm筆記:Trident狀态

在storm筆記:Trident應用中說了下Trident的使用,這裡說下Trident幾種狀态的變化及其對應API的使用。

本文内容來自Trident State,部分内容根據實際情況做出修改。

Trident中有對狀态資料進行讀取和寫入操作的一流抽象工具。狀态既可以儲存在拓撲内部,比如儲存在内容中并由HDFS存儲,也可以通過外部存儲(比如Memcached或Cassandra)存儲在資料庫中。而對于Trident的API而言,這兩種機制沒有任何差別。

Trident以容錯的方式管理狀态,以便在重試或失敗時的狀态更新是幂等的。在大資料進行中,資料處理的幂等性是非常重要的一個名額,這樣能夠保證每個消息即使處理了多次,結果也像是隻處理了一次一樣。

在進行狀态更新時可能需要各種級别的容錯能力,在這之前,我們來看一個例子說明實作“恰好一次”語義所需的技巧。比如,正在對流中的資料進行計數聚合操作,每次處理新的元組時,都會将運作的計數結果存儲在資料庫中。

如果發生故障時,元組将重新執行計數操作。這就會在執行狀态更新時出現問題,因為這個時候不知道是不是已經更新過該元組狀态。也許還沒有處理該元組資料,這個時候就需要增加計數。也許已經處理該元組,并成功增加計數,但是在下一步的時候出現問題,這種情況下,就不應該增加計數。也有可能是處理元組正常,更新計數是異常,這個時候就需要更新計數。

是以說,如果隻是在資料庫中存儲計數資訊,就不知道元組是否已經處理過。是以,就需要更多的資訊作為輔助。Trident提供了下面三個性質,來實作“恰好一次”的處理:

元組都是以小批次處理

每批元組都會給出一個唯一ID,稱為事務ID(transaction id,txid)。如果批次重複處理,txid也會相同。

狀态的更新操作是按照元組批次的順序執行的。也就是說,在批次2狀态更新成功之前,不會進行批次3的狀态更新。

根據這些特性,就可以通過檢查到該元組的批次是否已被處理,并根據檢測結果采取适當的操作更新狀态了。采取的具體操作取決于Spout的類型。Spout有三種類型:“非事務型(non-transactional)”,“事務型(transactional)”和“不透明事務型(opaque transactional)”。對應的容錯能力也是三種:“非事務”,“事務”和“不透明事務”。下面來看看Spout的各個類型及對應的容錯能力。

事務型Spout

Trident是按照批次發送元組進行處理的,每個批次的元組被賦予唯一的事務ID。Spout的特性根據他們所提供容錯性保證機制來決定的,而且這種機制也會對每個批次發生作用。事務型Spout有如下特性:

每個批次的txid不變,對于一個特定的txid,重複執行時,它所包含的元組資料與第一次完全相同。

元組隻會在一個批次出現,不會重複(某個元組隻會出現在一個批次中,不會出現在多個批次中)。

每個元組都會出現一次(不會遺漏任何的元組資料)

這是最簡單最容易了解的一種Spout類型,資料流被分割成固定的批次。storm中有與Kafka內建的事務型Spout的擴充,代碼在這裡。

既然事務型Spout這麼簡單易懂,為什麼不在Trident中完全使用事務型Spout呢?其實就在于它的容錯能力。比如,TransactionalTridentKafkaSpout的工作方式是,同一個txid的批次中将包含kafka所有分區的元組。一旦某個批次發出後,出現異常,需要重新發出,就需要完全相同的元組集合才能滿足事務型Spout要求的語義。但是這個時候,kafka某個節點異常(節點關閉或分區不可用),就無法擷取完全相同的的一批元組,那整個拓撲就會應為第3條語義(批次按順序執行)停止。

這就是要有“不透明事務型”Spout的原因了,它能夠容忍資料源節點丢失,而且又能保證資料恰好被操作一次。

注:對kafka比較熟悉的應該會想到,如果某一個topic支援複制,那即使一個節點不可用,還會有其他複制節點頂上,那TransactionalTridentKafkaSpout也能夠避免上面的問題。

下面繼續看看如何設計一個支援恰好一次特性的“事務型”Spout語義(簡單的說就是同一個txid對應的批次元組資料完全一緻)的狀态實作,這種狀态稱為“事務型狀态”。

比如,現在有一個單詞計數的拓撲,需要将單詞計數存儲在key/value資料庫中。key是單詞,value中包含單詞數量。另外,為了确定同一批次元組是否已經被執行,需要将txid也一同存儲在value中。這樣,當需要更新單詞數量的時候,先比較txid是否相同,如果相同,就跳過更新。如果不同,就更新計數。

考慮這個為什麼它工作的例子。 假設您正在處理由以下批次元組組成的txid 3:

比如,要處理一個txid是3的一批元組:

["man"]
["man"]
["dog"]      

目前資料庫中存儲的資料為:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]      

在這個時候,發現“man”對應的txid是1,目前的txid是3,就可以更新了。然後“dog”對應的txid是3,說明同一批次的元組資料已經發送過了,就不需要更新。從這點可以看出,txid是3的批次元組是重複發送的,在更新“dog”數量後,在更新“man”數量前,出現了錯誤。最後的結果就是:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]      

不透明事務型Spout

前面已經提過,不透明事務型Spout不能保證相同txid對應的批次中的元組資料完全一緻。其特點如下:

每個元組都會在有且僅有一個批次中處理成功。

[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/v1.1.0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java)具有這種特性,同時對kafka節點異常有很好的容錯性。OpaqueTridentKafkaSpout在發送一個批次元組的時候,會從上次成功之後的位置開始發送,這樣就能夠保證元組不會漏發或重發。

基于上面的特點,不透明事務型Spout就不同通過txid來直接判斷是否可以跳過狀态更新,因為具有相同txid的批次中元組可能發生了變化。

這就需要存儲更多的狀态資訊了,而不僅僅是一個結果和一個txid了,還需要存儲前一個結果值。

比如,目前批次的計數是2,需要進行一次狀态更新,資料庫中的資料如下:

{
  "value": 4,
  "prevValue": 1,
  "txid": 2
}      

如果目前的txid是3,與資料庫中的不同。在這種情況下,需要将prevValue的值該為value的值,value的值增加2,更新txid為3,最後的結果就是:

{
  "value": 6,
  "prevValue": 4,
  "txid": 3
}      

如果目前的txid是2,等于資料庫中的txid。因為txid相同,說明上一次txid為2的批次處理失敗,但是本次的元組可能與上一次不同了。這個時候,就需要使用本次資料覆寫上次處理結果。也就是說,prevValue值不變,value的值改為prevValue加2,txid不變,最後的結果如下:

{
  "value": 3,
  "prevValue": 1,
  "txid": 2
}      

這種方式的可行性依賴于Trident的強順序性。也就是說,一旦開始處理一個新的批次,就不會重複執行上一個批次。不透明事務型Spout保證了不同批次之間沒有重複的情況,也就是每個元組隻會在一個批次中處理成功,是以就可以放心的使用前一個值與目前值覆寫已存資料了。

非事務型Spout

非事務型Spout不能為批次提供任何保證。是以可能出現”至多一次”的處理,即在某個批次處理過程中失敗了,但是不會在重新處理;也可能提供“至少一次”的處理,即可能會有多個批次分别處理某個元組。也就是沒有辦法實作“恰好一次”的語義。

不同類型spout和狀态總結

下面是不同的spout/狀态組合是否支援“恰好一次”處理語義:

storm筆記:Trident狀态

不透明事務狀态有最強的容錯性,但是因為存儲txid和兩個結果帶來更大的開銷。事務型狀态隻需要存儲一個狀态結果,但是隻對事務型Spout有效。非事務型狀态要求存儲的資料更少,但是不能實作“恰好一次”的處理語義。

是以在選擇容錯與存儲空間中,需要根據具體的需要選擇合适的組合。

狀态API

根據前面來看,“恰好一次”語義的原理有些複雜,但是作為使用者,并不需要了解這些txid對比、多值存儲,因為Trident已經在State中封裝了所有容錯處理邏輯,隻需要想下面着用攜帶碼就行:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
                .parallelismHint(6);      

所有的不透明事務狀态邏輯已經封裝在MemcachedState.opaque中,另外,狀态更新會自動調整為批次操作,這樣可以減少與資料庫之間反複互動帶來的資源浪費。

基本的State接口隻有兩個方法:

public interface State {
    void beginCommit(Long txid); // 對于像DRPC流發生的partitionPersist這樣的事情,可以是null
    void commit(Long txid);
}      

前面已經說過,狀态更新開始和結束時都會擷取txid。Trident并不關心狀态如何操作,使用哪種方式更新,使用哪種方式讀取。

假如有一個包含使用者位址資訊的定制資料庫,需要使用Trident與資料庫互動,State擴充類中包含對于使用者資訊的getter和setter方法:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocation(long userId, String location) {
        // 向資料庫設定位址資訊
    }

    public String getLocation(long userId) {
        // 從資料庫中擷取位址資訊
    }
}      

然後就需要一個StateFactory來建立Trident所需的State對象,LocationDB所需的StateFactory大體結構如下:

public class LocationDBFactory implements StateFactory {
    public State makeState(Map conf, int partitionIndex, int numPartitions) {
        return new LocationDB();
    }
}      

Trident提供了用于查詢狀态源的QueryFunction接口,以及更新狀态源的StateUpdater接口。比如,查詢LocationDB中使用者資訊的QueryLocation:

TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"));      

QueryLocation的代碼如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for (TridentTuple input : inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}      

QueryFunction操作分為兩步:首先,Trident會将收集到的資料放在一個批次中,發送給batchRetrieve方法。在這個例子中,batchRetrieve方法收到的是一些使用者id。batchRetrieve會傳回一組與輸入元組長度相同的結果。輸入元組與輸出結果中各個元素是彼此對應的。

從這點來看,上面的LocationDB類并沒有發揮Trident批處理優勢,是以需要盡心改造:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
        // set locations in bulk
    }

    public List<String> bulkGetLocations(List<Long> userIds) {
        // get locations in bulk
    }
}      

對應的QueryLocation類如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for (TridentTuple input : inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}      

這段代碼大幅減少了資料庫操作。

對于更新狀态,可以使用StateUpdater接口。比如下面的更新操作:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for (TridentTuple t : tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}      

對應的更新操作拓撲中就可以是這樣:

TridentTopology topology = new TridentTopology();
TridentState locations =
        topology.newStream("locations", locationsSpout)
                .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater());      

partitionPersist方法會更新狀态,StateUpdater接口接收一批元組和狀态資訊,然後更新狀态。上面的LocationUpdater類中僅僅是從元組中抓取使用者id和位址資訊,然後對狀态執行批量處理。然後,partitionPersist會傳回一個表示更新狀态後的TridentState對象。随後就可以在拓撲的其他地方使用stateQuery方法查詢狀态。

在StateUpdater的updateState方法中有一個TridentCollector參數,這個對象是可以将發送進來的元組發送到一個新的資料流中。在這個例子中沒有用到。如果需要進行比如向資料庫更新計數值的後續操作,可以通過TridentState#newValuesStream方法擷取新的資料流資料。

persistentAggregate

Trident使用一個名為persistentAggregate的方法更新狀态。前面已經出現過,這裡再寫一遍:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));      

partitionPersist是一個接收Trident聚合器作為參數并對狀态資料進行更新的方法,persistentAggregate就是建構于partitionPersist上層的一個程式設計抽象。在這個例子中,通過groupBy傳回一個分組資料,Trident需要一個實作MapState接口的對象。分組字段是狀态的key,聚合結果是狀态的value。MapState接口如下:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}      

如果你需要在未分組的資料流上執行聚合操作時,Trident需要一個實作Snapshottable接口的對象:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}      

MemoryMapState 和 MemcachedState 都實作了這些接口.

實作MapState接口

實作MapState接口非常簡單,Trident幾乎把所有事都做完了。OpaqueMap、TransactionalMap和NonTransactionalMap都分别實作了各自的容錯語義。隻需要為這些類提供一個用于對不同key/value進行批量擷取、批量修改的IBackingMap實作就行。IBackingMap接口如下:

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}      

OpaqueMap會使用OpaqueValue作為vals參數調用multiPut方法;TransactionalMap會使用TransactionalValue作為參數;NonTransactionalMaps會直接把拓撲對象傳入。

Trident還提供了CachedMap類來實作key/value的自動LRU緩存操作。

最後,Trident還提供了SnapshottableMap類,該類通過将全局聚合的結果存入一個固定key中的方法将MapState對象轉化為Snapshottable對象。

可以參考MemcachedState的實作來了解如何将這些工具結合在一起來提供一個高性能的MapState實作。MemcachedState支援不透明事務、事務和非事務語義。