天天看點

聊聊PowerJob的InstanceController

作者:碼匠亂炖

本文主要研究一下PowerJob的InstanceController

InstanceController

tech/powerjob/server/web/controller/InstanceController.java

/**
 * REST endpoints under {@code /instance} for operating on job instances:
 * stop, retry, detail lookup, paged log viewing, log download and paged listing.
 * <p>
 * stop/retry/detail are delegated to {@link InstanceService}, the log related endpoints to
 * {@link InstanceLogService}, and the listing to {@link InstanceInfoRepository}.
 */
@Slf4j
@RestController
@RequestMapping("/instance")
public class InstanceController {

    @Resource
    private InstanceService instanceService;
    @Resource
    private InstanceLogService instanceLogService;

    @Resource
    private CacheService cacheService;
    @Resource
    private InstanceInfoRepository instanceInfoRepository;

    /**
     * Stops the given instance.
     *
     * @param appId      id of the application the instance belongs to
     * @param instanceId id of the instance to stop
     * @return an empty success result
     */
    @GetMapping("/stop")
    public ResultDTO<Void> stopInstance(Long appId, Long instanceId) {
        instanceService.stopInstance(appId, instanceId);
        return ResultDTO.success(null);
    }

    /**
     * Retries the given instance.
     *
     * @param appId      id of the application, received as text and parsed with {@link Long#valueOf(String)}
     * @param instanceId id of the instance to retry
     * @return an empty success result
     */
    @GetMapping("/retry")
    public ResultDTO<Void> retryInstance(String appId, Long instanceId) {
        instanceService.retryInstance(Long.valueOf(appId), instanceId);
        return ResultDTO.success(null);
    }

    /**
     * Returns the detail view of the given instance.
     *
     * @param instanceId id of the instance
     * @return the instance detail converted to its view object
     */
    @GetMapping("/detail")
    public ResultDTO<InstanceDetailVO> getInstanceDetail(Long instanceId) {
        return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(instanceId)));
    }

    /**
     * Returns one page of the instance log.
     *
     * @param appId      id of the application the instance belongs to
     * @param instanceId id of the instance
     * @param index      page index of the log
     * @return the requested log page
     */
    @GetMapping("/log")
    public ResultDTO<StringPage> getInstanceLog(Long appId, Long instanceId, Long index) {
        return ResultDTO.success(instanceLogService.fetchInstanceLog(appId, instanceId, index));
    }

    /**
     * Returns the URL from which the full log of the instance can be downloaded.
     *
     * @param appId      id of the application the instance belongs to
     * @param instanceId id of the instance
     * @return the download URL
     */
    @GetMapping("/downloadLogUrl")
    public ResultDTO<String> getDownloadUrl(Long appId, Long instanceId) {
        return ResultDTO.success(instanceLogService.fetchDownloadUrl(appId, instanceId));
    }

    /**
     * Writes the log file of the instance into the HTTP response.
     *
     * @param instanceId id of the instance
     * @param response   response the file is written to
     * @throws Exception if preparing or sending the file fails
     */
    @GetMapping("/downloadLog")
    public void downloadLogFile(Long instanceId, HttpServletResponse response) throws Exception {
        File file = instanceLogService.downloadInstanceLog(instanceId);
        OmsFileUtils.file2HttpResponse(file, response);
    }

    /**
     * Downloads the instance log through the internal download URL into a temporary
     * file and then pushes that file to the browser.
     *
     * @param appId      id of the application the instance belongs to
     * @param instanceId id of the instance
     * @param response   response the file is written to
     */
    @GetMapping("/downloadLog4Console")
    @SneakyThrows
    public void downloadLog4Console(Long appId, Long instanceId, HttpServletResponse response) {
        // obtain the internal download link
        String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId);
        // download to the local machine first
        String logFilePath = OmsFileUtils.genTemporaryWorkPath() + String.format("powerjob-%s-%s.log", appId, instanceId);
        File logFile = new File(logFilePath);

        try {
            FileUtils.copyURLToFile(new URL(downloadUrl), logFile);

            // then push it to the browser
            OmsFileUtils.file2HttpResponse(logFile, response);
        } finally {
            // Best-effort cleanup: forceDelete throws when the file was never created
            // (e.g. the download itself failed), which would replace the real failure
            // with a misleading FileNotFoundException.
            FileUtils.deleteQuietly(logFile);
        }
    }

    /**
     * Lists instances matching the request, newest modification first.
     *
     * @param request query conditions plus paging information
     * @return the matching page converted to view objects
     */
    @PostMapping("/list")
    public ResultDTO<PageResult<InstanceInfoVO>> list(@RequestBody QueryInstanceRequest request) {

        Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified");
        PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort);

        // build the query-by-example probe from the request
        InstanceInfoDO queryEntity = new InstanceInfoDO();
        BeanUtils.copyProperties(request, queryEntity);
        queryEntity.setType(request.getType().getV());

        // the status arrives as the enum name and is only applied when present
        if (!StringUtils.isEmpty(request.getStatus())) {
            queryEntity.setStatus(InstanceStatus.valueOf(request.getStatus()).getV());
        }

        Page<InstanceInfoDO> pageResult = instanceInfoRepository.findAll(Example.of(queryEntity), pageable);
        return ResultDTO.success(convertPage(pageResult));
    }

    /**
     * Converts a page of entities into a page of view objects, resolving each job name
     * through the cache service.
     */
    private PageResult<InstanceInfoVO> convertPage(Page<InstanceInfoDO> page) {
        List<InstanceInfoVO> content = page.getContent().stream()
                .map(x -> InstanceInfoVO.from(x, cacheService.getJobName(x.getJobId())))
                .collect(Collectors.toList());

        PageResult<InstanceInfoVO> pageResult = new PageResult<>(page);
        pageResult.setData(content);
        return pageResult;
    }

}
           
InstanceController提供了stop、retry、detail、log、downloadLogUrl、downloadLog、downloadLog4Console、list方法;其中stop、retry、detail委托給了instanceService;log、downloadLogUrl、downloadLog、downloadLog4Console委托給了instanceLogService;list委托給了instanceInfoRepository

InstanceService

tech/powerjob/server/core/instance/InstanceService.java

/**
 * Service for operating on job instances; this excerpt only shows its collaborators.
 * All dependencies are final fields supplied through the Lombok-generated constructor,
 * so the declaration order of the fields determines the constructor parameter order.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceService {

    // used by stopInstance to send the stop request to the worker
    private final TransportService transportService;

    // used by retryInstance to dispatch the instance again
    private final DispatchService dispatchService;

    // NOTE(review): usage is not visible in this excerpt
    private final IdGenerateService idGenerateService;

    // used by stopInstance to run the finishing work for the stopped instance
    private final InstanceManager instanceManager;

    // used by retryInstance to load the job definition of the instance
    private final JobInfoRepository jobInfoRepository;

    // persistence of the instance records (saveAndFlush in stop/retry/cancel)
    private final InstanceInfoRepository instanceInfoRepository;

    // used by stopInstance to look up the worker hosting the TaskTracker
    private final WorkerClusterQueryService workerClusterQueryService;

    //......
}    
           
InstanceService提供了stopInstance、retryInstance、cancelInstance等方法

stopInstance

/**
 * Stops an instance: marks it as STOPPED in the database, runs the finishing work and
 * then notifies the TaskTracker on a best-effort basis.
 *
 * @param appId      id of the application the instance belongs to (only logged here)
 * @param instanceId id of the instance to stop
 * @throws IllegalArgumentException if the instance is not in a running status
 */
@DesignateServer
    public void stopInstance(Long appId,Long instanceId) {
        log.info("[Instance-{}] try to stop the instance instance in appId: {}", instanceId,appId);
        try {
            final InstanceInfoDO target = fetchInstanceInfo(instanceId);

            // only an instance that is still running can be stopped
            final boolean running = InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(target.getStatus());
            if (!running) {
                throw new IllegalArgumentException("can't stop finished instance!");
            }

            // persist the STOPPED state
            target.setStatus(STOPPED.getV());
            target.setGmtModified(new Date());
            target.setFinishedTime(System.currentTimeMillis());
            target.setResult(SystemInstanceResult.STOPPED_BY_USER);
            instanceInfoRepository.saveAndFlush(target);

            instanceManager.processFinishedInstance(instanceId, target.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER);

            /*
            Unreliable notification to stop the TaskTracker.
            If it is not shut down successfully, the TaskTracker will reportStatus again later; following the
            normal flow the instanceLog is updated back to RUNNING and the developer can stop it manually again.
             */
            final Optional<WorkerInfo> trackerOpt = workerClusterQueryService.getWorkerInfoByAddress(target.getAppId(), target.getTaskTrackerAddress());
            if (!trackerOpt.isPresent()) {
                log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId);
                return;
            }

            final ServerStopInstanceReq stopReq = new ServerStopInstanceReq(instanceId);
            final WorkerInfo tracker = trackerOpt.get();
            transportService.tell(tracker.getProtocol(), ServerURLFactory.stopInstance2Worker(tracker.getAddress()), stopReq);
            log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId);

        } catch (Exception failure) {
            // a rejected request (IllegalArgumentException) is passed through without an error log
            if (!(failure instanceof IllegalArgumentException)) {
                log.error("[Instance-{}] stopInstance failed.", instanceId, failure);
            }
            throw failure;
        }
    }
           
stopInstance先通過fetchInstanceInfo擷取instanceInfo,然後判斷狀態是否為運作中:若不是則抛出IllegalArgumentException;若是則更新status為STOPPED并儲存,再通過instanceManager.processFinishedInstance完成收尾工作;最後對于能找到WorkerInfo的,向對應的TaskTracker發起ServerStopInstanceReq請求(該通知不保證可靠送達)

retryInstance

/**
 * Retries a finished instance: resets its runtime fields, puts it back into
 * WAITING_DISPATCH and dispatches it again.
 *
 * @param appId      id of the application the instance belongs to (only logged here)
 * @param instanceId id of the instance to retry
 * @throws PowerJobException if the instance is not finished, belongs to a workflow,
 *                           or its job definition cannot be found
 */
@DesignateServer
    public void retryInstance(Long appId, Long instanceId) {
        log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);

        final InstanceInfoDO target = fetchInstanceInfo(instanceId);

        final boolean finished = InstanceStatus.FINISHED_STATUS.contains(target.getStatus());
        if (!finished) {
            throw new PowerJobException("Only stopped instance can be retry!");
        }
        // retrying instances that belong to a workflow is not supported for now
        final boolean partOfWorkflow = target.getWfInstanceId() != null;
        if (partOfWorkflow) {
            throw new PowerJobException("Workflow's instance do not support retry!");
        }

        // reset the record so that it looks like a freshly created, not yet dispatched instance
        target.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
        target.setExpectedTriggerTime(System.currentTimeMillis());
        target.setFinishedTime(null);
        target.setActualTriggerTime(null);
        target.setTaskTrackerAddress(null);
        target.setResult(null);
        instanceInfoRepository.saveAndFlush(target);

        // dispatch the job
        final Long jobId = target.getJobId();
        final Optional<JobInfoDO> jobOpt = jobInfoRepository.findById(jobId);
        if (!jobOpt.isPresent()) {
            throw new PowerJobException("can't find job info by jobId: " + jobId);
        }
        dispatchService.dispatch(jobOpt.get(), instanceId, Optional.of(target), Optional.empty());
    }
           
retryInstance先拉取instanceInfo,判斷狀態是不是FINISHED_STATUS,不是則抛出PowerJobException;屬于工作流的執行個體同樣不支援重試;通過檢查後更新status為WAITING_DISPATCH并重置相關字段,然後通過dispatchService.dispatch進行派發

cancelInstance

/**
 * Cancels an instance that has not started running yet and marks it as CANCELED.
 *
 * @param appId      id of the application the instance belongs to (only logged here)
 * @param instanceId id of the instance to cancel
 * @throws PowerJobException if the instance could not be cancelled
 */
@DesignateServer
    public void cancelInstance(Long appId, Long instanceId) {
        log.info("[Instance-{}] try to cancel the instance with appId {}.", instanceId, appId);

        try {
            final InstanceInfoDO target = fetchInstanceInfo(instanceId);
            final TimerFuture scheduled = InstanceTimeWheelService.fetchTimerFuture(instanceId);

            // If the task is present in the local time wheel, the outcome is whether cancelling it worked.
            // Otherwise: when this API is called close to the expected schedule time a problem can occur in theory,
            // the dispatch may complete on the other side before the cancel status is written and the status is
            // overwritten afterwards. Solving this is extremely costly (distributed lock), so it is deliberately
            // left unsolved. Usage condition of this API: there must be some interval between the call and the
            // expected execution time of the task to cancel, otherwise reliability is not guaranteed.
            final boolean cancelled = scheduled != null
                    ? scheduled.cancel()
                    : InstanceStatus.WAITING_DISPATCH.getV() == target.getStatus();

            if (!cancelled) {
                log.warn("[Instance-{}] cancel the instance failed.", instanceId);
                throw new PowerJobException("instance already up and running");
            }

            target.setStatus(InstanceStatus.CANCELED.getV());
            target.setResult(SystemInstanceResult.CANCELED_BY_USER);
            // If writing to the DB fails, the exception propagates and the cancel counts as failed; the task is
            // then rescheduled by the HA mechanism, so nothing needs to be handled here.
            instanceInfoRepository.saveAndFlush(target);
            log.info("[Instance-{}] cancel the instance successfully.", instanceId);

        } catch (Exception failure) {
            log.error("[Instance-{}] cancelInstance failed.", instanceId, failure);
            throw failure;
        }
    }
           
cancelInstance通過InstanceTimeWheelService.fetchTimerFuture擷取timerFuture:timerFuture不為null時直接cancel,并以其傳回值作為結果;為null時則以instanceInfo的狀態是否為WAITING_DISPATCH作為結果。成功則更新status為CANCELED并儲存,否則抛出PowerJobException

InstanceLogService

tech/powerjob/server/core/instance/InstanceLogService.java

/**
 * Service that provides the logs of job instances; this excerpt only shows its
 * fields and constants.
 */
@Slf4j
@Service
public class InstanceLogService {

    // HTTP port of this server, used when building the log download URL
    @Value("${server.port}")
    private int port;

    @Resource
    private InstanceMetadataService instanceMetadataService;

    @Resource
    private GridFsManager gridFsManager;
    /**
     * Bean for local database operations
     */
    @Resource(name = "localTransactionTemplate")
    private TransactionTemplate localTransactionTemplate;

    @Resource
    private LocalInstanceLogRepository localInstanceLogRepository;

    /**
     * IDs of the task instances whose online logs are maintained locally
     * (presumably mapped to the time of their last log report, going by the field name)
     */
    private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();

    @Resource(name = PJThreadPool.BACKGROUND_POOL)
    private AsyncTaskExecutor powerJobBackgroundPool;

    /**
     *  Segment lock
     */
    private final SegmentLock segmentLock = new SegmentLock(8);

    /**
     * Formatter for timestamps
     */
    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
    /**
     * Number of log lines shown per page
     */
    private static final int MAX_LINE_COUNT = 100;
    /**
     * Expiration interval (in milliseconds, going by the constant name)
     */
    private static final long EXPIRE_INTERVAL_MS = 60000;

    //......
}    
           
InstanceLogService提供了fetchInstanceLog、fetchDownloadUrl、downloadInstanceLog等方法

fetchInstanceLog

/**
 * Returns one page of the log of an instance.
 * <p>
 * The log file is prepared asynchronously and awaited for at most 5 seconds. The file is
 * then read completely: lines inside the requested page are collected, all lines are
 * counted to compute the total number of pages. Failures are not thrown but reported as
 * a simple page carrying an explanatory message.
 *
 * @param appId      id of the application the instance belongs to (only used for logging)
 * @param instanceId id of the instance
 * @param index      zero-based page index; each page holds {@code MAX_LINE_COUNT} lines
 * @return the requested page, or a simple page with a message if the log is not ready or an error occurred
 */
