天天看點

nacos——02

執行個體注冊——服務端處理

RequestHandler

nacos所有request處理的父類,子類需要實作handle方法

package com.alibaba.nacos.core.remote;
/**
 * Nacos based request handler.
 *
 * 
 */
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RequestHandler<T extends Request, S extends Response> {
    
    @Autowired
    private RequestFilters requestFilters;
    
    /**
     * Handler request.
     */
    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                Response filterResult = filter.filter(request, meta, this.getClass());
                if (filterResult != null && !filterResult.isSuccess()) {
                    return filterResult;
                }
            } catch (Throwable throwable) {
                Loggers.REMOTE.error("filter error", throwable);
            }
            
        }
        return handle(request, meta);
    }
    
    /**
     * Handler request.
     */
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
    
}      

InstanceRequestHandler

RequestHandler的子類

需要留意ServiceManager中的這倆集合singletonRepository、namespaceSingletonMaps,這是v2版本中注冊資訊的資料結構,均是ConcurrentHashMap類型的集合

核心處理方法handle,是服務端InstanceRequest處理入口,會根據請求類型分别處理注冊與登出。這裡隻看注冊邏輯

registerInstance方法三塊

  1. registerInstance 向服務注冊執行個體
  2. publishEvent 釋出RegisterInstanceTraceEvent
  3. 生成InstanceResponse傳回
package com.alibaba.nacos.naming.remote.rpc.handler;
/**
 * Instance request handler.
 * Extends RequestHandler, the base class Nacos uses for request handling.
 * @author xiweng.yy
 */
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    /**
     * Per the original author's note: RPC operations all target ephemeral services, while HTTP
     * is used for persistent ones. In v2 the ephemeral/persistent distinction was lifted to the
     * service level instead of the instance level.
     */
    private final EphemeralClientOperationServiceImpl clientOperationService;
    
    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }
    
    /**
     * Entry point for an InstanceRequest: builds the Service from the request's namespace, group
     * and service name (the trailing {@code true} argument presumably marks it ephemeral — confirm
     * against Service.newService) and dispatches on the request type.
     *
     * @throws NacosException with INVALID_PARAM when the request type is neither register nor deregister
     */
    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        // Dispatch on the request type: register or deregister; anything else is rejected.
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }
    
    /**
     * Registers the request's instance for the connection that sent it, publishes a
     * RegisterInstanceTraceEvent and returns a response tagged with REGISTER_INSTANCE.
     */
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
            throws NacosException {
        // Register the instance; the connection id is passed as the client id.
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        // Publish the trace event through the notify center.
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                request.getInstance().getIp(), request.getInstance().getPort()));
        // Build and return the InstanceResponse.
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        // Largely the same as registration; omitted in this excerpt.
        ......
    }
    
}      

1、EphemeralClientOperationServiceImpl.registerInstance

rpc的操作都是針對臨時服務,http是持久服務,這裡是rpc請求處理,是以是EphemeralClientOperationServiceImpl,而不是PersistentClientOperationServiceImpl

接口ClientOperationService是用戶端操作服務(注冊、批量注冊、登出、訂閱、取消訂閱、執行個體轉為釋出執行個體),共有三個實作類,分别是臨時、持久及臨時與持久的組合(内部根據執行個體是否為臨時執行個體的屬性确定使用哪個)

