天天看點

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

Nacos/Sentinel/Seata核心源碼剖析

  • Nacos/Sentinel/Seata核心源碼剖析
    • 1 Nacos源碼剖析
      • 1.1 用戶端工作流程
        • 1.1.1 服務注冊
        • 1.1.2 服務發現
        • 1.1.3 服務下線
        • 1.1.4 服務訂閱
      • 1.2 服務端工作流程
        • 1.2.1 注冊處理
        • 1.2.2 一緻性算法Distro協定介紹
        • 1.2.3 Distro尋址-單機模式
          • 1.2.3.1 單機模式
          • 1.2.3.2 檔案尋址模式
          • 1.2.3.3 伺服器尋址模式
      • 1.2.4 資料同步
        • 1.2.4.1 全量同步
          • 1.2.4.2 增量同步
    • 2 Sentinel Dashboard資料持久化
      • 2.1 動态配置原理
      • 2.2 Sentinel+Nacos資料持久化
        • 2.2.1 Dashboard改造分析
        • 2.2.2 頁面改造
        • 2.2.3 Nacos配置
        • 2.3.4 Dashboard持久化改造
        • 2.3.5 Nacos配置建立
        • 2.3.6 改造源碼
            • 2.3.6.1 改造NacosConfig
          • 2.3.6.2 動态擷取流控規則
          • 2.3.6.3 資料持久化測試
    • 3 Seata事務控制源碼剖析
      • 3.1 全局事務操作判斷
      • 3.2 記錄檔鏡像保
      • 3.3 事務操作
    • 總結
      • Nacous
        • 服務啟動
        • 服務注冊
        • 服務訂閱
        • 服務發現
        • 服務登出
        • 增量資料同步
        • Distro協定:資料一緻性(服務資料并不會持久化:DataStore)
        • Sentinel1.8.0配置持久化->Nacos
      • Seata源碼

Nacos/Sentinel/Seata核心源碼剖析

1 Nacos源碼剖析

Nacos源碼有很多值得我們學習的地方,為了深入了解Nacos,我們剖析源碼,分析如下2個知識點:

1:Nacos對注冊中心的通路原理 
2:Nacos注冊服務處理流程

           

我們接下來對Nacos源碼做一個深度剖析,首先搭建Nacos源碼環境,源碼環境搭建起來比較輕松,幾乎不會報什麼錯誤,我們這裡就不去示範源碼環境搭建了。

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

用戶端與注冊中心服務端的互動,主要集中在服務注冊、服務下線、服務發現、訂閱某個服務,其實使用最多的就是服務注冊和服務發現,下面我會從源碼的角度分析一下這四個功能。

在Nacos源碼中 nacos-example 中 com.alibaba.nacos.example.NamingExample 類分别示範了這4個功能的操作,我們可以把它當做入口,代碼如下

public class NamingExample {

    public static void main(String[] args) throws NacosException {

        Properties properties = new Properties();
        properties.setProperty("serverAddr", System.getProperty("serverAddr"));
        properties.setProperty("namespace", System.getProperty("namespace"));

        NamingService naming = NamingFactory.createNamingService(properties);
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "DEFAULT");
        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");

        System.out.println(naming.getAllInstances("nacos.test.3"));
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");

        System.out.println(naming.getAllInstances("nacos.test.3"));
        Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("test-thread");
                        return thread;
                    }
                });

        naming.subscribe("nacos.test.3", new AbstractEventListener() {

            //EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
            //So you can override getExecutor() to async handle event.
            @Override
            public Executor getExecutor() {
                return executor;
            }

            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent) event).getServiceName());
                System.out.println(((NamingEvent) event).getInstances());
            }
        });
    }
}
           

1.1 用戶端工作流程

我們先來看一下用戶端是如何實作服務注冊、服務發現、服務下線操作、服務訂閱操作的。

1.1.1 服務注冊

我們沿着案例中的服務注冊方法調用找到 nacos-api 中的NamingService.registerInstance() 并找到它的實作類和方法com.alibaba.nacos.client.naming.NacosNamingService ,代碼如下:

@Override
    public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
    }



 @Override
    public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
            throws NacosException {
        
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setWeight(1.0);
        instance.setClusterName(clusterName);
        
        registerInstance(serviceName, groupName, instance);
    }





@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //該字段表示注冊的執行個體是否是臨時執行個體還是持久化執行個體。 
        // 如果是臨時執行個體,則不會在 Nacos 服務端持久化存儲,需要通過上報心跳的方式進行包活, 
        // 如果一段時間内沒有上報心跳,則會被 Nacos 服務端摘除。 
        if (instance.isEphemeral()) {
        //為注冊服務設定一個定時任務擷取心跳資訊,預設為5s彙報一次
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //注冊到服務端
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
           

注冊主要做了兩件事,第一件事:為注冊的服務設定一個定時任務,定時拉去服務資訊。 第二件事:将服務注冊到服務端。

1:啟動一個定時任務,定時拉取服務資訊,時間間隔為5s,如果拉下來服務正常,不做處理,如果不正常, 
重新注冊 
2:發送http請求給注冊中心服務端,調用服務注冊接口,注冊服務 

           

上面代碼我們可以看到定時任務添加,但并未完全看到遠端請求, serverProxy.registerService()方法如下,會先封裝請求參數,接下來調用 reqApi() 而 reqApi() 最後會調用 callServer() ,代碼如下:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);
        //封裝Http請求參數
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        //執行Http請求
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
        
    }


  public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
            String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        injectSecurityInfo(params);
        //封裝請求頭部
        Header header = builderHeader();
        //請求是Http還是Https協定
        String url;
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!IPUtil.containsPort(curServer)) {
                curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
            }
            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        }
        
        try {
        //執行遠端請求,并擷取結果集
            HttpRestResult<String> restResult = nacosRestTemplate
                    .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
            end = System.currentTimeMillis();
            
            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                    .observe(end - start);
            //結果集解析
            if (restResult.ok()) {
                return restResult.getData();
            }
            if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
                return StringUtils.EMPTY;
            }
            throw new NacosException(restResult.getCode(), restResult.getMessage());
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to request", e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }
    }

           

執行遠端Http請求的對象是 NacosRestTemplate ,該對象就是封裝了普通的Http請求,大家可以自己查閱一下。

1.1.2 服務發現

我們沿着案例中的服務發現方法調用找到 nacos-api 中的NamingService.getAllInstances() 并找到它的實作類和方法com.alibaba.nacos.client.naming.NacosNamingService.getAllInstances() ,代碼如下:

@Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
            boolean subscribe) throws NacosException {
        
        ServiceInfo serviceInfo;
        /*預設true->擷取服務執行個體*/
        if (subscribe) {
        //從本地緩存中擷取,如果本地緩存不存在從服務端拉取 
        //本地緩存會存儲在HostReactor.serviceInfoMap中,它是一個Map對象
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                    StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor
                    .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                            StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }
           

上面的代碼調用了 hostReactor.getServiceInfo() 方法,該方法會先調用 getServiceInfo0() 方法從本地緩存擷取資料,緩存沒有資料,就建構執行個體更新到Nacos,并從Nacos中擷取最新資料,getServiceInfo0() 方法源碼如下:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        /*1。先從本地緩存中擷取服務對象,因為啟動是第一次進來,是以緩存站不存在*/
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        
        if (null == serviceObj) {
        /*建構服務執行個體*/
            serviceObj = new ServiceInfo(serviceName, clusters);
            /*将服務執行個體存放到緩存中*/
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            /*更新nacos-上的服務*/
            updatingMap.put(serviceName, new Object());
            /*主動擷取,并且更新到快取區域,以及已過期的服務更新等*/
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
            
        } else if (updatingMap.containsKey(serviceName)) {
            
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        /*2.開啟定時任務*/
        scheduleUpdateIfAbsent(serviceName, clusters);
        
        return serviceInfoMap.get(serviceObj.getKey());
    }
           

updateServiceNow(serviceName, clusters); 主從從遠端伺服器擷取更新資料,最終會調用updateService() 方法,在該方法中完成遠端請求和資料處理,源碼如下:

public void updateService(String serviceName, String clusters) throws NacosException {
  /*擷取本地緩存清單中所存在的服務*/
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            /*擷取服務以及提供者端口資訊,端口等*/
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            
            if (StringUtils.isNotEmpty(result)) {
            /*對結果進行處理*/
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
           

1.1.3 服務下線

我們沿着案例中的服務下線方法調用找到 nacos-api 中的 NamingService.deregisterInstance() 并找到它的實作類和方法 NacosNamingService.deregisterInstance() ,代碼如下:

@Override
    public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName)
            throws NacosException {
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setClusterName(clusterName);
        //服務下線操作
        deregisterInstance(serviceName, groupName, instance);
    }


 @Override
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            //移除心跳資訊監測的定時任務
            beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
                    instance.getPort());
        }
        //發送遠端請求執行服務下線銷毀操作
        serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
    }
           

服務下線方法比較簡單,和服務注冊做的事情正好相反,也做了兩件事,第一件事:不再進行心跳檢測。 第二件事:請求服務端服務下線接口。

1.1.4 服務訂閱

我們可以檢視訂閱服務的案例,會先建立一個線程池,接下來會把線程池封裝到監聽器中,而監聽器中可以監聽指定執行個體資訊,代碼如下:

Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("test-thread");
                        return thread;
                    }
                });

        naming.subscribe("nacos.test.3", new AbstractEventListener() {

            //EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
            //So you can override getExecutor() to async handle event.
            @Override
            public Executor getExecutor() {
                return executor;
            }
           //讀取監聽到的服務執行個體
            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent) event).getServiceName());
                System.out.println(((NamingEvent) event).getInstances());
            }
        });
           

我們沿着案例中的服務訂閱方法調用找到 nacos-api 中的NamingService.subscribe() 并找到它的實作類和方法NacosNamingService.deregisterInstance() ,代碼如下:

public void subscribe(String serviceName, String clusters, EventListener eventListener) {
        //注冊監聽
        notifier.registerListener(serviceName, clusters, eventListener);
        //擷取并更新服務執行個體
        getServiceInfo(serviceName, clusters);
    }
           

此時會注冊監聽,注冊監聽,注冊監聽就是将目前的監聽對象資訊注入到listenerMap集合中,在監聽對象的指定方法onEvent中可以讀取執行個體資訊,代碼如下:

public void registerListener(String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (eventListeners == null) {
            synchronized (lock) {
                eventListeners = listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet<EventListener>();
                    listenerMap.put(key, eventListeners);
                }
            }
        }
        //将目前監聽對象放入到集合中,在監聽對象的onEvent中可以讀出對應的執行個體對象
        eventListeners.add(listener);
    }
           

1.2 服務端工作流程

注冊中心服務端的主要功能包括,接收用戶端的服務注冊,服務發現,服務下線的功能,但是除了這些和用戶端的互動之外,服務端還要做一些更重要的事情,就是我們常常會在分布式系統中聽到的AP和CP,作為一個叢集,nacos即實作了AP也實作了CP,其中AP使用的自己實作的Distro協定,而CP是采用raft協定實作的,這個過程中牽涉到心跳、選主等操作。

我們來學習一下注冊中心服務端接收用戶端服務注冊的功能。

1.2.1 注冊處理

我們先來學習一下Nacos的工具類 WebUtils ,該工具類在 nacos-core 工程下,該工具類是用于處理請求參數轉化的,裡面提供了2個常被用到的方法 required() 和 optional() :

required方法通過參數名key,解析HttpServletRequest請求中的參數,并轉碼為UTF-8編碼。 
optional方法在required方法的基礎上增加了預設值,如果擷取不到,則傳回預設值。 
           

代碼如下:

public class WebUtils {
    
    /**
     * get target value from parameterMap, if not found will throw {@link IllegalArgumentException}.
     * required方法通過參數名key,解析HttpServletRequest請求中的參數,并轉碼為UTF-8編碼。
     * @param req {@link HttpServletRequest}
     * @param key key
     * @return value
     */
    public static String required(final HttpServletRequest req, final String key) {
        String value = req.getParameter(key);
        if (StringUtils.isEmpty(value)) {
            throw new IllegalArgumentException("Param '" + key + "' is required.");
        }
        String encoding = req.getParameter("encoding");
        return resolveValue(value, encoding);
    }
    
