天天看點

k8s源碼分析--kube-scheduler源碼(二)

前言

Pod priority

Pod 有了 priority(優先級) 後才有優先級排程、搶占排程的說法,高優先級的 pod 可以在排程隊列中排到前面,優先選擇 node;另外當高優先級的 pod 找不到合适的 node 時,就會看 node 上低優先級的 pod 驅逐之後是否能夠 run 起來,如果可以,那麼 node 上的一個或多個低優先級的 pod 會被驅逐,然後高優先級的 pod 得以成功運作1個 node 上。今天我們分析 pod 搶占相關的代碼。開始之前我們看一下和 priority 相關的2個示例配置檔案:

PriorityClass 例子

apiVersion: scheduling.k8s.io/v1kind: PriorityClassmetadata:  name: high-priorityvalue: 1000000globalDefault: falsedescription: "This priority class should be used for XYZ service pods only."
           

使用上述 PriorityClass

apiVersion: v1kind: Podmetadata:  name: nginx  labels:    env: testspec:  containers:  - name: nginx    image: nginx    imagePullPolicy: IfNotPresent  priorityClassName: high-priority
           

這兩個檔案的内容這裡不解釋,Pod priority 相關知識點不熟悉的小夥伴請先查閱官方文檔,我們下面看排程器中和 preempt 相關的代碼邏輯。

官網位址:https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/

preempt 入口

代碼調用:

sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt

當通過正常的排程流程如果沒有找到合适的節點(主要是預選沒有合适的節點),會判斷需不需要進行搶占排程,具體的代碼在pkg/scheduler/scheduler.go檔案下,用到的方法preempt。

func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
    // 特性沒有開啟就傳回 ""
    if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
        return "", nil
    }
    // 更新 pod 資訊;入參和傳回值都是 *v1.Pod 類型
    preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)

    // preempt 過程,下文分析
    node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)

    var nodeName = ""
    if node != nil {
        nodeName = node.Name
        // 更新隊列中“任命pod”隊列
        sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

        // 設定pod的Status.NominatedNodeName
        err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
        if err != nil {
            // 如果出錯就從 queue 中移除
            sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
            return "", err
        }

        for _, victim := range victims {
            // 将要驅逐的 pod 驅逐
            if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
                return "", err
            }
            sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
        }
    }
    // Clearing nominated pods should happen outside of "if node != nil". 
    // 這個清理過程在上面的if外部,我們回頭從 Preempt() 的實作去了解
    for _, p := range nominatedPodsToClear {
        rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
        if rErr != nil {
            klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
            // We do not return as this error is not critical.
        }
    }
    return nodeName, err
}
           

前面看完了正常排程,再來看看Scheduler.scheduleOne方法中,如果預選/優選失敗以後的PodPriority優先級排程是如何處理的,PodPriority優先級排程對應啟動的方法為sched.preempt(pod, fitError),Scheduler.preempt方法中是優先級排程的邏輯。

  • 檢查PodPriority是否開啟,如果未開啟,直接傳回
  • 由于該Pod在Predicate/Priortiy排程過程失敗後,會更新PodCondition,記錄排程失敗狀态及失敗原因。是以需要從apiserver中擷取PodCondition更新後的Pod Object;
  • 執行scheduler的Preempt的方法,選出要執行優先排程的node以及node上要删除的pod
  • 将要排程的pod綁定到上一步選出的node
  • delete前兩步用Preempt方法選出來的要删除的pod
  • 抹去上一步删除的pod與node綁定的資訊

preempt 實作

上面 preempt() 函數中涉及到了一些值得深入看看的對象,下面我們逐個看一下這些對象的實作。