package com.alibaba.nacos.naming.core.v2.service.impl;
/**
 * Client operation service for ephemeral clients and services.
 *
 * @author xiweng.yy
 */
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    
    private final ClientManager clientManager;
    
    /**
     * Creates the service. The parameter is declared as {@link ClientManagerDelegate}, so that
     * implementation is the one stored as the client manager.
     *
     * @param clientManager delegate used to look up clients by id
     */
    public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
        this.clientManager = clientManager;
    }
    
    /**
     * Registers an instance of the given service on behalf of a client.
     *
     * <p>Returns without doing anything when the client is unknown or not ephemeral.
     *
     * @param service  service the instance belongs to
     * @param instance instance to register; validated first
     * @param clientId id of the registering client
     * @throws NacosException declared by the interface; the persistent-service rejection below is
     *                        raised as a NacosRuntimeException
     */
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        // Step 1: resolve the singleton service held by the service manager.
        final Service cachedService = ServiceManager.getInstance().getSingleton(service);
        if (!cachedService.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format(
                    "Current service %s is persistent service, can't register ephemeral instance.",
                    cachedService.getGroupedServiceName()));
        }
        // Step 2: look up the client and stop quietly if it may not register here.
        final Client owner = clientManager.getClient(clientId);
        if (!clientIsLegal(owner, clientId)) {
            return;
        }
        // Step 3: convert to the server-side publish info and attach it to the client.
        final InstancePublishInfo publishInfo = getPublishInfo(instance);
        owner.addServiceInstance(cachedService, publishInfo);
        owner.setLastUpdatedTime();
        owner.recalculateRevision();
        // Step 4: publish ClientRegisterServiceEvent.
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(cachedService, clientId));
        // Step 5: publish InstanceMetadataEvent.
        NotifyCenter.publishEvent(
                new MetadataEvent.InstanceMetadataEvent(cachedService, publishInfo.getMetadataId(), false));
    }
    
    @Override
    public void batchRegisterInstance(Service service, List<Instance> instances, String clientId) {
        // omitted in this excerpt
    }
    
    @Override
    public void deregisterInstance(Service service, Instance instance, String clientId) {
        // omitted in this excerpt
    }
    
    @Override
    public void subscribeService(Service service, Subscriber subscriber, String clientId) {
        // omitted in this excerpt
    }
    
    @Override
    public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
        // omitted in this excerpt
    }
    
    /**
     * Tells whether the client may register ephemeral instances: it must exist and be ephemeral.
     *
     * @param client   client found for the id, possibly {@code null}
     * @param clientId id used for the lookup (not consulted by the check itself)
     * @return {@code true} only for a non-null, ephemeral client
     */
    private boolean clientIsLegal(Client client, String clientId) {
        return client != null && client.isEphemeral();
    }
}

1.1、ServiceManager.getInstance().getSingleton(service)

這是nacos v2 中注冊資訊的資料結構,重點關注這倆集合singletonRepository、namespaceSingletonMaps。

package com.alibaba.nacos.naming.core.v2;
/**
 * Nacos service manager for v2.
 * 重點關注這倆集合singletonRepository、namespaceSingletonMaps
 * 這是nacos v2 中注冊資訊的資料結構
 */
public class ServiceManager {
    
    private static final ServiceManager INSTANCE = new ServiceManager();
    /**
     * singletonRepository的類型
     */
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    /**
     * namespaceSingletonMaps的類型,key為命名空間
     */
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    
    private ServiceManager() {
        //初始化集合size=1024
        singletonRepository = new ConcurrentHashMap<>(1 << 10);
        //初始化集合size=4
        namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2);
    }
    
    public static ServiceManager getInstance() {
        return INSTANCE;
    }
    
    public Set<Service> getSingletons(String namespace) {
        return namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1));
    }
    
    /**
     * Get singleton service. Put to manager if no singleton.
     *
     * @param service new service
     * @return if service is exist, return exist service, otherwise return new service
     */
    public Service getSingleton(Service service) {
        //如果singletonRepository中不存在目前service,則存入目前service
        singletonRepository.putIfAbsent(service, service);
        //取出singletonRepository中所存儲的service(之前有就取出來,)
        Service result = singletonRepository.get(service);
        //如果namespaceSingletonMaps中沒有service對應命名空間則存入目前service對應的命名空間
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
        //将目前service對應命名空間存入namespaceSingletonMaps
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
    //其他省略
    ......
}      

1.2、ClientManagerDelegate.getClient

ClientManager 的委托代理,會根據方法 getClientManagerById 的結果決定使用哪個 ClientManager。具體實作很簡潔

package com.alibaba.nacos.naming.core.v2.client.manager;
/**
 * Client manager delegate.
 *
 * <p>Forwards each {@link ClientManager} call to one of three concrete managers; the choice is
 * made per client id by {@link #getClientManagerById(String)}.
 *
 * @author xiweng.yy
 */
@DependsOn({"clientServiceIndexesManager", "namingMetadataManager"})
@Component("clientManager")
public class ClientManagerDelegate implements ClientManager {
    
    private final ConnectionBasedClientManager connectionBasedClientManager;
    
    private final EphemeralIpPortClientManager ephemeralIpPortClientManager;
    
    private final PersistentIpPortClientManager persistentIpPortClientManager;
    
