天天看點

【SpringCloud技術專題】「Eureka源碼分析」從源碼層面讓你認識Eureka工作流程和運作機制(下)

承接上文的對應的Eureka的上篇介紹,我們開始介紹,詳見 [【SpringCloud技術專題】「Eureka源碼分析」從源碼層面讓你認識Eureka工作流程和運作機制(上)]

Eureka Server 提供服務注冊服務,各個節點啟動後,會在Eureka Server中進行注冊,這樣Eureka Server中的服務系統資料庫中将會存儲所有可用服務節點的資訊,服務節點的資訊可以在界面中直覺的看到。

Eureka Client 是一個Java 用戶端,用于簡化與Eureka Server的互動,用戶端同時也具備一個内置的、使用輪詢負載算法的負載均衡器。

在應用啟動後,将會向Eureka Server發送心跳(預設周期為30秒),如果Eureka Server在多個心跳周期(預設3個心跳周期=90秒)沒有收到某個節點的心跳,Eureka Server将會從服務系統資料庫中把這個服務節點移除。

高可用情況下的:Eureka Server之間将會通過複制的方式完成資料的同步;

Eureka Client具有緩存的機制,即使所有的Eureka Server 都挂掉的話,用戶端依然可以利用緩存中的資訊消費其它服務的API;

剛才在org.springframework.cloud.netflix.eureka.server.InstanceRegistry 的每個方法都打了一個斷點,而且現在EurekaServer已經處于Debug運作狀态,那麼我們就随便找一個被 @EnableEurekaClient 的微服務啟動試試微服務來試試吧,直接Run。

當啟動後,就一定會調用注冊register方法,那麼就接着往下看,拭目以待;

執行個體注冊方法機制

InstanceRegistry.register順着堆棧資訊往上看,是 ApplicationResource.addInstance 方法被調用了,分析addInstance;

主要是處理接收 Http 的服務請求。

這裡的寫法貌似看起來和我們之前 Controller 的 RESTFUL 寫法有點不一樣,仔細一看,原來是Jersey RESTful 架構,是一個産品級的RESTful service 和 client 架構。與Struts類似,它同樣可以和hibernate,spring架構整合。

看到 registry.register(info, "true".equals(isReplication)); 注冊啊,原來EurekaClient用戶端啟動後會調用會通過Http(s)請求,直接調到ApplicationResource.addInstance 方法,隻要是和注冊有關的,都會調用這個方法。

接着我們深入 registry.register(info, "true".equals(isReplication)) 檢視;

handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) 方法;

然後通過 ApplicationContext 釋出了一個事件 EurekaInstanceRegisteredEvent 服務注冊事件,可以給 EurekaInstanceRegisteredEvent 添加監聽事件,那麼使用者就可以在此刻實作自己想要的一些業務邏輯。

然後我們再來看看 super.register(info, isReplication) 方法,該方法是 InstanceRegistry 的父類 PeerAwareInstanceRegistryImpl 的方法。

進入PeerAwareInstanceRegistryImpl 類的 register(final InstanceInfo info, final boolean isReplication) 方法;
進入super.register(info, leaseDuration, isReplication),如何寫入EurekaServer 的系統資料庫的,進入AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication) 方法。

發現這個方法有點長,大緻閱讀,主要更新了系統資料庫的時間之外,還更新了緩存等其它東西,大家有興趣的可以深究閱讀該方法;

replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) 的這個方法。

每當有注冊請求,首先更新 EurekaServer 的系統資料庫,然後再将資訊同步到其它EurekaServer的節點上去;

接下來我們看看 node 節點是如何進行複制操作的,進入 replicateInstanceActionsToPeers 方法。

節點之間的複制狀态操作,都在這裡展現的淋漓盡緻,那麼我們就拿 Register 類型 node.register(info) 來看,我們來看看 node 究竟是如何做到同步資訊的,進入 node.register(info) 方法看看;

