天天看点

Storm学习总结

Storm是一个实时分布式计算框架,是真正的实时处理框架。针对数据来一条记录就处理一次;流式数据处理技术框架 ,可以达到毫秒级别。

SparkStreaming是一个微批数据处理框架 ,只能达到秒级

一、Storm集群机构

Storm学习总结

Nimbus:主节点

  1. 接受客户端任务Topology的提交,负责在集群中发送代码(jar包)
  2. 发送任务到zookeeper,等待Supervisor领取
  3. 监听集群状态
  4. 容错机制:当Supervisor挂掉,重新分配任务给其他Supervisor执行

Supervisor:从节点

  1. 从zookeeper上获取Nimbus分配的任务,负责启动和停止本机上的Worker进程来执行任务。
  2. 需要将自己的运行状态汇报到zookeeper上,有Nimbus监控

Worker

  1. 负责启动本机上的Executor线程来执行任务Task
  2. 负责与其他Worker之间进行数据传输
  3. 需要将自己的状态汇报到zookeeper上,有Nimbus监控

Executor:真正执行任务Task的线程,由Worker启动和停止

Zookeeper

  1. 存储任务调度信息,各节点状态信息,心跳
  2. 使Storm集群保持无状态,这样具有高可靠性

Zookeeper上各节点存储的信息

  • /storm/supervisor/supervisor-id:supervisor节点的心跳
  • /storm/storms/topology-id:topology基本信息
  • /storm/assignments/topology-id:topology调度信息
  • /storm/workbeats/topology-id/worker-id:worker进程的心跳
  • /storm/errors/topology-id:任务运行错误信息

二、Storm安装部署与测试运行

安装要求

  1. zookeeper集群
  2. python 2.6.6以上:$ python --version

安装

1.解压
$ tar -zxvf apache-storm-0.9.6.tar.gz -C /opt/modules/
2.修改配置文件
vim /opt/modules/apache-storm-0.9.6/conf/storm_env.ini
JAVA_HOME:/opt/modules/jdk1.7.0_67

vim /opt/modules/apache-storm-0.9.6/conf/storm.yaml
storm.zookeeper.servers:
     - "hive-stu.ibeifeng.com"

	nimbus.host: "nimbus"     nimbus 主节点

storm.local.dir: "/mnt/storm"  参数指定storm本地文件存放目录,存放任务jar的目录
								
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
	指定supervisor节点启动一些工作进程 worker 所使用的默认端口
 supervisor storm的从节点

 ui : storm web监控页面

           

启动

$ nohup bin/storm nimbus > /dev/null 2>&1 &
$ nohup bin/storm supervisor > /dev/null 2>&1 &
$ nohup bin/storm ui > /dev/null 2>&1 &
$ nohup bin/storm logviewer > /dev/null 2>&1 &
           

测试运行

$ bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

ui界面:http://hadoop.com:8081/

storm批启动脚本编写

统一写在节点nimbus服务器

1、zookeeper批启动脚本:

zoo.cfg
server.1=hostname:2888:388

zookeeper.sh 内容:
#!/bin/bash

if [ $# -ne 1 ];then
        echo "Usage: bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}"
        exit 4
fi

source /etc/profile
ZK_HOME=/opt/modules/zookeeper-3.4.5-cdh5.3.6

for node in hive-stu.ibeifeng.com
do
        echo "$1 zookeeper in $node"
        ssh $node "source /etc/profile && $ZK_HOME/bin/zkServer.sh $1"
done


2、Storm停止脚本
主节点
nimbus
ui
从节点
logviewer
supervisor

在storm的主节点上添加一个指定supervisor节点的文件

supervisors

/opt/modules/apache-storm-0.9.6/conf/supervisors
内容 一行一个Supervisor节点服务器主机名

$ kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep daemon.supervisor | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`
$ kill -9 `ps -ef | grep daemon.logviewer | awk '{print $2}' | head -n 1`

stop-storm.sh
内容:
#!/bin/bash

source /etc/profile
STORM_HOME=/opt/modules/apache-storm-0.9.6

#先在主节点上停止nimbus和ui进程
kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`

#在从节点上停止logviewer和supervisor
SUPERVISORS=$(cat $STORM_HOME/conf/supervisors)

for supervisor in $SUPERVISORS
do
        echo "stop supervisor and logviewer in $supervisor"
        ssh $supervisor kill -9 `ssh $supervisor ps -ef | grep daemon.supervisor | awk '{print $2}'|head -n 1`
	ssh $supervisor kill -9 `ssh $supervisor ps -ef | grep daemon.logviewer | awk '{print $2}'|head -n 1`
done

3、批启动脚本
start-storm.sh
内容:
#!/bin/bash

source /etc/profile
STORM_HOME=/opt/modules/apache-storm-0.9.6

#先在主节点上启动nimbus和ui进程
$STORM_HOME/bin/storm nimbus >/dev/null 2>&1 &
$STORM_HOME/bin/storm ui >/dev/null 2>&1 &


#在从节点上启动logviewer和supervisor
SUPERVISORS=$(cat $STORM_HOME/conf/supervisors)

for supervisor in $SUPERVISORS
do
        echo "start supervisor and logviewer in $supervisor"
        ssh $supervisor "source /etc/profile && nohup $STORM_HOME/bin/storm supervisor >/dev/null 2>&1" >/dev/null 2>&1 &
        ssh $supervisor "source /etc/profile && nohup $STORM_HOME/bin/storm logviewer >/dev/null 2>&1" >/dev/null 2>&1 &

done


           
  1. 三、Storm的编程模型

Topology:拓扑图,DVG有向无环图

数据结构:stream,tuple

Spout:数据采集器,从数据源采集数据,发送给Bolt

Bolt:数据处理器(Filter,function,aggregate聚合,join组合,存储数据到数据库等操作)

数据流分组类型

  1. Shuffle Grouping:随机分组,保证分发到每个Bolt的Task接收Tople数量大致一致
  2. Fields Grouping:按照字段分组,保证相同的字段分发到相同的Task
  3. Global Grouping:全局分组,所有的Tuple都发送到一个Task

Storm并发

  1. 多个Worker
    1. 在Storm的配置中配置:topology.workers
    2. 在启动topology的时候配置:Config config = new Config();conf.setNumWorkders(num) 优先级更高
  2. 多个Executor:在指定Spout或者Bolt的时候指定
  3. 指定Task的并发:轮流执行