天天看點

k8s replicaset controller源碼分析(2)-核心處理邏輯分析

replicaset controller是kube-controller-manager元件中衆多控制器中的一個,是 replicaset 資源對象的控制器,其通過對replicaset、pod 2種資源的監聽,完成replicaset期望副本數的調諧。

replicaset controller分析

replicaset controller簡介

replicaset controller是kube-controller-manager元件中衆多控制器中的一個,是 replicaset 資源對象的控制器,其通過對replicaset、pod 2種資源的監聽,當這2種資源發生變化時會觸發 replicaset controller 對相應的replicaset對象進行調諧操作,進而完成replicaset期望副本數的調諧,當實際pod的數量未達到預期時建立pod,當實際pod的數量超過預期時删除pod。

replicaset controller主要作用是根據replicaset對象所期望的pod數量與現存pod數量做比較,然後根據比較結果建立/删除pod,最終使得replicaset對象所期望的pod數量與現存pod數量相等。

replicaset controller架構圖

replicaset controller的大緻組成和處理流程如下圖,replicaset controller對pod和replicaset對象注冊了event handler,當有事件時,會watch到然後将對應的replicaset對象放入到queue中,然後

syncReplicaSet

方法為replicaset controller調諧replicaset對象的核心處理邏輯所在,從queue中取出replicaset對象,做調諧處理。

k8s replicaset controller源碼分析(2)-核心處理邏輯分析

replicaset controller分析分為3大塊進行,分别是:

(1)replicaset controller初始化和啟動分析;

(2)replicaset controller核心處理邏輯分析;

(3)replicaset controller expectations機制分析。

本篇部落格進行replicaset controller核心處理邏輯分析。

replicaset controller核心處理邏輯分析

基于v1.17.4

經過前面分析的replicaset controller的初始化與啟動,知道了replicaset controller監聽replicaset、pod對象的add、update與delete事件,然後對replicaset對象做相應的調諧處理,這裡來接着分析replicaset controller的調諧處理(核心處理)邏輯,從

rsc.syncHandler

作為入口進行分析。

rsc.syncHandler

rsc.syncHandler即

rsc.syncReplicaSet

方法,主要邏輯:

(1)擷取replicaset對象以及關聯的pod對象清單;

(2)調用

rsc.expectations.SatisfiedExpectations

,判斷上一輪對replicaset期望副本的創删操作是否完成,也可以認為是判斷上一次對replicaset對象的調諧操作中,調用的

rsc.manageReplicas

方法是否執行完成;

(3)如果上一輪對replicaset期望副本的創删操作已經完成,且replicaset對象的DeletionTimestamp字段為nil,則調用rsc.manageReplicas做replicaset期望副本的核心調諧處理,即創删pod;

(4)調用calculateStatus計算replicaset的status,并更新。

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}

	// list all pods to include the pods that don't match the rs`s selector
	// anymore but has the stale controller ref.
	// TODO: Do the List and Filter in a single pass, or use an index.
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Ignore inactive pods.
	filteredPods := controller.FilterActivePods(allPods)

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		// Multiple things could lead to this update failing. Requeuing the replica set ensures
		// Returning an error causes a requeue without forcing a hotloop
		return err
	}
	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
           

1 rsc.expectations.SatisfiedExpectations

該方法主要是判斷上一輪對replicaset期望副本的創删操作是否完成,也可以認為是判斷上一次對replicaset對象的調諧操作中,調用的

rsc.manageReplicas

方法是否執行完成。待上一次建立删除pod的操作完成後,才能進行下一次的

rsc.manageReplicas

方法調用。

若某replicaset對象的調諧中從未調用過

rsc.manageReplicas

方法,或上一輪調諧時建立/删除pod的數量已達成或調用

rsc.manageReplicas

後已達到逾時期限(逾時時間5分鐘),則傳回true,代表上一次建立删除pod的操作完成,可以進行下一次的

rsc.manageReplicas

方法調用,否則傳回false。

expectations記錄了replicaset對象在某一次調諧中期望建立/删除的pod數量,pod建立/删除完成後,該期望數會相應的減少,當期望建立/删除的pod數量小于等于0時,說明上一次調諧中期望建立/删除的pod數量已經達到,傳回true。

關于Expectations機制後面會做詳細分析。

