
Storm Notes 09: Parallelism of a Storm Topology


Understanding the parallelism of a Storm topology

https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology

Concepts

A topology can contain one or more workers (running in parallel on different machines), so a worker process executes a subset of a topology, and a worker belongs to exactly one topology.

A worker occupies one supervisor port. Each worker is a separate Java process with its own JVM.

A worker can contain one or more executors. Each component (spout or bolt) corresponds to at least one executor, so an executor executes a subset of a component, and an executor belongs to exactly one component.

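A task is the object that holds the actual processing logic. One executor thread can run one or more tasks. By default each executor runs exactly one task, so it is tempting to think of a task as the executing thread, but it is not. The task count represents the maximum parallelism: a component's number of tasks never changes, whereas its number of executors can. When there are more tasks than executors, the executor count is the actual degree of parallelism.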

A worker process executes a subset of a topology. 

A worker process belongs to a specific topology and may run one or more executors for one or more components (spouts or bolts) of this topology. 

A running topology consists of many such processes running on many machines within a Storm cluster.

An executor is a thread that is spawned by a worker process. It may run one or more tasks for the same component (spout or bolt).

A task performs the actual data processing — each spout or bolt that you implement in your code executes as many tasks across the cluster. 

The number of tasks for a component is always the same throughout the lifetime of a topology, but the number of executors (threads) for a component can change over time. This means that the following condition holds true: 

#threads ≤ #tasks

By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread.
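The relationship between tasks and executors can be sketched in plain Java, without any Storm dependency. The class `TaskSpread` and its method are hypothetical names introduced for illustration, and the even round-robin split is an assumption; the text above does not describe how Storm actually assigns tasks to executors.

```java
import java.util.Arrays;

class TaskSpread {
    /**
     * Spreads numTasks over numExecutors as evenly as possible.
     * Assumption for illustration only: a round-robin split.
     * Enforces the condition #threads <= #tasks stated above.
     */
    static int[] tasksPerExecutor(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numExecutors > numTasks) {
            throw new IllegalArgumentException("need 1 <= #executors <= #tasks");
        }
        int[] counts = new int[numExecutors];
        for (int task = 0; task < numTasks; task++) {
            counts[task % numExecutors]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Four tasks on two executors: two tasks per thread.
        System.out.println(Arrays.toString(tasksPerExecutor(4, 2)));
        // Default case, one task per executor.
        System.out.println(Arrays.toString(tasksPerExecutor(2, 2)));
    }
}
```

With equal counts every executor ends up with a single task, which matches the default of one task per thread.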

Configuring the parallelism of a topology

The following sections give an overview of the various configuration options and how to set them in your code. There is more than one way of setting these options though, and the table lists only some of them.

Storm currently has the following order of precedence for configuration settings:

defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration
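One way to picture this precedence order is as a stack of key/value layers in which a later layer overrides an earlier one. The sketch below is plain Java and does not use Storm's own configuration loader; `ConfigPrecedence` and `resolve` are hypothetical names, and the values are made up for illustration. It assumes `topology.workers` is the key behind TOPOLOGY_WORKERS.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConfigPrecedence {
    /** Merges layers from lowest to highest precedence; later layers win. */
    static Map<String, Object> resolve(List<Map<String, Object>> layersLowToHigh) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (Map<String, Object> layer : layersLowToHigh) {
            merged.putAll(layer);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Illustrative values only, not taken from a real cluster.
        Map<String, Object> defaultsYaml = Map.of("topology.workers", 1);
        Map<String, Object> stormYaml = Map.of("topology.workers", 4);
        Map<String, Object> topologyConf = Map.of("topology.workers", 2);

        // The topology-specific value overrides both YAML files.
        System.out.println(resolve(List.of(defaultsYaml, stormYaml, topologyConf)));
    }
}
```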

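Parallelism can be configured in several places in Storm; the order of precedence is shown above.

Specifically, the settings are:

Number of worker processes: can be set in configuration files or in code. A worker is the executing process, so for parallelism to pay off the number should be at least the number of machines.

Number of executors: the number of parallel threads for a component. It can only be set in code, through the arguments of setBolt and setSpout, for example setBolt("green-bolt", new GreenBolt(), 2).

Number of tasks: optional. It defaults to 1:1 with the executors and can also be set through setNumTasks().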

Number of worker processes

  • Description: How many worker processes to create for the topology across machines in the cluster.
  • Configuration option: TOPOLOGY_WORKERS
  • How to set in your code (examples):
    • Config#setNumWorkers

Number of executors (threads)

  • Description: How many executors to spawn per component.
  • Configuration option: ?
  • How to set in your code (examples):
    • TopologyBuilder#setSpout()
    • TopologyBuilder#setBolt()
    • Note that as of Storm 0.8 the parallelism_hint parameter now specifies the initial number of executors (not tasks!) for that bolt.

Number of tasks

  • Description: How many tasks to create per component.
  • Configuration option: TOPOLOGY_TASKS
  • How to set in your code (examples):
    • ComponentConfigurationDeclarer#setNumTasks()

Here is an example code snippet to show these settings in practice:

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

In the above code we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks. Storm will run two tasks per executor (thread). If you do not explicitly configure the number of tasks, Storm will run by default one task per executor.

Example of a running topology

The following illustration shows what a simple topology would look like in operation.

The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt. The components are linked such that BlueSpout sends its output to GreenBolt, which in turn sends its own output to YellowBolt.

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4) // set the number of tasks to 4
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );

The figure and the code make this clear: setBolt and setSpout define 2+2+6=10 executor threads in total.

setNumWorkers sets 2 workers, so Storm runs 5 executors on each worker on average.

green-bolt is defined with 4 tasks, so each of its executors runs 2 tasks.
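The same arithmetic, written out as a small plain-Java sketch. `ParallelismMath` and its methods are hypothetical helpers, not part of Storm, and they assume the counts divide evenly, as they do in this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ParallelismMath {
    /** Sums the executor counts of all components. */
    static int totalExecutors(Map<String, Integer> executorsByComponent) {
        int total = 0;
        for (int count : executorsByComponent.values()) {
            total += count;
        }
        return total;
    }

    /** Average executors per worker, assuming an even spread. */
    static int executorsPerWorker(int totalExecutors, int numWorkers) {
        return totalExecutors / numWorkers;
    }

    /** Tasks run by each executor of one component, assuming an even split. */
    static int tasksPerExecutor(int numTasks, int numExecutors) {
        return numTasks / numExecutors;
    }

    public static void main(String[] args) {
        // Parallelism hints taken from the example topology.
        Map<String, Integer> executors = new LinkedHashMap<>();
        executors.put("blue-spout", 2);
        executors.put("green-bolt", 2);
        executors.put("yellow-bolt", 6);

        int total = totalExecutors(executors);
        System.out.println("executors in total: " + total);                            // 10
        System.out.println("executors per worker: " + executorsPerWorker(total, 2));   // 5
        System.out.println("tasks per green-bolt executor: " + tasksPerExecutor(4, 2)); // 2
    }
}
```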

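How to change the parallelism of a running topology

Storm supports changing (increasing or decreasing) the number of worker processes and the number of executors without restarting the topology. This is called rebalancing.

It can be done through the Storm web UI or with the storm rebalance command; see the example below.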

A nifty feature of Storm is that you can increase or decrease the number of worker processes and/or executors without being required to restart the cluster or the topology. The act of doing so is called rebalancing.

You have two options to rebalance a topology:

  1. Use the Storm web UI to rebalance the topology.
  2. Use the CLI tool storm rebalance, as in the example below.

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
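Because the number of tasks of a component is fixed for the lifetime of the topology and #threads ≤ #tasks must hold, it can help to check a planned executor count against the task count before rebalancing. The plain-Java sketch below is one way to do that. `RebalanceCheck` is a hypothetical helper, and treating the task count as an upper bound on the effective executor count is an assumption derived from the condition stated earlier, not a description of what the storm CLI does with an oversized request.

```java
class RebalanceCheck {
    /**
     * Executor count a component could use if requests are bounded by its
     * task count (assumption based on #threads <= #tasks).
     */
    static int effectiveExecutors(int requestedExecutors, int numTasks) {
        if (requestedExecutors < 1 || numTasks < 1) {
            throw new IllegalArgumentException("counts must be positive");
        }
        return Math.min(requestedExecutors, numTasks);
    }

    public static void main(String[] args) {
        // A component created with 4 tasks has room for 3 executors.
        System.out.println(effectiveExecutors(3, 4));
        // A component created with 6 tasks is bounded at 6, even if 10 are requested.
        System.out.println(effectiveExecutors(10, 6));
    }
}
```

Under this assumption, a component that should later scale out to 10 executors would need at least 10 tasks configured through setNumTasks() when the topology is first submitted.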