天天看點

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

<a href="http://my.oschina.net/pingpangkuangmo/blog/484955">zookeeper源碼研究系列(1)源碼環境搭建</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/486780">zookeeper源碼研究系列(2)用戶端建立連接配接過程分析</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/491673">zookeeper源碼研究系列(3)單機版伺服器介紹</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/495311">zookeeper源碼研究系列(4)叢集版伺服器介紹</a>

一個最簡單的demo如下:

使用的maven依賴如下:

對于目前來說,zookeeper的伺服器端代碼和用戶端代碼還是混在一起的,估計日後能改吧。

使用的zookeeper的構造函數有三個參數構成

zookeeper叢集的伺服器位址清單

該位址是可以填寫多個的,以逗号分隔。如"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",那用戶端連接配接的時候到底是使用哪一個呢?先随機打亂,然後輪詢着用,後面再詳細介紹。

sessiontimeout

最終會引出三個時間設定:和伺服器端協商後的sessiontimeout、readtimeout、connecttimeout

伺服器端使用協商後的sessiontimeout:即超過該時間後,用戶端沒有向伺服器端發送任何請求(正常情況下用戶端會每隔一段時間發送心跳請求,此時伺服器端會從新計算用戶端的逾時時間點的),則伺服器端認為session逾時,清理資料。此時用戶端的zookeeper對象就不再起作用了,需要再重新new一個新的對象了。

用戶端使用connecttimeout、readtimeout分别用于檢測連接配接逾時和讀取逾時,一旦逾時,則該用戶端認為該伺服器不穩定,就會從新連接配接下一個伺服器位址。

watcher

作為zookeeper對象一個預設的watcher,用于接收一些事件通知。如和伺服器連接配接成功的通知、斷開連接配接的通知、session過期的通知等。

同時我們可以看到,一旦和zookeeper伺服器連接配接建立成功,就會擷取伺服器端配置設定的sessionid和password,如下:

下面就通過源碼來詳細說明這個建立連接配接的過程。

首先與zookeeper伺服器建立連接配接,有兩層連接配接要建立。

用戶端與伺服器端的tcp連接配接

在tcp連接配接的基礎上建立session關聯

建立tcp連接配接之後,用戶端發送connectrequest請求,申請建立session關聯,此時伺服器端會為該用戶端配置設定sessionid和密碼,同時開啟對該session是否逾時的檢測。

當在sessiontimeout時間内,即還未逾時,此時tcp連接配接斷開,伺服器端仍然認為該sessionid處于存活狀态。此時,用戶端會選擇下一個zookeeper伺服器位址進行tcp連接配接建立,tcp連接配接建立完成後,拿着之前的sessionid和密碼發送connectrequest請求,如果還未到該sessionid的逾時時間,則表示自動重連成功,對用戶端使用者是透明的,一切都在背後默默執行,zookeeper對象是有效的。

如果重建立立tcp連接配接後,已經達到該sessionid的逾時時間了(伺服器端就會清理與該sessionid相關的資料),則傳回給用戶端的sessiontimeout時間為0,sessionid為0,密碼為空位元組數組。用戶端接收到該資料後,會判斷協商後的sessiontimeout時間是否小于等于0,如果小于等于0,則使用eventthread線程先發出一個keeperstate.expired事件,通知相應的watcher,然後結束eventthread線程的循環,開始走向結束。此時zookeeper對象就是無效的了,必須要重新new一個新的zookeeper對象,配置設定新的sessionid了。

它是面向使用者的,提供一些操作api。

它又兩個重要的屬性:

clientcnxn cnxn:負責所有的zookeeper節點操作的執行

zkwatchmanager watchmanager:負責維護某個path上注冊的watcher

如建立某個node操作(同步方式):

zookeeper對象負責建立出request,并交給clientcnxn來執行,zookeeper對象再對傳回結果進行處理。

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

下面來看下異步回調的方式建立node:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

同步方式送出一個請求後,開始循環判斷該請求包的狀态是否結束,即處于阻塞狀态,一旦結束則繼續往下走下去,傳回結果。異步方式則送出一個請求後,直接傳回,對結果的處理邏輯包含在回調函數中。一旦該對該請求包響應完畢,則取出回調函數執行相應的回調方法。

至此簡單了解了,zookeeper對象主要封裝使用者的請求以及處理響應等操作。使用者請求的執行全部交給clientcnxn來執行,那我們就詳細看下clientcnxn的來源及大體内容。

