天天看點

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

文章目錄

  • [從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步
    • 0x00 摘要
    • 0x02 業務範疇
      • 2.1 DataServer 資料一緻性
      • 2.2 本地機房政策
    • 0x03 總體邏輯
    • 0x04 消息
      • 4.1 LocalDataServerChangeEvent
      • 4.2 來源
    • 0x05 消息處理
      • 5.1 LocalDataServerChangeEventHandler
        • 5.1.1 投放消息
        • 5.1.2 啟動引擎
    • 0x06 消費通知消息
      • 6.1 新節點
        • 6.1.1 notifyOnline
      • 6.2 線上服務節點
        • 6.2.1 notifyToFetch
        • 6.2.2 getToBeSyncMap
        • 6.2.3 getNewJoined
        • 6.2.4 BackupTriad
    • 0x07 changeVersion 從哪裡來
      • 7.1 版本号和變化
        • 7.1.1 DataServerCache
        • 7.1.2 設定和使用
        • 7.1.3 兩個設計點
      • 7.2 Data Server
        • 7.2.1 主動擷取變化
      • 7.3 Meta server
        • 7.3.1 設定版本号
        • 7.3.2 提取版本号
      • 7.4 Data Server
        • 7.4.1 獲得變化
    • 0x08 Data Server後續處理
      • 8.1 newDataServerChangeItem
      • 8.2 curVersion
        • 8.2.1 發送版本号
        • 8.2.2 接收版本号
    • 0xFF 參考

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生産級、高時效、高可用的服務注冊中心。

本系列文章重點在于分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實作機制和架構思路,讓大家借以學習阿裡如何設計。

本文為第十二篇,上文我們簡述了Data節點變化之後,在dataServer中是如何變化處理的,本文我們按照資料流程繼續進行,講講SOFARegistry如何處理本機房Data節點變化。

0x02 業務範疇

2.1 DataServer 資料一緻性

DataServer 在 SOFARegistry 中,承擔着核心的資料存儲功能。資料按 dataInfoId 進行一緻性 Hash 分片存儲,支援多副本備份,保證資料高可用。這一層可随服務資料量的規模的增長而擴容。

如果 DataServer 當機,MetaServer 能感覺,并通知所有 DataServer 和 SessionServer,資料分片可 failover 到其他副本,同時 DataServer 叢集内部會進行分片資料的遷移。

2.2 本地機房政策

Data Center 代表的是本地機房,從目前來看:

  • 資料備份僅僅在本地機房完成;
  • 每個資料中心有自己的hash;

阿裡有異機房備份,應該就是Global部分,但是沒有開源。

是以我們得重點剖析本地節點如何繼續處理。

0x03 總體邏輯

我們先要提前劇透下總體邏輯,DataServer彼此通知是圍繞着資料版本号進行,即:

  • 新加入節點 會通過 NotifyOnlineRequest 告訴其他已經線上的節點,我是新的,你可以做相應配置,以及告訴我,你有哪些版本号的資料。
  • 線上服務節點 會通過 NotifyFetchDatumRequest 告訴新節點,我這裡有你需要的版本号資料,你過來取。

是以我們總結如下:在收到 meta server 的 data server change 消息之後,同一個Data Center 之中所有data server 會互相通知彼此更新版本号。

  • notifyOnline 會發送 NotifyOnlineRequest,而其他 Data Server 的 NotifyOnlineHandler 會做相應處理。
  • notifyToFetch 會發送 NotifyFetchDatumRequest,而其他 Data Server 的 notifyFetchDatumHandler 會做相應處理。

0x04 消息

4.1 LocalDataServerChangeEvent

前文提到,在DataServerChangeEventHandler中,處理DataServerChangeEvent時,若目前節點是 DataCenter 節點,則觸發 LocalDataServerChangeEvent 事件。

public class LocalDataServerChangeEvent implements Event {
    private Map<String, DataNode> localDataServerMap;
    private long                  localDataCenterversion;
    private Set<String>           newJoined;
    private long                  version;
}
           

4.2 來源

LocalDataServerChangeEvent消息來源是DataServerChangeEventHandler。

MetaServer 會通過網絡連接配接感覺到新節點上線或者下線,所有的 DataServer 中運作着一個定時重新整理連接配接的任務 ConnectionRefreshTask,該任務定時去輪詢 MetaServer,擷取資料節點的資訊。需要注意的是,除了 DataServer 主動去 MetaServer 拉取節點資訊外,MetaServer 也會主動發送 NodeChangeResult 請求到各個節點,通知節點資訊發生變化,推拉擷取資訊的最終效果是一緻的。

當輪詢資訊傳回資料節點有變化時,會向 EventCenter 投遞一個 DataServerChangeEvent 事件,在該事件的處理器中,如果判斷出是目前機房節點資訊有變化,則會投遞新的事件 LocalDataServerChangeEvent,該事件的處理器 LocalDataServerChangeEventHandler 中會判斷目前節點是否為新加入的節點,如果是新節點則會向其它節點發送 NotifyOnlineRequest 請求,如圖所示:

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

在 DataServerChangeEventHandler 的 doHandle 函數中,會産生 LocalDataServerChangeEvent。

0x05 消息處理

LocalDataServerChangeEventHandler是同機房資料節點變更事件處理器,或者說是同一叢集資料同步器。

LocalDataServerChangeEvent事件的處理器 LocalDataServerChangeEventHandler 中會判斷目前節點是否為新加入的節點,如果是新節點則會向其它節點發送 NotifyOnlineRequest 請求。是以是針對本 Data Center中新加入的 Data Server 進行處理。

5.1 LocalDataServerChangeEventHandler

LocalDataServerChangeEventHandler 之中關鍵是:

private BlockingQueue<LocalDataServerChangeEvent> events = new LinkedBlockingDeque<>(); 

private class LocalClusterDataSyncer implements Runnable
           

講解如下:

  • afterPropertiesSet 之後會start一個線程LocalClusterDataSyncer,用來異步處理;
  • doHandle時候,通過events來調用LocalClusterDataSyncer進行異步處理;

即在LocalDataServerChangeEventHandler内部做了一個統一延遲異步處理。當從 EventCenter 中拿到 LocalDataServerChangeEvent 之後,會往 events 放入這個 event,然後内部的LocalClusterDataSyncer 會在随後異步執行。

在LocalClusterDataSyncer内部是:

  • 如果本身是工作狀态,就開始比較資料,通知相關的 Data Servers。即if local server is working, compare sync data;
  • 如果本身不是工作狀态,說明自己本身就是一個新server,則通知其他server。即if local server is not working, notify others that i am newer;

LocalDataServerChangeEventHandler定義如下:

public class LocalDataServerChangeEventHandler extends
                                              AbstractEventHandler<LocalDataServerChangeEvent> {

    @Autowired
    private DataServerConfig                          dataServerConfig;

    @Autowired
    private LocalDataServerCleanHandler               localDataServerCleanHandler;

    @Autowired
    private DataServerCache                           dataServerCache;

    @Autowired
    private DataNodeExchanger                         dataNodeExchanger;

    @Autowired
    private DataNodeStatus                            dataNodeStatus;

    @Autowired
    private DatumCache                                datumCache;

    @Autowired
    private DatumLeaseManager                         datumLeaseManager;
  
    private BlockingQueue<LocalDataServerChangeEvent> events    = new LinkedBlockingDeque<>();  
}
           

5.1.1 投放消息

在doHandle函數中,會把最新的消息投放到 BlockingQueue 之中。

public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
    isChanged.set(true);

    // Better change to Listener pattern
    localDataServerCleanHandler.reset();
    datumLeaseManager.reset();

    events.offer(localDataServerChangeEvent);
}
           