    /**
     * get target value from parameterMap, if not found will return default value.
     * optional方法在required方法的基礎上增加了預設值,如果擷取不到,則傳回預設值。
     * @param req          {@link HttpServletRequest}
     * @param key          key
     * @param defaultValue default value
     * @return value
     */
    public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {
        if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {
            return defaultValue;
        }
        String value = req.getParameter(key);
        if (StringUtils.isBlank(value)) {
            return defaultValue;
        }
        String encoding = req.getParameter("encoding");
        return resolveValue(value, encoding);
    }
    
           

nacos server-client 使用了 http 協定來互動,那麼在 server 端必定提供了 http 接口的入口,并且在 core 子產品看到其依賴了 spring boot starter ,是以它的http接口由內建了Spring的web伺服器支援,簡單地說就是像我們平時寫的業務服務一樣,有controller層和service層。

以OpenAPI作為入口來學習,我們找到 /nacos/v1/ns/instance 服務注冊接口,在 nacos-naming 工程中我們可以看到 InstanceController 正是我們要找的對象,如下圖:

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

處理服務注冊,我們直接找對應的POST方法即可,代碼如下:

@CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //擷取namespaceid,該參數是可選參數
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
                //擷取服務名字
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //校驗服務的名字,服務的名字格式為[email protected]@serviceName
        NamingUtils.checkServiceNameFormat(serviceName);
        //建立執行個體
        final Instance instance = parseInstance(request);
        //注冊服務
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
    
           

如上圖,該方法主要用于接收用戶端注冊資訊,并且會校驗參數是否存在問題,如果不存在問題就建立服務的執行個體,服務執行個體建立後将服務執行個體注冊到Nacos中,注冊的方法如下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //判斷本地緩存中是否存在該命名空間,如果不存在就建立,之後判斷該命名空間下是否 
//存在該服務,如果不存在就建立空的服務 
//如果執行個體為空,則建立執行個體,并且會将建立的執行個體存入到serviceMap集合中

        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        //從serviceMap集合中擷取建立的執行個體
        Service service = getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //服務注冊,這一步才會把服務的執行個體資訊和服務綁定起來
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
           

注冊的方法中會先建立該執行個體對象,建立前先檢查本地緩存是否存在該執行個體對象,如果不存在就建立,最後注冊該服務,并且該服務會和執行個體資訊捆綁到一起,并将資訊同步到磁盤,資料同步到磁盤就涉及到資料一緻性了,我們接下來講解Nacos的資料一緻性。

1.2.2 一緻性算法Distro協定介紹

Distro是阿裡巴巴的私有協定,目前流行的Nacos服務管理架構就采用了Distro協定。Distro 協定被定位為 臨時資料的一緻性協定:該類型協定, 不需要把資料存儲到磁盤或者資料庫,因為臨時資料通常和伺服器保持一個session會話, 該會話隻要存在,資料就不會丢失 。

Distro 協定保證寫必須永遠是成功的,即使可能會發生網絡分區。當網絡恢複時,把各資料分片的資料進行合并。

Distro 協定具有以下特點:

1:專門為了注冊中心而創造出的協定; 
2:用戶端與服務端有兩個重要的互動,服務注冊與心跳發送; 
3:用戶端以服務為次元向服務端注冊,注冊後每隔一段時間向服務端發送一次心跳,心跳包需要帶上注冊服務的全部資訊,在用戶端看來,服務端節點對等,是以請求的節點是随機的; 
4:用戶端請求失敗則換一個節點重新發送請求; 
5:服務端節點都存儲所有資料,但每個節點隻負責其中一部分服務,在接收到用戶端的“寫”(注冊、心跳、下線等)請求後,服務端節點判斷請求的服務是否為自己負責,如果是,則處理,否則交由負責的節點處理; 
6:每個服務端節點主動發送健康檢查到其他節點,響應的節點被該節點視為健康節點; 
7:服務端在接收到用戶端的服務心跳後,如果該服務不存在,則将該心跳請求當做注冊請求來處理; 
8:服務端如果長時間未收到用戶端心跳,則下線該服務; 
9:負責的節點在接收到服務注冊、服務心跳等寫請求後将資料寫入後即傳回,背景異步地将資料同步給其他節點; 
10:節點在收到讀請求後直接從本機擷取後傳回,無論資料是否為最新。
           

1.2.3 Distro尋址-單機模式

Distro協定服務端節點發現使用尋址機制來實作服務端節點的管理。在Nacos中,尋址模式有三種:

單機模式(StandaloneMemberLookup) 
檔案模式(FileConfigMemberLookup) 
伺服器模式(AddressServerMemberLookup)
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
1.2.3.1 單機模式

在 com.alibaba.nacos.core.cluster.lookup.LookupFactory 中有建立尋址方式,可以建立叢集啟動方式、單機啟動方式,不同啟動方式就決定了不同尋址模式,如果是叢集啟動,

public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
        //叢集方式啟動
        if (!EnvUtil.getStandaloneMode()) {
            String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
            //由參數中傳入的尋址方式得到LookupType對象
            LookupType type = chooseLookup(lookupType);
            //選擇尋址方式
            LOOK_UP = find(type);
            //設定目前尋址方式
            currentLookupType = type;
        } else {
            //單機啟動
            LOOK_UP = new StandaloneMemberLookup();
        }
        LOOK_UP.injectMemberManager(memberManager);
        Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
        return LOOK_UP;
    }



     private static MemberLookup find(LookupType type) {
        //檔案尋址模式,也就是配置cluster.conf配置檔案将多個節點串聯起來, 
        // 通過配置檔案尋找其他節點,以達到和其他節點通信的目的  
        if (LookupType.FILE_CONFIG.equals(type)) {
            LOOK_UP = new FileConfigMemberLookup();
            return LOOK_UP;
        }
        //伺服器模式
        if (LookupType.ADDRESS_SERVER.equals(type)) {
            LOOK_UP = new AddressServerMemberLookup();
            return LOOK_UP;
        }
        // unpossible to run here
        throw new IllegalArgumentException();
    }
           

單節點尋址模式會直接建立 StandaloneMemberLookup 對象,而檔案尋址模式會建立FileConfigMemberLookup 對象,伺服器尋址模式會建立 AddressServerMemberLookup ;

1.2.3.2 檔案尋址模式
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

檔案尋址模式主要在建立叢集的時候,通過 cluster.conf 來配置叢集,程式可以通過監聽cluster.conf 檔案變化實作動态管理節點, FileConfigMemberLookup 源碼如下

public class FileConfigMemberLookup extends AbstractMemberLookup {
    //建立檔案監聽器
    private FileWatcher watcher = new FileWatcher() {
    //檔案發生變更事件
        @Override
        public void onChange(FileChangeEvent event) {
            readClusterConfFromDisk();
        }
        //檢查context是否包含cluster.conf
        @Override
        public boolean interest(String context) {
            return StringUtils.contains(context, "cluster.conf");
        }
    };
    
    @Override
    public void start() throws NacosException {
        if (start.compareAndSet(false, true)) {
            readClusterConfFromDisk();
            
            // Use the inotify mechanism to monitor file changes and automatically
            // trigger the reading of cluster.conf
            // 使用inotify機制來監視檔案更改,并自動觸發對cluster.conf的讀取
            try {
                WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
            } catch (Throwable e) {
                Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
            }
        }
    }
    
    @Override
    public void destroy() throws NacosException {
        WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);
    }
    
    private void readClusterConfFromDisk() {
        Collection<Member> tmpMembers = new ArrayList<>();
        try {
            List<String> tmp = EnvUtil.readClusterConf();
            tmpMembers = MemberUtil.readServerConf(tmp);
        } catch (Throwable e) {
            Loggers.CLUSTER
                    .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
        }
        
        afterLookup(tmpMembers);
    }
}
           
1.2.3.3 伺服器尋址模式

使用位址伺服器存儲節點資訊,會建立 AddressServerMemberLookup ,服務端定時拉取資訊進行管理;

public class AddressServerMemberLookup extends AbstractMemberLookup {
    
    private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {
    };
    
    public String domainName;
    
    public String addressPort;
    
    public String addressUrl;
    
    public String envIdUrl;
    
    public String addressServerUrl;
    
    private volatile boolean isAddressServerHealth = true;
    
    private int addressServerFailCount = 0;
    
    private int maxFailCount = 12;
    
    private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);
    
    private volatile boolean shutdown = false;
    
    @Override
    public void start() throws NacosException {
        if (start.compareAndSet(false, true)) {
            this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));
            initAddressSys();
            run();
        }
    }
    
    private void initAddressSys() {
        String envDomainName = System.getenv("address_server_domain");
        if (StringUtils.isBlank(envDomainName)) {
            domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");
        } else {
            domainName = envDomainName;
        }
        String envAddressPort = System.getenv("address_server_port");
        if (StringUtils.isBlank(envAddressPort)) {
            addressPort = EnvUtil.getProperty("address.server.port", "8080");
        } else {
            addressPort = envAddressPort;
        }
        String envAddressUrl = System.getenv("address_server_url");
        if (StringUtils.isBlank(envAddressUrl)) {
            addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");
        } else {
            addressUrl = envAddressUrl;
        }
        addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
        envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
        
        Loggers.CORE.info("ServerListService address-server port:" + addressPort);
        Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
    }
    
    @SuppressWarnings("PMD.UndefineMagicConstantRule")
    private void run() throws NacosException {
        // With the address server, you need to perform a synchronous member node pull at startup
        // Repeat three times, successfully jump out
        boolean success = false;
        Throwable ex = null;
        int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
        for (int i = 0; i < maxRetry; i++) {
            try {
            //拉取叢集節點資訊
                syncFromAddressUrl();
                success = true;
                break;
            } catch (Throwable e) {
                ex = e;
                Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
            }
        }
        if (!success) {
            throw new NacosException(NacosException.SERVER_ERROR, ex);
        }
        
        GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
    }
    
    @Override
    public void destroy() throws NacosException {
        shutdown = true;
    }
    
    @Override
    public Map<String, Object> info() {
        Map<String, Object> info = new HashMap<>(4);
        info.put("addressServerHealth", isAddressServerHealth);
        info.put("addressServerUrl", addressServerUrl);
        info.put("envIdUrl", envIdUrl);
        info.put("addressServerFailCount", addressServerFailCount);
        return info;
    }
    
    private void syncFromAddressUrl() throws Exception {
        RestResult<String> result = restTemplate
                .get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
        if (result.ok()) {
            isAddressServerHealth = true;
            Reader reader = new StringReader(result.getData());
            try {
                afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
            } catch (Throwable e) {
                Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
                        ExceptionUtil.getAllExceptionMsg(e));
            }
            addressServerFailCount = 0;
        } else {
            addressServerFailCount++;
            if (addressServerFailCount >= maxFailCount) {
                isAddressServerHealth = false;
            }
            Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
        }
    }
    // 定時任務
    class AddressServerSyncTask implements Runnable {
        
        @Override
        public void run() {
            if (shutdown) {
                return;
            }
            try {
            //拉取服務清單
                syncFromAddressUrl();
            } catch (Throwable ex) {
                addressServerFailCount++;
                if (addressServerFailCount >= maxFailCount) {
                    isAddressServerHealth = false;
                }
                Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
            } finally {
                GlobalExecutor.scheduleByCommon(this, 5_000L);
            }
        }
    }
}

           

