執行個體注冊——服務端處理
RequestHandler
nacos所有request處理的父類,子類需要實作handle方法
package com.alibaba.nacos.core.remote;
/**
* Nacos based request handler.
*
*
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RequestHandler<T extends Request, S extends Response> {
@Autowired
private RequestFilters requestFilters;
/**
* Handler request.
*/
public Response handleRequest(T request, RequestMeta meta) throws NacosException {
for (AbstractRequestFilter filter : requestFilters.filters) {
try {
Response filterResult = filter.filter(request, meta, this.getClass());
if (filterResult != null && !filterResult.isSuccess()) {
return filterResult;
}
} catch (Throwable throwable) {
Loggers.REMOTE.error("filter error", throwable);
}
}
return handle(request, meta);
}
/**
* Handler request.
*/
public abstract S handle(T request, RequestMeta meta) throws NacosException;
}
InstanceRequestHandler
RequestHandler的子類
需要留意這倆集合singletonRepository、namespaceSingletonMaps,這是v2版本中注冊資訊的資料結構,均是ConcurrentHashMap類型的集合
核心處理方法handle,是服務端InstanceRequest處理入口,會根據請求類型分别處理注冊與登出。這裡隻看注冊邏輯
registerInstance方法三塊
- registerInstance 向服務注冊執行個體
- publishEvent 釋出RegisterInstanceTraceEvent
- 生成InstanceResponse傳回
package com.alibaba.nacos.naming.remote.rpc.handler;
/**
* Instance request handler.
* 繼承自 RequestHandler ,這是nacos處理reques的基類
* @author xiweng.yy
*/
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
/**
* rpc的操作都是針對臨時服務,http是持久服務
* v2中已将臨時/持久提升到服務層級,而不再是執行個體級别
*/
private final EphemeralClientOperationServiceImpl clientOperationService;
public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
this.clientOperationService = clientOperationService;
}
@Override
@Secured(action = ActionTypes.WRITE)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
//根據request類型分别處理注冊與登出,其他抛異常
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
throws NacosException {
//注冊執行個體
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
//由通知中心釋出消息
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
request.getInstance().getIp(), request.getInstance().getPort()));
//生成InstanceResponse并傳回
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
//與注冊基本相同,這裡略過
......
}
}
1、EphemeralClientOperationServiceImpl.registerInstance
rpc的操作都是針對臨時服務,http是持久服務,這裡是rpc請求處理,是以是EphemeralClientOperationServiceImpl,而不是PersistentClientOperationServiceImpl
接口ClientOperationService是用戶端操作服務(注冊、批量注冊、登出、訂閱、取消訂閱、執行個體轉為釋出執行個體),共有三個實作類,分别是臨時、持久及臨時與持久的組合(内部根據執行個體的 是否臨時屬性确定使用哪個)
package com.alibaba.nacos.naming.core.v2.service.impl;
/**
* Operation service for ephemeral clients and services.
*
* @author xiweng.yy
*/
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
private final ClientManager clientManager;
/**
* 直接指定 ClientManagerDelegate
* @param clientManager
*/
public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
this.clientManager = clientManager;
}
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
// 1 從緩存中取得服務
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
// 2 從緩存中取得client
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
// 3 将執行個體轉為服務端執行個體,為目前client添加服務下執行個體
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
// 4 釋出事件 ClientRegisterServiceEvent
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
// 4 釋出事件 InstanceMetadataEvent
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
@Override
public void batchRegisterInstance(Service service, List<Instance> instances, String clientId) {
//略
}
@Override
public void deregisterInstance(Service service, Instance instance, String clientId) {
//略
}
@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
//略
}
@Override
public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
//略
}
private boolean clientIsLegal(Client client, String clientId) {
if (client == null) {
return false;
}
if (!client.isEphemeral()) {
return false;
}
return true;
}
}
1.1、ServiceManager.getInstance().getSingleton(service)
這是nacos v2 中注冊資訊的資料結構,重點關注這倆集合singletonRepository、namespaceSingletonMaps。
package com.alibaba.nacos.naming.core.v2;
/**
* Nacos service manager for v2.
* 重點關注這倆集合singletonRepository、namespaceSingletonMaps
* 這是nacos v2 中注冊資訊的資料結構
*/
public class ServiceManager {
private static final ServiceManager INSTANCE = new ServiceManager();
/**
* singletonRepository的類型
*/
private final ConcurrentHashMap<Service, Service> singletonRepository;
/**
* namespaceSingletonMaps的類型,key為命名空間
*/
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
private ServiceManager() {
//初始化集合size=1024
singletonRepository = new ConcurrentHashMap<>(1 << 10);
//初始化集合size=4
namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2);
}
public static ServiceManager getInstance() {
return INSTANCE;
}
public Set<Service> getSingletons(String namespace) {
return namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1));
}
/**
* Get singleton service. Put to manager if no singleton.
*
* @param service new service
* @return if service is exist, return exist service, otherwise return new service
*/
public Service getSingleton(Service service) {
//如果singletonRepository中不存在目前service,則存入目前service
singletonRepository.putIfAbsent(service, service);
//取出singletonRepository中所存儲的service(之前有就取出來,)
Service result = singletonRepository.get(service);
//如果namespaceSingletonMaps中沒有service對應命名空間則存入目前service對應的命名空間
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
//将目前service對應命名空間存入namespaceSingletonMaps
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
//其他省略
......
}
1.2、ClientManagerDelegate.getClient
ClientManager 的委托代理,會根據方法 getClientManagerById 的結果決定使用哪個 ClientManager。具體實作很簡潔
package com.alibaba.nacos.naming.core.v2.client.manager;
/**
* Client manager delegate.
* ClientManager 的委托代理,會根據方法 getClientManagerById 的結果決定使用哪個 ClientManager
* @author xiweng.yy
*/
@DependsOn({"clientServiceIndexesManager", "namingMetadataManager"})
@Component("clientManager")
public class ClientManagerDelegate implements ClientManager {
private final ConnectionBasedClientManager connectionBasedClientManager;
private final EphemeralIpPortClientManager ephemeralIpPortClientManager;
private final PersistentIpPortClientManager persistentIpPortClientManager;
public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager,
EphemeralIpPortClientManager ephemeralIpPortClientManager,
PersistentIpPortClientManager persistentIpPortClientManager) {
this.connectionBasedClientManager = connectionBasedClientManager;
this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
this.persistentIpPortClientManager = persistentIpPortClientManager;
}
@Override
public boolean clientConnected(String clientId, ClientAttributes attributes) {
return getClientManagerById(clientId).clientConnected(clientId, attributes);
}
@Override
public boolean clientConnected(Client client) {
return getClientManagerById(client.getClientId()).clientConnected(client);
}
@Override
public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
return getClientManagerById(clientId).syncClientConnected(clientId, attributes);
}
@Override
public boolean clientDisconnected(String clientId) {
return getClientManagerById(clientId).clientDisconnected(clientId);
}
@Override
public Client getClient(String clientId) {
return getClientManagerById(clientId).getClient(clientId);
}
@Override
public boolean contains(String clientId) {
return connectionBasedClientManager.contains(clientId) || ephemeralIpPortClientManager.contains(clientId)
|| persistentIpPortClientManager.contains(clientId);
}
@Override
public Collection<String> allClientId() {
Collection<String> result = new HashSet<>();
result.addAll(connectionBasedClientManager.allClientId());
result.addAll(ephemeralIpPortClientManager.allClientId());
result.addAll(persistentIpPortClientManager.allClientId());
return result;
}
@Override
public boolean isResponsibleClient(Client client) {
return getClientManagerById(client.getClientId()).isResponsibleClient(client);
}
@Override
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
return getClientManagerById(verifyData.getClientId()).verifyClient(verifyData);
}
/**
* 這是最關鍵的,決定具體的 ClientManager
* @param clientId
* @return
*/
private ClientManager getClientManagerById(String clientId) {
if (isConnectionBasedClient(clientId)) {
return connectionBasedClientManager;
}
return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
}
//略
}
1.3、将執行個體轉為服務端執行個體,為目前client添加服務執行個體
//3 将執行個體轉為服務端執行個體,為目前client添加服務下執行個體
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
調用IpPortBasedClient.addServiceInstance方法
//這裡的client對應 IpPortBasedClient
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}
調用父類AbstractClient.addServiceInstance方法,這裡又釋出了一個事件 ClientChangedEvent
/**
* publishers 類型 ConcurrentHashMap<Service, InstancePublishInfo>
*/
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
/**
* subscribers 類型 ConcurrentHashMap<Service, Subscriber>
*/
protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
// 将服務與服務端執行個體存入map,一個服務下隻有一個執行個體的
if (null == publishers.put(service, instancePublishInfo)) {
// 做統計處理
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
} else {
MetricsMonitor.incrementInstanceCount();
}
}
// 釋出 ClientChangedEvent
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
1.4、NotifyCenter.publishEvent
通過統一事件通知中心釋出事件,解耦+異步。nacos裡有大量的事件釋出,都是通過這個工具類實作的。
這裡先後釋出了兩個事件: ClientRegisterServiceEvent、InstanceMetadataEvent
2、NotifyCenter.publishEvent
釋出 RegisterInstanceTraceEvent 事件
3、InstanceResponse
生成InstanceResponse并傳回