5.1.2 啟動引擎

消費引擎在Bean啟動之後,會通過afterPropertiesSet來啟動。

@Override
public void afterPropertiesSet() throws Exception {
		super.afterPropertiesSet();
    start();
}

public void start() {
    Executor executor = ExecutorFactory
        .newSingleThreadExecutor(LocalDataServerChangeEventHandler.class.getSimpleName());
    executor.execute(new LocalClusterDataSyncer());
}
           

LocalClusterDataSyncer會執行具體業務消費消息。

0x06 消費通知消息

在引擎之中,LocalClusterDataSyncer會持續消費。

private class LocalClusterDataSyncer implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                LocalDataServerChangeEvent event = events.take();
                //if size of events is greater than 0, not handle and continue, only handle the last one in the queue
                if (events.size() > 0) {
                    continue;
                }
                long changeVersion = event.getVersion();
                isChanged.set(false);
                if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
                    //if local server is working, compare sync data
                    notifyToFetch(event, changeVersion);
                } else {
                    dataServerCache.checkAndUpdateStatus(changeVersion);
                    //if local server is not working, notify others that i am newer
                    notifyOnline(changeVersion);

                    dataServerCache.updateItem(event.getLocalDataServerMap(),
                        event.getLocalDataCenterversion(),
                        dataServerConfig.getLocalDataCenter());
                }
            } 
        }
    }
           

【重點說明】

每個data server 都會從 meta server 接收到 DataServerChangeEvent,因為是本地Data Server的消息,是以都會轉換為 LocalDataServerChangeEvent。

