微服務将自己的執行個體注冊到nacos注冊中心,nacos服務端存儲了注冊清單,然後通過ribbon調用服務,具體是如何調用?如果nacos服務挂了,還能正常調用服務嗎?調用的服務清單發生變化,調用方是如何感覺變化的?帶着這些問題,來探索一下服務發現的原理。
版本 2.1.1
- Nacos Server:2.1.1
- spring-cloud-starter-alibaba:2.1.1.RELEASE
- spring-boot:2.1.1.RELEASE
- spring-cloud-starter-netflix-ribbon:2.1.1.RELEASE
用戶端和服務端版本号都為2.1.1。
從 Ribbon 講起
使用ribbon來調用服務,就添加ribbon依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
ribbon依賴包含spring-cloud-commons依賴,而在spring-cloud-commons包中spring.factories自動配置LoadBalancerAutoConfiguration類:
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
隻要标注了@LoadBalanced注解的restTemplates都會添加負載均衡攔截器LoadBalancerInterceptor。
使用Ribbon元件調用服務:
restTemplate.getForObject("http://service-name",String.class);
restTemplate的http請求方法,最終會調用到doExecute方法。doExecute在發起http請求之前,會先執行LoadBalancerInterceptor負載均衡攔截器的intercept方法。 該方法調用execute方法。
而在execute方法中,主要有兩個方法:
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
execute先通過getLoadBalancer擷取ILoadBalancer執行個體,然後再通過getServer擷取Server執行個體。
getLoadBalancer最終會調用Ribbon的ServerList接口,具體調用流程:
getLoadBalancer() ->
ZoneAwareLoadBalancer ->
DynamicServerListLoadBalancer ->
restOfInit()->
updateListOfServers()->
ServerList.getUpdatedListOfServers()->
Nacos實作類NacosServerList實作了ServerList接口。
總之我們在進行微服務調用的時候,Ribbon最終會調用NacosServerList類中的getUpdatedListOfServers方法。
Nacos 擷取服務
NacosServerList類的getUpdatedListOfServers方法調用了該類的getServers方法:
private List<NacosServer> getServers() {
try {
// 擷取分組
String group = discoveryProperties.getGroup();
// 重點,查詢執行個體清單
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId,
e);
}
}
重點看NacosNamingService類的selectInstances方法,會調用以下selectInstances三個重載方法:
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
return selectInstances(serviceName, groupName, healthy, true);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 預設訂閱
if (subscribe) {
// 擷取服務,這是重點
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
最後一個selectInstances方法裡面的hostReactor.getServiceInfo方法是擷取服務的核心方法:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 先在本地緩存查詢
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 查詢不到
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// 請求Nacos Server執行個體,并更新服務執行個體
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 定時更新本地緩存
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
getServiceInfo是服務發現的核心方法,先查詢serviceInfoMap集合中查詢本地緩存,本地緩存查詢不到就請求Nacos Server執行個體,并更新本地緩存。
請求Nacos Server執行個體,實際就是發送http請求Nacos Server:
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 調用 Nacos Server 查詢服務
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
// 結果不為空,更新緩存
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
//向 Nacos Server發起 HTTP 清單查詢
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
queryList方法主要封裝号請求參數,然後向Nacos Server服務端發送http請求。
當服務端執行個體發生改變時,Nacos Server會推送最新的執行個體給服務端。
服務發現是先擷取本地緩存,如果沒有本地緩存,就請求Nacos Server服務端擷取資料,如果Nacos Server挂了,也不會影響服務的調用。
總結
- Ribbon 項目啟動時,會建立一個負載均衡攔截器。從Ribbon發起服務請求開始,最終會調用到攔截器的攔截方法。攔截方法又調用ServerList擷取執行個體接口,而NacosServerList實作擷取執行個體清單。
- Nacos調用服務 NacosServerList實作了擷取服務執行個體清單。NacosServerList類selectInstances方法最終調用了hostReactor.getServiceInfo方法getServiceInfo方法先從serviceInfoMap集合中擷取本地緩存,如果本地緩存找不到,就請求Nacos Server擷取服務執行個體,并更新本地緩存。擷取服務之後,定時更新本地緩存。
參考
- Spring Cloud nacos Ribbon整合源碼分析
- 服務發現:服務之間調用請求鍊路分析