天天看點

Storm Trident API 實踐

     storm是一個實時的可靠地分布式流計算架構。

     具體就不多說了,舉個例子,它的一個典型的大資料實時計算應用場景:從kafka消息隊列讀取消息(可以是logs,clicks,sensor data)、通過storm對消息進行計算聚合等預處理、把處理結果持久化到nosql資料庫或者hdfs做進一步深入分析。

     trident是對storm的更高一層的抽象,除了提供一套簡單易用的流資料處理api之外,它以batch(一組tuples)為機關進行處理,這樣一來,可以使得一些處理更簡單和高效。

     我們知道把bolt的運作狀态僅僅儲存在記憶體中是不可靠的,如果一個node挂掉,那麼這個node上的任務就會被重新配置設定,但是之前的狀态是無法恢複的。是以,比較聰明的方式就是把storm的計算狀态資訊持久化到database中,基于這一點,trident就變得尤為重要。因為在處理大資料時,我們在與database打交道時通常會采用批處理的方式來避免給它帶來壓力,而trident恰恰是以batch groups的形式處理資料,并提供了一些聚合功能的api。

     trident其實就是一套api,但現階段網上關于trident api中各個函數的用法含義資料不多,下面我就根據一些英文資料和自己的了解,詳細介紹一下trident api各個函數的用法和含義。閱讀本文需要有一定的trident api基礎。

     作用:操作batch中的每一個tuple内容,一般與filter或者function函數配合使用。

     下面通過一個例子來介紹each()方法,假設我們有一個faketweetsbatchspout,它會模拟一個stream,随機産生一個個消息。我們可以通過設定這個spout類的構造參數來改變這個spout的batch size的大小。

     一個通過actor字段過濾消息的filter:

   topology:  

     從上面例子看到,each()方法有一些構造參數

第一個構造參數:作為field selector,一個tuple可能有很多字段,通過設定field,我們可以隐藏其它字段,僅僅接收指定的字段(其它字段實際還在)。

第二個是一個filter:用來過濾掉除actor名叫"dave"外的其它消息。

     一個能把tuple中text内容變成大寫的function:

     topology:

     首先,uppercasefunction函數的輸入是fields("text", "actor"),其作用是把其中的"text"字段内容都變成大寫。

     其次,它比filter多出一個輸出字段,作用是每個tuple在經過這個function函數處理後,輸出字段都會被追加到tuple後面,在本例中,執行完function之後的tuple内容多了一個"uppercased_text",并且這個字段排在最後面。

   我們需要注意的是,上面每個each()方法的第一個field字段僅僅是隐藏掉沒有指定的字段内容,實際上被隐藏的字段依然還在tuple中,如果想要徹底丢掉它們,我們就需要用到project()方法。

   投影操作作用是僅保留stream指定字段的資料,比如有一個stream包含如下字段: [“a”, “b”, “c”, “d”],運作如下代碼:

   則輸出的流僅包含 [“b”, “d”]字段。

     指定topology的并行度,即用多少線程執行這個任務。我們可以稍微改一下我們的filter,通過列印目前任務的partitionindex來區分目前是哪個線程。

filter:

topology:

     如果我們指定執行filter任務的線程數量為5,那麼最終的執行結果會如何呢?看一下我們的測試結果:

     我們可以很清楚的發現,一共有5個線程在執行filter。

     如果我們想要2個spout和5個filter怎麼辦呢?如下面代碼所示,實作很簡單。

     我們注意到上面的例子中用到了shuffle(),shuffle()是一個重定向操作。那什麼是重定向操作呢?重定向定義了我們的tuple如何被route到下一處理層,當然不同的層之間可能會有不同的并行度,shuffle()的作用是把tuple随機的route下一層的線程中,而partitionby()則根據我們的指定字段按照一緻性雜湊演算法route到下一層的線程中,也就是說,如果我們用partitionby()的話,同一個字段名的tuple會被route到同一個線程中。

     比如,如果我們把上面代碼中的shuffle()改成partitionby(new fields("actor")),猜一下結果會怎樣?

     測試結果正如我們上面描述的那樣,相同字段的tuple被route到了同一個partition中。

重定向操作有如下幾種:

shuffle:通過随機配置設定算法來均衡tuple到各個分區

broadcast:每個tuple都被廣播到所有的分區,這種方式在drcp時非常有用,比如在每個分區上做statequery

partitionby:根據指定的字段清單進行劃分,具體做法是用指定字段清單的hash值對分區個數做取模運算,確定相同字段清單的資料被劃分到同一個分區

global:所有的tuple都被發送到一個分區,這個分區用來處理整個stream

batchglobal:一個batch中的所有tuple都被發送到同一個分區,不同的batch會去往不同的分區

partition:通過一個自定義的分區函數來進行分區,這個自定義函數實作了 backtype.storm.grouping.customstreamgrouping

     我們前面講過,trident的一個很重要的特點就是它是以batch的形式處理tuple的。我們可以很容易想到的針對一個batch的最基本操作應該就是聚合。trident提供了聚合api來處理batches,來看一個例子:

 topology:

     這個aggregator很簡單:計算每一個batch的location的數量。通過這個例子我們可以看到aggregator接口:

init():當剛開始接收到一個batch時執行

aggregate():在接收到batch中的每一個tuple時執行

complete():在一個batch的結束時執行     

     我們前面講過aggregate()方法是一個重定向方法,因為它會随機啟動一個單獨的線程來進行這個聚合操作。

     下面我們來看一下測試結果:

     我們可以看到列印的結果,其中每一條的和都是5,這是因為我們的spout的每個batch中tuple數量設定的是5,是以每個線程的計算結果也會是5。 除此之外,trident還提供了其它兩個aggregator接口: combineraggregator, reduceraggregator,具體使用方法請參考trident api。

     如果我們将上面的topology稍微改造一下,猜一下結果會是如何?

     我們一起來分析一下,首先partitionby()方法将tuples按其location字段重定向到下一處理邏輯,而且相同location字段的tuple一定會被配置設定到同一個線程中處理。其次,partitionaggregate()方法,注意它與aggregate不同,它不是一個重定向方法,它僅僅是對目前partition上的各個batch執行聚合操作。因為我們根據location進行了重定向操作,測試資料一共有4個location,而目前一共有3個partition,是以可以猜測我們的最終測試結果中,有一個partition會處理兩個location的batch,最終測試結果如下:

     需要注意的是,partitionaggregate雖然也是聚合操作,但與上面的aggregate完全不同,它不是一個重定向操作。

     我們可以看到上面幾個例子的測試結果,其實我們通常想要的是每個location的數量是多少,那該怎麼處理呢?看下面這個topology:

     我們先看一下執行的結果:

     上面這段代碼計算出了每個location的數量,即使我們的count函數沒有指定并行度。這就是groupby()起的作用,它會根據指定的字段建立一個groupedstream,相同字段的tuple都會被重定向到一起,彙聚成一個group。groupby()之後是aggregate,與之前的聚合整個batch不同,此時的aggregate會單獨聚合每個group。我們也可以這麼認為,groupby會把stream按照指定字段分成一個個stream group,每個group就像一個batch一樣被處理。

     不過需要注意的是,groupby()本身并不是一個重定向操作,但如果它後面跟的是aggregator的話就是,跟的是partitionaggregate的話就不是。

Storm Trident API 實踐

     storm是一個實時流計算架構,trident是對storm的一個更高層次的抽象,trident最大的特點以batch的形式處理stream。

     一些最基本的操作函數有filter、function,filter可以過濾掉tuple,function可以修改tuple内容,輸出0或多個tuple,并能把新增的字段追加到tuple後面。

     聚合有partitionaggregate和aggregator接口。partitionaggregate對目前partition中的tuple進行聚合,它不是重定向操作。aggregator有三個接口:combineraggregator, reduceraggregator,aggregator,它們屬于重定向操作,它們會把stream重定向到一個partition中進行聚合操作。

     重定向操作會改變資料流向,但不會改變資料内容,重定向操會産生網絡傳輸,可能影響一部分效率。而filter、function、partitionaggregate則屬于本地操作,不會産生網絡傳輸。

     groupby會根據指定字段,把整個stream切分成一個個grouped stream,如果在grouped stream上做聚合操作,那麼聚合就會發生在這些grouped stream上而不是整個batch。如果groupby後面跟的是aggregator,則是重定向操作,如果跟的是partitionaggregate,則不是重定向操作。

繼續閱讀