這章讨論Storm's reliability capabilities, 如何保證從spout emit出來的所有tuple都被正确的執行(fully processed)?
What does it mean for a message to be "fully processed"?
首先的問題是, 什麼叫tuple或message被fully processed? 因為tuple被emit出去後, 可能會被多級bolt處理, 并且bolt也有可能由該tuple生成多組tuples, 是以情況還是比較複雜的
最終由一個tuple trigger(觸發)的所有tuples會形成一個樹或DAG(有向無環圖)
隻有當tuple tree上的所有節點都被成功處理的時候, storm才認為該tuple被fully processed
如果tuple tree上任一節點失敗或者逾時, 都被看作該tuple fail, 失敗的tuple會被重發
Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed.
A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout.
This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.
What happens if a message is fully processed or fails to be fully processed?
該機制是如何實作的?
首先, 所有tuple都有一個唯一辨別msgId, 當tuple被emit的時候确定
_collector.emit(new Values("field1", "field2", 3) , msgId);
其次, 看看下面的ISpout接口, 除了擷取tuple的nextTuple
還有ack和fail, 當Storm detect到tuple被fully processed, 會調用ack, 如果逾時或detect fail, 則調用fail
此處需要注意的是, tuple隻有在被産生的那個spout task上可以被ack或fail, 具體原因看後面的實作解釋就了解了
a tuple will be acked or failed by the exact sametask that created it. So if a
Spout
is executing as many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.
Spout
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
最後, 在spout怎麼實作的, 其實比較簡單.
對于Spout queue, get message隻是open而不是pop, 并且把tuple狀态改為pending, 防止該tuple被多次發送.
一直等到該tuple被ack, 才真正的pop該tuple, 當然該tuple如果fail, 就重新把狀态改回初始狀态
這也解釋, 為什麼tuple隻能在被emit的spout task被ack或fail, 因為隻有這個task的queue裡面有該tuple
When
KestrelSpout
takes a message off the Kestrel queue, it "opens" the message.
This means the message is not actually taken off the queue yet, but instead placed in a "pending" state waiting for acknowledgement that the message is completed.
While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue.
What is Storm's reliability API?
前面一直沒有說明的一個問題是, storm本身通過什麼機制來判斷tuple是否成功被fully processed?
要解決這個問題, 可以分為兩個問題,
1. 如何知道tuple tree的結構?
2. 如何知道tuple tree上每個節點的運作情況, success或fail?
答案很簡單, 你必須告訴它, 如何告訴它?
1. 對于tuple tree的結構, 需要知道每個tuple由哪些tuple産生, 即tree節點間的link
tree節點間的link稱為anchoring. 當每次emit新tuple的時候, 必須顯式的通過API建立anchoring
Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.
Each word tuple is anchored by specifying the input tuple as the first argument to
.
emit
看下面的代碼例子,
_collector.emit(tuple, new Values(word));
emit的第一個參數是tuple, 這就是用于建anchoring
當然你也可以直接調用unanchoring的emit版本, 如果不需要保證reliable的話, 這樣效率會比較高
_collector.emit(new Values(word));
同時前面說了, 可能一個tuple依賴于多個輸入,
An output tuple can be anchored to more than one input tuple.
This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts.
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
對于Multi-anchoring的情況會導緻tuple tree變為tuple DGA, 目前storm的版本已經可以很好的支援DAG
Multi-anchoring adds the output tuple into multiple tuple trees.
Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs,

