序
本文主要研究一下PowerJob的InstanceController
InstanceController
tech/powerjob/server/web/controller/InstanceController.java
/**
 * REST controller exposing instance-level operations under {@code /instance}:
 * stop, retry, detail, log paging, log download and paged listing.
 *
 * <p>stop / retry / detail delegate to {@link InstanceService}; the log endpoints
 * delegate to {@link InstanceLogService}; list queries {@link InstanceInfoRepository} directly.
 */
@Slf4j
@RestController
@RequestMapping("/instance")
public class InstanceController {

    @Resource
    private InstanceService instanceService;
    @Resource
    private InstanceLogService instanceLogService;
    @Resource
    private CacheService cacheService;
    @Resource
    private InstanceInfoRepository instanceInfoRepository;

    /**
     * Stops the given instance by delegating to {@code instanceService.stopInstance}.
     *
     * @param appId      application id the instance belongs to
     * @param instanceId id of the instance to stop
     * @return an empty success result
     */
    @GetMapping("/stop")
    public ResultDTO<Void> stopInstance(Long appId, Long instanceId) {
        instanceService.stopInstance(appId, instanceId);
        return ResultDTO.success(null);
    }

    /**
     * Retries the given instance by delegating to {@code instanceService.retryInstance}.
     *
     * @param appId      application id as a string; converted with {@link Long#valueOf(String)},
     *                   so a missing or non-numeric value raises {@link NumberFormatException}
     * @param instanceId id of the instance to retry
     * @return an empty success result
     */
    @GetMapping("/retry")
    public ResultDTO<Void> retryInstance(String appId, Long instanceId) {
        instanceService.retryInstance(Long.valueOf(appId), instanceId);
        return ResultDTO.success(null);
    }

