天天看點

storm記錄--12- Storm Transaction 原理+實戰

Storm guarantees data processing by providing an at least once processing guarantee. The most common question asked about Storm is "Given that tuples can be replayed, how do you do things like counting on top of Storm? Won't you overcount?"

Storm 0.7.0 introduces transactional topologies, which enable you to get exactly once messaging semantics for pretty much any computation. So you can do things like counting in a fully-accurate, scalable, and fault-tolerant way.

Storm預設的reliable特性支援at least once processing guarantee.      

這個在某些場景下明顯是不夠的, 比如計數, 不斷的replay必然導緻計數不準, 那麼需要支援exactly once semantics.

Storm 0.7就提供transactional topology特性來支援, 其實這個和DRPC一樣, Storm隻是提供一種特殊的topology的封裝, 當然transactional topology更複雜.

這裡說transactional topologies為了提供strong ordering, 這個要求是要強于之前說的exactly once semantics.

對于每個transaction有唯一的transaction id來辨別, 對于第一種design, 每個transaction就是一個tuple      

拿計數作為例子, 每個tuple産生的number, 最終需要累加到資料庫裡面      

不使用transactional, 重複replay一個tuple, 必然會導緻該tuple的number被反複累加到資料庫

怎麼處理? 其實想法很簡單, 引入transaction的概念, 并在累加number到資料庫的同時記下該transactioin id.     

這樣如果replay該tuple, 隻需要對比transaction id就知道該transaction已經累加過, 可以直接ignore

看到這裡, 就知道保持strong ordering的重要性, 強順序意味着, 如果目前的transaction失敗, 會反複被replay, 直到成功才繼續下一個transaction.     

這意味着, 在資料庫我們隻需要記錄latest的transaction id, 而不是累加過的所有transaction id, 實作上會簡單許多.

但是design1的問題是效率太低, 完全線性的處理tuple, 無法利用storm的并發能力, 而且資料庫的負載很高, 每個tuple都需要去操作資料庫

The core idea behind transactional topologies is to provide a strong ordering on the processing of data.     

The simplest manifestation of this, and the first design we'll look at, is processing the tuples one at a time and not moving on to the next tuple until the current tuple has been successfully processed by the topology.

Each tuple is associated with a transaction id. If the tuple fails and needs to be replayed, then it is emitted with the exact same transaction id. A transaction id is an integer that increments for every tuple, so the first tuple will have transaction id <code>1</code>, the second id <code>2</code>, and so on. 

There is a significant problem though with this design of processing one tuple at time. Having to wait for each tuple to be completely processed before moving on to the next one is horribly inefficient. It entails a huge amount of database calls (at least one per tuple), and this design makes very little use of the parallelization capabilities of Storm. So it isn't very scalable.  

Design2的想法很簡單, 用batch tuple來作為transaction的機關, 而不是一個tuple.     

這樣帶來的好處是, batch内部的tuple可以實作并行, 并且以batch為機關去更新資料庫, 大大減少資料庫負載.      

但本質上和Design1沒有差別, batch之間仍然是串行的, 是以效率仍然比較低

Instead of processing one tuple at a time, a better approach is to process a batch of tuples for each transaction.    

So if you're doing a global count, you would increment the count by the number of tuples in the entire batch. If a batch fails, you replay the exact batch that failed.     

Instead of assigning a transaction id to each tuple, you assign a transaction id to each batch, and the processing of the batches is strongly ordered. Here's a diagram of this design:

這個設計展現出storm的創意, 将topology的過程分為processing和commit, processing就是進行局部的計算和統計, 隻有commit時才會把計算的結果更新到全局資料集(資料庫)     

那麼對于processing階段完全沒有必要限制, 隻要保證在commit的時候按照順序一個個commit就ok.

比如對于計數, 不同的batch的局部計數過程沒有任何限制, 可以完全并行的完成, 但是當需要将計數結果累加到資料庫的時候, 就需要用transaction來保證隻被累加一次

processing和commit階段合稱為transaction, 任何階段的失敗都會replay整個transaction

A key realization is that not all the work for processing batches of tuples needs to be strongly ordered. For example, when computing a global count, there's two parts to the computation:

Computing the partial count for the batch 

Updating the global count in the database with the partial count 