1.2.4 資料同步

Nacos資料同步分為全量同步和增量同步,所謂全量同步就是初始化資料一次性同步,而增量同步是指有資料增加的時候,隻同步增加的資料。

1.2.4.1 全量同步

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

全量同步流程比較複雜,流程如上圖:

1:啟動一個定時任務線程DistroLoadDataTask加載資料,調用load()方法加載資料 
2:調用loadAllDataSnapshotFromRemote()方法從遠端機器同步所有的資料 
3:從namingProxy代理擷取所有的資料data 
4:構造http請求,調用httpGet方法從指定的server擷取資料 
5:從擷取的結果result中擷取資料bytes 
6:處理資料processData 
7:從data反序列化出datumMap 
8:把資料存儲到dataStore,也就是本地緩存dataMap 
9:監聽器不包括key,就建立一個空的service,并且綁定監聽器
10:監聽器listener執行成功後,就更新data store 
           

任務啟動

在 com.alibaba.nacos.core.distributed.distro.DistroProtocol 的構造函數中調用startDistroTask() 方法,該方法會執行 startVerifyTask() 和 startLoadTask() ,我們重點關注startLoadTask() ,該方法代碼如下:

private void startDistroTask() {
        if (EnvUtil.getStandaloneMode()) {
            isInitialized = true;
            return;
        }
        //啟動startVerifyTask,做資料同步校驗
        startVerifyTask();
        //啟動DistroLoadDataTask,批量加載資料
        startLoadTask();
    }
    //啟動DistroLoadDataTask
    private void startLoadTask() {
    //處理狀态回調對象
        DistroCallback loadCallback = new DistroCallback() {
        //處理成功
            @Override
            public void onSuccess() {
                isInitialized = true;
            }
            //處理失敗
            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        //執行DistroLoadDataTask,是一個多線程
        GlobalExecutor.submitLoadDataTask(
                new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
    }
    //資料校驗
    private void startVerifyTask() {
        GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
                distroConfig.getVerifyIntervalMillis());
    }
           

資料如何執行加載

上面方法會調用 DistroLoadDataTask 對象,而該對象其實是個線程,是以會執行它的run方法,run方法會調用load()方法實作資料全量加載,代碼如下:

public DistroLoadDataTask(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroConfig distroConfig, DistroCallback loadCallback) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroConfig = distroConfig;
        this.loadCallback = loadCallback;
        loadCompletedMap = new HashMap<>(1);
    }
    
    @Override
    public void run() {
        try {
        //加載資料
            load();
            if (!checkCompleted()) {
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }
    //加載資料,并同步
    private void load() throws Exception {
        while (memberManager.allMembersWithoutSelf().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
            TimeUnit.SECONDS.sleep(1);
        }
        while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
            TimeUnit.SECONDS.sleep(1);
        }
        //同步資料
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            //從遠端機器上同步所有資料
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }
    
           

資料同步

資料同步會通過Http請求從遠端伺服器擷取資料,并同步到目前服務的緩存中,執行流程如下:

1:loadAllDataSnapshotFromRemote()從遠端加載所有資料,并處理同步到本機 
2:transportAgent.getDatumSnapshot()遠端加載資料,通過Http請求執行遠端加載 
3:dataProcessor.processSnapshot()處理資料同步到本地 
           

資料處理完整邏輯代碼如下: loadAllDataSnapshotFromRemote() 方法

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == transportAgent || null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                    resourceType, transportAgent, dataProcessor);
            return false;
        }
        //周遊叢集成員節點,不包括自己
        for (Member each : memberManager.allMembersWithoutSelf()) {
            try {
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
                //從遠端節點加載資料,調用http請求接口: distro/datums;
                DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                //處理資料
                boolean result = dataProcessor.processSnapshot(distroData);
                Loggers.DISTRO
                        .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                                result);
                if (result) {
                    return true;
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
            }
        }
        return false;
    }
    
           

遠端加載資料代碼如下: transportAgent.getDatumSnapshot() 方法

//從namingProxy代理擷取所有的資料data,從擷取的結果result中擷取資料bytes;
 @Override
    public DistroData getDatumSnapshot(String targetServer) {
        try {
        //從namingProxy代理擷取所有的資料data,從擷取的結果result中擷取資料bytes;
            byte[] allDatum = NamingProxy.getAllData(targetServer);
            //将資料封裝成DistroData
            return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
        } catch (Exception e) {
            throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
        }
    }



/**
* Get all datum from target server. 
* NamingProxy.getAllData 
* 執行HttpGet請求,并擷取傳回資料 
* @param server target server address 
* @return all datum byte array 
* @throws Exception exception 
*/

  public static byte[] getAllData(String server) throws Exception {
        //參數封裝
        Map<String, String> params = new HashMap<>(8);
        //組裝URL,并執行HttpGet請求,擷取結果集
        RestResult<String> result = HttpClient.httpGet(
                "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
                new ArrayList<>(), params);
        //傳回資料
        if (result.ok()) {
            return result.getData().getBytes();
        }
        
        throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "
                + result.getMessage());
    }
           

處理資料同步到本地代碼如下: dataProcessor.processSnapshot()

public boolean processSnapshot(DistroData distroData) {
        try {
            return processData(distroData.getContent());
        } catch (Exception e) {
            return false;
        }
    }



private boolean processData(byte[] data) throws Exception {
//從data反序列化出datumMap
        if (data.length > 0) {
            Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
            // 把資料存儲到dataStore,也就是本地緩存dataMap
            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                dataStore.put(entry.getKey(), entry.getValue());
                //監聽器不包括key,就建立一個空的service,并且綁定監聽器
                if (!listeners.containsKey(entry.getKey())) {
                    // pretty sure the service not exist:
                    if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
                        //建立一個空的service
                        Loggers.DISTRO.info("creating service {}", entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        service.setName(serviceName);
                        service.setNamespaceId(namespaceId);
                        service.setGroupName(Constants.DEFAULT_GROUP);
                        // now validate the service. if failed, exception will be thrown
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        service.recalculateChecksum();
                        
                        // The Listener corresponding to the key value must not be empty
                        // 與鍵值對應的監聽器不能為空,這裡的監聽器類型是 ServiceManager
                        RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
                        if (Objects.isNull(listener)) {
                            return false;
                        }
                        //為空的綁定監聽器
                        listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                    }
                }
            }
            //循環所有datumMap
            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                
                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                    continue;
                }
                
                try {
                //執行監聽器的onChange監聽方法
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                    continue;
                }
                
                // Update data store if listener executed successfully:
                // 監聽器listener執行成功後,就更新dataStore
                dataStore.put(entry.getKey(), entry.getValue());
            }
        }
        return true;
    }
           

