天天看點

大資料之storm

一、離線計算與實時計算

離線計算: 批量擷取資料、批量傳輸資料;周期性計算資料,展示資料;

代表技術: sqoop批量導入,HDFS批量存儲,mapreduce批量計算,Hive批量計算資料…

就業方向: hivesql, Hadoop叢集運維

實時計算: 資料實時産生,資料實時傳輸,資料實時計算,實時展示;

代表技術: Flume實時擷取資料,kafka/metaq 實時資料存儲,storm/Jstrom實時計算資料,redis實時結果緩存,mysql持久化存儲…

流式計算一般架構圖

大資料之storm

 flume實時采集資料。

 Kafka消息隊列,用來臨時儲存資料。

 Strom實時計算資料。

 Redis實時存儲資料。

流式計算就是将源源不斷産生的資料實時收集并實時計算,盡可能快的得到計算結果。

二、Strom介紹

Storm用來實時計算資料架構,特點:低延遲、高可用、分布式、可擴充、資料不丢失。提供簡單容易了解的接口,便于開發。

Storm與Hadoop差別

 Storm用于實時計算,Storm處理的資料儲存在記憶體中,資料源源不斷,通過網絡傳輸進來;

 Hadoop用于離線計算,處理的一批一批資料儲存在檔案系統中,資料儲存在磁盤中。

三、Strom程式設計模型

大資料之storm

Nimbus : 任務配置設定,對任務監督;

Zookeeper : 儲存任務配置設定的資訊、心跳資訊、中繼資料資訊。

Supervisor : 接受任務,并啟動worker。worker的數量根據端口号來的;

Worker : 執行任務的具體元件(其實就是一個JVM),可以執行Spout或者bolt兩種類型的任務; 一個worker就是一個端口号;

Task : Task=線程=executor, 一個Task屬于一個Spout或者Bolt并發任務;

并發度:

使用者指定的一個任務,可以被多個線程執行,對應到storm中就是一個task,并發度的數量等于線程的數量。一個任務的多個線程,會被運作在多個Worker(JVM)上,有一種類似于平均算法的負載均衡政策。盡可能減少網絡IO;

大資料之storm

一個Strom程式可以擷取多個資料源,每個topology的資料都是自己獨有的,和其他的topology沒有關系。

spout : 擷取外部資料源;

Bolt : 業務邏輯處理節點,可以有多個;

Tuple : 消息發送的最小單元,是一個Tuple對象。

四、Strom叢集部署

搭建前,需要安裝好zookeeper叢集

1、準備工作

配置hosts

vi  /etc/hosts
 
192.168.239.128 storm01 zk01 hadoop01
192.168.239.129 storm02 zk02 hadoop02
192.168.239.130 storm03 zk03 hadoop03      

關閉防火牆

chkconfig iptables off  && setenforce 0      

建立使用者

groupadd realtime && useradd realtime && usermod -a -G realtime realtime      

建立工作目錄并賦權

mkdir /export
mkdir /export/servers
chmod 755 -R /export      

切換到realtime使用者下

su realtime      

2、解壓壓縮包

3、修改配置檔案

4、分發壓縮包

5、啟動叢集

6、檢視叢集

通路storm01:8080,即可看到storm的ui界面。

四、Storm操作指令

1、送出任務

storm jar 【jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】

storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount      

注意:因為從外部擷取資料、結果儲存到redis,是以不需要指定輸入、輸出

路徑,這是和hadoop差別

2、殺死任務

storm kill 【拓撲名稱】 -w 10

storm kill topology-name -w 10      

3、停運任務

storm deactivte 【拓撲名稱】

storm deactivte topology-name      

五、問題解答

1、worker與topology

一個worker隻屬于一個topology,一個topology包含多個worker,其實就是這個topology運作在多個worker上。

一個topology要求的worder數量如何不被滿足,叢集在配置設定任務時,根據現有的worker先運作的topology。如果目前叢集中worder數量為0,那麼最新送出的topology将隻會标示為active, 不會運作。隻有叢集有了空閑資源才會被運作。

2、StreamGrouping:(分組政策)

shuffleGrouping(随機分組)、FieldGrouping(按字段分組)、不分組等

3、運作模式

1、叢集模式:在叢集上運作

2、本地模式:在目前idea上運作,測試代碼功能時,可以選擇該模式

4、如何指定驅動類中每個元件的并發度數量

1、根據上遊的資料量來設定spout并發度

2、根據業務複雜度和execute方法執行時間設定bolt并發度

3、根據叢集可用資源配置,一般情況下70%資源使用率

5、如何設定worker的數量

worker的數量根據程式并發度task數量來均分,實際業務場景中,并反複調整

六、worldcount代碼編寫

1、WordCountTopologMain

package cn.itcast.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Created by maoxiangyi on 2016/4/27.
 */
public class WordCountTopologMain {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

        //1、準備一個TopologyBuilder
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("mySpout",new MySpout(),2);
        topologyBuilder.setBolt("mybolt1",new MySplitBolt(),2).shuffleGrouping("mySpout");
        topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).fieldsGrouping("mybolt1", new Fields("word"));
        //topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).shuffleGrouping("mybolt1");
        //  config.setNumWorkers(2);
        
        /**
         * i
         * am
         * lilei
         * love
         * hanmeimei
         */

        //2、建立一個configuration,用來指定目前topology 需要的worker的數量
        Config config =  new Config();
        config.setNumWorkers(2);

        //3、送出任務  -----兩種模式 本地模式和叢集模式
        //StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology());
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());
    }
}      

2、MySpout

package cn.itcast.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by maoxiangyi on 2016/4/27.
 */
public class MySpout extends BaseRichSpout {
    SpoutOutputCollector collector;

    //初始化方法
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //storm 架構在 while(true) 調用nextTuple方法
    public void nextTuple() {
        collector.emit(new Values("i am lilei love hanmeimei"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("love"));
    }
}      

3、MySplitBolt

package cn.itcast.storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by maoxiangyi on 2016/4/27.
 */
public class MySplitBolt extends BaseRichBolt {
    OutputCollector collector;
    //初始化方法
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // 被storm架構 while(true) 循環調用  傳入參數tuple
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] arrWords = line.split(" ");
        for (String word:arrWords){
            collector.emit(new Values(word,1));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","num"));
    }
}      
package cn.itcast.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by maoxiangyi on 2016/4/27.
 */
public class MyCountBolt extends BaseRichBolt {
    OutputCollector collector;
    Map<String, Integer> map = new HashMap<String, Integer>();

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);
        System.out.println(Thread.currentThread().getId() + "    word:"+word);
        if (map.containsKey(word)){
            Integer count = map.get(word);
            map.put(word,count + num);
        }else {
            map.put(word,num);
        }
//        System.out.println("count:"+map);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
       //不輸出
    }
}