// pkg/controller/controller_utils.go
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
	if exp, exists, err := r.GetExpectations(controllerKey); exists {
		if exp.Fulfilled() {
			klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
			return true
		} else if exp.isExpired() {
			klog.V(4).Infof("Controller expectations expired %#v", exp)
			return true
		} else {
			klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
			return false
		}
	} else if err != nil {
		klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
	} else {
		// When a new controller is created, it doesn't have expectations.
		// When it doesn't see expected watch events for > TTL, the expectations expire.
		//	- In this case it wakes up, creates/deletes controllees, and sets expectations again.
		// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
		//	- In this case it continues without setting expectations till it needs to create/delete controllees.
		klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
	}
	// Trigger a sync if we either encountered and error (which shouldn't happen since we're
	// getting from local store) or this controller hasn't established expectations.
	return true
}

func (exp *ControlleeExpectations) isExpired() bool {
	return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout // ExpectationsTimeout = 5 * time.Minute
}
           

2 核心建立删除pod方法-rsc.manageReplicas

核心建立删除pod方法,主要是根據replicaset所期望的pod數量與現存pod數量做比較,然後根據比較結果來建立/删除pod,最終使得replicaset對象所期望的pod數量與現存pod數量相等,需要特别注意的是,每一次調用

rsc.manageReplicas

方法,建立/删除pod的個數上限為500。

在replicaset對象的調諧中,

rsc.manageReplicas

方法不一定每一次都會調用執行,隻有當

rsc.expectations.SatisfiedExpectations

方法傳回true,且replicaset對象的

DeletionTimestamp

屬性為空時,才會進行

rsc.manageReplicas

方法的調用。

先簡單的看一下代碼,代碼後面會做詳細的邏輯分析。

// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		rsc.expectations.ExpectCreations(rsKey, diff)
		glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails.  Conveniently, this also prevents the
		// event spam that those failures would generate.
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			boolPtr := func(b bool) *bool { return &b }
			controllerRef := &metav1.OwnerReference{
				APIVersion:         rsc.GroupVersion().String(),
				Kind:               rsc.Kind,
				Name:               rs.Name,
				UID:                rs.UID,
				BlockOwnerDeletion: boolPtr(true),
				Controller:         boolPtr(true),
			}
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
			if err != nil && errors.IsTimeout(err) {
				// Pod is created but its initialization has timed out.
				// If the initialization is successful eventually, the
				// controller will observe the creation via the informer.
				// If the initialization fails, or if the pod keeps
				// uninitialized for a long time, the informer will not
				// receive any update, and the controller will create a new
				// pod when the expectation expires.
				return nil
			}
			return err
		})

		// Any skipped pods that we never attempted to start shouldn't be expected.
		// The skipped pods will be retried later. The next controller resync will
		// retry the slow start process.
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, diff)

		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rs change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(targetPod)
					glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					errCh <- err
				}
			}(pod)
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
	}

	return nil
}
           

diff = 現存pod數量 - 期望的pod數量

diff := len(filteredPods) - int(*(rs.Spec.Replicas))
           

(1)當現存pod數量比期望的少時,需要建立pod,進入建立pod的邏輯代碼塊。

(2)當現存pod數量比期望的多時,需要删除pod,進入删除pod的邏輯代碼塊。

一次同步操作中批量建立或删除pod的個數上限為

rsc.burstReplicas

,即500個。

// pkg/controller/replicaset/replica_set.go
const (
	// Realistic value of the burstReplica field for the replica set manager based off
	// performance requirements for kubernetes 1.0.
	BurstReplicas = 500

	// The number of times we retry updating a ReplicaSet's status.
	statusUpdateRetries = 1
)
           
if diff > rsc.burstReplicas {
		diff = rsc.burstReplicas
	}
           

接下來分析一下建立/删除pod的邏輯代碼塊。

2.1 建立pod邏輯代碼塊

主要邏輯:

(1)運算擷取需要建立的pod數量,并設定數量上限500;

rsc.expectations.ExpectCreations

,将本輪調諧期望建立的pod數量設定進expectations;

(3)調用

slowStartBatch

函數來對pod進行建立邏輯處理;

(4)調用

slowStartBatch

函數完成後,計算擷取建立失敗的pod的數量,然後調用相應次數的

rsc.expectations.CreationObserved

方法,減去本輪調諧中期望建立的pod數量。

為什麼要減呢?因為expectations記錄了replicaset對象在某一次調諧中期望建立/删除的pod數量,pod建立/删除完成後,replicaset controller會watch到pod的建立/删除事件,進而調用

