天天看點

Trident State 詳解

直譯過來就是trident狀态,這裡的狀态主要涉及到trident如何實作一緻性語義規則,trident的計算結果将被如何送出,如何儲存,如何更新等等。我們知道trident的計算都是以batch為機關的,但是batch在中的tuple在處理過程中有可能會失敗,失敗之後bach又有可能會被重播,這就涉及到很多事務一緻性問題。trident state就是管理這些問題的一套方案,與這套方案對應的就是trident state api。這樣說可能還比較抽象,下面就用一個例子具體說明一下。

假設有這麼一個需求,統計一個資料流中各個單詞出現的數量,并把單詞和其數量更新到資料庫中。假設我們在資料庫中隻有兩個字段,單詞和其數量,在計數過程中,如果遇到相同的單詞則就把其數量加一。但是這麼做有一個問題,如果某個單詞是被重播的單詞,就有可能導緻這個單詞被多加了一遍。是以,在資料庫中隻儲存單詞和其數量兩個字段是無法做到“資料隻被處理一次”的語義要求的。

trident定義了如下語義規則:

有了這三個規則,我們就可以通過txid知道batch是否被處理過,然後就可以根據實際情況來更新狀态資訊了。很明顯,要滿足這幾個語義規則,就需要spout來支援,因為把tuple封裝成batch,配置設定txid等等都是有spout來負責的。

但是在具體應用場景中,storm應該能夠提供不同的容錯級别,因為某些情況下我們并不需要強一緻性。為了更靈活的處理,trident提供了三類spout,分别是:

注意,所有的trident spout都是以batch的形式發送資料,每個batch也都會配置設定一個唯一的txid,決定它們有不同性質的地方在于它們對各自的batch提供了什麼樣的保證。

我們已經知道trident 提供了三種類型的spout來服務trident state管理,那麼對應的trident state也有三種類型:

transactional spouts對batch的發送提供了如下保證:

這三個特性是“最完美”的保證,也最容易了解,stream被分割成固定的batch,而且不會改變。storm就提供了一個transactional spout的實作:transactionaltridentkafkaspout。

我們現在再看上面1.1節提到的那個執行個體,我們要把單詞和其數量儲存在資料庫中,為了保證“資料隻被處理一次”,除了要儲存單詞和數量兩個字段之外,我們再加一個字段txid。在更新資料時,我們先對比一下目前的資料的txid和資料庫中資料的txid,若txid相同,說明是被重播的資料,直接跳過即可,如果不同,則把兩個數值相加即可。

下面具體說明一下,假設目前處理的batch的txid=3,其中的tuples為:

再假設資料庫中儲存的資料為:

資料庫中“man”單詞的txid為1,而目前batch的txid為3,說明目前batch中的“man”單詞未被累加過,是以需要把目前batch中”man”的個數累加到資料庫中。資料庫中“dog”單詞的txid為3,和目前batch的txid相同,說明已經被累計過了直接跳過。最終資料庫中的結果變為:

總結一下整個處理過程:

上面已經提到過,并不是所有情形下都需要保證強一緻性。例如在transactionaltridentkafkaspout中(關于kafka相關介紹,點這裡),如果它的一個batch中的tuples來自一個topic的所有partitions,如果要滿足transactionnal spout語義的話,一旦這個batch因為某些失敗而被重發,重發batch中的所有tuple必須與這個batch中的完全一緻,而恰好kafka叢集某個節點down掉導緻這個topic其中一個partition無法使用,那麼就會導緻這個batch無法湊齊所有tuple(無法擷取失敗partition上的資料),整個處理過程被挂起。而opaque transactional spouts就可以解決這個問題。

opaque transactional spouts提供了如下保證:

怎麼了解這個特性呢,簡要來說就opaquetransactional spout和transactional spouts基本差不多,隻是在opaque transactional spout中,相同txid的batch中的tuple集合可能不一樣。opaquetridentkafkaspout就是符合這種特性的spout的,是以它可以容忍kafka節點失敗。

因為重播的batch中的tuple集合可能不一樣,是以對于opaque transactional spout,就不能根據txid是否一緻來決定是否需要更新狀态了。我們需要在資料庫中儲存更多的狀态資訊,除了單詞名,數量、txid之外,我們還需要儲存一個pre-value來記錄前一次計算的值。我們再用上面例子具體說明一下。

假設資料庫中的記錄如下:

假設目前batch的count值為2,txid=3。因為目前txid和資料庫中的不同,我們需要把prevalue替換成value的值,累計value值,然後更新txid值為3,結果如下:

再假設目前batch的count值為1,txid=2。這是目前txid和資料庫中的相同,雖然兩個txid值相同,但由于兩個batch的内容已經變了,是以上次的更新可以忽略掉,需要對資料庫中的value值進行重新計算,即把目前值和prevalue值相加,結果如下:

no-transactional spouts對每個batch的内容不做任何保證。如果失敗的batch沒被重發,它有會出現“最多被處理一次”的請況,如果tuples被多個batch處理,則會發生“最少被處理一次的情況”,很難保證“資料隻被處理一次”的情況。

下面這個表格描述了“資料隻被處理一次”的spout/state的類型組合:

Trident State 詳解

總的來說, opaque transactional states即有一定的容錯性又能保證資料一緻性,但它的代價是需要在資料庫中儲存更多的狀态資訊(txid和prevalue)。transactional states雖然需要較少的狀态資訊(txid),但是它需要transactional spouts的支援。non-transactional states需要在資料庫中儲存最少的狀态資訊但難以保證“資料隻被處理一次”的語義。

是以,在實際應用中,spout和state類型的選擇需要根據我們具體應用需求來決定,當然在容錯性和增加存儲代價之間也需要做個權衡。

