天天看點

Storm入門 第二章準備開始

準備開始

在本章,我們要建立一個storm工程和我們的第一個storm拓撲結構。

操作模式

開始之前,有必要了解一下storm的操作模式。有下面兩種方式。

本地模式

在本地模式下,storm拓撲結構運作在本地計算機的單一jvm程序上。這個模式用于開發、測試以及調試,因為這是觀察所有元件如何協同工作的最簡單方法。在這種模式下,我們可以調整參數,觀察我們的拓撲結構如何在不同的storm配置環境下運作。要在本地模式下運作,我們要下載下傳storm開發依賴,以便用來開發并測試我們的拓撲結構。我們建立了第一個storm工程以後,很快就會明白如何使用本地模式了。

note: 在本地模式下,跟在叢集環境運作很像。不過很有必要确認一下所有元件都是線程安全的,因為當把它們部署到遠端模式時它們可能會運作在不同的jvm程序甚至不同的實體機上,這個時候它們之間沒有直接的通訊或共享記憶體。

我們要在本地模式運作本章的所有例子。

遠端模式

在遠端模式下,我們向storm叢集送出拓撲,它通常由許多運作在不同機器上的流程組成。遠端模式不會出現調試資訊, 是以它也稱作生産模式。不過在單一開發機上建立一個storm叢集是一個好主意,可以在部署到生産環境之前,用來确認拓撲在叢集環境下沒有任何問題。

我們在這個工程裡建立一個簡單的拓撲,數單詞數量。我們可以把這個看作storm的“hello world”。不過,這是一個非常強大的拓撲,因為它能夠擴充到幾乎無限大的規模,而且隻需要做一些小修改,就能用它建構一個統計系統。舉個例子,我們可以修改一下工程用來找出twitter上的熱點話題。

要建立這個拓撲,我們要用一個spout讀取文本,第一個bolt用來标準化單詞,第二個bolt為單詞計數,如圖2-1所示。

Storm入門 第二章準備開始

建構storm運作環境的第一步是檢查你安裝的java版本。打開一個控制台視窗并執行指令:java -version。控制台應該會顯示出類似如下的内容:

開始之前,先為這個應用建一個目錄(就像你平常為java應用做的那樣)。這個目錄用來存放工程源碼。

接下來我們要下載下傳storm依賴包,這是一些jar包,我們要把它們添加到應用類路徑中。你可以采用如下兩種方式之一完成這一步:

下載下傳所有依賴,解壓縮它們,把它 們添加到類路徑

note: storm的maven依賴引用了運作storm本地模式的所有庫。

要運作我們的拓撲,我們可以編寫一個包含基本元件的pom.xml檔案。

開頭幾行指定了工程名稱和版本号。然後我們添加了一個編譯器插件,告知maven我們的代碼要用java1.6編譯。接下來我們定義了maven倉庫(maven支援為同一個工程指定多個倉庫)。clojars是存放storm依賴的倉庫。maven會為運作本地模式自動下載下傳必要的所有子包依賴。

一個典型的maven java工程會擁有如下結構:

java目錄下的子目錄包含我們的代碼,我們把要統計單詞數的檔案儲存在resource目錄下。

note:指令mkdir -p 會建立所有需要的父目錄。

我們将為運作單詞計數建立所有必要的類。可能這個例子中的某些部分,現在無法講的很清楚,不過我們會在随後的章節做進一步的講解。

note: 一個spout釋出一個定義域清單。這個架構允許你使用不同的bolts從同一個spout流讀取資料,它們的輸出也可作為其它bolts的定義域,以此類推。

例2-1包含wordread類的完整代碼(我們将會分析下述代碼的每一部分)。

第一個被調用的spout方法都是public void open(map conf, topologycontext context, spoutoutputcollector collector)。它接收如下參數:配置對象,在定義topology對象是建立;topologycontext對象,包含所有拓撲資料;還有spoutoutputcollector對象,它能讓我們釋出交給bolts處理的資料。下面的代碼主是這個方法的實作。

我們在這個方法裡建立了一個filereader對象,用來讀取檔案。接下來我們要實作public void nexttuple(),我們要通過它向bolts釋出待處理的資料。在這個例子裡,這個方法要讀取檔案并逐行釋出資料。

note: values是一個arrarlist實作,它的元素就是傳入構造器的參數。

nexttuple()會在同一個循環内被ack()和fail()周期性的調用。沒有任務時它必須釋放對線程的控制,其它方法才有機會得以執行。是以nexttuple的第一行就要檢查是否已處理完成。如果完成了,為了降低處理器負載,會在傳回前休眠一毫秒。如果任務完成了,檔案中的每一行都已被讀出并分發了。

note:元組(tuple)是一個具名值清單,它可以是任意java對象(隻要它是可序列化的)。預設情況,storm會序列化字元串、位元組數組、arraylist、hashmap和hashset等類型。

