天天看點

Storm的一個簡單例子

Storm的一個簡單例子

  • Storm的一個簡單例子
    • maven依賴
    • 相關類
      • LogAnalyserStorm
      • FakeCallLogReaderSpout
      • CallLogCreatorBolt
      • CallLogCounterBolt

Storm的一個簡單例子

maven依賴

<!-- Apache Storm core artifact: supplies the org.apache.storm.* classes imported by the Java sources in this example. -->
<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.3</version>
        </dependency>
           

相關類

  • LogAnalyserStorm.java
  • FakeCallLogReaderSpout.java
  • CallLogCreatorBolt.java
  • CallLogCounterBolt.java

LogAnalyserStorm

package com.storm.demo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Entry point that wires the call-log topology together and runs it on an in-process
 * {@link LocalCluster} for a fixed amount of time.
 *
 * <p>Data flow:
 * <pre>
 * call-log-reader-spout --(shuffle)--> call-log-creator-bolt --(fields: "call")--> call-log-counter-bolt
 * </pre>
 */
public class LogAnalyserStorm {

    /** Component id of the spout that produces the fake call records. */
    private static final String SPOUT_ID = "call-log-reader-spout";

    /** Component id of the bolt that turns a record into a "from - to" call key. */
    private static final String CREATOR_BOLT_ID = "call-log-creator-bolt";

    /** Component id of the bolt that counts occurrences per call key. */
    private static final String COUNTER_BOLT_ID = "call-log-counter-bolt";

    /** Name under which the topology is submitted to the local cluster. */
    private static final String TOPOLOGY_NAME = "LogAnalyserStorm";

    /** How long the topology is left running before the cluster is shut down. */
    private static final long RUN_TIME_MILLIS = 10000;

    /**
     * Builds the topology, submits it to a local cluster, waits {@link #RUN_TIME_MILLIS}
     * milliseconds and then shuts the cluster down.
     *
     * @param args command-line arguments; not used
     * @throws Exception if submitting the topology fails or the waiting thread is interrupted
     */
    public static void main(String... args) throws Exception {
        Config config = new Config();
        config.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, new FakeCallLogReaderSpout());

        builder
                .setBolt(CREATOR_BOLT_ID, new CallLogCreatorBolt())
                .shuffleGrouping(SPOUT_ID);
        // Group on the "call" field so that every tuple carrying the same call key is
        // routed to the same counter task.
        builder
                .setBolt(COUNTER_BOLT_ID, new CallLogCounterBolt())
                .fieldsGrouping(CREATOR_BOLT_ID, new Fields("call"));

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Thread.sleep(RUN_TIME_MILLIS);
        } finally {
            // Always stop the in-process cluster, even when submission fails or the
            // sleep is interrupted; otherwise it would be left running.
            cluster.shutdown();
        }
    }
}
           

FakeCallLogReaderSpout

