[從源碼學設計]螞蟻金服SOFARegistry之程式基本架構
0x00 摘要
之前我們通過三篇文章初步分析了 MetaServer 的基本架構,MetaServer 這三篇文章為我們接下來的工作做了堅實的鋪墊。
本系列我們接着分析 Data Server,順帶會涉及一些 Session Server。因為 DataServer 和 MetaServer 代碼實作和架構的基本套路類似,是以我們主要關心差異點和DataServer的特點。
本文會分析DataServer程式的基本架構。
0x01 思路
前面文章專注于系統業務本身,本系列文章會換一種思路,重點在于分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實作機制和架構思路,讓大家借以學習阿裡如何設計。
具體學習方法是:
- 先自行設想業務場景,存在問題以及解決方案。一定要密切聯系業務,一切脫離業務談設計,都是耍流氓;
- 然後看螞蟻金服源碼,看看他們是怎麼解決問題的,和自己的方案有何差別。因為螞蟻金服的目前代碼,不一定是在目前應用場景下的最理想方案(比如hash的解決方案),但肯定是在各種力量博弈後的産物,是經曆了金融級别錘煉下的最佳實踐。
- 因為自己的設想肯定和現實差距很大,是以在研究代碼之後,需要調整自己的思路,再次思考。
- 然後看看阿裡的解決方案在未來有沒有可以改進的地方;
- 最後看看是否可以從源碼中提煉出共同點或者是可以複用的辦法或者模式。
學習時注意點是:
- 架構設計的本質之一是平衡和妥協。一個系統在不同的時期、不同的資料環境、不同的應用場景下會選擇不同的架構,在選擇時本質上是在平衡一些重要的點;
- 重點關注算法以及其他相關邏輯在時間和空間上的關系——這樣一種邏輯上的架構關系。在一個系統中,這些次元(空間和時間)縱橫交錯,使得複雜度非常高。我們學習的目的就是分離這些次元,簡化之間的互動。
- 不光要看表面的,還要看底層的思路和邏輯。不光要看代碼注釋中提到的,更要挖掘代碼注釋中沒有提到的;
- 我們既要深入研究一個個孤立的功能/元件/子產品,也要在架構的角度和業務場景下重新審視這些子產品,這樣可以對元件之間的關系有更加深入的了解,可以從全局角度來看這個系統;
- 思維方式的轉變才是你最應該在意的部分;
因為會從多個次元來分析設計,比如業務次元和架構次元,是以在本系列中,可能有的文章會集中在模式的總結提取,有的文章會集中在業務實作,有的文章會集中在具體知識點的運用,也會出現 某一個業務子產品或者代碼段因為業務和實作 在不同文章中被提及的現象,希望大家事先有所了解。
0x02 基本架構&準則
2.1 SOFARegistry 總體架構
首先,我們要回憶下SOFARegistry 總體架構
- Client 層
應用伺服器叢集。Client 層是應用層,每個應用系統通過依賴注冊中心相關的用戶端 jar 包,通過程式設計方式來使用服務注冊中心的服務釋出和服務訂閱能力。
- Session 層
Session 伺服器叢集。顧名思義,Session 層是會話層,通過長連接配接和 Client 層的應用伺服器保持通訊,負責接收 Client 的服務釋出和服務訂閱請求。該層隻在記憶體中儲存各個服務的釋出訂閱關系,對于具體的服務資訊,隻在 Client 層和 Data 層之間透傳轉發。Session 層是無狀态的,可以随着 Client 層應用規模的增長而擴容。
- Data 層
資料伺服器叢集。Data 層通過分片存儲的方式儲存着所用應用的服務注冊資料。資料按照 dataInfoId(每一份服務資料的唯一辨別)進行一緻性 Hash 分片,多副本備份,保證資料的高可用。下文的重點也在于随着資料規模的增長,Data 層如何在不影響業務的前提下實作平滑的擴縮容。
- Meta 層
中繼資料伺服器叢集。這個叢集管轄的範圍是 Session 伺服器叢集和 Data 伺服器叢集的伺服器資訊,其角色就相當于 SOFARegistry 架構内部的服務注冊中心,隻不過 SOFARegistry 作為服務注冊中心是服務于廣大應用服務層,而 Meta 叢集是服務于 SOFARegistry 内部的 Session 叢集和 Data 叢集,Meta 層能夠感覺到 Session 節點和 Data 節點的變化,并通知叢集的其它節點。
2.2 準則
對于一個程式來說,什麼樣才算是優秀的架構,其實沒有一個放之四海而皆準的标準,關于這方面的書或者文章也有很多,是以我們就從最簡單直接的角度,即從結果來想:即靜态和動态兩方面。
- 靜态 :這個角度就是當你拿到一個新代碼,你首先會看其目錄結構。如果這個程式的目錄結構清晰,隻看目錄結構就能讓你能把這個代碼邏輯整理出來,隻從檔案名字就能知道它應該屬于什麼目錄,什麼子產品,不會出現某一個檔案讓你覺得其實應該放到另外目錄的沖動,那麼這個程式從靜态角度講,其架構就是優秀的。
- 動态 :這個角度就是當你隻是大概浏覽了代碼,你閉眼之後,自己能夠在腦子中把程式運作子產品建構出來,能夠知道程式分成幾個功能子產品,清晰的知道程式的入口,能構架出來其基本功能的流程和内部子產品互動邏輯,那麼這個程式從動态角度講,其架構就是優秀的。
比如,假設你程式是基于SpringBoot,那麼Bean的建構和分類就非常重要,如果Bean處理得很好,對你整理動态架構是非常有幫助。
下面就開始分析DataServer程式的基本架構。
0x03 目錄結構
目錄結構如下,我們可以看出來SOFAReistry大緻思路,當然因為業務和架構耦合,是以我的分類不一定完全恰當,也有其他分類的方式,具體取決于你自己的思考方式。
- 程式主體:DataApplication;
- 程式入口以及Bean:bootstrap;
程式基礎業務功能:
- 網絡:remoting;
- 輔助:utils;
- http:resource;
- 緩存:cache;
- 線程:executor;
業務功能:
- renew;
- datasync;
- change;
- event;
- node;
具體目錄如下:
.
├── DataApplication.java
├── bootstrap
├── cache
├── change
├── datasync
│ └── sync
├── event
│ └── handler
├── executor
├── node
├── remoting
│ ├── dataserver
│ │ ├── handler
│ │ └── task
│ ├── handler
│ ├── metaserver
│ │ ├── handler
│ │ ├── provideData
│ │ │ └── processor
│ │ └── task
│ └── sessionserver
│ ├── disconnect
│ ├── forward
│ └── handler
├── renew
├── resource
└── util
複制
0x04 基本架構
依然是類似MetaServer的路數,使用SpringBoot架構來進行總體搭建。
@EnableDataServer
@SpringBootApplication
public class DataApplication {
public static void main(String[] args) {
SpringApplication.run(DataApplication.class, args);
}
}
複制
EnableDataServer這個注解将引入基本配置 DataServerBeanConfiguration。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DataServerBeanConfiguration.class)
public @interface EnableDataServer {
}
複制
0x05 配置Bean
DataServer是SpringBoot程式。是以大量使用Bean。
DataServerBeanConfiguration 的作用是建構各種相關配置,從其中可以看出來DataServer相關子產品和功能。
系統初始化時的 bean 都在 DataServerBeanConfiguration 裡面通過 JavaConfig 來注冊,主要以如下幾個配置類展現(配置類會有變更,具體内容可以參照源碼實作):
- DataServerBootstrap
- DataServerBootstrapConfigConfiguration,
- CommonConfig
- DataServerConfig
- DataNodeStatus
- PropertySplitter
- DataServerStorageConfiguration
- DatumCache
- LocalDatumStorage
- LogTaskConfigConfiguration
- CacheDigestTask
- SessionRemotingConfiguration
- jerseyExchange
- boltExchange
- MetaNodeExchanger
- DataNodeExchanger
- DataServerCache
- ForwardService
- SessionServerConnectionFactory
- DataServerConnectionFactory
- MetaServerConnectionFactory
- serverHandlers
- serverSyncHandlers
- dataClientHandlers
- metaClientHandlers
- AfterWorkingProcessHandler
- DatumLeaseManager
- DisconnectEventHandler
- DataServerNotifyBeanConfiguration
- DataChangeHandler
- SessionServerNotifier
- TempPublisherNotifier
- BackUpNotifier
- SnapshotBackUpNotifier
- DataServerSyncBeanConfiguration
- SyncDataService
- LocalAcceptorStore
- syncDataScheduler
- StoreServiceFactory
- DataServerEventBeanConfiguration
- DataServerChangeEventHandler
- LocalDataServerChangeEventHandler
- MetaServerChangeEventHandler
- StartTaskEventHandler
- LocalDataServerCleanHandler
- GetSyncDataHandler
- EventCenter
- DataChangeEventCenter
- DataServerRemotingBeanConfiguration
- ConnectionRefreshTask
- ConnectionRefreshMetaTask
- RenewNodeTask
- List tasks,包括上面三個Bean
- DefaultMetaServiceImpl
- ResourceConfiguration
- jerseyResourceConfig
- HealthResource
- DataDigestResource
- ExecutorConfiguration
- publishProcessorExecutor
- renewDatumProcessorExecutor
- getDataProcessorExecutor
- DataProvideDataConfiguration
- ProvideDataProcessorManager
- datumExpireProvideDataProcessor
部分Bean的功能如下:
- DataServerBootstrapConfigConfiguration:該配置類主要作用是提供一些 DataServer 服務啟動時基本的 Bean,比如 DataServerConfig 基礎配置 Bean、DataNodeStatus 節點狀态 Bean、DatumCache 緩存 Bean 等;
- LogTaskConfigConfiguration:該配置類主要用于提供一些日志處理相關的 Bean;
- SessionRemotingConfiguration:該配置類主要作用是提供一些與 SessionServer 互相通信的 Bean,以及連接配接過程中的一些請求處理 Bean。比如 BoltExchange、JerseyExchange 等用于啟動服務的 Bean,還有節點上下線、資料釋出等的 Bean,為關鍵配置類;
- DataServerNotifyBeanConfiguration:該配置類中配置的 Bean 主要用于進行事件通知,如用于處理資料變更的 DataChangeHandler 等;
- DataServerSyncBeanConfiguration:該配置類中配置的 Bean 主要用于資料同步操作;
- DataServerEventBeanConfiguration:該配置類中配置的 Bean 主要用于處理與資料節點相關的事件,如事件中心 EventCenter、資料變化事件中心 DataChangeEventCenter 等;
- DataServerRemotingBeanConfiguration:該配置類中配置的 Bean 主要用于 DataServer 的連接配接管理;
- ResourceConfiguration:該配置類中配置的 Bean 主要用于提供一些 Rest 接口資源;
- AfterWorkingProcessConfiguration:該配置類中配置一些後處理 Handler Bean,用于處理一些業務邏輯結束後的後處理動作;
- ExecutorConfiguration:該配置類主要配置一些線程池 Bean,用于執行不同的任務;
縮減版代碼如下 :
@Configuration
@Import(DataServerInitializer.class)
@EnableConfigurationProperties
public class DataServerBeanConfiguration {
@Bean
@ConditionalOnMissingBean
public DataServerBootstrap dataServerBootstrap() {}
@Configuration
protected static class DataServerBootstrapConfigConfiguration {}
@Configuration
public static class DataServerStorageConfiguration {}
@Configuration
public static class LogTaskConfigConfiguration {}
@Configuration
public static class SessionRemotingConfiguration {}
@Configuration
public static class DataServerNotifyBeanConfiguration {}
@Configuration
public static class DataServerSyncBeanConfiguration {}
@Configuration
public static class DataServerEventBeanConfiguration {}
@Configuration
public static class DataServerRemotingBeanConfiguration {}
@Configuration
public static class ResourceConfiguration {}
@Configuration
public static class ExecutorConfiguration {}
@Configuration
public static class DataProvideDataConfiguration {}
}
複制
0x06 啟動
6.1 入口
DataServer 子產品啟動入口類為 DataServerInitializer,該類不由 JavaConfig 管理配置,而是繼承了 SmartLifecycle 接口,在啟動時由 Spring 架構調用其 start 方法。其簡略版代碼如下:
public class DataServerInitializer implements SmartLifecycle {
@Autowired
private DataServerBootstrap dataServerBootstrap;
@Override
public void start() {
dataServerBootstrap.start();
this.isRunning = true;
}
}
複制
該方法中調用了 DataServerBootstrap#start 方法,用于啟動一系列的初始化服務。
public void start() {
try {
openDataServer();
openDataSyncServer();
openHttpServer();
startRaftClient();
fetchProviderData();
startScheduler();
Runtime.getRuntime().addShutdownHook(new Thread(this::doStop));
}
}
複制
6.2 啟動業務
DataServerBootstrap負責程式的啟動,具體如下:
@EnableConfigurationProperties
public class DataServerBootstrap {
// 節點間的 bolt 通信元件以及其配置
@Autowired
private DataServerConfig dataServerConfig;
@Resource(name = "serverHandlers")
private Collection<AbstractServerHandler> serverHandlers;
@Resource(name = "serverSyncHandlers")
private Collection<AbstractServerHandler> serverSyncHandlers;
@Autowired
private Exchange boltExchange;
private Server server;
private Server dataSyncServer;
// 用于控制的Http 通信元件以及其配置
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ResourceConfig jerseyResourceConfig;
@Autowired
private Exchange jerseyExchange;
private Server httpServer;
// JVM 内部的事件通信元件以及其配置
@Autowired
private EventCenter eventCenter;
// MetaServer Raft相關元件
@Autowired
private IMetaServerService metaServerService;
@Autowired
private DatumLeaseManager datumLeaseManager;
// 定時器元件以及其配置
@Autowired
private Scheduler syncDataScheduler;
@Autowired
private CacheDigestTask cacheDigestTask;
/**
* start dataserver
*/
public void start() {
openDataServer(); // 節點間的 bolt 通信元件以及其配置
openDataSyncServer();
openHttpServer(); // 用于控制的Http 通信元件以及其配置
startRaftClient(); // MetaServer Raft相關元件
fetchProviderData();
startScheduler(); // 定時器元件以及其配置
Runtime.getRuntime().addShutdownHook(new Thread(this::doStop));
}
// 節點間的 bolt 通信元件以及其配置
private void openDataServer() {
if (serverForSessionStarted.compareAndSet(false, true)) {
server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
dataServerConfig.getPort()), serverHandlers
.toArray(new ChannelHandler[serverHandlers.size()]));
}
}
private void openDataSyncServer() {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
// 用于控制的Http 通信元件以及其配置
private void openHttpServer() {
if (httpServerStarted.compareAndSet(false, true)) {
bindResourceConfig();
httpServer = jerseyExchange.open(
new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
.getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
}
}
// MetaServer Raft相關元件
private void startRaftClient() {
metaServerService.startRaftClient();
eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
private void fetchProviderData() {
ProvideData provideData = metaServerService
.fetchData(ValueConstants.ENABLE_DATA_DATUM_EXPIRE);
boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
.getObject());
datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
// 定時器元件以及其配置
private void startScheduler() {
if (schedulerStarted.compareAndSet(false, true)) {
syncDataScheduler.startScheduler();
// start all startTask except correction task
eventCenter.post(new StartTaskEvent(
Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
.collect(Collectors.toSet())));
//start dump log
cacheDigestTask.start();
}
}
}
複制
6.2 核心元件
DataServer 的核心啟動類是 DataServerBootstrap,對于其内部子產品分類,官方部落客要提及其主要元件 :
該類主要包含了三類元件:節點間的 bolt 通信元件、JVM 内部的事件通信元件、定時器元件。
我這裡劃分的更加細緻,把元件劃分為如下:
- 節點間的 bolt 通信元件以及其配置
- DataServerConfig。配置
- boltExchange。bolt元件通訊元件,用來給server和dataSyncServer提供通訊服務;
- server。dataServer 則負責資料相關服務,比如資料服務,擷取資料的推送,服務上下線通知等;
- dataSyncServer。dataSyncServer 主要是處理一些資料同步相關的服務;
- serverHandlers。服務響應函數
- serverSyncHandlers。服務響應函數,從其注冊的 handler 來看,dataSyncServer 和 dataSever 的職責有部分重疊;
- 用于控制的Http 通信元件以及其配置,提供一系列 REST 接口,用于 dashboard 管理、資料查詢等;
- jerseyResourceConfig。配置
- jerseyExchange。jersey元件通訊元件,提供服務;
- applicationContext。注冊服務所需;
- httpServer 主要提供一系列 http 接口,用于 dashboard 管理、資料查詢等;
- MetaServer相關元件
- metaServerService,用來與MetaServer進行互動,基于raft和Bolt;
- datumLeaseManager,用來維護具體資料;
- JVM 内部的事件通信元件以及其配置
- EventCenter。DataServer 内部邏輯主要是通過事件驅動機制來實作的,一個事件往往會有多個投遞源,非常适合用 EventCenter 來解耦事件投遞和事件處理之間的邏輯;
- 定時器元件以及其配置
- syncDataScheduler,主要啟動了expireCheckExecutor,versionCheckExecutor,即例如定時檢測節點資訊、定時檢測資料版本資訊;
- CacheDigestTask,用來定時分析;
6.3 Server元件
6.3.1 DataServer
dataServer 負責資料相關服務,比如資料服務,擷取資料的推送,服務上下線通知等;
DataServer是基于Bolt進行通訊。
private void openDataServer() {
try {
if (serverForSessionStarted.compareAndSet(false, true)) {
server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
dataServerConfig.getPort()), serverHandlers
.toArray(new ChannelHandler[serverHandlers.size()]));
}
}
}
複制
其響應函數為serverHandlers
@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(clientOffHandler());
list.add(getDataVersionsHandler());
list.add(publishDataProcessor());
list.add(sessionServerRegisterHandler());
list.add(unPublishDataHandler());
list.add(dataServerConnectionHandler());
list.add(renewDatumHandler());
list.add(datumSnapshotHandler());
return list;
}
複制
其具體功能如下 :
- getDataHandler:從目前Data節點中擷取注冊資訊資料,若目前節點不處于工作狀态,則改為下個節點;
- clientOffHandler:服務訂閱者下線;
- getDataVersionsHandler:擷取資料版本号;
- publishDataProcessor:服務注冊資訊釋出;
- sessionServerRegisterHandler:sessionServer會話注冊;
- unPublishDataHandler :服務下線處理;
- dataServerConnectionHandler:連接配接管理;
- renewDatumHandler:資料續約管理;
- datumSnapshotHandler:資料快照管理;
6.3.2 DataSyncServer
dataSyncServer 主要是處理一些資料同步相關的服務;也是基于Bolt進行通訊。
private void openDataSyncServer() {
try {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
}
複制
其響應函數為serverSyncHandlers。
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(publishDataProcessor());
list.add(unPublishDataHandler());
list.add(notifyFetchDatumHandler());
list.add(notifyOnlineHandler());
list.add(syncDataHandler());
list.add(dataSyncServerConnectionHandler());
return list;
}
複制
其具體功能如下 :
- getDataHandler:擷取Data節點注冊資訊資料;
- publishDataProcessor :服務注冊資訊釋出;
- unPublishDataHandler :服務下線處理;
- notifyFetchDatumHandler :對比版本号,抓去最新服務注冊資料;
- notifyOnlineHandler:檢查Data節點是否線上;
- syncDataHandler:資料同步;
- dataSyncServerConnectionHandler:連接配接管理;
6.3.3 HttpServer
HttpServer 是 Http 通信元件,提供一系列 REST 接口,用于 dashboard 管理、資料查詢等。
其基于Jersey進行通訊。
private void openHttpServer() {
try {
if (httpServerStarted.compareAndSet(false, true)) {
bindResourceConfig();
httpServer = jerseyExchange.open(
new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
.getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
}
}
}
複制
6.3.4 Handler
各 Handler 具體作用如圖 3 所示:

圖 各 Handler 作用
6.3.5 Raft
Raft相關的是:
- 啟動Raft用戶端,保證分布式一緻性;
- 向 EventCenter 投放MetaServerChangeEvent;
private void startRaftClient() {
metaServerService.startRaftClient();
eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
複制
6.3.6 Scheduler
這個子產品輔助各種定期任務,具體作用是:
- 啟動資料同步任務;
- 定時檢測資料接受者節點變化情況,下線過期節點;
- 啟動資料變更輪詢;
- 向EventCenter投放消息,以便由這些消息對應的響應函數處理,包括:
- CONNECT_META,具體由 ConnectionRefreshMetaTask處理;
- CONNECT_DATA,具體由 ConnectionRefreshTask 處理;
- VERSION_COMPARE,這個目前沒有處理;
- 需要注意的是,RENEW 類型消息在系統啟動時候沒有投放,而是在 MetaServerChangeEventHandler . registerMetaServer 之中,當注冊之後,才會進行投放,以此定期Renew;
- 啟動dump log任務;
private void startScheduler() {
try {
if (schedulerStarted.compareAndSet(false, true)) {
syncDataScheduler.startScheduler();
// start all startTask except correction task
eventCenter.post(new StartTaskEvent(
Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
.collect(Collectors.toSet())));
//start dump log
cacheDigestTask.start();
}
}
}
複制
6.3.6.1 startScheduler
啟動了versionCheckExecutor和scheduler,具體會調用LocalAcceptorStore中的函數進行定期檢測。
public class Scheduler {
public final ExecutorService versionCheckExecutor;
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor expireCheckExecutor;
@Autowired
private AcceptorStore localAcceptorStore;
/**
* constructor
*/
public Scheduler() {
scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
}
/**
* start scheduler
*/
public void startScheduler() {
scheduler.schedule(
new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
30, TimeUnit.SECONDS);
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
}
/**
* stop scheduler
*/
public void stopScheduler() {
if (scheduler != null && !scheduler.isShutdown()) {
scheduler.shutdown();
}
if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) {
versionCheckExecutor.shutdown();
}
}
}
複制
6.3.6.2 StartTaskEventHandler
StartTaskEventHandler内部有一個ScheduledExecutorService 和 tasks,一旦StartTaskEventHandler收到一個StartTaskEvent,就會定期調用tasks中的task執行;
@Bean(name = "tasks")
public List<AbstractTask> tasks() {
List<AbstractTask> list = new ArrayList<>();
list.add(connectionRefreshTask());
list.add(connectionRefreshMetaTask());
list.add(renewNodeTask());
return list;
}
複制
具體代碼如下:
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {
@Resource(name = "tasks")
private List<AbstractTask> tasks;
private ScheduledExecutorService executor = null;
@Override
public List<Class<? extends StartTaskEvent>> interest() {
return Lists.newArrayList(StartTaskEvent.class);
}
@Override
public void doHandle(StartTaskEvent event) {
if (executor == null || executor.isShutdown()) {
getExecutor();
}
for (AbstractTask task : tasks) {
if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
task.getTimeUnit());
}
}
}
private void getExecutor() {
executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
.getSimpleName());
}
}
複制
6.4 處理Task
這裡專門就StartTaskEventHandler做簡要說明,其就是針對 tasks Bean 裡面聲明的task,進行啟動。
但是具體啟動哪些task,則需要依據event裡面的設定決定,下面代碼中的循環就是看看tasks和event中如何比對。
for (AbstractTask task : tasks) {
if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit());
}
}
複制
具體代碼如下:
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {
@Resource(name = "tasks")
private List<AbstractTask> tasks;
private ScheduledExecutorService executor = null;
@Override
public List<Class<? extends StartTaskEvent>> interest() {
return Lists.newArrayList(StartTaskEvent.class);
}
@Override
public void doHandle(StartTaskEvent event) {
if (executor == null || executor.isShutdown()) {
getExecutor();
}
for (AbstractTask task : tasks) {
if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
task.getTimeUnit());
}
}
}
private void getExecutor() {
executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
.getSimpleName());
}
}
複制
6.4.1 beans
對應的beans,一共三個task。
@Bean(name = "tasks")
public List<AbstractTask> tasks() {
List<AbstractTask> list = new ArrayList<>();
list.add(connectionRefreshTask());
list.add(connectionRefreshMetaTask());
list.add(renewNodeTask());
return list;
}
複制
對應了StartTaskTypeEnum中的枚舉,其中VersionCompareTask沒實作。
public enum StartTaskTypeEnum {
/**
* ConnectionRefreshMetaTask
*/
CONNECT_META,
/**
* ConnectionRefreshDataTask
*/
CONNECT_DATA,
/**
* RenewNodeTask
*/
RENEW,
/**
* VersionCompareTask
*/
VERSION_COMPARE
}
複制
6.4.2 解耦
我們用 StartTaskEvent 舉例,這裡使用Set來指定本消息适用什麼task處理。
public class StartTaskEvent implements Event {
private final Set<StartTaskTypeEnum> suitableTypes;
public StartTaskEvent(Set<StartTaskTypeEnum> suitableTypes) {
this.suitableTypes = suitableTypes;
}
public Set<StartTaskTypeEnum> getSuitableTypes() {
return suitableTypes;
}
}
複制
在 MetaServerChangeEventHandler 之中,則啟動了renew task。
if (obj instanceof NodeChangeResult) {
NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
Map<String, Long> versionMap = result.getDataCenterListVersions();
//send renew after first register dataNode
Set<StartTaskTypeEnum> set = new HashSet<>();
set.add(StartTaskTypeEnum.RENEW);
eventCenter.post(new StartTaskEvent(set));
eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
DataServerChangeEvent.FromType.REGISTER_META));
break;
}
複制
在啟動時候,post了event,但是指定了啟動非RENEW task。
private void startScheduler() {
try {
if (schedulerStarted.compareAndSet(false, true)) {
syncDataScheduler.startScheduler();
// start all startTask except correction task
eventCenter.post(new StartTaskEvent(
Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
.collect(Collectors.toSet())));
//start dump log
cacheDigestTask.start();
}
} catch (Exception e) {
schedulerStarted.set(false);
throw new RuntimeException("Data Scheduler start error!", e);
}
}
複制
0x07 動态結構
最後動态架構如下,我們也大緻知道,DataServer就是一個SpringBoot程式,有幾個Server,有若幹Bean,有若幹定時服務,具體有一些其他業務子產品等等,這對我們接下來的了解有幫助。
+---------------------------------------------------------------------------+
| [DataServerBootstrap] |
| |
| |
| +------------------------------------------+ +------------------------+ |
| | [Bolt related] | | [http relatged] | |
| | | | | |
| | DataServerConfig | | httpServer | |
| | | | | |
| | boltExchange | | jerseyExchange | |
| | | | | |
| | server +-----------> serverHandlers | | applicationContext | |
| | | | | |
| | dataSyncServer+----> serverSyncHandlers | | jerseyResourceConfig | |
| | | | | |
| +------------------------------------------+ +------------------------+ |
| +---------------------+ +----------------+ +------------------------+ |
| |[meta related] | |[JVM related] | |[Timer related] | |
| | | | | | | |
| | metaServerService | | | | syncDataScheduler | |
| | | | EventCenter | | | |
| | datumLeaseManager | | | | CacheDigestTask | |
| +---------------------+ +----------------+ | | |
| +------------------------+ |
+---------------------------------------------------------------------------+
複制
0x08 問題清單
因為從問題出發更有幫助,是以我們總結出一些問題清單,這些我們期望在以後的分析中陸續解決。
- 問題:Datacenter究竟是什麼概念?
- 問題:DataServer應該當成什麼系統來看?
- 問題:DataServer應該實作什麼功能?如何實作?
- 問題:如何維持高可用?
- 問題:如何負載均衡?
- 問題:DataServer之間如何同步?實作資料一緻性?
- 問題:SessionServer如何尋址DataServer?
- 問題:用戶端如何知道應該聯系哪個SessionServer?
- 問題:SessionServer在DataServer内部如何表示,有緩存嘛?
- 問題:hash路由表是什麼樣的?
- 問題:DataServer如何把資訊推送給所有SessionServer?
- 問題:DataServer如何同步給其他DataServer?
- 問題:dataSyncServer 主要是處理一些資料同步相關的服務;dataServer 則負責資料相關服務;兩者有什麼差別?
- 問題:EventCenter的機制,裡面有幾種Event?
- 問題:如何輪詢MetaServer?
- 問題:如何判斷目前機房節點?
- 問題:DataServer叢集内部如何資料遷移?
- 問題:SessionServer 和 DataServer 之間的通信,是基于推拉結合的機制?
- 問題:為什麼 DataServerBootstrap 之中還有 startRaftClient?
- 問題:MetaServerChangeEventHandler怎麼啟動,誰來控制,用來做啥?
- 問題:DatumLeaseManager 的作用?
- 問題:SessionServer從DataServer拉取什麼?
- 問題:DataServer如何向MetaServer 來renew自己?是否定期?
- 問題:DataServer如何知道,儲存其他 DataServer?其他地方用到了嗎?
- 問題:需要考慮 DataServer 需要儲存什麼?
- 問題:版本号用來做什麼?
- 問題:DatumCache 用來做什麼?
- 問題:為什麼要有 AfterWorkingProcess?
- 問題:bolt怎麼維護connection?