到此實作資料全量同步,其實全量同步最終封裝的協定還是Http。

1.2.4.2 增量同步

新增資料使用異步廣播同步:

1:DistroProtocol 使用 sync() 方法接收增量資料 
2:向其他節點釋出廣播任務 調用 distroTaskEngineHolder 釋出延遲任務 
3:調用 DistroDelayTaskProcessor.process() 方法進行任務投遞:将延遲任務轉換為異步變更任務
4:執行變更任務 DistroSyncChangeTask.run() 方法:向指定節點發送消息 
  調用 DistroHttpAgent.syncData() 方法發送資料 
  調用 NamingProxy.syncData() 方法發送資料 
5:異常任務調用 handleFailedTask() 方法進行處理 
  調用 DistroFailedTaskHandler 處理失敗任務 
  調用 DistroHttpCombinedKeyTaskFailedHandler 将失敗任務重新投遞成延遲任務。
           

增量資料入口

我們回到服務注冊,服務注冊的 InstanceController.register() 就是資料入口,它會調用

ServiceManager.registerInstance() ,執行資料同步的時候,調用 addInstance() ,在該方法中會執行 DistroConsistencyServiceImpl.put() ,該方法是增量同步的入口,會調用

distroProtocol.sync() 方法,代碼如下:

@CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }




public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        
        Service service = getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
    



 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            
            consistencyService.put(key, instances);
        }
    }
           

put

@Override
    public void put(String key, Record value) throws NacosException {
    //将資料存入到dataStore中
        onPut(key, value);
        //使用distroProtocol同步資料
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }
           

sync() 方法會執行任務釋出,代碼如下:

public void sync(DistroKey distroKey, DataOperation action, long delay) {
 //向除了自己外的所有節點廣播
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            //從distroTaskEngineHolder擷取延時執行引擎,并将distroDelayTask任務添加進來 
            //執行延時任務釋出
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }
           

增量同步操作

延遲任務對象我們可以從 DistroTaskEngineHolder 構造函數中得知是DistroDelayTaskProcessor ,代碼如下:

@Component
public class DistroProtocol {
    
    private final ServerMemberManager memberManager;
    
    private final DistroComponentHolder distroComponentHolder;
    
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    
    private final DistroConfig distroConfig;
    
    private volatile boolean isInitialized = false;
    
    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        this.distroConfig = distroConfig;
        startDistroTask();
    }


//構造函數指定任務處理器
  public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        //指定任務處理器defaultDelayTaskProcessor
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }
           

它延遲執行的時候會執行 process 方法,該方法正是執行資料同步的地方,它會執行

DistroSyncChangeTask任務,代碼如下:

//任務處理過程
   @Override
    public boolean process(NacosTask task) {
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
        //将延遲任務變更成異步任務,異步任務對象是一個線程
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            //将任務添加到NacosExecuteTaskExecuteEngine中,并執行
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        }
        return false;
    }
           

DistroSyncChangeTask 實質上是任務的開始,它自身是一個線程,是以會執行它的run方法,而run方法這是資料同步操作,代碼如下:

@Override
    public void run() {
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        try {
        //擷取本地緩存資料
            String type = getDistroKey().getResourceType();
            DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
            distroData.setType(DataOperation.CHANGE);
            //向其他節點同步資料
            boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
            if (!result) {
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
            handleFailedTask();
        }
    }
           

資料同步會執行調用 syncData ,該方法其實就是通過Http協定将資料發送到其他節點實作資料同步,代碼如下:

@Override
    public boolean syncData(DistroData data, String targetServer) {
        if (!memberManager.hasMember(targetServer)) {
            return true;
        }
        //擷取資料位元組數組
        byte[] dataContent = data.getContent();
        //通過Http協定同步資料
        return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
    }
    
           

2 Sentinel Dashboard資料持久化

Sentinel 的理念是開發者隻需要關注資源的定義,當資源定義成功後可以動态增加各種流控降級規則。 Sentinel 提供兩種方式修改規則:

  • 通過 API 直接修改 ( loadRules )
  • 通過 DataSource 适配不同資料源修改

手動通過 API 修改比較直覺,可以通過以下幾個 API 修改不同的規則:

FlowRuleManager.loadRules(List<FlowRule> rules); // 修改流控規則 
DegradeRuleManager.loadRules(List<DegradeRule> rules); // 修改降級規則
           

手動修改規則(寫死方式)一般僅用于測試和示範,生産上一般通過動态規則源的方式來動态管理規則。

2.1 動态配置原理

loadRules() 方法隻接受記憶體态的規則對象,但更多時候規則存儲在檔案、資料庫或者配置中心當中。 DataSource 接口給我們提供了對接任意配置源的能力。相比直接通過 API 修改規則,實作DataSource 接口是更加可靠的做法。

我們推薦通過控制台設定規則後将規則推送到統一的規則中心,用戶端實作 ReadableDataSource 接口端監聽規則中心實時擷取變更,流程如下:

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

DataSource 擴充常見的實作方式有:

拉模式:用戶端主動向某個規則管理中心定期輪詢拉取規則,這個規則中心可以是 RDBMS、檔案,甚至是 VCS 等。這樣做的方式是簡單,缺點是無法及時擷取變更;

推模式:規則中心統一推送,用戶端通過注冊監聽器的方式時刻監聽變化,比如使用 Nacos、 Zookeeper 等配置中心。這種方式有更好的實時性和一緻性保證。

Sentinel 目前支援以下資料源擴充:

Pull-based: 動态檔案資料源、Consul, Eureka

Push-based: ZooKeeper, Redis, Nacos, Apollo, etcd

2.2 Sentinel+Nacos資料持久化

我們要想實作Sentinel+Nacos資料持久化,需要下載下傳Sentinel控制台源碼,關于源碼下載下傳我們這裡就不再重複了。

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

在Sentinel Dashboard中配置規則之後重新開機應用就會丢失,是以實際生産環境中需要配置規則的持久化實作,Sentinel提供多種不同的資料源來持久化規則配置,包括file,redis、nacos、zk。

這就需要涉及到Sentinel Dashboard的規則管理及推送功能:集中管理和推送規則。 sentinel-core提供 API 和擴充接口來接收資訊。開發者需要根據自己的環境,選取一個可靠的推送規則方式;同時,規則最好在控制台中集中管理。

我們采用Push模式,即Sentinel-Dashboard統一管理配置,然後将規則統一推送到Nacos并持久化(生成配置檔案),最後用戶端監聽Nacos(這一部了解使用過Nacos的話應該很熟,采用ConfigService.getConfg()方法擷取配置檔案),下發配置生成Rule。

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

這張圖的意思我們解釋說明一下:

1:Sentinel Dashboard界面配置流控規則---釋出/推送--->Nacos生成配置檔案并持久化; 
2:通過Nacos配置檔案修改流控規則---拉取--->Sentinel Dashboard界面顯示最新的流控規則。
           

在Nacos控制台上修改流控制,雖然可以同步到Sentinel Dashboard,但是Nacos此時應該作為一個流控規則的持久化平台,是以正常操作過程應該是開發者在Sentinel Dashboard上修改流控規則後同步到Nacos,遺憾的是目前Sentinel Dashboard不支援該功能。

如果公司沒有統一在Sentinel Dashboard或Nacos中二選一進行配置,而是一會在Sentinel Dashboard配置,一會在Nacos配置。那麼就會出現很嚴重的問題(流控規則達不到預期,配置資料不一緻),是以推薦使用Sentinel Dashboard統一界面進行配置管理流控規則。

我們接下來基于Sentinel1.8.1開始改造Sentinel Dashboard,使他能結合Nacos實作資料持久化。

2.2.1 Dashboard改造分析

Sentinel Dashboard的流控規則下的所有操作,都會調用Sentinel-Dashboard源碼中的

FlowControllerV1類,這個類中包含流控規則本地化的CRUD操作;

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

在com.alibaba.csp.sentinel.dashboard.controller.v2包下存在一個FlowControllerV2;類,這個類同樣提供流控規則的CURD,與V1不同的是,它可以實作指定資料源的規則拉取和釋出。

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

上面代碼就是 FlowControllerV2 部分代碼,分别實作了拉取規則和推送規則:

1:DynamicRuleProvider:動态規則的拉取,從指定資料源中擷取控制後在Sentinel Dashboard中展示。
2:DynamicRulePublisher:動态規則釋出,将在Sentinel Dashboard中修改的規則同步到指定資料源中。 
           

我們隻需要擴充這兩個類,然後內建Nacos來實作Sentinel Dashboard規則同步。

2.2.2 頁面改造

在目錄resources/app/scripts/directives/sidebar找到sidebar.html,裡面有關于V1版本的請求入口:

<li ui-sref-active="active" ng-if="!entry.isGateway">
            <a ui-sref="dashboard.flowV1({app: entry.app})">
              <i class="glyphicon glyphicon-filter"></i>&nbsp;&nbsp;流控規則</a>
          </li>

           

對應的JS(app.js)請求如下,可以看到請求就是V1版本的Controller,那麼之後的改造需要重新對應V2版本的Controller

.state('dashboard.flowV1', {
        templateUrl: 'app/views/flow_v1.html',
        url: '/flow/:app',
        controller: 'FlowControllerV1',
        resolve: {
          loadMyFiles: ['$ocLazyLoad', function ($ocLazyLoad) {
            return $ocLazyLoad.load({
              name: 'sentinelDashboardApp',
              files: [
                'app/scripts/controllers/flow_v1.js',
              ]
            });
          }]
        }
      })
           

2.2.3 Nacos配置

在源碼中雖然官方提供了test示例(即test目錄)下關于Nacos等持久化示例,但是具體的實作還需要一些細節,比如在Sentinel Dashboard配置Nacos的serverAddr、namespace、groupId,并且通過Nacos擷取配置檔案擷取服務清單等。

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

我們可以打開 NacosConfig 源碼, NacosConfig 中ConfigFactory.createConfigService(“localhost”) 并沒有實作建立具體的 nacos config service ,而是預設 localhost ,application.properties檔案中也沒有Nacos的相關配置,這些都需要我們額外配置, NacosConfig 代碼如下:

@EnableConfigurationProperties(NacosPropertiesConfiguration.class)
@Configuration
public class NacosConfig {

    @Bean
    public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    @Bean
    public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
        return s -> JSON.parseArray(s, FlowRuleEntity.class);
    }

    @Bean
    public ConfigService nacosConfigService(NacosPropertiesConfiguration nacosPropertiesConfiguration) throws Exception {
        return ConfigFactory.createConfigService(properties);
    }
}
           

如果我們需要把資料存儲到Nacos,在 NacosConfigutils 已經指定了預設的流控規則配置檔案的groupId 等,如果需要指定的話這裡也需要修改

public final class NacosConfigUtil {

    //Nacos中對應的GroupID
    public static final String GROUP_ID = "SENTINEL_GROUP";

    //檔案後半部分
    public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules";
    public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-rules";
    public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map";

    /**
     * cc for `cluster-client`
     */
    public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config";
    /**
     * cs for `cluster-server`
     */
    public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config";
    public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config";
    public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set";

    private NacosConfigUtil() {}
}
           

.我們在 application.properties 中配置Nacos:

spring.cloud.sentinel.datasource.flow.nacos.server-addr=nacos:8848 
spring.cloud.sentinel.datasource.flow.nacos.data-id=${spring.application.name}-flow-rules 
spring.cloud.sentinel.datasource.flow.nacos.group-id=SENTINEL_GROUP 
spring.cloud.sentinel.datasource.flow.nacos.data-type=json 
spring.cloud.sentinel.datasource.flow.nacos.rule-type=flow 
           

2.3.4 Dashboard持久化改造

我們接下來開始改造Dashboard源碼,官方提供的Nacos持久化用例都是在test目錄下,是以scope需要去除test,需要sentinel-datasource-nacos包的支援。之後将修改好的源碼放在源碼主目錄下,而不是繼續在test目錄下。

<dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
            <!--<scope>test</scope>-->
        </dependency>
           

找到resources/app/scripts/directives/sidebar/sidebar.html檔案修改,修改flowV1為flow,去掉V1,這樣的話會調用FlowControllerV2接口

修改前:

<li ui-sref-active="active" ng-if="!entry.isGateway">
            <a ui-sref="dashboard.flowV1({app: entry.app})">
              <i class="glyphicon glyphicon-filter"></i>&nbsp;&nbsp;流控規則</a>
          </li>
           

修改後:

<li ui-sref-active="active" ng-if="!entry.isGateway">
            <a ui-sref="dashboard.flow({app: entry.app})">
              <i class="glyphicon glyphicon-filter"></i>&nbsp;&nbsp;流控規則 V1</a>
          </li>
           

這樣就可以通過js跳轉至FlowControllerV2了,app.js代碼如下:

.state('dashboard.flow', {
          templateUrl: 'app/views/flow_v2.html',
          url: '/v2/flow/:app',
          controller: 'FlowControllerV2',
          resolve: {
              loadMyFiles: ['$ocLazyLoad', function ($ocLazyLoad) {
                  return $ocLazyLoad.load({
                      name: 'sentinelDashboardApp',
                      files: [
                          'app/scripts/controllers/flow_v2.js',
                      ]
                  });
              }]
          }
      })
           

2.3.5 Nacos配置建立

我們采用官方的限制,即 預設 Nacos 适配的 dataId 和 groupId 約定,是以不需要修改

NacosConfigUtil.java了,配置如下:

groupId: SENTINEL_GROUP 
流控規則 dataId: {appName}-flow-rules,比如應用名為 appA,則 dataId 為 appA-flow-rules 
           

我們在 application.properties 中配置Nacos服務資訊:

# nacos config server 
sentinel.nacos.serverAddr=nacos:8848 
sentinel.nacos.namespace= 
sentinel.nacos.group-id=SENTINEL_GROUP
           

接下來建立讀取 nacos 配置的 NacosPropertiesConfiguration 檔案并且 application.properties指定配置