    /**
     * Returns the detail view of an instance.
     *
     * @param instanceId id of the instance
     * @return the instance detail converted to its view object
     */
    @GetMapping("/detail")
    public ResultDTO<InstanceDetailVO> getInstanceDetail(Long instanceId) {
        return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(instanceId)));
    }

    /**
     * Returns one page of the instance log.
     *
     * @param appId      application id the instance belongs to
     * @param instanceId id of the instance
     * @param index      page index passed through to {@code instanceLogService.fetchInstanceLog}
     * @return the requested log page
     */
    @GetMapping("/log")
    public ResultDTO<StringPage> getInstanceLog(Long appId, Long instanceId, Long index) {
        return ResultDTO.success(instanceLogService.fetchInstanceLog(appId, instanceId, index));
    }

    /**
     * Returns the URL from which the instance log can be downloaded.
     *
     * @param appId      application id the instance belongs to
     * @param instanceId id of the instance
     * @return the download URL
     */
    @GetMapping("/downloadLogUrl")
    public ResultDTO<String> getDownloadUrl(Long appId, Long instanceId) {
        return ResultDTO.success(instanceLogService.fetchDownloadUrl(appId, instanceId));
    }

    /**
     * Streams the instance log file to the HTTP response.
     *
     * @param instanceId id of the instance
     * @param response   servlet response the file is written to
     * @throws Exception if preparing or writing the log file fails
     */
    @GetMapping("/downloadLog")
    public void downloadLogFile(Long instanceId, HttpServletResponse response) throws Exception {
        File file = instanceLogService.downloadInstanceLog(instanceId);
        OmsFileUtils.file2HttpResponse(file, response);
    }

    /**
     * Downloads the instance log for the console: resolves the internal download URL,
     * copies its content to a temporary local file, pushes that file to the browser and
     * finally removes the temporary file.
     *
     * @param appId      application id the instance belongs to
     * @param instanceId id of the instance
     * @param response   servlet response the file is written to
     */
    @GetMapping("/downloadLog4Console")
    @SneakyThrows
    public void downloadLog4Console(Long appId, Long instanceId, HttpServletResponse response) {
        // Resolve the internal download link.
        String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId);
        // Download to the local machine first.
        String logFilePath = OmsFileUtils.genTemporaryWorkPath() + String.format("powerjob-%s-%s.log", appId, instanceId);
        File logFile = new File(logFilePath);
        try {
            FileUtils.copyURLToFile(new URL(downloadUrl), logFile);
            // Then push it to the browser.
            OmsFileUtils.file2HttpResponse(logFile, response);
        } finally {
            // forceDelete throws when the file does not exist (e.g. the copy above failed before
            // creating it), which would mask the original exception. Clean up best-effort instead.
            boolean deleted = FileUtils.deleteQuietly(logFile);
            if (!deleted && logFile.exists()) {
                log.warn("[InstanceLog-{}] failed to delete temporary log file: {}", instanceId, logFilePath);
            }
        }
    }

    /**
     * Lists instances matching the request, ordered by {@code gmtModified} descending.
     * The request is copied onto an {@link InstanceInfoDO} that serves as a query-by-example probe.
     *
     * @param request query conditions and paging parameters
     * @return one page of instance view objects
     */
    @PostMapping("/list")
    public ResultDTO<PageResult<InstanceInfoVO>> list(@RequestBody QueryInstanceRequest request) {
        Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified");
        PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort);

        InstanceInfoDO queryEntity = new InstanceInfoDO();
        BeanUtils.copyProperties(request, queryEntity);
        queryEntity.setType(request.getType().getV());
        // Status is optional; only constrain the query when the caller supplied one.
        if (!StringUtils.isEmpty(request.getStatus())) {
            queryEntity.setStatus(InstanceStatus.valueOf(request.getStatus()).getV());
        }

        Page<InstanceInfoDO> pageResult = instanceInfoRepository.findAll(Example.of(queryEntity), pageable);
        return ResultDTO.success(convertPage(pageResult));
    }

    /**
     * Converts a page of persistence objects into a page of view objects,
     * resolving each job name through {@code cacheService}.
     */
    private PageResult<InstanceInfoVO> convertPage(Page<InstanceInfoDO> page) {
        List<InstanceInfoVO> content = page.getContent().stream()
                .map(x -> InstanceInfoVO.from(x, cacheService.getJobName(x.getJobId())))
                .collect(Collectors.toList());
        PageResult<InstanceInfoVO> pageResult = new PageResult<>(page);
        pageResult.setData(content);
        return pageResult;
    }
}
InstanceController提供了stop、retry、detail、log、downloadLogUrl、downloadLog、downloadLog4Console、list方法;其中stop、retry、detail委托給了instanceService;log、downloadLogUrl、downloadLog、downloadLog4Console委托給了instanceLogService;list委托給了instanceInfoRepository
InstanceService
tech/powerjob/server/core/instance/InstanceService.java
/**
 * Service holding the instance-level operations described in this article
 * (stopInstance, retryInstance, cancelInstance, ...).
 *
 * <p>All collaborators are {@code final} fields; the constructor is generated by Lombok's
 * {@code @RequiredArgsConstructor}, so the declaration order below is the constructor
 * parameter order.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceService {
private final TransportService transportService;
private final DispatchService dispatchService;
private final IdGenerateService idGenerateService;
private final InstanceManager instanceManager;
private final JobInfoRepository jobInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
private final WorkerClusterQueryService workerClusterQueryService;
// ...... (remaining members omitted in this excerpt)
}
InstanceService提供了stopInstance、retryInstance、cancelInstance等方法
stopInstance
/**
 * Stops a running instance on user request.
 *
 * <p>The instance record is marked STOPPED and persisted, the finished-instance
 * post-processing is triggered, and then a stop request is sent to the worker
 * hosting the TaskTracker if that worker can be found.
 *
 * @param appId      application id, used for logging here
 * @param instanceId id of the instance to stop
 * @throws IllegalArgumentException if the instance is not in a running-like status
 */
