Storm介紹
更多幹貨
- 分布式實戰(幹貨)
- spring cloud 實戰(幹貨)
- mybatis 實戰(幹貨)
- spring boot 實戰(幹貨)
- React 入門實戰(幹貨)
- 建構中小型網際網路企業架構(幹貨)
- python 學習持續更新
- ElasticSearch 筆記
- kafka storm 實戰 (幹貨)
- scala 學習持續更新
- RPC
- 深度學習
- GO 語言 持續更新
Storm 大資料實事計算系統,是Twitter開源的一個分布式的實時計算系統
全量資料處理使用的大多是鼎鼎大名的hadoop、hive,作為一個批處理系統,hadoop以其吞吐量大、自動容錯等優點,在海量資料處理上得到了廣泛的使用,但是,hadoop不擅長實時計算,因為它天然就是為批處理而生的,這也是業界的共識。而s4,storm,puma,spark等則是實時計算系統。
優點
- 簡單程式設計模型。類似于MapReduce降低了并行批處理的複雜性,Strom降低了實時處理的複雜性。
- 服務化,一個服務架構支援熱部署,即時上線或下線
- 分布式 可橫向拓展,現在的項目不帶個分布式特性都不好意思開源
- 高可靠性
- 程式設計模型簡單
- 高效實時
- 可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責消息的重試。Storm創新性提出的ack消息追蹤架構和複雜的事務性處理,能夠滿足很多級别的資料處理需求。不過,越高的資料處理需求,性能下降越嚴重。
- 快速。系統的設計保證了消息能得到快速的處理,zeroMQ作為其底層消息隊列
- 本地模式。storm有一個“本地模式”,可以在處理過程中完全模拟storm叢集。這讓你可以快速進行開發
- 多語言。 實際上,Storm的多語言更像是臨時添加上去似的。因為,你的送出部分還是要使用Java實作。
使用場景:資料的實時,持續計算,分布式RPC等。
發展
- 有50個大大小小的公司在使用Storm,相信更多的不留名的公司也在使用。這些公司中不乏淘寶,百度,Twitter,Groupon,雅虎等重量級公司。
- 從開源時候的0.5.0版本,到現在的0.8.0+,和即将到來的0.9.0+。先後添加了以下重大的新特性:
- 使用kryo作為Tuple序列化的架構(0.6.0)
- 添加了Transactional topologies(事務性拓撲)的支援(0.7.0)
- 添加了Trident的支援(0.8.0)
- 引入netty作為底層消息機制(0.9.0)
目前
Storm被廣泛應用于實時分析,線上機器學習,持續計算、分布式遠端調用等領域。來看一些實際的應用:
- 一淘-實時分析系統pora:實時分析使用者的屬性,并回報給搜尋引擎。最初,使用者屬性分析是通過每天在雲梯上定時運作的MR job來完成的。為了滿足實時性的要求,希望能夠實時分析使用者的行為日志,将最新的使用者屬性回報給搜尋引擎,能夠為使用者展現最貼近其目前需求的結果。
- 攜程-網站性能監控:實時分析系統監控攜程網的網站性能。利用HTML5提供的performance标準獲得可用的名額,并記錄日志。Storm叢集實時分析日志和入庫。使用DRPC聚合成報表,通過曆史資料對比等判斷規則,觸發預警事件。如果,業務場景中需要低延遲的響應,希望在秒級或者毫秒級完成分析、并得到響應,而且希望能夠随着資料量的增大而拓展。那就可以考慮下,使用Storm了。
- 試想下,如果,一個遊戲新版本上線,有一個實時分析系統,收集遊戲中的資料,營運或者開發者可以在上線後幾秒鐘得到持續不斷更新的遊戲監控報告和分析結果,然後馬上針對遊戲的參數和平衡性進行調整。這樣就能夠大大縮短遊戲疊代周期,加強遊戲的生命力(實際上,zynga就是這麼幹的!雖然使用的不是Storm……Zynga研發之道探秘:用資料說話)。
- 除了低延遲,Storm的Topology靈活的程式設計方式和分布式協調也會給我們帶來友善。使用者屬性分析的項目,需要處理大量的資料。使用傳統的MapReduce處理是個不錯的選擇。但是,處理過程中有個步驟需要根據分析結果,采集網頁上的資料進行下一步的處理。這對于MapReduce來說就不太适用了。但是,Storm的Topology就能完美解決這個問題。基于這個問題,我們可以畫出這樣一個Storm的Topology的處理圖。
- 個性化推薦與應用:網頁嵌入廣告(個性化推銷,根據使用者購買及其搜尋)
- 視訊推薦系統:優酷登入後分析以往的觀看記錄來進行推薦。
- 推薦系統介紹:統計分析使用者以往的觀看記錄,将統計後的結果作為推薦的依據,然後将視訊個性化的推薦給使用者,提高使用者觀看視訊的可能性。
- 資料過濾,對使用者發送的短信、微網誌等敏感詞實時的過濾,對包含敏感詞的資訊進行儲存,并監控發送消息的主體。
Storm概念
Spout(消息源)
- Spout是Storm裡面特有的名詞, Stream的源頭. 通常是從外部資料源讀取tuples, 并emit到topology.
- Spout可以同時emit多個tuple stream, 通過OutputFieldsDeclarer中的declareStream method來定義
- Spout需要實作IRichSpout接口, 最重要的方法是nextTuple, storm會不斷調用該接口從spout中取資料
- 同時需要注意, Spout分為reliable or unreliable兩種, 對于reliable, 還支援ack和fail方法, 具體參考"Reliability”
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike",
"jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
Bolt(消息處理者)
- Bolt支援多個輸入流和emit多個輸出流, 輸出流和spout一樣, 通過OutputFieldsDeclarer中的declareStream method來定義; 對于輸入流, 如果想subscribe上層節點的多個輸出streaming, 需要顯式的通過stream_id去訂閱, 如果不明确指定stream_id, 預設會訂閱default stream.
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
Stream grouping(資料的分發方式)
Topology(拓撲)
- 可以了解為類似MapReduce job
- 根本差別, MR job執行完就結束, 而Topology會一直存在. 因為MR流動的是代碼, 而Storm流動的資料.
- 是以Storm不可能替代MR, 因為對于海量資料, 資料的流動是不合理的
- 另一個差別, 我自己的想法, Topology對工作流有更好的支援, 而MR job往往隻能完成一個map/reduce的過程, 而對于複雜的操作, 需要多個MR job才能完成.
- 而Topology的定義更加靈活, 可以簡單的使用一個topology支援比較複雜的工作流場景
- Storm Topology是基于Thrift結構, 并且Nimbus是個Thrift server, 是以對于Topology可以用任何語言實作, 最終都是轉化為Thrift結構
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new RandomSentenceSpout(), 5 );
builder.setBolt(2, new SplitSentence(), 8 ).shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 12).fieldsGrouping(2, new Fields("word"));
- Topology有一個spout, 兩個bolt. setSpout和setBolt的參數都是一樣, 分别為id(在Topology中的唯一辨別); 處理邏輯(對于Spout就是資料産生function); 并發線程數(task數)
- 其中對于spout需要實作IRichSpout接口, 而bolt需要實作IRichBolt接口
- 比較特别的是, setBolt方法會傳回一個InputDeclarer對象, 并且該對象是用來定義Bolt輸入的, 比如上面.shuffleGrouping(1), 用1(spout)的輸出流作為輸入
Worker(工作程序)
Task(執行具體邏輯的任務)
Executor(執行Task的線程)
- 一個Topology可以包含一個或多個worker(并行的跑在不同的machine上), 是以worker process就是執行一個topology的子集, 并且worker隻能對應于一個topology
- 一個worker可用包含一個或多個executor, 每個component (spout或bolt)至少對應于一個executor, 是以可以說executor執行一個compenent的子集, 同時一個executor隻能對應于一個component
-
Task就是具體的處理邏輯對象, 一個executor線程可以執行一個或多個tasks
但一般預設每個executor隻執行一個task, 是以我們往往認為task就是執行線程, 其實不然
task代表最大并發度, 一個component的task數是不會改變的, 但是一個componet的executer數目是會發生變化的
當task數大于executor數時, executor數代表實際并發數
Configuration(配置)
- 對于并發度的配置, 在storm裡面可以在多個地方進行配置, 優先級如上面所示...
- worker processes的數目, 可以通過配置檔案和代碼中配置, worker就是執行程序, 是以考慮并發的效果, 數目至少應該大于machines的數目
- executor的數目, component的并發線程數,隻能在代碼中配置(通過setBolt和setSpout的參數), 例如, setBolt("green-bolt", new GreenBolt(), 2)
- tasks的數目, 可以不配置, 預設和executor1:1, 也可以通過setNumTasks()配置
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4) //set tasks number to 4
.shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
.shuffleGrouping("green-bolt");
StormSubmitter.submitTopology(
"mytopology",
conf,
topologyBuilder.createTopology()
);
-
代碼, 很清晰, 通過setBolt和setSpout一共定義2+2+6=10個executor threads.并且同setNumWorkers設定2個workers, 是以storm會平均在每個worker上run 5個executors
而對于green-bolt, 定義了4個tasks, 是以每個executor中有2個tasks
rebalancing動态的改變并發度
- Storm支援在不restart topology的情況下, 動态的改變(增減)worker processes的數目和executors的數目, 稱為rebalancing.
- 通過Storm web UI, 或者通過storm rebalance指令
常用的類
BaseRichSpout (消息生産者)
BaseBasicBolt(消息處理者)
TopologyBuilder(拓撲的建構器)
Values(将資料存放的values,發送到下個元件)
Tuple
- (發送的資料被封裝到Tuple,可以通tuple接收上個元件發送的消息)
- 命名的value序列, 可以了解成Key/value序列, 每個value可以是任何類型, 動态類型不需要事先聲明.
- Tuple在傳輸中需要序列化和反序列化, storm內建了普通類型的序列化子產品, 使用者可以自定義特殊類型的序列化邏輯
Config(配置)
StormSubmitter / LocalCluster (拓撲送出器)
storm裡面各個對象的示意圖

