天天看點

Storm進階原語(三) — Trident topology

本文翻譯自Apache Storm首頁上的Trident topology 介紹一文,同時參考derekjiang部落格。

Trident是在storm基礎上,一個以實時計算為目标的高度抽象。 它在提供處理大吞吐量資料能力(每秒百萬次消息)的同時,也提供了低延時分布式查詢和有狀态流式處理的能力。 如果你對Pig和Cascading這種進階批處理工具很了解的話,那麼應該很容易了解Trident,因為他們之間很多的概念和思想都是類似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 還提供了一些專門的原語,進而在基于資料庫或者其他存儲的前提下來應付有狀态的遞增式處理。Trident也提供一緻性(consistent)、有且僅有一次(exactly-once)等語義,這使得我們在使用trident toplogy時變得容易。

舉例說明

讓我們一起來看一個Trident的例子。在這個例子中,我們主要做了兩件事情:

1、從一個流式輸入中讀取語句并計算每個單詞的個數

2、提供查詢給定單詞清單中每個單詞目前總數的功能

因為這隻是一個例子,我們會從如下這樣一個無限的輸入流中讀取語句作為輸入:

1 2 3 4 5 6

FixedBatchSpout spout =

new

FixedBatchSpout(

new

Fields(

"sentence"

),

3

,

new

Values(

"the cow jumped over the moon"

),

new

Values(

"the man went to the store and bought some candy"

),

new

Values(

"four score and seven years ago"

),

new

Values(

"how many apples can you eat"

));

spout.setCycle(

true

);

這個spout會循環輸出列出的那些語句到sentence stream當中,下面的代碼會以這個stream作為輸入并計算每個單詞的個數:

1 2 3 4 5 6 7

TridentTopology topology =

new

TridentTopology();

TridentState wordCounts =

topology.newStream(

"spout1"

, spout)

.each(

new

Fields(

"sentence"

),

new

Split(),

new

Fields(

"word"

))

.groupBy(

new

Fields(

"word"

))

.persistentAggregate(

new

MemoryMapState.Factory(),

new

Count(),

new

Fields(

"count"

))

.parallelismHint(

6

);

在這段代碼中,我們首先建立了一個TridentTopology對象,該對象提供了相應的接口去構造Trident計算過程。①、TridentTopology類中的newStream方法從輸入源(input source)中讀取資料,并建立一個新的資料流。在這個例子中,我們使用了上面定義的FixedBatchSpout對象作為輸入源。輸入資料源同樣也可以是如Kestrel或者Kafka這樣的隊列服務。Trident會在Zookeeper中儲存一小部分狀态資訊來追蹤資料的處理情況,而在代碼中我們指定的字元串“spout1”就是Zookeeper中用來存儲狀态資訊的Znode節點。

Trident在處理輸入stream的時候會把輸入轉換成batch(包含若幹個tuple)來處理。比如說,輸入的sentence stream可能會被拆分成如下的batch:

Storm進階原語(三) — Trident topology

一般來說,這些小的batch中的tuple可能會在數千或者數百萬這樣的數量級,這完全取決于你的輸入的吞吐量。

Trident提供了一系列非常成熟的批處理API來處理這些小batch。這些API和你在Pig或者Cascading中看到的非常類似, 你可以做groupby、join、 aggregation、執行 function和filter等等。當然,獨立的處理每個小的batch并不是非常有趣的事情,是以Trident提供了功能來實作batch之間的聚合并可以将這些聚合的結果存儲到記憶體、Memcached、Cassandra或者是一些其他的存儲中。同時,Trident還提供了非常好的功能來查詢實時狀态,這些實時狀态可以被Trident更新,同時它也可以是一個獨立的狀态源。

回到我們的這個例子中來,spout輸出了一個隻有單一字段“sentence”的資料流。②、在下一行,topology使用了Split函數來拆分stream中的每一個tuple,Split函數讀取輸入流中的“sentence”字段并将其拆分成若幹個word tuple。每一個sentence tuple可能會被轉換成多個word tuple,比如說”the cow jumped over the moon” 會被轉換成6個 “word” tuples。下面是Split的定義:

1 2 3 4 5 6 7 8

public

class

Split

extends

BaseFunction {

public

void

execute(TridentTuple tuple, TridentCollector collector) {

String sentence = tuple.getString(

);

for

(String word: sentence.split(

" "

)) {

collector.emit(

new

Values(word));

}

}

}

如你所見,真的很簡單。它隻是簡單的根據空格拆分sentence,并将拆分出的每個單詞作為一個tuple輸出。

