天天看點

Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

作者: xumingming | 可以轉載, 但必須以超連結形式标明文章原始出處和作者資訊及版權聲明

網址: http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/

本文翻譯自: https://github.com/nathanmarz/storm/wiki/Transactional-topologies

概述

Storm通過保證每個tuple至少被處理一次來提供 可靠的資料處理 。關于這一點最常被問到的問題就是 “既然tuple可能會被重寫發射(replay), 那麼我們怎麼在storm上面做統計個數之類的事情呢?storm有可能會重複計數吧?”

Storm 0.7.0引入了Transactional Topology, 它可以保證每個tuple”被且僅被處理一次”, 這樣你就可以實作一種非常準确,非常可擴充,并且高度容錯方式來實作計數類應用。

跟 Distributed RPC 類似, transactional topology其實不能算是storm的一個特性,它其實是用storm的底層原語spout, bolt, topology, stream等等抽象出來的一個特性。

這篇文章解釋了事務性topology是怎樣的一種抽象,怎樣使用它的api,同時也讨論了有關它實作的一些細節。

概念

讓我們一步步地建立transactional topology的抽象。我們先提出一種最簡單的抽象方式, 然後一步步的完善改進,最後介紹storm代碼裡面所使用的抽象方式。

第一個設計: 最簡單的抽象方法

事務性topology背後的核心概念是要在處理資料的提供一個強順序性。這種強順序性最簡單的表現、同時也是我們第一個設計就是:我們每次隻處理一個tuple, 除非這個tuple處理成功,否則我們不去處理下一個tuple。

每一個tuple都跟一個transaction id相關聯。如果這個tuple處理失敗了,然後需要重寫發射,那麼它會被重新發射 — 并且附着同樣的transaction id。這裡說的trasaction id其實就是一個數字, 來一個tuple,它就遞增一個。是以第一個tuple的transaction id是1, 第二個tuple的transaction id是2,等等等等。

tuple的強順序性使得我們即使在tuple重發的時候也能夠實作“一次而且隻有一次”的語義。 讓我們看個例子:

比如你想統一個stream裡面tuple的總數。那麼為了保證統計數字的準确性,你在資料庫裡面不但要儲存tuple的個數, 還要儲存這個數字所對應的最新的transaction id。 當你的代碼要到資料庫裡面去更新這個數字的時候,你要判斷隻有當新的transaction id跟資料庫裡面儲存的transaction id不一樣的時候才去更新。考慮兩種情況:

  • 資料庫裡面的transaction id跟目前的transaction id不一樣: 由于我們transaction的強順序性,我們知道目前的tuple肯定沒有統計在資料庫裡面。是以我們可以安全地遞增這個數字,并且更新這個transaction id.
  • 資料庫裡面的transaction id一樣: 那麼我們知道目前tuple已經統計在資料庫裡面了,那麼可以忽略這個更新。這個tuple肯定之前在更新了資料庫之後,回報給storm的時候失敗了(ack逾時之類的)。

這個邏輯以及事務的強順序性保證資料庫裡面的個數(count)即使在tuple被重發的時候也是準确的。這個主意(儲存count + transaction-id)是Kafka的開發者在 這個設計文檔 裡面提出來的。

更進一步來說,這個topology可以在一個事務裡面更新很多不同的狀态,并且可以到達”一次而且隻有一次的邏輯”。如果有任何失敗,那麼已經成功的更新你再去更新它會忽略,失敗的更新你去再次更新它則會接受。比如,如果你在處理一個url流,你可以更新每個url的轉發次數, 同時更新每個domain下url的轉發次數。

這個簡單設計有一個很大的問題, 那就是你需要等待一個tuple完全處理成功之後才能去處理下一個tuple。這個性能是非常差的。這個需要大量的資料庫調用(隻要每個tuple一個資料庫調用), 而且這個設計也沒有利用到storm的并行計算能力, 是以它的可擴充能力是非常差的。

第二個設計

與每次隻處理一個tuple的簡單方案相比, 一個更好的方案是每個transaction裡面處理一批tuple。是以如果你在做一個計數應用, 那麼你每次更新到總數裡面的是這一整個batch的tuple數量。如果這個batch失敗了,那麼你重新replay這整個batch。相應地, 我們不是給每個tuple一個transaction id而是給整個batch一個transaction id,batch與batch之間的處理是強順序性的, 而batch内部是可以并行的。下面這個是設計圖:

Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

是以如果你每個batch處理1000個tuple的話, 那麼你的應用将會少1000倍的資料庫調用。同時它利用了storm的并行計算能力(每個batch内部可以并行)

雖然這個設計比第一個設計好多了, 它仍然不是一個完美的方案。topology裡面的worker會花費大量的時間等待計算的其它部分完成。 比如看下面的這個計算。

Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