func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
        // Scheduler may return various types of errors. Consider preemption only if
        // the error is of type FitError.
    
    //檢查error是不是預選失敗的error(因為優選隻是選擇更優的,是以隻會是預選失敗)
        fitError, ok := scheduleErr.(*FitError)
        if !ok || fitError == nil {
                return nil, nil, nil, nil
        }
    
    //檢查cachedNodeInfoMap是否有比将要執行優先排程pod的Priority更小的pod
        if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
                klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
                return nil, nil, nil, nil
        }
        allNodes, err := nodeLister.List()
        if err != nil {
                return nil, nil, nil, err
        }
        if len(allNodes) == 0 {
                return nil, nil, nil, ErrNoNodesAvailable
        }
        // 1.擷取預選排程失敗的節點,但是可能是潛在的搶占可能成功的節點(所有的搶占節點都是在潛在節點内部選擇)
        potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
        if len(potentialNodes) == 0 {
                klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
                // In this case, we should clean-up any existing nominated node name of the pod.
                return nil, nil, []*v1.Pod{pod}, nil
        }
        // 2.擷取PDB(Pod中斷預算)清單
    // ljs:部署在Kubernetes的每個App都可以建立一個對應PDB Object,// 用來限制Voluntary Disruptions時最大可以down的副本數或者最少應該保持Available的副本數,以此來保證應用的高可用。
        pdbs, err := g.pdbLister.List(labels.Everything())
        if err != nil {
                return nil, nil, nil, err
        }
        // 3.擷取所有可以進行Preempt的Node節點的資訊,主要包含該節點哪些Pod需要被搶占掉
        nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
                g.predicateMetaProducer, g.schedulingQueue, pdbs)
        if err != nil {
                return nil, nil, nil, err
        }

        // We will only check nodeToVictims with extenders that support preemption.
        // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
        // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
        // 4.擴充的Preempt排程判斷
        nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
        if err != nil {
                return nil, nil, nil, err
        }

        // 5.選中某一個Node
        candidateNode := pickOneNodeForPreemption(nodeToVictims)
        if candidateNode == nil {
                return nil, nil, nil, err
        }

        // Lower priority pods nominated to run on this node, may no longer fit on
        // this node. So, we should remove their nomination. Removing their
        // nomination updates these pods and moves them to the active queue. It
        // lets scheduler find another place for them.
        // 6.判斷哪些Pod優先級較低,後續需要被清除掉,不作為NominatedPods存在
        nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
        if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
                return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
        }

        return nil, nil, nil, fmt.Errorf(
                "preemption failed: the target node %s has been deleted from scheduler cache",
                candidateNode.Name)
}
           

Preempt方法主要執行以下幾個步驟:

1、從預選失敗的節點中擷取可以用來做搶占排程的節點,通過一個switch語句排除不可以用來做搶占排程的節點

2、擷取PDB(Pod中斷預算)清單,用來做後續的判斷标準;部署在Kubernetes的每個App都可以建立一個對應PDB Object,用來限制Voluntary Disruptions時最大可以down的副本數或者最少應該保持Available的副本數,以此來保證應用的高可用。

3、通過調用selectNodesForPreemption方法,判斷哪些Node可以進行搶占排程。通過ParallelizeUntil方法同步對所有的Node進行判斷,判斷路徑為checkNode-->selectVictimsOnNode-->podFitsOnNode,最終同預選方法類似,使用了podFitsOnNode方法。不同于普通預選,搶占排程會先對Pod優先級判斷,然後在移除掉優先級較低的Pod之後再調用podFitsOnNode方法,以此達到搶占的效果。selectNodesForPreemption方法傳回的參數是一個map類型的值,key為Node資訊,value為該Node如果作為排程節點,将要清除的一些資訊,包括Pods和PDB資訊

4、擷取到搶占排程可以實作的Nodes資源後,繼續通過擴充的算法進行過濾;

5、選中最終的搶占排程的Node,調用pickOneNodeForPreemption方法,主要基于5個原則:

a)PDB violations(違規)值最小的Node;

b)挑選具有最低優先級受害者的節點,即被清除的Node上的Pods,它的優先級是最低的;

c)通過所有受害者Pods(将被删除的低優先級Pods)的優先級總和做區分;

d)如果多個Node優先級總和仍然相等,則選擇具有最小受害者數量的Node;

e)如果多個Node優先級總和仍然相等,則選擇第一個這樣的Node(随機排序);

6、選中最終的Node之後,記錄該Node上優先級較低的NominatedPods,這些Pod還未排程,需要将其排程關系進行删除,重新應用。

函數調用關系

genericScheduler.Preempt

|-->nodesWherePreemptionMightHelp

