天天看點

Elastic-Job-Lite 源碼分析 —— 作業監控服務1. 概述2. MonitorService

1. 概述

本文主要分享 Elastic-Job-Lite 作業監控服務。内容對應《官方文檔 —— DUMP作業運作資訊》。

使用Elastic-Job-Lite過程中可能會碰到一些分布式問題,導緻作業運作不穩定。

由于無法在生産環境調試,通過dump指令可以把作業内部相關資訊dump出來,友善開發者debug分析; 另外為了不洩露隐私,已将相關資訊中的ip位址以ip1, ip2…的形式過濾,可以在網際網路上公開傳輸環境資訊,便于進一步完善Elastic-Job。

涉及到主要類的類圖如下( 打開大圖 ):

Elastic-Job-Lite 源碼分析 —— 作業監控服務1. 概述2. MonitorService
  • 在 Elastic-Job-lite 裡,作業監控服務( MonitorService ) 實作了DUMP作業運作資訊功能。

你行好事會因為得到贊賞而愉悅

同理,開源項目貢獻者會因為 Star 而更加有動力

為 Elastic-Job 點贊!傳送門

2. MonitorService

MonitorService,作業監控服務。

初始化 MonitorService 方法實作如下:

// MonitorService.java
private final String jobName;

public void listen() {
   int port = configService.load(true).getMonitorPort();
   if (port < 0) {
       return;
   }
   try {
       log.info("Elastic job: Monitor service is running, the port is '{}'", port);
       openSocketForMonitor(port);
   } catch (final IOException ex) {
       log.error("Elastic job: Monitor service listen failure, error is: ", ex);
   }
}
    
private void openSocketForMonitor(final int port) throws IOException {
   serverSocket = new ServerSocket(port);
   new Thread() {
       
       @Override
       public void run() {
           while (!closed) {
               try {
                   process(serverSocket.accept());
               } catch (final IOException ex) {
                   log.error("Elastic job: Monitor service open socket for monitor failure, error is: ", ex);
               }
           }
       }
   }.start();
}
           
  • 在作業配置的監控服務端口屬性( 

    LiteJobConfiguration.monitorPort

     )啟動 ServerSocket。一個作業對應一個作業監控端口,是以配置時,請不要重複端口噢。

處理 dump指令 方法如下:

// MonitorService.java
private void process(final Socket socket) throws IOException {
   try (
           BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
           BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
           Socket autoCloseSocket = socket) {
       // 讀取指令
       String cmdLine = reader.readLine();
       if (null != cmdLine && DUMP_COMMAND.equalsIgnoreCase(cmdLine)) { // DUMP
           List<String> result = new ArrayList<>();
           dumpDirectly("/" + jobName, result);
           outputMessage(writer, Joiner.on("\n").join(SensitiveInfoUtils.filterSensitiveIps(result)) + "\n");
       }
   }
}
           
  • #process()

     方法,目前隻支援 

    DUMP

     指令。如果你有自定義指令的需要,可以拓展該方法。
  • 調用 

    #dumpDirectly()

     方法,輸出目前作業名對應的相關調試資訊。
private void dumpDirectly(final String path, final List<String> result) {
   for (String each : regCenter.getChildrenKeys(path)) {
       String zkPath = path + "/" + each;
       String zkValue = regCenter.get(zkPath);
       if (null == zkValue) {
           zkValue = "";
       }
       TreeCache treeCache = (TreeCache) regCenter.getRawCache("/" + jobName);
       ChildData treeCacheData = treeCache.getCurrentData(zkPath);
       String treeCachePath =  null == treeCacheData ? "" : treeCacheData.getPath();
       String treeCacheValue = null == treeCacheData ? "" : new String(treeCacheData.getData());
       // 判斷 TreeCache緩存 和 注冊中心 資料一緻
       if (zkValue.equals(treeCacheValue) && zkPath.equals(treeCachePath)) {
           result.add(Joiner.on(" | ").join(zkPath, zkValue));
       } else {
           result.add(Joiner.on(" | ").join(zkPath, zkValue, treeCachePath, treeCacheValue));
       }
       // 遞歸
       dumpDirectly(zkPath, result);
   }
}
           
    • 當作業本地 TreeCache緩存 和注冊中心資料不一緻時,DUMP 出 [zkPath, zkValue, treeCachePath, treeCacheValue]。當相同時,隻需 DUMP 出 [zkPath, zkValue],友善看出本地和注冊中心是否存在資料差異。
  • DUMP 資訊例子如下:
Yunai-MacdeMacBook-Pro-2:elastic-job yunai$ echo "dump" | nc 127.0.0.1 10024
/javaSimpleJob/sharding | 
/javaSimpleJob/sharding/2 | 
/javaSimpleJob/sharding/2/instance | [email protected]@5100
/javaSimpleJob/sharding/1 | 
/javaSimpleJob/sharding/1/instance | [email protected]@5100
/javaSimpleJob/sharding/0 | 
/javaSimpleJob/sharding/0/instance | [email protected]@5100
/javaSimpleJob/servers | 
/javaSimpleJob/servers/ip2 | 
/javaSimpleJob/servers/ip198 | 
/javaSimpleJob/leader | 
/javaSimpleJob/leader/sharding | 
/javaSimpleJob/leader/failover | 
/javaSimpleJob/leader/failover/latch | 
/javaSimpleJob/leader/failover/items | 
/javaSimpleJob/leader/election | 
/javaSimpleJob/leader/election/latch | 
/javaSimpleJob/leader/election/instance | [email protected]@5100
/javaSimpleJob/instances | 
/javaSimpleJob/instances/[email protected]@5100 | 
/javaSimpleJob/config | {"jobName":"javaSimpleJob","jobClass":"com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob","jobType":"SIMPLE","cron":"0 0/2 * * * ?","shardingTotalCount":3,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":true,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":false,"maxTimeDiffSeconds":-1,"monitorPort":10024,"jobShardingStrategyClass":"com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true}
           

繼續閱讀