
k8s replicaset controller source code analysis (3): the expectations mechanism

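The replicaset controller is one of the many controllers in the kube-controller-manager component. It is the controller for replicaset resource objects: by watching two kinds of resources, replicasets and pods, it reconciles each replicaset toward its desired replica count.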

replicaset controller analysis

replicaset controller overview

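The replicaset controller is one of the many controllers in kube-controller-manager and is the controller for replicaset resource objects. It watches two kinds of resources, replicasets and pods. Whenever either of them changes, the replicaset controller reconciles the corresponding replicaset object toward its desired replica count: when there are fewer actual pods than desired it creates pods, and when there are more it deletes pods.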

Its main job is to compare the number of pods a replicaset object desires with the number of pods that currently exist, then create or delete pods based on the result, until the two numbers are equal.

replicaset controller architecture diagram

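The figure below shows the rough composition and processing flow of the replicaset controller. The replicaset controller registers event handlers for pod and replicaset objects. When an event is watched, the key of the corresponding replicaset object is put into the queue; the syncReplicaSet method, which holds the core logic for reconciling replicaset objects, then takes the replicaset object from the queue and reconciles it.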

[Figure: replicaset controller architecture and processing flow]

The replicaset controller analysis is split into three parts:

(1) replicaset controller initialization and startup;

(2) replicaset controller core processing logic;

(3) replicaset controller expectations mechanism.

This post covers part (3), the replicaset controller's expectations mechanism.

Overview of the expectations mechanism

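Expectations record how many pods a replicaset object expects to create/delete in a given reconciliation. As the pod creations/deletions are observed, these expected counts are decremented. When both the expected creation count and the expected deletion count are less than or equal to 0, everything the previous reconciliation set out to create/delete has been observed, and the rsc.expectations.SatisfiedExpectations method returns true.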

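As analyzed earlier, when the replicaset controller reconciles a replicaset object it first calls rsc.expectations.SatisfiedExpectations. Only if that call returns true and the replicaset object's DeletionTimestamp is nil does it call rsc.manageReplicas to reconcile the desired replica count, i.e. to create/delete pods.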

replicaset controller expectations mechanism analysis

What is the expectations mechanism for? Let's analyze it.

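Take a replicaset with 1000 replicas as an example. From the earlier analysis of the replicaset controller's core processing, the 1000 pods are created over two reconciliations of the replicaset object, 500 pods each time.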
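The two-batch arithmetic can be sketched as follows. This is a simplification for illustration only: it assumes a per-sync burst limit of 500 (matching the example above) and that every batch succeeds and is fully observed before the next sync. `batchSizes` is a hypothetical helper, not a function in the replicaset controller.

```go
package main

import "fmt"

// batchSizes is a hypothetical helper: it splits the number of missing pods
// into the per-sync batches allowed by a burst limit, assuming each batch
// succeeds and is fully observed before the next sync runs.
func batchSizes(missing, burst int) []int {
	var batches []int
	for missing > 0 {
		n := missing
		if n > burst {
			n = burst // same clamp as `if diff > rsc.burstReplicas` in manageReplicas
		}
		batches = append(batches, n)
		missing -= n
	}
	return batches
}

func main() {
	fmt.Println(batchSizes(1000, 500)) // [500 500]
}
```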

Let's go straight to syncReplicaSet, the core processing method of the replicaset controller.

syncReplicaSet

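Before every call to rsc.manageReplicas, rsc.expectations.SatisfiedExpectations is called to decide whether the replicaset's desired-replica reconciliation (pod creation/deletion) may proceed; rsc.manageReplicas is only called when it returns true.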

// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()

	...

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}

	...

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	
	...
}

rsc.expectations.SatisfiedExpectations

Next comes rsc.expectations.SatisfiedExpectations, which decides whether syncReplicaSet should call rsc.manageReplicas to create/delete pods.

(1) On the first pass (when the replicaset is first created), r.GetExpectations finds no expectations for this rs object, so exists is false and rsc.expectations.SatisfiedExpectations returns true. syncReplicaSet therefore calls rsc.manageReplicas to create pods, and rsc.manageReplicas sets the expectations to 500 pending creations;

(2) While the first batch of 500 creations has not yet completed and fewer than 5 minutes have passed since it started, exp.Fulfilled and exp.isExpired both return false, so rsc.expectations.SatisfiedExpectations returns false and syncReplicaSet does not call rsc.manageReplicas to create pods;

(3) Once the first batch of 500 creations has completed, or more than 5 minutes have passed since it started, exp.Fulfilled or exp.isExpired returns true, so rsc.expectations.SatisfiedExpectations returns true. syncReplicaSet then calls rsc.manageReplicas to create the second batch of 500 pods, and rsc.manageReplicas again sets the expectations to 500 pending creations.

// pkg/controller/controller_utils.go
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
	if exp, exists, err := r.GetExpectations(controllerKey); exists {
		if exp.Fulfilled() {
			klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
			return true
		} else if exp.isExpired() {
			klog.V(4).Infof("Controller expectations expired %#v", exp)
			return true
		} else {
			klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
			return false
		}
	} else if err != nil {
		klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
	} else {
		// When a new controller is created, it doesn't have expectations.
		// When it doesn't see expected watch events for > TTL, the expectations expire.
		//	- In this case it wakes up, creates/deletes controllees, and sets expectations again.
		// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
		//	- In this case it continues without setting expectations till it needs to create/delete controllees.
		klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
	}
	// Trigger a sync if we either encountered and error (which shouldn't happen since we're
	// getting from local store) or this controller hasn't established expectations.
	return true
}

exp.Fulfilled

Returns true when both the expected pod creation count and the expected pod deletion count in the replicaset object's expectations are less than or equal to 0.

// Fulfilled returns true if this expectation has been fulfilled.
func (e *ControlleeExpectations) Fulfilled() bool {
	// TODO: think about why this line being atomic doesn't matter
	return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}
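The counters are accessed through sync/atomic because they are decremented from informer event handlers (such as addPod) while a sync worker may be reading them. The sketch below assumes a hypothetical, simplified `counters` type rather than the real ControlleeExpectations: 500 goroutines each stand in for one observed pod add event, and once they have all finished, `fulfilled` reports true.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters is a hypothetical, simplified stand-in for ControlleeExpectations.
type counters struct {
	add, del int64
}

// observe atomically applies deltas, like ControlleeExpectations.Add(-1, 0).
func (c *counters) observe(add, del int64) {
	atomic.AddInt64(&c.add, add)
	atomic.AddInt64(&c.del, del)
}

// fulfilled mirrors Fulfilled: both counters must have dropped to <= 0.
func (c *counters) fulfilled() bool {
	return atomic.LoadInt64(&c.add) <= 0 && atomic.LoadInt64(&c.del) <= 0
}

func main() {
	c := &counters{add: 500} // as if ExpectCreations(key, 500) had been called
	var wg sync.WaitGroup
	for i := 0; i < 500; i++ {
		wg.Add(1)
		go func() { // each goroutine plays the role of one pod add event
			defer wg.Done()
			c.observe(-1, 0)
		}()
	}
	wg.Wait()
	fmt.Println(c.fulfilled()) // true
}
```

Because the check is `<= 0` rather than `== 0`, an extra observation that pushes a counter below zero still counts as fulfilled.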

exp.isExpired

Checks whether more than ExpectationsTimeout (5 minutes) has passed since the replicaset object's expectations were last set, and returns true if so.

func (exp *ControlleeExpectations) isExpired() bool {
	return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}

rsc.manageReplicas

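The core processing method: it compares the number of pods the replicaset desires with the number of existing pods, then creates/deletes pods based on the result, so that the two numbers eventually match.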

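(1) Before creating pods, it calls rsc.expectations.ExpectCreations to set the expectations to (key, add: 500, del: 0);

(2) It calls slowStartBatch to create the pods;

(3) After creation, it computes how many pods were skipped (failed, or never attempted because slowStartBatch stopped early) and calls rsc.expectations.CreationObserved that many times, lowering the add count in the expectations accordingly.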

// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	
	...
	
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		
		rsc.expectations.ExpectCreations(rsKey, diff)
		klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			...
		})

		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
		}
	...

rsc.expectations.ExpectCreations

Sets the replicaset object's expectations.

// pkg/controller/controller_utils.go
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
	return r.SetExpectations(controllerKey, adds, 0)
}

// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
	klog.V(4).Infof("Setting expectations %#v", exp)
	return r.Add(exp)
}

rsc.expectations.CreationObserved

Decrements by 1 the number of pod creations expected in the replicaset object's expectations.

// pkg/controller/controller_utils.go
// CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
	r.LowerExpectations(controllerKey, 1, 0)
}

// Decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
		exp.Add(int64(-add), int64(-del))
		// The expectations might've been modified since the update on the previous line.
		klog.V(4).Infof("Lowered expectations %#v", exp)
	}
}

So under normal circumstances (i.e. with no pod creation failures), when are the expectations updated to (key, add: 0, del: 0)? The analysis continues below.

pod add event handlerFunc: addPod

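The replicaset controller watches pod add events, and addPod is called for every pod that is successfully created. addPod also calls rsc.expectations.CreationObserved, decrementing by 1 the number of pod creations expected in the expectations.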

// pkg/controller/replicaset/replica_set.go
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	...
	
	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
		if rs == nil {
			return
		}
		rsKey, err := controller.KeyFunc(rs)
		if err != nil {
			return
		}
		klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
		rsc.expectations.CreationObserved(rsKey)
		rsc.queue.Add(rsKey)
		return
	}

	...
}

After the replicaset controller creates the first batch of 500 pods, every pod add event it watches triggers rsc.expectations.CreationObserved, which decrements the expected creation count by 1. In addition, rsc.manageReplicas calls rsc.expectations.CreationObserved once for each pod that failed to be created. Together these bring the replicaset object's expectations to (key, add: 0, del: 0), so the next reconciliation of that replicaset object can create the next batch of 500 pods.

The expectations mechanism for pod deletion in the replicaset controller works much like the creation case analyzed above; it is left for the reader to trace and is not covered further here.

Summary

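The analysis above used pod creation in the replicaset controller to show what expectations do; they play a similar role in the pod deletion logic, which is not analyzed here. Below is a summary of the role of the expectations mechanism in the replicaset controller.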

Summary of the role of the expectations mechanism

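The expiry of expectations solves the problem of a pod creation/deletion batch that gets stuck for some reason and never completes, which would otherwise leave the replicaset permanently short of its desired replica count.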

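When expectations.SatisfiedExpectations returns true, the controller enters the core method rsc.manageReplicas, which compares the replicaset's desired pod count with the number of existing pods to decide whether another batch of pod creations/deletions is needed.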

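In short, expectations make successive pod creation/deletion batches run one after another rather than in parallel. This prevents the problems that concurrent batches could cause, such as deleting the same pods twice or creating more pods than the replicaset wants. Without the expectations check, if a new round of pod creations/deletions started while a previous batch for the same replicaset was still in progress, the controller would compute its diff from an informer cache that does not yet reflect the in-flight pods, and would create or delete them a second time.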
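The race that expectations prevent can be illustrated with a toy simulation. Everything in it is hypothetical: the informer cache is modelled as a counter that only catches up when `flushCache` is called, there is no burst limit, and `sync` is called twice before the cache has seen the first batch. Without the expectations check, the second sync reads the stale count and creates a duplicate batch; with the check, the second sync is skipped.

```go
package main

import "fmt"

// sim is a toy model: created counts pods that really exist, cached counts the
// pods the lagging informer cache has seen, pending is the add expectation.
type sim struct {
	desired, created, cached, pending int
	useExpectations                   bool
}

// sync models one reconciliation: it diffs desired against the cached count.
func (s *sim) sync() {
	if s.useExpectations && s.pending > 0 {
		return // SatisfiedExpectations would return false: batch still in flight
	}
	if diff := s.desired - s.cached; diff > 0 {
		s.pending = diff // like ExpectCreations
		s.created += diff
	}
}

// flushCache models the informer catching up; each observed pod lowers pending.
func (s *sim) flushCache() {
	for s.cached < s.created {
		s.cached++
		s.pending-- // like CreationObserved in addPod
	}
}

// run performs two syncs before the cache catches up and returns pods created.
func run(useExpectations bool) int {
	s := &sim{desired: 3, useExpectations: useExpectations}
	s.sync()
	s.sync() // arrives before the cache has seen the first batch
	s.flushCache()
	return s.created
}

func main() {
	fmt.Println(run(false), run(true)) // 6 3
}
```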