topology的其他部分計算單詞的個數并将計算結果儲存到了持久存儲中。③、首先,word stream被根據“word”字段進行group操作,④、然後每一個group使用Count聚合器進行持久化聚合。persistentAggregate方法會幫助你把一個狀态源聚合的結果存儲或者更新到存儲當中。在這個例子中,單詞的數量被保持在記憶體中,不過我們可以很簡單的把這些資料儲存到其他的存儲當中,如 Memcached、 Cassandra等。如果我們要把結果存儲到Memcached中,隻是簡單的使用下面這句話替換掉persistentAggregate就可以,這當中的”serverLocations”是Memcached cluster的主機和端口号清單:

1

.persistentAggregate(MemcachedState.transactional(serverLocations),

new

Count(),

new

Fields(

"count"

))

persistentAggregate存儲的資料就是所有batch聚合的結果。

Trident非常酷的一點就是它提供完全容錯的(fully fault-tolerant)、處理一次且僅一次(exactly-once)的語義。這就讓你可以很輕松的使用Trident來進行實時資料處理。Trident會把狀态以某種形式保持起來,當有錯誤發生時,它會根據需要來恢複這些狀态。

④續、persistentAggregate方法會把資料流轉換成一個TridentState對象。在這個例子當中,TridentState對象代表了所有的單詞的數量。我們會使用這個TridentState對象來實作在計算過程中的分布式查詢部分。

上面的是topology中的第一部分,topology的第二部分實作了一個低延時的單詞數量的分布式查詢。這個查詢以一個用空格分割的單詞清單為輸入,并傳回這些單詞的總個數。這些查詢就像普通的RPC調用那樣被執行的,要說不同的話,那就是他們在背景是并行執行的。下面是執行查詢的一個例子:

1 2 3

DRPCClient client =

new

DRPCClient(

"drpc.server.location"

,

3772

);