因為是每個 data server 都會接收到,是以新上線伺服器會接收到,已經線上的伺服器也會接收到。這是下面講解的重點。

6.1 新節點

在新節點中,LocalDataServerChangeEvent事件的處理器 LocalDataServerChangeEventHandler 中會判斷目前節點是否為新加入的節點,如果是新節點則會向其它節點發送 NotifyOnlineRequest 請求,如圖所示:

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

圖 DataServer 節點上線時新節點的邏輯

上圖展示的是新加入節點收到節點變更消息的處理邏輯,如果是線上已經運作的節點收到節點變更的消息,前面的處理流程都相同,不同之處在于 LocalDataServerChangeEventHandler 中會根據 Hash 環計算出變更節點(擴容場景下,變更節點是新節點,縮容場景下,變更節點是下線節點在 Hash 環中的後繼節點)所負責的資料分片範圍和其備份節點。

新加入節點 會通過 NotifyOnlineRequest 告訴其他已經線上的節點,我是新的,你可以做相應配置。

6.1.1 notifyOnline

notifyOnline 會從 DataServerNodeFactory 擷取目前Local Data Center中所有的DataServerNode,然後逐一發送 NotifyOnlineRequest 通知:我上線了。

然後其他線上的Data Server 當收到通知,會開始與新節點互動。

/**
 * notify other dataservers that this server is online newly
 *
 * @param changeVersion
 */
private void notifyOnline(long changeVersion) {
    Map<String, DataServerNode> dataServerNodeMap = DataServerNodeFactory
        .getDataServerNodes(dataServerConfig.getLocalDataCenter());
    for (Entry<String, DataServerNode> serverEntry : dataServerNodeMap.entrySet()) {
        while (true) {
            String ip = serverEntry.getKey();
            DataServerNode dataServerNode = DataServerNodeFactory.getDataServerNode(
                dataServerConfig.getLocalDataCenter(), ip);
            try {
                final Connection connection = dataServerNode.getConnection();
                CommonResponse response = (CommonResponse) dataNodeExchanger.request(
                    new Request() {

                        @Override
                        public Object getRequestBody() {
                            return new NotifyOnlineRequest(DataServerConfig.IP,
                                changeVersion);
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(connection.getRemoteIP(), connection
                                .getRemotePort());
                        }
                    }).getResult();
            } 
        }
    }
}
           

6.2 線上服務節點

目前線上服務節點周遊自身記憶體中的資料項,過濾出屬于變更節點的分片範圍的資料項,然後向變更節點和其備份節點發送 NotifyFetchDatumRequest 請求, 變更節點和其備份節點收到該請求後,其處理器會向發送者同步資料(NotifyFetchDatumHandler.fetchDatum),如圖所示。

注意,本圖與上圖左右的JVM放置位置相反。

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

圖 DataServer 節點變更時已存節點的邏輯

就是說,線上服務節點 會通過 NotifyFetchDatumRequest 告訴新節點,我這裡有你需要的資料,你過來取。

下面是幾個重要函數的說明:

6.2.1 notifyToFetch

notify onlined newly dataservers to fetch datum,就是通知新節點,你主動過來拉取,同時也依據請求消息來更新自身。

notifyToFetch的具體功能是:

  • 首先從 event 擷取新Server,這一份資料被設定成三種格式,分别是 Map 格式dataServerMapIn 和 list 格式 dataServerNodeList,ConcurrentHashMap格式的dataServerMap;
  • 用新Server生成一個consistentHash;
  • 使用

    toBeSyncMap = getToBeSyncMap(consistentHash);

    擷取需要同步的map;getToBeSyncMap的作用是

    哪些ip需要同步哪些東西

    ;get map of datum to be synced。
    • 周遊 toBeSyncMap,對于其中每一個需要同步的 toBeSyncEntry,擷取其IP和dataInfoMap,dataInfoMap 是

      Map<String, Map<String, BackupTriad>>

      類型;
      • 周遊 dataInfoMap 中所有的

        Entry<String, Map<String, BackupTriad>> dataCenterEntry

        ,該entry 的key 是dataCenter;
        • 周遊 dataTriadEntry 中所有的

          Entry<String, BackupTriad> dataTriadEntry

          ,其key是 dataInfoId;
          • 利用 dataInfoId 從 datumCache 中擷取 Datum;
          • 擷取 Datum 版本号

            versionMap.put(dataInfoId, datum.getVersion());

      • 針對這個 dataCenter 建構一個統一的大版本号map: allVersionMap,

        allVersionMap.put(dataCenter, versionMap);

      • 如果allVersionMap是空,就做如下操作:
        • 從dataServerCache中移除對應IP;
        • 通知該 ip對應的 data server,你需要同步這些:doNotify(ip, allVersionMap, changeVersion);即告訴這個ip,你需要同步這個 dataCenter 中的這些 dataInfoId,以及其版本号;
        • 從dataServerCache中移除ip;
  • 如果 ConcurrentHashMap格式的dataServerMap 非空,周遊其key,這是一個targetIp,從dataServerCache中移除targetIp;
  • dataServerCache 根據 dataServerMapIn 更新server list;