先看看clientcnxn是怎麼來的:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

第一步:為zkwatchmanager watchmanager設定一個預設的watcher

第二步:将連接配接字元串資訊交給connectstringparser進行解析

連接配接字元串比如: "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181/root"

解析結果如下:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

得到兩個資料string chrootpath預設的跟路徑和arraylist&lt;inetsocketaddress&gt; serveraddresses即多個host和port資訊。

第三步:根據上述解析的host和port清單結果,建立一個hostprovider

有了connectstringparser的解析結果,為什麼還需要一個hostprovider再來包裝下呢?主要是為将來留下擴充的餘地

來看下hostprovider的詳細接口介紹:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

hostprovider主要負責不斷的對外提供可用的zookeeper伺服器位址,這些伺服器位址可以是從一個url中加載得來或者其他途徑得來。同時對于不同的zookeeper用戶端,給出就近的zookeeper伺服器位址等。

來看下預設的hostprovider實作statichostprovider:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

有三個屬性,一個就是伺服器位址清單(經過如下方式随機打亂了):

collections.shuffle(this.serveraddresses)

另外兩個屬性用于标記,下面來具體看下,statichostprovider是如何實作不斷的對外提供zookeeper伺服器位址的:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

代碼也很簡單,就是在打亂的伺服器位址清單中,不斷地周遊,到頭之後,在從0開始。

上面的spindelay是個什麼情況呢?

正常情況下,currentindex先加1,然後傳回currentindex+1的位址,當該位址連接配接成功後會執行onconnected方法,即lastindex = currentindex了。然而當傳回的currentindex+1的位址連接配接不成功,繼續嘗試下一個,仍不成功,仍繼續下一個,就會遇到currentindex=lastindex的情況,此時即輪詢了一遍,仍然沒有一個位址能夠連接配接上,此時的政策就是先暫停休息休息,然後再繼續。

第四步:為建立clientcnxn準備參數并建立clientcnxn。

首先是通過getclientcnxnsocket()擷取一個clientcnxnsocket。來看下clientcnxnsocket是主要做什麼工作的:

a clientcnxnsocket does the lower level communication with a socket implementation. this code has been moved out of clientcnxn so that a netty implementation can be provided as an alternative to the nio socket code.

專門用于負責socket通信的,把一些公共部分抽象出來,其他的留給不同的實作者來實作。如可以選擇預設的clientcnxnsocketnio,也可以使用netty等。

來看下getclientcnxnsocket()的擷取clientcnxnsocket的過程:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

首先擷取系統參數"zookeeper.clientcnxnsocket",如果沒有的話,使用預設的clientcnxnsocketnio,是以我們可以通過指定該參數來替換預設的實作。

參數準備好了,clientcnxn是如何來建立的呢?

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

首先就是儲存一些對象參數,此時的sessionid和sessionpasswd都還沒有。然後就是兩個timeout參數:connecttimeout和readtimeout。在clientcnxn的發送和接收資料的線程中,會不斷的檢測連接配接逾時和讀取逾時,一旦出現逾時,就認為服務不穩定,需要更換伺服器,就會從hostprovider中擷取下一個伺服器位址進行連接配接。

最後就是兩個線程,一個事件線程即eventthread,一個發送和接收socket資料的線程即sendthread。

事件線程eventthread呢就是從一個事件隊列中不斷取出事件并進行處理:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

看下具體的處理過程,主要分成兩種情況,一種就是我們注冊的watch事件,另一種就是處理異步回調函數:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

可以看到這裡就是觸發我們注冊watch的,還有觸發上文提到的異步回調的情況的。

明白了eventthread是如何來處理事件的,需要知道這些事件是如何來的:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

對外提供了三個方法來添加不同類型的事件,如sendthread線程就會調用這三個方法來添加事件。其中對于事件通知,會首先根據zkwatchmanager watchmanager來擷取關心該事件的所有watcher,然後觸發他們。

再來看看sendthread的工作内容:

sendthread = new sendthread(clientcnxnsocket); 把傳遞給clientcnxn的clientcnxnsocket,再傳遞給sendthread,讓它服務于sendthread。

在sendthread的run方法中,有一個while循環,不斷的做着以下幾件事:

任務1:不斷檢測clientcnxnsocket是否和伺服器處于連接配接狀态,如果是未連接配接狀态,則從hostprovider中取出一個伺服器位址,使用clientcnxnsocket進行連接配接。

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