System.out.println(client.execute(

"words"

,

"cat dog the man"

);

// prints the JSON-encoded result, e.g.: "[[5078]]"

如你所見,除了在storm cluster上并行執行之外,這個查詢看上去就是一個普通的RPC調用。這樣的簡單查詢的延時通常在10毫秒左右。當然,更複雜的DRPC調用可能會占用更長的時間,盡管延時很大程度上是取決于你給計算配置設定了多少資源。

Topology中的分布式查詢部分實作如下所示:

1 2 3 4 5 6

topology.newDRPCStream(

"words"

)

.each(

new

Fields(

"args"

),

new

Split(),

new

Fields(

"word"

))

.groupBy(

new

Fields(

"word"

))

.stateQuery(wordCounts,

new

Fields(

"word"

),

new

MapGet(),

new

Fields(

"count"

))

.each(

new

Fields(

"count"

),

new

FilterNull())

.aggregate(

new

Fields(

"count"

),

new

Sum(),

new

Fields(

"sum"

));

我們仍然是使用TridentTopology對象來建立DRPC stream,并且我們将這個函數命名為“words”。這個函數名會作為第一個參數在使用DRPC Client來執行查詢的時候用到。

每個DRPC請求會被當做隻有一個tuple的batch來處理。在處理的過程中,以這個輸入的單一tuple來表示這個請求。這個tuple包含了一個叫做“args”的字段,在這個字段中儲存了用戶端提供的查詢參數。在這個例子中,這個參數是一個以空格分割的單詞清單。

首先,我們使用Split函數把傳入的請求參數拆分成獨立的單詞。然後對“word”流進行group by操作,之後就可以使用stateQuery來在上面代碼中建立的TridentState對象上進行查詢。stateQuery接受一個state源(在這個例子中,就是我們的topolgoy所計算的單詞的個數)以及一個用于查詢的函數作為輸入。在這個例子中,我們使用了MapGet函數來擷取每個單詞的出現個數。由于DRPC stream是使用跟TridentState完全同樣的group方式(按照“word”字段進行groupby),每個單詞的查詢會被路由到TridentState對象管理和更新這個單詞的分區去執行。

接下來,我們用FilterNull這個過濾器把從未出現過的單詞給過濾掉(說明沒有查詢該單詞),并使用Sum這個聚合器将這些count累加起來得到結果。最終,Trident會自動把這個結果發送回等待的用戶端。

Trident在如何最大程度地保證執行topogloy性能方面是非常智能的。在topology中會自動的發生兩件非常有意思的事情:

1、讀取和更新狀态的操作 (比如說 stateQuery和persistentAggregate ) 會自動地批量處理。 如果目前處理的batch中有20次更新需要被同步到存儲中,Trident會自動的把這些操作彙總到一起,隻做一次讀一次寫,而不是進行20次讀20次寫的操作。是以你可以在很友善的執行計算的同時,保證了非常好的性能。

2、Trident的聚合器已經是被優化的非常好了的。Trident并不是簡單的把一個group中所有的tuples都發送到同一個機器上面進行聚合,而是在發送之前已經進行過一次部分的聚合。打個比方,Count聚合器會先在每個partition上面進行count,然後把每個分片count彙總到一起就得到了最終的count。這個技術其實就跟MapReduce裡面的combiner是一個思想。

讓我們再來看一下Trident的另外一個例子。

Reach

這個例子是一個純粹的DRPC topology,這個topology會計算一個給定URL的reach值,reach值是該URL對應頁面的推文能夠送達(Reach)的使用者數量,那麼我們就把這個數量叫做這個URL的reach。要計算reach,你需要擷取轉發過這個推文的所有人,然後找到所有該轉發者的粉絲,并将這些粉絲去重,最後就得到了去重後的使用者的數量。如果把計算reach的整個過程都放在一台機器上面,就太困難了,因為這會需要數千次資料庫調用以及千萬級别數量的tuple。如果使用Storm和Trident,你就可以把這些計算步驟在整個cluster中并行進行(具體哪些步驟,可以參考DRPC介紹一文,該文有介紹過Reach值的計算方法)。

這個topology會讀取兩個state源:一個将該URL映射到所有轉發該推文的使用者清單,還有一個将使用者映射到該使用者的粉絲清單。topology的定義如下:

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16

TridentState urlToTweeters =

topology.newStaticState(getUrlToTweetersState());

TridentState tweetersToFollowers =

topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream(

"reach"

)

.stateQuery(urlToTweeters,

new

Fields(

"args"

),

new

MapGet(),

new

Fields(

"tweeters"

))

.each(

new

Fields(

"tweeters"

),

new

ExpandList(),

new

Fields(

"tweeter"

))

.shuffle()

.stateQuery(tweetersToFollowers,

new

Fields(

"tweeter"

),

new

MapGet(),

new

Fields(

"followers"

))

.parallelismHint(

200

)

.each(

new

Fields(

"followers"

),

new

ExpandList(),

new

Fields(

"follower"

))

.groupBy(

new

Fields(

"follower"

))

.aggregate(

new

One(),

new

Fields(

"one"

))

.parallelismHint(

20

)

.aggregate(

new

Count(),

new

Fields(

"reach"

));

這個topology使用newStaticState方法建立了TridentState對象來代表一個外部資料庫。使用這個TridentState對象,我們就可以在這個topology上面進行動态查詢了。和所有的state源一樣,在這些資料庫上面的查找會自動被批量執行,進而最大程度的提升效率。

這個topology的定義是非常簡單的 – 它僅是一個批處理的任務。

首先,查詢urlToTweeters資料庫來得到轉發過這個URL的使用者清單。這個查詢會傳回一個tweeter清單,是以我們使用ExpandList函數來把其中的每一個tweeter轉換成一個tuple。

接下來,我們來擷取每個tweeter的follower。我們使用shuffle來把要處理的tweeter均勻地配置設定到toplology運作的每一個worker中并發去處理。然後查詢tweetersToFollowers資料庫進而的到每個轉發者的粉絲。你可以看到我們為topology的這部分配置設定了很大的并行度,這是因為這部分是整個topology中最耗資源的計算部分。

然後,我們對這些粉絲進行去重和計數。這分為如下兩步:①、通過“follower”字段對流進行分組,并對每個組執行“One”聚合器。“One”聚合器對每個分組簡單的發送一個tuple,該tuple僅包含一個數字“1”。②、将這些“1”加到一起,得到去重後的粉絲集中的粉絲數。“One”聚合器的定義如下:

01 02 03 04 05 06 07 08 09 10 11 12 13

public

class

One

implements

CombinerAggregator<Integer> {

public

Integer init(TridentTuple tuple) {

return

1

;

}

public

Integer combine(Integer val1, Integer val2) {

return

1

;

}

public

Integer zero() {

return

1

;

}

}

這是一個“彙總聚合器(combiner aggregator)”, 它會在傳送結果到其他worker彙總之前進行局部彙總,進而使性能最優。同樣,Sum被定義成一個彙總聚合器,在topology的最後部分進行全局求和是高效的。

接下來讓我們一起來看看Trident的一些細節。

Fields and tuples

Trident的資料模型是TridentTuple。在一個topology中,tuple是在一系列的處理操作(operation)中增量生成的。operation一般以一組字段作為輸入并輸出一組功能字段(function fileds)。Operation的輸入字段經常是輸入tuple的一個子集,而功能字段則是operation的輸出。

看下面這個例子。假定你有一個叫做“stream”的stream,它包含了“x”,”y”和”z”三個字段。為了運作一個讀取“y”作為輸入的過濾器MyFilter,你可以這樣寫:

1

stream.each(

new

Fields(

"y"

),

new

MyFilter())

MyFilter的實作如下:

1 2 3 4 5

public

class

MyFilter

extends

BaseFilter {

public

boolean

isKeep(TridentTuple tuple) {

return

tuple.getInteger(

) <

10

;

}

}

這會保留所有“y”字段小于10的tuples。傳給MyFilter的TridentTuple參将隻包含字段“y”。這裡需要注意的是,當選擇輸入字段時,Trident隻發送tuple的一個子集,這個操作是非常高效的。

讓我們一起看一下“功能字段(function field)”是怎樣工作的。假定你有如下這個函數:

1 2 3 4 5 6 7

public

class

AddAndMultiply

extends

BaseFunction {

public

void

execute(TridentTuple tuple, TridentCollector collector) {

int

i1 = tuple.getInteger(

);

int

i2 = tuple.getInteger(

1

);

collector.emit(

new

Values(i1 + i2, i1 * i2));

}

}

這個函數接收兩個數作為輸入并輸出兩個新的值:這兩個數的和與乘積。假定你有一個stream,其中包含“x”,”y”和”z”三個字段。你可以這樣使用這個函數:

1

stream.each(

new

Fields(

"x"

,

"y"

),

new

AddAndMultiply(),

new

Fields(

"added"

,

"multiplied"

));

輸出的功能字段被添加到輸入tuple後面,是以這個時候,每個tuple中将會有5個字段”x”, “y”, “z”, “added”, 和 “multiplied”.。”added” 和”multiplied”對應于AddAndMultiply輸出的第一和第二個字段。

另外,我們可以使用聚合器來用輸出字段來替換輸入tuple。如果你有一個stream包含字段”val1″和”val2″,你可以這樣做:

1

stream.aggregate(

new

Fields(

"val2"

),

new

Sum(),

new

Fields(

"sum"

))

輸出流将會僅包含一個tuple,該tuple有一個“sum”字段,這個sum字段就是一批tuple中“val2”字段的累積和。但是若對groupby之後的流進行該聚合操作,則輸出tuple中包含分組字段和聚合器輸出的字段,例如:

1 2

stream.groupBy(

new

Fields(

"val1"

))

.aggregate(

new

Fields(

"val2"

),

new

Sum(),

new

Fields(

"sum"

))

這個例子中的輸出包含“val1”字段和“sum”字段。

State

在實時計算領域的一個主要問題就是怎麼樣來管理狀态并能輕松應對錯誤和重試。消除錯誤的是不可能的,當一個節點死掉,或者一些其他的問題出現時,這些batch需要被重新處理。問題是-你怎樣做狀态更新來保證每一個消息被處理且隻被處理一次?

這是一個很棘手的問題,我們可以用接下來的例子進一步說明。假定你在做一個你的stream的計數聚合,并且你想要存儲運作時的count到一個資料庫中去。如果你隻是存儲這個count到資料庫中,并且想要進行一次更新,我們是沒有辦法知道同樣的狀态是不是以前已經被update過了的。這次更新可能在之前就嘗試過,并且已經成功的更新到了資料庫中,不過在後續的步驟中失敗了。還有可能是在上次更新資料庫的過程中失敗的,這些你都不知道。

Trident通過做下面兩件事情來解決這個問題:

1、每一個batch被賦予一個唯一辨別id“transaction id”。如果一個batch被重試,它将會擁有和之前同樣的transaction id

2、狀态更新是按照batch的順序進行的(強順序)。也就是說,batch 3的狀态更新必須等到batch 2的狀态更新成功之後才可以進行。

有了這2個原則,你就可以達到有且隻有一次更新的目标。此時,不是隻将count存到資料庫中,而是将transaction id和count作為原子值存到資料庫中。當更新一個count的時候,需要比較資料庫中transaction id和目前batch的transaction id。如果相同,就跳過這次更新。如果不同,就更新這個count。

當然,你不需要在topology中手動處理這些邏輯,這些邏輯已經被封裝在State的抽象中并自動進行。你的State object也不需要自己去實作transaction id的跟蹤操作。如果你想了解更多的關于如何實作一個State以及在容錯過程中的一些取舍問題,可以參照這篇文章。

一個State可以采用任何政策來存儲狀态,它可以存儲到一個外部的資料庫,也可以在記憶體中保持狀态并備份到HDFS中。State并不需要永久的保持狀态。比如說,你有一個記憶體版的State實作,它儲存最近X個小時的資料并丢棄老的資料。可以把 Memcached integration 作為例子來看看State的實作。

Trident topology的執行

Trident的topology會被編譯成盡可能高效的Storm topology。隻有在需要對資料進行重新配置設定(repartition)的時候(如groupby或者shuffle)才會把tuple通過network發送出去,如果你有一個trident topology如下:

Storm進階原語(三) — Trident topology

它将會被編譯成如下的storm topology:

Storm進階原語(三) — Trident topology

小結

Trident使得實時計算更加優雅。你已經看到了如何使用Trident的API來完成大吞吐量的流式計算、狀态維護、低延時查詢等等功能。Trident讓你在擷取最大性能的同時,以更自然的一種方式進行實時計算。