天天看點

Nimbus<一>Storm系列(五)架構分析之Nimbus啟動過程

啟動流程圖

Nimbus<一>Storm系列(五)架構分析之Nimbus啟動過程

mk-assignments

功能:對目前叢集中所有Topology進行新一輪的任務排程。

實作源碼路徑: 

\apache-storm-0.9.4\storm-core\src\clj\backtype\storm\daemon\ nimbus.clj

方法原型:

defnk mk-assignments [nimbus :scratch-topology-id nil]

方法說明:

參數nimbus為nimbus-data對象,scratch-topology-id為送出的Topology id.

從nimbus中依次擷取conf、storm-cluster-state。

擷取目前所有活躍的topology的id集合。

根據活躍的topology id調用read-topology-details方法,擷取TopologyDetails資訊并傳回<storm-id,TopologyDetails>集合。

根據<storm-id,TopologyDetails>集合建立topologies對象。

擷取已配置設定資源的Topology的id集合。

根據已配置設定資源的Topology id擷取每個Topology的任務配置設定情況assigments,并傳回<storm-id,Assigment>集合existing-assignments,除了scratch-topology-id指定的Topology不會擷取它的Assigments。

調用compute-new-topology->executor->node+port方法獲為所有Topology計算新的排程,傳回topology->executor->node+port.

調用basic-supervisor-details-map從Zookeeper中擷取所有SupervisorInfo資訊,傳回<supervisor-id,supervisorDetails>集合。

對第8步傳回的結果集中的每一項進行周遊構造新的Assignment對象集合new-assignments,Assigmnet定義如下: 

(defrecord Assignent [master-code-dir node->host executor->node+port executor->start-time-secs]) 

master-code-dir:Nimbus在本地儲存Topology資訊路勁,主要包括stormjar.jar、stormcode.ser、stormconf.ser. 

node->host:該Topology配置設定的<supervisor-id,hostname>集合. 

executor->node+port:該Topology中executor的配置設定情況,node為supervisor-id,port為端口号。 

executor->start-time-secs:該Topology對用的supervisor的啟動時間.

比較new-assignments與existing-assignments中的每一項是否有差異,如果沒有就列印一條提示資訊,如果有就将該Topology在Zookeeper中儲存的排程結果更新為new-assignments。

計算new-assignment中的每一項新增加的slot并進行配置設定。(新增的solt通過new-assignment中的node+port減去existing-assignment中的node+port得到,傳回為<topology-id,WorkerSlot>集合) 

WorkerSlot格式為{ nodeId port }

功能總結: 

擷取已配置設定資源的Topology的任務配置設定情況<storm-id,Assigment>集合(existing-assignments),擷取活躍的Topology資訊<storm-id,TopologyDetails>集合建立topologies對象。然後調用compute-new-topology->executor->node+port方法獲為所有Topology計算新的排程,傳回topology->executor->node+port再構造Assigmnet對象集。

compute-new-topology->executor->node+port

函數原型:

defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]

參數說明: 

nimbus:nimbus-data對象。 

existing-assignments:目前已經配置設定的的任務,格式<topology-id,Assignment>。 

Topologies:目前活躍的Topology,格式<storm-id,TopologyDetails>. 

scratch-topology-id:需要重新排程的topology-id.

調用compute-topology->executors方法根據existing-assignments中的topology-id擷取<topology-id,executors>集合,與調用compute-executors方法效果作用一樣。

調用update-all-hearbeats!更新上一步中executor中的心跳資訊.

調用compute-topology->alive-executors擷取<topology-id,alive-executors>集合,每個topology-id對應的活躍的executor.

調用compute-supervisor->dead-ports擷取<supervisor-id,dead-ports>集合。

調用compute-topology->scheduler-assignment擷取<topology-id,Scheduler-AssignmentImpl>集合.(topology-id對用的任務配置設定情況Scheduler-AssignmentImpl == <ExecutorDetails,WorkerSlot>).

根據參數topologies中的topology-id進行條件過濾,該topology中所有executor為空或者topology中的所有executor不等于Topology中活躍的executor或者該Topology的num-use-workers小于其指定的num-worker,過濾後的結果叢集指派給missing-assignmnet-topologies.

調用all-scheduling-slots方法擷取<node-id,port>集合。

調用read-all-supervisor-details方法擷取<supervisor-id,supervisorDetails>集合。

根據參數nimbus、第5步、第8步的結果集構造Cluster對象。

調用nimbus中的scheduler方法進行任務排程。

從Cluster對象中擷取重新排程完之後的所有Assignments作為new-scheduler-assignment,格式為<topology-id,SchedulerAssignment>集合。

調用compute-topology->executor->node+port将第11步的結果集轉換為<topology-id,{executor[node port]}>集合。

調用basic-supervisor-details-map将Zookeeper中記錄的所有SupervisorInfo都轉換為SupervisorDetails,傳回<supervisor-id,SuperviosrDetails>集合.

流程圖:

Nimbus<一>Storm系列(五)架構分析之Nimbus啟動過程

compute-executor

函數原型:

defn- compute-executors [nimbus storm-id]

函數實作說明:

擷取storm-id(topology-id)對用的stormBase中component-executors形象(每個元件的并行度)。

擷取storm-id對應的storm-conf配置。

擷取storm-id對應Topology.

調用storm-task-info擷取<task-id,component-id>集合,其中task-id對該Topology的所有元件是全局遞增的。

将第4步的結果集轉換為<component-id,tasks>并按照升序排序。

将第1步的結果集<component-id,parallelism>與第5步的結果集進行join得到<component-id,[parallelism,tasks]>集合.

對第6步的結果集中的每一項進行處理,将tasks集合均勻分布到數目為parallelism的分區上。

功能總結:

擷取storm-id對應Topology所有元件的并行度(線程數),擷取該Topology中各元件TOPOLOGY_TASK資訊,最後的結果使每個線程中均勻分布多個task運作。

繼續閱讀