現在我們有了一個spout,用來按行讀取檔案并每行釋出一個元組,還要建立兩個bolts,用來處理它們(看圖2-1)。bolts實作了接口backtype.storm.topology.irichbolt。

bolt最重要的方法是void execute(tuple input),每次接收到元組時都會被調用一次,還會再釋出若幹個元組。

第一個bolt,wordnormalizer,負責得到并标準化每行文本。它把文本行切分成單詞,大寫轉化成小寫,去掉頭尾空白符。

首先我們要聲明bolt的出參:

這裡我們聲明bolt将釋出一個名為“word”的域。

下一步我們實作public void execute(tuple input),處理傳入的元組:

第一行從元組讀取值。值可以按位置或名稱讀取。接下來值被處理并用collector對象釋出。最後,每次都調用collector對象的ack()方法确認已成功處理了一個元組。

例2-2是這個類的完整代碼。

note:通過這個例子,我們了解了在一次execute調用中釋出多個元組。如果這個方法在一次調用中接收到句子“this is the storm book”,它将會釋出五個元組。

下一個bolt,wordcounter,負責為單詞計數。這個拓撲結束時(cleanup()方法被調用時),我們将顯示每個單詞的數量。

note: 這個例子的bolt什麼也沒釋出,它把資料儲存在map裡,但是在真實的場景中可以把資料儲存到資料庫。

execute方法使用一個map收集單詞并計數。拓撲結束時,将調用clearup()方法列印計數器map。(雖然這隻是一個例子,但是通常情況下,當拓撲關閉時,你應當使用cleanup()方法關閉活動的連接配接和其它資源。)

note:所有拓撲節點的各個程序必須能夠獨立運作,而不依賴共享資料(也就是沒有全局變量或類變量),因為當拓撲運作在真實的叢集環境時,這些程序可能會運作在不同的機器上。

接下來,topologybuilder将用來建立拓撲,它決定storm如何安排各節點,以及它們交換資料的方式。

在spout和bolts之間通過shufflegrouping方法連接配接。這種分組方式決定了storm會以随機配置設定方式從源節點向目标節點發送消息。

下一步,建立一個包含拓撲配置的config對象,它會在運作時與叢集配置合并,并通過prepare方法發送給所有節點。

由spout讀取的檔案的檔案名,指派給wordfile屬性。由于是在開發階段,設定debug屬性為true,strom會列印節點間交換的所有消息,以及其它有助于了解拓撲運作方式的調試資料。

正如之前講過的,你要用一個localcluster對象運作這個拓撲。在生産環境中,拓撲會持續運作,不過對于這個例子而言,你隻要運作它幾秒鐘就能看到結果。

調用createtopology和submittopology,運作拓撲,休眠兩秒鐘(拓撲在另外的線程運作),然後關閉叢集。

例2-3是完整的代碼

<a href="https://github.com/runfriends/gettingstartedwithstorm-cn/blob/master/chapter2/hello%20world%20storm.md#%e8%a7%82%e5%af%9f%e8%bf%90%e8%a1%8c%e6%83%85%e5%86%b5">觀察運作情況</a>

你已經為運作你的第一個拓撲準備好了。在這個目錄下面建立一個檔案,/src/main/resources/words.txt,一個單詞一行,然後用下面的指令運作這個拓撲:mvn exec:java -dexec.mainclass=”topologymain” -dexec.args=”src/main/resources/words.txt。

舉個例子,如果你的words.txt檔案有如下内容: storm test are great is an storm simple application but very powerful really storm is great 你應該會在日志中看到類似下面的内容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在這個例子中,每類節點隻有一個執行個體。但是如果你有一個非常大的日志檔案呢?你能夠很輕松的改變系統中的節點數量實作并行工作。這個時候,你就要建立兩個wordcounter執行個體。

程式傳回時,你将看到: — 單詞數 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 storm: 3 — 單詞數 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒極了!修改并行度實在是太容易了(當然對于實際情況來說,每個執行個體都會運作在單獨的機器上)。不過似乎有一個問題:單詞is和great分别在每個wordcounter各計數一次。怎麼會這樣?當你調用shufflegrouping時,就決定了storm會以随機配置設定的方式向你的bolt執行個體發送消息。在這個例子中,理想的做法是相同的單詞問題發送給同一個wordcounter執行個體。你把shufflegrouping(“word-normalizer”)換成fieldsgrouping(“word-normalizer”, new fields(“word”))就能達到目的。試一試,重新運作程式,确認結果。 你将在後續章節學習更多分組方式和消息流類型。

<a href="https://github.com/runfriends/gettingstartedwithstorm-cn/blob/master/chapter2/hello%20world%20storm.md#%e7%bb%93%e8%ae%ba">結論 </a>

我們已經讨論了storm的本地和遠端操作模式之間的不同,以及storm的強大和易于開發的特性。你也學習了一些storm的基本概念,我們将在後續章節深入講解它們。