package com.storm.demo;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Spout that fabricates call-log records instead of reading them from a real source.
 *
 * <p>Each emitted tuple has the fields {@code ("from", "to", "duration")}: two distinct
 * phone numbers picked at random from a fixed pool and a random duration in
 * {@code [0, MAX_DURATION)}. At most {@link #MAX_TUPLES} tuples are emitted over the
 * lifetime of one spout instance, in batches of up to {@link #BATCH_SIZE} per
 * {@link #nextTuple()} call.
 */
public class FakeCallLogReaderSpout implements IRichSpout {

    /** Pool of phone numbers that callers and callees are drawn from. */
    private static final String[] MOBILE_NUMBERS = {
            "13805194441",
            "13805194442",
            "13805194443",
            "13805194444"
    };

    /** Total number of tuples this spout instance emits before going quiet. */
    private static final int MAX_TUPLES = 1000;

    /** Upper bound on the number of tuples emitted by a single {@link #nextTuple()} call. */
    private static final int BATCH_SIZE = 100;

    /** Exclusive upper bound of the generated call duration. */
    private static final int MAX_DURATION = 60;

    private SpoutOutputCollector collector;

    private TopologyContext context;

    private Random randomGenerator = new Random();

    /** Number of tuples emitted so far by this instance. */
    private int idx = 0;

    /**
     * Initializes the spout by storing the context and collector handed in by Storm.
     *
     * @param map the Storm configuration for this spout; not used
     * @param topologyContext information about this spout's place in the topology
     * @param spoutOutputCollector collector used to emit tuples to downstream bolts
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.collector = spoutOutputCollector;
    }

    /**
     * Called when the spout is about to shut down; nothing to release here.
     */
    @Override
    public void close() {

    }

    /** No-op. */
    @Override
    public void activate() {

    }

    /** No-op. */
    @Override
    public void deactivate() {

    }

    /**
     * Emits the next batch of generated call records through the collector.
     *
     * <p>Emits up to {@link #BATCH_SIZE} tuples per call and stops for good once
     * {@link #MAX_TUPLES} tuples have been emitted in total.
     */
    @Override
    public void nextTuple() {
        int emittedInBatch = 0;
        while (emittedInBatch < BATCH_SIZE && idx < MAX_TUPLES) {
            emittedInBatch++;
            idx++;

            String fromMobileNumber = randomMobileNumber();
            String toMobileNumber = randomMobileNumber();

            // A call must involve two different numbers; compare by value, not by reference.
            while (fromMobileNumber.equals(toMobileNumber)) {
                toMobileNumber = randomMobileNumber();
            }

            int duration = randomGenerator.nextInt(MAX_DURATION);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
        }
    }

    /** Picks one number uniformly at random from {@link #MOBILE_NUMBERS}. */
    private String randomMobileNumber() {
        return MOBILE_NUMBERS[randomGenerator.nextInt(MOBILE_NUMBERS.length)];
    }

    /**
     * Acknowledgement callback for a fully processed tuple; intentionally a no-op.
     * {@link #nextTuple()} emits without a message id, so nothing is tracked for replay here.
     *
     * @param o the message id of the acknowledged tuple
     */
    @Override
    public void ack(Object o) {

    }

    /**
     * Failure callback for a tuple; intentionally a no-op, so failed tuples are not re-emitted.
     *
     * @param o the message id of the failed tuple
     */
    @Override
    public void fail(Object o) {

    }

    /**
     * Declares the output schema of the emitted tuples: {@code "from"}, {@code "to"}, {@code "duration"}.
     *
     * @param outputFieldsDeclarer used to declare output stream ids and output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));
    }

    /**
     * @return {@code null}; this spout supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
           

CallLogCreatorBolt

package com.storm.demo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Bolt that condenses a raw call record into a call key.
 *
 * <p>Reads a tuple whose first three values are the caller, the callee and the duration,
 * and emits {@code ("call", "duration")} where {@code call} is {@code "<from> - <to>"}.
 */
public class CallLogCreatorBolt implements IRichBolt {

    private OutputCollector collector;

    /**
     * Stores the collector used later by {@link #execute(Tuple)}.
     *
     * @param map the Storm configuration for this bolt; not used
     * @param topologyContext information about this bolt's place in the topology; not used
     * @param outputCollector collector used to emit and acknowledge tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Builds the call key from the incoming record, emits it together with the duration,
     * and acknowledges the input.
     *
     * @param tuple input tuple; value 0 is the caller, value 1 the callee, value 2 the duration
     */
    @Override
    public void execute(Tuple tuple) {
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);

        // Anchor the output to the input and ack the input afterwards: an IRichBolt is
        // responsible for acknowledging every tuple it receives.
        collector.emit(tuple, new Values(from + " - " + to, duration));
        collector.ack(tuple);
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {

    }

    /**
     * Declares the output schema: {@code "call"} and {@code "duration"}.
     *
     * @param outputFieldsDeclarer used to declare output stream ids and output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call", "duration"));
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
           

CallLogCounterBolt

package com.storm.demo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * Bolt that counts how many times each call key has been seen.
 *
 * <p>The counts are kept in memory per bolt instance and printed to standard output
 * from {@link #cleanup()}. This bolt does not emit any tuples.
 */
public class CallLogCounterBolt implements IRichBolt {

    /** Number of tuples seen so far for each call key; created in {@link #prepare}. */
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    /**
     * Creates an empty counter map and stores the collector.
     *
     * @param map the Storm configuration for this bolt; not used
     * @param topologyContext information about this bolt's place in the topology; not used
     * @param outputCollector collector used to acknowledge tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap = new HashMap<>();
        this.collector = outputCollector;
    }

    /**
     * Increments the counter for the call key carried in value 0 of the tuple and
     * acknowledges the tuple.
     *
     * @param tuple input tuple whose value 0 is the call key
     */
    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);

        // Inserts 1 for a new key, otherwise adds 1 to the existing count.
        counterMap.merge(call, 1, Integer::sum);

        collector.ack(tuple);
    }

    /**
     * Prints every call key and its count as {@code "<call> : <count>"} to standard output.
     */
    @Override
    public void cleanup() {
        counterMap.forEach((call, count) -> System.out.println(call + " : " + count));
    }

    /**
     * Declares a single output field {@code "call"}; note that {@link #execute(Tuple)}
     * itself never emits.
     *
     * @param outputFieldsDeclarer used to declare output stream ids and output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call"));
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}