和伺服器建立連接配接成功後,開始發送connectrequest請求,把該請求放到outgoingqueue請求隊列中,等待被發送給伺服器

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

任務2:檢測是否逾時:當處于連接配接狀态時,檢測是否讀逾時,當處于未連接配接狀态時,檢測是否連接配接逾時

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

一旦逾時,則抛出sessiontimeoutexception,然後看下是如何處理呢?

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

可以看到一旦發生逾時異常或者其他異常,都會進行清理,并設定連接配接狀态為未連接配接,然後發送disconnected事件。至此又會進入任務1的流程

任務3:不斷的發送ping通知,伺服器端每接收到ping請求,就會從目前時間重新計算session過期時間,是以當用戶端按照一定時間間隔不斷的發送ping請求,就能保證用戶端的session不會過期。發送時間間隔如下:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

clientcnxnsocket.getidlesend():是最後一次發送資料包的時間與目前時間的間隔。當readtimeout的時間已經過去一半多了,都沒有發送資料包的話,則執行一次ping發送。或者過去max_send_ping_interval(10s)都還沒有發送資料包的話,則執行一次ping發送。

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

ping發送的内容隻有請求頭opcode.ping的标示,其他都為空。發送ping請求,也是把該請求放到outgoingqueue發送隊列中,等待被執行。

任務4:執行io操作,即發送請求隊列中的請求和讀取伺服器端的響應資料。

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

首先從outgoingqueue請求隊列中取出第一個請求,然後進行序列化,然後使用socket進行發送。

讀取伺服器端資料如下:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

分為兩種:一種是讀取針對connectrequest請求的響應,另一種就是其他響應,先暫時不說。

先來看看針對connectrequest請求的響應:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

首先進行反序列化,得到connectresponse對象,我們就可以擷取到伺服器端給我們用戶端配置設定的sessionid和passwd,以及協商後的sessiontimeout時間。

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

首選要根據協商後的sessiontimeout時間,重新計算readtimeout和connecttimeout值。然後保留和記錄sessionid和passwd。最後通過eventthread發送一個syncconnected連接配接成功事件。至此,tcp連接配接和session初始化請求都完成了,用戶端的zookeeper對象可以正常使用了。

至此,我們便了解用戶端與伺服器端建立連接配接的過程。

伺服器端情況分很多種,先暫時說最簡單的單機版。同時也不再給出伺服器端的啟動過程(後面的文章再來詳細說明)。

首先介紹下伺服器端的大體概況:

首先是伺服器端的配置檔案,有ticktime、minsessiontimeout、maxsessiontimeout相關屬性。預設情況下,ticktime是3000ms,minsessiontimeout是2倍的ticktime,maxsessiontimeout是20倍的ticktime。

伺服器端預設采用nioservercnxnfactory來負責socket的處理。每來一個用戶端socket請求,為該用戶端建立一個nioservercnxn。之後與該用戶端的互動,就交給了nioservercnxn來處理。對于用戶端的connectrequest請求,處理如下:

首先反序列化出connectrequest

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

然後開始協商sessiontimeout時間

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

下面來分别詳細的說明這三個過程:

sessiontracker是用來建立删除session,執行session的過期檢查的。

直接看下預設使用的sessiontrackerimpl:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

先看下session有哪些屬性:

final long sessionid:session的唯一标示

final int timeout:這個session的timeout時間(即上文中用戶端和伺服器端商定下來的timeout時間)

long ticktime:這個session的下一次逾時時間點(随着用戶端不斷的發送ping請求,就會不斷的重新整理該時間,不斷的往後變化)

boolean isclosing:session的标示符,用于标示session是否還被正常使用

object owner:建立該session的owner。會在用戶端更換所連接配接的伺服器的時候用到(之後詳細說明)

然後再來看看sessiontracker的幾個資料:

hashmap&lt;long, sessionimpl&gt; sessionsbyid:很簡單,以sessionid存儲session

concurrenthashmap&lt;long, integer&gt; sessionswithtimeout:以sessionid存儲每個session的timeout時間

hashmap&lt;long, sessionset&gt; sessionsets:某個時間點上的session集合(用于session過期檢查)

long nextsessionid:初始的sessionid,之後建立的sessionid就在此基礎上自增

nextexpirationtime:下一次過期時間點,每當到該時間點就會進行一次session的過期檢查

