Storm是一個實時分布式計算架構,是真正的實時處理架構。針對資料來一條記錄就處理一次;流式資料處理技術架構 ,可以達到毫秒級别。
SparkStreaming是一個微批資料處理架構 ,隻能達到秒級
一、Storm叢集機構

Nimbus:主節點
- 接受用戶端任務Topology的送出,負責在叢集中發送代碼(jar包)
- 發送任務到zookeeper,等待Supervisor領取
- 監聽叢集狀态
- 容錯機制:當Supervisor挂掉,重新配置設定任務給其他Supervisor執行
Supervisor:從節點
- 從zookeeper上擷取Nimbus配置設定的任務,負責啟動和停止本機上的Worker程序來執行任務。
- 需要将自己的運作狀态彙報到zookeeper上,有Nimbus監控
Worker
- 負責啟動本機上的Executor線程來執行任務Task
- 負責與其他Worker之間進行資料傳輸
- 需要将自己的狀态彙報到zookeeper上,有Nimbus監控
Executor:真正執行任務Task的線程,由Worker啟動和停止
Zookeeper
- 存儲任務排程資訊,各節點狀态資訊,心跳
- 使Storm叢集保持無狀态,這樣具有高可靠性
Zookeeper上各節點存儲的資訊
- /storm/supervisor/supervisor-id:supervisor節點的心跳
- /storm/storms/topology-id:topology基本資訊
- /storm/assignments/topology-id:topology排程資訊
- /storm/workbeats/topology-id/worker-id:worker程序的心跳
- /storm/errors/topology-id:任務運作錯誤資訊
二、Storm安裝部署與測試運作
安裝要求
- zookeeper叢集
- python 2.6.6以上:$ python --version
安裝
1.解壓
$ tar -zxvf apache-storm-0.9.6.tar.gz -C /opt/modules/
2.修改配置檔案
vim /opt/modules/apache-storm-0.9.6/conf/storm_env.ini
JAVA_HOME:/opt/modules/jdk1.7.0_67
vim /opt/modules/apache-storm-0.9.6/conf/storm.yaml
storm.zookeeper.servers:
- "hive-stu.ibeifeng.com"
nimbus.host: "nimbus" nimbus 主節點
storm.local.dir: "/mnt/storm" 參數指定storm本地檔案存放目錄,存放任務jar的目錄
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
指定supervisor節點啟動一些工作程序 worker 所使用的預設端口
supervisor storm的從節點
ui : storm web監控頁面
啟動
$ nohup bin/storm nimbus > /dev/null 2>&1 &
$ nohup bin/storm supervisor > /dev/null 2>&1 &
$ nohup bin/storm ui > /dev/null 2>&1 &
$ nohup bin/storm logviewer > /dev/null 2>&1 &
測試運作
$ bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
ui界面:http://hadoop.com:8081/
storm批啟動腳本編寫
統一寫在節點nimbus伺服器
1、zookeeper批啟動腳本:
zoo.cfg
server.1=hostname:2888:388
zookeeper.sh 内容:
#!/bin/bash
if [ $# -ne 1 ];then
echo "Usage: bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}"
exit 4
fi
source /etc/profile
ZK_HOME=/opt/modules/zookeeper-3.4.5-cdh5.3.6
for node in hive-stu.ibeifeng.com
do
echo "$1 zookeeper in $node"
ssh $node "source /etc/profile && $ZK_HOME/bin/zkServer.sh $1"
done
2、Storm停止腳本
主節點
nimbus
ui
從節點
logviewer
supervisor
在storm的主節點上添加一個指定supervisor節點的檔案
supervisors
/opt/modules/apache-storm-0.9.6/conf/supervisors
内容 一行一個Supervisor節點伺服器主機名
$ kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep daemon.supervisor | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep daemon.logviewer | awk '{print $2}' | head -n 1`
stop-storm.sh
内容:
#!/bin/bash
source /etc/profile
STORM_HOME=/opt/modules/apache-storm-0.9.6
#先在主節點上停止nimbus和ui程序
kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`
#在從節點上停止logviewer和supervisor
SUPERVISORS=$(cat $STORM_HOME/conf/supervisors)
for supervisor in $SUPERVISORS
do
echo "stop supervisor and logviewer in $supervisor"
ssh $supervisor kill -9 `ssh $supervisor ps -ef | grep daemon.supervisor | awk '{print $2}'|head -n 1`
ssh $supervisor kill -9 `ssh $supervisor ps -ef | grep daemon.logviewer | awk '{print $2}'|head -n 1`
done
3、批啟動腳本
start-storm.sh
内容:
#!/bin/bash
source /etc/profile
STORM_HOME=/opt/modules/apache-storm-0.9.6
#先在主節點上啟動nimbus和ui程序
$STORM_HOME/bin/storm nimbus >/dev/null 2>&1 &
$STORM_HOME/bin/storm ui >/dev/null 2>&1 &
#在從節點上啟動logviewer和supervisor
SUPERVISORS=$(cat $STORM_HOME/conf/supervisors)
for supervisor in $SUPERVISORS
do
echo "start supervisor and logviewer in $supervisor"
ssh $supervisor "source /etc/profile && nohup $STORM_HOME/bin/storm supervisor >/dev/null 2>&1" >/dev/null 2>&1 &
ssh $supervisor "source /etc/profile && nohup $STORM_HOME/bin/storm logviewer >/dev/null 2>&1" >/dev/null 2>&1 &
done
- 三、Storm的程式設計模型
Topology:拓撲圖,DVG有向無環圖
資料結構:stream,tuple
Spout:資料采集器,從資料源采集資料,發送給Bolt
Bolt:資料處理器(Filter,function,aggregate聚合,join組合,存儲資料到資料庫等操作)
資料流分組類型
- Shuffle Grouping:随機分組,保證分發到每個Bolt的Task接收Tople數量大緻一緻
- Fields Grouping:按照字段分組,保證相同的字段分發到相同的Task
- Global Grouping:全局分組,所有的Tuple都發送到一個Task
Storm并發
- 多個Worker
- 在Storm的配置中配置:topology.workers
- 在啟動topology的時候配置:Config config = new Config();conf.setNumWorkders(num) 優先級更高
- 多個Executor:在指定Spout或者Bolt的時候指定
- 指定Task的并發:輪流執行