6.2.2 getToBeSyncMap

getToBeSyncMap 的邏輯是找出需要通知的IP清單,以及每個ip需要同步哪些dataInfoId,具體如下:

  • 函數參數是根據新servers 新計算出來的 consistentHashs
  • 根據 dataServerConfig 的舊配置計算一個舊的hash,consistentHashOld;
  • 對于 datumCache 的每一個Datum,計算新的triad;具體如下:
  • 擷取 datumCache 所有資料,建構一個 allMap,周遊 allMap 中所有 dataCenterEntry:
    • 對于該資料中心,周遊該data center所有的datumMap:
    • 以 dataInfoId 周遊這個 datumMap:
      • 用新 consistentHash 計算出 新的 backupNodes;
      • 用舊 consistentHashOld 得到舊的 backupTriad;
      • 從 backupTriad 擷取newJoinedNodes,即從新的 backupNodes 移除 backupTriad 和 NotWorking;
      • 周遊 newJoinedNodes,對于每個node,建構 toBeSyncMap = Map<node ip, Map<dataCenter, Map<dataInfoId, BackupTriad>>>
  • 傳回 toBeSyncMap;這個就是

    哪些ip需要同步哪些東西

private Map<String/*ip*/, Map<String/*datacenter*/, Map<String/*datainfoId*/, BackupTriad>>> getToBeSyncMap(ConsistentHash<DataNode> consistentHash) {

    Map<String, Map<String, Map<String, BackupTriad>>> toBeSyncMap = new HashMap<>();
    Map<String, List<DataNode>> triadCache = new HashMap<>();

    ConsistentHash<DataNode> consistentHashOld = dataServerCache
        .calculateOldConsistentHash(dataServerConfig.getLocalDataCenter());
}
           

6.2.3 getNewJoined

getNewJoined就是找出那些不在已經存儲的Triad 之中,或者在其中但是不是working狀态的。

public List<DataNode> getNewJoined(List<DataNode> newTriad, Set<String> notWorking) {
    List<DataNode> list = new ArrayList<>();
    for (DataNode node : newTriad) {
        String ip = node.getIp();
        if (!ipSetOfNode.contains(ip) || notWorking.contains(ip)) {
            list.add(node);
        }
    }
    return list;
}
           

6.2.4 BackupTriad

BackupTriad 的作用是:針對 dataInfoId,對應的備份DataNode清單。

public class BackupTriad {
    /** dataInfoId */
    private String         dataInfoId;

    /**
     * calculate current dataServer list Consistent hash to get dataInfoId belong node and backup node list
     * @see  ConsistentHash#ConsistentHash(int, java.util.Collection)
     * @see  com.alipay.sofa.registry.consistency.hash.ConsistentHash#getNUniqueNodesFor(java.lang.Object, int)
     */
    private List<DataNode> triad;

    private Set<String>    ipSetOfNode = new HashSet<>();

    /**
     * constructor
     * @param dataInfoId
     * @param triad
     */
    public BackupTriad(String dataInfoId, List<DataNode> triad) {
        this.dataInfoId = dataInfoId;
        this.triad = triad;
        for (DataNode node : triad) {
            ipSetOfNode.add(node.getIp());
        }
    }
}
           

運作時如下:

backupTriad = {BackupTriad@1400} "BackupTriad{dataInfoId='TestDataInfoId', ipSetOfNode=[192.168.0.2, 192.168.0.1, 192.168.0.3]}"
 dataInfoId = "TestDataInfoId"
 triad = {ArrayList@1399}  size = 3
  0 = {DataNode@1409} "DataNode{ip=192.168.0.1}"
  1 = {DataNode@1410} "DataNode{ip=192.168.0.2}"
  2 = {DataNode@1411} "DataNode{ip=192.168.0.3}"
 ipSetOfNode = {HashSet@1403}  size = 3
  0 = "192.168.0.2"
  1 = "192.168.0.1"
  2 = "192.168.0.3"
           

0x07 changeVersion 從哪裡來

在上述代碼中,會從LocalDataServerChangeEvent擷取一個version,進而利用這個版本做後續處理,同時也會給dataServerCache設定版本号。

