天天看點

jstorm源碼分析:work管理管理work

管理work

前面已經知道了,通過SyncProcessEvent 可以從zookeeper上擷取配置設定到一台機器的任務資訊,那麼有了這些資訊,我們能幹什麼呢?接下來我們就來揭開SyncProcessEvent的面紗

主要成員

private LocalState localState; 本地狀态,主要是本地所有的work狀态資訊,包括心跳檢測資訊等   
           

private Map

入口

整個入口還是在我們的run函數中:

public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds ) 
           

第一個參數就是從zk上擷取的配置設定到這台機器上的work資訊

第二個是下載下傳失敗的拓撲任務

從檔案中擷取所有work的心跳資訊

所有的work都會把自己的資訊寫到檔案中,這樣我們可以通過讀取檔案擷取這些work的運作資訊

“`

public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf, LocalState localState, Map<Integer, LocalAssignment> assignedTasks) throws Exception {

    Map<String, StateHeartbeat> workeridHbstate = new HashMap<String, StateHeartbeat>();

    int now = TimeUtils.current_time_secs();

    /**
     * Get Map<workerId, WorkerHeartbeat> from local_dir/worker/ids/heartbeat
     */
    //從本地檔案中,擷取所有本地的程序
    Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
    for (Entry<String, WorkerHeartbeat> entry : idToHeartbeat.entrySet()) {

        String workerid = entry.getKey().toString();

        WorkerHeartbeat whb = entry.getValue();

        State state = null;
        //如果是空的,那麼就不是一個正常啟動的work,我們就去看看是否正在啟動,如果是,那麼去驗證是否zk上配置設定這個任務,如果否,直接從啟動清單中删除
        if (whb == null) {
            state = State.notStarted;
            //擷取work的啟動時間和端口
            Pair<Integer, Integer> timeToPort = this.workerIdToStartTimeAndPort.get(workerid);
            //有程序的啟動清單中
            if (timeToPort != null) {
                LocalAssignment localAssignment = assignedTasks.get(timeToPort.getSecond());
                //但是在zk中找不到任務的資訊,那麼就需要把從啟動清單中删除
                if (localAssignment == null) {
                    LOG.info("Following worker don't exit assignment, so remove this port=" + timeToPort.getSecond());
                    state = State.disallowed;
                    // workerId is disallowed ,so remove it from workerIdToStartTimeAndPort
                    Integer port = this.workerIdToStartTimeAndPort.get(workerid).getSecond();
                    this.workerIdToStartTimeAndPort.remove(workerid);
                    this.portToWorkerId.remove(port);
                }
            }
        } else if (matchesAssignment(whb, assignedTasks) == false) {   //看headbeat和zk配置設定資訊是否比對

            // workerId isn't approved or
            // isn't assigned task
            state = State.disallowed;

        } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
            //上次彙報時間距離目前時間超過了配置時間,并且不是被殺掉的,那麼就報告錯誤資訊。 統一标注是逾時
            if (killingWorkers.containsKey(workerid) == false) {
                String outTimeInfo = " it is likely to be out of memory, the worker is time out ";
                workerReportError.report(whb.getTopologyId(), whb.getPort(),
                        whb.getTaskIds(), outTimeInfo);
            }

            state = State.timedOut;
         } else {
            //程序是否死了,看/proc/workid目錄是否存在
            if (isWorkerDead(workerid)) {
                //不在被殺掉清單中
                if (killingWorkers.containsKey(workerid) == false){
                    String workeDeadInfo = "Worker is dead ";
                    workerReportError.report(whb.getTopologyId(), whb.getPort(),
                            whb.getTaskIds(), workeDeadInfo);
                }
                state = State.timedOut;
            } else {
                state = State.valid;
            }
        }

        if (state != State.valid) {
            if (killingWorkers.containsKey(workerid) == false)
                LOG.info("Worker:" + workerid + " state:" + state + " WorkerHeartbeat:" + whb + " assignedTasks:" + assignedTasks
                        + " at supervisor time-secs " + now);
        } else {
            LOG.debug("Worker:" + workerid + " state:" + state + " WorkerHeartbeat: " + whb + " at supervisor time-secs " + now);
        }
        //更新headbeats
        workeridHbstate.put(workerid, new StateHeartbeat(state, whb));
    }

    return workeridHbstate;
}
           

“`

代碼比較長,總結:

1 通過調用readWorkerHeartbeats 擷取所有work心跳資訊(讀取檔案擷取的)

2 對每個work的心跳資訊,分情況進行處理:

2.1 心跳資訊是空的:那麼如果在啟動清單中,且不是配置設定到這個機器上的任務(zk資訊中找不到),那麼從啟動清單中删除。 這種情況是work還在啟動中,但是任務已經被停止了。

2.2 檢測任務是否比對: 看work的heatbeats中的任務id和zk上該work的任務id是否一樣

2.3 目前work的heartbeat更新時間超過配置時間了:那是否在删除的清單中,如果不存在,報個錯誤。标示work狀态為逾時。

2.4 看程序是否死了,如果死了,那麼标示逾時,否則,就是正常的

3 就是把狀态更新到heartbeat中。

删除無用的worker

對work各種 狀态進行判斷

1 如果在work正在啟動清單中并且heatbeat狀态是沒有啟動,直接跳過。

2 我們狀态正常的,先記錄下

3 對于非正常的。 首先放到待删除清單中,如果是正常關閉中,那麼就記錄個日志。

通過調用shutWorker關閉所有待删除的work.

如果帶删除work都成功殺死了,那麼就釋放: 任務清空逾時資訊

清空已經删除的work的本地資訊

最後傳回所有正常的work,這個包含兩個部分,一個是前面狀态正常的,另外加上正在啟動清單中,有心跳資訊但是心跳狀态是nostarted 的