天天看點

k8s client-go源碼分析 informer源碼分析(4)-DeltaFIFO源碼分析

k8s client-go k8s informers實作了持續擷取叢集的所有資源對象、監聽叢集的資源對象變化功能,并在本地維護了全量資源對象的記憶體緩存,以減少對apiserver、對etcd的請求壓力。Informers在啟動的時候會首先在用戶端調用List接口來擷取全量的對象集合,然後通過Watch接口來擷取增量的對象,然後更新本地緩存。

client-go之DeltaFIFO源碼分析

1.DeltaFIFO概述

先從名字上來看,DeltaFIFO,首先它是一個FIFO,也就是一個先進先出的隊列,而Delta代表變化的資源對象,其包含資源對象資料本身及其變化類型。

Delta的組成:

type Delta struct {
    Type   DeltaType
    Object interface{}
}
           

DeltaFIFO的組成:

type DeltaFIFO struct {
    ...
    items map[string]Deltas
	queue []string
    ...
}

type Deltas []Delta
           

具體來說,DeltaFIFO存儲着map[object key]Deltas以及object key的queue,Delta裝有對象資料及對象的變化類型。輸入輸出方面,Reflector負責DeltaFIFO的輸入,Controller負責處理DeltaFIFO的輸出。

一個對象能算出一個唯一的object key,其對應着一個Deltas,是以一個對象對應着一個Deltas。

而目前Delta有4種Type,分别是: Added、Updated、Deleted、Sync。針對同一個對象,可能有多個不同Type的Delta元素在Deltas中,表示對該對象做了不同的操作,另外,也可能有多個相同Type的Delta元素在Deltas中(除Deleted外,Delted類型會被去重),比如短時間内,多次對某一個對象進行了更新操作,那麼就會有多個Updated類型的Delta放入Deltas中。

k8s client-go源碼分析 informer源碼分析(4)-DeltaFIFO源碼分析

2.DeltaFIFO的定義與初始化分析

2.1 DeltaFIFO struct

DeltaFIFO struct定義了DeltaFIFO的一些屬性,下面挑幾個重要的分析一下。

(1)lock:讀寫鎖,操作DeltaFIFO中的items與queue之前都要先加鎖;

(2)items:是個map,key根據對象算出,value為Deltas類型;

(3)queue:存儲對象key的隊列;

(4)keyFunc:計算對象key的函數;

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex

           

type Deltas

再來看一下Deltas類型,是Delta的切片類型。

type Deltas []Delta
           

type Delta

繼續看到Delta類型,其包含兩個屬性:

(1)Type:代表的是Delta的類型,有Added、Updated、Deleted、Sync四個類型;

(2)Object:存儲的資源對象,如pod等資源對象;

type Delta struct {
	Type   DeltaType
	Object interface{}
}
           
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string

// Change type definition
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// The other types are obvious. You'll get Sync deltas when:
	//  * A watch expires/errors out and a new list/watch cycle is started.
	//  * You've turned on periodic syncs.
	// (Anything that trigger's DeltaFIFO's Replace() method.)
	Sync DeltaType = "Sync"
)
           

2.2 DeltaFIFO初始化-NewDeltaFIFO

NewDeltaFIFO初始化了一個items和queue都為空的DeltaFIFO并傳回。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      keyFunc,
		knownObjects: knownObjects,
	}
	f.cond.L = &f.lock
	return f
}
           

3.DeltaFIFO核心處理方法分析

在前面分析Reflector時,Reflector的核心處理方法裡有調用過幾個方法,分别是r.store.Replace、r.store.Add、r.store.Update、r.store.Delete,結合前面文章的k8s informer的初始化與啟動分析,或者簡要的看一下下面的代碼調用,就可以知道Reflector裡的r.store其實就是DeltaFIFO,而那幾個方法其實就是DeltaFIFO的Replace、Add、Update、Delete方法。

sharedIndexInformer.Run方法中調用NewDeltaFIFO初始化了DeltaFIFO,随後将DeltaFIFO作為參數傳入初始化Config;

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    ...
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    
    cfg := &Config{
		Queue:            fifo,
		...
	}
	
	func() {
		...
		s.controller = New(cfg)
		...
	}()
	...
	s.controller.Run(stopCh)
           

在controller的Run方法中,調用NewReflector初始化Reflector時,将之前的DeltaFIFO傳入,指派給Reflector的store屬性,是以Reflector裡的r.store其實就是DeltaFIFO,而調用的r.store.Replace、r.store.Add、r.store.Update、r.store.Delete方法其實就是DeltaFIFO的Replace、Add、Update、Delete方法。

func (c *controller) Run(stopCh <-chan struct{}) {
	...
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	...
}
           
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
		...
		store:         store,
		...
	}
	...
	return r
}
           

