
k8s StatefulSet controller source code analysis

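The StatefulSet controller is one of the many controllers in the kube-controller-manager component, and it is the controller for StatefulSet resource objects. It watches StatefulSet and Pod resources; when one of them changes, the controller is triggered to reconcile the corresponding StatefulSet object. Through reconciliation it handles Pod creation, deletion, update, and scaling, StatefulSet rolling updates, StatefulSet status updates, and cleanup of old StatefulSet revisions.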

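StatefulSet controller analysis

Introduction to StatefulSet

StatefulSet is the object Kubernetes provides for managing stateful applications, whereas Deployment is used for stateless applications.

Stateful Pods differ from stateless ones in that a stateful Pod sometimes has to be located by its hostname. Stateless Pods are interchangeable, so picking any one at random is enough. Stateful Pods are each different, and you usually want to operate on one specific Pod.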

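StatefulSet suits applications that need the following properties:

(1) Stable network identity (hostname): after a Pod is rescheduled, its PodName and HostName stay the same;

(2) Stable persistent storage: based on PVCs, so a rescheduled Pod can still access the same persistent data;

(3) Stable creation and scale-up order: Pods are created in ascending ordinal order (from 0 to N-1), and before the next Pod is created, all Pods before it must be Running and Ready;

(4) Stable deletion and scale-down order: Pods are deleted in descending ordinal order (from N-1 to 0), and before the next Pod is terminated, all Pods handled before it (those with larger ordinals) must already be deleted;

(5) Stable rolling update order: Pods are updated in descending ordinal order (from N-1 to 0), each one deleted and then recreated, and the controller waits until the Pod at the current ordinal has been recreated and is Ready before moving on to the next one.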

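Introduction to the StatefulSet controller

StatefulSet controller architecture diagram

The rough composition and processing flow of the StatefulSet controller are shown in the figure below. The controller registers event handlers for StatefulSet and Pod objects. When an event is observed, the corresponding StatefulSet is put into the queue. The syncStatefulSet method holds the controller's core reconciliation logic: a StatefulSet is taken from the queue and reconciled.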

[Figure: StatefulSet controller architecture diagram]

StatefulSet Pod naming rules, Pod creation and deletion

If you create a StatefulSet named web with replicas set to 3, its Pods are named web-0, web-1, and web-2.
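The naming rule can be sketched in a few lines of Go. This is only an illustration: podName and parentAndOrdinal are hypothetical helper names, and the regular expression is an assumption about how a name of the form "set name, dash, ordinal" can be parsed back into its parts.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// statefulPodRegex splits "<set name>-<ordinal>" into its two parts.
var statefulPodRegex = regexp.MustCompile(`^(.*)-([0-9]+)$`)

// podName builds the name of the Pod with the given ordinal.
func podName(setName string, ordinal int) string {
	return fmt.Sprintf("%s-%d", setName, ordinal)
}

// parentAndOrdinal recovers the set name and the ordinal from a Pod name.
// It returns ordinal -1 when the name does not follow the pattern.
func parentAndOrdinal(name string) (string, int) {
	m := statefulPodRegex.FindStringSubmatch(name)
	if len(m) < 3 {
		return "", -1
	}
	ord, err := strconv.Atoi(m[2])
	if err != nil {
		return "", -1
	}
	return m[1], ord
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(podName("web", i)) // web-0, web-1, web-2
	}
	parent, ord := parentAndOrdinal("web-2")
	fmt.Println(parent, ord)
}
```

Returning -1 for an unparsable name lets a caller skip Pods that do not belong to the naming scheme instead of failing.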

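StatefulSet Pods are created in order from ordinal 0 to n-1, and before the next Pod is created, the previous one must have been created and be Ready.

Taking the example above: after the web StatefulSet is created, its 3 Pods are created in the order web-0, web-1, web-2. web-1 is not created until web-0 is Ready, and web-2 is not created until web-1 is Ready. If web-0 stops being Ready after web-1 has become Ready but before web-2 is created, web-2 is not created until web-0 is Ready again.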
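The ordering rule in this example can be expressed as a small decision function. The sketch below is a simplification under stated assumptions: podState and nextStep are hypothetical names, only the ordered (non-parallel) mode is modeled, and terminating or failed Pods are ignored.

```go
package main

import "fmt"

// podState is a hypothetical, simplified view of one StatefulSet Pod.
type podState struct {
	Created bool
	Ready   bool
}

// nextStep walks the Pods in ascending ordinal order and reports the single
// action an ordered reconcile pass would take: create the first missing Pod,
// or wait for the first Pod that is not Ready.
func nextStep(setName string, pods []podState) string {
	for i, p := range pods {
		if !p.Created {
			return fmt.Sprintf("create %s-%d", setName, i)
		}
		if !p.Ready {
			return fmt.Sprintf("wait for %s-%d", setName, i)
		}
	}
	return "nothing to do"
}

func main() {
	pods := []podState{
		{Created: true, Ready: false}, // web-0 lost readiness
		{Created: true, Ready: true},  // web-1 is Ready
		{},                            // web-2 has not been created yet
	}
	fmt.Println(nextStep("web", pods)) // wait for web-0
}
```

Because the walk stops at the first Pod that is missing or not Ready, web-2 is never reached while web-0 is unhealthy, which is the behavior described above.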

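During a rolling update or a scale-down, StatefulSet Pods are deleted in order from ordinal n-1 down to 0, and before the next Pod is deleted, the deletion of the previous one must have completed.

In addition, when statefulset.Spec.VolumeClaimTemplates defines the PVCs a Pod needs, the StatefulSet controller creates the corresponding PVCs when it creates the Pod. When it deletes the Pod, it does not delete those PVCs; they have to be removed manually.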
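To make the link between a Pod and its PVCs concrete, here is a minimal sketch of how the claim names can be derived. It assumes the usual convention of joining the claim template name, the StatefulSet name, and the ordinal with dashes; claimName and claimsForPod are hypothetical helpers, not controller functions.

```go
package main

import "fmt"

// claimName returns the PVC name for one volume claim template and one
// ordinal, assuming the "<template>-<set>-<ordinal>" convention.
func claimName(templateName, setName string, ordinal int) string {
	return fmt.Sprintf("%s-%s-%d", templateName, setName, ordinal)
}

// claimsForPod lists the PVC names the Pod with the given ordinal would use,
// one per volume claim template.
func claimsForPod(templates []string, setName string, ordinal int) []string {
	names := make([]string, 0, len(templates))
	for _, t := range templates {
		names = append(names, claimName(t, setName, ordinal))
	}
	return names
}

func main() {
	// Two hypothetical templates, "data" and "logs", for Pod web-1.
	fmt.Println(claimsForPod([]string{"data", "logs"}, "web", 1))
}
```

Since the name depends only on the template, the set name, and the ordinal, a recreated Pod with the same ordinal finds the same PVC again, which is what makes the storage stable.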

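StatefulSet update strategies

(1) OnDelete: with the OnDelete update strategy, after the StatefulSet Pod template is updated, new Pods are created automatically only once you manually delete the old Pods.

(2) RollingUpdate: with the RollingUpdate update strategy, after the StatefulSet Pod template is updated, the old Pods are deleted and new Pods are created automatically according to the rolling update configuration. During the rolling update there is at most one Pod per ordinal, and before the next Pod is updated, the previous one must have finished updating and be Ready. Unlike creation, which goes from ordinal 0 to n-1, a rolling update deletes and recreates Pods in reverse order (from n-1 down to 0).

The rolling update configuration also has a Partition setting. Once a partition is set, only Pods whose ordinal is greater than or equal to the partition are updated during a rolling update; the remaining Pods are left unchanged.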
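The effect of Partition on the update order can be sketched as follows. Both functions are hypothetical illustrations: rollingUpdateOrder lists the ordinals a rolling update may touch, and nextUpdateTarget picks the next Pod to replace. Health and termination checks are left out on purpose.

```go
package main

import "fmt"

// rollingUpdateOrder returns the ordinals a rolling update touches, in the
// order they are processed: from replicas-1 down to partition.
func rollingUpdateOrder(replicas, partition int) []int {
	order := []int{}
	for ord := replicas - 1; ord >= partition; ord-- {
		order = append(order, ord)
	}
	return order
}

// nextUpdateTarget returns the largest ordinal at or above the partition
// whose Pod is not yet at the update revision, or -1 if none is left.
// podRevisions[i] is the revision name of the Pod with ordinal i.
func nextUpdateTarget(podRevisions []string, updateRevision string, partition int) int {
	for ord := len(podRevisions) - 1; ord >= partition && ord >= 0; ord-- {
		if podRevisions[ord] != updateRevision {
			return ord
		}
	}
	return -1
}

func main() {
	fmt.Println(rollingUpdateOrder(5, 2)) // [4 3 2]

	// Ordinals 2 and 3 are already updated; ordinal 1 is next when partition is 1.
	revisions := []string{"v1", "v1", "v2", "v2"}
	fmt.Println(nextUpdateTarget(revisions, "v2", 1)) // 1
	fmt.Println(nextUpdateTarget(revisions, "v2", 2)) // -1
}
```

Raising the partition and then lowering it step by step is one way to roll a change out to a few high-ordinal Pods first.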

The analysis of the StatefulSet controller is split into two parts:

(1) StatefulSet controller initialization and startup;

(2) StatefulSet controller processing logic.

1. StatefulSet controller initialization and startup

Based on tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

We start from the startStatefulSetController function, the entry point for the initialization and startup analysis.

startStatefulSetController

Main logic of startStatefulSetController:

(1) Call statefulset.NewStatefulSetController to create and initialize the StatefulSetController;

(2) Start a goroutine that runs the StatefulSetController's Run method.

// cmd/kube-controller-manager/app/apps.go
func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {
		return nil, false, nil
	}
	go statefulset.NewStatefulSetController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Apps().V1().StatefulSets(),
		ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
		ctx.InformerFactory.Apps().V1().ControllerRevisions(),
		ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
	).Run(int(ctx.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Stop)
	return nil, true, nil
}
           

1.1 statefulset.NewStatefulSetController

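The code of the statefulset.NewStatefulSetController function shows that the StatefulSet controller registers EventHandlers for StatefulSet and Pod objects. In other words, it watches events on these objects and puts them into the event queue for processing. The function also initializes the StatefulSet controller itself.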

// pkg/controller/statefulset/stateful_set.go
func NewStatefulSetController(
	podInformer coreinformers.PodInformer,
	setInformer appsinformers.StatefulSetInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	revInformer appsinformers.ControllerRevisionInformer,
	kubeClient clientset.Interface,
) *StatefulSetController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})

	ssc := &StatefulSetController{
		kubeClient: kubeClient,
		control: NewDefaultStatefulSetControl(
			NewRealStatefulPodControl(
				kubeClient,
				setInformer.Lister(),
				podInformer.Lister(),
				pvcInformer.Lister(),
				recorder),
			NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
			history.NewHistory(kubeClient, revInformer.Lister()),
			recorder,
		),
		pvcListerSynced: pvcInformer.Informer().HasSynced,
		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
		podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},

		revListerSynced: revInformer.Informer().HasSynced,
	}

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// lookup the statefulset and enqueue
		AddFunc: ssc.addPod,
		// lookup current and old statefulset if labels changed
		UpdateFunc: ssc.updatePod,
		// lookup statefulset accounting for deletion tombstones
		DeleteFunc: ssc.deletePod,
	})
	ssc.podLister = podInformer.Lister()
	ssc.podListerSynced = podInformer.Informer().HasSynced

	setInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: ssc.enqueueStatefulSet,
			UpdateFunc: func(old, cur interface{}) {
				oldPS := old.(*apps.StatefulSet)
				curPS := cur.(*apps.StatefulSet)
				if oldPS.Status.Replicas != curPS.Status.Replicas {
					klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
				}
				ssc.enqueueStatefulSet(cur)
			},
			DeleteFunc: ssc.enqueueStatefulSet,
		},
	)
	ssc.setLister = setInformer.Lister()
	ssc.setListerSynced = setInformer.Informer().HasSynced

	// TODO: Watch volumes
	return ssc
}
           

1.2 Run

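The part to look at is the for loop. According to the value of workers (configurable through the kube-controller-manager startup flag concurrent-statefulset-syncs, default 5), it starts that many goroutines running the ssc.worker method, which calls the StatefulSet controller's core processing method ssc.sync to reconcile StatefulSet objects.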

// pkg/controller/statefulset/stateful_set.go
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ssc.queue.ShutDown()

	klog.Infof("Starting stateful set controller")
	defer klog.Infof("Shutting down statefulset controller")

	if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(ssc.worker, time.Second, stopCh)
	}

	<-stopCh
}
           

1.2.1 ssc.worker

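The worker takes an event key from the queue and calls ssc.sync (analyzed in detail later) to reconcile the StatefulSet object. As mentioned earlier, the events in the queue come from the EventHandlers the StatefulSet controller registers for StatefulSet and Pod objects: changes to those objects are observed and put into the queue.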

// pkg/controller/statefulset/stateful_set.go
func (ssc *StatefulSetController) worker() {
	for ssc.processNextWorkItem() {
	}
}

func (ssc *StatefulSetController) processNextWorkItem() bool {
	key, quit := ssc.queue.Get()
	if quit {
		return false
	}
	defer ssc.queue.Done(key)
	if err := ssc.sync(key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err))
		ssc.queue.AddRateLimited(key)
	} else {
		ssc.queue.Forget(key)
	}
	return true
}
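The worker pattern above can be illustrated with the standard library alone. The following is a minimal sketch, not the controller's implementation: a buffered channel stands in for the rate-limited work queue, runWorkers and errSyncFailed are hypothetical names, and failed keys are only collected instead of being requeued with backoff.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSyncFailed is a hypothetical error used to simulate a failed sync.
var errSyncFailed = errors.New("sync failed")

// runWorkers starts the given number of goroutines. Each one pulls keys from
// the queue and calls syncFn, much like ssc.worker calling ssc.sync. It
// returns the keys whose sync failed; a real controller would requeue them.
func runWorkers(keys []string, workers int, syncFn func(key string) error) []string {
	queue := make(chan string, len(keys))
	for _, k := range keys {
		queue <- k
	}
	close(queue)

	var (
		mu     sync.Mutex
		failed []string
		wg     sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range queue {
				if err := syncFn(key); err != nil {
					mu.Lock()
					failed = append(failed, key)
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	return failed
}

func main() {
	keys := []string{"default/web", "default/db", "kube-system/etcd"}
	failed := runWorkers(keys, 5, func(key string) error {
		if key == "default/db" {
			return errSyncFailed
		}
		return nil
	})
	fmt.Println("requeue:", failed)
}
```

Unlike this sketch, the real work queue also deduplicates keys and guarantees that the same key is never processed by two workers at once.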
           

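2. StatefulSet controller core processing logic

sync

We go straight to the sync method, the entry point of the StatefulSet controller's core processing.

Main logic:

(1) Record the current time when the method starts and define a defer function that computes the total execution time of the method, i.e. how long one reconciliation of a StatefulSet takes;

(2) Get the StatefulSet object by its namespace and name;

(3) Call ssc.adoptOrphanRevisions to check for orphan ControllerRevision objects (those whose .metadata.ownerReferences has no entry with the controller field set, or has it set to false). Orphans that match the StatefulSet's selector get an ownerReference added so that they are associated with the StatefulSet;

(4) Call ssc.getPodsForStatefulSet to find the Pod list using the StatefulSet's selector. Orphan Pods whose labels match the selector are adopted, and Pods that are already associated but whose labels no longer match the selector are released;

(5) Call ssc.syncStatefulSet to reconcile the StatefulSet object.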
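The first steps of sync can be sketched without any Kubernetes dependency. This is a simplified illustration under stated assumptions: splitKey mirrors the "namespace/name" key format, and syncOnce with its in-memory store is hypothetical and only stands in for the lister lookup.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKey splits a queue key of the form "namespace/name". A key without a
// slash is treated as a name with an empty namespace.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// syncOnce mimics steps (1) and (2) against a hypothetical in-memory store
// that maps keys to "object exists".
func syncOnce(store map[string]bool, key string) (string, error) {
	namespace, name, err := splitKey(key)
	if err != nil {
		return "", err
	}
	if !store[key] {
		// A missing object is not an error: it has been deleted meanwhile.
		return fmt.Sprintf("%s/%s has been deleted, nothing to do", namespace, name), nil
	}
	return fmt.Sprintf("reconcile %s in namespace %s", name, namespace), nil
}

func main() {
	store := map[string]bool{"default/web": true}
	for _, key := range []string{"default/web", "default/gone", "a/b/c"} {
		msg, err := syncOnce(store, key)
		fmt.Println(msg, err)
	}
}
```

Treating a deleted object as success matters because returning an error would make the worker requeue a key that can never be reconciled.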

// pkg/controller/statefulset/stateful_set.go
func (ssc *StatefulSetController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	set, err := ssc.setLister.StatefulSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("StatefulSet has been deleted %v", key)
		return nil
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
		return err
	}

	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
		// This is a non-transient error, so don't retry.
		return nil
	}

	if err := ssc.adoptOrphanRevisions(set); err != nil {
		return err
	}

	pods, err := ssc.getPodsForStatefulSet(set, selector)
	if err != nil {
		return err
	}

	return ssc.syncStatefulSet(set, pods)
}
           

2.1 ssc.getPodsForStatefulSet

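The ssc.getPodsForStatefulSet method gets and returns the list of Pods that belong to the StatefulSet object. It also checks orphan Pods and already associated Pods to see whether the association between the StatefulSet and its Pods has to be updated.

(1) List all Pods in the StatefulSet's namespace;

(2) Define the function that filters the Pods belonging to the StatefulSet object, i.e. isMemberOf (a Pod belongs to the StatefulSet when the set name derived from the Pod's name matches the StatefulSet's name);

(3) Call cm.ClaimPods to filter out the Pods that belong to this StatefulSet. Orphan Pods whose labels match the StatefulSet's selector are adopted, and Pods that are already associated but whose labels no longer match the selector are released.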
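The adopt and release decision described in step (3) can be sketched as a pure function. The types and names below (pod, action, claim) are hypothetical, the selector is reduced to an exact label match, and the isMemberOf name filter and the check for a StatefulSet that is itself being deleted are left out.

```go
package main

import "fmt"

// pod is a hypothetical, simplified Pod.
type pod struct {
	Name     string
	Labels   map[string]string
	OwnerUID string // UID of the controlling owner, "" for an orphan
	Deleting bool
}

type action string

const (
	keep    action = "keep"    // already owned and still matching
	adopt   action = "adopt"   // orphan that matches the selector
	release action = "release" // owned but no longer matching
	ignore  action = "ignore"  // belongs to someone else or does not match
)

// matches reports whether every selector label is present on the Pod.
func matches(selector, labels map[string]string) bool {
	for k, v := range selector {
		if labels[k] != v {
			return false
		}
	}
	return true
}

// claim decides what to do with one Pod from the point of view of the set.
func claim(setUID string, selector map[string]string, p pod) action {
	if p.OwnerUID != "" {
		if p.OwnerUID != setUID {
			return ignore
		}
		if matches(selector, p.Labels) {
			return keep
		}
		return release
	}
	if p.Deleting || !matches(selector, p.Labels) {
		return ignore
	}
	return adopt
}

func main() {
	selector := map[string]string{"app": "web"}
	pods := []pod{
		{Name: "web-0", Labels: map[string]string{"app": "web"}, OwnerUID: "uid-1"},
		{Name: "web-1", Labels: map[string]string{"app": "web"}},
		{Name: "web-2", Labels: map[string]string{"app": "other"}, OwnerUID: "uid-1"},
	}
	for _, p := range pods {
		fmt.Println(p.Name, claim("uid-1", selector, p))
	}
}
```

Only Pods classified as keep or adopt end up in the list that is passed on to the reconciliation.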

// pkg/controller/statefulset/stateful_set.go
func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
	// List all pods to include the pods that don't match the selector anymore but
	// has a ControllerRef pointing to this StatefulSet.
	pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	filter := func(pod *v1.Pod) bool {
		// Only claim if it matches our StatefulSet name. Otherwise release/ignore.
		return isMemberOf(set, pod)
	}

	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != set.UID {
			return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
		}
		return fresh, nil
	})

	cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, canAdoptFunc)
	return cm.ClaimPods(pods, filter)
}
           

2.2 ssc.syncStatefulSet

The ssc.syncStatefulSet method is where the core processing of the StatefulSet controller happens. The part to look at is the ssc.control.UpdateStatefulSet method.

// pkg/controller/statefulset/stateful_set.go
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
	klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
	// TODO: investigate where we mutate the set during the update as it is not obvious.
	if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {
		return err
	}
	klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
	return nil
}
           

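Main logic of the ssc.control.UpdateStatefulSet method:

(1) List all ControllerRevisions of the StatefulSet and sort them from oldest to newest;

(2) Call ssc.getStatefulSetRevisions to get the current StatefulSet revision and compute the update revision;

(3) Call ssc.updateStatefulSet to carry out Pod creation, deletion, update, and scaling for the StatefulSet;

(4) Call ssc.updateStatefulSetStatus to update the status of the StatefulSet object;

(5) Call ssc.truncateHistory to enforce the revision history limit configured on the StatefulSet: following the sort order from step (1), it deletes old revisions that no Pod uses any more.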
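Step (5) can be illustrated with a small sketch. It assumes that a revision counts as live when it is the current revision, the update revision, or still referenced by a Pod, and that only non-live revisions are trimmed. revisionsToDelete is a hypothetical helper that works on revision names only.

```go
package main

import "fmt"

// revisionsToDelete returns the names of the revisions that should be
// removed. revisions must be sorted from oldest to newest, live marks the
// revisions that are still in use, and limit is the history limit.
func revisionsToDelete(revisions []string, live map[string]bool, limit int) []string {
	history := []string{}
	for _, r := range revisions {
		if !live[r] {
			history = append(history, r)
		}
	}
	if len(history) <= limit {
		return nil
	}
	// Drop the oldest entries and keep the newest "limit" ones.
	return history[:len(history)-limit]
}

func main() {
	revisions := []string{"web-1", "web-2", "web-3", "web-4", "web-5"}
	// web-4 is the current revision and web-5 is the update revision.
	live := map[string]bool{"web-4": true, "web-5": true}
	fmt.Println(revisionsToDelete(revisions, live, 1)) // [web-1 web-2]
}
```

Because live revisions never enter the history list, the limit only bounds how many unused revisions are kept for rollback.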

// pkg/controller/statefulset/stateful_set_control.go
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {

	// list all revisions and sort them
	revisions, err := ssc.ListRevisions(set)
	if err != nil {
		return err
	}
	history.SortControllerRevisions(revisions)

	// get the current, and update revisions
	currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
	if err != nil {
		return err
	}

	// perform the main update function and get the status
	status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
	if err != nil {
		return err
	}

	// update the set's status
	err = ssc.updateStatefulSetStatus(set, status)
	if err != nil {
		return err
	}

	klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
		set.Namespace,
		set.Name,
		status.Replicas,
		status.ReadyReplicas,
		status.CurrentReplicas,
		status.UpdatedReplicas)

	klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
		set.Namespace,
		set.Name,
		status.CurrentRevision,
		status.UpdateRevision)

	// maintain the set's revision history limit
	return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}
           

2.2.1 ssc.updateStatefulSet

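The updateStatefulSet method is the core of the reconciliation of a StatefulSet object. It carries out Pod creation, deletion, update, and scaling for the StatefulSet. The method is fairly long, so we go through it step by step.

(1) The first for loop splits all Pods of the StatefulSet by ord (the ordinal in the Pod name) into two slices, replicas and condemned. Pods whose ordinal is less than the desired replica count go into replicas (ordinals start at 0, hence "less than"), and Pods whose ordinal is greater than or equal to the desired replica count go into condemned. The replicas slice holds the Pods that should exist; the condemned slice holds the Pods that have to be deleted. While iterating over the Pods, the loop also computes the StatefulSet's status from the Pod states;

(2) The second for loop handles ordinals below the desired replica count for which no Pod exists yet: it builds a Pod object for that ordinal from the Pod template of the StatefulSet (no create request is sent to the apiserver at this point; only the Pod struct is built);

(3) The third and fourth for loops iterate over replicas and condemned, find the unhealthy Pod with the smallest ordinal, and record the Pod and its ordinal;

(4) When the StatefulSet's DeletionTimestamp is not nil, the method returns the status computed so far and skips the rest of the logic;

(5) Compute monotonic: when statefulset.Spec.PodManagementPolicy is Parallel, monotonic is false, otherwise it is true (Parallel means the controller may handle the Pods of one StatefulSet in parallel; the serial mode means that before the next Pod is started or terminated, the previous Pod must have become Ready or must have been deleted);

(6) The fifth for loop iterates over replicas and handles the Pods of the StatefulSet, mainly creating Pods (including the PVCs defined in statefulset.Spec.VolumeClaimTemplates):

(6.1) When a Pod has failed (pod.Status.Phase is Failed), the controller asks the apiserver to delete it (the Pod's PVCs are not deleted here) and builds a new Pod struct for that ordinal in replicas, which the next step uses to recreate the Pod;

(6.2) If the Pod for an ordinal has not been created, the controller asks the apiserver to create it (including its PVCs). When monotonic is true (the StatefulSet is not configured as Parallel), the method returns right away and updateStatefulSet ends;

(6.3) The remaining logic applies when Parallel is not configured: Pods are handled serially, and before the next Pod is started or terminated, the previous Pod must have become Ready or must have been deleted. It is not analyzed further here;

(7) The sixth for loop iterates over condemned in reverse order (largest ordinal first) and deletes the surplus Pods. The deletion logic is also affected by Parallel and is not analyzed further here;

(8) Check the update strategy of the StatefulSet. If it is OnDelete, return right away (with this strategy a Pod is only rebuilt at its ordinal after someone deletes it manually);

(9) Get the Partition value from the rolling update configuration. During a rolling update, Pods whose ordinal is less than this value are not updated;

(10) The seventh for loop handles updates for StatefulSets whose update strategy is RollingUpdate. A rolling update goes through the Pods from the largest ordinal to the smallest, deleting each Pod and then recreating it, and waits until the Pod at the current ordinal has been recreated and is Ready before moving on to the next Pod.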
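Steps (1) and (2) can be sketched on plain ordinals to show how existing Pods are sorted into the two groups. partitionByOrdinal and missingOrdinals are hypothetical helpers: the real method works on Pod objects, while this sketch only tracks which ordinals are present.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionByOrdinal splits the ordinals of existing Pods into two groups:
// present[ord] is true for every ordinal in [0, replicaCount) that has a Pod,
// and condemned holds the ordinals at or above replicaCount in ascending order.
func partitionByOrdinal(ordinals []int, replicaCount int) (present []bool, condemned []int) {
	present = make([]bool, replicaCount)
	for _, ord := range ordinals {
		switch {
		case ord >= 0 && ord < replicaCount:
			present[ord] = true
		case ord >= replicaCount:
			condemned = append(condemned, ord)
		}
		// ord < 0 means the name could not be parsed; such Pods are ignored.
	}
	sort.Ints(condemned)
	return present, condemned
}

// missingOrdinals lists the slots in [0, replicaCount) that have no Pod yet
// and therefore need a new Pod object built from the template.
func missingOrdinals(present []bool) []int {
	missing := []int{}
	for ord, ok := range present {
		if !ok {
			missing = append(missing, ord)
		}
	}
	return missing
}

func main() {
	// Pods web-0, web-2, web-4, web-3 exist while the desired replica count is 3.
	present, condemned := partitionByOrdinal([]int{0, 2, 4, 3}, 3)
	fmt.Println(present)                  // [true false true]
	fmt.Println(condemned)                // [3 4]
	fmt.Println(missingOrdinals(present)) // [1]
}
```

In this example a scale-down from 5 to 3 and a missing Pod are handled in the same pass: ordinal 1 has to be created, while ordinals 4 and 3 are deleted from the largest ordinal down.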

// pkg/controller/statefulset/stateful_set_control.go
func (ssc *defaultStatefulSetControl) updateStatefulSet(
	set *apps.StatefulSet,
	currentRevision *apps.ControllerRevision,
	updateRevision *apps.ControllerRevision,
	collisionCount int32,
	pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
	// get the current and update revisions of the set.
	currentSet, err := ApplyRevision(set, currentRevision)
	if err != nil {
		return nil, err
	}
	updateSet, err := ApplyRevision(set, updateRevision)
	if err != nil {
		return nil, err
	}

	// set the generation, and revisions in the returned status
	status := apps.StatefulSetStatus{}
	status.ObservedGeneration = set.Generation
	status.CurrentRevision = currentRevision.Name
	status.UpdateRevision = updateRevision.Name
	status.CollisionCount = new(int32)
	*status.CollisionCount = collisionCount

	replicaCount := int(*set.Spec.Replicas)
	// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
	replicas := make([]*v1.Pod, replicaCount)
	// slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
	condemned := make([]*v1.Pod, 0, len(pods))
	unhealthy := 0
	firstUnhealthyOrdinal := math.MaxInt32
	var firstUnhealthyPod *v1.Pod
    
	// First for loop: split the StatefulSet's Pods into the replicas and condemned slices; the Pods in condemned are the ones to delete
	// First we partition pods into two lists valid replicas and condemned Pods
	for i := range pods {
		status.Replicas++

		// count the number of running and ready replicas
		if isRunningAndReady(pods[i]) {
			status.ReadyReplicas++
		}

		// count the number of current and update replicas
		if isCreated(pods[i]) && !isTerminating(pods[i]) {
			if getPodRevision(pods[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(pods[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
		}

		if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
			// if the ordinal of the pod is within the range of the current number of replicas,
			// insert it at the indirection of its ordinal
			replicas[ord] = pods[i]

		} else if ord >= replicaCount {
			// if the ordinal is greater than the number of replicas add it to the condemned list
			condemned = append(condemned, pods[i])
		}
		// If the ordinal could not be parsed (ord < 0), ignore the Pod.
	}
    
	// Second for loop: for ordinals below the desired replica count that have no Pod yet, build a Pod object from the StatefulSet's Pod template (no create request is sent to the apiserver here; only the Pod struct is built)
	// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
	for ord := 0; ord < replicaCount; ord++ {
		if replicas[ord] == nil {
			replicas[ord] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name, ord)
		}
	}

	// sort the condemned Pods by their ordinals
	sort.Sort(ascendingOrdinal(condemned))
    
	// Third and fourth for loops: iterate over replicas and condemned, find the unhealthy Pod with the smallest ordinal, and record it and its ordinal
	// find the first unhealthy Pod
	for i := range replicas {
		if !isHealthy(replicas[i]) {
			unhealthy++
			if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
				firstUnhealthyOrdinal = ord
				firstUnhealthyPod = replicas[i]
			}
		}
	}
	for i := range condemned {
		if !isHealthy(condemned[i]) {
			unhealthy++
			if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
				firstUnhealthyOrdinal = ord
				firstUnhealthyPod = condemned[i]
			}
		}
	}

	if unhealthy > 0 {
		klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
			set.Namespace,
			set.Name,
			unhealthy,
			firstUnhealthyPod.Name)
	}
    
	// When the StatefulSet's DeletionTimestamp is not nil, return the status computed so far and skip the rest of the method
	// If the StatefulSet is being deleted, don't do anything other than updating
	// status.
	if set.DeletionTimestamp != nil {
		return &status, nil
	}
    
	// Compute monotonic: false when statefulset.Spec.PodManagementPolicy is Parallel, true otherwise
	monotonic := !allowsBurst(set)
    
	// Fifth for loop: iterate over replicas and handle the StatefulSet's Pods, mainly creating Pods
	// Examine each replica with respect to its ordinal
	for i := range replicas {
		// delete and recreate failed pods
		if isFailed(replicas[i]) {
			ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
				"StatefulSet %s/%s is recreating failed Pod %s",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas--
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas--
			}
			status.Replicas--
			replicas[i] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name,
				i)
		}
		// If we find a Pod that has not been created we create the Pod
		if !isCreated(replicas[i]) {
			if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			status.Replicas++
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}

			// if the set does not allow bursting, return immediately
			if monotonic {
				return &status, nil
			}
			// pod created, no more work possible for this round
			continue
		}
		// If we find a Pod that is currently terminating, we must wait until graceful deletion
		// completes before we continue to make progress.
		if isTerminating(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to Terminate",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}
		// If we have a Pod that has been created but is not running and ready we can not make progress.
		// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
		// ordinal, are Running and Ready.
		if !isRunningAndReady(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}
		// Enforce the StatefulSet invariants
		if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
			continue
		}
		// Make a deep copy so we don't mutate the shared cache
		replica := replicas[i].DeepCopy()
		if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
			return &status, err
		}
	}
    
	// Sixth for loop: iterate over condemned in reverse order (largest ordinal first) and delete the surplus Pods
	// At this point, all of the current Replicas are Running and Ready, we can consider termination.
	// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
	// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
	// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
	// updates.
	for target := len(condemned) - 1; target >= 0; target-- {
		// wait for terminating pods to expire
		if isTerminating(condemned[target]) {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
				set.Namespace,
				set.Name,
				condemned[target].Name)
			// block if we are in monotonic mode
			if monotonic {
				return &status, nil
			}
			continue
		}
		// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
		if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
				set.Namespace,
				set.Name,
				firstUnhealthyPod.Name)
			return &status, nil
		}
		klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
			set.Namespace,
			set.Name,
			condemned[target].Name)

		if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
			return &status, err
		}
		if getPodRevision(condemned[target]) == currentRevision.Name {
			status.CurrentReplicas--
		}
		if getPodRevision(condemned[target]) == updateRevision.Name {
			status.UpdatedReplicas--
		}
		if monotonic {
			return &status, nil
		}
	}
    
	// Check the update strategy: for OnDelete, return right away (a Pod is only rebuilt at its ordinal after it is deleted manually)
	// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
	if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
		return &status, nil
	}
    
	// Get the Partition value from the rolling update configuration; during a rolling update, Pods whose ordinal is less than this value are not updated
	// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
	updateMin := 0
	if set.Spec.UpdateStrategy.RollingUpdate != nil {
		updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
	}
	
	// Seventh for loop: handle updates for StatefulSets whose update strategy is RollingUpdate
	// we terminate the Pod with the largest ordinal that does not match the update revision.
	for target := len(replicas) - 1; target >= updateMin; target-- {

		// delete the Pod if it is not already terminating and does not match the update revision.
		if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
			klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
				set.Namespace,
				set.Name,
				replicas[target].Name)
			err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
			status.CurrentReplicas--
			return &status, err
		}

		// wait for unhealthy Pods on update
		if !isHealthy(replicas[target]) {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to update",
				set.Namespace,
				set.Name,
				replicas[target].Name)
			return &status, nil
		}

	}
	return &status, nil
}
           

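Putting the analysis above together, these are the steps of this method that cover Pod creation, deletion, scaling, and update for a StatefulSet:

1. Creation: mainly (6), the fifth for loop;

2. Deletion: mainly (7), the sixth for loop;

3. Scaling: (1) to (7);

4. Update: mainly (8), (9), and (10), the seventh for loop. Step (8) handles the OnDelete update strategy; steps (9) and (10) handle the RollingUpdate strategy.

Summary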

[Figure: StatefulSet controller architecture diagram (syncStatefulSet)]

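StatefulSet controller core processing logic

The core processing logic of the StatefulSet controller is reconciling StatefulSet objects. Through reconciliation it handles Pod creation, deletion, update, and scaling, StatefulSet rolling updates, StatefulSet status updates, and cleanup of old StatefulSet revisions.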

[Figure: StatefulSet controller core processing logic]