PeerEurekaNode.register(final InstanceInfo info) 方法,一窺究竟如何同步資料。

這裡涉及到了 Eureka 的任務批處理,通常情況下Peer之間的同步需要調用多次,如果EurekaServer一多的話,那麼将會有很多http請求,所

以自然而然的孕育出了任務批處理,但是也在一定程度上導緻了注冊和下線的一些延遲,突出優勢的同時也勢必會造成一些劣勢,但是這些延遲情況還是能符合

常理在容忍範圍之内的。

在 expiryTime 逾時時間之内,批次處理要做的事情就是合并任務為一個List,然後發送請求的時候,将這個批次List直接打包發送請求出去,這樣的話,在這個批次的List裡面,可能包含取消、注冊、心跳、狀态等一系列狀态的集合List。

我們再接着看源碼,batchingDispatcher.process 這麼一調用,然後我們就直接看這個 TaskDispatchers.createBatchingTaskDispatcher 方法。

這裡的 process 方法會将任務添加到隊列中,有入隊列自然有出隊列,具體怎麼取任務,我就不一一給大家講解了,我就講講最後是怎麼觸發任務的。進入 final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor) 這句代碼的 TaskExecutors.batchExecutors 方法。

我們發現 TaskExecutors 類中的 batchExecutors 這個靜态方法,有個 BatchWorkerRunnable 傳回的實作類,是以我們再次進入 BatchWorkerRunnable 類看看究竟,而且既然是 Runnable,那麼勢必會有 run 方法。

這就是我們 BatchWorkerRunnable 類的 run 方法,這裡面首先要擷取信号量釋放,才能獲得任務集合,一旦擷取到了任務集合的話,那麼就直接調用 processor.process(tasks) 方法請求 Peer 節點同步資料,接下來我們看看 ReplicationTaskProcessor.process 方法;

感覺快要見到真相了,是以我們迫不及待的進入 JerseyReplicationClient.submitBatchUpdates(ReplicationList replicationList) 方法一窺究竟。

看到了相對路徑位址,我們搜尋下"batch"這樣的字元串看看有沒有對應的接收方法或者被@Path注解進入的;在 eureka-core-1.4.12.jar 這個包下面,果然搜到到了 @Path("batch") 這樣的字樣,直接進入,發現這是 PeerReplicationResource 類的方法 batchReplication,我們進入這方法看看。

看到了循環一次周遊任務進行處理,不知不覺覺得心花怒放,勝利的重點馬上就要到來了,我們進入 PeerReplicationResource.dispatch 方法看看。

随便抓一個類型,那我們也拿 Register 類型來看,進入 PeerReplicationResource.handleRegister 看看。

Peer節點的同步旅程終于結束了,最終又回調到了 ApplicationResource.addInstance 這個方法,這個方法在最終是EurekaClient啟動後注冊調用的方法,然而Peer節點的資訊同步也調用了這個方法,僅僅隻是通過一個變量 isReplication 為true還是false來判斷是否是節點複制。剩下的ApplicationResource.addInstance流程前面已經提到過了,相信大家已經明白了注冊的流程是如何扭轉的,包括批量任務是如何處理EurekaServer節點之間的資訊同步的了。

Run運作discovery-eureka服務,Debug 運作 provider-user 服務,先觀察日志先;

【1】:仔細檢視下日志,先是 DefaultLifecycleProcessor 類處理了一些 bean,然後接下來肯定會調用一些實作 SmartLifecycle 類的start 方法;

【2】: 接着初始化設定了EurekaClient的狀态為 STARTING,初始化編碼使用的格式,哪些用JSON,哪些用XML;

【3】: 緊接着列印了強制擷取注冊資訊狀态為false,已注冊的應用大小為0,用戶端發送心跳續約,心跳續約間隔為30秒,最後列印Client初始化完成;

