天天看點

分布式流處理架構 Apache Storm —— 程式設計模型詳解

一、簡介

二、IComponent接口

三、Spout

    3.1 ISpout接口

    3.2 BaseRichSpout抽象類

四、Bolt

    4.1 IBolt 接口

    4.2 BaseRichBolt抽象類

五、詞頻統計案例

六、送出到伺服器叢集運作

七、關于項目打包的擴充說明

一、簡介

下圖為Strom的運作流程圖,在開發Storm流處理程式時,我們需要采用内置或自定義實作

spout

(資料源)和

bolt

(處理單元),并通過

TopologyBuilder

将它們之間進行關聯,形成

Topology

分布式流處理架構 Apache Storm —— 程式設計模型詳解

二、IComponent接口

IComponent

接口定義了Topology中所有元件(spout/bolt)的公共方法,自定義的spout或bolt必須直接或間接實作這個接口。

public interface IComponent extends Serializable {

    /**
     * 聲明此拓撲的所有流的輸出模式。
     * @param declarer這用于聲明輸出流id,輸出字段以及每個輸出流是否是直接流(direct stream)
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * 聲明此元件的配置。
     *
     */
    Map<String, Object> getComponentConfiguration();

}           

三、Spout

3.1 ISpout接口

自定義的spout需要實作

ISpout

接口,它定義了spout的所有可用方法:

public interface ISpout extends Serializable {
    /**
     * 元件初始化時候被調用
     *
     * @param conf ISpout的配置
     * @param context 應用上下文,可以通過其擷取任務ID群組件ID,輸入和輸出資訊等。
     * @param collector  用來發送spout中的tuples,它是線程安全的,建議儲存為此spout對象的執行個體變量
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * ISpout将要被關閉的時候調用。但是其不一定會被執行,如果在叢集環境中通過kill -9 殺死程序時其就無法被執行。
     */
    void close();
    
    /**
     * 當ISpout從停用狀态激活時被調用
     */
    void activate();
    
    /**
     * 當ISpout停用時候被調用
     */
    void deactivate();

    /**
     * 這是一個核心方法,主要通過在此方法中調用collector将tuples發送給下一個接收器,這個方法必須是非阻塞的。     
     * nextTuple/ack/fail/是在同一個線程中執行的,是以不用考慮線程安全方面。當沒有tuples發出時應該讓
     * nextTuple休眠(sleep)一下,以免浪費CPU。
     */
    void nextTuple();

    /**
     * 通過msgId進行tuples處理成功的确認,被确認後的tuples不會再次被發送
     */
    void ack(Object msgId);

    /**
     * 通過msgId進行tuples處理失敗的确認,被确認後的tuples會再次被發送進行處理
     */
    void fail(Object msgId);
}           

3.2 BaseRichSpout抽象類

通常情況下,我們實作自定義的Spout時不會直接去實作

ISpout

接口,而是繼承

BaseRichSpout

BaseRichSpout

繼承自

BaseCompont

,同時實作了

IRichSpout

接口。

分布式流處理架構 Apache Storm —— 程式設計模型詳解

IRichSpout

接口繼承自

ISpout

IComponent

,自身并沒有定義任何方法:

public interface IRichSpout extends ISpout, IComponent {

}           

BaseComponent

抽象類空實作了

IComponent

getComponentConfiguration

方法:

public abstract class BaseComponent implements IComponent {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }    
}           

BaseRichSpout

繼承自

BaseCompont

類并實作了

IRichSpout

接口,并且空實作了其中部分方法:

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
    @Override
    public void close() {}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}
}           

通過這樣的設計,我們在繼承

BaseRichSpout

實作自定義spout時,就隻有三個方法必須實作:

  • open : 來源于ISpout,可以通過此方法擷取用來發送tuples的

    SpoutOutputCollector

  • nextTuple :來源于ISpout,必須在此方法内部發送tuples;
  • declareOutputFields :來源于IComponent,聲明發送的tuples的名稱,這樣下一個元件才能知道如何接受。

四、Bolt

bolt接口的設計與spout的類似:

4.1 IBolt 接口

/**
  * 在用戶端計算機上建立的IBolt對象。會被被序列化到topology中(使用Java序列化),并送出給叢集的主機(Nimbus)。  
  * Nimbus啟動workers反序列化對象,調用prepare,然後開始處理tuples。
 */

public interface IBolt extends Serializable {
    /**
     * 元件初始化時候被調用
     *
     * @param conf storm中定義的此bolt的配置
     * @param context 應用上下文,可以通過其擷取任務ID群組件ID,輸入和輸出資訊等。
     * @param collector  用來發送spout中的tuples,它是線程安全的,建議儲存為此spout對象的執行個體變量
     */
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * 處理單個tuple輸入。
     * 
     * @param Tuple對象包含關于它的中繼資料(如來自哪個元件/流/任務)
     */
    void execute(Tuple input);

    /**
     * IBolt将要被關閉的時候調用。但是其不一定會被執行,如果在叢集環境中通過kill -9 殺死程序時其就無法被執行。
     */
    void cleanup();           

4.2 BaseRichBolt抽象類

同樣的,在實作自定義bolt時,通常是繼承

BaseRichBolt

抽象類來實作。

BaseRichBolt

繼承自

BaseComponent

抽象類并實作了

IRichBolt

接口。

分布式流處理架構 Apache Storm —— 程式設計模型詳解

IRichBolt

接口繼承自

IBolt

IComponent

,自身并沒有定義任何方法:

public interface IRichBolt extends IBolt, IComponent {

}           

通過這樣的設計,在繼承

BaseRichBolt

實作自定義bolt時,就隻需要實作三個必須的方法:

  • prepare: 來源于IBolt,可以通過此方法擷取用來發送tuples的

