直譯過來就是trident狀态,這裡的狀态主要涉及到trident如何實作一緻性語義規則,trident的計算結果将被如何送出,如何儲存,如何更新等等。我們知道trident的計算都是以batch為機關的,但是batch在中的tuple在處理過程中有可能會失敗,失敗之後bach又有可能會被重播,這就涉及到很多事務一緻性問題。trident state就是管理這些問題的一套方案,與這套方案對應的就是trident state api。這樣說可能還比較抽象,下面就用一個例子具體說明一下。
假設有這麼一個需求,統計一個資料流中各個單詞出現的數量,并把單詞和其數量更新到資料庫中。假設我們在資料庫中隻有兩個字段,單詞和其數量,在計數過程中,如果遇到相同的單詞則就把其數量加一。但是這麼做有一個問題,如果某個單詞是被重播的單詞,就有可能導緻這個單詞被多加了一遍。是以,在資料庫中隻儲存單詞和其數量兩個字段是無法做到“資料隻被處理一次”的語義要求的。
trident定義了如下語義規則:
有了這三個規則,我們就可以通過txid知道batch是否被處理過,然後就可以根據實際情況來更新狀态資訊了。很明顯,要滿足這幾個語義規則,就需要spout來支援,因為把tuple封裝成batch,配置設定txid等等都是有spout來負責的。
但是在具體應用場景中,storm應該能夠提供不同的容錯級别,因為某些情況下我們并不需要強一緻性。為了更靈活的處理,trident提供了三類spout,分别是:
注意,所有的trident spout都是以batch的形式發送資料,每個batch也都會配置設定一個唯一的txid,決定它們有不同性質的地方在于它們對各自的batch提供了什麼樣的保證。
我們已經知道trident 提供了三種類型的spout來服務trident state管理,那麼對應的trident state也有三種類型:
transactional spouts對batch的發送提供了如下保證:
我們現在再看上面1.1節提到的那個執行個體,我們要把單詞和其數量儲存在資料庫中,為了保證“資料隻被處理一次”,除了要儲存單詞和數量兩個字段之外,我們再加一個字段txid。在更新資料時,我們先對比一下目前的資料的txid和資料庫中資料的txid,若txid相同,說明是被重播的資料,直接跳過即可,如果不同,則把兩個數值相加即可。
下面具體說明一下,假設目前處理的batch的txid=3,其中的tuples為:
再假設資料庫中儲存的資料為:
資料庫中“man”單詞的txid為1,而目前batch的txid為3,說明目前batch中的“man”單詞未被累加過,是以需要把目前batch中”man”的個數累加到資料庫中。資料庫中“dog”單詞的txid為3,和目前batch的txid相同,說明已經被累計過了直接跳過。最終資料庫中的結果變為:
總結一下整個處理過程:
opaque transactional spouts提供了如下保證:
因為重播的batch中的tuple集合可能不一樣,是以對于opaque transactional spout,就不能根據txid是否一緻來決定是否需要更新狀态了。我們需要在資料庫中儲存更多的狀态資訊,除了單詞名,數量、txid之外,我們還需要儲存一個pre-value來記錄前一次計算的值。我們再用上面例子具體說明一下。
假設資料庫中的記錄如下:
假設目前batch的count值為2,txid=3。因為目前txid和資料庫中的不同,我們需要把prevalue替換成value的值,累計value值,然後更新txid值為3,結果如下:
再假設目前batch的count值為1,txid=2。這是目前txid和資料庫中的相同,雖然兩個txid值相同,但由于兩個batch的内容已經變了,是以上次的更新可以忽略掉,需要對資料庫中的value值進行重新計算,即把目前值和prevalue值相加,結果如下:
no-transactional spouts對每個batch的内容不做任何保證。如果失敗的batch沒被重發,它有會出現“最多被處理一次”的請況,如果tuples被多個batch處理,則會發生“最少被處理一次的情況”,很難保證“資料隻被處理一次”的情況。
下面這個表格描述了“資料隻被處理一次”的spout/state的類型組合:

總的來說, opaque transactional states即有一定的容錯性又能保證資料一緻性,但它的代價是需要在資料庫中儲存更多的狀态資訊(txid和prevalue)。transactional states雖然需要較少的狀态資訊(txid),但是它需要transactional spouts的支援。non-transactional states需要在資料庫中儲存最少的狀态資訊但難以保證“資料隻被處理一次”的語義。
是以,在實際應用中,spout和state類型的選擇需要根據我們具體應用需求來決定,當然在容錯性和增加存儲代價之間也需要做個權衡。
上面講的看上去有點啰嗦,慶幸的是trident state api 在内部為我們實作了所有狀态管理的邏輯,我們不需要再進行諸如對比txid,在資料庫中存儲多個值等操作,僅需要簡單調用trident api即可,例如:
所有的管理opaque transactional states狀态的邏輯都在memcachedstate.opaque()方法内部實作了。另外,所有的更新操作都是以batch為機關的,這樣減少了對資料庫的調用次數,極大的提高了效率。下面就向大家介紹一下和trident state 相關的api。
trident api中最基本的state接口隻有兩個方法:
state接口隻定義了狀态什麼時候開始更新,什麼時候結束更新,并且我們都能獲得一個txid。具體這個state如何工作,如何更新state,如何查詢state,trident并沒有對此作出限制,我們可以自己任意實作。
假設我們有一個location資料庫,我們要通過trident查新和更新這個資料庫,那麼我們可以自己實作這樣一個locationdb state,因為我們需要查詢和更新,是以我們為這個locationdb 可以添加對location的get和set的實作:
trident提供了state factory接口,我們實作了這個接口之後,trident 就可以通過這個接口獲得具體的trident state執行個體了,下面我們就實作一個可以制造locationdb執行個體的locationdbfactory:
這個接口是用來幫助trident查詢一個state,這個接口定義了兩個方法:
接口的第一個方法<code>batchretrieve()</code>有兩個參數,分别是要查詢的state源和查詢參數,因為trident都是以batch為機關處理的,是以這個查詢參數是一個<code>list<tridenttuple></code>集合。關于第二個方法<code>execute()</code>有三個參數,第一個代表查詢參數中的某個tuple,第二個代表這個查詢參數tuple對應的查詢結果,第三個則是一個消息發送器。下面就看一個quarylocation的執行個體:
<code>querylocation</code>接收到trident發送的查詢參數,參數是一個batch,batch中tuple内容是userid資訊,然後<code>batchretrieve()</code>方法負責從state源中擷取每個userid對應的的location。最終batchretrieve()查詢的結果會被<code>execute()</code>方法發送出去。
但這裡有個問題,<code>batchretrieve()</code>方法中針對每個userid都做了一次查詢state操作,這樣處理顯然效率不高,也不符合trident所有操作都是針對batch的原則。是以,我們要對<code>locationdb</code>這個state做一下改造,提供一個<code>bulkgetlocations()</code>方法來替換掉<code>getlocation()</code>方法,請看改造後的locationdb的實作:
我們可以看到,改造的<code>locationdb</code>對location的查詢和更新都是批量操作的,這樣顯然可以提高處理效率。此時,我們再稍微改一下<code>queryfunction中</code>的<code>batchretrieve()</code>方法:
<code>querylocation</code>在topology中可以這麼使用:
當我們要更新一個state源時,我們需要實作一個<code>updatestate</code>接口。updatestate接口隻提供了一個方法:
下面我們來具體看一下<code>locationupdater</code>的實作:
對于<code>locationupdater</code>在topology中可以這麼使用:
通過調用trident stream的<code>partitionpersist</code>方法可以更新一個state。在上面這個執行個體中,<code>locationupdater</code>接收一個state和要更新的batch,最終通過調用<code>locationfactory</code>制造的<code>locationdb</code>中的<code>setlocationsbulk()</code>方法把batch中的userid及其location批量更新到state中。
partitionpersist操作會傳回一個tridentstate對象,這個對象即是被tridenttopology更新後的locationdb,是以,我們可以在topology中續繼續對這個傳回的state做查詢操作。
另外一點需要注意的是,從上面stateupdater接口可以看出,在它的<code>updatestate()</code>方法中還提供了一個<code>tridentcollector</code>,是以在執行stateupdate的同時仍然可以形成一個新的stream。若要操作stateupdater形成的stream,可以通過調用<code>tridentstate.newvaluestream()</code>方法實作。
trident另一個update state的方法時<code>persistentaggregate</code>,請看下面word count的例子:
persistentaggregate是在partitionpersist之上的另一個抽象,它會對trident stream進行聚合之後再把聚合結果更新到state中。在上面這個例子中,因為聚合的是一個<code>groupedstream</code>,trident要求這種情況下state需要實作<code>mapstate</code>接口,被grouped的字段會被做為mapsate的key,被grouped的資料計算的結果會被做為mapsate的value。mapsate接口定義如下:
如果我們聚合的不是一個groupedstream,trident要求我們的state實作<code>snapshottable</code>接口:
在trident中實作<code>mapstate</code>很簡單,大部分工作trident已經替我們做了。<code>opaquemap</code>,<code>transactionalmap</code>, 和<code>nontransactionalmap</code>類已經替我們完成了和容錯相關的處理邏輯. 我們僅僅提供一個 <code>ibackingmap</code>的實作類即可, ibackingmap的接口定義如下:
另外,