計算拓補:Topology
- 一個實時計算應用程式的邏輯在storm裡面被封裝到topology對象裡面, 我把它叫做計算拓補. Storm裡面的topology相當于Hadoop裡面的一個MapReduce Job, 它們的關鍵差別是:一個MapReduce Job最終總是會結束的, 然而一個storm的topoloy會一直運作 — 除非你顯式的殺死它。 一個Topology是Spouts和Bolts組成的圖狀結構, 而連結Spouts和Bolts的則是Stream groupings。
消息源: Spout
- 消息源Spouts是storm裡面一個topology裡面的消息生産者。一般來說消息源會從一個外部源讀取資料并且向topology裡面發出消息: tuple。 消息源Spouts可以是可靠的也可以是不可靠的。一個可靠的消息源可以重新發射一個tuple如果這個tuple沒有被storm成功的處理, 但是一個不可靠的消息源Spouts一旦發出一個tuple就把它徹底忘了 — 也就不可能再發了。
- 消息源Spouts可以發射多條消息流stream。要達到這樣的效果, 使用OutFieldsDeclarer.declareStream來定義多個stream,然後使用SpoutOutputCollector來發射指定的sream。
消息處理者: Bolt
- 所有的消息處理邏輯被封裝在bolts裡面。Bolts可以做很多事情: 過濾, 聚合, 查詢資料庫等等。
- Bolts的主要方法是execute,它以一個tuple作為輸入,Bolts使用OutputCollector來發射tuple, Bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知storm這個tuple被處理完成了。– 進而我們通知這個tuple的發射者Spouts。 一般的流程是:Bolts處理一個輸入tuple, 發射0個或者多個tuple,然後調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack
Task:任務
- 每一個Spout和Bolt會被當作很多task在整個叢集裡面執行。預設情況下每一個task對應到一個線程(Executor),這個線程用來執行這個task,而stream grouping則是定義怎麼從一堆task發射tuple到另外一堆task。
配置Configuration
- storm裡面有一堆參數可以配置來調整nimbus, supervisor以及正在運作的topology的行為, 一些配置是系統級别的, 一些配置是topology級别的。所有有預設值的配置的預設配置是配置在default.xml裡面的。你可以通過定義個storm.xml在你的classpath厘米來覆寫這些預設配置。并且你也可以在代碼裡面設定一些topology相關的配置資訊 – 使用StormSubmitter。當然,這些配置的優先級是: default.xml < storm.xml < TOPOLOGY-SPECIFIC配置。
消息流:Stream
- 消息流是storm裡面的最關鍵的抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuples會被以一種分布式的方式并行地建立和處理。 對消息流的定義主要是對消息流裡面的tuple的定義, 我們會給tuple裡的每個字段一個名字。 并且不同tuple的對應字段的類型必須一樣。 也就是說: 兩個tuple的第一個字段的類型必須一樣, 第二個字段的類型必須一樣, 但是第一個字段和第二個字段可以有不同的類型。 在預設的情況下,tuple的字段類型可以是:integer, long, short, byte, string, double, float, boolean和byte array。 你還可以自定義類型 — 隻要你實作對應的序列化器
消息分發政策:Stream groupings
- Shuffle Grouping:随機分組,随機派發stream裡面的tuple,保證每個bolt接收到的tuple數目相同。
- Fields Grouping:按字段分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts,而不同的userid則會被配置設定到不同的Bolts。
- All Grouping:廣播發送,對于每一個tuple,所有的Bolts都會收到。
- Global Grouping: 全局分組,這個tuple被配置設定到storm中的一個bolt的其中一個task。再具體一點就是配置設定給id值最低的那個task。
- Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果,有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程裡面去執行。
- Direct Grouping:直接分組, 這是一種比較特别的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。隻有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來擷取處理它的消息的taskid (OutputCollector.emit方法也會傳回taskid)
- Local or shuffle grouping:如果目标bolt有一個或者多個task在同一個工作程序中,tuple将會被随機發生給這些tasks。否則,和普通的Shuffle Grouping行為一緻。
Storm 消息機制
首先, 所有tuple都有一個唯一辨別msgId, 當tuple被emit的時候确定
_collector.emit(new Values("field1", "field2", 3) , msgId);
其次, 看看下面的ISpout接口, 除了擷取tuple的nextTuple
還有ack和fail, 當Storm detect到tuple被fully processed, 會調用ack, 如果逾時或detect fail, 則調用fail
此處需要注意的是, tuple隻有在被産生的那個spout task上可以被ack或fail, 具體原因看後面的實作解釋就了解了
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
最後, 在spout怎麼實作的, 其實比較簡單.
對于Spout queue, get message隻是open而不是pop, 并且把tuple狀态改為pending, 防止該tuple被多次發送.
一直等到該tuple被ack, 才真正的pop該tuple, 當然該tuple如果fail, 就重新把狀态改回初始狀态
這也解釋, 為什麼tuple隻能在被emit的spout task被ack或fail, 因為隻有這個task的queue裡面有該tuple
前面一直沒有說明的一個問題是, storm本身通過什麼機制來判斷tuple是否成功被fully processed?
要解決這個問題, 可以分為兩個問題,
- 如何知道tuple tree的結構?
- 如何知道tuple tree上每個節點的運作情況, success或fail?
答案很簡單, 你必須告訴它, 如何告訴它?
- 對于tuple tree的結構, 需要知道每個tuple由哪些tuple産生, 即tree節點間的link
- tree節點間的link稱為anchoring. 當每次emit新tuple的時候, 必須顯式的通過API建立anchoring
_collector.emit(tuple, new Values(word));
emit的第一個參數是tuple, 這就是用于建anchoring
當然你也可以直接調用unanchoring的emit版本, 如果不需要保證reliable的話, 這樣效率會比較高
_collector.emit(new Values(word));
同時前面說了, 可能一個tuple依賴于多個輸入,
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
對于Multi-anchoring的情況會導緻tuple tree變為tuple DGA, 目前storm的版本已經可以很好的支援DAG
對于tuple tree上每個節點的運作情況, 你需要在每個bolt的邏輯處理完後, 顯式的調用OutputCollector的ack和fail來彙報
看下面的例子, 在execute函數的最後會調用
_collector.ack(tuple);
我比較迷惑, 為啥ack是OutputCollector的function, 而不是tuple的function?
而且就算ack也是應該對bolt的input進行ack, 為啥是output, 可能因為所有input都是其他bolt的output産生...這個設計的比較不合理
public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
storm為了保證reliable, 必然是要犧牲效率的, 此處storm會在task memory裡面去記錄你彙報的tuple tree的結構和運作情況.
而隻有當某tuple節點被ack或fail後才會被從記憶體中删除, 是以如果你總是不去ack或fail, 那麼會導緻task的out of memory
簡單的版本, BasicBolt
上面的機制, 會給程式員造成負擔, 尤其對于很多簡單的case, 比如filter, 每次都要去顯式的建立anchoring和ack…
是以storm提供簡單的版本, 會自動的建立anchoring, 并在bolt執行完自動調用ack
public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
記錄級容錯
首先來看一下什麼叫做記錄級容錯?Storm允許使用者在spout中發射一個新的源tuple時為其指定一個message id,這個message id可以是任意的object對象。多個源tuple可以公用一個message id, 表示這多個源tuple對使用者來說是同一個消息單元。Storm中記錄級容錯的意思是說,storm會告知使用者每一個消息單元是否在指定時間内被完全處理了。那什麼叫做完全處理呢,就是該message id綁定的源tuple及由該源tuple後續生成的tuple經過了topology中每一個應該到達的bolt的處理。
舉個例子。在圖中,在spout由message 1綁定的tuple1和tuple2經過了bolt1和bolt2處理生成兩個新的tuple,并流向了bolt3。當這個過程完全處理完時,稱message 1被完全處理了
在storm的topology中有一個系統級元件,叫做acker。這個acker的任務就是跟蹤從spout中流出來的每一個message id綁定的若幹tuple的處理路徑,如果在使用者設定的最大逾時時間内這些tuple沒有被完全處理,那麼acker就 會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛才的描述中,我們提到了 “記錄tuple的處理路徑”。采用一數學定理:異或。
- A xor A = 0
- A xor B.... Xor B xor A = 0, 其中每一個操作數出現且僅出現兩次。
Storm中使用的巧妙方法就是基于這個定理。具體過程:在spout中系統會為使用者指定message id生成一個對應的64位整數,作為一個root id. root id會傳遞給acker及後續的bolt作為該消息單元的唯一辨別。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予改tuple一個64位的整數的id。Spout發射完某個message id 對應的源tuple之後,會告知acker自己發射的root id 及生成的那些源tuple的id.
而bolt呢,每次接收到一個輸入tuple處理完之後,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker隻需要對這些id做一個簡單的異或運算,就能判斷出該root id對應的消息單元是否處理完成了。下面圖示說明:
可能有些細心的同學會發現,容錯過程存在一個可能出錯的地方,那就是,如果生産的tuple id并不是完全各異,acker可能會在消息單元完全處理完成之前就錯誤的計算為0.這個錯誤在理論上的确是存在的,但是在實際中其概念極低,完全可以忽略。
Storm DRPC實戰
DRPC ,Distributed Remote Procedure Call
- RPC本身是個成熟和古老的概念, Storm裡面引入DRPC主要是利用storm的實時計算能力來并行化CPU intensive的計算
- DRPC, 隻是storm應用的一個場景, 并且storm提供相應的程式設計架構, 以友善程式員
- 提供DRPC server的實作, 并提供對DRPC場景經行封裝的Topology
- 對于storm, Topology内部其實是沒有任何差別的, 主要就是實作和封裝Topology和DRPC Server之間互動的spout和bolt
具體互動過程如下圖
public static class ExclaimBolt implements IBasicBolt {
public void prepare(Map conf, TopologyContext context) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder
= new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
// ...
}
使用LinearDRPCTopologyBuilder
- 首先, 使用builder建立topology的時候, topology name要等于function name(PRC call)
- 其次, 這裡不需要指定spout, 因為已經做了封裝, 會自動從DRPC server讀取資料
- 再者, 所有bolt的輸出的第一個field必須是request-id, 最後一般bolt的輸出一定是(request-id, result)
- 最後, builder會自動将結果發給DRPC server
LinearDRPC例子
前面的例子, 無法說明為什麼要使用storm來實作RPC, 那個操作直接在RPC server上就可以完成
當隻有對并行計算和資料量有一定要求時, 才能展現出價值...
ReachTopology, 每個人發送的URL都會被所有follower收到, 是以要計算某URL的reach, 隻需要如下幾步,
找出所有發送該URL的tweeter, 取出他們的follower, 去重複, 計數
public class ReachTopology {
public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {
{
put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
put("engineering.twitter.com/blog/5", Arrays.asList("admam", "david", "sally", "nathan"));
}
};
public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {
{
put("sally", Arrays.asList("bob", "time", "alice", "adam", "jai"));
put("bob", Arrays.asList("admam", "david", "vivian", "nathan"));
}
};
public static class GetTweeters extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
Object id = tuple.getValue(0);
String url = tuple.getString(1);
List<String> tweeters = TWEETERS_DB.get(url);
if (tweeters != null) {
for (String tweeter: tweeters) {
collector.emit(new Values(id, tweeter));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "tweeter"));
}
}
public static class GetFollowers extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
Object id = tuple.getValue(0);
String tweeter = tuple.getString(1);
List<String> followers = FOLLOWERS_DB.get(url);
if (followers != null) {
for (String follower: followers) {
collector.emit(new Values(id, follower));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "follower"));
}
}
public static class PartialUniquer extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Set<String> _followers = new HashSet<String>();
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_followers.add(tuple.getString(1));
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _followers.size()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "partial-count"));
}
}
public static class CountAggregator extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count += tuple.getInteger(1);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "reach"));
}
}
public static LinearDRPCTopologyBuilder construct() {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 4);
builder.addBolt(new GetFollowers, 12).shuffleGrouping;
builder.addBolt(new PartialUniquer(), 6).filedsGrouping(new Fields("id","follower"));
builder.addBolt(new CountAggregator(), 3).filedsGrouping(new Fields("id"));
return builder;
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = construct();
Config conf = new Config();
if (args == null || args.length ==0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("reach-drpc",conf, builder.createLocalTopology(drpc));
String urlsToTry = new String[] { "foo.com/blog/1","engineering.twitter.com/blog/5"}
cluster.shutdown();
drpc.shutdown();
} else {
config.setNumberWorkers(6);
StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
}
}
}
Storm 配置詳解
- Storm.zookeeper.servers zookeeper伺服器清單
- Storm.zookeeper.port zookeeper 連接配接端口
- Storm.local.dir storm使用的本地檔案系統目錄(必須存在并且storm程序可讀寫)
- Storm.cluster.mode storm叢集運作模式([distributed][local])
- Storm.local.mode.zmq zookeeper中storm的根目錄位置
- Storm.zookeeper.session.timeout 用戶端連接配接zookeeper逾時時間
- Storm.id 運作中拓撲的id,由storm name和一個唯一随機數組成。
- Nimbus.host nimbus伺服器位址
- Nimbus.thrift.port nimbus的thrift監聽端口
- Nimbus.childopts 通過storm-deploy項目部署時指定給nimbus程序的jvm選項
- Nimbus.task.timeout.secs 心跳逾時時間,逾時後nimbus會認為task死掉并重配置設定給一個位址。
- Nimbus.monitor.freq.secs nimbus檢查心跳和重新配置設定任務的時間間隔。注意如果是機器宕掉nimbus會立即接管并處理
- Nimbus.supervisor.timeout supervisor的心跳逾時時間,一旦超過nimbus會認為該supervisor已死并停止為它分發新任務。
- Nimbus.task.launch.secs task啟動時的一個特殊逾時設定。在啟動後第一次心跳前會使用該值來臨時替代nimbus.task.timeout.secs.
- Nimbus.reassign 當發現task失敗時nimbus是否重新配置設定執行。預設為真,不建議修改。
- Nimbus.file.copy.expiration.secs nimbus判斷上傳、下載下傳連結的逾時時間,當空閑時間超過該設定時nimbus會認為連結死掉并主動斷開
- Ui.port storm ui的服務端口
- Drpc.servers DRPC伺服器清單,以便DRPCSpout知道和誰通訊
- Drpc.port storm drpc的服務端口
- Supervisor.slots.ports supervisor上能夠運作workers的端口清單。每個worker占用一個端口,且每個端口隻運作一個worker。通過這項配置可以調整每台機器上運作的worker數
- Supervisor.childopts 在storm-deploy項目中使用,用來配置supervisor守護程序的jvm選項
- Supervisor.worker.timeout.secs supervisor中的worker心跳逾時時間,一旦逾時supervisor會嘗試重新開機worker程序
- Supervisor.worker.start.timeout.secs supervisor初始啟動時,worker的心跳逾時時間,當超過該時間,supervisor會嘗試啟動worker。因為JVM初始啟動和配置會帶來的額外消耗,進而使得第一次心跳會超過supervisor.worker.timeout.secs的設定
- Supervisor.enable supervisor是否應當運作配置設定給他的worker。預設為true.該選項用來進行storm的單元測試,一般不應該修改。
- Supervisor.hearbeat.frequency.secs supervisor心跳發送頻率(多久發送一次)
- Supervisor.monitor.frequency.secs supervisor檢查worker心跳的頻率
- Worker.childopts supervisor啟動worker時使用的jvm選項。所有的“%ID%”字串會替換為對應worker的辨別符
- Worker.heartbeat.frequency.secs worker的心跳發送時間間隔
- Task.hearbeat.frequency.secs task彙報狀态心跳時間間隔
- Task.refresh.poll.secs task與其他tasks之間連結同步的頻率。(如果task被重配置設定,其他tasks向它發送消息需要重新整理連接配接)。一般來講,重配置設定發生時其他tasks會立即得到通知。該配置僅僅為了防止未通知的情況。
- Topology.debug 如果設定為true,storm将記錄發射的每條資訊
- Topology.optimize master 是否在合适時機通過在單個線程内運作多個task以達到優化topologies的目的。
- Topology.workers 執行該topology叢集中應當啟動的程序數量。每個程序内部将以線程方式執行一定數目的tasks.topology的元件結合該參數和并行度提示來優化性能
- Topology.ackers topology中啟動的acker任務數。Acker儲存由spout發送的tuples的記錄。并探測tuple何時被完全處理。當acker探測到tuple被處理完畢時會向spout發送确認資訊。通常應當根據topology的吞吐量來确認acker的數目。但一般不需要太多。當設定為0時,相當于禁用了消息的可靠性,storm會在spout發送tuples後立即進行确認。
- Topology.message.timeout.secs topology中spout發送消息的最大處理時間。如果一條消息在該時間視窗内未被成功ack,Storm會告知spout這條消息失敗。而部分spout實作了失敗消息重播功能。
- Topology.skip.missing.kryo.registrations storm是否應該跳過他不能識别的kryo序列化方案。如果設定為否task可能會轉載失敗或者在運作時抛出錯誤。
- Topology.max.task.paralelism 在一個topology中能夠允許最大元件并行度。改項設定主要用在本地模式中測試線程數限制。
- Topology.max.spout.pending 一個spout task中處于pending狀态的最大的tuples數量。該配置應用于單個task,而不是整個spouts或topology
- Topology.state.synchronization.timeout.secs 元件同步狀态源的最大逾時時間(保留選項)
- Topology.stats.sample.rate 用啊你産生task統計資訊的tuples抽樣百分比
- Topology.fall.back.on.java.serialization topology中是否使用java的序列化方案
- Zmq.linger.millis 當連接配接關閉時,連結測試重新發送資訊到目标主機的持續時長。這是一個不常用的進階選項,基本上可以忽略
- Java.labrary.path JVM啟動(如 nimbus, Supervisor和workers)時的java。Library.path設定。該選項告訴JVM在哪些路徑下定位本地庫
Strom叢集部署
Nimbus和Supervisor
- 在Strom的叢集裡面有兩種節點:控制節點和工作節點。控制節點上面運作一個叫Nimbus程序,Nimbus負責在叢集裡面分發代碼,配置設定計算任務,并且監控狀态。這兩種元件都是快速失敗的,沒有狀态。任務狀态和心跳資訊等都儲存在Zookeeper上的,送出的代碼資源都在本地機器的硬碟上。
- Nimbus負責在叢集裡面發送代碼,配置設定工作給機器,并且監控狀态。全局隻有一個。
- 每一個工作節點上面運作一個叫做Supervisor程序。Supervisor負責監聽從Nimbus配置設定給它執行的任務,劇此啟動或停止執行任務的工作程序
- Zookeeper是Storm重點依賴的外部資源。Nimbus和Supervisor甚至實際運作的Worker都是把心跳儲存在Zookeeper上的。Nimbus也是根據Zookeerper上的心跳和任務運作狀況,進行排程和任務配置設定的。
-
Nimbus和Supervisor之間的所有協調工作都是通過一個Zookeeper叢集來完成
nimbus程序和supervisor都是快速失敗(fail-fast)和無狀态的, 所有的狀态都存儲在Zookeeper或本地磁盤上
這也就意味着你可以用kill -9來殺死nimbus和supervisor程序, 然後再重新開機它們, 它們可以繼續工作
更重要的是, nimbus和supervisor的fail或restart不會影響worker的工作, 不象Hadoop, Job tracker的fail會導緻job失敗
-
Storm送出運作的程式稱為Topology。
Topology處理的最小的消息機關是一個Tuple,也就是一個任意對象的數組。
Topology由Spout和Bolt構成。Spout是發出Tuple的結點。Bolt可以随意訂閱某個Spout或者Bolt發出的Tuple。Spout和Bolt都統稱為component。
安裝Strom
安裝storm 需要安裝如下軟體:
JDK
Zeromq
jzmq-master
Zookeeper
Python
storm
安裝ZeroMQ
wget http://download.zeromq.org/zeromq-2.2.0.tar.gz
tar zxf zeromq-2.2.0.tar.gz
cd zeromq-2.2.0
./configure (yum install libuuid-devel)
make
make install
zeroMQ有可能缺失g++
安裝g++
yum install gcc gcc-c++
注意事項:如果./configure 或者make執行失敗,請先安裝util-linux-ng-2.17
#unzip util-linux-ng-2.17-rc1.zip
#cd util-linux-ng-2.17
#./configure
#make
#mv /sbin/hwclock /sbin/hwclock.old
#cp hwclock/hwclock /sbin/
#hwclock --show
#hwclock -w
#make install
注意:./configure出現如下錯誤: configure:error:ncurses or ncursesw selected, but library not found (--without-ncurses to disable)我們加上參數--without-ncurses
安裝jzmq
#yum install git
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
如果缺失libtool,則先安裝
yum install libtool
安裝Python
wget http://www.python.org/ftp/python/2.7.2/python-2.7.2.tgz
tar zxvf python-2.7.2.tgz
cd pyton-2.7.2
./configure
make
make install
安裝storm
wget http://cloud.github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
unzip storm-0.8.1.zip
vim /etc/profile
export STORM_HOME=/usr/local/storm-0.8.1
如果unzip不能用
yum install unzip
配置Storm
修改storm/conf/storm.yaml檔案
storm.zookeeper.servers:
- "zk1"
- "zk2"
- "zk3"
nimbus.host: "zk1"
storm.local.dir: "/usr/tmp/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
(注意:先搭建zookeeper叢集)
說明下:storm.local.dir 表示storm需要用到的本地目錄。
nimbus.host 表示哪一台機器是master機器,即nimbus.
storm.zookeeper.servers表示哪幾台機器是zookeeper伺服器
storm.zookeeper.port表示zookeeper的端口号,這裡一定要與zookeeper配置的端口号一緻,否則會出現通信錯誤。
supervisor.slots.ports表示supervisor節點的槽數,就是最多能跑幾個worker程序(每個sprout或者bolt預設隻啟動一個worker,但是可以通過conf修改成多個)
java.library.path這是storm所依賴的本地依賴(ZeroMQ 和JZMQ)的加載位址,預設的是:/usr/local/lib :/opt/local/lib:usr/lib,大多數情況下是對的,是以你應該不用更改這個配置
注意:配置時一定注意在每一項的開始時要加空格(最好加兩個空格),冒号後也必須要加空格,否則storm不認識這個配置檔案。
在目錄/usr/tmp下面增加storm檔案夾
啟動storm:
啟動zookeeper環境(啟動不正常,執行service iptables stop關閉防火牆)
執行storm nimbus 啟動nimbus
執行storm supervisor 啟動從節點
執行storm ui 啟動ui(ui 和nimbus 需要在同一台機子上面)
注意事項:
storm背景程序被啟動後,将在storm安裝部署目錄下的logs/子目錄下生成各個程序的日志檔案。
為了友善使用,可以将bin/storm加入到系統環境變量中
啟動完畢,通過http://ip:8080/通路UI
#./storm nimbus
#Jps
nimbus
quorumPeerMain
啟動UI
#./storm ui > /dev/null 2>&1 &
#Jps
Nimbus
Core
QuorumPeerMain
配置主節點和從節點
1.配置slave節點的配置檔案
#cd /cloud/storm-0.8.2/conf/
#vi storm.yaml
storm.zookeeper.servers:
-”master”
nimbus.host: “master” ”master”
nimbus.host: “master”
2.啟動主節點zookeeper
#./zkserver.sh start
#jps
QuorumPeerMain
3.啟動主節點strom
# ./storm nimbus > ../logs/info 2>&1 &
4.啟動子節點
#./storm supervisor > /dev/null 2>&1 &
#jps
supervisor
5.啟動主節點監控頁面
#./storm ui > /dev/null 2>&1 &
6.在主節點上運作例子
#./storm jar /home/lifeCycle.jar cn.topology.TopoMain
#Jps
QuorumPeerMain
Jps
Core
Nimbus
#./storm list
storm list 檢視再運作的作業清單
7.在從節點上檢視jps
#jps
Worker
supervisor
public static void main(String[] args)
throws Exception
{
TopologyBuilder builder = new TopologyBuilder();
//指定spout的并行數量2
builder.setSpout("random", new RandomWordSpout(), Integer.valueOf(2));
//指定bolt的并行數4
builder.setBolt("transfer", new TransferBolt(), Integer.valueOf(4)).shuffleGrouping("random");
builder.setBolt("writer", new WriterBolt(), Integer.valueOf(4)).fieldsGrouping("transfer", new Fields(new String[] { "word" }));
Config conf = new Config();
//指定worker的數量
conf.setNumWorkers(2);
conf.setDebug(true);
log.warn("$$$$$$$$$$$ submitting topology...");
StormSubmitter.submitTopology("life-cycle", conf, builder.createTopology());
log.warn("$$$$$$$4$$$ topology submitted !");
} //指定spout的并行數量2
builder.setSpout("random", new RandomWordSpout(), Integer.valueOf(2));
//指定bolt的并行數4
builder.setBolt("transfer", new TransferBolt(), Integer.valueOf(4)).shuffleGrouping("random");
builder.setBolt("writer", new WriterBolt(), Integer.valueOf(4)).fieldsGrouping("transfer", new Fields(new String[] { "word" }));
Config conf = new Config();
//指定worker的數量
conf.setNumWorkers(2);
conf.setDebug(true);
log.warn("$$$$$$$$$$$ submitting topology...");
StormSubmitter.submitTopology("life-cycle", conf, builder.createTopology());
log.warn("$$$$$$$4$$$ topology submitted !");
}
8.停作業
#./storm kill life-cycle
Storm-start例子
環境裝備:
- 下載下傳Storm-start 點選打開連結
- 進入下載下傳目錄,對zip檔案解壓
- 進入壓縮後的檔案目錄,修改m2-pom.xml(将twitter4j-core 和 twitter4j-stream 替換為下面的部分)
- maven的使用參照 點選打開連結
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>[2.2,)</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>[2.2,)</version>
</dependency>
- 編譯項目。轉到項目跟目錄,使用mvn -f m2-pom.xml
- package進行編譯
- 複制storm-starter目錄下的m2_pom.xml為pom.xml放在與m2_pom.xml同一目錄下
- 打jar包 mvn jar:jar
- 如果還需要對工程代碼進行修改可以導入eclipse
- 使用mvn eclipse: eclipse 編譯成eclipse工程
- eclipse import project
WordCount例子
例子:統計文本中每個單詞出現的次數
WordCountTopo:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import cn.storm.bolt.WordCounter;
import cn.storm.bolt.WordSpliter;
import cn.storm.spout.WordReader;
import java.io.PrintStream;
public class WordCountTopo
{
public static void main(String[]args)
{
if (args.length != 2) {
System.err.println("Usage: inputPaht timeOffset");
System.err.println("such as : java -jar WordCount.jar D://input/ 2");
System.exit(2);
}
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-spilter",new WordSpliter()).shuffleGrouping("word-reader");
builder.setBolt("word-counter",new WordCounter()).shuffleGrouping("word-spilter");
String inputPaht = args[0];
String timeOffset = args[1];
Config conf = new Config();
conf.put("INPUT_PATH",inputPaht);
conf.put("TIME_OFFSET",timeOffset);
conf.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount",conf,builder.createTopology());
}
}
WordReader
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;
public class WordReader extends BaseRichSpout
{
private static final long serialVersionUID = 2197521792014017918L;
private String inputPath;
private SpoutOutputCollector collector;
public void open(Map conf,TopologyContext context,SpoutOutputCollector collector)
{
this.collector =collector;
this.inputPath = ((String)conf.get("INPUT_PATH"));
}
public void nextTuple()
{
Collection files = FileUtils.listFiles(new File(this.inputPath), FileFilterUtils.notFileFilter(FileFilterUtils.suffixFileFilter(".bak")),null);
for (File f : files)
try {
List lines = FileUtils.readLines(f,"UTF-8");
for (String line : lines) {
this.collector.emit(new Values(new Object[] { line }));
}
FileUtils.moveFile(f,new File(f.getPath() + System.currentTimeMillis() +".bak"));
} catch (IOExceptione) {
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields(new String[] {"line" }));
}
}
WordSpliter
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.commons.lang.StringUtils;
public class WordSpliter extends BaseBasicBolt
{
private static final long serialVersionUID = -5653803832498574866L;
public void execute(Tuple input,BasicOutputCollector collector)
{
String line = input.getString(0);
String[] words = line.split(" ");
for (String word : words) {
word = word.trim();
if (StringUtils.isNotBlank(word)) {
word = word.toLowerCase();
collector.emit(new Values(new Object[] {word }));
}
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields(new String[] {"word" }));
}
}
WordCounter
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
public class WordCounterextends BaseBasicBolt
{
private static final long serialVersionUID = 5683648523524179434L;
private HashMap<String, Integer>counters =new HashMap();
public void prepare(Map stormConf,TopologyContext context)
{
final long timeOffset = Long.parseLong(stormConf.get("TIME_OFFSET").toString());
new Thread(new Runnable()
{
public void run() {
while (true) {
for (Map.Entry entry : WordCounter.this.counters.entrySet()) {
System.out.println((String)entry.getKey() +" : " +entry.getValue());
}
System.out.println("---------------------------------------");
try {
Thread.sleep(timeOffset * 1000L);
} catch (InterruptedExceptione) {
e.printStackTrace();
}
}
}
}).start();
}
public void execute(Tuple input,BasicOutputCollector collector)
{
String str = input.getString(0);
if (!this.counters.containsKey(str)) {
this.counters.put(str, Integer.valueOf(1));
} else {
Integer c = Integer.valueOf(((Integer)this.counters.get(str)).intValue() + 1);
this.counters.put(str,c);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
}
}
運作
例如nick.txt 中的内容是:
Strom strom hive haddop
統計結果:
Hadoop: 1
Hive 1
Storm 2