天天看點

storm筆記:storm基本概念

storm筆記:storm基本概念

本文主要介紹storm中的基本概念,從基礎上了解strom的體系結構,便于後續程式設計過程中作為基礎指導。主要的概念包括:

topology(拓撲)

stream(資料流)

spout(水龍頭、資料源)

bolt(螺栓,資料篩選處理)

stream group(資料流分組)

reliability(可靠性)

task(任務)

worker(執行者)

因為上述概念中除了可靠性reliability翻譯起來比較合适,其他幾個詞實在找不到合适的對應詞語,就直接使用原詞。

另外一點需要注意的是,本文使用的storm-core版本是0.10.0,包路徑為backtype.storm。因為阿裡巴巴開源了jstorm,據說strom2.0之後使用jstorm作為master主幹,從github上可以看到包路徑修改為了org.apache.storm,如果發現有包路徑錯誤的地方,請對應修改。

topology

Storm實時運作應用包邏輯上成為一個topology,一個Storm的topology相當于MapReduce的job。關鍵的不同是MapReduce的job有明确的起始和結束,而Storm的topology會一直運作下去(除非程序被殺死或取消部署)。一個topology是有多個spout、bolt通過資料流分組連接配接起來的圖結構。

storm筆記:storm基本概念

本地調試

本地調試模拟了叢集模式運作方式,對于開發和調試topology很有用。而且本地模式下運作topology與叢集模式下類似,隻是使用backtype.storm.LocalCluster來模拟叢集狀态。使用backtype.storm.LocalCluster#submitTopology方法送出topology,定義topology唯一名字、topology的配置(使用的是backtype.storm.Config對象)、以及topology對象(通過backtype.storm.topology.TopologyBuilder#createTopology方法建立)。通過backtype.storm.LocalCluster#killTopology殺掉指定topology,通過backtype.storm.LocalCluster#shutdown停止運作的本地叢集模式。比如:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
cluster.shutdown();      

本地模式常用的配置如下:

Config.TOPOLOGY_MAX_TASK_PARALLELISM:這個配置項主要用來設定每個元件線程數的上限。在生産環境中,每個topology中有很多并行線程,但是在本地調試過程中,沒有必要存在這麼多并行線程,可以通過這個配置來進行設定。

Config.TOPOLOGY_DEBUG:設定為true,Storm将記錄每個tuple送出後的日志資訊,對于調試程式很有用。

叢集模式運作

叢集模式下運作topology與本地模式下類似,具體步驟如下:

定義topology(java下使用backtype.storm.topology.TopologyBuilder#createTopology建立)

通過backtype.storm.StormSubmitter#submitTopology送出topology到叢集。StormSubmitter需要的參數與LocalCluster`的參數一緻:topology名、topology配置、topology對象。比如:

Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);      

将自己的代碼與依賴的代碼打成jar包(除了storm自己的代碼,storm自己的代碼已經在classpath下了)。

如果使用的是Mava,可以使用Maven Assembly Plugin打包,在pom.xml中加入如下代碼:

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>  
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass>com.path.to.main.Class</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>      

使用storm用戶端将topology送出到叢集,需要指定jar包路徑、類名、以及送出到main方法的參數清單:

./bin/storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3      

可以使用storm kill指令停止一個topology:

./bin/storm kill topologyName      

資料流

資料流是Storm核心定義的抽象概念,由無限制的tuple組成的序列,tuple包含一個或多個鍵值對清單,可以包含java自帶的類型或者自定義的可序列化的類型。

每個資料流可以在定義時通過backtype.storm.topology.OutputFieldsDeclarer的declareStream方法指定id。預設的id是“default”(直接使用declare将使用預設id)。

在上面的topology圖中,每個藍色、綠色、紅色的條帶是一個資料流,每個資料流内部由tuple組成。

spout

spout是topology中資料流的資料入口,充當資料采集器功能,通常spout從外部資料源讀取資料,将資料轉化為tuple,然後将它們發送到topology中。spout可以是可靠的或不可靠的。可靠的spout能夠保證在storm處理tuple出現異常情況下,能夠重新發送該tuple,而不可靠的spout不再處理已發送的tuple。

spout通過backtype.storm.topology.OutputFieldsDeclarer的declareStream方法定義資料流,通過backtype.storm.spout.SpoutOutputCollector的emit方法發送tream。

backtype.storm.spout.ISpout#nextTuple方法是spout的主要方法,可以發送用于發送新的tuple,或直接return(不需要發送新的tuple時,可以直接return)。

當Storm檢測到由某一spout發送的tuple成功處理後,将調用backtype.storm.spout.ISpout#ack方法;當調用失敗,将調用backtype.storm.spout.ISpout#fail方法。具體可以檢視後面的可靠性。

