一、離線計算與實時計算
離線計算: 批量擷取資料、批量傳輸資料;周期性計算資料,展示資料;
代表技術: sqoop批量導入,HDFS批量存儲,mapreduce批量計算,Hive批量計算資料…
就業方向: hivesql, Hadoop叢集運維
實時計算: 資料實時産生,資料實時傳輸,資料實時計算,實時展示;
代表技術: Flume實時擷取資料,kafka/metaq 實時資料存儲,storm/Jstrom實時計算資料,redis實時結果緩存,mysql持久化存儲…
流式計算一般架構圖

flume實時采集資料。
Kafka消息隊列,用來臨時儲存資料。
Strom實時計算資料。
Redis實時存儲資料。
流式計算就是将源源不斷産生的資料實時收集并實時計算,盡可能快的得到計算結果。
二、Strom介紹
Storm用來實時計算資料架構,特點:低延遲、高可用、分布式、可擴充、資料不丢失。提供簡單容易了解的接口,便于開發。
Storm與Hadoop差別
Storm用于實時計算,Storm處理的資料儲存在記憶體中,資料源源不斷,通過網絡傳輸進來;
Hadoop用于離線計算,處理的一批一批資料儲存在檔案系統中,資料儲存在磁盤中。
三、Strom程式設計模型
Nimbus : 任務配置設定,對任務監督;
Zookeeper : 儲存任務配置設定的資訊、心跳資訊、中繼資料資訊。
Supervisor : 接受任務,并啟動worker。worker的數量根據端口号來的;
Worker : 執行任務的具體元件(其實就是一個JVM),可以執行Spout或者bolt兩種類型的任務; 一個worker就是一個端口号;
Task : Task=線程=executor, 一個Task屬于一個Spout或者Bolt并發任務;
并發度:
使用者指定的一個任務,可以被多個線程執行,對應到storm中就是一個task,并發度的數量等于線程的數量。一個任務的多個線程,會被運作在多個Worker(JVM)上,有一種類似于平均算法的負載均衡政策。盡可能減少網絡IO;
一個Strom程式可以擷取多個資料源,每個topology的資料都是自己獨有的,和其他的topology沒有關系。
spout : 擷取外部資料源;
Bolt : 業務邏輯處理節點,可以有多個;
Tuple : 消息發送的最小單元,是一個Tuple對象。
四、Strom叢集部署
搭建前,需要安裝好zookeeper叢集
1、準備工作
配置hosts
vi /etc/hosts
192.168.239.128 storm01 zk01 hadoop01
192.168.239.129 storm02 zk02 hadoop02
192.168.239.130 storm03 zk03 hadoop03
關閉防火牆
chkconfig iptables off && setenforce 0
建立使用者
groupadd realtime && useradd realtime && usermod -a -G realtime realtime
建立工作目錄并賦權
mkdir /export
mkdir /export/servers
chmod 755 -R /export
切換到realtime使用者下
su realtime
2、解壓壓縮包
3、修改配置檔案
4、分發壓縮包
5、啟動叢集
6、檢視叢集
通路storm01:8080,即可看到storm的ui界面。
四、Storm操作指令
1、送出任務
storm jar 【jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】
storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
注意:因為從外部擷取資料、結果儲存到redis,是以不需要指定輸入、輸出
路徑,這是和hadoop差別
2、殺死任務
storm kill 【拓撲名稱】 -w 10
storm kill topology-name -w 10
3、停運任務
storm deactivte 【拓撲名稱】
storm deactivte topology-name
五、問題解答
1、worker與topology
一個worker隻屬于一個topology,一個topology包含多個worker,其實就是這個topology運作在多個worker上。
一個topology要求的worder數量如何不被滿足,叢集在配置設定任務時,根據現有的worker先運作的topology。如果目前叢集中worder數量為0,那麼最新送出的topology将隻會标示為active, 不會運作。隻有叢集有了空閑資源才會被運作。
2、StreamGrouping:(分組政策)
shuffleGrouping(随機分組)、FieldGrouping(按字段分組)、不分組等
3、運作模式
1、叢集模式:在叢集上運作
2、本地模式:在目前idea上運作,測試代碼功能時,可以選擇該模式
4、如何指定驅動類中每個元件的并發度數量
1、根據上遊的資料量來設定spout并發度
2、根據業務複雜度和execute方法執行時間設定bolt并發度
3、根據叢集可用資源配置,一般情況下70%資源使用率
5、如何設定worker的數量
worker的數量根據程式并發度task數量來均分,實際業務場景中,并反複調整
六、worldcount代碼編寫
1、WordCountTopologMain
package cn.itcast.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* Created by maoxiangyi on 2016/4/27.
*/
public class WordCountTopologMain {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//1、準備一個TopologyBuilder
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("mySpout",new MySpout(),2);
topologyBuilder.setBolt("mybolt1",new MySplitBolt(),2).shuffleGrouping("mySpout");
topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).fieldsGrouping("mybolt1", new Fields("word"));
//topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).shuffleGrouping("mybolt1");
// config.setNumWorkers(2);
/**
* i
* am
* lilei
* love
* hanmeimei
*/
//2、建立一個configuration,用來指定目前topology 需要的worker的數量
Config config = new Config();
config.setNumWorkers(2);
//3、送出任務 -----兩種模式 本地模式和叢集模式
//StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology());
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());
}
}
2、MySpout
package cn.itcast.storm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
* Created by maoxiangyi on 2016/4/27.
*/
public class MySpout extends BaseRichSpout {
SpoutOutputCollector collector;
//初始化方法
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
//storm 架構在 while(true) 調用nextTuple方法
public void nextTuple() {
collector.emit(new Values("i am lilei love hanmeimei"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("love"));
}
}
3、MySplitBolt
package cn.itcast.storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
* Created by maoxiangyi on 2016/4/27.
*/
public class MySplitBolt extends BaseRichBolt {
OutputCollector collector;
//初始化方法
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
// 被storm架構 while(true) 循環調用 傳入參數tuple
public void execute(Tuple input) {
String line = input.getString(0);
String[] arrWords = line.split(" ");
for (String word:arrWords){
collector.emit(new Values(word,1));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}
package cn.itcast.storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* Created by maoxiangyi on 2016/4/27.
*/
public class MyCountBolt extends BaseRichBolt {
OutputCollector collector;
Map<String, Integer> map = new HashMap<String, Integer>();
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String word = input.getString(0);
Integer num = input.getInteger(1);
System.out.println(Thread.currentThread().getId() + " word:"+word);
if (map.containsKey(word)){
Integer count = map.get(word);
map.put(word,count + num);
}else {
map.put(word,num);
}
// System.out.println("count:"+map);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//不輸出
}
}