rsc.expectations.CreationObserved

方法來使期望建立/删除的pod數量減少。當有相應數量的pod建立/删除失敗後,replicaset controller是不會watch到相應的pod建立/删除事件的,是以必須把本輪調諧期望建立/删除的pod數量做相應的減法,否則本輪調諧中的期望建立/删除pod數量永遠不可能小于等于0,這樣的話,

rsc.expectations.SatisfiedExpectations

方法就隻會等待expectations逾時期限到達才會傳回true了。

diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		
		rsc.expectations.ExpectCreations(rsKey, diff)
		glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			boolPtr := func(b bool) *bool { return &b }
			controllerRef := &metav1.OwnerReference{
				APIVersion:         rsc.GroupVersion().String(),
				Kind:               rsc.Kind,
				Name:               rs.Name,
				UID:                rs.UID,
				BlockOwnerDeletion: boolPtr(true),
				Controller:         boolPtr(true),
			}
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
			if err != nil && errors.IsTimeout(err) {
				// Pod is created but its initialization has timed out.
				// If the initialization is successful eventually, the
				// controller will observe the creation via the informer.
				// If the initialization fails, or if the pod keeps
				// uninitialized for a long time, the informer will not
				// receive any update, and the controller will create a new
				// pod when the expectation expires.
				return nil
			}
			return err
		})

		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
		}
		return err
           

2.1.1 slowStartBatch

來看到slowStartBatch,可以看到建立pod的算法為:

(1)每次批量建立的 pod 數依次為 1、2、4、8......,呈指數級增長,起與要建立的pod數量相同的goroutine來負責建立pod。

(2)建立pod按1、2、4、8...的遞增趨勢分多批次進行,若某批次建立pod有失敗的(如apiserver限流,丢棄請求等,注意:逾時除外,因為initialization處理有可能逾時),則後續批次不再進行,結束本次函數調用。

// pkg/controller/replicaset/replica_set.go
// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		curSuccesses := batchSize - len(errCh)
		successes += curSuccesses
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

           

rsc.podControl.CreatePodsWithControllerRef

前面定義的建立pod時調用的方法為

rsc.podControl.CreatePodsWithControllerRef

func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	if err := validateControllerRef(controllerRef); err != nil {
		return err
	}
	return r.createPods("", namespace, template, controllerObject, controllerRef)
}

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	pod, err := GetPodFromTemplate(template, object, controllerRef)
	if err != nil {
		return err
	}
	if len(nodeName) != 0 {
		pod.Spec.NodeName = nodeName
	}
	if len(labels.Set(pod.Labels)) == 0 {
		return fmt.Errorf("unable to create pods, no labels")
	}
	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
	if err != nil {
		// only send an event if the namespace isn't terminating
		if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
		}
		return err
	}
	accessor, err := meta.Accessor(object)
	if err != nil {
		klog.Errorf("parentObject does not have ObjectMeta, %v", err)
		return nil
	}
	klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)

	return nil
}
           

2.2 删除邏輯代碼塊

(1)運算擷取需要删除的pod數量,并設定數量上限500;

(2)根據要縮容删除的pod數量,先調用

getPodsToDelete

函數找出需要删除的pod清單;

rsc.expectations.ExpectCreations

,将本輪調諧期望删除的pod數量設定進expectations;

(4)每個pod拉起一個goroutine,調用

rsc.podControl.DeletePod

來删除該pod;

(5)對于删除失敗的pod,會調用

rsc.expectations.DeletionObserved

至于為什麼要減,原因跟上面建立邏輯代碼塊中分析的一樣。

(6)等待所有gorouutine完成,return傳回。

if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, diff)

		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(targetPod)
					glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					errCh <- err
				}
			}(pod)
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
           

2.2.1 getPodsToDelete

getPodsToDelete:根據要縮容删除的pod數量,然後傳回需要删除的pod清單。

// pkg/controller/replicaset/replica_set.go
func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
	// No need to sort pods if we are about to delete all of them.
	// diff will always be <= len(filteredPods), so not need to handle > case.
	if diff < len(filteredPods) {
		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
		sort.Sort(podsWithRanks)
	}
	return filteredPods[:diff]
}

func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
	podsOnNode := make(map[string]int)
	for _, pod := range relatedPods {
		if controller.IsPodActive(pod) {
			podsOnNode[pod.Spec.NodeName]++
		}
	}
	ranks := make([]int, len(podsToRank))
	for i, pod := range podsToRank {
		ranks[i] = podsOnNode[pod.Spec.NodeName]
	}
	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}
}
           

