天天看點

帶你讀《雲原生應用開發 Operator原理與實踐》第二章 Operator 原理2.2Client-go 原理(十三)

1. Client-goIndexer

資源對象從 DeltaFIFO中Pop 出去後又經過了哪些處理呢。這要從一開始的 sharedIndexInformer說起。注意,在 sharedIndexInformer的 Run 方法中,初始化了它的配置,并執行了 s.controller.Run方法。我們可以看到s.controller.Run中初始化了 Reflector,開始了指定資源的List-Watch 操作,并且同步到了DeltaFIFO中,同時執行了processLoop方法。此時我們可以看到 processLoop方法不斷從DeltaFIFO中将資源對象 Pop出來, 并且交給了之前的 c.config.Process方法進行處理。而c.config.Process方法就是sharedIndexInformer的 HandleDeltas方法,具體見代碼清單 2-45。

func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){

...

cfg:=&Config{

Queue:              fifo,

ListerWatcher:             s.listerWatcher,ObjectType:          s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:                 false,

ShouldResync:       s.processor.shouldResync,

Process:         s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,

}

func(){

s.startedLock.Lock()

defers.startedLock.Unlock()

}()

s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true

s.controller.Run(stopCh)

func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()

gofunc(){

<-stopCh

c.config.Queue.Close()

r:=NewReflector(

c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,

)

r.ShouldResync=c.config.ShouldResync

r.clock=c.clock

c.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()

wait.Until(c.processLoop,time.Second,stopCh)

func(c*controller)processLoop(){for{

obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))

iferr!=nil{

iferr==FIFOClosedError{

return

ifc.config.RetryOnError{

//Thisisthesafewaytore-enqueue.

c.config.Queue.AddIfNotPresent(obj)

綜上可知,由 DeltaFIFO中Pop出來的對象最後交給了 HandleDeltas進行處理,而在 HandleDeltas中,将資源對象同步到了 Indexer中,至此我們引出了 Informer子產品中的第 3個元件 Indexer。Indexer是 Client-go 中實作的一個本地存儲,它可以建立索引并存儲 Resource的對象。Reflector通過 DeltaFIFOQueue将資源對象存儲到Indexer中。需要注意的是,Indexer中的資料與 ETCD中的資料是完全一緻的,當 Client-go需要資料時,無須每次都從 APIServer中擷取,進而減輕了請求過多造成的對 APIServer的壓力, 具體見代碼清單 2-46。

func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{

s.blockDeltas.Lock()

defers.blockDeltas.Unlock()

//fromoldesttonewest

for_,d:=rangeobj.(Deltas){switchd.Type{

caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)

ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&

exists{

if err:=s.indexer.Update(d.Object);err!=nil{returnerr

requestedresync 

nil{

==nil{

isSync:=falseswitch{

cased.Type==Sync:

//Synceventsareonlypropagatedtolistenersthat

isSync=true

cased.Type==Replaced:

ifaccessor,err:=meta.Accessor(d.Object);err==ifoldAccessor,err:=meta.Accessor(old);err

//Replacedeventsthatdidn'tchange

resourceVersionaretreatedasresyncevents

//andonlypropagatedtolisteners

thatrequestedresync

==oldAccessor.GetResourceVersion()

isSync=accessor.GetResourceVersion()

s.processor.distribute(updateNotification{oldObj:old,

newObj:d.Object},isSync)

}else{

if err:=s.indexer.Add(d.Object);err!=nil{returnerr

false)

s.processor.distribute(addNotification{newObj:d.Object},

caseDeleted:

iferr:=s.indexer.Delete(d.Object);err!=nil{

returnerr

s.processor.distribute(deleteNotification{oldObj:d.Object},false)

returnnil

Indexer   是如何實作存儲并快速查找資源的呢?我們先看一下 Indexer接口提供的功能。Cache是 Indexer的一種非常經典的實作,所有的對象緩存在記憶體中,而且從Cache 這個類型的名稱來看它屬于包内私有類型,外部無法直接使用,隻能通過專用的函數建立。 這裡的 Store、Indexer使用了一個 threadSafeMap來保證并發安全的存儲。它擁有存儲相關的增、删、改、查等方法。threadSafeMap繼承了 Store接口,而 Indexer擴充了threadSafeMap, 為 threadSafeMap提供了索引操作。threadSafeMap其實隻能夠存儲和索引。存儲即将runtime.object存儲到 Items的 Map中;索引即為Items的 Map建立三層索引:IndicesMap類型索引(如 namespace、nodeName等);IndexMap 類型索引(如 namespace1、namespace2……);runtime.object類型索引,實作見代碼清 單2-47。

typeIndexerinterface{Store

//indexName索引類,obj是對象,計算obj在indexName索引類中的索引鍵,通過索引鍵

擷取所有的對象

//基本就是擷取符合obj特征的所有對象,所謂的特征就是對象在索引類中的索引鍵

Index(indexNamestring,objinterface{})([]interface{},error)

//indexKey是 indexName索引類中的⼀個索引鍵,函數傳回indexKey指定的所有對象鍵

IndexKeys(indexName,indexedValuestring)([]string,error)

//擷取indexName索引類中的所有索引鍵

ListIndexFuncValues(indexNamestring)[]string

//這個函數和 Index類似,隻是傳回值不是對象鍵,⽽是所有對象

ByIndex(indexName,indexedValuestring)([]interface{},error)

//傳回Indexers

GetIndexers()Indexers

//添加Indexers,就是增加更多的索引分類

AddIndexers(newIndexersIndexers)error

在Kubernetes中使用的比較多的索引函數是MetaNamespaceIndexFunc(() 代碼位置:

client-go/tools/cache/index.go),Indexer索引的實作是通過index.ByIndex來完成的, index.ByIndex的實作見代碼清單 2-48。這個函數傳回了符合索引函數的值的對象清單。

func(c*threadSafeMap)ByIndex(indexName,indexKeystring)([]interface{},error){c.lock.RLock()

deferc.lock.RUnlock()

indexFunc:=c.indexers[indexName]ifindexFunc==nil{

returnnil,fmt.Errorf("Indexwithname%sdoesnotexist",indexName)

index:=c.indices[indexName]set:=index[indexKey]

list:=make([]interface{},0,set.Len())for_,key:=rangeset.List(){

list=append(list,c.items[key])

returnlist,nil

上述方法接收兩個參數:indexName(索引器的名稱)和indexedValue(需要索引的 Key)。首先根據索引器名稱查找指定的索引器函數(c.indexers[indexName]);然後根據索引器名稱查找相應的緩存器函數(c.indices[indexName]) ;最後根據索引 Key

(indexedValue)從緩存中進行資料查詢,并傳回查詢結果。

繼續閱讀