The computation of #2 needs to be strongly ordered across the batches, but there's no reason you shouldn't be able to pipeline the computation of the batches by computing #1 for many batches in parallel. So while batch 1 is working on updating the database, batches 2 through 10 can compute their partial counts.

Storm accomplishes this distinction by breaking the computation of a batch into two phases:

The processing phase: this is the phase that can be done in parallel for many batches 

The commit phase: The commit phases for batches are strongly ordered. So the commit for batch 2 is not done until the commit for batch 1 has been successful. 

The two phases together are called a "transaction".     

Many batches can be in the processing phase at a given moment, but only one batch can be in the commit phase.     

If there's any failure in the processing or commit phase for a batch, the entire transaction is replayed (both phases).  

為了實作上面的Design3, storm在transactional topologies裡面默默的做了很多事     

管理狀态, 通過Zookeeper去記錄所有transaction相關的狀态資訊      

協調transactions, 決定應該執行那個transaction的那個階段      

Fault

檢測, 使用storm acker機制來detect batch是否被成功執行, 并且storm在transactional topology上

對acker機制做了比較大的優化, 使用者不用自己去acking或anchoring, 友善許多      

提供batch bolt接口, 在bolt接口中提高對batch的支援, 比如提供finishbatch接口

最後, transactional topology要求source queue具有replay an exact batch的能力, 這兒說kafka是很好的選擇     

不過我很好奇, 為什麼要由source queue來提供batch replay的功能, 好的設計應該是batch對source queue透明, spout自身控制batch的劃分和replay, 這樣不可以嗎?

When using transactional topologies, Storm does the following for you:

Manages state: Storm stores in Zookeeper all the state necessary to do transactional topologies.       

This includes the current transaction id as well as the metadata defining the parameters for each batch. 

Coordinates the transactions: Storm will manage everything necessary to determine which transactions should be processing or committing at any point. 

Fault detection: Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed.   

Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you. 

First class batch processing API: Storm layers an API on top of regular bolts to allow for batch processing of tuples.       

Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction.       

Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts). 

Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like Kestrel can't do this. Apache Kafka is a perfect fit for this kind of spout, and storm-kafka in storm-contrib contains a transactional spout implementation for Kafka. 

You build transactional topologies by using TransactionalTopologyBuilder. Here's the transactional topology definition for a topology that computes the global count of tuples from the input stream. This code comes from TransactionalGlobalCount in storm-starter.

首先需要使用TransactionalSpout, <code>MemoryTransactionalSpout</code>被用來從一個記憶體變量裡面讀取資料(DATA), 第二個參數制定資料的fields, 第三個參數指定每個batch的最大tuple數量    

接着, 需要使用TransactionalTopologyBuilder, 其他和普通的topology看上去沒有不同, storm的封裝做的很好

下面通過processing和commit階段的bolt來了解對batch和transaction的支援 

首先看看BatchCount, processing階段的bolt, 用于統計局部的tuple數目

BatchCount繼承自BaseBatchBolt, 表明其對batch的支援, 主要反應在finishBatch函數, 而普通的bolt的不同在于, 隻有在finishBatch的時候才會去emit結果, 而不是每次execute都emit結果    

在prepare時, 多出個id, a TransactionAttempt object, 并且從output定義看出, 所有emit的tuple第一個參數必須是id(<code>TransactionAttempt</code>)

The <code>TransactionAttempt</code> contains two values: the "transaction id" and the "attempt id"(表示被replay次數).     The "transaction id" is the unique id chosen for this batch and is the same no matter how many times the batch is replayed.      The "attempt id" is a unique id for this particular batch of tuples and lets Storm distinguish tuples from different emissions of the same batch. Without the attempt id, Storm could confuse a replay of a batch with tuples from a prior time that batch was emitted.

All tuples emitted within a transactional topology must have the <code>TransactionAttempt</code> as the first field of the tuple. This lets Storm identify which tuples belong to which batches. So when you emit tuples you need to make sure to meet this requirement.

其實這裡的BaseBatchBolt, 是通用的batch基類, 也可以用于其他的需要batch支援的場景, 比如DRPC, 隻不過此處的id類型變為RPC id    

如果隻是要support tansactional topology場景, 可以直接使用BaseTransactionalBolt

