直译过来就是trident状态,这里的状态主要涉及到trident如何实现一致性语义规则,trident的计算结果将被如何提交,如何保存,如何更新等等。我们知道trident的计算都是以batch为单位的,但是batch在中的tuple在处理过程中有可能会失败,失败之后bach又有可能会被重播,这就涉及到很多事务一致性问题。trident state就是管理这些问题的一套方案,与这套方案对应的就是trident state api。这样说可能还比较抽象,下面就用一个例子具体说明一下。
假设有这么一个需求,统计一个数据流中各个单词出现的数量,并把单词和其数量更新到数据库中。假设我们在数据库中只有两个字段,单词和其数量,在计数过程中,如果遇到相同的单词则就把其数量加一。但是这么做有一个问题,如果某个单词是被重播的单词,就有可能导致这个单词被多加了一遍。因此,在数据库中只保存单词和其数量两个字段是无法做到“数据只被处理一次”的语义要求的。
trident定义了如下语义规则:
有了这三个规则,我们就可以通过txid知道batch是否被处理过,然后就可以根据实际情况来更新状态信息了。很明显,要满足这几个语义规则,就需要spout来支持,因为把tuple封装成batch,分配txid等等都是有spout来负责的。
但是在具体应用场景中,storm应该能够提供不同的容错级别,因为某些情况下我们并不需要强一致性。为了更灵活的处理,trident提供了三类spout,分别是:
注意,所有的trident spout都是以batch的形式发送数据,每个batch也都会分配一个唯一的txid,决定它们有不同性质的地方在于它们对各自的batch提供了什么样的保证。
我们已经知道trident 提供了三种类型的spout来服务trident state管理,那么对应的trident state也有三种类型:
transactional spouts对batch的发送提供了如下保证:
我们现在再看上面1.1节提到的那个实例,我们要把单词和其数量保存在数据库中,为了保证“数据只被处理一次”,除了要保存单词和数量两个字段之外,我们再加一个字段txid。在更新数据时,我们先对比一下当前的数据的txid和数据库中数据的txid,若txid相同,说明是被重播的数据,直接跳过即可,如果不同,则把两个数值相加即可。
下面具体说明一下,假设当前处理的batch的txid=3,其中的tuples为:
再假设数据库中保存的数据为:
数据库中“man”单词的txid为1,而当前batch的txid为3,说明当前batch中的“man”单词未被累加过,所以需要把当前batch中”man”的个数累加到数据库中。数据库中“dog”单词的txid为3,和当前batch的txid相同,说明已经被累计过了直接跳过。最终数据库中的结果变为:
总结一下整个处理过程:
opaque transactional spouts提供了如下保证:
因为重播的batch中的tuple集合可能不一样,所以对于opaque transactional spout,就不能根据txid是否一致来决定是否需要更新状态了。我们需要在数据库中保存更多的状态信息,除了单词名,数量、txid之外,我们还需要保存一个pre-value来记录前一次计算的值。我们再用上面例子具体说明一下。
假设数据库中的记录如下:
假设当前batch的count值为2,txid=3。因为当前txid和数据库中的不同,我们需要把prevalue替换成value的值,累计value值,然后更新txid值为3,结果如下:
再假设当前batch的count值为1,txid=2。这是当前txid和数据库中的相同,虽然两个txid值相同,但由于两个batch的内容已经变了,所以上次的更新可以忽略掉,需要对数据库中的value值进行重新计算,即把当前值和prevalue值相加,结果如下:
no-transactional spouts对每个batch的内容不做任何保证。如果失败的batch没被重发,它有会出现“最多被处理一次”的请况,如果tuples被多个batch处理,则会发生“最少被处理一次的情况”,很难保证“数据只被处理一次”的情况。
下面这个表格描述了“数据只被处理一次”的spout/state的类型组合:

总的来说, opaque transactional states即有一定的容错性又能保证数据一致性,但它的代价是需要在数据库中保存更多的状态信息(txid和prevalue)。transactional states虽然需要较少的状态信息(txid),但是它需要transactional spouts的支持。non-transactional states需要在数据库中保存最少的状态信息但难以保证“数据只被处理一次”的语义。
因此,在实际应用中,spout和state类型的选择需要根据我们具体应用需求来决定,当然在容错性和增加存储代价之间也需要做个权衡。
上面讲的看上去有点啰嗦,庆幸的是trident state api 在内部为我们实现了所有状态管理的逻辑,我们不需要再进行诸如对比txid,在数据库中存储多个值等操作,仅需要简单调用trident api即可,例如:
所有的管理opaque transactional states状态的逻辑都在memcachedstate.opaque()方法内部实现了。另外,所有的更新操作都是以batch为单位的,这样减少了对数据库的调用次数,极大的提高了效率。下面就向大家介绍一下和trident state 相关的api。
trident api中最基本的state接口只有两个方法:
state接口只定义了状态什么时候开始更新,什么时候结束更新,并且我们都能获得一个txid。具体这个state如何工作,如何更新state,如何查询state,trident并没有对此作出限制,我们可以自己任意实现。
假设我们有一个location数据库,我们要通过trident查新和更新这个数据库,那么我们可以自己实现这样一个locationdb state,因为我们需要查询和更新,所以我们为这个locationdb 可以添加对location的get和set的实现:
trident提供了state factory接口,我们实现了这个接口之后,trident 就可以通过这个接口获得具体的trident state实例了,下面我们就实现一个可以制造locationdb实例的locationdbfactory:
这个接口是用来帮助trident查询一个state,这个接口定义了两个方法:
接口的第一个方法<code>batchretrieve()</code>有两个参数,分别是要查询的state源和查询参数,因为trident都是以batch为单位处理的,所以这个查询参数是一个<code>list<tridenttuple></code>集合。关于第二个方法<code>execute()</code>有三个参数,第一个代表查询参数中的某个tuple,第二个代表这个查询参数tuple对应的查询结果,第三个则是一个消息发送器。下面就看一个quarylocation的实例:
<code>querylocation</code>接收到trident发送的查询参数,参数是一个batch,batch中tuple内容是userid信息,然后<code>batchretrieve()</code>方法负责从state源中获取每个userid对应的的location。最终batchretrieve()查询的结果会被<code>execute()</code>方法发送出去。
但这里有个问题,<code>batchretrieve()</code>方法中针对每个userid都做了一次查询state操作,这样处理显然效率不高,也不符合trident所有操作都是针对batch的原则。所以,我们要对<code>locationdb</code>这个state做一下改造,提供一个<code>bulkgetlocations()</code>方法来替换掉<code>getlocation()</code>方法,请看改造后的locationdb的实现:
我们可以看到,改造的<code>locationdb</code>对location的查询和更新都是批量操作的,这样显然可以提高处理效率。此时,我们再稍微改一下<code>queryfunction中</code>的<code>batchretrieve()</code>方法:
<code>querylocation</code>在topology中可以这么使用:
当我们要更新一个state源时,我们需要实现一个<code>updatestate</code>接口。updatestate接口只提供了一个方法:
下面我们来具体看一下<code>locationupdater</code>的实现:
对于<code>locationupdater</code>在topology中可以这么使用:
通过调用trident stream的<code>partitionpersist</code>方法可以更新一个state。在上面这个实例中,<code>locationupdater</code>接收一个state和要更新的batch,最终通过调用<code>locationfactory</code>制造的<code>locationdb</code>中的<code>setlocationsbulk()</code>方法把batch中的userid及其location批量更新到state中。
partitionpersist操作会返回一个tridentstate对象,这个对象即是被tridenttopology更新后的locationdb,所以,我们可以在topology中续继续对这个返回的state做查询操作。
另外一点需要注意的是,从上面stateupdater接口可以看出,在它的<code>updatestate()</code>方法中还提供了一个<code>tridentcollector</code>,因此在执行stateupdate的同时仍然可以形成一个新的stream。若要操作stateupdater形成的stream,可以通过调用<code>tridentstate.newvaluestream()</code>方法实现。
trident另一个update state的方法时<code>persistentaggregate</code>,请看下面word count的例子:
persistentaggregate是在partitionpersist之上的另一个抽象,它会对trident stream进行聚合之后再把聚合结果更新到state中。在上面这个例子中,因为聚合的是一个<code>groupedstream</code>,trident要求这种情况下state需要实现<code>mapstate</code>接口,被grouped的字段会被做为mapsate的key,被grouped的数据计算的结果会被做为mapsate的value。mapsate接口定义如下:
如果我们聚合的不是一个groupedstream,trident要求我们的state实现<code>snapshottable</code>接口:
在trident中实现<code>mapstate</code>很简单,大部分工作trident已经替我们做了。<code>opaquemap</code>,<code>transactionalmap</code>, 和<code>nontransactionalmap</code>类已经替我们完成了和容错相关的处理逻辑. 我们仅仅提供一个 <code>ibackingmap</code>的实现类即可, ibackingmap的接口定义如下:
另外,