@DesignateServer
    public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
        try {
            Future<File> fileFuture = prepareLogFile(instanceId);
            // a timeout does not interrupt the task that is still running
            File logFile = fileFuture.get(5, TimeUnit.SECONDS);

            // page the data: [left, right) is the line range of the requested page
            long lines = 0;
            StringBuilder sb = new StringBuilder();
            String lineStr;
            long left = index * MAX_LINE_COUNT;
            long right = left + MAX_LINE_COUNT;
            try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
                while ((lineStr = lr.readLine()) != null) {

                    // inside the requested range: keep the line
                    if (lines >= left && lines < right) {
                        sb.append(lineStr).append(System.lineSeparator());
                    }
                    ++lines;
                }
            }catch (Exception e) {
                log.warn("[InstanceLog-{}] read logFile from disk failed for app: {}.", instanceId, appId, e);
                return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
            }

            double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
            return new StringPage(index, (long) totalPage, sb.toString());

        }catch (TimeoutException te) {
            return StringPage.simple("log file is being prepared, please try again later.");
        }catch (InterruptedException ie) {
            // restore the interrupt flag instead of silently swallowing the interruption,
            // so the owner of the calling thread can still observe it
            Thread.currentThread().interrupt();
            log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, ie);
            return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(ie));
        }catch (Exception e) {
            log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e);
            return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
        }
    }

    /**
     * Submits the preparation of the log file to the background pool.
     *
     * @param instanceId id of the instance whose log is needed
     * @return future of the prepared log file
     */
    private Future<File> prepareLogFile(long instanceId) {
        return powerJobBackgroundPool.submit(() -> {
            // the online log is still being updated as long as the instance is tracked locally;
            // in that case the data in the local database has to be used
            final boolean stillReporting = instanceId2LastReportTime.containsKey(instanceId);
            return stillReporting ? genTemporaryLogFile(instanceId) : genStableLogFile(instanceId);
        });
    }
           
fetchInstanceLog先通過prepareLogFile準備日志檔案,對于還在更新的則執行genTemporaryLogFile,否則執行genStableLogFile;本地資料庫存在的則直接下載下傳,否則判斷gridFsManager是否可用,可用則從gridFsManager取

fetchDownloadUrl

/**
 * Builds the URL of this server's {@code /instance/downloadLog} endpoint for an instance.
 *
 * @param appId      id of the application the instance belongs to (only logged here)
 * @param instanceId id of the instance
 * @return the download URL pointing at the local host and the configured server port
 */
@DesignateServer
    public String fetchDownloadUrl(Long appId, Long instanceId) {
        final String downloadUrl = new StringBuilder("http://")
                .append(NetUtils.getLocalHost())
                .append(":")
                .append(port)
                .append("/instance/downloadLog?instanceId=")
                .append(instanceId)
                .toString();
        log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, downloadUrl);
        return downloadUrl;
    }
           
fetchDownloadUrl則傳回/instance/downloadLog下載下傳url

downloadInstanceLog

/**
 * Prepares the log file of an instance and waits at most one minute for it.
 *
 * @param instanceId id of the instance
 * @return the prepared log file
 * @throws Exception if the preparation fails, is interrupted or does not finish in time
 */
public File downloadInstanceLog(long instanceId) throws Exception {
        // the preparation runs asynchronously; block here until the file is ready or the wait times out
        return prepareLogFile(instanceId).get(1, TimeUnit.MINUTES);
    }
           
downloadInstanceLog則是通過prepareLogFile準備檔案,然後最多等待1分鐘取得結果,逾時則抛出TimeoutException

小結

InstanceController提供了stop、retry、detail、log、downloadLogUrl、downloadLog、downloadLog4Console、list方法;其中stop、retry、detail委托給了instanceService;log、downloadLogUrl、downloadLog、downloadLog4Console委托給了instanceLogService;list委托給了instanceInfoRepository。