@DesignateServer
public void stopInstance(Long appId,Long instanceId) {
    log.info("[Instance-{}] try to stop the instance instance in appId: {}", instanceId,appId);
    try {
        InstanceInfoDO target = fetchInstanceInfo(instanceId);

        // Check the status: only an instance that is still running can be stopped.
        boolean stoppable = InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(target.getStatus());
        if (!stoppable) {
            throw new IllegalArgumentException("can't stop finished instance!");
        }

        // Update the database record, setting the status to stopped.
        target.setStatus(STOPPED.getV());
        target.setGmtModified(new Date());
        target.setFinishedTime(System.currentTimeMillis());
        target.setResult(SystemInstanceResult.STOPPED_BY_USER);
        instanceInfoRepository.saveAndFlush(target);

        instanceManager.processFinishedInstance(instanceId, target.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER);

        /*
            Unreliable (best-effort) notification asking the TaskTracker to stop.
            If it is not shut down successfully, the TaskTracker will reportStatus again; following
            the normal flow the instance record is updated back to RUNNING, and the developer can
            stop it manually once more.
         */
        Optional<WorkerInfo> trackerOpt = workerClusterQueryService.getWorkerInfoByAddress(target.getAppId(), target.getTaskTrackerAddress());
        if (!trackerOpt.isPresent()) {
            log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId);
            return;
        }

        ServerStopInstanceReq stopReq = new ServerStopInstanceReq(instanceId);
        WorkerInfo tracker = trackerOpt.get();
        transportService.tell(tracker.getProtocol(), ServerURLFactory.stopInstance2Worker(tracker.getAddress()), stopReq);
        log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId);
    } catch (IllegalArgumentException ie) {
        // Validation failures are passed straight to the caller without an error log.
        throw ie;
    } catch (Exception e) {
        log.error("[Instance-{}] stopInstance failed.", instanceId, e);
        throw e;
    }
}
stopInstance先通過fetchInstanceInfo擷取instanceInfo,然後判斷狀態是否為運作中:不是則抛出IllegalArgumentException;是則將status更新為STOPPED并儲存,再通過instanceManager.processFinishedInstance完成收尾工作;最後對于能找到WorkerInfo的,向其發起ServerStopInstanceReq請求(不可靠通知),找不到則僅記錄warn日志
retryInstance
/**
 * Retries a finished instance by resetting its record to WAITING_DISPATCH
 * and dispatching it again.
 *
 * @param appId      application id, used for logging here
 * @param instanceId id of the instance to retry
 * @throws PowerJobException if the instance is not in a finished status, belongs to a
 *                           workflow, or its job info cannot be found
 */
@DesignateServer
public void retryInstance(Long appId, Long instanceId) {
    log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);

    InstanceInfoDO retryTarget = fetchInstanceInfo(instanceId);

    boolean finished = InstanceStatus.FINISHED_STATUS.contains(retryTarget.getStatus());
    if (!finished) {
        throw new PowerJobException("Only stopped instance can be retry!");
    }

    // Retrying instances that belong to a workflow is not supported for now.
    boolean partOfWorkflow = retryTarget.getWfInstanceId() != null;
    if (partOfWorkflow) {
        throw new PowerJobException("Workflow's instance do not support retry!");
    }

    // Reset the record so it looks like a freshly created instance waiting for dispatch.
    retryTarget.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
    retryTarget.setExpectedTriggerTime(System.currentTimeMillis());
    retryTarget.setFinishedTime(null);
    retryTarget.setActualTriggerTime(null);
    retryTarget.setTaskTrackerAddress(null);
    retryTarget.setResult(null);
    instanceInfoRepository.saveAndFlush(retryTarget);

    // Dispatch the task.
    Long jobId = retryTarget.getJobId();
    Optional<JobInfoDO> jobOpt = jobInfoRepository.findById(jobId);
    if (!jobOpt.isPresent()) {
        throw new PowerJobException("can't find job info by jobId: " + jobId);
    }
    dispatchService.dispatch(jobOpt.get(), instanceId, Optional.of(retryTarget), Optional.empty());
}
retryInstance先拉取instanceInfo,判斷狀態是否屬于FINISHED_STATUS,不是則抛出PowerJobException;工作流的執行個體暫不支援重試,同樣抛出PowerJobException;校驗通過後將status更新為WAITING_DISPATCH并重置相關欄位後儲存,然後通過dispatchService.dispatch進行派發
cancelInstance
/**
 * Cancels an instance that has not started running yet.
 *
 * <p>If the instance is still held by the local time wheel, its timer is cancelled;
 * otherwise cancellation is considered possible only while the record is in
 * WAITING_DISPATCH status.
 *
 * @param appId      application id, used for logging here
 * @param instanceId id of the instance to cancel
 * @throws PowerJobException if the instance can no longer be cancelled
 */
