啟動流程圖

mk-assignments
功能:對目前叢集中所有Topology進行新一輪的任務排程。
實作源碼路徑:
\apache-storm-0.9.4\storm-core\src\clj\backtype\storm\daemon\ nimbus.clj
方法原型:
1
defnk mk-assignments [nimbus :scratch-topology-id nil]
方法說明:
參數nimbus為nimbus-data對象,scratch-topology-id為送出的Topology id.
從nimbus中依次擷取conf、storm-cluster-state。
擷取目前所有活躍的topology的id集合。
根據活躍的topology id調用read-topology-details方法,擷取TopologyDetails資訊并傳回<storm-id,TopologyDetails>集合。
根據<storm-id,TopologyDetails>集合建立topologies對象。
擷取已配置設定資源的Topology的id集合。
根據已配置設定資源的Topology id擷取每個Topology的任務配置設定情況assigments,并傳回<storm-id,Assigment>集合existing-assignments,除了scratch-topology-id指定的Topology不會擷取它的Assigments。
調用compute-new-topology->executor->node+port方法獲為所有Topology計算新的排程,傳回topology->executor->node+port.
調用basic-supervisor-details-map從Zookeeper中擷取所有SupervisorInfo資訊,傳回<supervisor-id,supervisorDetails>集合。
對第8步傳回的結果集中的每一項進行周遊構造新的Assignment對象集合new-assignments,Assigmnet定義如下:
(defrecord Assignent [master-code-dir node->host executor->node+port executor->start-time-secs])
master-code-dir:Nimbus在本地儲存Topology資訊路勁,主要包括stormjar.jar、stormcode.ser、stormconf.ser.
node->host:該Topology配置設定的<supervisor-id,hostname>集合.
executor->node+port:該Topology中executor的配置設定情況,node為supervisor-id,port為端口号。
executor->start-time-secs:該Topology對用的supervisor的啟動時間.
比較new-assignments與existing-assignments中的每一項是否有差異,如果沒有就列印一條提示資訊,如果有就将該Topology在Zookeeper中儲存的排程結果更新為new-assignments。
計算new-assignment中的每一項新增加的slot并進行配置設定。(新增的solt通過new-assignment中的node+port減去existing-assignment中的node+port得到,傳回為<topology-id,WorkerSlot>集合)
WorkerSlot格式為{ nodeId port }
功能總結:
擷取已配置設定資源的Topology的任務配置設定情況<storm-id,Assigment>集合(existing-assignments),擷取活躍的Topology資訊<storm-id,TopologyDetails>集合建立topologies對象。然後調用compute-new-topology->executor->node+port方法獲為所有Topology計算新的排程,傳回topology->executor->node+port再構造Assigmnet對象集。
compute-new-topology->executor->node+port
函數原型:
defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
參數說明:
nimbus:nimbus-data對象。
existing-assignments:目前已經配置設定的的任務,格式<topology-id,Assignment>。
Topologies:目前活躍的Topology,格式<storm-id,TopologyDetails>.
scratch-topology-id:需要重新排程的topology-id.
調用compute-topology->executors方法根據existing-assignments中的topology-id擷取<topology-id,executors>集合,與調用compute-executors方法效果作用一樣。
調用update-all-hearbeats!更新上一步中executor中的心跳資訊.
調用compute-topology->alive-executors擷取<topology-id,alive-executors>集合,每個topology-id對應的活躍的executor.
調用compute-supervisor->dead-ports擷取<supervisor-id,dead-ports>集合。
調用compute-topology->scheduler-assignment擷取<topology-id,Scheduler-AssignmentImpl>集合.(topology-id對用的任務配置設定情況Scheduler-AssignmentImpl == <ExecutorDetails,WorkerSlot>).
根據參數topologies中的topology-id進行條件過濾,該topology中所有executor為空或者topology中的所有executor不等于Topology中活躍的executor或者該Topology的num-use-workers小于其指定的num-worker,過濾後的結果叢集指派給missing-assignmnet-topologies.
調用all-scheduling-slots方法擷取<node-id,port>集合。
調用read-all-supervisor-details方法擷取<supervisor-id,supervisorDetails>集合。
根據參數nimbus、第5步、第8步的結果集構造Cluster對象。
調用nimbus中的scheduler方法進行任務排程。
從Cluster對象中擷取重新排程完之後的所有Assignments作為new-scheduler-assignment,格式為<topology-id,SchedulerAssignment>集合。
調用compute-topology->executor->node+port将第11步的結果集轉換為<topology-id,{executor[node port]}>集合。
調用basic-supervisor-details-map将Zookeeper中記錄的所有SupervisorInfo都轉換為SupervisorDetails,傳回<supervisor-id,SuperviosrDetails>集合.
流程圖:
compute-executor
函數原型:
defn- compute-executors [nimbus storm-id]
函數實作說明:
擷取storm-id(topology-id)對用的stormBase中component-executors形象(每個元件的并行度)。
擷取storm-id對應的storm-conf配置。
擷取storm-id對應Topology.
調用storm-task-info擷取<task-id,component-id>集合,其中task-id對該Topology的所有元件是全局遞增的。
将第4步的結果集轉換為<component-id,tasks>并按照升序排序。
将第1步的結果集<component-id,parallelism>與第5步的結果集進行join得到<component-id,[parallelism,tasks]>集合.
對第6步的結果集中的每一項進行處理,将tasks集合均勻分布到數目為parallelism的分區上。
功能總結:
擷取storm-id對應Topology所有元件的并行度(線程數),擷取該Topology中各元件TOPOLOGY_TASK資訊,最後的結果使每個線程中均勻分布多個task運作。