天天看点

storm 实战及实例讲解(三)

storm 实战及实例讲解(三)

                                     ——comaple.zhang

                                                                                              ——2012-09-13

            本讲将接着上一讲,把一个完成的topology完成。上一节主要介绍了一个基本的topology的构造过程,以及每一步所对应的storm集群中分配的资源情况。要想开发storm应用必须对上一讲我提到的那些概念有完全的了解,否则开发出来的应用很有可能有这样那样的问题而无法工作。那么接下来我们来一起定义一个spot节点和bolt节点。

spot节点:在实际的开发中这个节点可以起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务(该方式我会在后续章节中谈到)。这里我们将开发一个简单的模拟数据喷发的节点。具体方式见代码:

package com.jd.comaple.storm.test.spout;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import java.util.Map;

import java.util.Random;

public class SimpleSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private static String[] info = new String[]{

            "comaple\t,12424,44w46,654,12424,44w46,654,",

            "lisi\t,435435,6537,12424,44w46,654,",

            "lipeng\t,45735,6757,12424,44w46,654,",

            "hujintao\t,45735,6757,12424,44w46,654,",

            "jiangmin\t,23545,6457,2455,7576,qr44453",

            "beijing\t,435435,6537,12424,44w46,654,",

            "xiaoming\t,46654,8579,w3675,85877,077998,",

            "xiaozhang\t,9789,788,97978,656,345235,09889,",

            "ceo\t,46654,8579,w3675,85877,077998,",

            "cto\t,46654,8579,w3675,85877,077998,",

            "zhansan\t,46654,8579,w3675,85877,077998,"};

    Random rd = new Random();

    @Override

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

        this.collector = collector;

    }

    @Override

    public void nextTuple() {

        try {

            String msg = info[rd.nextInt(10)];

            //调用发射方法

            collector.emit(new Values(msg));

            //模拟等待100ms

            Thread.sleep(100);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("source"));

    }

}

bolt节点: 处理节点,该节点接收喷发节点发送的数据进行简单的处理后,发射出去。

package com.jd.comaple.storm.test.bolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class SimpleBolt extends BaseBasicBolt {

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields( "info"));

    }

    @Override

    public void execute(Tuple input, BasicOutputCollector collector) {

        try {

            String mesg = input.getString(0);

            if (mesg != null)

                collector.emit(new Values( mesg+"mesg is processed!"));

        } catch (Exception e) {

            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

        }

    }

}