是以這裡對DeltaFIFO核心處理方法進行分析,主要是分析DeltaFIFO的Replace、Add、Update、Delete方法。

3.1 DeltaFIFO.Add

DeltaFIFO的Add操作,主要邏輯:

(1)加鎖;

(2)調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Added類型的新Delta追加到相應的Deltas中;

(3)釋放鎖。

func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}
           

可以看到基本上DeltaFIFO所有的操作都有加鎖操作,是以都是并發安全的。

3.1.1 DeltaFIFO.queueActionLocked

queueActionLocked負責操作DeltaFIFO中的queue與Deltas,根據對象key構造新的Delta追加到對應的Deltas中,主要邏輯:

(1)計算出對象的key;

(2)構造新的Delta,将新的Delta追加到Deltas末尾;

(3)調用dedupDeltas将Delta去重(目前隻将Deltas最末尾的兩個delete類型的Delta去重);

(4)判斷對象的key是否在queue中,不在則添加入queue中;

(5)根據對象key更新items中的Deltas;

(6)通知所有的消費者解除阻塞;

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    //(1)計算出對象的key
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    //(2)構造新的Delta,将新的Delta追加到Deltas末尾
	newDeltas := append(f.items[id], Delta{actionType, obj})
	//(3)調用dedupDeltas将Delta去重(目前隻将Deltas最末尾的兩個delete類型的Delta去重)
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
	    //(4)判斷對象的key是否在queue中,不在則添加入queue中
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		//(5)根據對象key更新items中的Deltas
		f.items[id] = newDeltas
		//(6)通知所有的消費者解除阻塞
		f.cond.Broadcast()
	} else {
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}
           

3.2 DeltaFIFO.Update

DeltaFIFO的Update操作,主要邏輯:

(1)加鎖;

(2)調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Updated類型的新Delta追加到相應的Deltas中;

(3)釋放鎖。

func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}
           

3.3 DeltaFIFO.Delete

DeltaFIFO的Delete操作,主要邏輯:

(1)計算出對象的key;

(2)加鎖;

(3)items中不存在對象key,則直接return,跳過處理;

(4)調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Deleted類型的新Delta追加到相應的Deltas中;

(5)釋放鎖。

func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	// informer的用法中,f.knownObjects不為nil
	if f.knownObjects == nil {
		if _, exists := f.items[id]; !exists {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	} else {
		// We only want to skip the "deletion" action if the object doesn't
		// exist in knownObjects and it doesn't have corresponding item in items.
		// Note that even if there is a "deletion" action in items, we can ignore it,
		// because it will be deduped automatically in "queueActionLocked"
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	}

	return f.queueActionLocked(Deleted, obj)
}
           

3.4 DeltaFIFO.Replace

DeltaFIFO的Replace操作,主要邏輯:

(1)加鎖;

(2)周遊list,計算對象的key,循環調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Sync類型的新Delta追加到相應的Deltas中;

(3)對比DeltaFIFO中的items與Replace方法的list,如果DeltaFIFO中的items有,但傳進來Replace方法的list中沒有某個key,則調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Deleted類型的新Delta追加到相應的Deltas中(避免重複,使用DeletedFinalStateUnknown包裝對象);

(4)釋放鎖;

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    //(1)加鎖
	f.lock.Lock()
	//(4)釋放鎖
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))

    //(2)周遊list,計算對象的key,循環調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Sync類型的新Delta追加到相應的Deltas中
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(Sync, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}
    // informer的用法中,f.knownObjects不為nil
	if f.knownObjects == nil {
		// Do deletion detection against our own list.
		queuedDeletions := 0
		for k, oldItem := range f.items {
			if keys.Has(k) {
				continue
			}
			var deletedObj interface{}
			if n := oldItem.Newest(); n != nil {
				deletedObj = n.Object
			}
			queuedDeletions++
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}
        
		if !f.populated {
			f.populated = true
			// While there shouldn't be any queued deletions in the initial
			// population of the queue, it's better to be on the safe side.
			f.initialPopulationCount = len(list) + queuedDeletions
		}

		return nil
	}
    
    //(3)找出DeltaFIFO中的items有,但傳進來Replace方法的list中沒有的key,調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Deleted類型的新Delta追加到相應的Deltas中(避免重複,使用DeletedFinalStateUnknown包裝對象)
	// Detect deletions not already in the queue.
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}

		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}
    
    // 第一次調用Replace方法後,populated值為true
	if !f.populated {
		f.populated = true
		// initialPopulationCount代表第一次調用Replace方法加入DeltaFIFO中的items數量
		f.initialPopulationCount = len(list) + queuedDeletions
	}

	return nil
}
           