@DesignateServer
public void cancelInstance(Long appId, Long instanceId) {
    log.info("[Instance-{}] try to cancel the instance with appId {}.", instanceId, appId);
    try {
        InstanceInfoDO pending = fetchInstanceInfo(instanceId);
        TimerFuture scheduled = InstanceTimeWheelService.fetchTimerFuture(instanceId);

        // Case 1: the task exists in the local time wheel and is cancelled there - rescued.
        // Case 2: no local timer. When this API is called close to the expected schedule time a race
        // is theoretically possible: dispatch may complete on the other side before the cancel status
        // is written, and the status is then overwritten. Solving that is very expensive (distributed
        // lock), so it is deliberately left unsolved.
        // Usage condition: there must be some gap between the call and the expected execution time of
        // the task being cancelled; otherwise reliability is not guaranteed.
        final boolean cancellable = scheduled != null
                ? scheduled.cancel()
                : InstanceStatus.WAITING_DISPATCH.getV() == pending.getStatus();

        if (!cancellable) {
            log.warn("[Instance-{}] cancel the instance failed.", instanceId);
            throw new PowerJobException("instance already up and running");
        }

        pending.setStatus(InstanceStatus.CANCELED.getV());
        pending.setResult(SystemInstanceResult.CANCELED_BY_USER);
        // If the DB write fails an exception is thrown and the cancel fails; the task will be
        // rescheduled by the HA mechanism, so no extra handling is needed here.
        instanceInfoRepository.saveAndFlush(pending);
        log.info("[Instance-{}] cancel the instance successfully.", instanceId);
    } catch (Exception e) {
        log.error("[Instance-{}] cancelInstance failed.", instanceId, e);
        throw e;
    }
}
cancelInstance通過InstanceTimeWheelService.fetchTimerFuture擷取timerFuture:timerFuture不為null的直接cancel,否則以執行個體狀態是否為WAITING_DISPATCH作為能否取消的依據;取消成功則更新status為CANCELED并儲存,失敗則抛出PowerJobException
InstanceLogService
tech/powerjob/server/core/instance/InstanceLogService.java
/**
 * Service behind the instance log endpoints (fetchInstanceLog, fetchDownloadUrl,
 * downloadInstanceLog, ...). Only the fields are shown in this excerpt.
 */
@Slf4j
@Service
public class InstanceLogService {
// HTTP port of this server, read from the "server.port" property
@Value("${server.port}")
private int port;
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
private GridFsManager gridFsManager;
/**
 * Transaction template bean for local database operations
 */
@Resource(name = "localTransactionTemplate")
private TransactionTemplate localTransactionTemplate;
@Resource
private LocalInstanceLogRepository localInstanceLogRepository;
/**
 * Instance IDs whose online logs are maintained locally
 * (value is presumably the last report timestamp, per the field name - TODO confirm)
 */
private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();
@Resource(name = PJThreadPool.BACKGROUND_POOL)
private AsyncTaskExecutor powerJobBackgroundPool;
/**
 * Segment lock
 */
private final SegmentLock segmentLock = new SegmentLock(8);
/**
 * Timestamp formatter
 */
private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
/**
 * Number of log lines shown per page
 */
private static final int MAX_LINE_COUNT = 100;
/**
 * Expiration interval, in milliseconds
 */
private static final long EXPIRE_INTERVAL_MS = 60000;
// ...... (remaining members omitted in this excerpt)
}
InstanceLogService提供了fetchInstanceLog、fetchDownloadUrl、downloadInstanceLog等方法
fetchInstanceLog
/**
 * Returns one page of an instance's log.
 *
 * <p>The log file is prepared asynchronously and awaited for at most 5 seconds; the file is
 * then read line by line, keeping only the lines of the requested page while counting all
 * lines to compute the total number of pages. Failures are reported to the caller as a
 * simple page carrying a message rather than as an exception.
 *
 * @param appId      application id, used for logging here
 * @param instanceId id of the instance whose log is requested
 * @param index      zero-based page index; each page holds {@code MAX_LINE_COUNT} lines
 * @return the requested page, or a simple page describing why the log is unavailable
 */
