微服務将自己的執行個體注冊到注冊中心,nacos服務端存儲了注冊清單,然後通過
nacos
調用服務,具體是如何調用?如果
ribbon
服務挂了,還能正常調用服務嗎?調用的服務清單發生變化,調用方是如何感覺變化的?帶着這些問題,來探索一下服務發現的原理。
nacos
版本 2.1.1
-
Nacos Server:2.1.1
-
spring-cloud-starter-alibaba:2.1.1.RELEASE
-
spring-boot:2.1.1.RELEASE
-
spring-cloud-starter-netflix-ribbon:2.1.1.RELEASE
用戶端和服務端版本号都為 2.1.1
。
從 Ribbon 講起
使用
ribbon
來調用服務,就添加
ribbon
依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
ribbon
依賴包含
spring-cloud-commons
依賴,而在
spring-cloud-commons
包中
spring.factories
自動配置
LoadBalancerAutoConfiguration
類:
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
隻要标注了注解的
@LoadBalanced
都會添加負載均衡攔截器
restTemplates
。
LoadBalancerInterceptor
使用
Ribbon
元件調用服務:
restTemplate.getForObject("http://service-name",String.class);
restTemplate
的
http
請求方法,最終會調用到
doExecute
方法。
doExecute
在發起
http
請求之前,會先執行
LoadBalancerInterceptor
負載均衡攔截器的
intercept
方法。 該方法調用
execute
方法。
而在
execute
方法中,主要有兩個方法:
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
execute
先通過
getLoadBalancer
擷取
ILoadBalancer
執行個體,然後再通過
getServer
擷取
Server
執行個體。
getLoadBalancer
最終會調用
Ribbon
的
ServerList
接口,具體調用流程:
getLoadBalancer() ->
ZoneAwareLoadBalancer ->
DynamicServerListLoadBalancer ->
restOfInit()->
updateListOfServers()->
ServerList.getUpdatedListOfServers()->
Nacos
實作類
NacosServerList
實作了
ServerList
接口。
總之我們在進行微服務調用的時候,最終會調用
Ribbon
類中的
NacosServerList
方法。
getUpdatedListOfServers
Nacos 擷取服務
NacosServerList
類的
getUpdatedListOfServers
方法調用了該類的
getServers
方法:
private List<NacosServer> getServers() {
try {
// 擷取分組
String group = discoveryProperties.getGroup();
// 重點,查詢執行個體清單
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId,
e);
}
}
重點看
NacosNamingService
類的
selectInstances
方法,會調用以下
selectInstances
三個重載方法:
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
return selectInstances(serviceName, groupName, healthy, true);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 預設訂閱
if (subscribe) {
// 擷取服務,這是重點
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
最後一個
selectInstances
方法裡面的
hostReactor.getServiceInfo
方法是擷取服務的核心方法:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 先在本地緩存查詢
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 查詢不到
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// 請求Nacos Server執行個體,并更新服務執行個體
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 定時更新本地緩存
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
getServiceInfo
是服務發現的核心方法,先查詢
serviceInfoMap
集合中查詢本地緩存,本地緩存查詢不到就請求
Nacos Server
執行個體,并更新本地緩存。
請求
Nacos Server
執行個體,實際就是發送
http
請求
Nacos Server
:
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 調用 Nacos Server 查詢服務
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
// 結果不為空,更新緩存
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
//向 Nacos Server發起 HTTP 清單查詢
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
queryList
方法主要封裝号請求參數,然後向
Nacos Server
服務端發送
http
請求。
當服務端執行個體發生改變時,
Nacos Server
會推送最新的執行個體給服務端。
服務發現是先擷取本地緩存,如果沒有本地緩存,就請求
Nacos Server
服務端擷取資料,如果
Nacos Server
挂了,也不會影響服務的調用。
總結
-
Ribbon
- 項目啟動時,會建立一個負載均衡攔截器。
- 從
發起服務請求開始,最終會調用到攔截器的攔截方法。Ribbon
- 攔截方法又調用
擷取執行個體接口,而ServerList
實作擷取執行個體清單。NacosServerList
-
調用服務Nacos
-
實作了擷取服務執行個體清單。NacosServerList
-
類NacosServerList
方法最終調用了selectInstances
方法hostReactor.getServiceInfo
-
方法先從getServiceInfo
集合中擷取本地緩存,如果本地緩存找不到,就請求serviceInfoMap
擷取服務執行個體,并更新本地緩存。Nacos Server
- 擷取服務之後,定時更新本地緩存。
-
參考
- Spring Cloud nacos Ribbon整合源碼分析
- 服務發現:服務之間調用請求鍊路分析