ConfigurationProperties(prefix = "sentinel.nacos")
public class NacosPropertiesConfiguration {
    private String serverAddr;
    private String dataId;
    private String groupId = "SENTINEL_GROUP"; // 預設分組
    private String namespace;

    public String getServerAddr() {
        return serverAddr;
    }

    public void setServerAddr(String serverAddr) {
        this.serverAddr = serverAddr;
    }

    public String getDataId() {
        return dataId;
    }

    public void setDataId(String dataId) {
        this.dataId = dataId;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getNamespace() {
        return namespace;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }
}
           

2.3.6 改造源碼

2.3.6.1 改造NacosConfig

我們最後改造NacosConfig,讓NacosConfig做兩件事:

1) 注入Convert轉換器,将FlowRuleEntity轉化成FlowRule,以及反向轉化

2) 注入Nacos配置服務ConfigService

@EnableConfigurationProperties(NacosPropertiesConfiguration.class)
@Configuration
public class NacosConfig {

    @Bean
    public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    @Bean
    public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
        return s -> JSON.parseArray(s, FlowRuleEntity.class);
    }

    @Bean
    public ConfigService nacosConfigService(NacosPropertiesConfiguration nacosPropertiesConfiguration) throws Exception {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, nacosPropertiesConfiguration.getServerAddr());
        properties.put(PropertyKeyConst.NAMESPACE, nacosPropertiesConfiguration.getNamespace());
        return ConfigFactory.createConfigService(properties);
//        return ConfigFactory.createConfigService("localhost");
    }
}
           
2.3.6.2 動态擷取流控規則

動态實作從Nacos配置中心擷取流控規則需要重寫FlowRuleNacosProvider與FlowRuleNacosPublisher類。

1)重寫FlowRuleNacosProvider類

//1)通過ConfigService的getConfig()方法從Nacos Config Server讀取指定配置資訊 
//2)通過轉為converter轉化為FlowRule規則

@Component("flowRuleNacosProvider")
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    @Autowired
    private ConfigService configService;
    @Autowired
    private Converter<String, List<FlowRuleEntity>> converter;

    @Override
    public List<FlowRuleEntity> getRules(String appName) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
            NacosConfigUtil.GROUP_ID, 3000);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        return converter.convert(rules);
    }
}
           

2)重寫FlowRuleNacosPublisher類:

@Service("flowRuleNacosPublisher")
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {
    public static final Logger log = LoggerFactory.getLogger(FlowRuleNacosPublisher.class);
    @Autowired
    private ConfigService configService;
    @Autowired
    private Converter<List<FlowRuleEntity>, String> converter;

    /**
     * 通過configService的publishConfig()方法将rules釋出到nacos
     * @param app app name
     * @param rules list of rules to push
     * @throws Exception
     */
    @Override
    public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        log.info("sentinel dashboard push rules: {}", rules);
        configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, converter.convert(rules));
    }
}
           

3)替換預設對象

修改FlowControllerV2類,使用@Qulifier将上面配置的兩個類注入進來

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
2.3.6.3 資料持久化測試

我們接下來将程式打包,如下圖:

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

打包好程式後,再将程式運作起來:

java -Dserver.port=8858 -Dcsp.sentinel.dashboard.server=localhost:8858 - 
Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar 
           

運作起來後,我們可以發現在Nacos中多了一個服務,如下圖:

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

我們随意增加一個流控規則,如下圖:

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

點選新增之後,我們可以發現SENTILE_GROUP組下多了一個檔案,sentinel-flow-rule檔案,效果如下:

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

3 Seata事務控制源碼剖析

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

我們前面說過,Seata通過代理資料源實作了将業務資料操作和日志資料操作綁定到同一個事務中了,實作流程如上圖,我們接下來分析一下源碼。源碼學習我們帶着下面這3個問題去學習:

1:SQL 解析與執行。 
2:全局”事務”的注冊,送出與復原。 
3:undo 日志的生成與使用。 
           

3.1 全局事務操作判斷

對SQL按照不同的SQL類型進行執行,并儲存beforeImage以及afterImage鏡像檔案到 undo 資料結構中用于事務復原,這是Seata代理資料源的一大功能,我們看看它的源碼是如何實作的。

SQL解析的入口是 BaseTransactionalExecutor ,它是一個抽象類,實作了Executor 接口中的execute 方法,在該方法中會判斷是否需要全局事務,如果需要全局事務會給每個操作配置設定一個xid。它還定義了doExecute() 抽象方法,具體執行由子類AbstractDMLBaseExecutor 實作。源碼如下:

/***
     * 事務操作判斷
     */
    @Override
    public T execute(Object... args) throws Throwable {
        //1. 如果為Global 事務,則将XId 綁定在上下文中
        if (RootContext.inGlobalTransaction()) {
            String xid = RootContext.getXID();
            statementProxy.getConnectionProxy().bind(xid);
        }
        //2. 是否需要擷取全局鎖,進行不同設定
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }

  /**
     * Do execute object.
     * 定義抽象方法,由子類自行實作。
     * @param args the args
     * @return the object
     * @throws Throwable the throwable
     */
    protected abstract T doExecute(Object... args) throws Throwable;
           

3.2 記錄檔鏡像保

AbstractDMLBaseExecutor 類繼承自 BaseTransactionalExecutor 抽象。實作了父類的 doExecute()方法,并定義了多個抽象方法。代碼如下所示:

@Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // 是否為自動送出
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            //非自動送出
            return executeAutoCommitFalse(args);
        }
    }

 protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1)
        {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //儲存操作前資料鏡像
        TableRecords beforeImage = beforeImage();
        //執行操作
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        //儲存操作後鏡像
        TableRecords afterImage = afterImage(beforeImage);
        //将前後鏡像儲存到undolog中
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }


    
    /**
     * Before image table records.
     * 業務SQL 執行前的鏡像 (用于事務復原)
     * @return the table records
     * @throws SQLException the sql exception
     */
    protected abstract TableRecords beforeImage() throws SQLException;

    /**
     * After image table records.
     * 業務SQL 執行後的鏡像。(用于事務復原)
     * @param beforeImage the before image
     * @return the table records
     * @throws SQLException the sql exception
     */
    protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

           

3.3 事務操作

事務送出也是Seata中的核心邏輯,在 ConnectionProxy 類中對核心邏輯進行了實作,在 doCommit 中實作了事務控制邏輯,代碼如下:

@Override
    public void commit() throws SQLException {
        try {
            LOCK_RETRY_POLICY.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

 
    /**
     * 事務送出
     * @throws SQLException
     */
    private void doCommit() throws SQLException {
        // 如果是Global事務,則執行 processGlobalTransactionCommit方法
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            // 是否需要擷取全局鎖
            processLocalCommitWithGlobalLocks();
        } else {
            //本地事務直接 commit
            targetConnection.commit();
        }
    }


 /****
     * 事務送出
     * @throws SQLException
     */
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            //1. 向 server 端進行分支事務注冊
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //2. 如果有 UndoLog 日志,則将其新增到 undo_log 表中
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //3. 執行本地事務
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            //4. 如果異常,則向 server 端 報告未送出完成。
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            //5. 向 server 端,報告送出完成。
            report(true);
        }
        //6. 并将ccontext.reset()掉
        context.reset();
    }


    /**
     * 事務復原
     * @throws SQLException
     */
    @Override
    public void rollback() throws SQLException {
        //1. 本地事務進行復原
        targetConnection.rollback();
        //2. 如果是全局事務,且進行過分支注冊,則向server報告未送出完成。
        if (context.inGlobalTransaction() && context.isBranchRegistered()) {
            report(false);
        }
        context.reset();
    }
           

