public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
!futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task:
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}# 前言
作為Spring Cloud Alibaba的重要元件之一,Nacos的到底是如何工作的呢?接下來走進Nacos的神秘世界
Nacos整合Spring Cloud的服務注冊
Nacos作為Spring Cloud Alibaba 的核心元件之一,必然也會遵循cloud的一些規範,在Spring Cloud中存在一個ServiceRegistry的接口,顧名思義,該接口就是用于服務注冊的,是以在Nacos在Spring Cloud的環境下,也存在一個實作了該接口的類
NacosServiceRegistry
,該方法實作了ServiceRegistry的所有方法。如下:
public interface ServiceRegistry<R extends Registration> {
void register(R registration);
void deregister(R registration);
void close();
void setStatus(R registration, String status);
<T> T getStatus(R registration);
目前主題之讨論服務的注冊方法
register
在讨論該方法之前,我們還是先簡單說明一下
NacosServiceRegistry
是怎麼裝配到spring 的環境中的
這裡采用的是spring boot的自動裝配的原理。我們可以在
spring-cloud-starter-alibaba-nacos-discovery-2.2.1.RELEASE.jar
中的META-INF/spring.factories檔案中存在一個
NacosServiceRegistryAutoConfiguration
的類,該類就是Nacos整合Spring Cloud的自動裝配的類,進入到該類
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
該類是一個配置類,注入了
NacosServiceRegistry
,
NacosRegistration
,
NacosAutoServiceRegistration
等bean。
其中
-
: serviceRegistry接口,作為nacos服務注冊的實作NacosServiceRegistry
-
: 實作了NacosRegistration
和Registration
,使用者存儲服務執行個體資訊,如端口,ip等等ServiceInstance
-
:繼承自NacosAutoServiceRegistration
類,用于通過事件的機制觸發服務的自動注冊。AbstractAutoServiceRegistration
是以入口我們基本上已經找到了,會在
NacosAutoServiceRegistration
中觸發服務的注冊。
進入到
AbstractAutoServiceRegistration
類我們可以看到該類實作了
ApplicationListener<WebServerInitializedEvent>
接口,是以會監聽到
WebServerInitializedEvent
的事件,如果使用dubbo作為注冊中心且沒有web的環境時,将會走
DubboServiceRegistrationNonWebApplicationAutoConfiguration
類的自動裝配,該類中會監聽
ApplicationStartedEvent
事件,然後調用
serviceRegistry.register()
方法。這裡我們從web環境的入口進入。
進入到
AbstractAutoServiceRegistration
類的監聽方法
onApplicationEvent
。該方法會調用一個bind方法
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
- 為port指派,擷取web項目監聽的端口
- 調用start()方法開始注冊服務
進入到start()方法
該方法最終會調用子類的
register
方法,對端口進行校驗,校驗之後調用父類的
register
方法,在父類的
register
方法中,調用注入的
serviceRegistry
的
register
方法,并傳入注入的
Registration
.如下:
protected void register() {
this.serviceRegistry.register(getRegistration());
}
現在進入到
serviceRegistry.register
方法中。因為他的實作是
NacosServiceRegistry
,是以我們先進入到
NacosServiceRegistry
的構造方法中看看該類初始化時做了什麼
public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.namingService = nacosDiscoveryProperties.namingServiceInstance();
}
該類在注入時,會傳入一個nacosDiscoveryProperties,同時可以通過nacosDiscoveryProperties.namingServiceInstance();方法擷取到一個namingService,該對象是nacos提供的服務注冊的API。在namingService對象的初始化時初始化的是
NacosNamingService
類,将會初始化如下資訊
private void init(Properties properties) {
this.namespace = InitUtils.initNamespaceForNaming(properties);
this.initServerAddr(properties);
InitUtils.initWebRootContext();
this.initCacheDir();
this.initLogName(properties);
this.eventDispatcher = new EventDispatcher();
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.cacheDir, this.isLoadCacheAtStart(properties), this.initPollingThreadCount(properties));
}
然後回到
NacosServiceRegistry
中的register方法中
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
- 參數校驗
- 擷取
,作為服務名,預設會擷取serviceId
配置的名字spring.application.name
- 擷取group,可配置,預設為Default,
- 調用
組裝一個Instance,表示一個服務的具體的是一個執行個體對象instance.該方法中主要攜帶了post,ip,權重,中繼資料資訊,叢集名稱等。如下getNacosInstanceFromRegistration
Instance instance = new Instance(); instance.setIp(registration.getHost()); instance.setPort(registration.getPort()); instance.setWeight(nacosDiscoveryProperties.getWeight()); instance.setClusterName(nacosDiscoveryProperties.getClusterName()); instance.setMetadata(registration.getMetadata());
- 調用
進行具體的服務注冊namingService.registerInstance(serviceId, group, instance);
進入到
namingService.registerInstance(serviceId, group, instance);
方法,該對象中有很多的重載方法,最終都是建構一個instance的執行個體,是以我們直接進入到該方法
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
如果執行個體是一個臨時節點,需要盡心心跳檢測,配置心跳資訊對象
BeatInfo
,調用this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);方法将心跳資訊傳入。該方法會通過線程池建立一個scheduler.發送心跳,這裡就不具體分析了。
進入到
this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
方法,看看具體的服務注冊
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
封裝發起請求的參數及注冊的api:這裡傳入的是/nacos/v1/ns/instance,因為在nacos中的服務注冊其實是通過http請求進行的。接下來調用callServer方法
public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
List<String> headers = builderHeaders();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
}
HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
.observe(end - start);
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content;
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
return StringUtils.EMPTY;
}
throw new NacosException(result.code, result.content);
}
使用HttpClient發起一個http接口的調用。到這裡,關于服務注冊的client已經完成了。
Nacos 整合Spring Cloud 的服務發現
在Spring Cloud中同樣存在一個服務發現的接口
DiscoveryClient
,作為服務的發現接口,而在nacos中同樣存在實作了該接口的類,
NacosDiscoveryClient
,它的注入依然是通過spring boot的自動裝配實作的,進入到該類的
getInstances
方法中
public List<ServiceInstance> getInstances(String serviceId) {
try {
return serviceDiscovery.getInstances(serviceId);
}
catch (Exception e) {
throw new RuntimeException(
"Can not get hosts from nacos server. serviceId: " + serviceId, e);
}
}
該方法中會調用
serviceDiscovery
該方法根據服務名傳回該服務的所有執行個體。服務名的擷取方式有很多,比如restTemplate攔截指定的服務名,feign用戶端指定等等。
現在直接進入到
serviceDiscovery.getInstances(serviceId);
方法中。
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return hostToServiceInstanceList(instances, serviceId);
}
從配置
discoveryProperties
擷取分組資訊,并通過配置初始化一個
namingServiceInstance
的對象,通過該對象調用
selectInstances
方法擷取所有執行個體。namingService的擷取方式跟服務注冊一樣,這裡就不再詳述。
進入到
selectInstances
方法。該方法會根據是否訂閱通過不同的方式擷取到ServiceInfo.SerivceInfo中包含了我們服務的所有資訊,如叢集名稱,執行個體list,分組名稱,服務名等。selectInstances如下
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
如果沒有訂閱時,将調用
hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
直接從遠端服務端拉取服務。最終調用
serverProxy.queryList(serviceName, clusters, 0, false);
方法擷取服務。在
queryList
中将會組裝參數和遠端調用的接口api
/v1/ns/instance/list
接口,然後遠端調用接口。傳回ServiceInfo。
我們重點來看看當訂閱時,做了些什麼東西。
進入到
hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
方法中。
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
- 現根據服務名和叢集名擷取ServiceInfo.如果不null,則如果服務需要根性,就根性,否則就傳回該服務對應的ServiceInfo.
- 如果為null,則立刻拉取服務
進入
updateServiceNow
方法中。
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
該方法将會立刻調用
serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
方法查詢最新的服務。如果擷取到,就進入到
processServiceJSON(result);
進行處理。我們注意到,這裡傳入了一個UDP的端口,該端口會在hostReactor初始化時初始化一個
PushReceiver
,該對象中初始化一個
DatagramSocket
,并使用線程池處理接收到的請求,當注冊中心服務提供者出現變化時,會主動回調該接口,發送變動的服務提供者執行個體,這裡接到請求後在進行處理。
該方法比較長,主要的功能就是做一些判斷,然後将新擷取到的ServiceInfo組裝起來。緩存到
serviceInfoMap
集合中。
回到
getServiceInfo
方法,此時
serviceInfoMap
中已經存在ServiceInfo對象了,進入到
scheduleUpdateIfAbsent
方法
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
該方法會添加一個排程任務,定時的去更新遠端服務.因為添加的是一個任務,是以我們直接去看
UpdateTask
中的run方法
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
!futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task:
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
定時的去遠端拉取服務。每隔10s執行一次,根據擷取到的服務進行判斷,是否更新本地緩存。
到這裡,服務發現就講完了,同時用戶端從注冊中心更新服務的兩種方式都已經得到了展現。