實際上就是用多線程。沒有真的對叢集做擴容。
官網介紹
http://storm.apache.org/releases/1.1.2/Understanding-the-parallelism-of-a-Storm-topology.html
storm中運作topology的實體主要有三個:
worker processes: 一個supervisor可以啟多個worker。一個topology可以運作在一個worker或者接worker中。每個worker運作一個topology的子集。一個worker程序隻服務于一個指定的topology。
executors(threads): 一個worker程序上可以運作多個executor。
tasks: task實際上是storm裡面最小的運算單元,執行實際的資料處理。 一個executors上可以運作多個相同元件(spout、bolt)的task。每個spout、bolt可以作為多個task在叢集中被處理。
對于一個元件(spout、bolt)在它的生命周期中,task的數量時不變的,executor的數量可變。這意味着,線程(executor)的數量可能小于等于task的數量。預設情況下task的數量等于executor的數量。
預設情況下:
1個supervisor節點啟動4個worker程序。
每一個topology預設占用一個worker程序。
每個worker會啟動executor。
每個executor預設啟動一個task。
(這裡用單機版storm示範。即解壓storm.tar後 ,什麼配置檔案都不用改,隻用加個環境變量就行了。
單機版安裝及啟動)
運作作業:送出作業到storm伺服器上運作
檢視UI會發現。隻啟動了一個work來執行我們的clusterSumStormTopology作業。
我們的storm作業隻有一個spout和一個bolt,但是在預設情況下卻啟動了3個executor。因為其中有一個是acker。
設定介紹
源碼中的defaults.yaml配置檔案<storm.yaml<topology代碼中指定的配置<内部配置<外部配置
worker processes數量設定(設定運作某個具體的topology使用幾個worker,而不是一個supervisor啟動幾個worker。一個supervisor啟動幾個worker的設定是修改storm.yaml配置檔案,給supervisor.slots.ports添加幾個端口。)
代碼中配置:
修改之前的代碼mian中的config部分
public static void main (String[] args){
try {
//TopologyBuilder根據spout和bolt來建構Topology
//storm中任何一個作業都是通過Topology方式進行送出的
//Topology中需要指定spout和bolt的執行順序
TopologyBuilder tb = new TopologyBuilder();
tb.setSpout("DataSourceSpout", new DataSourceSpout());
//SumBolt以随機分組的方式從DataSourceSpout中接收資料
tb.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");
Config config = new Config();
//設定讓叢集啟動2個worker來執行這個作業
config.setNumWorkers(2);
//把執行acker的executor關掉
config.setNumAckers(0);
//代碼送出到storm叢集上運作
StormSubmitter.submitTopology("ClusterSumStormTopology", config, tb.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}
重新打包,上傳到nimbus所在節點,關閉原來運作的ClusterSumStormTopology作業,重新運作。
重新檢視UI。這時就隻啟動了2個executor(因為這個topology程式隻有一個spout和一個bolt,我們把acker關掉了。這裡隻是做個測試,生産中不要關掉acker,不然不能保證資料有且隻有一次被執行,很可能資料被重複發送,或者漏了。)
用了2個worker來執行這個topology(我們在代碼中設定的2)。
executor數量設定
public static void main (String[] args){
try {
//TopologyBuilder根據spout和bolt來建構Topology
//storm中任何一個作業都是通過Topology方式進行送出的
//Topology中需要指定spout和bolt的執行順序
TopologyBuilder tb = new TopologyBuilder();
//第三個參數2,是并行度,是這個元件有幾個executor來執行
tb.setSpout("DataSourceSpout", new DataSourceSpout(),2);
//SumBolt以随機分組的方式從DataSourceSpout中接收資料
//第三個參數2,是并行度,是這個元件有幾個executor來執行
tb.setBolt("SumBolt", new SumBolt(),2).shuffleGrouping("DataSourceSpout");
Config config = new Config();
//設定讓叢集啟動2個worker來執行這個作業
config.setNumWorkers(2);
//把執行acker的executor關掉。預設情況下有幾個worker就有幾個acker
config.setNumAckers(0);
//代碼送出到storm叢集上運作
StormSubmitter.submitTopology("ClusterSumStormTopology", config, tb.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}
task數量設定
public static void main (String[] args){
try {
//TopologyBuilder根據spout和bolt來建構Topology
//storm中任何一個作業都是通過Topology方式進行送出的
//Topology中需要指定spout和bolt的執行順序
TopologyBuilder tb = new TopologyBuilder();
//第三個參數2,是并行度,是這個元件有幾個executor來執行
tb.setSpout("DataSourceSpout", new DataSourceSpout(),2);
//SumBolt以随機分組的方式從DataSourceSpout中接收資料
//第三個參數2,是并行度,是這個元件有幾個executor來執行
tb.setBolt("SumBolt", new SumBolt(),2)
.setNumTasks(4)//設定task數量為4.這個bolt有2個executor,4個task
.shuffleGrouping("DataSourceSpout");
Config config = new Config();
//設定讓叢集啟動2個worker來執行這個作業
config.setNumWorkers(2);
//把執行acker的executor關掉。預設情況下有幾個worker就有幾個acker。
config.setNumAckers(0);
//代碼送出到storm叢集上運作
StormSubmitter.submitTopology("ClusterSumStormTopology", config, tb.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}
topology在運作中可以動态調整worker和executor的數量,并不需要重新開機叢集。
1.在storm的UI上調整。
2.使用用戶端工具來調整。