JStorm: The Topology Submission Client

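    A topology contains one or more spouts and bolts. A spout reads data from a source and emits it to bolts; each bolt processes what it receives and passes the result on to the next bolt. A topology is normally built with TopologyBuilder, which records the spouts and bolts it contains and validates them, in particular checking that no two components share the same id. The check looks like this: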

private void validateUnusedId(String id) {
	if (_bolts.containsKey(id)) {
		throw new IllegalArgumentException(
				"Bolt has already been declared for id " + id);
	}
	if (_spouts.containsKey(id)) {
		throw new IllegalArgumentException(
				"Spout has already been declared for id " + id);
	}
	if (_stateSpouts.containsKey(id)) {
		throw new IllegalArgumentException(
				"State spout has already been declared for id " + id);
	}
}
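To see the effect of this check in isolation, here is a minimal, self-contained sketch. `ComponentRegistry` and its `setSpout`/`setBolt` methods are hypothetical stand-ins written for this example, not the JStorm classes; they only reproduce the "one id across all component kinds" rule and leave out state spouts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TopologyBuilder's id bookkeeping (not the JStorm class).
class ComponentRegistry {
    private final Map<String, Object> bolts = new HashMap<>();
    private final Map<String, Object> spouts = new HashMap<>();

    // Reject an id that any previously declared component already uses.
    private void validateUnusedId(String id) {
        if (bolts.containsKey(id)) {
            throw new IllegalArgumentException("Bolt has already been declared for id " + id);
        }
        if (spouts.containsKey(id)) {
            throw new IllegalArgumentException("Spout has already been declared for id " + id);
        }
    }

    void setSpout(String id, Object spout) {
        validateUnusedId(id);
        spouts.put(id, spout);
    }

    void setBolt(String id, Object bolt) {
        validateUnusedId(id);
        bolts.put(id, bolt);
    }

    public static void main(String[] args) {
        ComponentRegistry registry = new ComponentRegistry();
        registry.setSpout("source", new Object());
        registry.setBolt("counter", new Object());
        try {
            // The id "source" is already taken by the spout, so this is rejected.
            registry.setBolt("source", new Object());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the check runs against every map before anything is stored, a bolt cannot reuse a spout's id and vice versa.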

TopologyBuilder stores each component in a matching data structure:

public class TopologyBuilder {
	// All bolts, keyed by component id
	private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
	// All spouts, keyed by component id
	private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
	// Per-component configuration, keyed by component id
	private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
	..........
}           

The per-component configuration is stored by the following method:

private void initCommon(String id, IComponent component, Number parallelism) {
	ComponentCommon common = new ComponentCommon();
	common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
	if (parallelism != null)
		common.set_parallelism_hint(parallelism.intValue());
	else {
	    common.set_parallelism_hint(Integer.valueOf(1));
	}
	Map conf = component.getComponentConfiguration();
	if (conf != null)
		common.set_json_conf(Utils.to_json(conf));
	_commons.put(id, common);
}           
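The two defaults in initCommon (a parallelism hint of 1 when none is given, and no configuration entry when the component returns null) can be tried out with the sketch below. `CommonsSketch` and its nested `Common` class are hypothetical simplifications written for this example; in particular the sketch keeps the configuration as a map instead of serializing it to JSON, and the `topology.debug` entry is just an illustrative value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class CommonsSketch {
    // Hypothetical stand-in for ComponentCommon: only the two fields discussed here.
    static final class Common {
        int parallelismHint;
        Map<String, Object> conf; // the original code stores a JSON string instead
    }

    private final Map<String, Common> commons = new HashMap<>();

    void initCommon(String id, Map<String, Object> componentConf, Number parallelism) {
        Common common = new Common();
        // A missing hint falls back to a parallelism of 1.
        common.parallelismHint = (parallelism != null) ? parallelism.intValue() : 1;
        // Component configuration is optional and is only recorded when present.
        if (componentConf != null) {
            common.conf = new HashMap<>(componentConf);
        }
        commons.put(id, common);
    }

    Common get(String id) {
        return commons.get(id);
    }

    public static void main(String[] args) {
        CommonsSketch sketch = new CommonsSketch();
        sketch.initCommon("source", null, null);
        sketch.initCommon("counter",
                Collections.<String, Object>singletonMap("topology.debug", true), 4);
        System.out.println(sketch.get("source").parallelismHint);  // 1
        System.out.println(sketch.get("counter").parallelismHint); // 4
    }
}
```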

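Once this information has been stored, the builder uses it to create a StormTopology instance when the topology is created. That instance is then submitted through StormSubmitter.submitTopology, which works in two steps: (1) upload the jar file, (2) submit the job.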

public static void submitTopology(String name, Map stormConf,
		StormTopology topology, SubmitOptions opts)
		throws AlreadyAliveException, InvalidTopologyException {
	// Read the configuration and validate it
	if (!Utils.isValidConf(stormConf)) {
		throw new IllegalArgumentException(
				"Storm conf is not valid. Must be json-serializable");
	}
	stormConf = new HashMap(stormConf);
	stormConf.putAll(Utils.readCommandLineOpts());
	Map conf = Utils.readStormConfig();
	conf.putAll(stormConf);
	putUserInfo(conf, stormConf);
	try {
		String serConf = Utils.to_json(stormConf);
		if (localNimbus != null) {
			LOG.info("Submitting topology " + name + " in local mode");
			localNimbus.submitTopology(name, null, serConf, topology);
		} else {
			NimbusClient client = NimbusClient.getConfiguredClient(conf);
			// Check whether a topology with the same name is already running on the cluster
			if (topologyNameExists(conf, name)) {
				throw new RuntimeException("Topology with name `" + name
						+ "` already exists on cluster");
			}
			// Upload the jar file; this method is explained in detail below
			submitJar(conf);
			try {
				LOG.info("Submitting topology " + name
						+ " in distributed mode with conf " + serConf);
				// Submit the topology. This calls submitTopology on the server-side
				// ServiceHandler, which starts the topology; from here on it is the server's job.
				if (opts != null) {
					client.getClient().submitTopologyWithOpts(name, path,
							serConf, topology, opts);
				} else {
					// this is for backwards compatibility
					client.getClient().submitTopology(name, path, serConf,
							topology);
				}
			} finally {
				client.close();
			}
		}
		LOG.info("Finished submitting topology: " + name);
	} catch (InvalidTopologyException e) {
		.......
	}
}           
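The order of the `putAll` calls at the top of submitTopology determines which setting wins when the same key appears in several places: command-line options override the configuration passed in by the caller, and both override the values read from the Storm configuration file. The sketch below reproduces only that merge order with plain maps. `ConfMergeSketch` and `merge` are hypothetical names, and the keys and values are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

class ConfMergeSketch {
    // Mirrors the merge order in submitTopology: later putAll calls win.
    static Map<String, Object> merge(Map<String, Object> fileDefaults,
                                     Map<String, Object> userConf,
                                     Map<String, Object> commandLineOpts) {
        Map<String, Object> stormConf = new HashMap<>(userConf); // copy, caller's map stays untouched
        stormConf.putAll(commandLineOpts);                       // command line overrides user conf
        Map<String, Object> conf = new HashMap<>(fileDefaults);
        conf.putAll(stormConf);                                  // both override the file defaults
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> fileDefaults = new HashMap<>();
        fileDefaults.put("topology.workers", 1);
        fileDefaults.put("nimbus.host", "localhost");

        Map<String, Object> userConf = new HashMap<>();
        userConf.put("topology.workers", 4);
        userConf.put("topology.debug", true);

        Map<String, Object> commandLineOpts = new HashMap<>();
        commandLineOpts.put("topology.workers", 8);

        Map<String, Object> conf = merge(fileDefaults, userConf, commandLineOpts);
        System.out.println(conf.get("topology.workers")); // 8, from the command line
        System.out.println(conf.get("topology.debug"));   // true, from the user conf
        System.out.println(conf.get("nimbus.host"));      // localhost, from the file defaults
    }
}
```

Note that in the original method the JSON sent to the server (`serConf`) is built from `stormConf`, that is, the caller's settings plus command-line options, while the fully merged `conf` is what the client uses to connect to Nimbus.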

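The jar upload has two parts: both the jar file itself and the library files it depends on are sent to the server. The default upload buffer size is 512K and can be changed through nimbus.thrift.max_buffer_size. On the server, the files are stored in a directory structure like this: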

$ tree /home/hongmin.lhm/jstorm_data/nimbus/inbox/
/home/hongmin.lhm/jstorm_data/nimbus/inbox/
`-- 7c1b7d1e-9134-4ed8-b664-836271b49bd3
    `-- stormjar-7c1b7d1e-9134-4ed8-b664-836271b49bd3.jar

The upload itself is done by submitJar:

private static void submitJar(Map conf) {
	if (submittedJar == null) {
		NimbusClient client = NimbusClient.getConfiguredClient(conf);
		try {
			LOG.info("Jar not uploaded to master yet. Submitting jar...");
			String localJar = System.getProperty("storm.jar");
			path = client.getClient().beginFileUpload();
			String[] pathCache = path.split("/");
			String uploadLocation = path + "/stormjar-"
					+ pathCache[pathCache.length - 1] + ".jar";
			List<String> lib = (List<String>) conf
					.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
			Map<String, String> libPath = (Map<String, String>) conf
					.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
			if (lib != null && lib.size() != 0) {
				for (String libName : lib) {
					String jarPath = path + "/" + libName;
					client.getClient().beginLibUpload(jarPath);
					submitJar(conf, libPath.get(libName), jarPath, client);
				}
				if (localJar != null)
					submittedJar = submitJar(conf, localJar,
							uploadLocation, client);
			} else {
				submittedJar = submitJar(conf, localJar, uploadLocation,
						client);
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		} finally {
			client.close();
		}
	} else {
		LOG.info("Jar already uploaded to master. Not submitting jar.");
	}
}           
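Two details of the upload are easy to check without a cluster: how the server-side jar name is derived from the path returned by `beginFileUpload`, and how a file is cut into pieces when the buffer is 512K. In the sketch below, `uploadLocation` repeats the string handling from submitJar. `splitIntoChunks` is an assumption: the four-argument submitJar overload is not shown in this article, so the sketch simply assumes the jar is sent in buffer-sized chunks. `UploadSketch` itself is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UploadSketch {
    static final int DEFAULT_BUFFER_SIZE = 512 * 1024; // 512K, the default named in the text

    // Same string handling as submitJar: the last path segment names the jar.
    static String uploadLocation(String path) {
        String[] pathCache = path.split("/");
        return path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
    }

    // Assumed behavior: cut the jar bytes into pieces of at most bufferSize bytes.
    static List<byte[]> splitIntoChunks(byte[] data, int bufferSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += bufferSize) {
            int end = Math.min(offset + bufferSize, data.length);
            chunks.add(Arrays.copyOfRange(data, offset, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        String path = "/home/hongmin.lhm/jstorm_data/nimbus/inbox/"
                + "7c1b7d1e-9134-4ed8-b664-836271b49bd3";
        // Prints the same jar path that appears in the directory listing.
        System.out.println(uploadLocation(path));

        byte[] fakeJar = new byte[1_200_000]; // placeholder content, not a real jar
        List<byte[]> chunks = splitIntoChunks(fakeJar, DEFAULT_BUFFER_SIZE);
        System.out.println(chunks.size()); // 3: two full 512K chunks plus the remainder
    }
}
```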