2. 對于tuple tree上每個節點的運作情況, 你需要在每個bolt的邏輯處理完後, 顯式的調用
OutputCollector的ack和fail來彙報
This is done by using theand
ack
methods on the
fail
OutputCollector
.
You can use the
method on the
fail
to immediately fail the spout tuple at the root of the tuple tree.
OutputCollector
看下面的例子, 在execute函數的最後會調用,
_collector.ack(tuple);
我比較迷惑, 為啥ack是
OutputCollector
的function, 而不是tuple的function?
而且就算ack也是應該對bolt的input進行ack, 為啥是output, 可能因為所有input都是其他bolt的output産生...這個設計的比較不合理
public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
storm為了保證reliable, 必然是要犧牲效率的, 此處storm會在task memory裡面去記錄你彙報的tuple tree的結構和運作情況.
而隻有當某tuple節點被ack或fail後才會被從記憶體中删除, 是以如果你總是不去ack或fail, 那麼會導緻task的out of memory
Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.
簡單的版本, BasicBolt
BasicBolt
上
面的機制, 會給程式員造成負擔, 尤其對于很多簡單的case, 比如filter, 每次都要去顯式的建立anchoring和ack…
是以storm提供簡單的版本, 會自動的建立anchoring, 并在bolt執行完自動調用ack
A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the
execute
method. These bolts fall into the categories of filters and simple functions. Storm has an interface called
BasicBolt
that encapsulates this pattern for you.
public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
How do I make my applications work correctly given that tuples can be replayed?
問題是如何保證"fully fault-tolerant exactly-once messaging semantics”, 因為replay會導緻一個message在bolt上多次出現, 這樣對類似計數這樣的應用會有很大影響.
從Storm0.7開始, 給出的transactional topologies功能就比較好的解決這個問題
As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here.
How does Storm implement reliability in an efficient way?
現在讨論的是Storm如何實作reliablility機制, Storm實作一組特殊的'acker’ task來track每一個spout tuple, 同時acker task的個數你可以根據tuple的數量來配置
A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple.
When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message.
You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages.
所有被産生的tuple都會有一個随機的64bit的id用于被track
tuple之間通過emit時的anchor形成tuple tree, 并且每個tuple都知道産生它的spout tuple的id (通過不斷的copy傳遞)
當任何tuple被acked的時候, 都會send message到相應的acker, 具體例子如下圖
When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.
Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me".
For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked:
當然storm具體怎樣通過acker task來track所有的tuples, 還需要解決下面幾個問題:
1. 當有多個acker的時候, 當一個tuple被acked的時候, 如果知道給哪一個acker發送message?
因為每個tuple都知道産生它的spout tuple id, 是以使用mod hash(hash方法, m mod n)來配置設定spout tuple id, 以保證一個spout tuple id所産生的所有tuple tree都會被配置設定到一個acker上
當某一個tuple被acked的時候, 隻要通過hash找到相應的acker即可
You can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information? Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with.
2. 如果有多個spout task的時候, storm在最終ack spout tuple的時候, 如何知道對應于哪個spout task, 因為必須在産生tuple的那個spout task進行ack?
答案很簡單, spout task在emit一個新的tuple的時候, 會發message告訴相應的acker它的task id, 是以acker是知道tupleid和taskid的map的
How the acker tasks track which spout tasks are responsible for each spout tuple they're tracking?
When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.
3. 如果Acker在記憶體裡面顯式的監控所有的tuple tree, 會有擴充問題, 當面對海量tuple或複雜workflow的時候, 很有可能會爆記憶體, 怎麼解決這個問題?
Storm這裡采用了一個特别的方法, 這個是storm的主要的突破之一, 該方法的好處就是對于每個spout tuple, 所需要的記憶體是固定的無論多複雜, 并且隻有about 20 bytes
Acker隻需要為每個spout tuple存儲spout tuple id, task id, ack val
這個ack val, 64 bit number, 用于表示整個tuple tree的狀況, 産生方法是tuple tree中所有created和acked的tuple的id進行異或(同為0, 異為1)
當ack val值為0的時候, 即表示tuple tree被完成
這個思路非常巧妙, 兩個相同的數去異或為0, 而created和acked時, 會進行兩次異或, 是以所有created的tuple都被acked時, 異或值最終為0
我考慮到不同的tupleid之間的位有重疊時, 是否會有幹擾, 簡單的試一下, 沒有幹擾
具體acker工作原理參考, Twitter Storm源代碼分析之acker工作流程
Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree. When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.
最後, 考慮task fail的情況,
一般task fail, 導緻逾時, spout會replay
Acker task fail, 會導緻它跟蹤的所有tuple無法被ack, 是以會全部逾時被spout重發
Spout task fail, 如果spout本身fail, 那麼需要源頭來負責replay, 比如RabbitMQ或Kafka
Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:
- Task dies: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed.
- Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed.
- Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects.
As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.
Tuning reliability
當然reliability必然會給系統帶來較大的overload, 比如number of messages就會翻倍, 由于和acker之間的通信
是以如果不需要reliability, 可以通過下面的方法将其關閉
Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks.
If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires less ids to be kept in each downstream tuple, reducing bandwidth usage.
There are three ways to remove reliability.
1. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the
ack
method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.
2. The second way is to omit a message id in the
SpoutOutputCollector.emit
method.
3. Finally, emit them as unanchored tuples
【淘寶講解】
- Storm記錄級容錯的基本原理
首先來看一下什麼叫做記錄級容錯?storm允許使用者在spout中發射一個新的源tuple時為其指定一個message id, 這個message id可以是任意的object對象。多個源tuple可以共用一個message id,表示這多個源 tuple對使用者來說是同一個消息單元。storm中記錄級容錯的意思是說,storm會告知使用者每一個消息單元是否在指定時間内被完全處理了。那什麼叫做完全處理呢,就是該message id綁定的源tuple及由該源tuple後續生成的tuple經過了topology中每一個應該到達的bolt的處理。舉個例子。在圖4-1中,在spout由message 1綁定的tuple1和tuple2經過了bolt1和bolt2的處理生成兩個新的tuple,并最終都流向了bolt3。當這個過程完成處理完時,稱message 1被完全處理了。
圖4-1
在storm的topology中有一個系統級元件,叫做acker。這個acker的任務就是追蹤從spout中流出來的每一個message id綁定的若幹tuple的處理路徑,如果在使用者設定的最大逾時時間内這些tuple沒有被完全處理,那麼acker就會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛才的描述中,我們提到了”記錄tuple的處理路徑”,如果曾經嘗試過這麼做的同學可以仔細地思考一下這件事的複雜程度。但是storm中卻是使用了一種非常巧妙的方法做到了。在說明這個方法之前,我們來複習一個數學定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。
storm中使用的巧妙方法就是基于這個定理。具體過程是這樣的:在spout中系統會為使用者指定的message id生成一個對應的64位整數,作為一個root id。root id會傳遞給acker及後續的bolt作為該消息單元的唯一辨別。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple之後,會告知acker自己發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完之後,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker隻需要對這些id做一個簡單的異或運算,就能判斷出該root id對應的消息單元是否處理完成了。下面通過一個圖示來說明這個過程。
圖4-1 spout中綁定message 1生成了兩個源tuple,id分别是0010和1011.
圖4-2 bolt1處理tuple 0010時生成了一個新的tuple,id為0110.
圖4-3 bolt2處理tuple 1011時生成了一個新的tuple,id為0111.
圖4-4 bolt3中接收到tuple 0110和tuple 0111,沒有生成新的tuple.
可能有些細心的同學會發現,容錯過程存在一個可能出錯的地方,那就是,如果生成的tuple id并不是完全各異的,acker可能會在消息單元完全處理完成之前就錯誤的計算為0。這個錯誤在理論上的确是存在的,但是在實際中其機率是極低極低的,完全可以忽略。
【acker數量】
針對源碼進行分析。