天天看點

【五】storm調優,調整并行度

實際上就是用多線程。沒有真的對叢集做擴容。

官網介紹

http://storm.apache.org/releases/1.1.2/Understanding-the-parallelism-of-a-Storm-topology.html

【五】storm調優,調整并行度

storm中運作topology的實體主要有三個:

worker processes:    一個supervisor可以啟多個worker。一個topology可以運作在一個worker或者接worker中。每個worker運作一個topology的子集。一個worker程序隻服務于一個指定的topology。

executors(threads): 一個worker程序上可以運作多個executor。

tasks:   task實際上是storm裡面最小的運算單元,執行實際的資料處理。 一個executors上可以運作多個相同元件(spout、bolt)的task。每個spout、bolt可以作為多個task在叢集中被處理。

對于一個元件(spout、bolt)在它的生命周期中,task的數量時不變的,executor的數量可變。這意味着,線程(executor)的數量可能小于等于task的數量。預設情況下task的數量等于executor的數量。

預設情況下:

1個supervisor節點啟動4個worker程序。

每一個topology預設占用一個worker程序。

每個worker會啟動executor。

每個executor預設啟動一個task。

(這裡用單機版storm示範。即解壓storm.tar後 ,什麼配置檔案都不用改,隻用加個環境變量就行了。

單機版安裝及啟動)

運作作業:送出作業到storm伺服器上運作

檢視UI會發現。隻啟動了一個work來執行我們的clusterSumStormTopology作業。

我們的storm作業隻有一個spout和一個bolt,但是在預設情況下卻啟動了3個executor。因為其中有一個是acker。

【五】storm調優,調整并行度

設定介紹

【五】storm調優,調整并行度

源碼中的defaults.yaml配置檔案<storm.yaml<topology代碼中指定的配置<内部配置<外部配置

worker processes數量設定(設定運作某個具體的topology使用幾個worker,而不是一個supervisor啟動幾個worker。一個supervisor啟動幾個worker的設定是修改storm.yaml配置檔案,給supervisor.slots.ports添加幾個端口。)

代碼中配置:

修改之前的代碼mian中的config部分

public static void main (String[] args){
		try {
		
		//TopologyBuilder根據spout和bolt來建構Topology
		//storm中任何一個作業都是通過Topology方式進行送出的
		//Topology中需要指定spout和bolt的執行順序
		TopologyBuilder tb = new TopologyBuilder();
		tb.setSpout("DataSourceSpout", new DataSourceSpout());
		//SumBolt以随機分組的方式從DataSourceSpout中接收資料
		tb.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");
		
		Config config = new Config();
		//設定讓叢集啟動2個worker來執行這個作業
		config.setNumWorkers(2);
		//把執行acker的executor關掉
		config.setNumAckers(0);
		//代碼送出到storm叢集上運作
		StormSubmitter.submitTopology("ClusterSumStormTopology", config, tb.createTopology());
		} catch (AlreadyAliveException e) {
			e.printStackTrace();
		} catch (InvalidTopologyException e) {
			e.printStackTrace();
		} catch (AuthorizationException e) {
			e.printStackTrace();
		}
		
	}
           

重新打包,上傳到nimbus所在節點,關閉原來運作的ClusterSumStormTopology作業,重新運作。

重新檢視UI。這時就隻啟動了2個executor(因為這個topology程式隻有一個spout和一個bolt,我們把acker關掉了。這裡隻是做個測試,生産中不要關掉acker,不然不能保證資料有且隻有一次被執行,很可能資料被重複發送,或者漏了。)

用了2個worker來執行這個topology(我們在代碼中設定的2)。

【五】storm調優,調整并行度

executor數量設定

public static void main (String[] args){
		try {
		
		//TopologyBuilder根據spout和bolt來建構Topology
		//storm中任何一個作業都是通過Topology方式進行送出的
		//Topology中需要指定spout和bolt的執行順序
		TopologyBuilder tb = new TopologyBuilder();
		//第三個參數2,是并行度,是這個元件有幾個executor來執行
		tb.setSpout("DataSourceSpout", new DataSourceSpout(),2);
		//SumBolt以随機分組的方式從DataSourceSpout中接收資料
		//第三個參數2,是并行度,是這個元件有幾個executor來執行
		tb.setBolt("SumBolt", new SumBolt(),2).shuffleGrouping("DataSourceSpout");
		
		Config config = new Config();
		//設定讓叢集啟動2個worker來執行這個作業
		config.setNumWorkers(2);
		//把執行acker的executor關掉。預設情況下有幾個worker就有幾個acker
		config.setNumAckers(0);
		//代碼送出到storm叢集上運作
		StormSubmitter.submitTopology("ClusterSumStormTopology", config, tb.createTopology());
		} catch (AlreadyAliveException e) {
			e.printStackTrace();
		} catch (InvalidTopologyException e) {
			e.printStackTrace();
		} catch (AuthorizationException e) {
			e.printStackTrace();
		}
		
	}
           

task數量設定

public static void main (String[] args){
		try {
		
		//TopologyBuilder根據spout和bolt來建構Topology
		//storm中任何一個作業都是通過Topology方式進行送出的
		//Topology中需要指定spout和bolt的執行順序
		TopologyBuilder tb = new TopologyBuilder();
		//第三個參數2,是并行度,是這個元件有幾個executor來執行
		tb.setSpout("DataSourceSpout", new DataSourceSpout(),2);
		//SumBolt以随機分組的方式從DataSourceSpout中接收資料
		//第三個參數2,是并行度,是這個元件有幾個executor來執行
		tb.setBolt("SumBolt", new SumBolt(),2)
		.setNumTasks(4)//設定task數量為4.這個bolt有2個executor,4個task
		.shuffleGrouping("DataSourceSpout");
		
		Config config = new Config();
		//設定讓叢集啟動2個worker來執行這個作業
		config.setNumWorkers(2);
		//把執行acker的executor關掉。預設情況下有幾個worker就有幾個acker。
		config.setNumAckers(0);
		//代碼送出到storm叢集上運作
		StormSubmitter.submitTopology("ClusterSumStormTopology", config, tb.createTopology());
		} catch (AlreadyAliveException e) {
			e.printStackTrace();
		} catch (InvalidTopologyException e) {
			e.printStackTrace();
		} catch (AuthorizationException e) {
			e.printStackTrace();
		}
		
	}
           

topology在運作中可以動态調整worker和executor的數量,并不需要重新開機叢集。

1.在storm的UI上調整。

2.使用用戶端工具來調整。

【五】storm調優,調整并行度