天天看點

Apache Storm 官方文檔 —— Trident 教程

trident 是 storm 的一種高度抽象的實時計算模型,它可以将高吞吐量(每秒百萬級)資料輸入、有狀态的流式處理與低延時的分布式查詢無縫結合起來。如果你了解 pig 或者 cascading 這樣的進階批處理工具,你就會發現他們和 trident 的概念非常相似。trident 同樣有聯結(join)、聚合(aggregation)、分組(grouping)、函數(function)以及過濾器(filter)這些功能。trident 為資料庫或者其他持久化存儲上層的狀态化、增量式處理提供了基礎原語。由于 trident 有着一緻的、恰好一次的語義,是以推斷出 trident 拓撲的狀态也是一件很容易的事。

讓我們先從一個使用 trident 的例子開始。這個例子中做了兩件事情:

從一個句子的輸入資料流中計算出單詞流的數量

實作對一個單詞清單中每個單詞總數的查詢

為了實作這個目的,這個例子将會從下面的資料源中無限循環地讀取語句資料流:

這個 spout 會循環地通路語句集來生成語句資料流。下面的代碼就是用來實作計算過程中的單詞資料流統計部分:

讓我們一行行地來分析上面的代碼。首先我們建立了一個 <code>tridenttopology</code> 對象,這個對象提供了構造 trident 計算過程的接口。<code>tridenttopology</code> 有一個叫做 <code>newstream</code> 的方法,這個方法可以從一個輸入資料源中讀取資料建立一個新的資料流。在這個例子中,輸入的資料源就是前面定義的 <code>fixedbatchspout</code>。輸入資料源也可以是像 kestrel 和 kafka 這樣的消息系統。trident 會通過 zookeeper 一直跟蹤每個輸入資料源的一小部分狀态(trident 具體消費對象的相關中繼資料)。例如這裡的 “spout1” 就對應着 zookeeper 中的一個節點,而 trident 就會在該節點中存放資料源的中繼資料(metadata)。

trident 會将資料流處理為很多個小塊 tuple 的集合,例如,輸入的句子流就會像下面這樣被分割成很多個小塊:

Apache Storm 官方文檔 —— Trident 教程

這些小塊的大小主要取決于你的輸入吞吐量,一般可能會在數萬甚至數百萬元組的級别。

trident 為這些小塊提供了一個完全成熟的批處理 api。這個 api 和你見到過的 pig 或者 cascading 這樣的 hadoop 的進階抽象語言很相似:你可以處理分組(group by)、聯結(join)、聚合(aggregation)、函數(function)、過濾器(filter)等各種操作。當然,分别處理每個小塊并不是件好事,是以,trident 提供了适用于處理各個小塊之間的聚合操作的函數,并且可以在聚合後将結果儲存到持久化存儲中,而且無論是記憶體、memcached、cassandra 還是其他類型的存儲都可以支援。最後,trident 還提供了用于查詢實時狀态結果的一級接口。而這個結果狀态既可以像這個例子中示範的那樣由 trident 負責更新,也可以作為一個獨立的狀态資料源而存在。

再回到這個例子中,輸入資料源 spout 發送出了一個名為 “sentence” 的資料流。接下來拓撲中定義了一個 <code>split</code> 方法用于處理流中的每個 tuple,這個方法接收 “sentence” 域并将其分割成若幹個單詞。每個 sentence tuple 都會建立很多個單詞 tuple —— 例如 “the cow jumped over the moon” 這個句子就會建立 6 個 “word” tuple,下面是 <code>split</code> 的定義:

從上面的代碼中你會發現這個過程真的很簡單。這個方法中的所有操作僅僅是抓取句子、以空格分隔句子并且為每個單詞發射一個 tuple。

<code>persistentaggregate</code> 方法所存儲的值就表示所有從資料流中發送出來的塊的聚合結果。

trident 的另一個很酷的特性就是它支援完全容錯性和恰好一次處理的語義。如果處理過程中出現錯誤需要重新執行處理操作,trident 不會向資料庫中送出多次來自相同的源資料的更新操作,這就是 trident 持久化 state 的方式。

<code>persistentaggregate</code> 方法也可以将資料流結果傳入一個 <code>tridentstate</code> 對象中。這種情況下,這個 <code>tridentstate</code> 就表示所有的單詞統計資訊。這樣我們就可以使用 <code>tridentstate</code> 對象來實作整個計算過程中的分布式查詢部分。

接下來我們就可以在拓撲中實作 word count 的一個低延時分布式查詢。這個查詢接收一個由空格分隔的單詞清單作為參數,然後傳回這些單詞的數量統計結果。這個查詢看上去與普通的 rpc 調用并沒有什麼分别,不過在背景他們是并發執行的。下面是一個實作這種查詢的例子:

如你所見,這個查詢看上去隻是一個普通的遠端過程調用(rpc),不過在背景他是在一個 storm 叢集中并發執行的。這種查詢的端到端延時一般在 10 ms 左右。當然,更大量的查詢會花費更長的時間,盡管這些查詢還是取決于你為這個計算過程配置設定了多少時間。

