天天看點

Storm入門之第7章使用非JVM語言開發

有時候你可能想使用不是基于jvm的語言開發一個storm工程,你可能更喜歡使用别的語言或者想使用用某種語言編寫的庫。

storm是用java實作的,你看到的所有這本書中的spout和bolt都是用java編寫的。那麼有可能使用像python、ruby、或者javascript這樣的語言編寫spout和bolt嗎?答案是當然

Storm入門之第7章使用非JVM語言開發

可以!可以使用多語言協定達到這一目的。

多語言協定是storm實作的一種特殊的協定,它使用标準輸入輸出作為spout和bolt程序間的通訊通道。消息以json格式或純文字格式在通道中傳遞。

我們看一個用非jvm語言開發spout和bolt的簡單例子。在這個例子中有一個spout産生從1到10,000的數字,一個bolt過濾素數,二者都用php實作。

note: 在這個例子中,我們使用一個很笨的辦法驗證素數。有更好當然也更複雜的方法,它們已經超出了這個例子的範圍。

有一個專門為storm實作的php dsl(譯者注:領域特定語言),我們将會在例子中展示我們的實作。首先定義拓撲。

<b>note:</b>有一種使用非jvm語言定義拓撲的方式。既然storm拓撲是thrift架構,而且nimbus是一個thrift守護程序,你就可以使用任何你想用的語言建立并送出拓撲。但是這已經超出了本書的範疇了。

這裡沒什麼新鮮了。我們看一下numbersgeneratorspout的實作。

你可能已經注意到了,這個spout繼承了shellspout。這是個由storm提供的特殊的類,用來幫助你運作并控制用其它語言編寫的spout。在這種情況下它告訴storm如何執行你的php腳本。

numbergeneratorspout的php腳本向标準輸出分發元組,并從标準輸入讀取确認或失敗信号。

在開始實作numbergeneratorspout.php腳本之前,多觀察一下多語言協定是如何工作的。

spout按照傳遞給構造器的參數從from到to順序生成數字。

接下來看看primenumbersfilterbolt。這個類實作了之前提到的殼。它告訴storm如何執行你的php腳本。storm為這一目的提供了一個特殊的叫做shellbolt的類,你惟一要做的事就是指出如何運作腳本以及聲明要分發的屬性。

在這個構造器中隻是告訴storm如何運作php腳本。它與下列指令等價。

primenumbersfilterbolt.php腳本從标準輸入讀取元組,處理它們,然後向标準輸出分發、确認或失敗。在開始這個腳本之前,我們先多了解一些多語言協定的工作方式。

發起一次握手

開始循環

讀/寫元組

note:有一種特殊的方式可以使用storm的内建日志機制在你的腳本中記錄日志,是以你不需要自己實作日志系統。

下面我們來看一看上述每一步的細節,以及如何用php實作它。

發起握手

為了控制整個流程(開始以及結束它),storm需要知道它執行的腳本程序号(pid)。根據多語言協定,你的程序開始時發生的第一件事就是storm要向标準輸入(譯者注:根據上下文了解,本章提到的标準輸入輸出都是從非jvm語言的角度了解的,這裡提到的标準輸入也就是php的标準輸入)發送一段json資料,它包含storm配置、拓撲上下文和一個程序号目錄。它看起來就像下面的樣子:

腳本程序必須在piddir指定的目錄下以自己的程序号為名字建立一個檔案,并以json格式把程序号寫到标準輸出。

舉個例子,如果你收到/tmp/example\n而你的腳本程序号是123,你應該建立一個名為/tmp/example/123的空檔案并向标準輸出列印文本行 {“pid”: 123}\n(譯者注:此處原文隻有一個n,譯者猜測應是排版錯誤)和end\n。這樣storm就能持續追蹤程序号并在它關閉時殺死腳本程序。下面是php實作:

你已經實作了一個叫做read_msg的函數,用來處理從标準輸入讀取的消息。按照多語言協定的聲明,消息可以是單行或多行json文本。一條消息以end\n結束。

note:flush()方法非常重要;有可能字元緩沖隻有在積累到一定程度時才會清空。這意味着你的腳本可能會為了等待一個來自storm的輸入而永遠挂起,而storm卻在等待來自你的腳本的輸出。是以當你的腳本有内容輸出時立即清空緩沖是很重要的。

<b>開始循環以及讀/寫元組</b>

這是整個工作中最重要的一步。這一步的實作取決于你開發的spout和bolt。

如果是spout,你應當開始分發元組。如果是bolt,就循環讀取元組,處理它們,分發它發,确認成功或失敗。

下面我們就看看用來分發數字的spout。

從指令行擷取參數from和to,并開始疊代。每次從storm得到一條next消息,這意味着你已準備好分發下一個元組。

一旦你發送了所有的數字,而且沒有更多元組可發了,就休眠一段時間。

為了確定腳本已準備好發送下一個元組,storm會在發送下一條之前等待sync\n文本行。調用<b>read_msg()</b>,讀取一條指令,解析json。

對于bolts來說,有少許不同。

循環的從标準輸入讀取元組。解析讀取每一條json消息,判斷它是不是一個元組,如果是,再檢查它是不是一個素數,如果是素數再次分發一個元組,否則就忽略掉,最後不論如何都要确認成功。

note:在json_decode函數中使用的json_bigint_as_string是為了解決一個在java和php之間的資料轉換問題。java發送的一些很大的數字,在php中會丢失精度,這樣就會導緻問題。為了避開這個問題,告訴php把大數字當作字元串處理,并在json消息中輸出數字時不使用雙引号。php5.4.0或更高版本要求使用這個參數。

emit,ack,fail,以及log消息都是如下結構:

emit

其中的數組包含了你分發的元組資料。

ack

其中的id就是你處理的元組的id。

fail

與ack(譯者注:原文是emit從上下json的内容和每個方法的功能上判斷此處就是ack,可能是排版錯誤)相同,其中id就是你處理的元組id。

log

下面是完整的的php代碼。

note:需要重點指出的是,應當把所有的腳本檔案儲存在你的工程目錄下的一個名為multilang/resources的子目錄中。這個子目錄被包含在發送給勞工程序的jar檔案中。如果你不把腳本包含在這個目錄中,storm就不能運作它們,并抛出一個錯誤。