天天看點

Spring Cloud Nacos源碼講解(四)- Nacos服務端服務注冊源碼分析Nacos服務端服務注冊源碼分析

Nacos服務端服務注冊源碼分析

服務端調用接口

​ 我們已經知道用戶端在注冊服務的時候實際上是調用的NamingService.registerInstance這個方法來完成執行個體的注冊,而且在最後我們也告訴了大家實際上從本質上講服務注冊就是調用的對應接口nacos/v1/ns/instance,那咱們現在就在服務端先找到這個接口,然後來看具體服務端的操作。

Spring Cloud Nacos源碼講解(四)- Nacos服務端服務注冊源碼分析Nacos服務端服務注冊源碼分析

​ 這是從Nacos官網上我們看到的Nacos架構圖,其實在這裡我們已經就能分析出我們要找的接口應該在NamingService這個服務中,從源碼角度來看,其實通過一下這個項目結構圖中我們也能清楚的看見naming這個子子產品,而且在源碼的第一節課就和大家分析過這個naming實際上就是實作服務的注冊的。

Spring Cloud Nacos源碼講解(四)- Nacos服務端服務注冊源碼分析Nacos服務端服務注冊源碼分析

​ 那我們接着來向下看這個項目中的controller,因為我們知道所有的接口其實都在controller中,從這些Controller中我們就會明顯的看到一個InstanceController,是以很明顯注冊執行個體一定和它有關

Spring Cloud Nacos源碼講解(四)- Nacos服務端服務注冊源碼分析Nacos服務端服務注冊源碼分析

​ 是以我們打開InstanceController來深入研究一下,這個時候會發現@RequestMapping注解中的值就是我們通路的注冊接口

Spring Cloud Nacos源碼講解(四)- Nacos服務端服務注冊源碼分析Nacos服務端服務注冊源碼分析
Spring Cloud Nacos源碼講解(四)- Nacos服務端服務注冊源碼分析Nacos服務端服務注冊源碼分析

​ 接下來我們再來尋找RESTful API接口POST請求類型的方法register,在這個方法中實際上就是接受使用者請求,把收到的資訊進行解析,還原成Instance,然後調用registerInstance方法來完成注冊,這個方法才是服務端注冊的核心

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    final String namespaceId = WebUtils
        .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
        .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
	//注冊服務執行個體
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
           

​ 各位這個位置我們注意一下的這個方法

​ 其中的getInstanceOperator(),就是判斷是否采用Grpc協定,很明顯這個位置走的是instanceServiceV2

private InstanceOperator getInstanceOperator() {
    return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}
           

服務注冊

instanceServiceV2.registerInstance

​ 實際上instanceServiceV2就是InstanceOperatorClientImpl,是以我們來看這裡面的registerInstance方法

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
    //判斷是否為瞬時對象(臨時用戶端)
    boolean ephemeral = instance.isEphemeral();
    //擷取用戶端ID
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    //通過用戶端ID建立用戶端連接配接
    createIpPortClientIfAbsent(clientId);
    //擷取服務
    Service service = getService(namespaceId, serviceName, ephemeral);
    //具體注冊服務
    clientOperationService.registerInstance(service, instance, clientId);
}
           

​ 在這裡我們要分析一些細節,其實Nacos2.0以後新增Client模型。一個用戶端gRPC長連接配接對應一個Client,每個Client有自己唯一的id(clientId)。Client負責管理一個用戶端的服務執行個體注冊Publish和服務訂閱Subscribe。我們可以看一下這個模型其實就是一個接口(為了大家看着友善,注釋改成了中文)

public interface Client {
    // 用戶端id/gRPC的connectionId
    String getClientId();

    // 是否臨時用戶端
    boolean isEphemeral();
    // 用戶端更新時間
    void setLastUpdatedTime();
    long getLastUpdatedTime();

    // 服務執行個體注冊/登出/查詢
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    InstancePublishInfo removeServiceInstance(Service service);
    InstancePublishInfo getInstancePublishInfo(Service service);
    Collection<Service> getAllPublishedService();

    // 服務訂閱/取消訂閱/查詢訂閱
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    boolean removeServiceSubscriber(Service service);
    Subscriber getSubscriber(Service service);
    Collection<Service> getAllSubscribeService();
    // 生成同步給其他節點的client資料
    ClientSyncData generateSyncData();
    // 是否過期
    boolean isExpire(long currentTime);
    // 釋放資源
    void release();
}
           

EphemeralClientOperationServiceImpl.registerInstance

​ EphemeralClientOperationServiceImpl實際負責處理服務注冊,那我們來看具體方法

@Override
public void registerInstance(Service service, Instance instance, String clientId) {
    //確定Service單例存在
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    //根據用戶端id,找到用戶端
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    //用戶端Instance模型,轉換為服務端Instance模型
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    //将Instance儲存到Client裡
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    //建立Service與ClientId的關系
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter
        .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
           

ServiceManager

​ Service的容器是ServiceManager,但是在com.alibaba.nacos.naming.core.v2包下,容器中Service都是單例。

public class ServiceManager {
    
    private static final ServiceManager INSTANCE = new ServiceManager();
    //單例Service,可以檢視Service的equals和hasCode方法
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    //namespace下的所有service
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    .....
}
           

​ 是以從這個位置可以看出,當調用這個注冊方法的時候ServiceManager負責管理Service單例

//通過Map儲存單例的Service
public Service getSingleton(Service service) {
    singletonRepository.putIfAbsent(service, service);
    Service result = singletonRepository.get(service);
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
}
           

clientManager

​ 這是一個接口這裡我們要看它對應的一個實作類ConnectionBasedClientManager,這個實作類負責管理長連接配接clientId與Client模型的映射關系

// 根據clientId查詢Client
public Client getClient(String clientId) {
    return clients.get(clientId);
}
           

Client執行個體AbstractClient

負責存儲目前用戶端的服務系統資料庫,即Service與Instance的關系。注意對于單個用戶端來說,同一個服務隻能注冊一個執行個體。

//
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    if (null == publishers.put(service, instancePublishInfo)) {
        MetricsMonitor.incrementInstanceCount();
    }
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}
           

ClientOperationEvent.ClientRegisterServiceEvent

​ 這裡的目的是為了過濾目标服務得到最終Instance清單建立Service與Client的關系,建立Service與Client的關系就是為了加速查詢。

​ 釋出ClientRegisterServiceEvent事件,ClientServiceIndexesManager監聽,ClientServiceIndexesManager維護了兩個索引:

  • Service與釋出clientId
  • Service與訂閱clientId
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}

//建立Service與釋出Client的關系
private void addPublisherIndexes(Service service, String clientId) {
    publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
    publisherIndexes.get(service).add(clientId);
    NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
           

這個索引關系建立以後,還會觸發ServiceChangedEvent,代表服務系統資料庫變更。對于系統資料庫變更緊接着還要做兩個事情:1.通知訂閱用戶端 2.Nacos叢集資料同步。這兩點後續再說。

繼續閱讀