LocalDataServerChangeEvent event = events.take();
long changeVersion = event.getVersion();
if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
    //if local server is working, compare sync data
    notifyToFetch(event, changeVersion);
} else {
    dataServerCache.checkAndUpdateStatus(changeVersion);
    //if local server is not working, notify others that i am newer
    notifyOnline(changeVersion);
}
           

現在我們就好奇,當Data Server有變更時候,這個版本是從哪裡來的。讓我們追根溯源。這是從後往前倒推的過程。

7.1 版本号和變化

7.1.1 DataServerCache

因為提到了dataServerCache設定版本号,是以我們要回溯到DataServerCache。可以看到,DataServerCache之中有兩個相關變量:curVersion 和 DataServerChangeItem。

這就是從newDataServerChangeItem擷取了對應data center的版本号,設定在DataServerCache。

具體DataServerCache中相關定義如下:

public class DataServerCache {
  
    /** new input dataServer list and version */
    private volatile DataServerChangeItem                 newDataServerChangeItem = new DataServerChangeItem();
  
    private AtomicLong                                    curVersion              = new AtomicLong(-1L);

    public Long getDataCenterNewVersion(String dataCenter) {
        synchronized (DataServerCache.class) {
            Map<String, Long> versionMap = newDataServerChangeItem.getVersionMap();
            if (versionMap.containsKey(dataCenter)) {
                return versionMap.get(dataCenter);
            } else {
                return null;
            }
        }
    }  
}
           

7.1.2 設定和使用

在 DataServerCache中隻有addStatus控制curVersion的指派,而對外的接口中,隻有 synced 和 addNotWorkingServer 調用 addStatus。

而 newDataServerChangeItem 是在compareAndSet這裡設定。

public Map<String, Set<String>> compareAndSet(DataServerChangeItem newItem, FromType fromType) {
            if (!changedMap.isEmpty()) {
                newDataServerChangeItem = newItem;
            }
}
           

邏輯如下:

+-----------------------------+
                        |[DataServerCache]            |
                        |                             |
compareAndSet +-------------> DataServerChangeItem    |
                        |                             |
                        |     curVersion              |
                        |     ^        ^              |
                        |     |        |              |
                        +-----------------------------+
                              |        |
synced +----------------------+        |
                                       |
addNotWorkingServer+-------------------+
           

7.1.3 兩個設計點

現在涉及到 DataServerCache 兩個設計點:

  • curVersion 是用來做什麼的;
  • newDataServerChangeItem是用來做什麼的;

現在推論,每一個資料中心 Data Center 有一個版本号用做其内部所有狀态控制。其實,在DataServerChangeItem 的定義中的 versionMap 也能看出來,是根據版本号控制的。

DataServerChangeItem 定義如下:

public class DataServerChangeItem {

    /** datacenter -> Map<ip, DataNode> */
    private Map<String, Map<String, DataNode>> serverMap;

    /** datacenter -> version */
    private Map<String, Long>                  versionMap;
}
           

進而知道:

  • curVersion 就是Data Center最新的版本号;
  • newDataServerChangeItem就是最新版本号對應的變化資料;

現在問題變成,

  • DataServerChangeItem是從哪裡來的。
  • curVersion 是從哪裡來的。

我們通過研讀源碼可以知道,是從Meta Server擷取,下面就跟蹤下這個過程。

7.2 Data Server

7.2.1 主動擷取變化

我們需要複習下這個流程。

Meta Server 會廣播通知 所有data server 現在有 data server 更新,也可能是 DataServer主動定期看看MetaServer 是否有更新。

但是具體更新的内容,還是 data server 主動發送 GetNodesRequest 擷取。

這裡以主動更新為例,可以看到,DataServer 會通過 metaServerService.getDateServers 從 meta server 擷取到DataServerChangeItem,進而建構 DataServerChangeEvent。

public class ConnectionRefreshTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        DataServerChangeItem dataServerChangeItem = metaServerService.getDateServers();
        if (dataServerChangeItem != null) {
            eventCenter
                .post(new DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK));
        }
    }
}
           

在 DefaultMetaServiceImpl 中可以看到,DataServerChangeItem是從 Meta Server擷取的NodeChangeResult 提取出來。

public class DefaultMetaServiceImpl implements IMetaServerService {
    @Override
    public DataServerChangeItem getDateServers() {
        Map<String, Connection> connectionMap = metaServerConnectionFactory
            .getConnections(dataServerConfig.getLocalDataCenter());
        String leader = getLeader().getIp();
        if (connectionMap.containsKey(leader)) {
            Connection connection = connectionMap.get(leader);
            if (connection.isFine()) {
                try {
                    GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                    Object obj = metaNodeExchanger.request(new Request() {
                        @Override
                        public Object getRequestBody() {
                            return request;
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(connection.getRemoteIP(), connection.getRemotePort());
                        }
                    }).getResult();
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();
                        versionMap.put(result.getLocalDataCenter(), result.getVersion());
                        return new DataServerChangeItem(result.getNodes(), versionMap);
                    }
                } 
            }
        }
        String newip = refreshLeader().getIp();
        return null;
    }  
}
           