|-->selectNodesForPreemption

|-->selectVictimsOnNode

|-->filterPodsWithPDBViolation

|-->pickOneNodeForPreemption

|-->getLowerPriorityNominatedPods

nodesWherePreemptionMightHelp

nodesWherePreemptionMightHelp 要做的事情是尋找 predicates 階段失敗但是通過搶占也許能夠排程成功的 nodes.

func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
    // 潛力 node, 用于存儲傳回值的 slice
   potentialNodes := []*v1.Node{}
   for _, node := range nodes {
       // 這個為 true 表示一個 node 驅逐 pod 也不一定能适合目前 pod 運作
      unresolvableReasonExist := false
       // 一個 node 對應的所有失敗的 predicates
      failedPredicates, _ := failedPredicatesMap[node.Name]
      // 周遊,看是不是再下面指定的這些原因中,如果在,就标記 unresolvableReasonExist = true
      for _, failedPredicate := range failedPredicates {
         switch failedPredicate {
         case
            predicates.ErrNodeSelectorNotMatch,
            predicates.ErrPodAffinityRulesNotMatch,
            predicates.ErrPodNotMatchHostName,
            predicates.ErrTaintsTolerationsNotMatch,
            predicates.ErrNodeLabelPresenceViolated,
            predicates.ErrNodeNotReady,
            predicates.ErrNodeNetworkUnavailable,
            predicates.ErrNodeUnderDiskPressure,
            predicates.ErrNodeUnderPIDPressure,
            predicates.ErrNodeUnderMemoryPressure,
            predicates.ErrNodeOutOfDisk,
            predicates.ErrNodeUnschedulable,
            predicates.ErrNodeUnknownCondition,
            predicates.ErrVolumeZoneConflict,
            predicates.ErrVolumeNodeConflict,
            predicates.ErrVolumeBindConflict:
            unresolvableReasonExist = true
             // 如果找到一個上述失敗原因,說明這個 node 已經可以排除了,break 後繼續下一個 node 的計算
            break
         }
      }
       // false 的時候,也就是這個 node 也許驅逐 pods 後有用,那就添加到 potentialNodes 中
      if !unresolvableReasonExist {
         klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
         potentialNodes = append(potentialNodes, node)
      }
   }
   return potentialNodes
}
           

selectNodesForPreemption

這個函數會并發計算所有的 nodes 是否通過驅逐實作 pod 搶占。

func selectNodesForPreemption(pod *v1.Pod,
   nodeNameToInfo map[string]*schedulercache.NodeInfo,
   potentialNodes []*v1.Node, // 上一個函數計算出來的 nodes
   predicates map[string]algorithm.FitPredicate,
   metadataProducer algorithm.PredicateMetadataProducer,
   queue internalqueue.SchedulingQueue, // 這裡其實是前面講的優先級隊列 PriorityQueue
   pdbs []*policy.PodDisruptionBudget, // pdb 清單) (map[*v1.Node]*schedulerapi.Victims, error) { 
   nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
   var resultLock sync.Mutex

   // We can use the same metadata producer for all nodes.
   meta := metadataProducer(pod, nodeNameToInfo)
    // 這種形式的并發已經不陌生了,前面遇到過幾次了
   checkNode := func(i int) {
      nodeName := potentialNodes[i].Name
      var metaCopy algorithm.PredicateMetadata
      if meta != nil {
         metaCopy = meta.ShallowCopy()
      }
       // 這裡有一個子過程調用,下面單獨介紹
      pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
      if fits {
         resultLock.Lock()
         victims := schedulerapi.Victims{
            Pods:             pods,
            NumPDBViolations: numPDBViolations,
         }
          // 如果 fit,就添加到 nodeToVictims 中,也就是最後的傳回值
         nodeToVictims[potentialNodes[i]] = &victims
         resultLock.Unlock()
      }
   }
   workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
   return nodeToVictims, nil}
           

selectVictimsOnNode

這個函數嘗試在給定的 node 中尋找最少數量的需要被驅逐的 pods,同時需要保證驅逐了這些 pods 之後,這個 noode 能夠滿足“pod”運作需求。

