1.storm是什麼
strom是apache下的一個頂級的項目,官網位址為http://storm.apache.org/
借用官網的一段話
Apache Storm is a free and open source distributed realtime computation system
是一個免費開源的分布式實時計算系統
2.strom的特點是什麼和應用場景舉例
特點:實時,可擴充,容錯,多語言
應用場景:實時統計使用者的行為,推送使用者感興趣的内容;實時分析統計日志内容,做到預警等
3.strom核心概念
1)tuple 單個資料,可了解為水滴
2)stream 資料流,可了解為水流
3)spout 資料的發送者,spout從外部擷取資料,比如kafka,可了解為水龍頭
4)bolt 資料的處理單元 可以嵌套 可以了解為 水桶、水壺
5)Topology 拓撲 由spouts和bolts組成的有序無環圖,可以了解為整個處理水流頭流出水的流程。
4.入門demo
使用的是1.2.3版本
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!-- <scope>provided</scope>--> <!--打包的時候是不需要的,但編譯的時候需要-->
</dependency>
package club.songhq.bigdata;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* @description 本地求和
* @author songhaiqiang
* @date 2020/1/18 17:32
*
* @return
*/
public class LocalSumStormTopology {
/**
* @description spout實作baseRichSpout
* @author songhaiqiang
* @date 2020/1/18 17:35
*
* @return
*/
public static class DataSourceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
/**
* @description 聲明輸出字段
* @author songhaiqiang
* @date 2020/1/18 17:57
* @param declarer
* @return void
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number"));
}
/**
* @description
* @author songhaiqiang
* @date 2020/1/18 17:37
* @param conf 配置參數
* @param context 上下文
* @param collector 資料發射器
* @return void
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
int number = 0;
/**
* @description 會産生資料,在生産上是從消息隊列中擷取資料
*
* 要不停的發送資料就一定是個死循環,
* @author songhaiqiang
* @date 2020/1/18 17:34
* @return void
*/
@Override
public void nextTuple() {
collector.emit(new Values(++number));
System.out.println("Spout : " +number);
//防止資料産生太快,這個方法還提複雜的
Utils.sleep(1000);
}
}
public static class SumBolt extends BaseRichBolt{
int sum = 0;
/**
* @description 初始化方法,會被執行一次
* @author songhaiqiang
* @date 2020/1/18 18:44
* @param stormConf
* @param context
* @param collector
* @return void
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
/**
* @description 該方法也是一個死循環
* @author songhaiqiang
* @date 2020/1/18 18:45
* @param input
* @return void
*/
@Override
public void execute(Tuple input) {
//Bolt中擷取值可以根據index擷取,也可以根據上一個環節定義的filed名稱擷取(建議使用該方式)
Integer number = input.getIntegerByField("number");
sum += number;
System.out.println("Bolt 輸出的值是 : "+ sum);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//這裡沒有繼續向下面傳遞tuple
}
}
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("DataSourceSpout",new DataSourceSpout());
//表示前後邏輯關系
topologyBuilder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
//穿件一個Storm叢集:本地模式運作,不需要搭建storm叢集
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("LocalSumStormTopology",new Config(),topologyBuilder.createTopology());
}
}
5.storm并行度
storm是在生産部署上分為nimbus,supervisor ,worker(slot) ,executor,task
worker(slot)是程序上的概念,一個supervisor預設可配置4個worker,可有這個參數指定端口号 supervisor.slots.ports,端口号的個數就是worker的個數,一個worker處理的是一個topology的子集,也就是說,一個worker隻處理一個topology,但一個topology可以交給多個worker處理。
executor則是線程上的概念,一個worker上可以起多少個線程。
task則是處理資料的最小單元,可以是一個線程對應一個task,也可以是一個線程對應對應多個task。
下面是一個storm cluster的生産部署ui總覽。
