
A look at PowerJob's SystemInfoController

Author: 碼匠亂炖

This article takes a look at PowerJob's SystemInfoController.

SystemInfoController

tech/powerjob/server/web/controller/SystemInfoController.java

@Slf4j
@RestController
@RequestMapping("/system")
@RequiredArgsConstructor
public class SystemInfoController {

    private final JobInfoRepository jobInfoRepository;

    private final InstanceInfoRepository instanceInfoRepository;

    private final ServerInfoService serverInfoService;

    private final WorkerClusterQueryService workerClusterQueryService;

    @GetMapping("/listWorker")
    public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {

        List<WorkerInfo> workerInfos = workerClusterQueryService.getAllWorkers(appId);
        return ResultDTO.success(workerInfos.stream().map(WorkerStatusVO::new).collect(Collectors.toList()));
    }

    @GetMapping("/overview")
    public ResultDTO<SystemOverviewVO> getSystemOverview(Long appId) {

        SystemOverviewVO overview = new SystemOverviewVO();

        // Total number of jobs
        overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV()));
        // Number of running instances
        overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
        // Number of recently failed instances (within the last 24 hours)
        Date date = DateUtils.addDays(new Date(), -1);
        overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));

        // Server time zone
        overview.setTimezone(TimeZone.getDefault().getDisplayName());
        // Server time
        overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));

        overview.setServerInfo(serverInfoService.fetchServiceInfo());

        return ResultDTO.success(overview);
    }

}
           
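SystemInfoController exposes two endpoints, listWorker and getSystemOverview. listWorker returns the WorkerInfo list for the appId passed in the request (the application currently logged in to the console). getSystemOverview reports that appId's total job count, running instance count, and the number of instances that failed within the last 24 hours, together with the server's time zone, current time, and server info.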
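
To make the "failed within the last 24 hours" count concrete, here is a minimal sketch of what a query such as countByAppIdAndStatusAndGmtCreateAfter is asked to compute. It runs against an in-memory list instead of a database; OverviewSketch, Instance, Status, and countCreatedAfter are hypothetical names for illustration and are not part of PowerJob.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical in-memory stand-in for the instance table; not PowerJob's real entity.
class OverviewSketch {

    enum Status { RUNNING, FAILED, SUCCEED }

    static final class Instance {
        final long appId;
        final Status status;
        final Instant gmtCreate;

        Instance(long appId, Status status, Instant gmtCreate) {
            this.appId = appId;
            this.status = status;
            this.gmtCreate = gmtCreate;
        }
    }

    // Counts rows of the given app and status that were created strictly after the cutoff.
    static long countCreatedAfter(List<Instance> rows, long appId, Status status, Instant cutoff) {
        return rows.stream()
                .filter(r -> r.appId == appId)
                .filter(r -> r.status == status)
                .filter(r -> r.gmtCreate.isAfter(cutoff))
                .count();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-02T00:00:00Z");
        // Roughly the window produced by DateUtils.addDays(new Date(), -1)
        Instant cutoff = now.minus(Duration.ofDays(1));
        List<Instance> rows = List.of(
                new Instance(1L, Status.FAILED, now.minus(Duration.ofHours(2))),   // counted
                new Instance(1L, Status.FAILED, now.minus(Duration.ofHours(30))),  // older than the window
                new Instance(1L, Status.SUCCEED, now.minus(Duration.ofHours(1))),  // wrong status
                new Instance(2L, Status.FAILED, now.minus(Duration.ofHours(1))));  // other app
        System.out.println(countCreatedAfter(rows, 1L, Status.FAILED, cutoff)); // prints 1
    }
}
```

Only the first row satisfies all three conditions, which is why a job that failed 30 hours ago no longer shows up in the overview.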

getAllWorkers

tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

@DesignateServer
    public List<WorkerInfo> getAllWorkers(Long appId) {
        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
        workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
        return workers;
    }

    private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
        ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);
        if (clusterStatusHolder == null) {
            log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
            return Collections.emptyMap();
        }
        return clusterStatusHolder.getAllWorkers();
    }

    public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
        return WorkerClusterManagerService.getAppId2ClusterStatus();
    }        
           
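getAllWorkers obtains the WorkerInfo map through getWorkerInfosByAppId and sorts the workers by getSystemMetrics().calculateScore() in descending order, so the highest-scoring worker comes first. If no ClusterStatusHolder exists for the appId yet, getWorkerInfosByAppId logs a warning and returns an empty map.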
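
The ordering can be sketched in isolation as follows. Worker is a hypothetical stand-in for WorkerInfo that carries only an address and a precomputed score, and the addresses are made up. The sketch uses Comparator.comparingInt(...).reversed() rather than the subtraction-based comparator in the source; both give highest-score-first for small scores, and the comparator form cannot overflow.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class WorkerSortSketch {

    // Hypothetical stand-in for WorkerInfo; only the address and the score are modelled.
    static final class Worker {
        final String address;
        final int score;

        Worker(String address, int score) {
            this.address = address;
            this.score = score;
        }
    }

    // Returns a new list ordered by score, highest first.
    static List<Worker> sortByScoreDesc(List<Worker> workers) {
        List<Worker> sorted = new ArrayList<>(workers);
        sorted.sort(Comparator.comparingInt((Worker w) -> w.score).reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<Worker> sorted = sortByScoreDesc(List.of(
                new Worker("192.168.1.10:27777", 6),
                new Worker("192.168.1.11:27777", 10),
                new Worker("192.168.1.12:27777", 8)));
        for (Worker w : sorted) {
            System.out.println(w.address + " -> " + w.score);
        }
    }
}
```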

WorkerClusterManagerService

tech/powerjob/server/remote/worker/WorkerClusterManagerService.java

@Slf4j
public class WorkerClusterManagerService {

    /**
     * Stores worker health information, appId -> ClusterStatusHolder
     */
    private static final Map<Long, ClusterStatusHolder> APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap();

    /**
     * Update status
     * @param heartbeat the Worker's heartbeat packet
     */
    public static void updateStatus(WorkerHeartbeat heartbeat) {
        Long appId = heartbeat.getAppId();
        String appName = heartbeat.getAppName();
        ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
        clusterStatusHolder.updateStatus(heartbeat);
    }

    /**
     * Clean up worker information that is no longer needed
     * @param usingAppIds appIds that still need to be maintained; data for all others is deleted
     */
    public static void clean(List<Long> usingAppIds) {
        Set<Long> keys = Sets.newHashSet(usingAppIds);
        APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
    }


    /**
     * Clean up cached information to prevent OOM
     */
    public static void cleanUp() {
        APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release);
    }

    protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
        return APP_ID_2_CLUSTER_STATUS;
    }

}
           
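WorkerClusterManagerService defines APP_ID_2_CLUSTER_STATUS, which maps each appId to its ClusterStatusHolder. Its updateStatus method receives a WorkerHeartbeat, creates the holder on first use via computeIfAbsent, and then calls clusterStatusHolder.updateStatus(heartbeat). clean removes the entries of every appId that is not in usingAppIds.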
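
The registry pattern described here (create on first heartbeat, prune apps that are no longer in use) can be sketched with nothing but the JDK. ClusterRegistrySketch and Holder are hypothetical simplifications: the real service is static and its ClusterStatusHolder tracks far more than a last-heartbeat timestamp.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ClusterRegistrySketch {

    // Hypothetical holder: remembers the app name and the last heartbeat time per worker address.
    static final class Holder {
        final String appName;
        final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

        Holder(String appName) {
            this.appName = appName;
        }
    }

    private final Map<Long, Holder> appId2Holder = new ConcurrentHashMap<>();

    // The first heartbeat of an app creates its holder; later heartbeats reuse it.
    void updateStatus(long appId, String appName, String workerAddress, long heartbeatTime) {
        Holder holder = appId2Holder.computeIfAbsent(appId, ignore -> new Holder(appName));
        holder.lastHeartbeat.put(workerAddress, heartbeatTime);
    }

    // Drops every app whose id is not in the list of ids still in use.
    void clean(List<Long> usingAppIds) {
        Set<Long> keep = new HashSet<>(usingAppIds);
        appId2Holder.keySet().removeIf(appId -> !keep.contains(appId));
    }

    Set<Long> appIds() {
        return appId2Holder.keySet();
    }

    public static void main(String[] args) {
        ClusterRegistrySketch registry = new ClusterRegistrySketch();
        registry.updateStatus(1L, "app-one", "10.0.0.1:27777", 1000L);
        registry.updateStatus(2L, "app-two", "10.0.0.2:27777", 1000L);
        registry.clean(List.of(1L));
        System.out.println(registry.appIds()); // prints [1]
    }
}
```

Copying usingAppIds into a HashSet first, as the original does, keeps each contains check cheap while removeIf walks the map.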

updateStatus

tech/powerjob/server/remote/worker/ClusterStatusHolder.java

public void updateStatus(WorkerHeartbeat heartbeat) {

        String workerAddress = heartbeat.getWorkerAddress();
        long heartbeatTime = heartbeat.getHeartbeatTime();

        WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
            WorkerInfo wf = new WorkerInfo();
            wf.refresh(heartbeat);
            return wf;
        });
        long oldTime = workerInfo.getLastActiveTime();
        if (heartbeatTime < oldTime) {
            log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
            return;
        }

        workerInfo.refresh(heartbeat);

        List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
        if (!CollectionUtils.isEmpty(containerInfos)) {
            containerInfos.forEach(containerInfo -> {
                Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
                infos.put(workerAddress, containerInfo);
            });
        }
    }
           
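updateStatus first looks up the WorkerInfo by workerAddress, creating and refreshing one if the address has not been seen before. If heartbeatTime is earlier than lastActiveTime, the heartbeat is treated as expired: a warning is logged and the method returns. Otherwise it calls workerInfo.refresh(heartbeat) and records the heartbeat's containerInfos in containerId2Infos, keyed by containerId and then by workerAddress.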
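
The expired-heartbeat rule can be sketched on its own. HeartbeatSketch and accept are hypothetical names, and the sketch keeps only a timestamp per address instead of a full WorkerInfo. Like the check-then-refresh sequence in the source, it is not atomic across threads; it only illustrates the ordering rule.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HeartbeatSketch {

    private final Map<String, Long> address2LastActive = new ConcurrentHashMap<>();

    // Returns true if the heartbeat was applied, false if it is older than the recorded one.
    boolean accept(String workerAddress, long heartbeatTime) {
        Long last = address2LastActive.get(workerAddress);
        if (last != null && heartbeatTime < last) {
            return false; // expired heartbeat: keep the newer state
        }
        address2LastActive.put(workerAddress, heartbeatTime);
        return true;
    }

    long lastActive(String workerAddress) {
        return address2LastActive.get(workerAddress);
    }

    public static void main(String[] args) {
        HeartbeatSketch holder = new HeartbeatSketch();
        System.out.println(holder.accept("10.0.0.1:27777", 100L)); // true, first heartbeat
        System.out.println(holder.accept("10.0.0.1:27777", 90L));  // false, arrived out of order
        System.out.println(holder.accept("10.0.0.1:27777", 120L)); // true, newer
        System.out.println(holder.lastActive("10.0.0.1:27777"));   // 120
    }
}
```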

getSystemMetrics

tech/powerjob/worker/common/utils/SystemInfoUtils.java

public class SystemInfoUtils {

    private static final NumberFormat NF = NumberFormat.getNumberInstance();
    static {
        NF.setMaximumFractionDigits(4);
        NF.setMinimumFractionDigits(4);
        NF.setRoundingMode(RoundingMode.HALF_UP);
        // Do not output thousands separators
        NF.setGroupingUsed(false);
    }

    // JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway.
    private static final Runtime runtime = Runtime.getRuntime();
    private static final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();

    public static SystemMetrics getSystemMetrics() {

        SystemMetrics metrics = new SystemMetrics();

        fillCPUInfo(metrics);
        fillMemoryInfo(metrics);
        fillDiskInfo(metrics);

        // Calculate the score on the Worker to reduce load on the Server
        metrics.calculateScore();
        return metrics;
    }

    private static void fillCPUInfo(SystemMetrics metrics) {
        metrics.setCpuProcessors(osMXBean.getAvailableProcessors());
        metrics.setCpuLoad(miniDouble(osMXBean.getSystemLoadAverage()));
    }

    private static void fillMemoryInfo(SystemMetrics metrics) {
        // JVM memory info (maxMemory is the maximum memory the JVM can obtain from the operating system, i.e. the value set by -Xmx; totalMemory is the total memory the JVM currently holds)
        long maxMemory = runtime.maxMemory();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        metrics.setJvmMaxMemory(bytes2GB(maxMemory));
        // Used memory: total currently allocated - currently free
        metrics.setJvmUsedMemory(bytes2GB(usedMemory));
        // Ratio of used memory
        metrics.setJvmMemoryUsage(miniDouble((double) usedMemory / maxMemory));
    }

    private static void fillDiskInfo(SystemMetrics metrics) {
        long free = 0;
        long total = 0;
        File[] roots = File.listRoots();
        for (File file : roots) {
            free += file.getFreeSpace();
            total += file.getTotalSpace();
        }

        metrics.setDiskUsed(bytes2GB(total - free));
        metrics.setDiskTotal(bytes2GB(total));
        metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal()));
    }

    private static double bytes2GB(long bytes) {
        return miniDouble(bytes / 1024.0 / 1024 / 1024);
    }

    private static double miniDouble(double origin) {
        return Double.parseDouble(NF.format(origin));
    }

}
           
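SystemInfoUtils provides the getSystemMetrics method, which fills in CPU, memory, and disk information through fillCPUInfo, fillMemoryInfo, and fillDiskInfo and then calls metrics.calculateScore(). CPU information comes from osMXBean.getAvailableProcessors() and osMXBean.getSystemLoadAverage(); the latter returns a negative value on platforms where the load average is not available. Memory information comes from Runtime, and disk information is gathered by iterating over File.listRoots() and summing freeSpace and totalSpace.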
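
The same three readings can be taken with the JDK alone, as in the sketch below. MetricsSketch and Snapshot are hypothetical names; the four-decimal rounding done by miniDouble is left out, and the null check on File.listRoots() is an addition, since that method may return null when the roots cannot be determined.

```java
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class MetricsSketch {

    static final double GB = 1024.0 * 1024 * 1024;

    // Hypothetical value holder, loosely modelled on SystemMetrics.
    static final class Snapshot {
        int cpuProcessors;
        double cpuLoad;     // negative when the platform cannot report a load average
        double jvmMaxGb;
        double jvmUsedGb;
        double jvmUsage;    // used / max
        double diskTotalGb;
        double diskUsedGb;
    }

    static Snapshot collect() {
        Snapshot s = new Snapshot();

        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        s.cpuProcessors = os.getAvailableProcessors();
        s.cpuLoad = os.getSystemLoadAverage();

        Runtime rt = Runtime.getRuntime();
        long max = rt.maxMemory();
        long used = rt.totalMemory() - rt.freeMemory();
        s.jvmMaxGb = max / GB;
        s.jvmUsedGb = used / GB;
        s.jvmUsage = (double) used / max;

        long total = 0;
        long free = 0;
        File[] roots = File.listRoots();
        if (roots != null) {
            for (File root : roots) {
                total += root.getTotalSpace();
                free += root.getFreeSpace();
            }
        }
        s.diskTotalGb = total / GB;
        s.diskUsedGb = (total - free) / GB;
        return s;
    }

    public static void main(String[] args) {
        Snapshot s = collect();
        System.out.println("processors=" + s.cpuProcessors + ", load=" + s.cpuLoad);
        System.out.println("jvmUsedGb=" + s.jvmUsedGb + ", jvmMaxGb=" + s.jvmMaxGb + ", usage=" + s.jvmUsage);
        System.out.println("diskUsedGb=" + s.diskUsedGb + ", diskTotalGb=" + s.diskTotalGb);
    }
}
```

The printed values depend on the machine, so no expected output is shown.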

calculateScore

tech/powerjob/common/model/SystemMetrics.java

public int calculateScore() {
        if (score > 0) {
            return score;
        }
        // Memory is vital to TaskTracker, so we set the multiplier factor as 2.
        double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
        // Calculate the remaining load of CPU. Multiplier is set as 1.
        double cpuScore = cpuProcessors - cpuLoad;
        // Windows can not fetch CPU load, set cpuScore as 1.
        if (cpuScore > cpuProcessors) {
            cpuScore = 1;
        }
        score = (int) (memScore + cpuScore);
        return score;
    }
           
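calculateScore in SystemMetrics adds two parts, memScore and cpuScore: memScore is (jvmMaxMemory - jvmUsedMemory) * 2 and cpuScore is cpuProcessors - cpuLoad. When cpuScore exceeds cpuProcessors, which means cpuLoad is negative (per the code comment, Windows cannot fetch the CPU load), cpuScore is set to 1. The sum is truncated to an int and cached: once score is greater than 0, later calls return it directly.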
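
A worked example makes the arithmetic easier to follow. ScoreSketch is a hypothetical class that repeats the formula from the snippet above as a pure function, without the cached-score shortcut; the input values are invented.

```java
class ScoreSketch {

    // Same arithmetic as calculateScore, taking the inputs as parameters instead of fields.
    static int score(double jvmMaxMemory, double jvmUsedMemory, int cpuProcessors, double cpuLoad) {
        double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
        double cpuScore = cpuProcessors - cpuLoad;
        if (cpuScore > cpuProcessors) { // only possible when cpuLoad is negative, i.e. unavailable
            cpuScore = 1;
        }
        return (int) (memScore + cpuScore);
    }

    public static void main(String[] args) {
        // 4 GB max, 1.5 GB used, 8 cores, load 2.5: (2.5 * 2) + 5.5 = 10.5, truncated to 10
        System.out.println(score(4.0, 1.5, 8, 2.5));  // prints 10
        // Same memory, load reported as -1: cpuScore falls back to 1, so 5.0 + 1 = 6
        System.out.println(score(4.0, 1.5, 8, -1.0)); // prints 6
    }
}
```

Because free memory is doubled, a worker with 2.5 GB of free heap contributes 5 points from memory alone, about as much as five idle cores.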

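Summary

SystemInfoController exposes the listWorker and getSystemOverview endpoints. listWorker returns the WorkerInfo list for the appId passed in the request, and getSystemOverview reports that appId's total job count, running instance count, and recently failed instance count. WorkerClusterManagerService defines APP_ID_2_CLUSTER_STATUS, which maps each appId to its ClusterStatusHolder; its updateStatus method receives a WorkerHeartbeat and calls clusterStatusHolder.updateStatus(heartbeat). WorkerInfo contains a SystemMetrics, and SystemInfoUtils provides getSystemMetrics, which fills in CPU, memory, and disk information through fillCPUInfo, fillMemoryInfo, and fillDiskInfo and then calls metrics.calculateScore().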