func selectVictimsOnNode(
   pod *v1.Pod,
   meta algorithm.PredicateMetadata,
   nodeInfo *schedulercache.NodeInfo,
   fitPredicates map[string]algorithm.FitPredicate,
   queue internalqueue.SchedulingQueue,
   pdbs []*policy.PodDisruptionBudget,) ([]*v1.Pod, int, bool) {
   if nodeInfo == nil {
      return nil, 0, false
   }
    // 排個序
   potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
   nodeInfoCopy := nodeInfo.Clone()

    // 定義删除 pod 函數
   removePod := func(rp *v1.Pod) {
      nodeInfoCopy.RemovePod(rp)
      if meta != nil {
         meta.RemovePod(rp)
      }
   }
    // 定義添加 pod 函數
   addPod := func(ap *v1.Pod) {
      nodeInfoCopy.AddPod(ap)
      if meta != nil {
         meta.AddPod(ap, nodeInfoCopy)
      }
   }
   // 删除所有的低優先級 pod 看是不是能夠滿足排程需求了
   podPriority := util.GetPodPriority(pod)
   for _, p := range nodeInfoCopy.Pods() {
      if util.GetPodPriority(p) < podPriority {
          // 删除的意思其實就是添加元素到 potentialVictims.Items
         potentialVictims.Items = append(potentialVictims.Items, p)
         removePod(p)
      }
   }
    // 排個序
   potentialVictims.Sort()
   // 如果删除了所有的低優先級 pods 之後還不能跑這個新 pod,那麼差不多就可以判斷這個 node 不适合 preemption 了,還有一點點需要考慮的是這個“pod”的不 fit 的原因是由于 pod affinity 不滿足了。
    // 後續可能會增加目前 pod 和低優先級 pod 之間的 優先級檢查。

    // 這個函數調用其實就是之前講到過的預選函數的調用邏輯,判斷這個 pod 是否合适跑在這個 node 上。
   if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
      if err != nil {
         klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
      }
      return nil, 0, false
   }
   var victims []*v1.Pod
   numViolatingVictim := 0

// 對前兩步删除pod,檢查是否會導緻pod的數量小于pdb的min-avilable,分為會和不會兩類
   violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
    // 釋放 pods 的函數,來一個放一個
   reprievePod := func(p *v1.Pod) bool {
      addPod(p)
      fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
      if !fits {
         removePod(p)
         victims = append(victims, p)
         klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
      }
      return fits
   }
    // 嘗試盡量多地釋放這些 pods,也就是說能少殺就少殺;這裡先從 PDB violating victims 中釋放,再從 PDB non-violating victims 中釋放;兩個組都是從高優先級的 pod 開始釋放。
    // 釋放 violatingVictims 中元素的同時會記錄放了多少個
   for _, p := range violatingVictims {
      if !reprievePod(p) {
         numViolatingVictim++
      }
   }
   // 開始釋放 non-violating victims.
   for _, p := range nonViolatingVictims {
      reprievePod(p)
   }
   return victims, numViolatingVictim, true}
           

selectVictimsOnNode選出node給優先排程pod騰出資源要需要删除的pod(最少數量的),以及對删除pod而導緻pod少于pdb定義進行計數。流程如下:

  • 擷取優先排程pod的PodPriority
  • 擷取所有比優先排程pod的PodPriority更小的pod,并執行removepod函數删除這些pod
  • 在上一步删除Pod之後,再在node上對優先排程pod進行預選
  • 對前兩步删除pod,檢查是否會導緻pod的數量小于pdb的min-avilable,分為會和不會兩類
  • 先對因為pdb限制不能删除的pod執行reprievePod函數(reprievePod函數:先add pod再執行預選,如果預選失敗,則再remove pod)
  • 再對沒有pdb限制或者pdb允許删除的pod執行reprievePod函數

pickOneNodeForPreemption