邏輯如下:

+  Data Server
|
|
|  +------------------+
|  | NodeChangeResult |
|  +-------+----------+
|          |                                +--------------------------+
|          |                                |[DataServerCache]         |
|          |                                |                          |
|          +---------------->compareAndSet------> DataServerChangeItem |
|     DataServerChangeItem                  |                          |
|                                           |     curVersion           |
|                                           |     ^        ^           |
|                                           |     |        |           |
|                                           +--------------------------+
|                                                 |        |
|                             synced +-------------        |
|                                                          |
|                             addNotWorkingServer----------+
|
|
+

           

7.3 Meta server

7.3.1 設定版本号

讓我們來到 meta server之中。可以看到,之前 在DataStoreService put,remove等各個函數中,當data server有變化時候,會調用 dataNodeRepository 通過時間戳設定版本号。

7.3.2 提取版本号

當 meta server 接收到 GetNodesRequest 之後,會生成 NodeChangeResult。

DataStoreService 會調用 dataNodeRepository 擷取版本号,進而在 NodeChangeResult之中設定。

public class DataStoreService implements StoreService<DataNode> {
    @Override
    public NodeChangeResult getNodeChangeResult() {

        NodeChangeResult nodeChangeResult = new NodeChangeResult(NodeType.DATA);

        try {
            String localDataCenter = nodeConfig.getLocalDataCenter();
            Map<String/*dataCenter*/, NodeRepository> dataNodeRepositoryMap = dataRepositoryService
                    .getNodeRepositories();

            ConcurrentHashMap<String/*dataCenter*/, Map<String/*ipAddress*/, DataNode>> pushNodes = new ConcurrentHashMap<>();
            Map<String/*dataCenter*/, Long> versionMap = new ConcurrentHashMap<>();

            dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> {
               //在這裡會擷取版本号
                if (localDataCenter.equalsIgnoreCase(dataCenter)) {                
                    nodeChangeResult.setVersion(dataNodeRepository.getVersion());
                }
            
                versionMap.put(dataCenter, dataNodeRepository.getVersion());

                Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap();
                Map<String, DataNode> newMap = new ConcurrentHashMap<>();
                dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
                pushNodes.put(dataCenter, newMap);
            });

            nodeChangeResult.setNodes(pushNodes);
            nodeChangeResult.setDataCenterListVersions(versionMap);
            nodeChangeResult.setLocalDataCenter(localDataCenter);
        } 
				//傳回
        return nodeChangeResult;
    }  
}
           

具體如下:

Meta Server  +  Data Server
                                                                |
                                                                |
             getNodeChangeResult       +-----------------+      |  +------------------+
          +------------------------->  | NodeChangeResult| +------>+ NodeChangeResult |
          |                            +-----------------+      |  +-------+----------+
          |                                                     |          |                                +--------------------------+
          |                                                     |          |                                |[DataServerCache]         |
 +--------+--------+                                            |          |                                |                          |
 |DataStoreService | +-------------------+                      |          +---------------->compareAndSet------> DataServerChangeItem |
 +-----------------+    getVersion       |                      |     DataServerChangeItem                  |                          |
                                         |                      |                                           |     curVersion           |
                                         |                      |                                           |     ^        ^           |
                                         |                      |                                           |     |        |           |
                                         v                      |                                           +--------------------------+
+----------------------+               +-+-----------------+    |                                                 |        |
| DataRepositoryService+-------------> |dataNodeRepository |    |                             synced +-------------        |
+----------------------+               +-------------------+    |                                                          |
                setVersion(currentTimeMillis)                   |                             addNotWorkingServer----------+
                                                                |
                                                                |
                                                                +

           

手機上如圖:

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

7.4 Data Server

7.4.1 獲得變化

我們又回到 Data Server。

當Data Server接收到NodeChangeResult之後,會提取出DataServerChangeItem。

public class DefaultMetaServiceImpl implements IMetaServerService {
  
    @Override
    public DataServerChangeItem getDateServers() {
   
                    ......
      
                    GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                    Object obj = metaNodeExchanger.request(new Request() {
                      
                        ......
                      
                        }
                    }).getResult();
      
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();
                        versionMap.put(result.getLocalDataCenter(), result.getVersion());
                        return new DataServerChangeItem(result.getNodes(), versionMap);
                    }
                } 
            }
        }
    }  
}
           

