本節書摘來自華章計算機《storm分布式實時計算模式》一書中的第1章,第1.3節,作者:(美)p. taylor goetz brian o’neill 更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。
前面介紹了storm的基礎概念,我們已經準備好實作一個簡單的應用。現在開始着手開發一個storm topology,并且在本地模式執行。storm本地模式會在一個jvm執行個體中模拟出一個storm叢集。大大簡化了使用者在開發環境或者ide中進行開發和調試。後續章節将會示範如何将本地模式下開發好的topology部署到真實的storm叢集環境。
1.3.1 配置開發環境
建立一個storm項目其實就是将storm及其依賴的類庫添加到java classpath中。在第2章中,你将了解到,将storm topology釋出到叢集環境中,需要将編譯好的類和相關依賴打包在一起。基于這個原因,我們強烈建議使用建構管理工具來管理項目,比如apache maven、gradle或者leinengen。在單詞計數這個例子中,我們使用maven。
首先,建立一個maven項目:

https://yqfile.alicdn.com/1548115a59a6387821373f6453d1ffe663ecce78.png
" >
然後,編輯配置檔案pom.xml,添加storm依賴

https://yqfile.alicdn.com/f669783888dff24ca5e17697e3cac2f5fb325204.png

https://yqfile.alicdn.com/9536aec55eab80583717ac6a3af5b68a4cb0f95b.png
之後,通過執行下述指令編譯項目,來測試配置maven是否正确。
1.3.2 實作sentencespout
為簡化起見,sentencespout的實作通過重複靜态語句清單來模拟資料源。每句話作為一個單值的tuple向後循環發射。完整實作如例1.1所示。
例1.1 sentencespout.java

https://yqfile.alicdn.com/4ecf2bf819c4d284269ba398c569b1fc80ea3c32.png

https://yqfile.alicdn.com/1bb937f8b8240992dedbe50c764bb7add94a0e3d.png
baserichspout類是ispout接口和icomponent接口的一個簡便的實作。接口對本例中用不到的方法提供了預設實作。使用這個類,我們可以專注在所需要的方法上。方法declareoutputfields()是在icomponent接口中定義的,所有storm的元件(spout和bolt)都必須實作這個接口。storm的元件通過這個方法告訴storm該元件會發射哪些資料流,每個資料流的tuple中包含哪些字段。本例中,我們聲明了spout會發射一個資料流,其中的tuple包含一個字段(sentence)
open()方法在ispout接口中定義,所有spout元件在初始化時調用這個方法。open()方法接收三個參數,一個包含了storm配置資訊的map,topologycontext對象提供了topology中元件的資訊,spoutoutputcollector對象提供了發射tuple的方法。本例中,初始化時不需要做額外操作,是以open()方法實作僅僅是簡單将spoutoutputcollector對象的引用儲存在變量中。
nexttuple()方法是所有spout實作的核心所在,storm通過調用這個方法向輸出的collector發射tuple。這個例子中,我們發射目前索引對應的語句,并且遞增索引指向下一個語句。
1.3.3 實作語句分割bolt
例1.2列出了splitsentencebolt類的實作。
例1.2 splitsentencebolt.java

