天天看點

storm 實戰及執行個體講解(三)

storm 實戰及執行個體講解(三)

                                     ——comaple.zhang

                                                                                              ——2012-09-13

            本講将接着上一講,把一個完成的topology完成。上一節主要介紹了一個基本的topology的構造過程,以及每一步所對應的storm叢集中配置設定的資源情況。要想開發storm應用必須對上一講我提到的那些概念有完全的了解,否則開發出來的應用很有可能有這樣那樣的問題而無法工作。那麼接下來我們來一起定義一個spot節點和bolt節點。

spot節點:在實際的開發中這個節點可以起到和外界溝通的作用,他可以從一個資料庫中按照某種規則取資料,也可以從分布式隊列中取任務(該方式我會在後續章節中談到)。這裡我們将開發一個簡單的模拟資料噴發的節點。具體方式見代碼:

package com.jd.comaple.storm.test.spout;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import java.util.Map;

import java.util.Random;

public class SimpleSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private static String[] info = new String[]{

            "comaple\t,12424,44w46,654,12424,44w46,654,",

            "lisi\t,435435,6537,12424,44w46,654,",

            "lipeng\t,45735,6757,12424,44w46,654,",

            "hujintao\t,45735,6757,12424,44w46,654,",

            "jiangmin\t,23545,6457,2455,7576,qr44453",

            "beijing\t,435435,6537,12424,44w46,654,",

            "xiaoming\t,46654,8579,w3675,85877,077998,",

            "xiaozhang\t,9789,788,97978,656,345235,09889,",

            "ceo\t,46654,8579,w3675,85877,077998,",

            "cto\t,46654,8579,w3675,85877,077998,",

            "zhansan\t,46654,8579,w3675,85877,077998,"};

    Random rd = new Random();

    @Override

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

        this.collector = collector;

    }

    @Override

    public void nextTuple() {

        try {

            String msg = info[rd.nextInt(10)];

            //調用發射方法

            collector.emit(new Values(msg));

            //模拟等待100ms

            Thread.sleep(100);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("source"));

    }

}

bolt節點: 處理節點,該節點接收噴發節點發送的資料進行簡單的處理後,發射出去。

package com.jd.comaple.storm.test.bolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class SimpleBolt extends BaseBasicBolt {

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields( "info"));

    }

    @Override

    public void execute(Tuple input, BasicOutputCollector collector) {

        try {

            String mesg = input.getString(0);

            if (mesg != null)

                collector.emit(new Values( mesg+"mesg is processed!"));

        } catch (Exception e) {

            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

        }

    }

}