總結

Nacous

服務啟動

修改application.properties

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

Nacos服務:console->com.alibaba.nacos.Nacos.main()

服務注冊

服務注冊

nacos-client->NacosNamingService

用戶端:nacos-client

服務注冊:

NacosNamingService.registerInstance()

注冊服務->NamingProxy.registerService()

添加心跳檢測->BeatReactor.addBeatInfo()

服務注冊源代碼->registerInstance()

流程源碼

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

心跳定時任務添加源碼

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
服務注冊源碼
			NamingProxy.registerService()
			NamingProxy.reqApi()
			NamingProxy.callServer()
			NacosRestTemplate.exchangeForm()
			NacosRestTemplate.execute()
	Http服務注冊:/v1/ns/instance
		NacosRestTemplate
			exchangeForm()
			execute()->DefaultHttpClientRequest.execute()
				HttpClient:InternalHttpClient.execute()
		HttpClient的主要功能
			實作了所有 HTTP 的方法(GET、POST、PUT、HEAD、DELETE、HEAD、OPTIONS 等)
			支援 HTTPS 協定
			支援代理伺服器(Nginx等)等
			支援自動(跳轉)轉向
服務端:nacos-naming
	InstanceController.register()
           

服務訂閱

服務訂閱->NIO

入口:NamingFactory.createNamingService(properties);

構造函數:NacosNamingService.NacosNamingService()

NacosNamingService.init()

new HostReactor()->建立訂閱->new PushReceiver(this)

服務訂閱:PushReceiver->Runnable線程->緩存serviceInfoMap

run()基于NIO循環擷取資料包

udpSocket.receive(packet):擷取資料包

hostReactor.processServiceJson(pushPacket.data):處理資料包

serviceInfoMap.put():服務清單資訊存儲到serviceInfoMap中

udpSocket.send():ack确認

服務發現

服務發現

服務發現調用方法:NacosNamingService.getAllInstances()

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
沒開啟服務訂閱:遠端擷取   /v1/ns/instance/list
	服務端:nacos-naming->InstanceController.list()
	用戶端
		HostReactor.getServiceInfoDirectlyFromServer()
		NamingProxy.queryList()
		NamingProxy.reqApi()
開啟服務訂閱:本地緩存serviceInfoMap擷取
	HostReactor.getServiceInfo()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

服務登出

服務登出:

源代碼

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
NamingProxy.deregisterService()
NamingProxy.reqApi()
NamingProxy.callServer()
NacosRestTemplate.exchangeForm()
           

增量資料同步

增量資料同步

任務添加

DistroProtocol.sync():同步資料添加定時任務,1秒後執行

添加至->NacosDelayTaskExecuteEngine.tasks

同步任務啟動:DistroTaskEngineHolder

構造函數DistroTaskEngineHolder()

建立任務執行引擎:NacosDelayTaskExecuteEngine

源代碼

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
多線程建立:ProcessRunnable
				線程:NacosDelayTaskExecuteEngine()
				線程建立代碼
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
任務執行:ProcessRunnable.run()
				run()->NacosDelayTaskExecuteEngine.processTasks()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

NacosDelayTaskExecuteEngine.processTasks()

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

源代碼

從tasks擷取任務
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
執行任務
						process(NacosTask task)->添加同步任務->DistroSyncChangeTask
							建立任務源碼
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
添加任務源碼
								任務添加流程
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
任務添加到隊列中
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
多線程任務執行:TaskExecuteWorker->InnerWorker(線程)
							建立構造函數
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
任務執行InnerWorker.run()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
資料同步任務執行
							資料同步執行:DistroSyncChangeTask.run()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
同步請求:http
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

設定定時任務對象:DistroDelayTaskProcessor

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

Distro協定:資料一緻性(服務資料并不會持久化:DataStore)

Distro協定:資料一緻性(服務資料并不會持久化:DataStore)

Distro特點

專門為了注冊中心而創造出的協定

用戶端與服務端有兩個重要的互動:服務注冊與心跳發送

用戶端以服務為次元向服務端注冊,注冊後每隔一段時間向服務端發送一次心跳,心跳包需要帶上注冊服務的全部資訊,在用戶端看來,服務端節點對等,是以請求的節點是随機的

用戶端請求失敗則換一個節點重新發送請求

服務端節點都存儲所有資料,但每個節點隻負責其中一部分服務,在接收到用戶端的“寫”(注冊、心跳、下線等)請求後,服務端節點判斷請求的服務是否為自己負責,如果是,則處理,否則交由負責的節點處理

每個服務端節點主動發送健康檢查到其他節點,響應的節點被該節點視為健康節點

服務端在接收到用戶端的服務心跳後,如果該服務不存在,則将該心跳請求當做注冊請求來處理

服務端如果長時間未收到用戶端心跳,則下線該服務

負責的節點在接收到服務注冊、服務心跳等寫請求後将資料寫入後即傳回,背景異步地将資料同步給其他節點

節點在收到讀請求後直接從本機擷取後傳回,無論資料是否為最新

Distro尋址模式->LookupFactory.createLookUp()

Distro尋址模式-單機尋址-StandaloneMemberLookup

隻需要擷取自己服務的位址

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
Distro尋址模式-檔案尋址->FileConfigMemberLookup->cluster.conf
	伺服器尋址模式->AddressServerMemberLookup.initAddressSys()
資料批量同步
	DistroLoadDataTask->線程->run()->load()
	循環所有節點,從遠端擷取資料->loadAllDataSnapshotFromRemote(each)
		擷取所有資料:DistroHttpAgent.getDatumSnapshot()
         ->NamingProxy.getAllData()->/v1/ns/distro/datums
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
同步到本地:DataStore
		建立監聽器同步資料
           

Sentinel1.8.0配置持久化->Nacos

配置持久化

依賴管理

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
拷貝測試檔案
	src/test/java/com/alibaba/csp/sentinel/dashboard/rule/nacos
           

拷貝至

src/main/java/com/alibaba/csp/sentinel/dashboard/rule

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
Nacos配置
	application.properties中添加nacos配置
	配置代碼
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
Nacos配置導入:修改com.alibaba.csp.sentinel.dashboard.rule.nacos.NacosConfig
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
流控規則注入bean->替換成Nacos操作對象FlowControllerV2
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
流控管理頁面修改
	src/main/webapp/resources/app/scripts/directives/sidebar/sidebar.html
           

dashboard.flowV1({app: entry.app})

修改為

dashboard.flow({app: entry.app})

改前:56行

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
改後:56行
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
流控規則添加
	添加流程
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
Nacos效果
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
持久化的配置詳情
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

新增修改

配置字尾修改:NacosConfigUtil

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
規則推送-System
	SystemRuleNacosProvider
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
SystemRuleNacosPublisher
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
規則轉換器:在NacosConfig中添加規則轉換器
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
配置修改控制器:SystemController
效果
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

服務端配置

服務端配置

引入依賴配置

Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
修改配置檔案
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析

Seata源碼

事務操作流程
	業務操作入口:BaseTransactionalExecutor
		execute()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
AbstractDMLBaseExecutor.doExecute()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
ConnectionProxy.processGlobalTransactionCommit()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
資料復原
	DataSourceManager.branchRollback()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析
AbstractUndoLogManager.undo()
           
Nacos/Sentinel/Seata核心源碼剖析Nacos/Sentinel/Seata核心源碼剖析