在bolt 1完成它的處理之後, 它需要等待剩下的bolt去處理目前batch, 直到發射下一個batch。

第三個設計(storm采用的設計)

一個我們需要意識到的比較重要的問題是,為了實作transactional的特性,在處理一批tuples的時候,不是所有的工作都需要強順序性的。比如,當做一個全局計數應用的時候, 整個計算可以分為兩個部分。

  • 計算這個batch的局部數量。
  • 把這個batch的局部數量更新到資料庫裡面去。

其中第二步在多個batch之前需要保證強的順序性, 但是第一步并不許要, 是以我們可以把第一步并行化。是以當第一個batch在更新它的個數進入資料庫的時候,第2到10個batch可以開始計算它們的局部數量了。

Storm通過把一個batch的計算分成兩個階段來實作上面所說的原理:

  • processing階段: 這個階段很多batch可以并行計算。
  • commit階段: 這個階段各個batch之間需要有強順序性的保證。是以第二個batch必須要在第一個batch成功送出之後才能送出。

這兩個階段合起來稱為一個transaction。許多batch可以在processing階段的任何時刻并行計算,但是隻有一個batch可以處在commit階段。如果一個batch在processing或者commit階段有任何錯誤, 那麼整個transaction需要被replay。

設計細節

當使用Transactional Topologies的時候, storm為你做下面這些事情:

1) 管理狀态: Storm把所有實作Transactional Topologies所必須的狀态儲存在zookeeper裡面。 這包括目前transaction id以及定義每個batch的一些中繼資料。

2) 協調事務: Storm幫你管理所有事情, 以幫你決定在任何一個時間點是該proccessing還是該committing。

3) 錯誤檢測: Storm利用acking架構來高效地檢測什麼時候一個batch被成功處理了,被成功送出了,或者失敗了。Storm然後會相應地replay對應的batch。你不需要自己手動做任何acking或者anchoring — storm幫你搞定所有事情。

4) 内置的批處理API: Storm在普通bolt之上包裝了一層API來提供對tuple的批處理支援。Storm管理所有的協調工作,包括決定什麼時候一個bolt接收到一個特定transaction的所有tuple。Storm同時也會自動清理每個transaction所産生的中間資料。

5) 最後,需要注意的一點是Transactional Topologies需要一個可以完全重發(replay)一個特定batch的消息的隊列系統(Message Queue)。 Kestrel 之類的技術做不到這一點。而 Apache的Kafka 對于這個需求來說是正合适的。 storm-contrib 裡面的 storm-kafka 實作了這個。

一個基本的例子

你可以通過使用 TransactionalTopologyBuilder 來建立transactional topology. 下面就是一個transactional topology的定義, 它的作用是計算輸入流裡面的tuple的個數。這段代碼來自storm-starter裡面的 TransactionalGlobalCount 。

Storm入門(十)Twitter Storm: Transactional Topolgoy簡介
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(
           DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
           "global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
        .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");      
Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

TransactionalTopologyBuilder

 接受如下的參數

  • 這個transaction topology的id
  • spout在整個topology裡面的id。
  • 一個transactional spout。
  • 一個可選的這個transactional spout的并行度。

topology的id是用來在zookeeper裡面儲存這個topology的目前進度的,是以如果你重新開機這個topology, 它可以接着前面的進度繼續執行。

一個transaction topology裡面有一個唯一的 

TransactionalSpout

 , 這個spout是通過 

TransactionalTopologyBuilder

 的構造函數來制定的。在這個例子裡面, 

MemoryTransactionalSpout

 被用來從一個記憶體變量裡面讀取資料(DATA)。第二個參數制定資料的fields, 第三個參數指定每個batch的最大tuple數量。關于如何自定義 

TransactionalSpout

 我們會在後面介紹。

現在說說 bolts。這個topology并行地計算tuple的總數量。第一個 

bolt:BatchBolt

 ,随機地把輸入tuple分給各個task,然後各個task各自統計局部數量。第二個 

bolt:UpdateBlobalCount

 , 用全局grouping來從彙總這個batch的總的數量。然後再把總的數量更新到資料庫裡面去。

下面是 

BatchCount

 的定義:

Storm入門(十)Twitter Storm: Transactional Topolgoy簡介
public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;

    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context,
                BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _count++;
    }

    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}      
Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

storm會為每個batch建立這個一個 

BatchCount

 對象。而這些 

BatchCount

 是運作在 

BatchBoltExecutor

 裡面的。而 

BatchBoltExecutor

 負責建立以及清理這個對象的執行個體。

