我們知道,storm的強大之處就是可以很容易地在叢集中橫向拓展它的計算能力,它會把整個運算過程分割成多個獨立的tasks在叢集中進行并行計算。在storm中,一個task就是運作在叢集中的一個spout或bolt執行個體。
為了友善了解storm如何并行處理我們分給它的任務,這裡我先介紹一下在叢集中涉及到topology的四種元件:
nodes(machines):叢集中的節點,就是這些節點一起工作來執行topology。
workers(jvms):一個worker就是一個獨立的jvm程序。每個節點都可以通過配置運作一個或多個workers,一個topology可以指定由多少個workers來執行。
executors(threads):一個worker jvm中運作的線程。一個worker程序可以執行一個或多個executor線程。一個executor可以運作多個tasks,storm預設一個每個executor配置設定一個task。
tasks(bolt/spout執行個體):tasks就是spouts和bolts的執行個體,它具體是被executor線程處理的。
我們可以通過配置來調整我們work的并行數量,如果我們不進行設定,storm預設大部分過程的并行數量為1。假設我們對wordcounttopology不單獨進行配置,那麼我們的topology執行情況如下圖所示:
我們的一個節點會為我們的topology配置設定一個worker,這個worker會為每個task啟動一個executor線程。
一種最簡單的提高topology運算能力的途徑就是為我們的topology增加workers。storm為我們提供了兩種途徑來增加workers:通過配置檔案或通過程式設定。
描述:在叢集中為topology建立多少個工作程序
在代碼中配置:
<a target="_blank" href="http://storm.incubator.apache.org/apidocs/backtype/storm/config.html">config#setnumworkers</a>
通過config對象來配置workers:
config config = new config();
config.setnumworkers(2);
注意:在localmode下不管設定幾個workers,最終都隻有一個worker jvm程序。
前面我們已經說過,storm會為每個topology元件建立一個task,而預設一個executor隻處理一個task。task是spouts和bolts的執行個體,一個executor線程可由處理多個tasks,tasks是真正處理具體資料的一個過程,我們在代碼中寫的spout和bolt可以看做是由叢集中分布的tasks來執行的。task的數量在整個topology運作期間一般是不變的,但是元件的executors是有可能發生變化的。這也就意味着:threads<=tasks。
通過設定parallelism hint來指定一個元件的executors。
描述:每個元件産生多少個executor
配置選項:?
<a target="_blank" href="http://storm.incubator.apache.org/apidocs/backtype/storm/topology/topologybuilder.html">topologybuilder#setspout()</a>
<a target="_blank" href="http://storm.incubator.apache.org/apidocs/backtype/storm/topology/topologybuilder.html">topologybuilder#setbolt()</a>
note that as of storm 0.8 the <code>parallelism_hint</code> parameter now specifies the initial number of executors (not tasks!) for that bolt.
下面我們指定sentensespout的并行數量為2,則這個spout元件會有兩個executors,每個executor配置設定一個task,其topology的運作情況如下圖所示:
builder.setspout(sentence_spout_id, spout, 2);
通過setnumtasks()方法來指定一個元件的tasks數量。
描述:每個元件建立多少個task
下面我們為splitsentencebolt 設定4個tasks和2個executors,這樣的話每個executor線程将被配置設定執行4/2=2個tasks,然後再為wordcountbolt配置設定4個task,每個task由一個executor負責執行。其topology如下圖所示:
builder.setbolt(split_bolt_id, splitbolt, 2).setnumtasks(4).shufflegrouping(sentence_spout_id);
builder.setbolt(count_bolt_id, countbolt, 4).fieldsgrouping(split_bolt_id, newfields("word"));
如果一開始配置設定2個workers,則topology的運作情況如下圖所示:
下面這幅圖展示了一個實際topology的全景,topology由三個元件組成,一個spout:bluespout,兩個bolt:greenbolt、yellowbolt。
如上圖,我們配置了兩個worker程序,兩個spout線程,兩個greenbolt線程和六個yellowbolt線程,那麼分布到叢集中的話,每個工作程序都會有5個executor線程。下面看一下具體代碼:
java config conf = new config();
conf.setnumworkers(2); // use two worker processes
topologybuilder.setspout(“blue-spout”, new bluespout(), 2); // set parallelism hint to 2
topologybuilder.setbolt(“green-bolt”, new greenbolt(), 2) .setnumtasks(4) .shufflegrouping(“blue-spout”);
topologybuilder.setbolt(“yellow-bolt”, new yellowbolt(), 6) .shufflegrouping(“green-bolt”);
stormsubmitter.submittopology( “mytopology”, conf, topologybuilder.createtopology() );
當然,storm中也有一個參數來控制topology的并行數量:
storm中一個很好的特性就是可以在topology運作期間動态調制worker程序或executor線程的數量而不需要重新開機topology。這種機制被稱作rebalancing。
我們有兩種方式來均衡一個topology:
通過storm web ui來均衡
通過cli tool storm 來均衡
下面就是一個cli tool應用的例子:
# reconfigure the topology “mytopology” to use 5 worker processes, # the spout “blue-spout” to use 3 executors and # the bolt “yellow-bolt” to use 10 executors.
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10