繼續看, commit階段的bolt, UpdateGlobalCount, 将統計的結果累加到全局資料庫中

UpdateGlobalCount之間繼承自BaseTransactionalBolt, 是以此處prepare的參數直接是TransactionAttempt attempt(而不是object id)    

并且比較重要的是實作ICommitter接口, 表明這個bolt是個commiter, 意味着這個blot的finishBatch函數需要在commit階段被調用

另一種把bolt辨別為committer的方法是, 在topology build的時候使用<code>setCommitterBolt來替代setBolt</code>

First, notice that this bolt implements the <code>ICommitter</code> interface. This tells Storm that the <code>finishBatch</code> method of this bolt should be part of the commit phase of the transaction.   

So calls to <code>finishBatch</code> for this bolt will be strongly ordered by transaction id (calls to <code>execute</code> on the other hand can happen during either the processing or commit phases).   

An alternative way to mark a bolt as a committer is to use the <code>setCommitterBolt</code> method in <code>TransactionalTopologyBuilder</code> instead of <code>setBolt</code>.

storm會保證commiter裡面的finishBatch被順序執行, 并且在finishBatch裡面, 需要check transaction id, 確定隻有新的transaction的結果才被更新到全局資料庫.

The code for <code>finishBatch</code> in <code>UpdateGlobalCount</code> gets the current value from the database and compares its transaction id to the transaction id for this batch. If they are the same, it does nothing. Otherwise, it increments the value in the database by the partial count for this batch.

A more involved transactional topology example that updates multiple databases idempotently can be found in storm-starter in the TransactionalWords class. 

There are three kinds of bolts possible in a transactional topology:

BasicBolt: This bolt doesn't deal with batches of tuples and just emits tuples based on a single tuple of input. 

BatchBolt: This bolt processes batches of tuples. <code>execute</code> is called for each tuple, and <code>finishBatch</code> is called when the batch is complete. 

BatchBolt's that are marked as committers: The only difference between this bolt and a regular batch bolt is when <code>finishBatch</code> is called. A committer bolt has <code>finishedBatch</code> called during the commit phase. The commit phase is guaranteed to occur only after all prior batches have successfully committed, and it will be retried until all bolts in the topology succeed the commit for the batch. 

上面列出可能遇到的3種bolt, 下面的例子給出不同blot的差別,

紅線标出的都是commiter, 這裡有兩個commiter, 分别是B和D    

A的輸出分别輸出到B和C    

B可以先執行execute(processing), 但不能直接執行finishBatch, 因為需要等待storm排程, 必須等前面的batch commit完後, 才能進行commit    

是以C也無法立刻執行finishBatch, 因為需要等從B過來的tuple    

對于D, 原文說它會在commit階段接收所有的batch tuple, 是以可以直接commit, 這個怎麼保證? 

Notice that even though Bolt D is a committer, it doesn't have to wait for a second commit message when it receives the whole batch. Since it receives the whole batch during the commit phase, it goes ahead and completes the transaction.

Committer bolts act just like batch bolts during the commit phase.   

The only difference between committer bolts and batch bolts is that committer bolts will not call <code>finishBatch</code> during the processing phase of a transaction.

Notice that you don't have to do any acking or anchoring when working with transactional topologies. Storm manages all of that underneath the hood. The acking strategy is heavily optimized.

由于封裝的比較好, 不需要使用者去ack或fail tuple, 那麼怎麼去fail一個batch?     

抛出FailedException, Storm捕獲這個異常會replay Batch, 而不會crash    

When using regular bolts, you can call the <code>fail</code> method on <code>OutputCollector</code> to fail the tuple trees of which that tuple is a member.   

Since transactional topologies hide the acking framework from you, they provide a different mechanism to fail a batch (and cause the batch to be replayed).  

Just throw a FailedException. Unlike regular exceptions, this will only cause that particular batch to replay and will not crash the process. 

Transactional spout和普通的spout完全不同的實作, 本身就是一個mini的topology, 分為coordinator spout和emitter bolt    

The <code>TransactionalSpout</code> interface is completely different from a regular <code>Spout</code> interface. A <code>TransactionalSpout</code> implementation emits batches of tuples and must ensure that the same batch of tuples is always emitted for the same transaction id.