這個對象的prepare方法接收如下參數:

  • 包含storm config資訊的map。
  • TopologyContext
  • OutputCollector
  • 這個batch的id。而在Transactional Topologies裡面, 這個id則是一個 TransactionAttempt對象。

這個batch bolt的抽象在DRPC裡面也可以用, 隻是id的類型不一樣而已。BatchBolt其實真的接收一個id類型的參數 — 它是一個java模闆類,是以如果你隻是想在transactioinal topology裡面使用這個BatchBolt,你可以這樣定義:

public abstract class BaseTransactionalBolt
       extends BaseBatchBolt<TransactionAttempt> {
}      

在transaction topology裡面發射的所有的tuple都必須以 

TransactionAttempt

 作為第一個field, 然後storm可以根據這個field來判斷哪些tuple屬于一個batch。是以你在發射tuple的時候需要滿足這個條件。

TransactionAttempt

 包含兩個值: 一個transaction id,一個attempt id。transaction id的作用就是我們上面介紹的對于每個batch是唯一的,而且不管這個batchreplay多少次都是一樣的。attempt id是對于每個batch唯一的一個id, 但是對于統一個batch,它replay之後的attempt id跟replay之前就不一樣了, 我們可以把attempt id了解成replay-times, storm利用這個id來差別一個batch發射的tuple的不同版本。

transaction id對于每個batch加一, 是以第一個batch的transaction id是”1″, 第二個batch是”2″,以此類推。

execute方法會為batch裡面的每個tuple執行一次,你應該把這個batch裡面的狀态保持在一個本地變量裡面。對于這個例子來說, 它在execute方法裡面遞增tuple的個數。

最後, 當這個bolt接收到某個batch的所有的tuple之後, finishBatch方法會被調用。這個例子裡面的BatchCount類會在這個時候發射它的局部數量到它的輸出流裡面去。

下面是 

UpdateGlobalCount

 類的定義。