上面講的看上去有點啰嗦,慶幸的是trident state api 在内部為我們實作了所有狀态管理的邏輯,我們不需要再進行諸如對比txid,在資料庫中存儲多個值等操作,僅需要簡單調用trident api即可,例如:

所有的管理opaque transactional states狀态的邏輯都在memcachedstate.opaque()方法内部實作了。另外,所有的更新操作都是以batch為機關的,這樣減少了對資料庫的調用次數,極大的提高了效率。下面就向大家介紹一下和trident state 相關的api。

trident api中最基本的state接口隻有兩個方法:

state接口隻定義了狀态什麼時候開始更新,什麼時候結束更新,并且我們都能獲得一個txid。具體這個state如何工作,如何更新state,如何查詢state,trident并沒有對此作出限制,我們可以自己任意實作。

假設我們有一個location資料庫,我們要通過trident查新和更新這個資料庫,那麼我們可以自己實作這樣一個locationdb state,因為我們需要查詢和更新,是以我們為這個locationdb 可以添加對location的get和set的實作:

trident提供了state factory接口,我們實作了這個接口之後,trident 就可以通過這個接口獲得具體的trident state執行個體了,下面我們就實作一個可以制造locationdb執行個體的locationdbfactory:

這個接口是用來幫助trident查詢一個state,這個接口定義了兩個方法:

接口的第一個方法<code>batchretrieve()</code>有兩個參數,分别是要查詢的state源和查詢參數,因為trident都是以batch為機關處理的,是以這個查詢參數是一個<code>list&lt;tridenttuple&gt;</code>集合。關于第二個方法<code>execute()</code>有三個參數,第一個代表查詢參數中的某個tuple,第二個代表這個查詢參數tuple對應的查詢結果,第三個則是一個消息發送器。下面就看一個quarylocation的執行個體:

<code>querylocation</code>接收到trident發送的查詢參數,參數是一個batch,batch中tuple内容是userid資訊,然後<code>batchretrieve()</code>方法負責從state源中擷取每個userid對應的的location。最終batchretrieve()查詢的結果會被<code>execute()</code>方法發送出去。

但這裡有個問題,<code>batchretrieve()</code>方法中針對每個userid都做了一次查詢state操作,這樣處理顯然效率不高,也不符合trident所有操作都是針對batch的原則。是以,我們要對<code>locationdb</code>這個state做一下改造,提供一個<code>bulkgetlocations()</code>方法來替換掉<code>getlocation()</code>方法,請看改造後的locationdb的實作:

我們可以看到,改造的<code>locationdb</code>對location的查詢和更新都是批量操作的,這樣顯然可以提高處理效率。此時,我們再稍微改一下<code>queryfunction中</code>的<code>batchretrieve()</code>方法:

<code>querylocation</code>在topology中可以這麼使用:

當我們要更新一個state源時,我們需要實作一個<code>updatestate</code>接口。updatestate接口隻提供了一個方法:

下面我們來具體看一下<code>locationupdater</code>的實作:

對于<code>locationupdater</code>在topology中可以這麼使用:

通過調用trident stream的<code>partitionpersist</code>方法可以更新一個state。在上面這個執行個體中,<code>locationupdater</code>接收一個state和要更新的batch,最終通過調用<code>locationfactory</code>制造的<code>locationdb</code>中的<code>setlocationsbulk()</code>方法把batch中的userid及其location批量更新到state中。

partitionpersist操作會傳回一個tridentstate對象,這個對象即是被tridenttopology更新後的locationdb,是以,我們可以在topology中續繼續對這個傳回的state做查詢操作。

另外一點需要注意的是,從上面stateupdater接口可以看出,在它的<code>updatestate()</code>方法中還提供了一個<code>tridentcollector</code>,是以在執行stateupdate的同時仍然可以形成一個新的stream。若要操作stateupdater形成的stream,可以通過調用<code>tridentstate.newvaluestream()</code>方法實作。

trident另一個update state的方法時<code>persistentaggregate</code>,請看下面word count的例子:

persistentaggregate是在partitionpersist之上的另一個抽象,它會對trident stream進行聚合之後再把聚合結果更新到state中。在上面這個例子中,因為聚合的是一個<code>groupedstream</code>,trident要求這種情況下state需要實作<code>mapstate</code>接口,被grouped的字段會被做為mapsate的key,被grouped的資料計算的結果會被做為mapsate的value。mapsate接口定義如下:

對于mapsate的實作有memorymapstate。

如果我們聚合的不是一個groupedstream,trident要求我們的state實作<code>snapshottable</code>接口:

對于snapshottable的實作有 memcachedstate。

在trident中實作<code>mapstate</code>很簡單,大部分工作trident已經替我們做了。<code>opaquemap</code>,<code>transactionalmap</code>, 和<code>nontransactionalmap</code>類已經替我們完成了和容錯相關的處理邏輯. 我們僅僅提供一個 <code>ibackingmap</code>的實作類即可, ibackingmap的接口定義如下:

opaquemap’s調用的multiput将會把value值自動封裝成opaquevalue來處理, transactionalmap’s 将會把value封裝成transactionalvalue再進行處理, 而nontransactionalmaps 則不會對value做處理,直接傳遞給topology。

另外,

trident提供的cachedmap 類會對map中的key/value做自動的lru緩存 。

trident提供的snapshottablemap類會把mapstate轉換成snapshottable對象(把mapstate中的所有key/value對聚合成一個固定的key)。

詳細更詳細的了解整個mapstate的實作過程,請檢視 memcachedstate 的實作,memcachedstate除了把上面介紹的相關接口整合到一起之外,還提供了對opaque transactional, transactional, non-transactional三個語義規則的支援。