天天看點

帶你讀《雲原生應用開發 Operator原理與實踐》第二章 Operator 原理2.2Client-go 原理(十一)

Watch操作通過 HTTP與 KubernetesAPIServer建立長連結, 接收 KubernetesAPIServer發來的變更時間,Watch操作的實作機制使用的是 HTTP的分塊傳輸編碼。當 Client-go調 用 KubernetesAPIServer 時, 在 Response的 HTTPHeader 中 設 置Transfer-Encoding的值為 Chunked。r.listerWatcher.Watch實際調用了 PodInformer的 watchfunc函數。通過 ClientSet用戶端與 APIServer 建立長連結,監控指定資源的變更事件。r.watchHandler用于處理資源的變更時間,當初發增删改AddedUpdated等事件時,将對應的資源對象更新到本地緩存 DeltaFIFO中,并更新 ResouceVersion。至此實作了 Reflctor元件的功能。

1. Client-goDeltaFIFO

DeltaFIFO是一個 FIFO隊列,記錄了資源對象的變化過程。作為一個 FIFO隊列,它的生産者就是 Reflector元件,前面講過 Reflector将監聽對象同步到 DeltaFIFO中,DeltaFIFO對這些資源對象做了什麼,見代碼清單2-40。

typeDeltaFIFOstruct{

locksync.RWMutex

condsync.Cond//條件變量,喚醒等待的協程

itemsmap[string]Deltas//Delta存儲桶

queue[]string//隊列存儲對象鍵實際就是和items⼀起形成了⼀個有序Map

//true通過Replace() 第⼀批元素被插⼊隊列或者Delete/Add/Update⾸次被調⽤

populatedbool

//Replace() 被⾸次調⽤時插⼊的元素數⽬

initialPopulationCountint

//函數計算元素Key值

keyFuncKeyFunc

//列出已知的對象

knownObjectsKeyListerGetter

//隊列是否被關閉,關閉互斥鎖

closedboolclosedLocksync.Mutex

}

FIFO接收 Reflector的 Adds/Updates 添加和更新事件,并将它們按照順序放入隊列。元素在隊列中被處理之前,如果有多個Adds/Updates 事件發生,事件隻會被處理一次。

使用場景:(1)僅處理對象一次;(2)處理完目前事件後才能處理最新版本的對象;(3)删除對象之後不會處理;(4)不能周期性重新處理對象。這裡的Delta對象就是 Kubernetes系統中對象的變化。Delta有 Type和 Object兩個屬性,DeltaType就是資源變化的類型, 比如 Add、Update等,DeltaObject就是具體的 Kubernetes資源對象,見代碼清單 2-41。例如,此時 Reflector中監聽了一個PodA的 Add事件,那麼此時 DeltaType就是Added,DeltaObject就是 PodA,DeltaFIFO中的資料是什麼樣的呢?此時 Items中會有 Add類型的 Delta,Queue中也會有這個事件的 Key。這個 Key由 KeyFunc生成。Client-go中預設的 KeyFunc是 MetaNamespaceKeyFunc,可以在tools/cache/store.go:76中找到。由 MetaNamespaceKeyFunc生成的 Key格式為/,用來辨別不同命名空間下的不同資源。

typeDeltastruct{

Type    DeltaType                   //Delta類型,⽐如增、減,後⾯有詳細說明

Objectinterface{}                  //對象,Delta的粒度是⼀個對象

typeDeltaTypestring                  //Delta的類型⽤字元串表達

const(

AddedDeltaType= "Added" //增加UpdatedDeltaType= "Updated" //更新DeletedDeltaType= "Deleted" //删除SyncDeltaType= "Sync" //同步

)

typeDeltas[]Delta                    //Delta數組

既然 DeltaFIFO是一個 FIFO,那麼它就應該有基本的 FIFO功能,這裡 DeltaFIFO實作了 Queue接口。下面看一下 Queue接口功能的定義。我們可以看出 Queue擴充了Store接口的功能,附加了 Pop、AddIfNotPresent、HasSynced、Close方法。Store是一個通用的對象存儲和處理的接口,本身提供了 Add、Update、List、Get等方法,Queue接口增加了 Pop方法,實作了一個基本 FIFO隊列,具體見代碼清單 2-42。

typeQueueinterface{Store

Pop(PopProcessFunc)(interface{},error)AddIfNotPresent(interface{})errorHasSynced()bool

Close()

下面我們來看一下FIFO隊列的基本功能是怎麼實作的。首先是 Add方法,我們可以看到 Add方法會先根據 KeyFunc計算出對象的 Key,如果隊列中沒有這個對象,我們就在這個隊列尾部增補對象的Key,并且将這個對象存入 Map,具體見代碼清單2-43。

func(f*FIFO)Add(objinterface{})error{id,err:=f.keyFunc(obj)

iferr!=nil{

returnKeyError{obj,err}

f.lock.Lock()

deferf.lock.Unlock()f.populated=true

if _,exists:=f.items[id];!exists{f.queue=append(f.queue,id)

f.items[id]=objf.cond.Broadcast()returnnil

接下來我們看一下 Pop方法, 在 Queue中至少有一個資源時才會進行 Pop操作。在處理資源之前,資源會從隊列(和存儲)中移除,如果未成功處理資源,應該用AddIfNotPresent()函數将資源添加回隊列。處理邏輯由 PopProcessFunc進行執行,具體見代碼清單2-44。

func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){

deferf.lock.Unlock()for{

forlen(f.queue)==0{

//Whenthequeueisempty,invocationofPop()isblockeduntilnewitemisenqueued.

//WhenClose()iscalled,thef.closedissetandtheconditionisbroadcasted.

//Whichcausesthislooptocontinueandreturnfromthe

Pop().

iff.IsClosed(){

returnnil,FIFOClosedError

f.cond.Wait()

id:=f.queue[0]

f.queue=f.queue[1:]item,ok:=f.items[id]

iff.initialPopulationCount>0{f.initialPopulationCount--

if!ok{

//Itemmayhavebeendeletedsubsequently.continue

delete(f.items,id)err:=process(item)

if e,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)err=e.Err

//Don'tneedtocopyDeltashere,becausewe'retransferring

//ownershiptothecaller.returnitem,err

func(f*DeltaFIFO)KeyOf(objinterface{})(string,error)

if d,ok:=obj.(Deltas);ok{iflen(d)==0{

return"",KeyError{obj,ErrZeroLengthDeltasObject}

obj=d.Newest().Object

ifd,ok:=obj.(DeletedFinalStateUnknown);ok{

returnd.Key,nil

returnf.keyFunc(obj)

值得注意的是,DeltaFIFO中用于計算對象鍵的函數KeyOf為什麼要先進行一次Deltas的類型轉換呢?是因為Pop 出去的對象很可能還要再添加進來(比如處理失敗需要再放進來),此時添加的對象就是已經封裝好的Delta對象了。至此,已實作DeltaFIFO的基本功能。