This article looks at how the Nacos server handles instance registration.
First, recall how the client issues the registration request. The code is in NacosNamingService's registerInstance method:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
For ephemeral instances a heartbeat task is added to keep the registration alive; then serverProxy.registerService is called to register the service:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
We won't follow the client any deeper; the URL is /nacos/v1/ns/instance. Now let's turn to the server side. The receiving endpoint is the InstanceController class, and the corresponding method is:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
It first builds an Instance object from the request, then calls serviceManager's registerInstance method to register the instance:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // create the service if it does not exist yet
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // fetch the service
    Service service = getService(namespaceId, serviceName);
    // if it is null here, it may already have been removed for missing heartbeats
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // add the instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
A single service can have multiple instances, so this method first checks whether the service exists, creates a new one if it does not, and finally adds the instance to it.
createEmptyService: creating the service
Following the code leads to the createServiceIfAbsent method:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // fetch the service first
    Service service = getService(namespaceId, serviceName);
    // create one if it does not exist
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        // instantiate a new Service
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        // recalculate the checksum
        service.recalculateChecksum();
        // attach the cluster if one was passed in
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        // validate the service and cluster names
        service.validate();
        // initialize the service
        putServiceAndInit(service);
        if (!local) {
            // persistent services are additionally written into the consistency service
            addOrReplaceService(service);
        }
    }
}
getService
It first checks whether the service already exists and creates a new one if it does not. Let's look at the getService method:
public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
// look up the service name under the given namespace
return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
The structure of serviceMap is Map&lt;namespaceId, Map&lt;serviceName, Service&gt;&gt;.
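As a quick illustration, here is a minimal sketch of that two-level registry. The Service class is stubbed out here; the real one lives in the Nacos naming module.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A sketch of the two-level registry keyed by namespace and service name.
public class RegistrySketch {

    static class Service {
        final String name;
        Service(String name) { this.name = name; }
    }

    // Map<namespaceId, Map<serviceName, Service>>, as in ServiceManager.serviceMap
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    public void putService(Service service, String namespaceId) {
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                  .put(service.name, service);
    }

    public Service getService(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.putService(new Service("DEFAULT_GROUP@@demo"), "public");
        System.out.println(registry.getService("public", "DEFAULT_GROUP@@demo") != null);
        System.out.println(registry.getService("other", "DEFAULT_GROUP@@demo") == null);
    }
}
```

Lookups for a namespace that was never registered simply return null, which is exactly the null check we saw in getService above.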
Next, service initialization in the putServiceAndInit method:
private void putServiceAndInit(Service service) throws NacosException {
    // put the service into serviceMap under its namespace
    putService(service);
    // initialize the heartbeat check
    service.init();
    // build the keys and register the service as a listener with the consistency
    // service, one ephemeral and one persistent
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
putService simply adds the new Service object into serviceMap.
The init method:
public void init() {
    // schedule the client beat check: 5-second initial delay, 5-second period
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
It starts a clientBeatCheckTask. We'll skip the Cluster part for now; the clusterMap is still empty at this point.
Let's look at the task's execution logic:
@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
// all ephemeral instances of the current service
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
// the last beat is older than the heartbeat timeout
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// the last beat is older than the deletion timeout
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
It checks every instance of the service; if the last reported heartbeat is older than InstanceHeartBeatTimeOut, the instance is marked unhealthy and two events are published: ServiceChangeEvent and InstanceHeartbeatTimeoutEvent.
It then checks whether the deletion timeout has passed as well; if so, deleteIp removes the instance:
private void deleteIp(Instance instance) {
try {
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
.appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
.appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
// delete instance asynchronously:
HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
@Override
public Object onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJson(), response.getResponseBody(), response.getStatusCode());
}
return null;
}
});
} catch (Exception e) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
}
}
It calls the server's own instance-deletion endpoint on localhost.
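To make the two thresholds concrete, here is a small sketch of the decision logic. The 15-second heartbeat timeout and 30-second deletion timeout are the Nacos defaults (configurable per instance via metadata), assumed here for illustration:

```java
// A minimal sketch of the beat-check thresholds, assuming the Nacos defaults
// of 15s for the heartbeat timeout and 30s for the deletion timeout.
public class BeatCheckSketch {
    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // mark unhealthy after this
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // delete after this

    enum Verdict { HEALTHY, UNHEALTHY, DELETE }

    static Verdict check(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return Verdict.DELETE;     // second loop: remove the instance
        }
        if (silence > HEART_BEAT_TIMEOUT_MS) {
            return Verdict.UNHEALTHY;  // first loop: setHealthy(false) + publish events
        }
        return Verdict.HEALTHY;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(check(now - 5_000, now));   // HEALTHY
        System.out.println(check(now - 20_000, now));  // UNHEALTHY
        System.out.println(check(now - 40_000, now));  // DELETE
    }
}
```

An instance thus degrades in two steps: first it is pushed to clients as unhealthy, and only after a longer silence is it removed entirely.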
ServiceChangeEvent
Back to the two heartbeat-timeout events. The first is ServiceChangeEvent, which is handled by the PushService listener. Let's look at its implementation:
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
// fetch all PushClients subscribed to this service
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
// remove and skip zombie (expired) clients
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
// reuse cached compressed data if available
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
// prepare the UDP payload
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(),
(ackEntry == null ? null : ackEntry.key));
// send it
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
// remove the future once the push has run
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
As you can see, it notifies all subscribed clients by sending UDP data. Let's look at udpPush:
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
// retry limit exceeded: log it, bump failedPush, and drop the pending entry from ackMap and udpSendTimeMap
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
// send the UDP packet
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
// if no ACK arrives within 10 seconds, Retransmitter calls udpPush again
GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
Since UDP is unreliable, a retry is scheduled here. Let's look at Retransmitter:
public static class Retransmitter implements Runnable {
Receiver.AckEntry ackEntry;
public Retransmitter(Receiver.AckEntry ackEntry) {
this.ackEntry = ackEntry;
}
@Override
public void run() {
// no ACK received yet; the key is removed from ackMap once the client responds
if (ackMap.containsKey(ackEntry.key)) {
Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
// send again
udpPush(ackEntry);
}
}
}
It simply calls udpPush again.
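The retry bookkeeping can be sketched as follows. This is a simplified stand-in for the real ackMap/AckEntry machinery, not the actual Nacos code: retransmission only fires while the key is still pending, and a client ACK removes it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A sketch of the pending-ACK pattern behind udpPush/Retransmitter.
public class AckSketch {
    static final int MAX_RETRY_TIMES = 1;
    static final Map<String, Integer> ackMap = new ConcurrentHashMap<>(); // key -> retries

    static boolean push(String key) {
        int retries = ackMap.getOrDefault(key, 0);
        if (retries > MAX_RETRY_TIMES) {
            ackMap.remove(key); // give up: count as a failed push
            return false;
        }
        ackMap.put(key, retries + 1); // "send" and arm the retransmitter
        return true;
    }

    static void onAck(String key) {
        ackMap.remove(key); // client answered: stop retrying
    }

    static void retransmit(String key) {
        if (ackMap.containsKey(key)) {
            push(key); // still pending: send again
        }
    }

    public static void main(String[] args) {
        push("svc@@client-1");
        onAck("svc@@client-1");
        retransmit("svc@@client-1"); // no-op: already acknowledged
        System.out.println(ackMap.isEmpty()); // true
    }
}
```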
InstanceHeartbeatTimeoutEvent
I could not find where this event is handled; interested readers are welcome to dig further.
consistencyService.listen
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
The consistency service listens on two keys, one ephemeral and one persistent. The key prefix is com.alibaba.nacos.naming.iplist.; the ephemeral key additionally carries an ephemeral. segment after the prefix.
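A sketch of how these instance-list keys are assembled. The "##" separator between namespace and service name is an assumption based on the Nacos KeyBuilder source:

```java
// A minimal sketch of KeyBuilder.buildInstanceListKey.
public class KeySketch {
    static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";

    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return INSTANCE_LIST_KEY_PREFIX
                + (ephemeral ? EPHEMERAL_KEY_PREFIX : "")
                + namespaceId + "##" + serviceName;
    }

    public static void main(String[] args) {
        System.out.println(buildInstanceListKey("public", "DEFAULT_GROUP@@demo", true));
        // com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@demo
        System.out.println(buildInstanceListKey("public", "DEFAULT_GROUP@@demo", false));
        // com.alibaba.nacos.naming.iplist.public##DEFAULT_GROUP@@demo
    }
}
```

The ephemeral/persistent distinction is thus encoded directly in the key, which is what lets the delegate below route by key alone.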
Next is DelegateConsistencyServiceImpl, the delegating consistency service:
@Override
public void listen(String key, RecordListener listener) throws NacosException {
// this special key is listened by both:
// this special meta key is listened to by both services
// com.alibaba.nacos.naming.domains.meta.
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
persistentConsistencyService.listen(key, listener);
ephemeralConsistencyService.listen(key, listener);
return;
}
// route to the proper consistency service based on the key
mapConsistencyService(key).listen(key, listener);
}
private ConsistencyService mapConsistencyService(String key) {
// com.alibaba.nacos.naming.iplist.ephemeral.
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
Let's look at the ephemeral one, DistroConsistencyServiceImpl:
@Override
public void listen(String key, RecordListener listener) throws NacosException {
// create a ConcurrentLinkedQueue for this key if absent
if (!listeners.containsKey(key)) {
listeners.put(key, new ConcurrentLinkedQueue<>());
}
// double-check to avoid adding the same listener twice
if (listeners.get(key).contains(listener)) {
return;
}
// append the listener to the queue
listeners.get(key).add(listener);
}
It just appends the listener to the queue. What exactly is being listened to? First, look at DistroConsistencyServiceImpl's initialization method:
@PostConstruct
public void init() {
GlobalExecutor.submitLoadDataTask(loadDataTask);
GlobalExecutor.submitDistroNotifyTask(notifier);
}
Two tasks are submitted. loadDataTask loads data at startup, which we'll cover another time. For now, the notifier:
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
// loop forever
for (; ; ) {
try {
// take the next task from the blocking queue
Pair<String, ApplyAction> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
When a task is added, it is taken off the queue here and handled:
private void handle(Pair<String, ApplyAction> pair) {
try {
String datumKey = pair.getValue0();
ApplyAction action = pair.getValue1();
// remove the key once the task is consumed
services.remove(datumKey);
int count = 0;
// bail out if nothing listens on this key
if (!listeners.containsKey(datumKey)) {
return;
}
// iterate the listeners and invoke the matching callback
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
So the listeners react through onChange and onDelete. In other words, every Service is added to the listener list when it is created, and handles the corresponding change when its data is modified or deleted.
addOrReplaceService
When the service being added is not ephemeral, this method is called:
public void addOrReplaceService(Service service) throws NacosException {
consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
}
Next, RaftConsistencyServiceImpl's put method:
@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
e);
}
}
Then the signalPublish method:
public void signalPublish(String key, Record value) throws Exception {
// not the leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
// forward to the leader: /v1/ns/raft/datum
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
try {
// we are the leader: take the operation lock
OPERATE_LOCK.lock();
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
// publish the change locally; peers holds all nodes, peers.local() is this node
onPublish(datum, peers.local());
final String content = json.toString();
// the call succeeds only after a majority of nodes acknowledge: strong consistency (CP model)
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
// iterate all nodes, including this one
for (final String server : peers.allServersIncludeMyself()) {
// the leader counts itself
if (isLeader(server)) {
latch.countDown();
continue;
}
// /v1/ns/raft/datum/commit
final String url = buildUrl(server, API_ON_PUB);
// replicate the data asynchronously
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
// count down once this peer has acknowledged
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
// wait for the majority, with a 5-second timeout
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
// unlock
OPERATE_LOCK.unlock();
}
}
This is where Raft comes in. If the current node is not the leader, the request is forwarded to the leader.
If it is the leader, onPublish stores the data locally and publishes a CHANGE task, then notifies all followers, counting acknowledgements with a CountDownLatch; once a majority has responded, the commit is complete. Let's see where the latch count comes from, in peers.majorityCount():
public int majorityCount() {
return peers.size() / 2 + 1;
}
That is nodes / 2 + 1: a majority of the cluster.
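The majority-ack pattern can be sketched with a CountDownLatch directly; the node count and follower ACKs below are made up for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// A sketch of the majority-ack pattern used by signalPublish: count down once
// per acknowledging peer and treat the publish as committed only if a
// majority responds before the timeout.
public class MajorityAckSketch {

    static int majorityCount(int peerCount) {
        return peerCount / 2 + 1; // half the nodes plus one
    }

    public static void main(String[] args) throws InterruptedException {
        int peers = 5;
        CountDownLatch latch = new CountDownLatch(majorityCount(peers));
        latch.countDown(); // the leader counts itself
        // simulate two follower ACKs arriving asynchronously
        for (int i = 0; i < 2; i++) {
            new Thread(latch::countDown).start();
        }
        boolean committed = latch.await(5, TimeUnit.SECONDS);
        System.out.println(committed); // true: 3 of 5 nodes acknowledged
    }
}
```

With 5 nodes the latch needs 3 countdowns; the leader plus two followers is enough, so the await returns before the timeout.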
Back to the onPublish method:
public void onPublish(Datum datum, RaftPeer source) throws Exception {
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
// only the leader may publish
if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}
// stale term
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
// reset the leader-expiry timer
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);
}
// store the datum
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
// enqueue a CHANGE task
notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
raftStore persists the datum to a local file, and addTask enqueues a CHANGE task.
That completes the service-creation part; next we add the instance to the service.
addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// build the instance-list key for the service
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// fetch the service again
Service service = getService(namespaceId, serviceName);
// lock on the service
synchronized (service) {
// merge in the new instances and get the service's full instance list
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
// wrap the list
instances.setInstanceList(instanceList);
// hand it to the consistency service; the key decides ephemeral vs. persistent
consistencyService.put(key, instances);
}
}
First, adding the new instances:
addIpAddresses
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
The updateIpAddresses method:
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
// fetch the old instance-list datum
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
// all current instances of the service, ephemeral or persistent
List<Instance> currentIPs = service.allIPs(ephemeral);
// map from ip:port to instance
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
// set of instance IDs
Set<String> currentInstanceIds = Sets.newHashSet();
// fill both collections
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
// the old instance set after applying the update
Map<String, Instance> instanceMap;
if (datum != null) {
// refresh the old instance data with the current health flags and heartbeat times
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
// start from an empty map
instanceMap = new HashMap<>(ips.length);
}
// iterate the incoming instances
for (Instance instance : ips) {
// create the cluster if it does not exist yet
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
// initialize it and start the cluster health check
cluster.init();
// register the cluster on the service
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
// for a remove action, drop the instance from the map
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
// otherwise add or replace it
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
// return the merged instance list
return new ArrayList<>(instanceMap.values());
}
This is a unified update routine: it merges the incoming instances against the old set and returns the new list.
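The merge semantics boil down to an upsert-or-remove over a map keyed by the instance's datum key. Here is a minimal sketch; the Instance type and its key are simplified stand-ins for the Nacos classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A sketch of the updateIpAddresses merge: start from the old instance map,
// then apply each incoming instance as an upsert or a removal.
public class InstanceMergeSketch {

    record Instance(String ip, int port) {
        String datumKey() { return ip + ":" + port; }
    }

    static List<Instance> update(Map<String, Instance> old, String action, Instance... ips) {
        Map<String, Instance> merged = new HashMap<>(old);
        for (Instance instance : ips) {
            if ("remove".equals(action)) {
                merged.remove(instance.datumKey());
            } else {
                merged.put(instance.datumKey(), instance); // add or replace
            }
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        Map<String, Instance> old = new HashMap<>();
        old.put("10.0.0.1:8080", new Instance("10.0.0.1", 8080));
        System.out.println(update(old, "add", new Instance("10.0.0.2", 8080)).size());    // 2
        System.out.println(update(old, "remove", new Instance("10.0.0.1", 8080)).size()); // 0
    }
}
```

Registering an instance is therefore always a whole-list replace: the new full list, not a delta, is what gets written into the consistency service.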
The result is then handed to the consistency service.
consistencyService.put
We already covered RaftConsistencyServiceImpl's put above; here is the ephemeral DistroConsistencyServiceImpl's put method:
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
// ephemeral-consistency replication; it runs as an async task, so the put returns to the client before the cluster is synced: weak consistency (AP model)
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), ApplyAction.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
First, the onPut method:
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// build the ephemeral datum
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// store it in the dataStore map
dataStore.put(key, datum);
}
// return if nobody listens on this key
if (!listeners.containsKey(key)) {
return;
}
// otherwise notify listeners immediately that the service changed
notifier.addTask(key, ApplyAction.CHANGE);
}
As you can see, it enqueues a CHANGE task via addTask: the change event for the newly added instance.
Next, distroProtocol.sync:
public void sync(DistroKey distroKey, ApplyAction action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// the holder keeps the delay-task execute engine; under ephemeral consistency its NacosTaskProcessor is a DistroDelayTaskProcessor, which eventually runs the task
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
A DistroDelayTask is added for every other cluster member and handled by DistroDelayTaskProcessor:
@Override
public boolean process(AbstractDelayTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
// dispatch the ephemeral-consistency sync task
if (ApplyAction.CHANGE.equals(distroDelayTask.getAction())) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask);
return true;
}
return false;
}
DistroSyncChangeTask runs on an async worker; let's look at its run method:
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(ApplyAction.CHANGE);
// syncData replicates the data via NamingProxy.syncData to /nacos/v1/ns/distro/datum
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
// on failure, resubmit the task to retry
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}
It sends a REST request and retries on failure.
Summary
That wraps up how the Nacos server handles service registration. A quick recap:
- First, a new Service is created if one does not exist yet.
- During initialization, a data-loading task and a heartbeat-check task are scheduled.
- After the Service is created, listeners are registered on it to react to data changes and deletions.
- Once the Service exists, the new instance is added to it,
- merging the new instances against the old ones.
- The consistency service's put method first notifies locally, then replicates the data across the cluster.
- Along the way we saw two modes of cluster replication:
- AP model, weak consistency: a DistroSyncChangeTask is created for the notification, and the main thread returns to the caller without waiting for the cluster.
- CP model, strong consistency: data is replicated through RaftCore's signalPublish, which requires a majority of nodes to respond before it succeeds.