拓撲中的分布式查詢的實作是這樣的:

這裡還需要使用前面的 <code>tridenttopology</code> 對象來建立一個 drpc 資料流,這個建立資料流的方法叫做 “words”。前面使用<code>drpcclient</code> 進行 rpc 調用的第一個參數必須與這個方法名完全相同。

在這段代碼裡,首先是使用 <code>split</code> 方法來将請求的參數分割成若幹個單詞。這些單詞構成的單詞流是通過 “word” 域來分組的,而 <code>statequery</code> 運算符就是用來查詢拓撲中第一個部分中生成的 <code>tridentstate</code> 對象的。<code>statequery</code> 接收一個 state(在這個例子中就是拓撲前面計算得到的單詞數結果)和查詢這個 state 的方法作為參數。在這個例子裡,<code>statequery</code> 調用了 <code>mapget</code> 方法,用于擷取每個單詞的個數。由于 drpc 資料流是和 tridentstate 采用的完全相同的方式進行分組的(通過 “word” 域),每個單詞查詢都可以精确地定位到 tridentstate 對象中的指定部分,同時 tridentstate 對象中維護着對應的單詞的更新狀态。

接下來,個數為 0 的單詞會被 <code>filternull</code> 過濾器過濾掉,然後就可以使用 <code>sum</code> 聚合器來擷取其他的單詞統計個數。接着 trident 就會自動将結果傳回給等待的用戶端。

trident 很聰明,它知道怎麼以最好的性能運作拓撲。在這個拓撲中還有兩個會自動發生的有趣的事:

從 state 中讀取或寫入的操作(例如 persistentaggregate 和 statequery)會自動批處理化。是以,如果目前的批處理過程需要對資料庫執行 20 個更新操作,trident 就會自動将讀取或寫入操作當作批處理過程,僅僅會對資料庫發送一次讀請求和一次寫請求,而不是發送 20 次讀請求和 20 次寫請求(而且一般你還可以在你的 state 裡使用緩存來消除讀請求)。這樣做就有兩個方面的好處:可以按照你指定的方式來執行你的計算過程,同時還可以維持較好的性能。

trident 的聚合器是高度優化的。在向網絡中發送 tuple 之前,trident 有時候會做部分聚合操作,而不是将一個分組的所有的 tuple 一股腦地發送到同一台機器中來執行聚合。例如,<code>count</code> 聚合器就是這樣先計算每個小塊的個數,然後向網絡中發送很多個部分計數的結果,接着再将所有的部分計數結果彙總來得到最終的統計結果。這個技術與 mapreduce 的 combiner 模型很相似。

我們再來看看 trident 的另一個例子。

這個例子是一個純粹的 drpc 拓撲,計算了一個指定 url 的 reach 數。reach 指的是 twitter 上能夠看到一個指定的 url 的獨立使用者數。要想計算 reach,你需要先提取所有轉發了該 url 的使用者,提取這些使用者的關注者,将關注者放入一個 set 集合中來去除重複的關注者,然後再統計這個 set 中的數量。對于單一的一台機器來說,計算 reach 太耗時了,這個過程大概需要數千次資料庫調用并生成數千萬 tuple。而使用 storm 和 trident 就可以通過一個叢集來将計算過程的每個步驟進行并行化處理。

這個拓撲會從兩個 state 源中讀取資料。其中一個資料庫建立了 url 和轉發了該 url 的使用者清單的關聯表。另一個資料庫中建立了使用者和使用者的關注者清單的關聯表。拓撲的定義是這樣的:

這個拓撲使用 <code>newstaticstate</code> 方法建立了兩個分别對應外部于兩個外部資料庫的 <code>tridentstate</code> 對象。在拓撲的後續部分就可以對這兩個 <code>tridentstate</code> 對象執行查詢操作。和 state 的所有資料源一樣,為了最大程度地提升效率,對這些資料庫的查詢将會自動地批處理化。

拓撲的定義很直接 —— 就是一個簡單的批處理 job。首先,會通過查詢 urltotweeters 資料庫來擷取轉發了 url 的使用者清單,然後就可以調用 <code>expandlist</code> 方法來為每個 tweeter 建立一個 tuple。

接下來必須要擷取每個 tweeter 的關注者。由于需要調用 shuffle 方法将所有的 tweeter 均衡配置設定到拓撲的所有 worker 中,是以這個步驟必須并發進行,這一點非常重要。然後就可以查詢關注者資料庫來擷取每個 tweeter 的關注者清單。你可能注意到了這個過程的并行度非常高,因為這是整個計算過程中複雜度最高的部分。

再接下來,關注者就會被放入一個單獨的 set 集合中用于計數。這裡包含兩個步驟。首先,會根據 “follower” 域來執行 “group by” 分組操作,并在每個組上運作 <code>one</code> 聚合器。“one”聚合器的作用僅僅是為每個組發送一個包含數字 1 的 tuple。然後,就可以通過統計這些 one 結果來得到關注者 set 的大小,也就是真正的關注者數量。下面是 “one” 聚合器的定義:

這是一個“組合聚合器”,它知道怎樣在向網絡中發送 tuple 之前以最好的效率進行部分聚合操作。同樣,sum 也是一個組合聚合器,是以在拓撲結尾的全局統計操作也會有很高的效率。

下面讓我們再來看看 trident 中的一些細節。

trident 的資料模型 tridenttuple 是一個指定的值清單。在一個拓撲中,tuple 是在一系列操作中不斷生成的。這些操作一般會輸入一個“輸入域”(input fields)集合,然後發送出一個“方法域”(function fields)的集合。輸入域主要用于選取一個 tuple 的子集作為操作的輸入,而“方法域”主要用于為該操作的輸出結果域命名。

我們來看看這樣一個場景。假設你有一個名為 “stream” 的資料流,其中包含域 “x”、“y” 和 “z”。如果要運作一個接收 “y” 作為輸入的過濾器 myfilter,你可以這樣寫:

再假設 myfilter 的實作是這樣的:

這樣就會保留所有 “y” 域的值小于 10 的 tuple。myfilter 輸入的 tridenttuple 将會僅包含有 “y” 域。值得注意的是,trident 可以在選取輸入域時以一種非常高效的方式來投射 tuple 的子集:這個投射過程非常靈活。

我們再來看看 “function fields” 是怎麼工作的。假設你有這樣一個函數:

這個函數接收兩個數字作為輸入,然後發送出兩個新值:分别是兩個數字的和和乘積。再假定你有一個包含 “x”、“y” 和 “z” 域的資料流,你可以這樣使用這個函數:

這個函數的輸出增加了兩個新的域。是以,這個 each 調用的輸出 tuple 會包含 5 個域:“x”、“y” 、“z”、“added” 和 “multiplied”。其中 “added” 與 addandmultiply 的第一個輸出值相對應,“multiplied” 和 addandmultiply 的第二個輸出值相對應。

另一方面,通過聚合器,函數域也可以替換輸入 tuple 的域。假如你有一個包含域 “val1” 和域 “val2” 的資料流,通過這樣的操作:

就會使得輸出資料流中隻包含一個隻帶有 “sum” 的域的 tuple,這個 “sum” 域就代表了在哪個批處理塊中所有的 “val2” 域的總和值。

通過資料流分組,輸出就可以同時包含用于分組的域以及由聚合器發送的域。舉個例子:

這個操作就會使得輸出同時包含域 “val1” 以及域 “sum”。

實時計算的一個關鍵問題就在于如何管理狀态(state),使得在失敗與重試操作之後的更新過程仍然是幂等的。錯誤是不可消除的,是以在出現節點故障或者其他問題發生時批處理操作還需要進行重試。不過這裡最大的問題就在于怎樣執行一種合适的狀态更新操作(不管是針對外部資料庫還是拓撲内部的狀态),來使得每個消息都能夠被執行且僅僅被執行一次。

這個問題很麻煩,接下來的例子裡面就有這樣的問題。假如你正在對你的資料流做一個計數聚合操作,并且打算将計數結果存儲到一個資料庫中。如果你僅僅把計數結果存到資料庫裡就完事了的話,那麼在你繼續準備更新某個塊的狀态的時候,你沒法知道到底這個狀态有沒有被更新過。這個資料塊有可能在更新資料庫的步驟上成功了,但在後續的步驟中失敗了,也有可能先失敗了,沒有進行更新資料庫的操作。你完全不知道到底發生了什麼。

trident 通過下面兩件事情解決了這個問題:

在 trident 中為每個資料塊标記了一個唯一的 id,這個 id 就叫做“事務 id”(transaction id)。如果資料塊由于失敗復原了,那麼它持有的事務 id 不會改變。

state 的更新操作是按照資料塊的順序進行的。也就是說,在成功執行完塊 2 的更新操作之前,不會執行塊 3 的更新操作。

基于這兩個基礎特性,你的 state 更新就可以實作恰好一次(exactly-once)的語義。與僅僅向資料庫中存儲計數不同,這裡你可以以一個原子操作的形式把事務 id 和計數值一起存入資料庫。在後續更新這個計數值的時候你就可以先比對這個資料塊的事務 id。如果比對結果是相同的,那麼就可以跳過更新操作 —— 由于 state 的強有序性,可以确定資料庫中已經包含有目前資料庫的額值。而如果比對結果不同,就可以放心地更新計數值了。

trident 拓撲會被編譯成一種盡可能和普通拓撲有着同樣的運作效率的形式。隻有在請求資料的重新配置設定(比如 groupby 或者 shuffle 操作)時 tuple 才會被發送到網絡中。是以,像下面這樣的 trident 拓撲:

Apache Storm 官方文檔 —— Trident 教程

就會被編譯成若幹個 spout/bolt:

Apache Storm 官方文檔 —— Trident 教程

trident 讓實時計算變得非常簡單。從上面的描述中,你已經看到了高吞吐量的資料流處理、狀态操作以及低延時查詢處理是怎樣通過 trident 的 api 來實作無縫結合的。總而言之,trident 可以讓你以一種更加自然,同時仍然保持着良好的性能的方式來實作實時計算。