dolphinscheduer源碼解析-master啟動
文章目錄
- dolphinscheduer源碼解析-master啟動
-
- MasterServer類定義
MasterServer類定義
/**
* master server
*/
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.server.worker.*",
"org.apache.dolphinscheduler.server.monitor.*",
"org.apache.dolphinscheduler.server.log.*"
})
})
@EnableTransactionManagement
public class MasterServer implements IStoppable
可以看出在MasterServer服務中掃描的類為org.apache.dolphinscheduler包下的類,但是又排除了一下worker服務monitor服務和log服務。并且開啟了事務機制。
MasterServer屬性
private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
@Autowired
private MasterConfig masterConfig;
@Autowired
private SpringApplicationContext springApplicationContext;
private NettyRemotingServer nettyRemotingServer;
@Autowired
private MasterRegistryClient masterRegistryClient;
@Autowired
private MasterSchedulerService masterSchedulerService;
可以看出這裡面一共有兩大服務,NettyRemoteServer [點選打開],masterSchedulerService [點選打開] 其實還有一個quartz服務但是沒有在屬性上面,一個用戶端。 其中
NettyRemotingServer
是沒有托管給spring建立的而是在run方法中自行建立。
啟動Master服務方法如下。它通過@PostConstruct注解在伺服器加載這個servlet的時候在構造函數之後執行
@PostConstruct
public void run() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
// self tolerant
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
// scheduler start
this.masterSchedulerService.start();
// start QuartzExecutors
// what system should do if exception
try {
logger.info("start Quartz server...");
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
QuartzExecutors.getInstance().shutdown();
} catch (SchedulerException e1) {
logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
}
logger.error("start Quartz failed", e);
}
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
}
}));
}
此處可以看出它注冊了三個command的處理器。分别對應任務相應,任務請求,和kill響應。注冊這三個處理程式之後才讓remoteserver啟動。這裡大家可以了解為啟動了對任務請求,響應和殺死的rpc調用服務。
接下來啟動了masterRegistryClient,也就是啟動了master的zookeeper用戶端
在接下來啟動masterSchedulerService服務也就是啟動了master排程服務。
最後啟動的是QuartzExecutors.getInstance().start()。這個服務是排程的核心quartz,它使用了單例模式。
最後注冊一下shutdownhook就結啟動完成。
再看一下關閉的程式
public void close(String cause) {
try {
// execute only once
if (Stopper.isStopped()) {
return;
}
logger.info("master server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
try {
// thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
// close
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.masterRegistryClient.closeRegistry();
// close quartz
try {
QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped");
} catch (Exception e) {
logger.warn("Quartz service stopped exception:{}", e.getMessage());
}
// close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
} catch (Exception e) {
logger.error("master server stop exception ", e);
}
}
這裡可以看出關閉時他是先關閉master排程服務,然後關閉remotingserver的rpc服務再再然後關閉了master的zookeeper用戶端,最後關閉了quartz排程器和整個spring context。關閉springcontext 可以讓@PreDestroy 注解生效。