![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiMGc902byZ2PlJzNlVzY2AzN2IWY2MjYhJWYwQTO2YWOiFWNmVTM2E2LcBza5QTcsJja2FXLp1ibj1ycvR3Lc5Wanlmcv9CXt92YucWbp9WYpRXdvRnL1A3Lc9CX6MHc0RHaiojIsJye.jpg)
大家好,我是傑哥
上周,我們通過文章Spring Cloud(六):注冊中心nacos-服務端視角篇,站在nacos的服務端的視角,對nacos作為注冊中心的服務端節點之間的選舉、心跳、服務注冊以及同步等主要動作進行了梳理說明
那麼,如果想要對與nacos作為注冊中心有一個全面的了解呢,今天就繼續跟着傑哥一起,轉向用戶端的角度,來探究一波吧~
關于如何實作Nacos用戶端,以及用戶端的consumer如何如調用provider的服務,可以參考文章Spring Cloud(五):注冊中心-nacos篇,進行實作
等你實作以後,再進入下面的源碼探究環節,可能會更快速進入節奏哦~
首先,我們從初始化步驟開始,進入源碼環節
一 準備
初始工作
nacos作為注冊中心,它的初始化是由NacosNamingService.init()方法進行的
我們一起看看,init()方法裡都做了什麼
1)初始化命名空間namespace:
initNamespaceForNaming()
在Nacos中,命名空間用于租戶粗粒度隔離,同時還可以進行環境的差別,如開發環境和測試環境等等
2)初始化伺服器位址:initServerAddr()
3)初始化web上下文:initWebRootContext()
它支援通過阿裡雲EDAS進行部署
4)初始化緩存目錄:initCacheDir()
5)初始化日志檔案名稱:調用initLogName()方法,從配置中擷取日志檔案
接下來,就是我們要看的重點了
1)初始化事件分發監聽器EventDispatcher
當用戶端訂閱了某個服務資訊後,會以Listener的方式注冊到EventDispatcher的隊列中,當有服務變化的時候,便會通知訂閱者
2)初始化服務端代理NamingProxy
用于用戶端與服務端的通信
3)初始化心跳通信類BeatReactor
用于維持與伺服器之間的心跳通信,上報用戶端注冊到服務端的服務資訊
4)初始化服務資訊更新器HostReactor
用于用戶端服務的訂閱,以及從服務端更新服務資訊。對了,也就是說用戶端會使用HostReactor進行訂閱操作,訂閱了之後,上面說到的事件分發監聽器EventDispatcher會将服務資訊通知給訂閱者咯~
二 重點
步驟源碼
01.服務更新
EventDispatcher是一個事件分發器,它維護了一個有變更的服務的隊列
changedServices
private BlockingQueue<ServiceInfo> changedServices
= new LinkedBlockingQueue<ServiceInfo>();
以及一個對于某個服務的監聽者清單ConcurrentMap:observerMap
private ConcurrentMap<String, List<EventListener>> observerMap
= new ConcurrentHashMap<String, List<EventListener>>();
其中key為服務名稱,value為監聽這個服務變更的用戶端執行個體清單
也就是說,EventDispatcher會實時地将服務變化資訊,同步給監聽該服務變化資訊的用戶端,使得該用戶端可以進行後續動态操作
小結 1 EventDispatcher工作流程
02.心跳
首先,在初始化NacosNamingService.init()中,我們看到nacos已經對心跳類BeatReactor進行了初始化,那麼在什麼時候會開始進行第一次心跳呢?
答案是:執行個體第一次注冊的時候。第一次開啟并執行了心跳任務,會再開啟下一次心跳的定時任務,一直繼續下去,直到該執行個體下線
1)進入addBeatInfo()方法
執行個體注冊的時候調用了addBeatInfo()方法,并且預設時間間隔為5s
2)進入BeatReactor.addBeatInfo()方法
該方法通過ScheduledExecutorService執行BeatTask任務
那麼隻需要看一下該方法的run()方法,就可以知道心跳任務的具體執行邏輯了
3)進入BeatTask.run()
該方法會調用serverProxy.sendBeat()方法,向服務端發送自身的健康狀況
4)進入serverProxy.sendBeat()方法
調用reqAPI()方法,調用服務端接口:/v1/ns/instance/beat
那麼,來到服務端,我們看看,接收到心跳請求之後,服務端的處理
進入服務端
服務端的/beat接口,接收到來自用戶端的心跳包,調用RaftCore.receivedBeat()方法進行心跳包的處理
1)進入RaftCore#receivedBeat()方法
分為兩個部分來看
第一部分 邏輯判斷,排除資料過期等場景
a 分别構造目前節點資訊local和發送心跳的節點資訊remote
b 若發送節點不是leader,則不符合邏輯(隻能由leader發起心跳),抛出異常
c 如果local的任期大于remote,說明資訊已過期,抛出異常
d 若本地節點不是follower狀态,不合邏輯,将本地的節點狀态更新為follower
第二部分 收集資料,實作服務的更新
a 周遊請求參數中的datums,如果Follwoer不存在這個datumKey或者時間戳比較舊,則收集這個datumKey
b 當datnum的數量小于50時,繼續進行收集
c 當數量大于等于50時,進行異步發送,擷取對應的50個最新的Datnum對象
第三部分 将資料添加到緩存中,并更新參數
a 調用RaftStore#write()方法,将Datum序列化為json寫到本地緩存中
b 将Datum存放到RaftCore的datums集合中
c 調用notifier.addTask()通知對應的RaftListener,删除key對應的舊的Datum
d 重置leaderDue時間
e 更新本地節點的任期term
小結 2 用戶端心跳流程
關于服務端後續的具體處理流程,依然可以參考Spring Cloud(五):注冊中心-nacos篇哦~
03.服務更新
HostReactor用于擷取、儲存、更新各服務執行個體資訊
通過維護一個服務的map集合:serviceInfoMap實作
private Map<String, ServiceInfo> serviceInfoMap;
具體的話,來一起看看~
1)擷取服務端的服務資訊
當用戶端擷取服務資訊的時候,HostReactor就把服務資訊儲存到serviceInfoMap中,并通過UpdateTask能夠周期性的從服務端擷取訂閱服務的最新資訊,具體代碼如下
我們看到,擷取服務資訊的方法getServiceInfo()方法中,擷取到服務資訊,然後将服務資訊儲存在serviceInfoMap中
2)更新服務資訊
進入updateTask.run()方法
我們看到該方法實際上是會去定期判斷目前是否有服務資訊發生變化,若有變化,則會調用updateServiceNow(),來更新服務資訊
3)進入updateServiceNow()方法
該方法通過服務代理調用queryList()方法,擷取服務資訊清單,并通過
processServiceJSON()方法,進行結果資訊的處理
首先我們先來看看queryList()方法
4)進入queryList()方法
調用服務端接口/instance/list方法,擷取服務資訊清單
5)再來看看processServiceJSON()方法
分為兩部分處理
第一部分:原有服務不為空的情況處理
第二部分:原有服務資訊為空的情況處理
我們看到,總的來說,processServiceJSON()方法會将擷取到的最新服務資訊清單存放在serviceInfoMap中,于是服務資訊便得到了更新
當然,我們也可以看到,processServiceJSON()方法更新完服務之後,也會調用eventDispatcher.serviceChanged(),将服務的變更情況,釋出到隊列中,也會調用DiskCache.write(),将服務資訊持久化到磁盤中
同時,在這裡也簡單提一下,HostReactor還持有以下兩個對象
pushReceiver對象,這個對象呢,用于通過UDP協定從伺服器擷取推送的資訊,當然也會更新到serviceInfoMap當中
failoverReactor對象,這個對象呢,用于故障轉移。當服務端不可用的時候,會切換到本地緩存模式,從緩存中擷取服務資訊。
在我看來,這一點類似于Eureka中,用戶端也會将服務資訊存儲在本地緩存,當注冊中心服務不可用時,也不會馬上調用失敗,就極大地保證了服務的可用性
小結3 服務資訊維護角色-HostReactor的原理
好了,看完了 服務的心跳和服務更新機制,接下來,再一起進入服務上線與下線的步驟中
04.上線(注冊)
說到服務的注冊呢,我們先從服務Nacos的服務自動注冊類說起
1) 檢視NacosAutoServiceRegistration類
該類繼承了AbstractAutoServiceRegistration類,而這個類,實作了ApplicationListener接口,并實作了其onApplicationEvent()方法,使得在服務啟動之後,可以執行一些處理
我們看到,這裡調用了bind()方法
2)進入bind()方法
在bind()方法中,調用了start()方法
3)進入start()方法
在start()方法中,便調用了register()方法,進行服務的注冊操作
4)進入registrer()方法
這裡使用政策模式,進入到
NacosServiceRegistry.registerInstance()方法
5)進入registerInstance()方法
6)進入執行個體注冊方法registerInstance()
可以看到,會首先發送心跳,再使用服務代理,調用服務端注冊接口,發送注冊請求
7)進入NamingProxy.registerService()
首先擷取執行個體資訊,封裝參數,然後調用服務端/nacos/ns/instance的post接口,進行服務的注冊請求
1)register接口接受到用戶端的請求,調用接口registerInstance()方法進行執行個體資訊注冊
2)進入ServiceManager#registerInstance()方法
a 建立空service
b 調用addInstance()方法添加新的執行個體
3)進入addInstance()方法
該方法主要添加 instance 到緩存中,并且持久化
4)進入addIpAddresss()方法
該方法通過調用updateIpAddresss()方法進行具體處理
5)進入updateIpAddresss()方法
我們看到該方法實際上是通過調用setValid()方法,将舊執行個體清單與新執行個體清單進行合并的
6)執行個體資訊持久化consistencyService.put(key, instances)
再來回到執行個體資訊持久化的方法,該方法通過調用RaftCore#signalPublish()方法,進行具體的執行個體資訊持久化
7)實作執行個體資訊持久化,分為兩個部分
第一部分 基本判斷
a 若節點不為leader狀态,則轉發請求給leader
b 若節點為leader狀态,則将包發送給所有follower
第二部分 發送消息,同步給大多數節點
發送消息,同步等待,接受到大多數節點的響應之後,傳回成功
小結 3 用戶端執行個體注冊步驟
如下圖
04.下線(登出)
當服務端不可用,或者主動下關閉用戶端服務執行個體時,會進入deregister()方法,取消服務的注冊
比如當手動停掉目前的生産者服務,控制台會打出這樣的日志
從日志中我們也可以看到,實際上它是調用了NacosServiceRegistry.deregister()方法,進行服務的下線操作,
1)進入NacosServiceRegistry.deregister()方法
該方法調用namingService.deregisterInstance()方法進行處理
2)進入namingService.deregisterInstance()方法
該方法會分别進行移除心跳,然後向服務端發送取消服務注冊的請求
3)進入serverProxy.deregisterService()方法
該方法則使用reqAPI()調用服務端的/nacos/v1/ns/instance的DELETE接口
進入服務端
1)接口接收到用戶端的取消注冊請求,調用方法removeInstance()
2)進入該removeInstance()方法
3)将該方法添加到taskDispatcher中,後續将進行删除該執行個體資訊的操作
小結 4 用戶端執行個體取消注冊步驟
三 總結
總而言之
好了,今天的推送到這裡就結束啦
與上一篇一樣,這篇也是花了蠻多心血在裡面的,也不是說自己有什麼執念,而是你們的回報與自己的收獲,讓我有了堅持的動力,感謝自己,也感謝你們~
本篇文章,主要從以下幾點進行介紹:
1 初始化工作介紹
2 源碼探究 - 事件分發器EventDispatcher
3 源碼探究 - 心跳
4 源碼探究 - 服務更新HostReactor
5 源碼探究 - 注冊和下線
站在用戶端的角度,分别探究了心跳、服務更新、服務注冊和下線這幾個主要事件。通過分析用戶端、服務端的處理流程,以及用戶端與服務端的互動,将一個動作整體貫穿起來,讓大家更全面地了解到nacos注冊中心的原理機制,每部分也都用流程圖,呈現給了大家。
相信大家結合前兩篇關于nacos作為注冊中心的基本機制有了一定的了解了!
嗯,就這樣。每天學習一點,時間會見證你的強大~
下期預告:
Spring Cloud(八):注冊中心-Consul