天天看點

How it works(11) NodeODM源碼閱讀(B) 任務控制與任務後處理結語

任務運作時

上一節的最後,通過"TaskManager.singleton().addNew(task);"方法,建立的任務被加入進了任務管理器,自此,任務的一切都交由任務管理器來調控了.

由上方的代碼可以看出,TaskManager是一個單例執行個體.原因是保證當Taskmanager(下面簡稱任務管理器)在不同子產品被引用時,都指向唯一的任務管理器執行個體.

//在整個應用啟動時,執行TaskManager.initialize()
//在其他任何時候,執行TaskManager.singleton()即可擷取執行個體
module.exports = {
    singleton: function(){ return taskManager; },
    initialize: function(cb){ 
        taskManager = new TaskManager(cb);
    }
};
           

任務最終運作

任務加入任務管理器時發生了什麼呢:

addNew(task){
        //将任務加入任務隊列
        this.tasks[task.uuid] = task;
        //排程函數
        this.processNextTask();
    }
           

可以看出,任務沒有被直接執行,隻是進入了一個隊列進行排隊.

最終何時運作,完全依靠任務管理器的排程:

//尋找下一個可以執行的任務
    //觸發時機:任務被添加時,任務完成時,任務被手動取消/移除/重新開機時
    processNextTask(){
        //如果目前運作的任務數量小于最大任務數
        if (this.runningQueue.length < config.parallelQueueProcessing){
            //在任務清單中尋找第一個狀态是"排隊中"的任務
            let task = this.findNextTaskToProcess();
            if (task){
                //将該任務添加到正在運作任務清單
                this.addToRunningQueue(task);
                //運作任務
                task.start(() => {
                    //當任務結束時調用webhooks
                    task.callWebhooks();
                    //将本任務從正在運作任務清單移除
                    this.removeFromRunningQueue(task);
                    //尋找下一個排隊中的任務
                    this.processNextTask();
                });
                //如果本節點可允許同時允許的任務較多,就再檢查一遍
                if (this.runningQueue.length < config.parallelQueueProcessing) this.processNextTask();
            }
        }
    }

    //将本任務從正在運作任務清單移除
    removeFromRunningQueue(task){
        //将自己從清單裡排除
        this.runningQueue = this.runningQueue.filter(t => t !== task);
    }
           

任務的取消/移除/重新開機

任務運作後的操作相對就簡單多了,基本上調用task的對于方法:

cancel(uuid, cb){
        //擷取指定的任務
        let task = this.find(uuid, cb);
        if (task){
            if (!task.isCanceled()){
                //調用任務的取消方法
                task.cancel(err => {
                    //從正在運作清單移除
                    this.removeFromRunningQueue(task);
                    //尋找下一個可執行任務
                    this.processNextTask();
                    cb(err);
                });
            }else{
                cb(null);
            }
        }
    }

    // 移除任務之前自動取消任務
    remove(uuid, cb){
        this.cancel(uuid, err => {
            if (!err){
                let task = this.find(uuid, cb);
                if (task){
                    //删除目錄
                    task.cleanup(err => {
                        if (!err){
                            //從清單中移除該任務
                            delete(this.tasks[uuid]);
                            //尋找下一個可運作任務
                            this.processNextTask();
                            cb(null);
                        }else cb(err);
                    });
                }
            }else cb(err);
        });
    }

    // 重新啟動狀态為已取消或已失敗的任務
    restart(uuid, options, cb){
        let task = this.find(uuid, cb);
        if (task){
            //調用任務的方法,将任務切換為排隊中狀态
            task.restart(options, err => {
                //尋找下一個可運作任務
                if (!err) this.processNextTask();
                cb(err);
            });
        }
    }
           

任務本身

以上一直在讨論如何操作任務之外的環境及如何調用任務的各種功能,現在将視線回歸到任務,所有環境的準備都是為了任務,所有指令的執行者也是任務.

其實對于一個具體任務,與整體類似,也分如下幾個步驟:

  • 準備照片和控制點檔案等運作前準備
  • 運作任務
  • 響應諸如停止,移除,回報進度等指令

運作前準備

運作前做的工作很簡單:

  • 初始化任務描述資訊,如建立時間
  • 擷取所有圖檔的路徑
  • 擷取所有控制點描述檔案的路徑.

運作任務

整個程式的核心就在這裡了.而任務運作又是由若幹子任務的運作構成的:

  • 調用ODM指令處理圖檔
  • 生成完成後調用後處理腳本
  • 将最終成果打包到zip中

先看看這些子任務是如何實作的:

調用ODM指令處理圖檔

ODM本身并不包含在NodeODM項目中,而是安裝為指令行工具,是以,這一步其實是調用指令行的ODM指令進行處理:

module.exports = {
    run: function(options, projectName, done, outputReceived){
        //待運作的腳本的路徑
        const command = path.join(config.odm_path, "run.sh"),
              params = [];
        //拼接參數
        for (var name in options){
            let value = options[name];
            params.push("--" + name);
            if (typeof value !== 'boolean'){
                params.push(value);
            }
        }
        params.push(projectName);
        //調用子程序運作ODM腳本
        let childProcess = spawn(command, params, {cwd: config.odm_path});
        childProcess
            .on('exit', (code, signal) => done(null, code, signal))
            .on('error', done);
        //将子程序的輸出指回主程序
        childProcess.stdout.on('data', chunk => outputReceived(chunk.toString()));
        childProcess.stderr.on('data', chunk => outputReceived(chunk.toString()));
        return childProcess;
    }
}
           

調用後處理腳本

與執行ODM腳本相比還更簡單一些:

function makeRunner(command, args){
    return function(options, done, outputReceived){
        let commandArgs = args;
        if (typeof commandArgs === 'function') commandArgs = commandArgs(options);
        // 運作指令,綁定輸入輸出
        let childProcess = spawn(command, commandArgs);
        childProcess
            .on('exit', (code, signal) => done(null, code, signal))
            .on('error', done);
        childProcess.stdout.on('data', chunk => outputReceived(chunk.toString()));
        childProcess.stderr.on('data', chunk => outputReceived(chunk.toString()));
        return childProcess;
    };
}

module.exports = {
    runPostProcessingScript: makeRunner(path.join(__dirname, "..", "scripts", "postprocess.sh"), options => [options.projectFolderPath])
};
           

将最終成果打壓縮包

const createZipArchive = (outputFilename, files) => {
    return (done) => {
    	 //輸出進度
        this.output.push(`Compressing ${outputFilename}\n`);
	 //建立寫入流
        let output = fs.createWriteStream(this.getAssetsArchivePath(outputFilename));
        let archive = archiver.create('zip', {
                zlib: { level: 1 } // 1是最快速度,因為這些圖檔的壓縮率已經很高了,沒有更多的壓縮空間了
            });
	 //壓縮完成就回調
        archive.on('finish', () => {
            done();
        });
	 //錯誤時完成
        archive.on('error', err => {
            done(err);
        });
	 //通過管道将zip流和輸出流綁定
        archive.pipe(output);
        let globs = [];
        //輸出路徑
        const sourcePath = this.getProjectFolderPath();
        //周遊傳入的所有檔案路徑
        files.forEach(file => {
            let filePath = path.join(sourcePath, file);
            // 跳過不存在的檔案
            if (!fs.existsSync(filePath)) return;
						//比對含有"*"的路徑,判斷是不是globs表達式
						//globs子產品可以像ls指令一樣查利用*.jpg這種表達式查找檔案
            let isGlob = /\*/.test(file)
            //判斷是不是檔案夾
            let isDirectory = !isGlob && fs.lstatSync(filePath).isDirectory();
            //直接壓縮檔案夾
            if (isDirectory){
                archive.directory(filePath, file);
            }else if (isGlob){
                globs.push(filePath);
            }else{
            	//直接壓縮檔案
                archive.file(filePath, {name: file});
            }
        });

        // 如果有globs表達式,就最後處理
        if (globs.length !== 0){
            let pending = globs.length;
	    //周遊每一種globs表達式
            globs.forEach(pattern => {
                glob(pattern, (err, files) => {
                    if (err) done(err);
                    else{
                        files.forEach(file => {
                            if (fs.lstatSync(file).isFile()){
                                archive.file(file, {name: path.basename(file)});
                            }else{
                                logger.debug(`Could not add ${file} from glob`);
                            }
                        });
                        //如果處理結束就完成壓縮
                        if (--pending === 0){
                            archive.finalize();
                        }
                    }
                });
            });
        }else{
            archive.finalize();
        }
    };
};
           

看完了子任務的實作,再來看看這些任務是如何排程的:

function start(done) {
    const finished = err => {
        this.stopTrackingProcessingTime();
        done(err);
    };
    //後處理任務
    const postProcess = () => {
        //執行後處理腳本
        const runPostProcessingScript = () => {
            return (done) => {
                //添加到runningProcesses數組中
                //runningProcesses存儲了所有子程序,可以友善統一管理
                this.runningProcesses.push(
                    //運作腳本,實作諸如切圖等對生成成功進行的操作
                    processRunner.runPostProcessingScript({
                        projectFolderPath: this.getProjectFolderPath()
                    }, (err, code, signal) => {
                        if (err) done(err);
                        else {
                            if (code === 0) done();
                            else done(new Error(`Process exited with code ${code}`));
                        }
                        }, output => {
                        //将運作日志導入output數組
                        this.output.push(output);
                    })
                );
            };
        };
        // 任務檔案夾下所有的子檔案夾名
        let allPaths = ['odm_orthophoto/odm_orthophoto.tif', 'odm_orthophoto/odm_orthophoto.mbtiles',
            'odm_georeferencing', 'odm_texturing',
            'odm_dem/dsm.tif', 'odm_dem/dtm.tif', 'dsm_tiles', 'dtm_tiles',
            'orthophoto_tiles', 'potree_pointcloud', 'images.json'
        ];
        //記錄所有要進行的後處理任務
        let tasks = [];

        //先運作後處理腳本,再執行檔案壓縮
        if (!this.skipPostProcessing) tasks.push(runPostProcessingScript());
        tasks.push(createZipArchive('all.zip', allPaths));
        async.series(tasks, (err) => {
            if (!err) {
                this.setStatus(statusCodes.COMPLETED);
                finished();
            } else {
                this.setStatus(statusCodes.FAILED);
                finished(err);
            }
        });
    };

    if (this.status.code === statusCodes.QUEUED) {
        //更新任務狀态與已運作時長
        this.startTrackingProcessingTime();
        this.setStatus(statusCodes.RUNNING);
        //建構任務參數
        let runnerOptions = this.options.reduce((result, opt) => {
            result[opt.name] = opt.value;
            return result;
        }, {});
        runnerOptions["project-path"] = fs.realpathSync(Directories.data);
        if (this.gcpFiles.length > 0) {
            runnerOptions.gcp = fs.realpathSync(path.join(this.getGcpFolderPath(), this.gcpFiles[0]));
        }
        //首先運作的是ODM腳本
        this.runningProcesses.push(odmRunner.run(runnerOptions, this.uuid, (err, code, signal) => {
            if (err) {
                this.setStatus(statusCodes.FAILED, {
                    errorMessage: `Could not start process (${err.message})`
                });
                finished(err);
            } else {
                //如果ODM腳本正常結束,則進行後處理
                if (this.status.code !== statusCodes.CANCELED) {
                    if (code === 0) {
                        postProcess();
                    } else {
                        this.setStatus(statusCodes.FAILED, {
                            errorMessage: `Process exited with code ${code}`
                        });
                        finished();
                    }
                } else {
                    finished();
                }
            }
        }, output => {
            //格式化輸出
            output = output.replace(/\x1b\[[0-9;]*m/g, "");
            output.trim().split('\n').forEach(line => {
                this.output.push(line.trim());
            });
        }));
        return true;
    } else {
        return false;
    }
}
           

響應指令

取消任務

取消任務是通過殺程序的方式,殺死正在運作的ODM程序,或後處理程序,當程式正處在最後的壓縮階段時,因為不是采用子程序的方式,取消是無效的.

cancel(cb){
        if (this.status.code !== statusCodes.CANCELED){
            let wasRunning = this.status.code === statusCodes.RUNNING;
            this.setStatus(statusCodes.CANCELED);

            if (wasRunning) {
                //殺死所有正在運作的子程序
                this.runningProcesses.forEach(proc => {
                    if (proc) kill(proc.pid);
                });
                this.runningProcesses = [];
            }
            //停止更新時間
            this.stopTrackingProcessingTime(true);
            cb(null);
        }else{
            cb(new Error("Task already cancelled"));
        }
    }
           

調用webhooks

因為ODM任務一般耗時很長,是以在任務結束時,可以通過網絡鈎子将結束資訊發送到指定端口,提醒任務已經結束

function callWebhooks() {
    const hooks = [this.webhook, config.webhook];
    //擷取任務的圖檔描述資訊
    this.readImagesDatabase((err, images) => {
        if (!images) images = [];
        //擷取任務的基本資訊和狀态
        let json = this.getInfo();
        json.images = images;
        //周遊所有的webhook
        hooks.forEach(hook => {
            if (hook && hook.length > 3) {
                //設定最多進行5次提醒調用
                const notifyCallback = (attempt) => {
                    //失敗超過5次就不再繼續嘗試
                    if (attempt > 5) {
                        logger.warn(`Webhook invokation failed, will not retry: ${hook}`);
                        return;
                    }
                    //發起請求,将任務資訊傳遞給該webhook
                    request.post(hook, {
                            json
                        },
                        (error, response) => {
                            if (error || response.statusCode != 200) {
                                logger.warn(`Webhook invokation failed, will retry in a bit: ${hook}`);
                                //出錯就隔一段時間再請求一次
                                setTimeout(() => {
                                    notifyCallback(attempt + 1);
                                }, attempt * 5000);
                            } else {
                                logger.debug(`Webhook invoked: ${hook}`);
                            }
                        });
                };
                notifyCallback(0);
            }
        });
    });
}
           

結語

NodeODM并不是個複雜的系統,但是關于任務的細節比較完善.盡管如此,在實際使用webODM調用NodeODM時,還是會遇到各種無法響應指令的問題.猜測原因在于通過子程序調用ODM腳本的方式是不能保證絕對的應答響應,NodeODM的正常運作不能保證ODM就能正常運作,甚至兩者都正常運作也難以保證兩者的通信鍊路是否不會出現中斷.

因為ODM本身是Python的庫,假設ODM本身就封裝了NodeODM這些對外的接口,webODM調用起來或許會更加穩定.

繼續閱讀