天天看点

ack机制之代码实现,实现BaseRichBolt的方式,使用BaseBasicBolt的方式实现BaseRichBolt发ack和fail的功能

代码结构如下:

pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.storm.kafkastormredis</groupId>
    <artifactId>kafkastormredis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!--<scope>provided</scope>-->
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!--告诉运行的主类是哪个,注意根据自己的情况,下面的包名做相应的修改-->
                            <mainClass>cn.toto.strom.wordcount.StormTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
           

AckSpout如下:

package cn.toto.storm.ack;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 14:27
 */
public class AckSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //每次调用一次就发送一条消息
    @Override
    public void nextTuple() {
        //生产一条数据
        String uuid = UUID.randomUUID().toString().replace("_","");
        collector.emit(new Values(uuid),new Values(uuid));
        try{
            Thread.sleep( * );
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    //的定义发送的字段
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("xiaoxi:" + msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("xiaoxi" + msgId);
        collector.emit((List)msgId,msgId);
    }
}
           

Bolt1的代码如下:

package cn.toto.storm.ack;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 14:56
 */
public class Bolt1 extends BaseRichBolt {
    private OutputCollector collector;

    //初始化方法,只调用一次
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    //被循环调用
    @Override
    public void execute(Tuple input) {
        collector.emit(input,new Values(input.getString()));
        System.out.println("bolt1的execute方法被调用一次" + input.getString());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

Bolt2的代码如下:

package cn.toto.storm.ack;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:01
 */
public class Bolt2 extends BaseRichBolt {
    private OutputCollector collector;

    //初始化方法,只调动一次
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    //被循环调用
    @Override
    public void execute(Tuple input) {
        collector.emit(input,new Values(input.getString()));
        System.out.println("bolt2的execute方法被调用一次" + input.getString());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

Bolt3的代码如下:

package cn.toto.storm.ack;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:04
 */
public class Bolt3 extends BaseRichBolt {
    private OutputCollector collector;

    //初始化方法,只调用一次
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    //被循环调用
    @Override
    public void execute(Tuple input) {
        collector.emit(input,new Values(input.getString()));
        System.out.println("bolt3的execute方法被调用一次" + input.getString());
        collector.fail(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

Bolt4的代码如下:

package cn.toto.storm.ack;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:06
 */
public class Bolt4 extends BaseRichBolt {
    private OutputCollector collector;

    //初始化方法,只调用一次
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    //被循环调用
    @Override
    public void execute(Tuple input) {
        collector.emit(input,new Values(input.getString()));
        System.out.println("bolt4的execute方法调用一次" + input.getString());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

右键运行项目

案例2

AckSpout代码如下:

package cn.toto.storm.basebasicbolt;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:23
 */
public class AckSpout  extends BaseRichSpout {
    private SpoutOutputCollector collector;

    //初始化方法
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //上帝之手,循环调用,每调用过一次就发送一条消息
    public void nextTuple() {
        //生产一条数据
        String uuid = UUID.randomUUID().toString().replace("_", "");
        collector.emit(new Values(uuid),new Values(uuid));
        try {
            Thread.sleep(*);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //定义发送的字段
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功" + msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败:重发" + msgId);
        collector.emit((List)msgId,msgId);
    }
}
           

Bolt1的配置如下:

package cn.toto.storm.basebasicbolt;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 20`这里写代码片`17-06-21 15:32
 */
public class Bolt1 extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        collector.emit(new Values(input.getString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

Bolt2的配置如下:

package cn.toto.storm.basebasicbolt;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:33
 */
public class Bolt2 extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        collector.emit(new Values(input.getString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

Bolt3的代码配置如下:

package cn.toto.storm.basebasicbolt;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:34
 */
public class Bolt3 extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        collector.emit(new Values(input.getString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

Bolt4的代码配置如下:

package cn.toto.storm.basebasicbolt;/**
 * Created by toto on 2017/6/21.
 */

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:35
 */
public class Bolt4  extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        collector.emit(new Values(input.getString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
           

AckTopologyDriver的代码如下:

package cn.toto.storm.basebasicbolt;/**
 * Created by toto on 2017/6/21.
 */

import cn.toto.storm.ack.AckSpout;
import cn.toto.storm.ack.Bolt1;
import cn.toto.storm.ack.Bolt3;
import cn.toto.storm.ack.Bolt4;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

/**
 * 代码说明
 *
 * @author tuzq
 * @create 2017-06-21 15:37
 */
public class AckTopologyDriver {

    public static void main(String[] args) {
        //1、准备任务信息
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("mySpout", new AckSpout(), );
        topologyBuilder.setBolt("bolt1",new Bolt1(),).shuffleGrouping("mySpout");
        topologyBuilder.setBolt("bolt2",new Bolt2(),).shuffleGrouping("bolt1");
        topologyBuilder.setBolt("bolt3",new Bolt3(),).shuffleGrouping("bolt2");
        topologyBuilder.setBolt("bolt4",new Bolt4(),).shuffleGrouping("bolt3");

        Config config = new Config();
        config.setNumWorkers();
        StormTopology stormTopology = topologyBuilder.createTopology();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcount",config,stormTopology);
    }
}
           

当失败了的时候,抛出:throw FailedException ,然后可以实现类似fail()方法。