1.用戶端
1.1.定時更新服務清單
1.1.1.NacosNamingService
在前面我們講到一個類
NacosNamingService
,這個類不僅僅提供了服務注冊功能,同樣提供了服務發現的功能。
多個重載的方法最終都會進入一個方法:
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 1.判斷是否需要訂閱服務資訊(預設為 true)
if (subscribe) {
// 1.1.訂閱服務資訊
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
// 1.2.直接去nacos拉取服務資訊
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
// 2.從服務資訊中擷取執行個體清單并傳回
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
1.1.2.HostReactor
進入訂閱服務消息,這裡是由
HostReactor
類的
getServiceInfo()
方法來實作的:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
// 由 服務名@@叢集名拼接 key
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 讀取本地服務清單的緩存,緩存是一個Map,格式:Map<String, ServiceInfo>
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 判斷緩存是否存在
if (null == serviceObj) {
// 不存在,建立空ServiceInfo
serviceObj = new ServiceInfo(serviceName, clusters);
// 放入緩存
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// 放入待更新的服務清單(updatingMap)中
updatingMap.put(serviceName, new Object());
// 立即更新服務清單
updateServiceNow(serviceName, clusters);
// 從待更新清單中移除
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
// 緩存中有,但是需要更新
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish 等待5秒中,待更新完成
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 開啟定時更新服務清單的功能
scheduleUpdateIfAbsent(serviceName, clusters);
// 傳回緩存中的服務資訊
return serviceInfoMap.get(serviceObj.getKey());
}
基本邏輯就是先從本地緩存讀,根據結果來選擇:
- 如果本地緩存沒有,立即去
讀取,nacos
updateServiceNow(serviceName, clusters)
- 如果本地緩存有,則開啟定時更新功能,并傳回緩存結果:
-
scheduleUpdateIfAbsent(serviceName, clusters)
- 在
中,最終還是調用UpdateTask
方法:updateService
- 不管是立即更新服務清單,還是定時更新服務清單,最終都會執行
中的HostReactor
方法:updateService()
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 基于ServerProxy發起遠端調用,查詢服務清單
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
// 處理查詢結果
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
1.1.3.ServerProxy
而
ServerProxy
的
queryList
方法如下:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
// 準備請求參數
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
// 發起請求,位址與API接口一緻
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
1.2.處理服務變更通知
除了定時更新服務清單的功能外,Nacos還支援服務清單變更時的主動推送功能。
在
HostReactor
類的構造函數中,有非常重要的幾個步驟:
基本思路是:
- 通過
監聽服務端推送的變更資料PushReceiver
- 解析資料後,通過
釋出服務變更的事件NotifyCenter
-
監聽變更事件,完成對服務清單的更新InstanceChangeNotifier
1.2.1.PushReceiver
我們先看
PushReceiver
,這個類會以
UDP
方式接收
Nacos
服務端推送的服務變更資料。
先看構造函數:
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
// 建立 UDP用戶端
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
// 準備線程池
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
// 開啟線程任務,準備接收變更資料
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
PushReceiver
構造函數中基于線程池來運作任務。這是因為
PushReceiver
本身也是一個
Runnable
,其中的run方法業務邏輯如下:
@Override
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 接收推送資料
udpSocket.receive(packet);
// 解析為json字元串
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
// 反序列化為對象
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// 交給 HostReactor去處理
hostReactor.processServiceJson(pushPacket.data);
// send ack to server 發送ACK回執,略。。
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
1.2.2.HostReactor
通知資料的處理由交給了
HostReactor
的
processServiceJson
方法:
public ServiceInfo processServiceJson(String json) {
// 解析出ServiceInfo資訊
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
// 查詢緩存中的 ServiceInfo
ServiceInfo oldService = serviceInfoMap.get(serviceKey);
// 如果緩存存在,則需要校驗哪些資料要更新
boolean changed = false;
if (oldService != null) {
// 拉取的資料是否已經過期
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
+ serviceInfo.getLastRefTime());
}
// 放入緩存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 中間是緩存與新資料的對比,得到newHosts:新增的執行個體;remvHosts:待移除的執行個體;
// modHosts:需要修改的執行個體
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
// 釋出執行個體變更的事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
} else {
// 本地緩存不存在
changed = true;
// 放入緩存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 直接釋出執行個體變更的事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}
// 。。。
return serviceInfo;
}
2.服務端
2.1.拉取服務清單接口
在介紹的
InstanceController
中,提供了拉取服務清單的接口:
/**
* Get all instance of input service.
*
* @param request http request
* @return list of instance
* @throws Exception any error during list
*/
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
// 從request中擷取namespaceId和serviceName
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
// 擷取用戶端的 UDP端口
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
// 擷取服務清單
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
進入
doSrvIpxt()
方法來擷取服務清單:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,
String clusters, String clientIP,
int udpPort, String env, boolean isCheck,
String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// 擷取服務清單資訊
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {
// 添加目前用戶端 IP、UDP端口到 PushService 中
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
if (service == null) {
// 如果沒找到,傳回空
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
// 結果的檢測,異常執行個體的剔除等邏輯省略
// 最終封裝結果并傳回 。。。
result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
2.2.釋出服務變更的UDP通知
在上一節中,
InstanceController
中的
doSrvIpxt()
方法中,有這樣一行代碼:
pushService.addClient(namespaceId, serviceName, clusters, agent,
new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
其實是把消費者的
UDP
端口、IP等資訊封裝為一個
PushClient
對象,存儲
PushService
中。友善以後服務變更後推送消息。
PushService
類本身實作了
ApplicationListener
接口:
這個是事件監聽器接口,監聽的是
ServiceChangeEvent
(服務變更事件)。當服務清單變化時,就會通知我們:
3.總結
Nacos的服務發現分為兩種模式:
模式一:主動拉取模式,消費者定期主動從
Nacos
拉取服務清單并緩存起來,再服務調用時優先讀取本地緩存中的服務清單。