    OutputCollector

  • execute:來源于IBolt,處理tuples和發送處理完成的tuples;
  • declareOutputFields :來源于IComponent,聲明發送的tuples的名稱,這樣下一個元件才能知道如何接收。

五、詞頻統計案例

5.1 案例簡介

這裡我們使用自定義的

DataSourceSpout

産生詞頻資料,然後使用自定義的

SplitBolt

CountBolt

來進行詞頻統計。

分布式流處理架構 Apache Storm —— 程式設計模型詳解
案例源碼下載下傳位址:storm-word-count

5.2 代碼實作

1. 項目依賴

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>           

2. DataSourceSpout

public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // 模拟産生資料
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * 模拟資料
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}           

上面類使用

productData

方法來産生模拟資料,産生資料的格式如下:

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm
           

3. SplitBolt

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}           

4. CountBolt

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // 輸出
        System.out.print("目前實時統計結果:");
        counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
        System.out.println();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}           

5. LocalWordCountApp

通過TopologyBuilder将上面定義好的元件進行串聯形成 Topology,并送出到本地叢集(LocalCluster)運作。通常在開發中,可先用本地模式進行測試,測試完成後再送出到伺服器叢集運作。

public class LocalWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        
        // 指明将 DataSourceSpout 的資料發送到 SplitBolt 中處理
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        
        //  指明将 SplitBolt 的資料發送到 CountBolt 中 處理
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // 建立本地叢集用于測試 這種模式不需要本機安裝storm,直接運作該Main方法即可
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountApp",
                new Config(), builder.createTopology());
    }

}           

6. 運作結果

啟動

WordCountApp

的main方法即可運作,采用本地模式Storm會自動在本地搭建一個叢集,是以啟動的過程會稍慢一點,啟動成功後即可看到輸出日志。

分布式流處理架構 Apache Storm —— 程式設計模型詳解

六、送出到伺服器叢集運作

6.1 代碼更改

送出到伺服器的代碼和本地代碼略有不同,送出到伺服器叢集時需要使用

StormSubmitter

進行送出。主要代碼如下:

為了結構清晰,這裡建立ClusterWordCountApp類來示範叢集模式的送出。實際開發中可以将兩種模式的代碼寫在同一個類中,通過外部傳參來決定啟動何種模式。
public class ClusterWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        
        // 指明将 DataSourceSpout 的資料發送到 SplitBolt 中處理
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        
        //  指明将 SplitBolt 的資料發送到 CountBolt 中 處理
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // 使用StormSubmitter送出Topology到伺服器叢集
        try {
            StormSubmitter.submitTopology("ClusterWordCountApp",  new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }

}           

6.2 打包上傳

打包後上傳到伺服器任意位置,這裡我打包後的名稱為

storm-word-count-1.0.jar

# mvn clean package -Dmaven.test.skip=true           

6.3 送出Topology

使用以下指令送出Topology到叢集:

# 指令格式: storm jar jar包位置 主類的全路徑 ...可選傳參
storm jar /usr/appjar/storm-word-count-1.0.jar  com.heibaiying.wordcount.ClusterWordCountApp           

出現

successfully

則代表送出成功:

分布式流處理架構 Apache Storm —— 程式設計模型詳解

6.4 檢視Topology與停止Topology(指令行方式)

# 檢視所有Topology
storm list

# 停止  storm kill topology-name [-w wait-time-secs]
storm kill ClusterWordCountApp -w 3           
分布式流處理架構 Apache Storm —— 程式設計模型詳解

6.5 檢視Topology與停止Topology(界面方式)

使用UI界面同樣也可進行停止操作,進入WEB UI界面(8080端口),在

Topology Summary

中點選對應Topology 即可進入詳情頁面進行操作。

分布式流處理架構 Apache Storm —— 程式設計模型詳解

七、關于項目打包的擴充說明

mvn package的局限性

在上面的步驟中,我們沒有在POM中配置任何插件,就直接使用

mvn package

進行項目打包,這對于沒有使用外部依賴包的項目是可行的。但如果項目中使用了第三方JAR包,就會出現問題,因為

package

打包後的JAR中是不含有依賴包的,如果此時你送出到伺服器上運作,就會出現找不到第三方依賴的異常。

這時候可能大家會有疑惑,在我們的項目中不是使用了

storm-core

這個依賴嗎?其實上面之是以我們能運作成功,是因為在Storm的叢集環境中提供了這個JAR包,在安裝目錄的lib目錄下:

分布式流處理架構 Apache Storm —— 程式設計模型詳解

為了說明這個問題我在Maven中引入了一個第三方的JAR包,并修改産生資料的方法:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>           

StringUtils.join()

這個方法在

commons.lang3

storm-core

中都有,原來的代碼無需任何更改,隻需要在

import

時指明使用

commons.lang3

import org.apache.commons.lang3.StringUtils;

private String productData() {
    Collections.shuffle(list);
    Random random = new Random();
    int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
    return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}           

此時直接使用

mvn clean package

打包運作,就會抛出下圖的異常。是以這種直接打包的方式并不适用于實際的開發,因為實際開發中通常都是需要第三方的JAR包。

分布式流處理架構 Apache Storm —— 程式設計模型詳解

想把依賴包一并打入最後的JAR中,maven提供了兩個插件來實作,分别是

maven-assembly-plugin

maven-shade-plugin

。鑒于本篇文章篇幅已經比較長,且關于Storm打包還有很多需要說明的地方,是以關于Storm的打包方式單獨整理至下一篇文章:

Storm三種打包方式對比分析

參考資料

  1. Running Topologies on a Production Cluster
  2. Pre-defined Descriptor Files
更多大資料系列文章可以參見個人 GitHub 開源項目: 大資料入門指南