Storm入門(十)Twitter Storm: Transactional Topolgoy簡介
public static class UpdateGlobalCount
           extends BaseTransactionalBolt
           implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;

    int _sum = 0;

    @Override
    public void prepare(Map conf,
                        TopologyContext context,
                        BatchOutputCollector collector,
                        TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }

    @Override
    public void execute(Tuple tuple) {
        _sum+=tuple.getInteger(1);
    }

    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        if(val == null ||
                !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if(val==null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}      
Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

UpdateGlobalCount

 是Transactional Topologies相關的類, 是以它繼承自 

BaseTransactionalBolt

 。在execute方法裡面, 

UpdateGlobalCount

 累積這個batch的計數, 比較有趣的是finishBatch方法。

首先, 注意這個bolt實作了 

ICommitter

 接口。這告訴storm要在這個事務的commit階段調用 

finishBatch

 方法。是以對于finishBatch的調用會保證強順序性(順序就是transaction id的升序), 而相對來說execute方法在任何時候都可以執行,processing或者commit階段都可以。另外一種把bolt辨別為commiter的方法是調用 

TransactionalTopologyBuilder

 的 

setCommiterBolt

 來添加Bolt(而不是setBolt)。

UpdateGlobalCount

 裡面finishBatch方法的邏輯是首先從資料庫中擷取目前的值,并且把資料庫裡面的transaction id與目前這個batch的transaction id進行比較。如果他們一樣, 那麼忽略這個batch。否則把這個batch的結果加到總結果裡面去,并且更新資料庫。

關于transactional topology的更深入的例子可以卡看storm-starter裡面的 TransactionalWords類, 這個類裡面會在一個事務裡面更新多個資料庫。

Transactional Topology API

這一節介紹Transaction topology API

Bolts

在一個transactional topology裡面最多有三種類型的bolt:

  • BasicBolt : 這個bolt不跟batch的tuple打交道,它隻基于單個tuple的輸入來發射新的tuple。
  • BatchBolt : 這個bolt處理batch在一起的tuples。對于每一個tuple調用execute方法。而在整個batch處理完成的時候調用finishBatch方法
  • 被标記成Committer的BatchBolt: 和普通的BatchBolt的唯一的差別是finishBatch這個方法被調用的時機。作為committer的BatchBolt的finishBatch方法在commit階段調用。一個batch的commit階段由storm保證隻在前一個batch成功送出之後才會執行。并且它會重試直到topology裡面的所有bolt在commit完成送出。有兩個方法可以讓一個普通BatchBolt變成committer: 1) 實作 ICommitter 接口 2) 通過TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology裡面去。
    Processing phase vs. commit phase in bolts
    為了搞清除processing階段與commit階段的差別, 讓我們看個例子:
    Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

    在這個topology裡面隻有用紅線标出來的是committers。

    在processing階段, bolt A會處理從spout發射出來的整個batch。并且發射tuple給bolt B和bolt C。Bolt B是一個committer, 是以它會處理所有的tuple, 但是不會調用 

    finishBatch

     方法。Bolt C同樣也不會調用 

    finishBatch

     方法, 它的原因是:它不知道它有沒有從Bolt B接收到所有的tuple。(因為Bolt B還在等着事務送出)最後Bolt D會接收到Bolt C在調用execute方法的時候發射的所有的tuple。

    當batch送出的時候, Bolt B上的 

    finishBatch

     被調用。Bolt C現在可以判斷它接收到了所有的tuple, 是以可以調用 

    finishBatch

     了。最後Bolt D接收到了它的所有的tuple是以就調用finishBatch了。

    要注意的是,雖然Bolt D是一個committer, 它在接收到整個batch的tuple之後不需要等待第二個commit信号。因為它是在commit階段接收到的整個batch,它會調用finishBatch來完成整個事務。

    Acking
    注意, 你不需要顯式地去做任何的acking或者anchoring。storm在背後都做掉了。(storm對transactional topolgies裡面的acking機制進行了高度的優化)
    Failing a transaction
    在使用普通bolt的時候, 你可以通過調用OutputCollector的fail方法來fail這個tuple所在的tuple樹。由于Transactional Topologies把acking架構從使用者的視野裡面隐藏掉了, 它提供一個不同的機制來fail一個batch(進而使得這個batch被replay)。隻要抛出一個 FailedException 就可以了。跟普通的異常不一樣, 這個異常隻會導緻目前的batch被replay, 而不會使整個程序crash掉。
    Transactional spout

    TransactionalSpout接口跟普通的Spout接口完全不一樣。一個TransactionalSpout的實作一個batch一個batch的tuple, 而且必須保證同一個batch的transaction id始終一樣。

    在transactional topology中運作的時候, transactional spout看起來是這樣的一個結構:

    Storm入門(十)Twitter Storm: Transactional Topolgoy簡介

    在圖的左邊的coordinator是一個普通的storm的spout — 它一直為事務的batch發射tuple。Emitter則像一個普通的storm bolt,它負責為每個batch實際發射tuple。emitter以all grouping的方式訂閱coordinator的”batch emit”流。

    由于TransactionalSpout發射的tuple可能需要會被replay, 是以需要具有幂等性(否則多次replay同一個tuple會使得最後的結果不對), 為了實作幂等性,需要儲存Transactional Spout的少量的狀态,這個狀态是儲存在ZooKeeper裡面的。

    關于如何實作一個 

    TransactionalSpout

     的細節可以參見 Javadoc 。
    Partitioned Transactional Spout
    一種常見的TransactionalSpout是那種從多個queue broker奪取資料然後再發射的tuple。比如 TransactionalKafkaSpout 是這樣工作的。 

    IPartitionedTransactionalSpout

     把這些管理每個分區的狀态以保證可以replay的幂等性的工作都自動化掉了。更多可以參考 Javadoc
    配置
    Transactional Topologies有兩個重要的配置:
    • Zookeeper: 預設情況下,transactional topology會把狀态資訊儲存在主zookeeper裡面(協調叢集的那個)。你可以通過這兩個配置來指定其它的zookeeper:” 

      transactional.zookeeper.servers

       ” 和 “ 

      transactional.zookeeper.port

       “。
    • 同時活躍的batch數量:你必須設定同時處理的batch數量。你可以通過” 

      topology.max.spout.pending

       ” 來指定, 如果你不指定,預設是1。

    實作

    Transactional Topologies的實作是非常優雅的。管理送出協定,檢測失敗并且串行送出看起來很複雜,但是使用storm的原語來進行抽象是非常簡單的。
    • transactional topology裡面的spout是一個子topology, 它由一個spout和一個bolt組成。
      • spout是協調者,它隻包含一個task。
      • bolt是發射者
      • bolt以all grouping的方式訂閱協調者的輸出。
      • 中繼資料的序列化用的是kryo。
    • 協調者使用acking架構來決定什麼時候一個batch被成功執行完成,然後去決定一個batch什麼時候被成功送出。
    • 狀态資訊被以 

      RotatingTransactionalState

       的形式儲存在zookeeper裡面了。
    • commiting bolts以all grouping的方式訂閱協調者的commit流。
    • CoordinatedBolt被用來檢測一個bolt是否收到了一個特定batch的所有tuple。
      • 這一點上面跟DRPC裡面是一樣的。
      • 對于commiting bolt來說, 他會一直等待, 知道從coordinator的commit流裡面接收到一個tuple之後,它才會調用 

        finishBatch

         方法。
      • 是以在沒有從coordinator的commit流接收到一個tuple之前,committing bolt不可能調用 

        finishBolt

         方法。