篩選要删除的pod邏輯

按照下面的排序規則,從上到下進行排序,各個條件互相互斥,符合其中一個條件則排序完成:

(1)優先删除沒有綁定node的pod;

(2)優先删除處于Pending狀态的pod,然後是Unknown,最後才是Running;

(3)優先删除Not ready的pod,然後才是ready的pod;

(4)按同node上所屬replicaset的pod數量排序,優先删除所屬replicaset的pod數量多的node上的pod;

(5)按pod ready的時間排序,優先删除ready時間最短的pod;

(6)優先删除pod中容器重新開機次數較多的pod;

(7)按pod建立時間排序,優先删除建立時間最短的pod。

// pkg/controller/controller_utils.go
func (s ActivePodsWithRanks) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {
		return len(s.Pods[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	if podPhaseToOrdinal[s.Pods[i].Status.Phase] != podPhaseToOrdinal[s.Pods[j].Status.Phase] {
		return podPhaseToOrdinal[s.Pods[i].Status.Phase] < podPhaseToOrdinal[s.Pods[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {
		return !podutil.IsPodReady(s.Pods[i])
	}
	// 4. Doubled up < not doubled up
	// If one of the two pods is on the same node as one or more additional
	// ready pods that belong to the same replicaset, whichever pod has more
	// colocated ready pods is less
	if s.Rank[i] != s.Rank[j] {
		return s.Rank[i] > s.Rank[j]
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 5. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) {
		readyTime1 := podReadyTime(s.Pods[i])
		readyTime2 := podReadyTime(s.Pods[j])
		if !readyTime1.Equal(readyTime2) {
			return afterOrZero(readyTime1, readyTime2)
		}
	}
	// 6. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {
		return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])
	}
	// 7. Empty creation time pods < newer pods < older pods
	if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {
		return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
	}
	return false
}
           

2.2.2 rsc.podControl.DeletePod

删除pod的方法。

// pkg/controller/controller_utils.go
func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
	accessor, err := meta.Accessor(object)
	if err != nil {
		return fmt.Errorf("object does not have ObjectMeta, %v", err)
	}
	klog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
	if err := r.KubeClient.CoreV1().Pods(namespace).Delete(podID, nil); err != nil && !apierrors.IsNotFound(err) {
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
		return fmt.Errorf("unable to delete pods: %v", err)
	}
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)

	return nil
}
           

3 calculateStatus

calculateStatus函數計算并傳回replicaset對象的status。

怎麼計算status呢?

(1)根據現存pod數量、Ready狀态的pod數量、availabel狀态的pod數量等,給replicaset對象的status的Replicas、ReadyReplicas、AvailableReplicas等字段指派;

(2)根據replicaset對象現有status中的condition配置以及前面調用

rsc.manageReplicas

方法後是否有錯誤,來決定給status新增condition或移除condition,

conditionType

ReplicaFailure

當調用

rsc.manageReplicas

方法出錯,且replicaset對象的status中,沒有

conditionType

ReplicaFailure

的condition,則新增

conditionType

ReplicaFailure

的condition,表示該replicaset建立/删除pod出錯;

rsc.manageReplicas

方法沒有任何錯誤,且replicaset對象的status中,有

conditionType

ReplicaFailure

的condition,則去除該condition,表示該replicaset建立/删除pod成功。

func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
	newStatus := rs.Status
	// Count the number of pods that have labels matching the labels of the pod
	// template of the replica set, the matching pods may have more
	// labels than are in the template. Because the label of podTemplateSpec is
	// a superset of the selector of the replica set, so the possible
	// matching pods must be part of the filteredPods.
	fullyLabeledReplicasCount := 0
	readyReplicasCount := 0
	availableReplicasCount := 0
	templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
	for _, pod := range filteredPods {
		if templateLabel.Matches(labels.Set(pod.Labels)) {
			fullyLabeledReplicasCount++
		}
		if podutil.IsPodReady(pod) {
			readyReplicasCount++
			if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
				availableReplicasCount++
			}
		}
	}

	failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
	if manageReplicasErr != nil && failureCond == nil {
		var reason string
		if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
			reason = "FailedCreate"
		} else if diff > 0 {
			reason = "FailedDelete"
		}
		cond := NewReplicaSetCondition(apps.ReplicaSetReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error())
		SetCondition(&newStatus, cond)
	} else if manageReplicasErr == nil && failureCond != nil {
		RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
	}

	newStatus.Replicas = int32(len(filteredPods))
	newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
	newStatus.ReadyReplicas = int32(readyReplicasCount)
	newStatus.AvailableReplicas = int32(availableReplicasCount)
	return newStatus
}
           