bolt

在topology中所有操作都是在bolt中執行的,它可以進行過濾、計算、連接配接、聚合、資料庫讀寫,以及其他操作。可以将一個或多個spout作為輸入,對資料進行運算後,選擇性的輸出一個或多個資料流。一個bolt可以做一些簡單的資料變換,複雜的資料處理需要多個步驟或多個bolt。

bolt可以訂閱一個或多個spout或bolt的資料,通過backtype.storm.topology.OutputFieldsDeclarer#declareStream方法定義輸出的資料流,通過backtype.storm.topology.BasicOutputCollector#emit方法送出資料。

bolt通過backtype.storm.topology.InputDeclarer類的shuffleGrouping方法指定需要訂閱的資料流,比如:declarer.shuffleGrouping("1", "stream_id"),同時InputDeclarer也提供了接收所有資料流的文法糖,比如:declarer.shuffleGrouping("1"),相當于declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。這個地方有點亂,簡單的說,bolt B前面有一個spout A或bolt A,從A中發送一個id為a_id的資料流,如果B向隻訂閱id為a_id的資料流,就使用第一個方法,如果可以接收所有id類型的資料流,就用第二個方法。

該類型中主要執行的方法是cn.howardliu.demo.storm.kafka.wordCount.SentenceBolt#execute,用來擷取新的tuple,并進行處理。同樣使用backtype.storm.topology.BasicOutputCollector#emit方法發送新的tuple。bolt可以調用backtype.storm.task.OutputCollector#ack方法來通知Storm該tuple已經處理完成。

資料流分組

定義topology的很重要的一部分就是定義資料流資料流應該發送到那些bolt中。資料流分組就是将資料流進行分組,按需要進入不同的bolt中。可以使用Storm提供的分組規則,也可以實作backtype.storm.grouping.CustomStreamGrouping自定義分組規則。Storm定義了8種内置的資料流分組方法:

Shuffle grouping(随機分組):随機分發tuple給bolt的各個task,每個bolt執行個體接收到相同數量的tuple;

Fields grouping(按字段分組):根據指定字段的值進行分組。比如,一個資料流按照”user-id”分組,所有具有相同”user-id”的tuple将被路由到同一bolt的task中,不同”user-id”可能路由到不同bolt的task中;

Partial Key grouping(部分key分組):資料流根據field進行分組,類似于按字段分組,但是将在兩個下遊bolt之間進行均衡負載,當資源發生傾斜的時候能夠更有效率的使用資源。The Power of Both Choices: Practical Load

Balancing for Distributed Stream Processing Engines提供了更加詳細的說明;

All grouping(全複制分組):将所有tuple複制後分發給所有bolt的task。小心使用。

Global grouping(全局分組):将所有的tuple路由到唯一一個task上。Storm按照最小的task ID來選取接收資料的task;(注意,當時用全局分組是,設定bolt的task并發是沒有意義的,因為所有tuple都轉發到一個task上。同時需要注意的是,所有tuple轉發到一個jvm執行個體上,可能會引起storm叢集某個jvm或伺服器出現性能瓶頸或崩潰)

None grouping(不分組):這種分組方式指明不需要關心分組方式。實際上,不分組功能與随機分組相同。預留功能。

Direct grouping(指向型分組):資料源會調用emitDirect來判斷一個tuple應該由哪個storm元件接收,隻能在聲明了指向型的資料流上使用。

Local or shuffle grouping(本地或随機分組):當同一個worker程序中有目标bolt,将把資料發送到這些bolt中。否則,功能将與随機分組相同。該方法取決與topology的并發度,本地或随機分組可以減少網絡傳輸,降低IO,提高topology性能。

可靠行

storm可以保證每一個spout發出的tuple能夠被完整處理,通過跟蹤tuple樹上的每個tuple,檢查是否被成功處理。每個topology有一個逾時時間,如果storm檢查到某個tuple已經逾時,将重新發送該tuple。為了使用這種特性,需要定義tuple的起點,以及tuple被成功處理。更多内容檢視Guaranteeing message processing。

task

task是spout和bolt的執行個體,他們的nextTuple()和execute()方法會被executors線程調用執行。根據資料流分組來确定如何從某個task中的tuple發送到其他的task。

worker

topology運作在一個或多個worker程序上,worker是jvm虛拟機,運作topology所有task的一部分。比如,topology的并發是300,有50個worker,那每個worker就有6個task。Storm會平衡所有worker的task數量。通過Config.TOPOLOGY_WORKERS來設定topology的worker數量。