3.5 DeltaFIFO.Pop

DeltaFIFO的Pop操作,queue為空時會阻塞,直至非空,主要邏輯:

(1)加鎖;

(2)循環判斷queue的長度是否為0,為0則阻塞住,調用f.cond.Wait(),等待通知(與queueActionLocked方法中的f.cond.Broadcast()相對應,即queue中有對象key則發起通知);

(3)取出queue的隊頭對象key;

(4)更新queue,把queue中所有的對象key前移,相當于把第一個對象key給pop出去;

(5)initialPopulationCount變量減1,當減到0時則說明initialPopulationCount代表第一次調用Replace方法加入DeltaFIFO中的對象key已經被pop完成;

(6)根據對象key從items中擷取Deltas;

(7)把Deltas從items中删除;

(8)調用PopProcessFunc處理擷取到的Deltas;

(9)釋放鎖。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    //(1)加鎖
	f.lock.Lock()
	//(9)釋放鎖
	defer f.lock.Unlock()
	//(2)循環判斷queue的長度是否為0,為0則阻塞住,調用f.cond.Wait(),等待通知(與queueActionLocked方法中的f.cond.Broadcast()相對應,即queue中有對象key則發起通知)
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		//(3)取出queue的隊頭對象key
		id := f.queue[0]
		//(4)更新queue,把queue中所有的對象key前移,相當于把第一個對象key給pop出去
		f.queue = f.queue[1:]
		//(5)initialPopulationCount變量減1,當減到0時則說明initialPopulationCount代表第一次調用Replace方法加入DeltaFIFO中的對象key已經被pop完成
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		//(6)根據對象key從items中擷取對象
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		//(7)把對象從items中删除
		delete(f.items, id)
		//(8)調用PopProcessFunc處理pop出來的對象
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
           

3.6 DeltaFIFO.HasSynced

HasSynced從字面意思上看代表是否同步完成,是否同步完成其實是指第一次從kube-apiserver中擷取到的全量的對象是否全部從DeltaFIFO中pop完成,全部pop完成,說明list回來的對象已經全部同步到了Indexer緩存中去了。

方法是否傳回true是根據populated和initialPopulationCount兩個變量來判斷的,當且僅當populated為true且initialPopulationCount 為0的時候方法傳回true,否則傳回false。

populated屬性值在第一次調用DeltaFIFO的Replace方法中就已經将其值設定為true。

而initialPopulationCount的值在第一次調用DeltaFIFO的Replace方法中設定值為加入到items中的Deltas的數量,然後每pop一個Deltas,則initialPopulationCount的值減1,pop完成時值則為0。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) HasSynced() bool {
	f.lock.Lock()
	defer f.lock.Unlock()
	return f.populated && f.initialPopulationCount == 0
}
           

在前面做informer的初始化與啟動分析時也提到過,DeltaFIFO.HasSynced方法的調用鍊如下:

sharedIndexInformer.WaitForCacheSync --> cache.WaitForCacheSync --> sharedIndexInformer.controller.HasSynced --> controller.config.Queue.HasSynced --> DeltaFIFO.HasSynced

至此DeltaFIFO的分析就結束了,最後來總結一下。

總結

DeltaFIFO核心處理方法

Reflector調用的

r.store.Replace

r.store.Add

r.store.Update

r.store.Delete

方法其實就是DeltaFIFO的Replace、Add、Update、Delete方法。

(1)DeltaFIFO.Replace:構造Sync類型的Delta加入DeltaFIFO中,此外還會對比DeltaFIFO中的items與Replace方法的list,如果DeltaFIFO中的items有,但傳進來Replace方法的list中沒有某個key,則構造Deleted類型的Delta加入DeltaFIFO中;

(2)DeltaFIFO.Add:建構Added類型的Delta加入DeltaFIFO中;

(3)DeltaFIFO.Update:建構Updated類型的Delta加入DeltaFIFO中;

(4)DeltaFIFO.Delete:建構Deleted類型的Delta加入DeltaFIFO中;

(5)DeltaFIFO.Pop:從DeltaFIFO的queue中pop出隊頭key,從map中取出key對應的Deltas傳回,并把該

key:Deltas

從map中移除;

(6)DeltaFIFO.HasSynced:傳回true代表同步完成,是否同步完成指第一次從kube-apiserver中擷取到的全量的對象是否全部從DeltaFIFO中pop完成,全部pop完成,說明list回來的對象已經全部同步到了Indexer緩存中去了;

informer架構中的DeltaFIFO

k8s client-go源碼分析 informer源碼分析(4)-DeltaFIFO源碼分析

在對informer中的DeltaFIFO分析完之後,接下來将分析informer中的Controller與Processor。

繼續閱讀