@DesignateServer
public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
    try {
        Future<File> fileFuture = prepareLogFile(instanceId);
        // A timeout here does not interrupt the task that is still preparing the file.
        File logFile = fileFuture.get(5, TimeUnit.SECONDS);

        // Paged display of the data.
        long lines = 0;
        StringBuilder sb = new StringBuilder();
        String lineStr;
        long left = index * MAX_LINE_COUNT;
        long right = left + MAX_LINE_COUNT;
        // NOTE(review): FileReader decodes with the platform default charset - confirm it matches
        // the charset used when the log file is written.
        try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
            while ((lineStr = lr.readLine()) != null) {
                // Inside the requested range: keep the line.
                if (lines >= left && lines < right) {
                    sb.append(lineStr).append(System.lineSeparator());
                }
                // Keep counting past the page so the total page count can be computed.
                ++lines;
            }
        } catch (Exception e) {
            log.warn("[InstanceLog-{}] read logFile from disk failed for app: {}.", instanceId, appId, e);
            return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
        }

        double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
        return new StringPage(index, (long) totalPage, sb.toString());
    } catch (TimeoutException te) {
        return StringPage.simple("log file is being prepared, please try again later.");
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Future.get was interrupted: restore the flag so callers up the stack can still see it.
            Thread.currentThread().interrupt();
        }
        log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e);
        return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
    }
}
/**
 * Submits the preparation of an instance's log file to the background pool.
 *
 * @param instanceId id of the instance whose log file is needed
 * @return a future that completes with the prepared log file
 */
private Future<File> prepareLogFile(long instanceId) {
    return powerJobBackgroundPool.submit(() -> {
        // The online log is still being updated when the instance is tracked in
        // instanceId2LastReportTime; in that case the data in the local database must be used.
        boolean stillReporting = instanceId2LastReportTime.containsKey(instanceId);
        return stillReporting ? genTemporaryLogFile(instanceId) : genStableLogFile(instanceId);
    });
}
fetchInstanceLog先通過prepareLogFile準備日志檔案(最多等待5秒,逾時則提示稍後重試),對于還在更新的則執行genTemporaryLogFile,否則執行genStableLogFile;本地資料庫存在的則直接下載,否則判斷gridFsManager是否可用,可用則從gridFsManager取;檔案就緒後按每頁MAX_LINE_COUNT行讀取指定頁并傳回
fetchDownloadUrl
/**
 * Builds the HTTP URL of this server's {@code /instance/downloadLog} endpoint
 * for the given instance.
 *
 * @param appId      application id, used for logging here
 * @param instanceId id of the instance whose log is to be downloaded
 * @return the download URL
 */
@DesignateServer
public String fetchDownloadUrl(Long appId, Long instanceId) {
    StringBuilder urlBuilder = new StringBuilder();
    urlBuilder.append("http://")
            .append(NetUtils.getLocalHost())
            .append(":")
            .append(port)
            .append("/instance/downloadLog?instanceId=")
            .append(instanceId);
    String url = urlBuilder.toString();
    log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
    return url;
}
fetchDownloadUrl則傳回指向本機/instance/downloadLog接口的下載url
downloadInstanceLog
/**
 * Prepares the log file of an instance and waits up to one minute for it.
 *
 * @param instanceId id of the instance whose log file is needed
 * @return the prepared log file
 * @throws Exception if preparation fails, is interrupted, or does not finish within one minute
 */
public File downloadInstanceLog(long instanceId) throws Exception {
    // Block for at most one minute while the background pool prepares the file.
    return prepareLogFile(instanceId).get(1, TimeUnit.MINUTES);
}
downloadInstanceLog則是通過prepareLogFile準備檔案,然後最多等待1分鐘,逾時或失敗則抛出異常
小結
InstanceController提供了stop、retry、detail、log、downloadLogUrl、downloadLog、downloadLog4Console、list方法;其中stop、retry、detail委托給了instanceService;log、downloadLogUrl、downloadLog、downloadLog4Console委托給了instanceLogService;list委托給了instanceInfoRepository。