A transactional spout looks like this while a topology is executing:

The coordinator on the left is a regular Storm spout that emits a tuple whenever a batch should be emitted for a transaction. The emitters execute as a regular Storm bolt and are responsible for emitting the actual tuples for the batch. The emitters subscribe to the "batch emit" stream of the coordinator using an all grouping.

The need to be idempotent with respect to the tuples it emits requires a <code>TransactionalSpout</code> to store a small amount of state. The state is stored in Zookeeper. 

下面是transactional spout的工作流程,    

首先coordinator spout隻會有一個task, 并會産生兩種stream, batch stream和commit stream    

它會決定何時開始某transaction processing階段, 此時就往batch stream裡面發送包含TransactionAttempt的tuple    

也決定何時開始某transaction commit階段(當通過acker知道processing階段已經完成的時候, 并且所有

prior transaction都已經被commit), 此時就往commit steam裡面發送一個包含TransactionAttempt

的tuple作為通知, 所有commtting bolt都會預訂(通過setBolt的all grouping方

式)commit stream, 并根據收到的通知完成commit階段.    

對于commit階段和processing階段一樣, 通過acker來判斷是成功還是fail, 前面說了transactional topology對acker機制做了較大的優化, 是以所有acking和anchoring都由storm自動完成了.

對于emitter bolt, 可以并發的, 并且以all grouping的方式訂閱coordinator的batch stream, 即所有emitter都會得到一樣的batch stream, 使用幾個emitter取決于場景.    

于topology而言, emitter bolt是真正産生資料的地方, 當coordinator開始某batch的processing過

程, 并往batch steam放tuple資料時, emitter bolt就會從batch stream收到資料, 并轉發給topology 

Here's how transactional spout works:

Transactional spout is a subtopology consisting of a coordinator spout and an emitter bolt 

The coordinator is a regular spout with a parallelism of 1 

The emitter is a bolt with a parallelism of P, connected to the coordinator's "batch" stream using an all grouping 

When the coordinator determines it's time to enter the processing phase for a transaction, it emits a tuple containing the TransactionAttempt and the metadata for that transaction to the "batch" stream 

Because of the all grouping, every single emitter task receives the notification that it's time to emit its portion of the tuples for that transaction attempt 

Storm automatically manages the anchoring/acking necessary throughout the whole topology to determine when a transaction has completed the processing phase. The key here is that *the root tuple was created by the coordinator, so the coordinator will receive an "ack" if the processing phase succeeds, and a "fail" if it doesn't succeed for any reason (failure or timeout). 

If the processing phase succeeds, and all prior transactions have successfully committed, the coordinator emits a tuple containing the TransactionAttempt to the "commit" stream. 

All committing bolts subscribe to the commit stream using an all grouping, so that they will all receive a notification when the commit happens. 

Like the processing phase, the coordinator uses the acking framework to determine whether the commit phase succeeded or not. If it receives an "ack", it marks that transaction as complete in zookeeper. 

從後面的讨論, 可以知道transactional spout的batch replay是依賴于source queue的    

比如, 對于kafka這種資料是分布在partition上的queue, 需要使用partitioned transactional spout, 用于封裝對從不同partition讀資料的過程 

Partitioned Transactional Spout A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how TransactionalKafkaSpout works. An <code>IPartitionedTransactionalSpout</code> automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability.

于Transactional spout, 并不會象普通tuple一樣由spout緩存和負責replay, 隻會記下該batch資料在

source queue的位置(應該是zookeeper), 當需要replay的時候, Transactional spout會從新去

source queue去讀batch然後replay. 

這樣的問題是過于依賴source queue, 而且會導緻transaction batch無法被replay(比如由于某個partition fail) 

這個問題如何解決? 可以參考原文, 比較好的方法, 是fail目前和後續所有的transaction, 然後重新産生transaction的batch資料, 并跳過失敗部分

個人決定這個設計不太好, 過于依賴source queue

為何不在spout緩存batch資料, 雖然這樣對于比較大的batch可能有效率問題, 或者會限制同時處理的batch數目, 但重新從source queue讀資料來replay也會有很多問題...

源自:http://www.51studyit.com/html/notes/20140329/48.html#