天天看點

初識 storm 分布式流計算

       在2011年Storm開源之前,由于Hadoop的火紅,整個業界都在喋喋不休地談論大資料。Hadoop的高吞吐,海量資料處理的能力使得人們可以友善地地處理海量資料。但是,Hadoop的缺點也和它的優點同樣鮮明——延遲大,響應緩慢,運維複雜。

有需求也就有創造,在Hadoop基本奠定了大資料霸主地位的時候很多的開源項目都是以彌補Hadoop的實時性為目标而被創造出來。而在這個節骨眼上

Storm橫空出世了。

Storm帶着流式計算的标簽華麗麗滴出場了,其實storm相對來說比hadoop複雜多了.如果你學會storm以後再去看hadoop那是非常簡單的,看看它的一些賣點

    1.  分布式系統:可橫向拓展,現在的項目不帶個分布式特性都不好意思開源。
    2. 運維簡單:Storm的部署的确簡單。雖然沒有Mongodb的解壓即用那麼簡單,但是它也就是多安裝兩個依賴庫而已。
    3. 高度容錯:子產品都是無狀态的,随時當機重新開機。
    4. 無資料丢失:Storm創新性提出的ack消息追蹤架構和複雜的事務性處理,能夠滿足很多級别的資料處理需求。不過,越高的資料處理需求,性能下降越嚴重。
    5. 多語言:實際上,Storm的多語言更像是臨時添加上去似的。因為,你的送出部分還是要使用Java實作.    

  下面我們來對比一下hadoop與storm

初識 storm 分布式流計算

但是他們沒有誰強誰弱,應為側重的方向不一樣,hadoop側重離線海量資料的分析,而storm需要實時計算分析,比如交通流量的調查,

淘寶或者京東購買商品時候的實時推薦等等 都可以通過storm實作.Storm主要分為兩種元件Nimbus和Supervisor。

這兩種元件都是快速失敗的,沒有狀态。任務狀态和心跳資訊等都儲存在Zookeeper上的,送出的代碼資源都在本地機器的硬碟上。

    1. Nimbus負責在叢集裡面發送代碼,配置設定工作給機器,并且監控狀态。全局隻有一個。
    2. Supervisor會監聽配置設定給它那台機器的工作,根據需要啟動/關閉工作程序Worker。每一個要運作Storm的機器上都要部署一個,并且,按照機器的配置設定上面配置設定的槽位數。
    3. Zookeeper是Storm重點依賴的外部資源。
    4. Nimbus和Supervisor甚至實際運作的Worker都是把心跳儲存在Zookeeper上的。
    5. Nimbus也是根據Zookeerper上的心跳和任務運作狀況,進行排程和任務配置設定的。Storm送出運作的程式稱為Topology。
    6. Topology處理的最小的消息機關是一個Tuple,也就是一個任意對象的數組。Topology由Spout和Bolt構成。
    7. Spout是發出Tuple的結點。Bolt可以随意訂閱某個Spout或者Bolt發出的Tuple。Spout和Bolt都統稱為component。
我們來看張經典的Topology的流程圖就明白了
初識 storm 分布式流計算

在看張storm的架構圖

初識 storm 分布式流計算

可能看到這或許你大概知道怎麼工作的了,

下面看下我們的hell world

首先引入依賴

<dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.0</version>
      </dependency>
           

  看下我們的Topology 

public static void main(String[] args) throws InterruptedException {
        //初始化top構造器
        TopologyBuilder top=new TopologyBuilder();
        //設定spout
        top.setSpout("spout",new TestSpout());
        //設定bolt
        top.setBolt("print-bolt",new TestBoot()).shuffleGrouping("spout");
        top.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");
        //本地模式
        LocalCluster cluster=new LocalCluster();
        //送出top圖
        cluster.submitTopology("tag",new Config(),top.createTopology());
        Thread.sleep(10000);
        cluster.killTopology("tap");
        cluster.shutdown();

    }
           

下面是Spout 

public class TestSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;

    private static final Map<Integer, String> map = new HashMap<Integer, String>();

    static {
        map.put(0, "java");
        map.put(1, "php");
        map.put(2, "groovy");
        map.put(3, "python");
        map.put(4, "ruby");
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //對spout進行初始化
        this.collector = collector;
        System.out.println("++++++++++++++++++++++++++++++");
        //System.out.println(this.collector);
    }


    /**
     * <B>方法名稱:</B>輪詢tuple<BR>
     * <B>概要說明:</B><BR>
     * @see
     */
    @Override
    public void nextTuple() {
        //随機發送一個單詞
        final Random r = new Random();
        int num = r.nextInt(5);
        System.out.println("----------------------------");
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.collector.emit(new Values(map.get(num)));
    }

    /**
     * <B>方法名稱:</B>declarer聲明發送資料的field<BR>
     * <B>概要說明:</B><BR>
     * @see
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("=============================");
        //進行聲明
        declarer.declare(new Fields("print"));
    }

}
           

TestBoot

public class TestBoot extends BaseRichBolt {

    OutputCollector _collector;


    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector=outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String spout = tuple.getStringByField("print");
        System.out.println("1111111111111"+spout);
        //發射字段
        _collector.emit(new Values(spout));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //這是下遊接收者
        outputFieldsDeclarer.declare(new Fields("write"));
    }
}
           

writeBolt

public class WriteBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;

	private static final Log log = LogFactory.getLog(WriteBolt.class);
	
	private FileWriter writer ;
	@Override
	public void execute(Tuple input) {
		//擷取上一個元件所聲明的Field
		String text = input.getStringByField("write");
		System.out.println(text);
		try {
			if(writer == null){
				if(System.getProperty("os.name").equals("Windows 10")){
					writer = new FileWriter("D:\\099_test\\" + this);
				} else if(System.getProperty("os.name").equals("Windows 8.1")){
					writer = new FileWriter("D:\\099_test\\" + this);
				} else if(System.getProperty("os.name").equals("Windows 7")){
					writer = new FileWriter("D:\\099_test\\" + this);
				} else if(System.getProperty("os.name").equals("Linux")){
					System.out.println("----:" + System.getProperty("os.name"));
					writer = new FileWriter("/usr/local/temp/" + this);
				}
			}
			log.info("【write】: 寫入檔案");
			writer.write(text);
			writer.write("\n");
			writer.flush();
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}


	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

	}

	@Override
	public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

	}

}
           

我們的Topology圖的流程是這樣的由TestSpout 流向 TestBoot 然後在流向 WriteBolt ,TestBoot負責在控制台列印,WriteBolt 負責往磁盤寫入 ,下面使我們的運作結果

初識 storm 分布式流計算

通過Spout随機給TestBoot發送單詞,然後經過WriteBolt進行寫入到磁盤.并且啟動的是本地模式.如果想在伺服器上的storm環境中運作Topology 需要改成叢集模式

try {
            //開啟叢集模式
            StormSubmitter.submitTopology("top1", new Config(), top.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }
           

本文章重點是要讓大家對Storm有個初步的認識,希望能幫到大家