這個注解類竟然也使用了注解 @EnableDiscoveryClient,那麼我們有必要去這個注解類看看。

這個注解類有個比較特殊的注解 @Import,由此我們猜想,這裡的大多數邏輯是不是都寫在這個 EnableDiscoveryClientImportSelector 類呢?

EnableDiscoveryClientImportSelector 類繼承了 SpringFactoryImportSelector 類,但是重寫了一個 isEnabled() 方法,預設值傳回 true,為什麼會傳回true。
首先通過注解擷取了一些屬性,然後加載了一些類名稱,我們進入loadFactoryNames 方法看看。
加載了一個配置檔案,配置檔案裡面寫了啥呢?打開SpringFactoryImportSelector該檔案所在的jar包的spring.factories檔案一看。
都是一些 Configuration 字尾的類名,是以這些都是加載的一堆堆的配置檔案類。 factories對象裡面隻有一個類名路徑為 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration 。

首先看到該類實作了SmartLifecycle 接口,那麼就肯定會實作 start 方法,而且這個 start 方法感覺應在會被加載執行的。

this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 這段代碼有一個觀察者模式的回調存在。

這個方法會因為狀态的改變而回調所有實作 StatusChangeListener 這個類的地方,前提得先注冊到 listeners 中去才行。

于是乎,我們斷定,若想要回調,那麼就必須有地方先注冊這個事件,而且這個注冊還必須提前執行在 start 方法前執行,于是我們得先在ApplicationInfoManager 這個類中找到注冊到 listeners 的這個方法。

于是我們逆向找下 registerStatusChangeListener 被調用的地方。

很不巧的是,盡然隻有1個地方被調用,這個地方就是 DiscoveryClient.initScheduledTasks 方法,而且 initScheduledTasks

方法又是在 DiscoveryClient 的構造函數裡面調用的,同時我們也對 initScheduledTasks 以及 initScheduledTasks 被調用的構造方法地方

打上斷點。

果不其然,EurekaDiscoveryClientConfiguration.start 方法被調用了,緊接着 this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 也進入斷點,然後在往下走,又進入的 DiscoveryClient.initScheduledTasks 方法中的 notify 回調處。

看着斷點依次經過我們上述分析的地方,然後也符合日志列印的順序,是以我們現在應該是有必要好好看看 DiscoveryClient.initScheduledTasks 這個方法究竟幹了什麼偉大的事情。然而又想了想,還不如看看 initScheduledTasks 被調用的構造方法。

從往下看,initScheduledTasks 這個方法顧名思義就是初始化排程任務,是以這裡面的内容應該就是重頭戲,進入看看。

在這個方法從上往下一路注釋分析下來,幹了EurekaClient我們最想知道的一些事情,定時任務擷取注冊資訊,定時任務重新整理緩存,定時任務心跳續約,定時任務同步資料中心資料,狀态變化監聽回調等。但是唯獨沒看到注冊,這是怎麼回事呢?

instanceInfoReplicator.onDemandUpdate() 就是在狀态改變的時候。

onDemandUpdate 這個方法,唯獨 InstanceInfoReplicator.this.run() 這個方法還有點用,而且還是 run 方法呢,感情 InstanceInfoReplicator 這個類還是實作了 Runnable 接口?經過檢視這個類,還真是實作了 Runnable 接口。

這個方法應該我們要找的注冊所在的地方。

discoveryClient.register() 這個 register 方法,原來注冊方法就是這個。

原來調用了 EurekaHttpClient 封裝的用戶端請求對象來進行注冊的,再繼續深探 registrationClient.register 方法,于是我們來到了 AbstractJerseyEurekaHttpClient.register 方法。

調用的是 Jersey RESTful 架構來進行請求的,然後在 EurekaServer 那邊就會在 ApplicationResource.addInstance 方法接收用戶端的注冊請求,是以我們的 EurekaClient 是如何注冊的就到此為止了。

極限就是為了超越而存在的