Storm是一个实时分布式计算框架,是真正的实时处理框架。针对数据来一条记录就处理一次;流式数据处理技术框架 ,可以达到毫秒级别。
SparkStreaming是一个微批数据处理框架 ,只能达到秒级
一、Storm集群机构

Nimbus:主节点
- 接受客户端任务Topology的提交,负责在集群中发送代码(jar包)
- 发送任务到zookeeper,等待Supervisor领取
- 监听集群状态
- 容错机制:当Supervisor挂掉,重新分配任务给其他Supervisor执行
Supervisor:从节点
- 从zookeeper上获取Nimbus分配的任务,负责启动和停止本机上的Worker进程来执行任务。
- 需要将自己的运行状态汇报到zookeeper上,有Nimbus监控
Worker
- 负责启动本机上的Executor线程来执行任务Task
- 负责与其他Worker之间进行数据传输
- 需要将自己的状态汇报到zookeeper上,有Nimbus监控
Executor:真正执行任务Task的线程,由Worker启动和停止
Zookeeper
- 存储任务调度信息,各节点状态信息,心跳
- 使Storm集群保持无状态,这样具有高可靠性
Zookeeper上各节点存储的信息
- /storm/supervisor/supervisor-id:supervisor节点的心跳
- /storm/storms/topology-id:topology基本信息
- /storm/assignments/topology-id:topology调度信息
- /storm/workbeats/topology-id/worker-id:worker进程的心跳
- /storm/errors/topology-id:任务运行错误信息
二、Storm安装部署与测试运行
安装要求
- zookeeper集群
- python 2.6.6以上:$ python --version
安装
1.解压
$ tar -zxvf apache-storm-0.9.6.tar.gz -C /opt/modules/
2.修改配置文件
vim /opt/modules/apache-storm-0.9.6/conf/storm_env.ini
JAVA_HOME:/opt/modules/jdk1.7.0_67
vim /opt/modules/apache-storm-0.9.6/conf/storm.yaml
storm.zookeeper.servers:
- "hive-stu.ibeifeng.com"
nimbus.host: "nimbus" nimbus 主节点
storm.local.dir: "/mnt/storm" 参数指定storm本地文件存放目录,存放任务jar的目录
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
指定supervisor节点启动一些工作进程 worker 所使用的默认端口
supervisor storm的从节点
ui : storm web监控页面
启动
$ nohup bin/storm nimbus > /dev/null 2>&1 &
$ nohup bin/storm supervisor > /dev/null 2>&1 &
$ nohup bin/storm ui > /dev/null 2>&1 &
$ nohup bin/storm logviewer > /dev/null 2>&1 &
测试运行
$ bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
ui界面:http://hadoop.com:8081/
storm批启动脚本编写
统一写在节点nimbus服务器
1、zookeeper批启动脚本:
zoo.cfg
server.1=hostname:2888:388
zookeeper.sh 内容:
#!/bin/bash
if [ $# -ne 1 ];then
echo "Usage: bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}"
exit 4
fi
source /etc/profile
ZK_HOME=/opt/modules/zookeeper-3.4.5-cdh5.3.6
for node in hive-stu.ibeifeng.com
do
echo "$1 zookeeper in $node"
ssh $node "source /etc/profile && $ZK_HOME/bin/zkServer.sh $1"
done
2、Storm停止脚本
主节点
nimbus
ui
从节点
logviewer
supervisor
在storm的主节点上添加一个指定supervisor节点的文件
supervisors
/opt/modules/apache-storm-0.9.6/conf/supervisors
内容 一行一个Supervisor节点服务器主机名
$ kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep daemon.supervisor | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep daemon.logviewer | awk '{print $2}' | head -n 1`
stop-storm.sh
内容:
#!/bin/bash
source /etc/profile
STORM_HOME=/opt/modules/apache-storm-0.9.6
#先在主节点上停止nimbus和ui进程
kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`
#在从节点上停止logviewer和supervisor
SUPERVISORS=$(cat $STORM_HOME/conf/supervisors)
for supervisor in $SUPERVISORS
do
echo "stop supervisor and logviewer in $supervisor"
ssh $supervisor kill -9 `ssh $supervisor ps -ef | grep daemon.supervisor | awk '{print $2}'|head -n 1`
ssh $supervisor kill -9 `ssh $supervisor ps -ef | grep daemon.logviewer | awk '{print $2}'|head -n 1`
done
3、批启动脚本
start-storm.sh
内容:
#!/bin/bash
source /etc/profile
STORM_HOME=/opt/modules/apache-storm-0.9.6
#先在主节点上启动nimbus和ui进程
$STORM_HOME/bin/storm nimbus >/dev/null 2>&1 &
$STORM_HOME/bin/storm ui >/dev/null 2>&1 &
#在从节点上启动logviewer和supervisor
SUPERVISORS=$(cat $STORM_HOME/conf/supervisors)
for supervisor in $SUPERVISORS
do
echo "start supervisor and logviewer in $supervisor"
ssh $supervisor "source /etc/profile && nohup $STORM_HOME/bin/storm supervisor >/dev/null 2>&1" >/dev/null 2>&1 &
ssh $supervisor "source /etc/profile && nohup $STORM_HOME/bin/storm logviewer >/dev/null 2>&1" >/dev/null 2>&1 &
done
- 三、Storm的编程模型
Topology:拓扑图,DVG有向无环图
数据结构:stream,tuple
Spout:数据采集器,从数据源采集数据,发送给Bolt
Bolt:数据处理器(Filter,function,aggregate聚合,join组合,存储数据到数据库等操作)
数据流分组类型
- Shuffle Grouping:随机分组,保证分发到每个Bolt的Task接收Tople数量大致一致
- Fields Grouping:按照字段分组,保证相同的字段分发到相同的Task
- Global Grouping:全局分组,所有的Tuple都发送到一个Task
Storm并发
- 多个Worker
- 在Storm的配置中配置:topology.workers
- 在启动topology的时候配置:Config config = new Config();conf.setNumWorkders(num) 优先级更高
- 多个Executor:在指定Spout或者Bolt的时候指定
- 指定Task的并发:轮流执行