https://yqfile.alicdn.com/273763604f9b0ec2639fef525dca562500955a1e.png
baserichbolt類是icomponent和ibolt接口的一個簡便實作。繼承這個類,就不用去實作本例不關心的方法,将注意力放在實作我們需要的功能上。
prepare()方法在ibolt中定義,類同與ispout接口中定義的open()方法。這個方法在bolt初始化時調用,可以用來準備bolt用到的資源,如資料庫連接配接。和sentencespout類一樣,splitsentencebolt類在初始化時沒有額外操作,是以prepare()方法僅僅儲存outputcollector對象的引用。
在declareoutputfields()方法中,splitsentencebolt聲明了一個輸出流,每個tuple包含一個字段“word”。
splitsentencebolt類的核心功能在execute()方法中實作,這個方法是ibolt接口定義的。每當從訂閱的資料流中接收一個tuple,都會調用這個方法。本例中,execute()方法按照字元串讀取“sentence”字段的值,然後将其拆分為單詞,每個單詞向後面的輸出流發射一個tuple。
1.3.4 實作單詞計數bolt
wordcountbolt類(見例1.3)是topology中實際進行單詞計數的元件。該bolt的prepare()方法中,執行個體化了一個hashmap的執行個體,用來存儲單詞和對應的計數。大部分執行個體變量通常是在prepare()方法中進行執行個體化,這個設計模式是由topology的部署方式決定的。當topology釋出時,所有的bolt和spout元件首先會進行序列化,然後通過網絡發送到叢集中。如果spout或者bolt在序列化之前(比如說在構造函數中生成)執行個體化了任何無法序列化的執行個體變量,在進行序列化時會抛出notserializableexception異常,topology就會部署失敗。本例中,因為hashmap是可序列化的,是以在構造函數中進行執行個體化也是安全的。但是,通常情況下最好是在構造函數中對基本資料類型和可序列化的對象進行指派和執行個體化,在prepare()方法中對不可序列化的對象進行執行個體化。
在declareoutputfields()方法中,類wordcountbolt聲明了一個輸出流,其中的tuple包括了單詞和對應的計數。execute()方法中,當接收到一個單詞時,首先查找這個單詞對應的計數(如果單詞沒有出現過則計數初始化為0),遞增并存儲計數,然後将單詞和最新計數作為tuple向後發射。将單詞計數作為資料流發射,topology中的其他bolt就可以訂閱這個資料流進行進一步的處理。
例1.3 wordcountbolt.java
1.3.5 實作上報bolt
reportbolt類的作用是對所有單詞的計數生成一份報告。和wordcountbolt類似,reportbolt使用一個hashmap對象來儲存單詞和對應計數。本例中,它的功能是簡單的存儲接收到計數bolt發射出的計數tuple。
上報bolt和上述其他bolt的一個差別是,它是一個位于資料流末端的bolt,隻接收tuple。因為它不發射任何資料流,是以declareoutputfields()方法是空的。
上報bolt中初次引入了cleanup()方法,這個方法在ibolt接口中定義。storm在終止一個bolt之前會調用這個方法。本例中我們利用cleanup()方法在topology關閉時輸出最終的計數結果。通常情況下,cleanup()方法用來釋放bolt占用的資源,如打開的檔案句柄或者資料庫連接配接。
開發bolt時需要謹記的是,當topology在storm叢集上運作時,ibolt.cleanup()方法是不可靠的,不能保證會執行。下一章講到storm的容錯機制時,會讨論其中的原因。但這個例子我們是運作在開發模式中的,可以保證cleanup()被調用。
類reportbolt的完整代碼見示例1.4。
例1.4 reportbolt.java
1.3.6 實作單詞計數topology
我們已經定義了計算所需要的spout和bolt。下面将它們整合為一個可運作的topology(見例1.5)
例1.5 wordcounttopology.java

https://yqfile.alicdn.com/f344eb67ae3a4718239f126f41ffafff592b95b9.png
storm topology通常由java的main()函數進行定義,運作或者送出(部署到叢集的操作)。在本例中,我們首先定義了一系列字元串常量,作為storm元件的唯一辨別符。main()方法中,首先執行個體化了spout和bolt,并生成一個topologybuilder執行個體。topologybuilder類提供了流式接口風格的api來定義topology元件之間的資料流。首先注冊一個sentence spout并且指派給其唯一的id:
然後注冊一個splitsentencebolt,這個bolt訂閱sentencespout發射出來的資料流:

https://yqfile.alicdn.com/b4a75999a7729d021be270d53e20449395a26aa8.png
類topologybuilder的setbolt()方法會注冊一個bolt,并且傳回boltdeclarer的執行個體,可以定義bolt的資料源。這個例子中,我們将sentencespout的唯一id指派給shufflegrouping()方法确立了這種訂閱關系。shufflegrouping()方法告訴storm,要将類sentencespout發射的tuple随機均勻的分發給splitsentencebolt的執行個體。後續在讨論storm的并發性時,會解釋資料流分組的詳情。代碼下一行确立了類splitsentencebolt和類thewordcountbolt之間的連接配接關系:
你将了解到,有時候需要将含有特定資料的tuple路由到特殊的bolt執行個體中。在此我們使用類boltdeclarer的fieldsgrouping()方法來保證所有“word”字段值相同的tuple會被路由到同一個wordcountbolt執行個體中。
定義資料流的最後一步是将wordcountbolt執行個體發射出的tuple流路由到類reportbolt上。本例中,我們希望wordcountbolt發射的所有tuple路由到唯一的reportbolt任務中。globalgrouping()方法提供了這種用法:
所有的資料流都已經定義好,運作單詞計數計算的最後一步是編譯并送出到叢集上:
這裡我們采用了storm的本地模式,使用storm的localcluster類在本地開發環境來模拟一個完整的storm叢集。本地模式是開發和測試的簡便方式,省去了在分布式叢集中反複部署的開銷。本地模式還能夠很友善地在ide中執行storm topology,設定斷點,暫停運作,觀察變量,分析程式性能。當topology釋出到分布式叢集後,這些事情會很耗時甚至難以做到。
storm的config類是一個hashmap的子類,并定義了一些storm特有的常量和簡便的方法,用來配置topology運作時行為。當一個topology送出時,storm會将預設配置和config執行個體中的配置合并後作為參數傳遞給submittopology()方法。合并後的配置被分發給各個spout的bolt的open()、prepare()方法。從這個層面上講,config對象代表了對topology所有元件全局生效的配置參數集合。現在可以運作wordcounttopology類了,main()方法會送出topology,在執行10秒後,停止(解除安裝)該topology,最後關閉本地模式的叢集。程式執行完畢後,在控制台可以看到類似以下的輸出: