天天看點

《Storm分布式實時計算模式》——3.4 Trident運算

本節書摘來自華章計算機《storm分布式實時計算模式》一書中的第3章,第3.4節,作者:(美)p. taylor goetz brian o’neill 更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

時間戳已經生成好了,下一步是加入處理事件的邏輯元件。在trident中,這些元件稱為運算(operation)。在我們的topology中,使用兩種不同的運算:filter和function。

運算通過stream對象的方法來調用。這個例子中,我們使用了stream對象的下述方法:

《Storm分布式實時計算模式》——3.4 Trident運算

https://yqfile.alicdn.com/46e7bea64528ebdb3f64b3cb53140489e03591ff.png

" >

注意前面代碼中列出的方法傳回形式為stream對象或者tridentstate對象,傳回可以用來建立新的資料流。是以,運算可以連在一起使用流式接口形式的java代碼。讓我們再看看示例topology中的關鍵代碼:

《Storm分布式實時計算模式》——3.4 Trident運算

https://yqfile.alicdn.com/f0d2d1524271a5239e587c3c0700475aeec9a7b1.png

通常,應用運算需要聲明一個輸入域集合和一個輸出域集合,也就是funcition域。上面代碼中topology第二行聲明我們需要cityassignment對資料流中的每個tuple執行操作。在每個tuple中,cityassignment會在event字段上運算并且增加一個叫做city的新字段,這個字段會附在tuple中向後發射。

每個操作在流式風格的文法上略有不同,這取決于操作需要哪些資訊。下面将介紹不同操作的詳細文法和語義。

3.4.1 trident filter

我們topology邏輯中的第一部分就是個過濾器filter,它會忽略掉我們不關心的疾病事件。在這個例子中,系統隻關心腦膜炎(meningitis)的病情,從前面表格中看到,腦膜炎對應的疾病代碼是320、321和322。

為了通過疾病代碼過濾事件,我們需要利用trident filter。trident通過提供basefilter類,我們通過實作子類就可以友善地對tuple進行過濾,濾除系統不需要的tuple。basefilter類實作了filter接口,這個接口如下面代碼片段所示:

《Storm分布式實時計算模式》——3.4 Trident運算

為了在資料流中過濾tuple,應用需要通過繼承basefilter類來實作這個接口。這個例子中,我們使用下述過濾器過濾事件:

《Storm分布式實時計算模式》——3.4 Trident運算

上面的代碼中,我們從tuple中提取了diagnosisevent并且檢查疾病代碼。因為所有的腦膜炎代碼小于等于322,我們也沒有發送其他代碼,是以隻需要簡單地檢查代碼是否小于322,就可以決定事件是否和腦膜炎有關。

filter操作結果傳回true的tuple将會被發送到下遊進行操作。如果方法傳回false,該tuple就不會發送到下遊。

在我們的topology中,我們在資料流上使用each(inputfields,filter)方法,将這個過濾器應用到資料流的每個tuple中:

《Storm分布式實時計算模式》——3.4 Trident運算

3.4.2 trident function

在filter之外,storm還提供了一個更通用功能的接口function。function和storm的bolt類似,讀取tuple并且發送新的tuple。其中一個差別是,trident function隻能添加資料。function發送資料時,将新字段添加在tuple中,并不會删除或者變更已有的字段。

function接口如下代碼片段所示:

《Storm分布式實時計算模式》——3.4 Trident運算

https://yqfile.alicdn.com/beff14fd4d5d776524b631d21ff364c68ce26b01.png

和storm的bolt類似,function實作了一個包括實際邏輯的方法execute。function的實作也可以選用tridentcollector來發送tuple到新的function中。用這種方式,function也可以用來過濾tuple,起到filter的作用。

我們topology中的第一個function是cityassignment,如下所示:

《Storm分布式實時計算模式》——3.4 Trident運算

https://yqfile.alicdn.com/96f5af484cf5f33107478974af7e25235245791e.png

《Storm分布式實時計算模式》——3.4 Trident運算

在這個function中,我們使用靜态初始化的方式建立了一個我們關心的城市的地圖。示例中,function包括一個地圖,存儲了的坐标資訊包括:philadelphia(phl)、new york city(nyc)、san francisco(sf)和losangeles(la)。

在execute()方法中,函數周遊城市計算事件和城市之間的距離。現實系統中,地理空間的索引效率會高很多。

function聲明的字段數量必須和它發射出值的字段數一緻。如果不一緻,storm就會抛出indexoutofboundsexception異常。

我們topology中的下一個function是hourassignment,用來轉化unix時間戳為紀元時間的小時,可以用來對事件發生進行時間上的分組操作。hourassignment的代碼如下:

《Storm分布式實時計算模式》——3.4 Trident運算
《Storm分布式實時計算模式》——3.4 Trident運算

我們重寫了這個function,同時發射了小時的數值,以及由城市、疾病代碼、小時組合而成的key。實際上,這個組合值會作為聚合計數的唯一辨別符,後面會詳細解釋。

我們topology中最後兩個funciton是用來偵測疾病暴發并且告警的。outbreakdetector類的代碼如下:

《Storm分布式實時計算模式》——3.4 Trident運算

這個function提取出了特定城市、疾病、時間的發生次數,并且檢查計數是否超過了設定的門檻值。如果超過,發送一個新的字段包括一條告警資訊。在上面代碼裡,注意這個function實際上扮演了一個過濾器的角色,但是卻作為一個function的形式來實作,是因為需要在tuple中添加新的字段。因為filter不能改變tuple,當我們既想過濾又想添加字段時必須使用function。

最後一個function的功能就是釋出一個告警(并且結束程式)。代碼如下:

《Storm分布式實時計算模式》——3.4 Trident運算
《Storm分布式實時計算模式》——3.4 Trident運算

https://yqfile.alicdn.com/f6fa126f5a8efa4d10f9909240f12104e8f1e658.png" >

這個方法非常簡單,提取了告警的内容,并寫入日志,最後結束整個程式。

繼續閱讀