然後會回到前面 “主動擷取變化” 小節,發送DataServerChangeEvent,進而轉化為LocalDataServerChangeEvent,就和我們的代碼聯系起來。

0x08 Data Server後續處理

關于 DataServerCache . curVersion 和 newDataServerChangeItem 如何進一步處理,我們需要再研究。

8.1 newDataServerChangeItem

DataServerChangeEventHandler 的 doHandle 函數中有使用:

for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
    String dataCenter = changeEntry.getKey();
    Set<String> ips = changeEntry.getValue();
    Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
}
           

調用的是dataServerCache的函數,可以看到是取出newDataServerChangeItem的版本号。

public Long getDataCenterNewVersion(String dataCenter) {
    synchronized (DataServerCache.class) {
        Map<String, Long> versionMap = newDataServerChangeItem.getVersionMap();
        if (versionMap.containsKey(dataCenter)) {
            return versionMap.get(dataCenter);
        } else {
            return null;
        }
    }
}
           

建構LocalDataServerChangeEvent時候,則把newDataServerChangeItem的版本作為本地版本号localDataCenterversion。

public LocalDataServerChangeEvent(Map<String, DataNode> localDataServerMap,
                                  Set<String> newJoined, long version,
                                  long localDataCenterversion) {
    this.localDataServerMap = localDataServerMap;
    this.newJoined = newJoined;
    this.version = version;
    this.localDataCenterversion = localDataCenterversion;
}
           

dataServerCache會據此做相關更新。

dataServerCache.updateItem(dataServerMapIn, event.getLocalDataCenterversion(),
    dataServerConfig.getLocalDataCenter());
           

8.2 curVersion

關于curVersion,則來到了 notifyToFetch 和 notifyOnline 後續如何處理。

8.2.1 發送版本号

前面我們隻是講解了如何發送版本号,即:

  • 線上服務節點 會通過 NotifyFetchDatumRequest 告訴新節點,我這裡有你需要的資料,你過來取。
  • 新加入節點 會通過 NotifyOnlineRequest 告訴其他已經線上的節點,我是新的,你可以做相應配置。

是以我們總結可以,在收到 meta server 的 data server change 消息之後,同一個Data Center 之中所有data server 會互相通知彼此更新版本号。

  • notifyOnline 會發送 NotifyOnlineRequest,而其他 Data Server 的 NotifyOnlineHandler 會做相應處理。
  • notifyToFetch 會發送 NotifyFetchDatumRequest,而其他 Data Server 的 notifyFetchDatumHandler 會做相應處理。

8.2.2 接收版本号

下面我們要看看接收版本号之後,DataServer的新節點與線上節點分别做了什麼。

  • notifyFetchDatumHandler----新節點處理

這是一個資料拉取請求,當該 Handler 被觸發時,通知目前 DataServer 節點進行版本号對比,若請求中資料的版本号高于目前節點緩存中的版本号,則會進行資料同步操作,保證資料是最新的。

  • notifyOnlineHandler----線上節點處理

這是一個 DataServer 上線通知請求 Handler,當其他節點上線時,會觸發該 Handler,進而目前節點在緩存中存儲新增的節點資訊。用于管理節點狀态,究竟是 INITIAL 還是 WORKING 。

于是可以看到,在NotifyOnlineHandler和NotifyFetchDatumHandler之中,都會根據本地dataServerCache中存儲的curVersion做判斷是否需要繼續處理。

public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> {

    @Autowired
    private DataServerCache dataServerCache;

    @Override
    public Object doHandle(Channel channel, NotifyOnlineRequest request) {
        long version = request.getVersion();
        if (version >= dataServerCache.getCurVersion()) {
            dataServerCache.addNotWorkingServer(version, request.getIp());
        }
        return CommonResponse.buildSuccessResponse();
    }
}
           

以及 NotifyFetchDatumHandler 之中會調用sycned。

public class NotifyFetchDatumHandler extends AbstractServerHandler<NotifyFetchDatumRequest> {

    private static final Logger         LOGGER = LoggerFactory
                                                   .getLogger(NotifyFetchDatumHandler.class);

    @Autowired
    private DataServerCache             dataServerCache;

    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter       dataChangeEventCenter;

    @Autowired
    private Exchange                    boltExchange;

    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private DatumCache                  datumCache;

    @Autowired
    private LocalDataServerCleanHandler localDataServerCleanHandler;