4 updateReplicaSetStatus

(1)判斷新計算出來的status中的各個屬性如Replicas、ReadyReplicas、AvailableReplicas以及Conditions是否與現存replicaset對象的status中的一緻,一緻則不用做更新操作,直接return;

(2)調用c.UpdateStatus更新replicaset的status。

// pkg/controller/replicaset/replica_set_utils.go
func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
	// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
	// we do a periodic relist every 30s. If the generations differ but the replicas are
	// the same, a caller might've resized to the same replica count.
	if rs.Status.Replicas == newStatus.Replicas &&
		rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
		rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
		rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
		rs.Generation == rs.Status.ObservedGeneration &&
		reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
		return rs, nil
	}

	// Save the generation number we acted on, otherwise we might wrongfully indicate
	// that we've seen a spec update when we retry.
	// TODO: This can clobber an update if we allow multiple agents to write to the
	// same status.
	newStatus.ObservedGeneration = rs.Generation

	var getErr, updateErr error
	var updatedRS *apps.ReplicaSet
	for i, rs := 0, rs; ; i++ {
		klog.V(4).Infof(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
			fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
			fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
			fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
			fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
			fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))

		rs.Status = newStatus
		updatedRS, updateErr = c.UpdateStatus(rs)
		if updateErr == nil {
			return updatedRS, nil
		}
		// Stop retrying if we exceed statusUpdateRetries - the replicaSet will be requeued with a rate limit.
		if i >= statusUpdateRetries {
			break
		}
		// Update the ReplicaSet with the latest resource version for the next poll
		if rs, getErr = c.Get(rs.Name, metav1.GetOptions{}); getErr != nil {
			// If the GET fails we can't trust status.Replicas anymore. This error
			// is bound to be more interesting than the update failure.
			return nil, getErr
		}
	}

	return nil, updateErr
}
           

c.UpdateStatus

// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/replicaset.go
func (c *replicaSets) UpdateStatus(replicaSet *v1.ReplicaSet) (result *v1.ReplicaSet, err error) {
	result = &v1.ReplicaSet{}
	err = c.client.Put().
		Namespace(c.ns).
		Resource("replicasets").
		Name(replicaSet.Name).
		SubResource("status").
		Body(replicaSet).
		Do().
		Into(result)
	return
}
           

總結

syncReplicaSet

k8s replicaset controller源碼分析(2)-核心處理邏輯分析

replicaset controller核心處理邏輯

replicaset controller的核心處理邏輯是根據replicaset對象裡期望的pod數量以及現存pod數量的比較,當期望pod數量比現存pod數量多時,調用建立pod算法建立出新的pod,直至達到期望數量;當期望pod數量比現存pod數量少時,調用删除pod算法,并根據一定的政策對現存pod清單做排序,從中按順序選擇多餘的pod然後删除,直至達到期望數量。

k8s replicaset controller源碼分析(2)-核心處理邏輯分析

replicaset controller建立pod算法

replicaset controller建立pod的算法是,按1、2、4、8...的遞增趨勢分多批次進行(每次調諧中建立pod的數量上限為500個,超過上限的會在下次調諧中再建立),若某批次建立pod有失敗的(如apiserver限流,丢棄請求等,注意:逾時除外,因為initialization處理有可能逾時),則後續批次的pod建立不再進行,需等待該repliaset對象下次調諧時再觸發該pod建立算法,進行pod的建立,直至達到期望數量。

replicaset controller删除pod算法

replicaset controller删除pod的算法是,先根據一定的政策将現存pod清單做排序,然後按順序從中選擇指定數量的pod,拉起與要删除的pod數量相同的goroutine來删除pod(每次調諧中删除pod的數量上限為500個),并等待所有goroutine執行完成。删除pod有失敗的(如apiserver限流,丢棄請求)或超過500上限的部分,需等待該repliaset對象下次調諧時再觸發該pod删除算法,進行pod的删除,直至達到期望數量。

expectations機制

關于expectations機制的分析,會在下一篇部落格中進行。

繼續閱讀