天天看點

dolphinscheduer源碼解析-master啟動dolphinscheduer源碼解析-master啟動

dolphinscheduer源碼解析-master啟動

文章目錄

  • dolphinscheduer源碼解析-master啟動
    • MasterServer類定義

MasterServer類定義

/**
 *  master server
 */
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
        @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
                "org.apache.dolphinscheduler.server.worker.*",
                "org.apache.dolphinscheduler.server.monitor.*",
                "org.apache.dolphinscheduler.server.log.*"
        })
})
@EnableTransactionManagement
public class MasterServer implements IStoppable 
           

可以看出在MasterServer服務中掃描的類為org.apache.dolphinscheduler包下的類,但是又排除了一下worker服務monitor服務和log服務。并且開啟了事務機制。

MasterServer屬性

private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);

@Autowired
private MasterConfig masterConfig;

@Autowired
private SpringApplicationContext springApplicationContext;

private NettyRemotingServer nettyRemotingServer;

@Autowired
private MasterRegistryClient masterRegistryClient;

@Autowired
private MasterSchedulerService masterSchedulerService;
           

可以看出這裡面一共有兩大服務,NettyRemoteServer [點選打開],masterSchedulerService [點選打開] 其實還有一個quartz服務但是沒有在屬性上面,一個用戶端。 其中

NettyRemotingServer

是沒有托管給spring建立的而是在run方法中自行建立。

啟動Master服務方法如下。它通過@PostConstruct注解在伺服器加載這個servlet的時候在構造函數之後執行

@PostConstruct
public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
    this.nettyRemotingServer.start();

    // self tolerant
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);

    // scheduler start
    this.masterSchedulerService.start();

    // start QuartzExecutors
    // what system should do if exception
    try {
        logger.info("start Quartz server...");
        QuartzExecutors.getInstance().start();
    } catch (Exception e) {
        try {
            QuartzExecutors.getInstance().shutdown();
        } catch (SchedulerException e1) {
            logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
        }
        logger.error("start Quartz failed", e);
    }

    /**
     * register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));

}
           

此處可以看出它注冊了三個command的處理器。分别對應任務相應,任務請求,和kill響應。注冊這三個處理程式之後才讓remoteserver啟動。這裡大家可以了解為啟動了對任務請求,響應和殺死的rpc調用服務。

接下來啟動了masterRegistryClient,也就是啟動了master的zookeeper用戶端

在接下來啟動masterSchedulerService服務也就是啟動了master排程服務。

最後啟動的是QuartzExecutors.getInstance().start()。這個服務是排程的核心quartz,它使用了單例模式。

最後注冊一下shutdownhook就結啟動完成。

再看一下關閉的程式

public void close(String cause) {

    try {
        // execute only once
        if (Stopper.isStopped()) {
            return;
        }

        logger.info("master server is stopping ..., cause : {}", cause);

        // set stop signal is true
        Stopper.stop();

        try {
            // thread sleep 3 seconds for thread quietly stop
            Thread.sleep(3000L);
        } catch (Exception e) {
            logger.warn("thread sleep exception ", e);
        }
        // close
        this.masterSchedulerService.close();
        this.nettyRemotingServer.close();
        this.masterRegistryClient.closeRegistry();
        // close quartz
        try {
            QuartzExecutors.getInstance().shutdown();
            logger.info("Quartz service stopped");
        } catch (Exception e) {
            logger.warn("Quartz service stopped exception:{}", e.getMessage());
        }
        // close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
        springApplicationContext.close();
    } catch (Exception e) {
        logger.error("master server stop exception ", e);
    }
}
           

這裡可以看出關閉時他是先關閉master排程服務,然後關閉remotingserver的rpc服務再再然後關閉了master的zookeeper用戶端,最後關閉了quartz排程器和整個spring context。關閉springcontext 可以讓@PreDestroy 注解生效。

繼續閱讀