    @Override
    public Object doHandle(Channel channel, NotifyFetchDatumRequest request) {
        ParaCheckUtil.checkNotBlank(request.getIp(), "ip");

        //receive other data NotifyFetchDatumRequest,must delay clean datum task until fetch all datum
        localDataServerCleanHandler.reset();

        Map<String, Map<String, Long>> versionMap = request.getDataVersionMap();
        long version = request.getChangeVersion();
        String ip = request.getIp();
        if (version >= dataServerCache.getCurVersion()) {
            if (versionMap.isEmpty()) {
                dataServerCache.synced(version, ip);
            } else {
                ExecutorFactory.getCommonExecutor().execute(() -> {
                    for (Entry<String, Map<String, Long>> dataCenterEntry : versionMap.entrySet()) {
                        String dataCenter = dataCenterEntry.getKey();
                        Map<String, Long> map = dataCenterEntry.getValue();
                        for (Entry<String, Long> dataInfoEntry : map.entrySet()) {
                            String dataInfoId = dataInfoEntry.getKey();
                            Datum datum = datumCache.get(dataCenter, dataInfoId);
                            if (datum != null) {
                                long inVersion = dataInfoEntry.getValue();
                                long currentVersion = datum.getVersion();
                                if (currentVersion > inVersion) {
                                    continue;
                                } else if (datum.getVersion() == dataInfoEntry.getValue()) {
                                    //if version same,maybe remove publisher all by LocalDataServerCleanHandler,so must fetch from other node
                                    if (!datum.getPubMap().isEmpty()) {
                                        continue;
                                    }
                                }
                            }
                            fetchDatum(ip, dataCenter, dataInfoId);
                        }
                    }
                    dataServerCache.synced(version, ip);
                });
            }
        } 
        return CommonResponse.buildSuccessResponse();
    }
}
           

于是,目前如下:

+
                                                                |
                                                   Meta Server  |  Data Server
                                                                |
                                                                |
             getNodeChangeResult       +-----------------+      |  +------------------+
          +------------------------->  | NodeChangeResult| +------>+ NodeChangeResult |
          |                            +-----------------+      |  +-------+----------+
          |                                                     |          |                                +--------------------------+
          |                                                     |          |                                |[DataServerCache]         |
 +--------+--------+                                            |          |                                |                          |
 |DataStoreService | +-------------------+                      |          +---------------->compareAndSet+-----> DataServerChangeItem |
 +-----------------+    getVersion       |                      |     DataServerChangeItem                  |                          |
                                         |                      |                                           |     curVersion           |
                                         |                      |                                           |     ^        ^           |
                                         |                      |                                           |     |        |           |
                                         v                      |                                           +-----------------+---+----+
+----------------------+               +-+-----------------+    |                                                 |        |  ^   ^
| DataRepositoryService+-------------> |dataNodeRepository |    |     synced +------------------------------------+        |  |   | getCurVersion
+----------------------+               +-------------------+    |                                                          |  |   |
                setVersion(currentTimeMillis)                   |     addNotWorkingSer^er+---------------------------------+  |   |
                                                                |                 +-------------------------------------------+   |
                                                                |                 | getCurVersion                                 |
                                                                |                 |                                               |
                                                                |   +-------------+---------+               +---------------------+----+
                                                                |   |  NotifyOnlineHandler  |               |  NotifyFetchDatumHandler |
                                                                |   +-------------+---------+               +---------------+----------+
                                                                |                 ^ In Exist Server                         ^  In New Server
                                                                |                 |                                         |
                                                                |                 |                                         |
                                                                +-----------------------------------------------------------------------------+
                                                                                  |                                         |
                                                                                  |                                         |
                                                                          +-------+------------+                  +---------+-----------+
                                                                          |   New Data Server  |                  |   Exist Data Server |
                                                                          +--------------------+                  +---------------------+

           

手機如下:

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

至此,版本号流程就完全梳理完畢。

0xFF 參考

螞蟻金服服務注冊中心如何實作 DataServer 平滑擴縮容

螞蟻金服服務注冊中心 SOFARegistry 解析 | 服務發現優化之路

服務注冊中心 Session 存儲政策 | SOFARegistry 解析

海量資料下的注冊中心 - SOFARegistry 架構介紹

服務注冊中心資料分片和同步方案詳解 | SOFARegistry 解析

螞蟻金服開源通信架構SOFABolt解析之連接配接管理剖析

螞蟻金服開源通信架構SOFABolt解析之逾時控制機制及心跳機制

螞蟻金服開源通信架構 SOFABolt 協定架構解析

螞蟻金服服務注冊中心資料一緻性方案分析 | SOFARegistry 解析

★★★★★★關于生活和技術的思考★★★★★★

微信公衆賬号:羅西的思考

如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,可以掃描下面二維碼(或者長按識别二維碼)關注個人公衆号)。

[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步[從源碼學設計]螞蟻金服SOFARegistry 之 LocalDataServerChangeEvent及資料同步

繼續閱讀