func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
   if len(nodesToVictims) == 0 {
      return nil
   }
    // 初始化為最大值
   minNumPDBViolatingPods := math.MaxInt32
   var minNodes1 []*v1.Node
   lenNodes1 := 0
    // 這個循環要找到 PDBViolatingPods 最少的 node,如果有多個,就全部存在 minNodes1 中
   for node, victims := range nodesToVictims {
      if len(victims.Pods) == 0 {
         // 如果發現一個不需要驅逐 pod 的 node,馬上傳回
         return node
      }
      numPDBViolatingPods := victims.NumPDBViolations
      if numPDBViolatingPods < minNumPDBViolatingPods {
         minNumPDBViolatingPods = numPDBViolatingPods
         minNodes1 = nil
         lenNodes1 = 0
      }
      if numPDBViolatingPods == minNumPDBViolatingPods {
         minNodes1 = append(minNodes1, node)
         lenNodes1++
      }
   }
    // 如果隻找到1個 PDB violations 最少的 node,那就直接傳回這個 node 就 ok 了
   if lenNodes1 == 1 {
      return minNodes1[0]
   }

   // 還剩下多個 node,那就尋找 highest priority victim 最小的 node
   minHighestPriority := int32(math.MaxInt32)
   var minNodes2 = make([]*v1.Node, lenNodes1)
   lenNodes2 := 0
    // 這個循環要做的事情是看2個 node 上 victims 中最高優先級的 pod 哪個優先級更高
   for i := 0; i < lenNodes1; i++ {
      node := minNodes1[i]
      victims := nodesToVictims[node]
      // highestPodPriority is the highest priority among the victims on this node.
      highestPodPriority := util.GetPodPriority(victims.Pods[0])
      if highestPodPriority < minHighestPriority {
         minHighestPriority = highestPodPriority
         lenNodes2 = 0
      }
      if highestPodPriority == minHighestPriority {
         minNodes2[lenNodes2] = node
         lenNodes2++
      }
   }
    // 發現隻有1個,那就直接傳回
   if lenNodes2 == 1 {
      return minNodes2[0]
   }

   // 這時候還沒有抉擇出一個 node,那就開始計算優先級總和了,看哪個更低
   minSumPriorities := int64(math.MaxInt64)
   lenNodes1 = 0
   for i := 0; i < lenNodes2; i++ {
      var sumPriorities int64
      node := minNodes2[i]
      for _, pod := range nodesToVictims[node].Pods {
         // 這裡的累加考慮到了先把優先級搞成正數。不然會出現1個 node 上有1優先級為 -3 的 pod,另外一個 node 上有2個優先級為 -3 的 pod,結果 -3>-6,有2個 pod 的 node 反而被認為總優先級更低!
         sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
      }
      if sumPriorities < minSumPriorities {
         minSumPriorities = sumPriorities
         lenNodes1 = 0
      }
      if sumPriorities == minSumPriorities {
         minNodes1[lenNodes1] = node
         lenNodes1++
      }
   }
   if lenNodes1 == 1 {
      return minNodes1[0]
   }

   // 還是沒有分出勝負,于是開始用 pod 總數做比較
   minNumPods := math.MaxInt32
   lenNodes2 = 0
   for i := 0; i < lenNodes1; i++ {
      node := minNodes1[i]
      numPods := len(nodesToVictims[node].Pods)
      if numPods < minNumPods {
         minNumPods = numPods
         lenNodes2 = 0
      }
      if numPods == minNumPods {
         minNodes2[lenNodes2] = node
         lenNodes2++
      }
   }
   // 還是沒有區分出來1個 node 的話,隻能放棄區分了,直接傳回第一個結果
   if lenNodes2 > 0 {
      return minNodes2[0]
   }
   klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
   return nil}
           

pickOneNodeForPreemption從可以進行優先排程的node中選出最優的一個node。

  • 如果node不用preemption pod,直接傳回該node
  • 選出對pdb影響最小的node,如果最小的隻有一個node,直接傳回該node
  • 選擇victims中最高PodPriority最低的Node,如果隻有一個node,直接傳回該node
  • 選擇所有victims優先級之和最小的那個Node,如果隻有一個node,直接傳回該node
  • 選擇victims pod數最少的Node,如果隻有一個node,直接傳回該node
  • 如果上一步有不止一個Node滿足條件,随機選擇一個Node傳回

參考

https://www.kubernetes.org.cn/5221.html

https://juejin.im/post/5c889c2e5188257df700a732

https://my.oschina.net/u/3797264/blog/2615842

繼續閱讀