expirationinterval:session過期檢查的周期

要搞清楚的内容有:1 建立session的過程 2 session過期檢查的過程

先來看看建立session的過程:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

代碼很簡單,就是建立一個sessionimpl對象,然後存儲到sessiontracker中,同時開始計算session的逾時時間。這裡有一個内容就是sessionid的來曆,我們可以看到就是根據nextsessionid來的,并且是不斷自增的。

sessionid是一個用戶端的重要标示,是全局唯一的,先來看看單機版的nextsessionid初始化:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語
ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語
ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

單機版的伺服器使用1通過計算來初始化nextsessionid。而叢集版對應的id則分别是每個機器指定的sid。計算過程如下:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

第一步:就是取目前時間,為 10100111011100110110010101110100111100011 為41為二進制

第二步:long有64位,左移24位,其實是除掉了前面的1,後面補了24位的0。

第三步:第二步的結果可能是正數也可能是負數,目前是正數,之後可能就是負數了,你可以算一下需要多少年,哈哈。為了保證右移的時候,進行補0操作,需要使用無符号右移,即&gt;&gt;&gt;。這裡使用了無符号右移8位

第四步:将傳過來的id這裡即1左移56位。然後再與第三步的正數結果進行或操作,得到最終的基準nextsessionid,是以當這裡的id值不是很大的話,一般幾台機器而已,也保證了sessionid是一個正數,同時前八位就是機器的sid号。是以每台機器的的前八位是不同的,保證了每台機器中不會配置相同的sessionid,每台機器的sessionid又是自增操作,是以單台機器内sessionid也是不會重複的。

綜上所示保證了sessionid是唯一的,不會出現重複配置設定的情況。

搞清楚了sessionid的配置設定,接下來就要弄清楚如何進行session的過期檢查問題:

我們先看下,session激活過程是怎麼處理的:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

首先擷取這個session資料,然後計算它的超期時間

long expiretime = roundtointerval(system.currenttimemillis() + timeout);

private long roundtointerval(long time) {

}

即是拿目前時間加上這個session的timeout時間,然後對其進行取expirationinterval的整,即始終保持是expirationinterval的正數倍,即每個session的過期時間點最終都會落在expirationinterval的整數倍上。

如果原本該session的超期時間就大于你所計算出的超期時間,則不做任何處理,否則設定該session的超期時間為上述計算結果的超期時間。

取出原本該session所在的超期時間,從集合裡面删除

重新擷取現在超期時間所在的集合,添加進去

綜上所述,session的激活其實就是重新計算下逾時時間,最終取expirationinterval的正數倍,然後從之前時間點的集合中移除,然後再添加到新的時間點的集合中去。

至此,session的檢查就友善多了,隻需要在expirationinterval整數時間點上取出集合,然後一個個标記為過期即可。而那些不斷被激活的session,則不斷的從一個時間點的集合中換到下一個時間點的集合中。

sessiontrackerimpl也是一個線程,該線程執行内容就是session的過期檢查,如下所示:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

回到建立session的三大步驟:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

來看下密碼是如何來産生的:

其中supersecret為常量

使用random的方式來随機生成位元組數組。但是該位元組數組,隻要參數即sessionid相同,位元組數組的内容就相同。即當我們知道了sessionid,就可以利用上述方式算出對應的密碼,我感覺密碼基本上沒什麼用。

再看下當用戶端帶着sessionid和密碼進行連接配接的時候,這時會進行密碼的檢查:

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

看了上面的代碼,就再次驗證了密碼沒什麼鳥用,知道了sessionid,就完全知道了密碼。是以這一塊有待改進吧,應該不能由sessionid完全決定吧,如再加上目前時間等等,讓用戶端造不出來密碼,同時伺服器端存儲加密後的密碼。

本文内容已太多,這裡就先簡單描述下,之後再詳細的講解

ZooKeeper源碼研究系列(2)用戶端建立連接配接過程分析1 系列目錄2 用戶端API簡單使用3 用戶端的建立連接配接的過程4 伺服器端處理連接配接的過程5 結束語

如果是成功建立session,則把sessiontimeout、sessionid、passwd傳遞給用戶端。如果沒有成功建立,上述三者的值分别是0,0,new byte[16]

之後用戶端處理該響應的過程,上面已經說了,可以回頭再看下。

下一篇文章開始講述zookeeper叢集時,伺服器端對使用者的建立連接配接請求的處理。