
StromTest:
package com.yuntian.test;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
/**
 * Entry point that wires {@code MySpout} into {@code MyBolt} and submits the
 * resulting topology.
 *
 * <p>With at least one command-line argument the topology is submitted through
 * {@link StormSubmitter}; with none it is run on an in-process
 * {@link LocalCluster}.
 */
public class StromTest {

    /** Name under which the topology is submitted in both modes. */
    private static final String TOPOLOGY_NAME = "StromTest";

    /**
     * Builds the two-component topology and submits it.
     *
     * @param args any non-empty argument list selects cluster submission;
     *             {@code null} or an empty array selects the local cluster
     * @throws AlreadyAliveException    propagated from topology submission
     * @throws InvalidTopologyException propagated from topology submission
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("myspout", new MySpout());
        topology.setBolt("mybolt", new MyBolt()).shuffleGrouping("myspout");

        Config topologyConf = new Config();
        boolean runLocally = (args == null) || (args.length == 0);

        if (runLocally) {
            topologyConf.setDebug(true);
            // Per Storm's Config documentation this caps the number of pending
            // (emitted but not yet acked/failed) tuples per spout task; it is
            // not a thread count.
            topologyConf.setMaxSpoutPending(10);
            // Nothing in this method shuts the local cluster down again.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, topologyConf, topology.createTopology());
            return;
        }

        // Arguments were supplied: submit to the cluster.
        topologyConf.setNumWorkers(3);
        StormSubmitter.submitTopology(TOPOLOGY_NAME, topologyConf, topology.createTopology());
    }
}
MySpout:
package com.yuntian.test;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * Demo spout that emits one fixed three-field tuple per {@link #nextTuple()}
 * call, pausing about one second before each emission.
 *
 * <p>Tuples are emitted without a message id, and {@link #ack(Object)} and
 * {@link #fail(Object)} are intentionally empty.
 */
public class MySpout implements IRichSpout{

    /** Pause before each emission, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000L;

    /** Collector received in {@link #open}; {@code null} until then. */
    SpoutOutputCollector collector = null;

    /**
     * Stores the collector used later by {@link #nextTuple()}.
     *
     * @param arg0      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector through which tuples are emitted
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Waits {@value #EMIT_INTERVAL_MS} ms and then emits the tuple
     * {@code ("111", "222", "333")}.
     *
     * <p>If the wait is interrupted, the thread's interrupt status is restored
     * and the call returns without emitting, so the caller can observe the
     * interruption instead of having it silently discarded.
     */
    @Override
    public void nextTuple() {
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again so
            // the interruption is not lost, and skip this emission.
            Thread.currentThread().interrupt();
            return;
        }
        collector.emit(new Values("111","222","333"));
    }

    /**
     * Declares the three output fields {@code aaa}, {@code bbb}, {@code ccc}.
     *
     * @param arg0 declarer that receives the field names
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("aaa","bbb","ccc"));
    }

    /** No-op. */
    @Override
    public void ack(Object arg0) {
    }

    /** No-op. */
    @Override
    public void fail(Object arg0) {
    }

    /** No-op. */
    @Override
    public void activate() {
    }

    /** No-op. */
    @Override
    public void deactivate() {
    }

    /** No-op: this spout holds no resources of its own. */
    @Override
    public void close() {
    }

    /**
     * @return {@code null}; this spout supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
MyBolt:
package com.yuntian.test;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
/**
 * Demo bolt that prints the first three string fields of every incoming tuple
 * to standard output.
 *
 * <p>It declares no output fields, does not keep the collector passed to
 * {@link #prepare}, and therefore neither emits nor acks tuples.
 */
public class MyBolt implements IRichBolt {

    /** Number of leading tuple fields printed by {@link #execute(Tuple)}. */
    private static final int PRINTED_FIELDS = 3;

    /**
     * No-op: none of the arguments are retained.
     *
     * @param arg0 topology configuration (unused)
     * @param arg1 topology context (unused)
     * @param arg2 output collector (unused)
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
    }

    /**
     * Prints fields 0, 1 and 2 of the tuple, one per line, in index order.
     *
     * @param arg0 incoming tuple; its first three fields are read as strings
     */
    @Override
    public void execute(Tuple arg0) {
        for (int index = 0; index < PRINTED_FIELDS; index++) {
            System.out.println(arg0.getString(index));
        }
    }

    /** No-op: this bolt holds no resources. */
    @Override
    public void cleanup() {
    }

    /**
     * Declares nothing.
     *
     * @param arg0 declarer (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
本机连接 Storm 集群,在 Eclipse 上测试运行: