
Storm Learning (1): Storm Concepts

(Note: this post is mainly a set of study notes, compiled from the reference links at the end. If it feels shallow, go straight to those links.)

1. Storm terminology

topology

  • All of an application's logic is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job; one key difference is that an MR job eventually finishes, whereas a topology runs forever unless it is killed manually. A topology defines the spouts, the bolts, and the stream groupings, as sketched below.

The logic for a realtime application is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job. One key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (or until you kill it, of course). A topology is a graph of spouts and bolts that are connected with stream groupings.
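
To make this concrete, here is a minimal sketch of building and running a topology locally (Storm 1.x style, and only a sketch: SentenceSpout and SplitSentenceBolt are placeholder components, fleshed out in the spout and bolt sections below):

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class DemoTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Wire a spout to a bolt; the grouping decides how tuples are routed.
        builder.setSpout("sentences", new SentenceSpout(), 1);
        builder.setBolt("split", new SplitSentenceBolt(), 2)
               .shuffleGrouping("sentences");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("demo", new Config(), builder.createTopology());
        Thread.sleep(10000);
        // A topology runs until it is explicitly killed.
        cluster.killTopology("demo");
        cluster.shutdown();
    }
}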

streams

  • Streams are the core abstraction in Storm. A stream is an unbounded sequence of tuples, and those tuples are created and processed in parallel in a distributed fashion.
  • Tuples can also carry custom types (as long as they can be serialized; a registration sketch follows the resources below).

The stream is the core abstraction in Storm. A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion. Streams are defined with a schema that names the fields in the stream’s tuples. By default, tuples can contain integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. You can also define your own serializers so that custom types can be used natively within tuples.

Resources:

Tuple: streams are composed of tuples

OutputFieldsDeclarer: used to declare streams and their schemas

Serialization: Information about Storm’s dynamic typing of tuples and declaring custom serializations
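
As a sketch of the custom-serializer point above: Storm serializes tuple values with Kryo, and a custom type can be registered on the topology Config (MyEvent and MyEventSerializer are hypothetical names):

import org.apache.storm.Config;

Config conf = new Config();
// Let Kryo handle the type with its default field serializer...
conf.registerSerialization(MyEvent.class);
// ...or pair it with an explicit com.esotericsoftware.kryo.Serializer.
conf.registerSerialization(MyEvent.class, MyEventSerializer.class);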

spout

  • A spout is the data entry point of a Storm topology: it connects to a data source, converts the data into tuples, and emits those tuples as a stream.
  • Spouts can be reliable or unreliable: a reliable spout re-emits a tuple that failed to be processed, while an unreliable spout forgets a tuple as soon as it has been emitted.
  • A spout can emit to more than one stream.
  • A spout's main method is nextTuple, which either emits a new tuple into the topology or simply returns when there is nothing new to emit. Take care not to block inside an override of this method.
  • ack is called when an emitted tuple completes processing successfully, fail when it fails (a minimal spout sketch follows the quoted docs below).

A spout is a source of streams in a topology. Generally spouts will read tuples from an external source and emit them into the topology (e.g. a Kestrel queue or the Twitter API). Spouts can either be reliable or unreliable. A reliable spout is capable of replaying a tuple if it failed to be processed by Storm, whereas an unreliable spout forgets about the tuple as soon as it is emitted.

Spouts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on SpoutOutputCollector.

The main method on spouts is nextTuple. nextTuple either emits a new tuple into the topology or simply returns if there are no new tuples to emit. It is imperative that nextTuple does not block for any spout implementation, because Storm calls all the spout methods on the same thread.

The other main methods on spouts are ack and fail. These are called when Storm detects that a tuple emitted from the spout either successfully completed through the topology or failed to be completed. ack and fail are only called for reliable spouts. See the Javadoc for more information.
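
A minimal reliable spout along these lines might look like the following sketch (Storm 1.x signatures; the hard-coded sentences are made-up test data):

import java.util.Map;
import java.util.UUID;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[] sentences = {"the cow jumped over the moon", "an apple a day"};
    private int index = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit one tuple per call and return quickly; never block here,
        // because Storm calls all spout methods on the same thread.
        String msgId = UUID.randomUUID().toString();
        // Emitting with a message id makes the spout reliable: Storm will
        // later call ack(msgId) or fail(msgId) for this tuple.
        collector.emit(new Values(sentences[index]), msgId);
        index = (index + 1) % sentences.length;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The stream's schema: each tuple carries one field named "sentence".
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void ack(Object msgId) { /* tuple tree completed successfully */ }

    @Override
    public void fail(Object msgId) { /* a real spout would replay msgId here */ }
}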

bolt

  • A bolt takes one or more streams as input, performs computation on the data, and optionally emits one or more new streams. A bolt can subscribe to multiple streams emitted by spouts or by other bolts.
  • All processing in a topology happens in bolts. Bolts can do anything: filtering, business logic, aggregations, joins, talking to databases, and so on.
  • Complex processing can be spread across multiple bolts.
  • Bolts can emit more than one stream.
  • A bolt's main method is execute, which a task runs whenever a new tuple arrives. Bolts emit new tuples through an OutputCollector object and must call its ack method for every tuple they process, so Storm knows the tuple is complete (and can eventually determine that it is safe to ack the original spout tuple). The common pattern is to process one input tuple, emit 0 to N tuples based on it, then ack the input tuple. Storm provides the IBasicBolt interface, which calls ack automatically (a minimal bolt sketch follows the quoted docs below).

All processing in topologies is done in bolts. Bolts can do anything from filtering, functions, aggregations, joins, talking to databases, and more.

Bolts can do simple stream transformations. Doing complex stream transformations often requires multiple steps and thus multiple bolts. For example, transforming a stream of tweets into a stream of trending images requires at least two steps: a bolt to do a rolling count of retweets for each image, and one or more bolts to stream out the top X images (you can do this particular stream transformation in a more scalable way with three bolts than with two).

Bolts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on OutputCollector.

When you declare a bolt’s input streams, you always subscribe to specific streams of another component. If you want to subscribe to all the streams of another component, you have to subscribe to each one individually. InputDeclarer has syntactic sugar for subscribing to streams declared on the default stream id. Saying declarer.shuffleGrouping(“1”) subscribes to the default stream on component “1” and is equivalent to declarer.shuffleGrouping(“1”, DEFAULT_STREAM_ID).

The main method in bolts is the execute method which takes in as input a new tuple. Bolts emit new tuples using the OutputCollector object. Bolts must call the ack method on the OutputCollector for every tuple they process so that Storm knows when tuples are completed (and can eventually determine that it's safe to ack the original spout tuples). For the common case of processing an input tuple, emitting 0 or more tuples based on that tuple, and then acking the input tuple, Storm provides an IBasicBolt interface which does the acking automatically.

It's perfectly fine to launch new threads in bolts that do processing asynchronously. OutputCollector is thread-safe and can be called at any time.
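
A minimal bolt following that pattern might look like this sketch (it consumes the "sentence" stream from the spout sketch above):

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        for (String word : input.getStringByField("sentence").split(" ")) {
            // Anchoring the new tuple to `input` ties it into the tuple tree,
            // so a downstream failure triggers a replay at the spout.
            collector.emit(input, new Values(word));
        }
        // Ack every processed tuple so Storm can eventually ack the
        // original spout tuple.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Extending BaseBasicBolt instead would handle the anchoring and acking automatically, which is the IBasicBolt convenience mentioned above.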

tuple

The tuple is Storm's most basic data structure. Its implementation (TupleImpl.java) stores values of arbitrary types in an internal List.

Values can be retrieved like this:

String word = tuple.getString(0);   // access by position
long count = tuple.getLong(1);
// or by the field name declared in declareOutputFields:
String sameWord = tuple.getStringByField("word");

The following diagram sums all of this up best:

[diagram: spouts, bolts, tuples, and streams in a topology]

Stream groupings

I won't bother translating these; the original docs state them plainly enough.

Data streams into the Storm cluster, and Storm provides several ways to distribute the individual tuples to different bolts. (Hadoop MapReduce has this kind of partitioning too.)

Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt’s tasks.

There are eight built-in stream groupings in Storm, and you can implement a custom stream grouping by implementing the CustomStreamGrouping interface:

Shuffle grouping: Tuples are randomly distributed across the bolt’s tasks in a way such that each bolt is guaranteed to get an equal number of tuples.

Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task, but tuples with different “user-id”'s may go to different tasks.

Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.

All grouping: The stream is replicated across all the bolt’s tasks. Use this grouping with care.

Global grouping: The entire stream goes to a single one of the bolt’s tasks. Specifically, it goes to the task with the lowest id.

None grouping: This grouping specifies that you don’t care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).

Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the emitDirect methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).

Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
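
Declaring groupings is part of wiring the topology together. A sketch, reusing the earlier components (WordCountBolt and ReportBolt are hypothetical):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new SentenceSpout());

// Shuffle: tuples spread randomly but evenly across the 4 tasks.
builder.setBolt("split", new SplitSentenceBolt(), 4)
       .shuffleGrouping("spout");

// Fields: tuples with the same "word" value always reach the same task.
builder.setBolt("count", new WordCountBolt(), 4)
       .fieldsGrouping("split", new Fields("word"));

// Global: the entire stream goes to this bolt's lowest-id task.
builder.setBolt("report", new ReportBolt())
       .globalGrouping("count");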

Tasks

Spouts and bolts run as tasks across the cluster, and each task corresponds to one thread of execution. Stream groupings define how tuples are sent from one set of tasks to another.

Each spout or bolt executes as many tasks across the cluster. Each task corresponds to one thread of execution, and stream groupings define how to send tuples from one set of tasks to another set of tasks. You set the parallelism for each spout or bolt in the setSpout and setBolt methods of TopologyBuilder.
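
For example, a sketch using the components above (in later Storm versions the parallelism hint sets the number of executor threads, while setNumTasks sets how many task instances run on them):

// 2 executors (threads) running 4 tasks: each thread hosts 2 tasks.
builder.setBolt("split", new SplitSentenceBolt(), 2)
       .setNumTasks(4)
       .shuffleGrouping("sentences");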

Workers

A topology runs on one or more worker processes. A worker is a JVM process that executes a subset of all the tasks of the topology. For example, if a topology's combined parallelism is 300 and it has 50 workers, each worker runs 6 tasks. Storm tries to balance the task count evenly across workers. The number of workers is set with Config.TOPOLOGY_WORKERS.

Topologies execute across one or more worker processes. Each worker process is a physical JVM and executes a subset of all the tasks for the topology. For example, if the combined parallelism of the topology is 300 and 50 workers are allocated, then each worker will execute 6 tasks (as threads within the worker). Storm tries to spread the tasks evenly across all the workers.
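
Setting the worker count might look like this sketch (setNumWorkers writes the Config.TOPOLOGY_WORKERS setting):

Config conf = new Config();
// With a combined parallelism of 300 and 50 workers, each worker JVM
// ends up running roughly 6 tasks as threads.
conf.setNumWorkers(50);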

2. The Storm cluster

The nimbus daemon

Nimbus is a bit like HMaster in HBase (which assigns regions to the RegionServers and does some load balancing):

nimbus is responsible for distributing the topologies in a Storm cluster to the supervisors.

The nimbus daemon's main responsibility is to manage, coordinate, and monitor the topologies running on the cluster, including deploying topologies, assigning tasks, and reassigning tasks when something fails.

To deploy a topology to a Storm cluster, the pre-packaged topology jar and its configuration are submitted to the nimbus server. Once nimbus receives the topology package, it distributes the jar to a sufficient number of supervisor nodes. When the supervisor nodes have received it, nimbus assigns tasks (bolt and spout instances) to each supervisor and signals them to spawn enough workers to execute the assigned tasks.
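
For reference, submitting a packaged topology to nimbus looks roughly like this sketch (the jar is typically launched with the storm jar command; builder is the TopologyBuilder from the earlier sketches):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;

Config conf = new Config();
conf.setNumWorkers(4);
// Uploads the topology jar and configuration to nimbus, which then
// distributes the jar to supervisors and assigns tasks to workers.
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());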

nimbus tracks the state of all supervisor nodes and the tasks assigned to them. If nimbus notices that a supervisor has stopped reporting heartbeats or has become unreachable, it reassigns the failed supervisor's tasks to other supervisor nodes in the cluster.

Strictly speaking, nimbus is not a single point of failure, because nimbus does not take part in a topology's data processing; it only handles topology initialization, task distribution, and monitoring. In fact, if the nimbus daemon dies while a topology is running, the topology keeps processing data as long as its assigned supervisors and workers stay healthy. The caveat: if a supervisor then terminates abnormally while nimbus is down, there is no nimbus daemon to reassign the dead supervisor's tasks, and data processing fails.

The supervisor daemon

Mapping this onto HBase's RegionServer is not quite right, because the supervisor is only a daemon; the actual work is handed off to separate worker processes.

The supervisor daemon waits for nimbus to assign tasks, then spawns and monitors the workers (JVM processes) that execute them. Supervisors and workers run in different JVM processes, so if a worker spawned by a supervisor exits abnormally (because of an error, or a Unix kill -9, or a Windows taskkill), the supervisor daemon tries to respawn a new worker process.

If a worker, or even a whole supervisor node, fails, how does Storm guarantee delivery of the tuples that were in flight? The answer lies in Storm's tuple anchoring and acknowledgment mechanism (shown in the bolt sketch earlier). With reliable delivery enabled, tuples routed to the failed node are never acknowledged, so the spout re-emits the original tuples after a timeout. This repeats until the topology recovers from the failure and resumes normal processing.

總體叢集結構用下圖來說最合适:

[diagram: Storm cluster structure (nimbus, supervisors, workers)]

References:

Storm official docs: http://storm.apache.org/releases/current/Concepts.html

Reference: https://matt33.com/2015/05/26/the-basis-of-storm/#基礎

Images from: https://www.jianshu.com/p/90456bab8487

Other reference: https://www.howardliu.cn/storm/the-concepts-of-storm/