
Getting Started with Storm (1)

I. Storm is a real-time stream computing framework

This shows up concretely in its components:

Spout: data source

Bolt: processing node

In short, a Spout continuously supplies data and Bolts continuously process it, which forms a data-processing stream.

II. A word-count example

SentenceSpout (a Spout that emits sentences) -> SplitSentenceBolt (a Bolt that splits each sentence into words) -> WordCountBolt (a Bolt that counts the split words) -> ReportBolt (a Bolt that prints the final counts)

The whole SentenceSpout -> SplitSentenceBolt -> WordCountBolt -> ReportBolt pipeline forms a single concept: a Topology.

SentenceSpout.java

package com.zte.StormTest;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;


public class SentenceSpout extends BaseRichSpout
{
	private static final long serialVersionUID = -2521640424426565301L;
	
	private SpoutOutputCollector collector;
	private String[] sentences = {
			"my dog has fleas",
			"i like cold beverages",
			"the dog ate my homework",
			"don't have a cow man",
			"i don't think i like fleas"
	};                          
	private int index = 0;
	                 
	//Called repeatedly by Storm; emits the next sentence and wraps around at the end of the array
	@Override
	public void nextTuple() {
		this.collector.emit(new Values(sentences[index]));
		index++;
		if(index >= sentences.length)
		{ 
			index=0;
		}
	}
	
	//Called once when the Spout component is initialized
	//Map holds the Storm configuration
	//TopologyContext exposes information about the component's place in the topology, e.g. the current component ID
	//SpoutOutputCollector is used to emit tuples
	@Override
	public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
		this.collector  = collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		 declarer.declare(new Fields("sentence"));
	}
	
}

SplitSentenceBolt.java

package com.zte.StormTest;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;


public class SplitSentenceBolt extends BaseRichBolt
{
	private static final long serialVersionUID = 5516446565262406488L;
	
	private OutputCollector collector;
	
	@Override
	public void execute(Tuple tuple) 
	{
		String sentence = tuple.getStringByField("sentence");
		String[] words = sentence.split(" ");
		for(String word : words)
		{
			this.collector.emit(new Values(word));
		}
	}

	//Called when the bolt is initialized; a good place to set up resources the bolt needs, such as database connections
	@Override
	public void prepare(Map config, TopologyContext context, OutputCollector collector) 
	{
		this.collector = collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) 
	{
		declarer.declare(new Fields("word"));
	}
}

WordCountBolt.java

package com.zte.StormTest;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;


public class WordCountBolt extends BaseRichBolt
{
	private static final long serialVersionUID = 3533537921679412895L;
	
	private OutputCollector collector;
	private HashMap<String,Long> counts = null;
	
	@Override
	public void execute(Tuple tuple) 
	{
		String word = tuple.getStringByField("word");
		Long count = this.counts.get(word);
		if(count == null)
		{
			count = 0L;
		}
		count++;
		this.counts.put(word, count);
		this.collector.emit(new Values(word,count));
		System.out.println("word:"+word+" count:"+count);
	}

	@Override
	public void prepare(Map config, TopologyContext context, OutputCollector collector) 
	{
		this.collector = collector;
		this.counts = new HashMap<String,Long>();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) 
	{
		declarer.declare(new Fields("word","count"));
	}
}

WordCountTopology.java

package com.zte.StormTest;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology 
{
	private static final String SENTENCE_SPOUT_ID = "sentence-spout";
	private static final String SPLIT_BOLT_ID = "split-bolt";
	private static final String COUNT_BOLT_ID = "count-bolt";
	private static final String REPORT_BOLT_ID = "report-bolt";
	private static final String TOPOLOGY_NAME = "word-count-topology";
	
	public static void main(String[] args) throws Exception
	{
		SentenceSpout spout = new SentenceSpout();
		SplitSentenceBolt splitBolt = new SplitSentenceBolt();
		WordCountBolt countBolt = new WordCountBolt();
		ReportBolt reportBolt = new ReportBolt();
		
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout(SENTENCE_SPOUT_ID, spout);
		builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
		builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
		builder.setBolt(REPORT_BOLT_ID,reportBolt).globalGrouping(COUNT_BOLT_ID);
		
		Config config = new Config();
		
		//Run locally
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
		//When running locally, it is best to sleep before shutting down: components take time to stop, and this lets the final counts appear
		Thread.sleep(5000); 
		cluster.killTopology(TOPOLOGY_NAME);
		Thread.sleep(30000); 
		cluster.shutdown();
		
		//To deploy to a real Storm cluster, use StormSubmitter.submitTopology instead:
//		StormSubmitter.submitTopology(TOPOLOGY_NAME,config, builder.createTopology());
		
		
	}
}

ReportBolt.java

package com.zte.StormTest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;


public class ReportBolt extends BaseRichBolt
{
	private static final long serialVersionUID = 1L;
	
	private HashMap<String,Long> counts = null;
	
	//Records the latest count received for each word; the final tally is printed in cleanup()
	@Override
	public void execute(Tuple tuple) 
	{
		String word = tuple.getStringByField("word");
		Long count = tuple.getLongByField("count");
		this.counts.put(word, count);
	}

	@Override
	public void prepare(Map config, TopologyContext context, OutputCollector collector) 
	{
		this.counts = new HashMap<String,Long>();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) 
	{
	}
	
	//Called when the bolt shuts down (reliably only in local mode); prints the final counts
	@Override
	public void cleanup()
	{
		System.out.println("-------FINAL COUNTS--------");
		List<String> keys = new ArrayList<String>();
		keys.addAll(this.counts.keySet());
		Collections.sort(keys);
		for(String key:keys)
		{
			System.out.println(key+":"+this.counts.get(key));
		}
		System.out.println("-------FINAL COUNTS--------");
	}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.zte.apt</groupId>
	<artifactId>StormTest</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<name>StormTest</name>
	<!-- FIXME change it to the project's website -->
	<url>http://www.example.com</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>1.1.1</version>
			<scope>provided</scope>
		</dependency>

	</dependencies>

	<build>
		<pluginManagement><!-- lock down plugins versions to avoid using Maven 
				defaults (may be moved to parent pom) -->
			<plugins>
				
			</plugins>
		</pluginManagement>
	</build>
</project>


III. Storm basic concepts

A Storm cluster follows a master/slave architecture: it consists of one master node (nimbus) and one or more worker nodes (supervisors), and nimbus and the supervisors coordinate through ZooKeeper.

1. Nodes (servers): the machines configured into the Storm cluster; a cluster can include one or more worker nodes.

2. nimbus (master node): manages, coordinates, and monitors the topologies running on the cluster. This includes deploying topologies (to supervisors), assigning tasks, and reassigning tasks on failure: if a supervisor stops reporting heartbeats or becomes unreachable, nimbus reassigns that supervisor's tasks to other supervisors in the cluster. Note: nimbus does not take part in a topology's data processing.

3. supervisor (worker node): waits for nimbus to assign tasks, then spawns and monitors the workers (JVM processes) that execute them. If a worker process exits abnormally, e.g. due to an error or a Linux kill -9, the supervisor tries to respawn a new worker process.

4. Workers (JVM processes): independent JVM processes running on a node. Each node can be configured to run one or more workers, and each worker is bound to exactly one topology.

Set the number of worker processes with, e.g., Config.setNumWorkers(3)

5. Executors (threads): Java threads running inside a worker's JVM process. Several tasks can be assigned to the same executor; by default Storm assigns one task per executor.

Set the number of executors with, e.g., builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)

6. Tasks (spout/bolt instances): a task is an instance of a spout or bolt whose nextTuple() and execute() methods are called by an executor thread.

Set the number of tasks with builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4); the sketch below combines all three settings.
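
Putting the three settings together, here is a minimal sketch of how the word-count topology's parallelism could be tuned inside WordCountTopology.main (the parallelism numbers are illustrative assumptions, not recommendations from the original):

		TopologyBuilder builder = new TopologyBuilder();
		//1 executor (thread) for the spout
		builder.setSpout(SENTENCE_SPOUT_ID, new SentenceSpout(), 1);
		//2 executors running 4 tasks, i.e. 2 SplitSentenceBolt instances per thread
		builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt(), 2)
				.setNumTasks(4)
				.shuffleGrouping(SENTENCE_SPOUT_ID);

		Config config = new Config();
		//spread the executors across 3 worker JVMs when run on a cluster
		config.setNumWorkers(3);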

IV. Stream grouping strategies

1. Shuffle grouping: distributes tuples randomly; the total received across all of the bolt's threads equals the number emitted.

2. Fields grouping: routes tuples by the value of the specified field combination; tuples with the same value are always sent to the same bolt task.

For example, in word counting the routing is fixed: a->bolt1, b->bolt2, c->bolt3, d->bolt1.

3. All grouping: replicates the stream; every bolt task receives a copy of every tuple, e.g. if 10 tuples are emitted, each task receives all 10.

4. Direct grouping: the emitting component (spout/bolt) calls emitDirect to decide which task should receive a tuple; it can only be used on streams that have been declared as direct streams.

For example, a spout can specify that certain data may only be processed by the bolt task with task ID 4.

5. Global grouping: all tuples are sent to the bolt task with the lowest task ID, so the bolt's parallelism setting has no effect.

6. None grouping: no grouping; in practice the same as shuffle grouping.

7. CustomStreamGrouping: implement your own grouping, as in the sketch below.
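
As a sketch of option 7, here is a hypothetical ModHashGrouping (the class name and hashing scheme are my own illustration, not from the original) that routes each tuple by the hash of its first field, behaving like a simplified fields grouping:

package com.zte.StormTest;

import java.util.Arrays;
import java.util.List;

import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;

public class ModHashGrouping implements CustomStreamGrouping
{
	private static final long serialVersionUID = 1L;

	private List<Integer> targetTasks;

	//Storm passes in the task IDs of all consumer tasks once, at startup
	@Override
	public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks)
	{
		this.targetTasks = targetTasks;
	}

	//Same first-field value -> same task, so per-word counts stay consistent
	@Override
	public List<Integer> chooseTasks(int taskId, List<Object> values)
	{
		int index = (values.get(0).hashCode() & Integer.MAX_VALUE) % targetTasks.size();
		return Arrays.asList(targetTasks.get(index));
	}
}

It would be wired in with builder.setBolt(COUNT_BOLT_ID, countBolt, 2).customGrouping(SPLIT_BOLT_ID, new ModHashGrouping()); in place of the fieldsGrouping call.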

V. Running a topology

1. Locally: use LocalCluster and run directly from Eclipse.

2. On a cluster: use StormSubmitter.submitTopology, package the project as a jar (the Storm dependency does not need to be bundled, which is why storm-core is marked provided in the pom), and run:

bin/storm jar WordCount.jar com.zte.StormTest.WordCountTopology
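
A common pattern (a sketch, assuming the first command-line argument selects cluster mode; this switch is not in the original code) is to keep both paths in main:

		if (args != null && args.length > 0)
		{
			//Cluster mode: the first argument becomes the topology name
			StormSubmitter.submitTopology(args[0], config, builder.createTopology());
		}
		else
		{
			//Local mode: run in-process for development and testing
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
			Thread.sleep(10000);
			cluster.killTopology(TOPOLOGY_NAME);
			cluster.shutdown();
		}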

VI. Installing Storm

Make sure JDK 1.8 is installed.

1. Install ZooKeeper

Download and unpack the ZooKeeper package.

(1) Set up the configuration file first:

Rename conf/zoo_sample.cfg to zoo.cfg; the default port is 2181.

(2) Start ZooKeeper with bin/zkServer.sh start.

2. Install Storm

Unpack the archive.

(1) The bin directory contains the startup scripts.

(2) The conf directory contains the configuration; storm.yaml holds the settings, including the ZooKeeper servers, which default to localhost.

For example, list the ZooKeeper servers:

storm.zookeeper.servers:
   - "storm-01.test.com"   # host name or IP, e.g. 10.42.27.1
   - "storm-02.test.com"
   - "storm-03.test.com"

nimbus.seeds configures the master server(s), e.g. nimbus.seeds: ["storm-01.test.com"].

Once everything is configured, copy the whole Storm directory to the other servers as well.

(3) Start the master node: bin/storm nimbus &

(4) Start the worker nodes: bin/storm supervisor &

(5) Start the web UI: bin/storm ui &

(6) Start the log viewer process: bin/storm logviewer &

Then open the UI at ip:8080/index.html, e.g. 192.168.1.104:8080/index.html