天天看點

帶你讀《雲原生應用開發 Operator原理與實踐》第二章 Operator 原理2.2Client-go 原理(十一)

2.2.6        Client-go Informer 解析

1. Client-goInformer子產品

Informer可以對 KubernetesAPIServer的資源執行 Watch操作 , 類型可以是Kubernetes内置資源,也可以是 CRD。其中最核心的子產品是 Reflector、DeltaFIFO、Indexer。接下來我們逐個進行分析。

首先分析   Reflector,Reflector   用于監控指定資源的    Kubernetes。當資源發生變化時,如發生了資源添加(Added)、資源更新(Updated)等事件,Reflector會将其資源對象存放在本地緩存   DeltaFIFO   中。它的作用就是擷取 APIServer中對象資料并實時地更新到本地,使得本地資料和ETCD資料完全一樣。它的資料結構見代碼清單2-37。

typeReflectorstruct{

namestring//這個 Reflector的名稱,預設為⽂件 : ⾏數metrics*reflectorMetrics//⽤于儲存 Reflector的⼀些監控名額expectedTypereflect.Type//期望放到 Store中的類型名稱storeStore//與 Watch源同步的⽬标Store

listerWatcherListerWatcher//ListerWatcher接⼝,⽤于指定 List-Watch⽅法

period           time.Duration//Watch周期resyncPeriodtime.Duration//重新同步周期ShouldResyncfunc() bool

//clockallowsteststomanipulatetimeclockclock.Clock

lastSyncResourceVersionstring//最後同步的資源的版本号

lastSyncResourceVersionMutexsync.RWMutex//lastSyncResourceVersion的讀寫鎖

}

通過 NewRefector執行個體化 Reflector對象,執行個體化過程中必須傳入 ListerWatcher資料接口對象,它擁有List和 Watch方法,用于擷取及監控資源清單,隻要是實作了 List和 Watch方法的對象都可以成為 ListerWatcher,Reflector對象通過 run函數啟動監控并處理事件,而在 Reflector源碼實作中最主要的是List-Watch函數,它負責 List/Watch指定的 KubernetesAPIServer資源,見代碼清單 2-38。

//NewNamedReflectorsameasNewReflector,butwithaspecifiednameforloggingfunc NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},storeStore,resyncPeriodtime.Duration)*Reflector{

reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)r:=&Reflector{

name:name,

//weneedthistobeuniqueperprocess(somenamesarestillthesame)butobviouswhoitbelongsto

metrics:        newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.

Sprintf("reflector_"+name+"_%d",reflectorSuffix))),

listerWatcher:lw,store:store,

expectedType:reflect.TypeOf(expectedType),

period:    time.Second,resyncPeriod:resyncPeriod,clock:&clock.RealClock{},

returnr

List-Watch是怎麼實作的? List-Watch主要分為 List和 Watch兩部分。List負責擷取對應資源的全量清單,Watch負責擷取變化的部分。首先進行 List操作,這裡把ResourceVersion設定為 0,因為要擷取同步的對象的全部版本,是以從 0開始 List,主要流程如下(見代碼清單2-39)。

(1)r.listerWatcher.List 用于擷取資源下的所有對象的資料。

(2)  listMetaInterface.GetResourceVersion   用于擷取資源版本号(ResouceVersion),資源版本号非常重要,Kubernetes中所有的資源都擁有該字段,它辨別目前資源對象的版本号。每次修改目前資源對象時,KubernetesAPIServer都會更改 ResouceVersion,使得 Client-go執行 Watch操作時可以根據 ResourceVersion 來确定目前資源對象是否發生過變化。

(3)   meta.ExtractList用于将資源資料轉換成資源對象清單,将runtime.Object轉換成[]runtime.Object,因為 r.listerWatcher.List隻是擷取一個清單。

(4)r.syncWith 用于将資源對象清單中的資源對象和資源版本号存儲至DeltaFIFO中,并替換已存在的對象。

(5)r.setLastSyncResourceVersion 用于設定最新的資源版本号。

func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{glog.V(3).Infof("Listingandwatching%vfrom%s",r.expectedType,r.name)varresourceVersionstring

options:=metav1.ListOptions{ResourceVersion:"0"}r.metrics.numberOfLists.Inc()

start:=r.clock.Now()

list,err:=r.listerWatcher.List(options)iferr!=nil{

returnfmt.Errorf("%s:Failedtolist%v:%v",r.name,r.expectedType,

err)

r.metrics.listDuration.Observe(time.Since(start).Seconds())listMetaInterface,err:=meta.ListAccessor(list)

iferr!=nil{

returnfmt.Errorf("%s:Unabletounderstandlistresult%#v:%v",

r.name,list,err)

resourceVersion=listMetaInterface.GetResourceVersion()items,err:=meta.ExtractList(list)

returnfmt.Errorf("%s:Unabletounderstandlistresult%#v(%v)",r.name,list,err)

r.metrics.numberOfItemsInList.Observe(float64(len(items)))

iferr:=r.syncWith(items,resourceVersion);err!=nil{

returnfmt.Errorf("%s:Unabletosynclistresult:%v",r.name,err)

r.setLastSyncResourceVersion(resourceVersion)

resyncerrc:=make(chanerror,1)

cancelCh:=make(chanstruct{})deferclose(cancelCh)

gofunc(){

resyncCh,cleanup:=r.resyncChan()deferfunc(){

cleanup()//Callthelastonewrittenintocleanup

}()

for{

select{

case<-resyncCh:case<-stopCh:

returncase<-cancelCh:

return

ifr.ShouldResync==nil||r.ShouldResync(){glog.V(4).Infof("%s:forcingresync",r.name)iferr:=r.store.Resync();err!=nil{

resyncerrc<-errreturn

cleanup()

resyncCh,cleanup=r.resyncChan()

//givethestopChachancetostoptheloop,evenincaseofcontinue

statementsfurtherdownonerrorsselect{

case<-stopCh:

returnnildefault:

timeoutSeconds:=int64(minWatchTimeout.Seconds()* (rand.Float64()+

1.0))

options=metav1.ListOptions{ResourceVersion:resourceVersion,TimeoutSeconds:&timeoutSeconds,

r.metrics.numberOfWatches.Inc()

w,err:=r.listerWatcher.Watch(options)

switcherr{

caseio.EOF:

//watchclosednormally

caseio.ErrUnexpectedEOF:

glog.V(1).Infof("%s:Watchfor%vclosedwithunexpected

EOF:%v",r.name, r.expectedType, err)

default:

utilruntime.HandleError(fmt.Errorf("%s:Failedtowatch

%v:%v",r.name,r.expectedType,err))

ifurlError,ok:=err.(*url.Error);ok{

ifopError,ok:=urlError.Err.(*net.OpError);ok{

iferrno,ok:=opError.Err.(syscall.Errno);ok&&

errno==syscall.ECONNREFUSED{

time.Sleep(time.Second)continue

returnnil

err!=nil{

iferr:=r.watchHandler(w,&resourceVersion,resyncerrc,stopCh);

iferr!=errorStopRequested{

glog.Warningf("%s:watchof%vendedwith:%v",r.name,

r.expectedType,err)