    public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager,
            EphemeralIpPortClientManager ephemeralIpPortClientManager,
            PersistentIpPortClientManager persistentIpPortClientManager) {
        this.connectionBasedClientManager = connectionBasedClientManager;
        this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
        this.persistentIpPortClientManager = persistentIpPortClientManager;
    }
    
    @Override
    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        final ClientManager target = getClientManagerById(clientId);
        return target.clientConnected(clientId, attributes);
    }
    
    @Override
    public boolean clientConnected(Client client) {
        final ClientManager target = getClientManagerById(client.getClientId());
        return target.clientConnected(client);
    }
    
    @Override
    public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
        final ClientManager target = getClientManagerById(clientId);
        return target.syncClientConnected(clientId, attributes);
    }
    
    @Override
    public boolean clientDisconnected(String clientId) {
        final ClientManager target = getClientManagerById(clientId);
        return target.clientDisconnected(clientId);
    }
    
    @Override
    public Client getClient(String clientId) {
        final ClientManager target = getClientManagerById(clientId);
        return target.getClient(clientId);
    }
    
    /**
     * Asks the managers in turn (connection based, ephemeral ip-port, persistent ip-port) and
     * stops at the first one that knows the client id.
     */
    @Override
    public boolean contains(String clientId) {
        if (connectionBasedClientManager.contains(clientId)) {
            return true;
        }
        if (ephemeralIpPortClientManager.contains(clientId)) {
            return true;
        }
        return persistentIpPortClientManager.contains(clientId);
    }
    
    /**
     * Collects the client ids of all three managers into one de-duplicated collection.
     */
    @Override
    public Collection<String> allClientId() {
        final ClientManager[] managers = {connectionBasedClientManager, ephemeralIpPortClientManager,
                persistentIpPortClientManager};
        final Collection<String> merged = new HashSet<>();
        for (ClientManager each : managers) {
            merged.addAll(each.allClientId());
        }
        return merged;
    }
    
    @Override
    public boolean isResponsibleClient(Client client) {
        final ClientManager target = getClientManagerById(client.getClientId());
        return target.isResponsibleClient(client);
    }
    
    @Override
    public boolean verifyClient(DistroClientVerifyInfo verifyData) {
        final ClientManager target = getClientManagerById(verifyData.getClientId());
        return target.verifyClient(verifyData);
    }

    /**
     * The key routing step: picks the concrete manager for a client id.
     *
     * <p>Connection-based ids go to the connection based manager. Otherwise an id ending with
     * {@code ClientConstants.PERSISTENT_SUFFIX} goes to the persistent ip-port manager and every
     * other id to the ephemeral ip-port manager.
     *
     * @param clientId client id to route
     * @return the manager that owns the id
     */
    private ClientManager getClientManagerById(String clientId) {
        if (isConnectionBasedClient(clientId)) {
            return connectionBasedClientManager;
        }
        if (clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX)) {
            return persistentIpPortClientManager;
        }
        return ephemeralIpPortClientManager;
    }
    
    // remaining members omitted in this excerpt
}

1.3、将執行個體轉為服務端執行個體,為目前client添加服務執行個體

// 3. Convert the instance into its server-side publish info and add it to the current client under the service
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        client.recalculateRevision();      

調用IpPortBasedClient.addServiceInstance方法

// Here the client is an IpPortBasedClient.
    // The publish info is passed through parseToHealthCheckInstance (defined outside this excerpt)
    // before the call is forwarded to the superclass implementation.
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
    }      

調用父類AbstractClient.addServiceInstance方法,這裡又釋出了一個事件 ClientChangedEvent

/**
     * Instances published by this client, keyed by service
     * (type: ConcurrentHashMap<Service, InstancePublishInfo>).
     */
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);

    /**
     * Subscriptions held by this client, keyed by service
     * (type: ConcurrentHashMap<Service, Subscriber>).
     */
    protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    
    /**
     * Records the publish info for a service on this client and announces the change.
     *
     * <p>The map keeps one entry per service, so publishing again for the same service replaces
     * the earlier entry. Metrics are bumped only when the service had no entry before.
     *
     * @param service             service being published
     * @param instancePublishInfo server-side publish info to store
     * @return always {@code true}
     */
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        final InstancePublishInfo previous = publishers.put(service, instancePublishInfo);
        if (previous == null) {
            // First publication for this service: update the counters.
            if (!(instancePublishInfo instanceof BatchInstancePublishInfo)) {
                MetricsMonitor.incrementInstanceCount();
            } else {
                MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
            }
        }
        // Publish ClientChangedEvent whether or not an entry was replaced.
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }

1.4、NotifyCenter.publishEvent

通過統一事件通知中心釋出事件,解耦+異步。nacos裡有大量的事件釋出,都是通過這個工具類實作的。

這裡先後釋出了兩個事件:ClientRegisterServiceEvent、InstanceMetadataEvent

2、NotifyCenter.publishEvent

釋出 RegisterInstanceTraceEvent 事件

3、InstanceResponse

生成InstanceResponse并傳回

整體流程