天天看点

k8s源码分析--kube-scheduler源码(二)

前言

Pod priority

Pod 有了 priority(优先级) 后才有优先级调度、抢占调度的说法,高优先级的 pod 可以在调度队列中排到前面,优先选择 node;另外当高优先级的 pod 找不到合适的 node 时,就会看 node 上低优先级的 pod 驱逐之后是否能够 run 起来,如果可以,那么 node 上的一个或多个低优先级的 pod 会被驱逐,然后高优先级的 pod 得以成功运行1个 node 上。今天我们分析 pod 抢占相关的代码。开始之前我们看一下和 priority 相关的2个示例配置文件:

PriorityClass 例子

apiVersion: scheduling.k8s.io/v1kind: PriorityClassmetadata:  name: high-priorityvalue: 1000000globalDefault: falsedescription: "This priority class should be used for XYZ service pods only."
           

使用上述 PriorityClass

apiVersion: v1kind: Podmetadata:  name: nginx  labels:    env: testspec:  containers:  - name: nginx    image: nginx    imagePullPolicy: IfNotPresent  priorityClassName: high-priority
           

这两个文件的内容这里不解释,Pod priority 相关知识点不熟悉的小伙伴请先查阅官方文档,我们下面看调度器中和 preempt 相关的代码逻辑。

官网地址:https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/

preempt 入口

代码调用:

sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt

当通过正常的调度流程如果没有找到合适的节点(主要是预选没有合适的节点),会判断需不需要进行抢占调度,具体的代码在pkg/scheduler/scheduler.go文件下,用到的方法preempt。

func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
    // 特性没有开启就返回 ""
    if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
        return "", nil
    }
    // 更新 pod 信息;入参和返回值都是 *v1.Pod 类型
    preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)

    // preempt 过程,下文分析
    node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)

    var nodeName = ""
    if node != nil {
        nodeName = node.Name
        // 更新队列中“任命pod”队列
        sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

        // 设置pod的Status.NominatedNodeName
        err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
        if err != nil {
            // 如果出错就从 queue 中移除
            sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
            return "", err
        }

        for _, victim := range victims {
            // 将要驱逐的 pod 驱逐
            if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
                return "", err
            }
            sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
        }
    }
    // Clearing nominated pods should happen outside of "if node != nil". 
    // 这个清理过程在上面的if外部,我们回头从 Preempt() 的实现去理解
    for _, p := range nominatedPodsToClear {
        rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
        if rErr != nil {
            klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
            // We do not return as this error is not critical.
        }
    }
    return nodeName, err
}
           

前面看完了正常调度,再来看看Scheduler.scheduleOne方法中,如果预选/优选失败以后的PodPriority优先级调度是如何处理的,PodPriority优先级调度对应启动的方法为sched.preempt(pod, fitError),Scheduler.preempt方法中是优先级调度的逻辑。

  • 检查PodPriority是否开启,如果未开启,直接返回
  • 由于该Pod在Predicate/Priortiy调度过程失败后,会更新PodCondition,记录调度失败状态及失败原因。因此需要从apiserver中获取PodCondition更新后的Pod Object;
  • 执行scheduler的Preempt的方法,选出要执行优先调度的node以及node上要删除的pod
  • 将要调度的pod绑定到上一步选出的node
  • delete前两步用Preempt方法选出来的要删除的pod
  • 抹去上一步删除的pod与node绑定的信息

preempt 实现

上面 preempt() 函数中涉及到了一些值得深入看看的对象,下面我们逐个看一下这些对象的实现。

func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
        // Scheduler may return various types of errors. Consider preemption only if
        // the error is of type FitError.
    
    //检查error是不是预选失败的error(因为优选只是选择更优的,所以只会是预选失败)
        fitError, ok := scheduleErr.(*FitError)
        if !ok || fitError == nil {
                return nil, nil, nil, nil
        }
    
    //检查cachedNodeInfoMap是否有比将要执行优先调度pod的Priority更小的pod
        if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
                klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
                return nil, nil, nil, nil
        }
        allNodes, err := nodeLister.List()
        if err != nil {
                return nil, nil, nil, err
        }
        if len(allNodes) == 0 {
                return nil, nil, nil, ErrNoNodesAvailable
        }
        // 1.获取预选调度失败的节点,但是可能是潜在的抢占可能成功的节点(所有的抢占节点都是在潜在节点内部选择)
        potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
        if len(potentialNodes) == 0 {
                klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
                // In this case, we should clean-up any existing nominated node name of the pod.
                return nil, nil, []*v1.Pod{pod}, nil
        }
        // 2.获取PDB(Pod中断预算)列表
    // ljs:部署在Kubernetes的每个App都可以创建一个对应PDB Object,// 用来限制Voluntary Disruptions时最大可以down的副本数或者最少应该保持Available的副本数,以此来保证应用的高可用。
        pdbs, err := g.pdbLister.List(labels.Everything())
        if err != nil {
                return nil, nil, nil, err
        }
        // 3.获取所有可以进行Preempt的Node节点的信息,主要包含该节点哪些Pod需要被抢占掉
        nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
                g.predicateMetaProducer, g.schedulingQueue, pdbs)
        if err != nil {
                return nil, nil, nil, err
        }

        // We will only check nodeToVictims with extenders that support preemption.
        // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
        // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
        // 4.扩展的Preempt调度判断
        nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
        if err != nil {
                return nil, nil, nil, err
        }

        // 5.选中某一个Node
        candidateNode := pickOneNodeForPreemption(nodeToVictims)
        if candidateNode == nil {
                return nil, nil, nil, err
        }

        // Lower priority pods nominated to run on this node, may no longer fit on
        // this node. So, we should remove their nomination. Removing their
        // nomination updates these pods and moves them to the active queue. It
        // lets scheduler find another place for them.
        // 6.判断哪些Pod优先级较低,后续需要被清除掉,不作为NominatedPods存在
        nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
        if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
                return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
        }

        return nil, nil, nil, fmt.Errorf(
                "preemption failed: the target node %s has been deleted from scheduler cache",
                candidateNode.Name)
}
           

Preempt方法主要执行以下几个步骤:

1、从预选失败的节点中获取可以用来做抢占调度的节点,通过一个switch语句排除不可以用来做抢占调度的节点

2、获取PDB(Pod中断预算)列表,用来做后续的判断标准;部署在Kubernetes的每个App都可以创建一个对应PDB Object,用来限制Voluntary Disruptions时最大可以down的副本数或者最少应该保持Available的副本数,以此来保证应用的高可用。

3、通过调用selectNodesForPreemption方法,判断哪些Node可以进行抢占调度。通过ParallelizeUntil方法同步对所有的Node进行判断,判断路径为checkNode-->selectVictimsOnNode-->podFitsOnNode,最终同预选方法类似,使用了podFitsOnNode方法。不同于普通预选,抢占调度会先对Pod优先级判断,然后在移除掉优先级较低的Pod之后再调用podFitsOnNode方法,以此达到抢占的效果。selectNodesForPreemption方法返回的参数是一个map类型的值,key为Node信息,value为该Node如果作为调度节点,将要清除的一些信息,包括Pods和PDB信息

4、获取到抢占调度可以实现的Nodes资源后,继续通过扩展的算法进行过滤;

5、选中最终的抢占调度的Node,调用pickOneNodeForPreemption方法,主要基于5个原则:

a)PDB violations(违规)值最小的Node;

b)挑选具有最低优先级受害者的节点,即被清除的Node上的Pods,它的优先级是最低的;

c)通过所有受害者Pods(将被删除的低优先级Pods)的优先级总和做区分;

d)如果多个Node优先级总和仍然相等,则选择具有最小受害者数量的Node;

e)如果多个Node优先级总和仍然相等,则选择第一个这样的Node(随机排序);

6、选中最终的Node之后,记录该Node上优先级较低的NominatedPods,这些Pod还未调度,需要将其调度关系进行删除,重新应用。

函数调用关系

genericScheduler.Preempt

|-->nodesWherePreemptionMightHelp

|-->selectNodesForPreemption

|-->selectVictimsOnNode

|-->filterPodsWithPDBViolation

|-->pickOneNodeForPreemption

|-->getLowerPriorityNominatedPods

nodesWherePreemptionMightHelp

nodesWherePreemptionMightHelp 要做的事情是寻找 predicates 阶段失败但是通过抢占也许能够调度成功的 nodes.

func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
    // 潜力 node, 用于存储返回值的 slice
   potentialNodes := []*v1.Node{}
   for _, node := range nodes {
       // 这个为 true 表示一个 node 驱逐 pod 也不一定能适合当前 pod 运行
      unresolvableReasonExist := false
       // 一个 node 对应的所有失败的 predicates
      failedPredicates, _ := failedPredicatesMap[node.Name]
      // 遍历,看是不是再下面指定的这些原因中,如果在,就标记 unresolvableReasonExist = true
      for _, failedPredicate := range failedPredicates {
         switch failedPredicate {
         case
            predicates.ErrNodeSelectorNotMatch,
            predicates.ErrPodAffinityRulesNotMatch,
            predicates.ErrPodNotMatchHostName,
            predicates.ErrTaintsTolerationsNotMatch,
            predicates.ErrNodeLabelPresenceViolated,
            predicates.ErrNodeNotReady,
            predicates.ErrNodeNetworkUnavailable,
            predicates.ErrNodeUnderDiskPressure,
            predicates.ErrNodeUnderPIDPressure,
            predicates.ErrNodeUnderMemoryPressure,
            predicates.ErrNodeOutOfDisk,
            predicates.ErrNodeUnschedulable,
            predicates.ErrNodeUnknownCondition,
            predicates.ErrVolumeZoneConflict,
            predicates.ErrVolumeNodeConflict,
            predicates.ErrVolumeBindConflict:
            unresolvableReasonExist = true
             // 如果找到一个上述失败原因,说明这个 node 已经可以排除了,break 后继续下一个 node 的计算
            break
         }
      }
       // false 的时候,也就是这个 node 也许驱逐 pods 后有用,那就添加到 potentialNodes 中
      if !unresolvableReasonExist {
         klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
         potentialNodes = append(potentialNodes, node)
      }
   }
   return potentialNodes
}
           

selectNodesForPreemption

这个函数会并发计算所有的 nodes 是否通过驱逐实现 pod 抢占。

func selectNodesForPreemption(pod *v1.Pod,
   nodeNameToInfo map[string]*schedulercache.NodeInfo,
   potentialNodes []*v1.Node, // 上一个函数计算出来的 nodes
   predicates map[string]algorithm.FitPredicate,
   metadataProducer algorithm.PredicateMetadataProducer,
   queue internalqueue.SchedulingQueue, // 这里其实是前面讲的优先级队列 PriorityQueue
   pdbs []*policy.PodDisruptionBudget, // pdb 列表) (map[*v1.Node]*schedulerapi.Victims, error) { 
   nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
   var resultLock sync.Mutex

   // We can use the same metadata producer for all nodes.
   meta := metadataProducer(pod, nodeNameToInfo)
    // 这种形式的并发已经不陌生了,前面遇到过几次了
   checkNode := func(i int) {
      nodeName := potentialNodes[i].Name
      var metaCopy algorithm.PredicateMetadata
      if meta != nil {
         metaCopy = meta.ShallowCopy()
      }
       // 这里有一个子过程调用,下面单独介绍
      pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
      if fits {
         resultLock.Lock()
         victims := schedulerapi.Victims{
            Pods:             pods,
            NumPDBViolations: numPDBViolations,
         }
          // 如果 fit,就添加到 nodeToVictims 中,也就是最后的返回值
         nodeToVictims[potentialNodes[i]] = &victims
         resultLock.Unlock()
      }
   }
   workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
   return nodeToVictims, nil}
           

selectVictimsOnNode

这个函数尝试在给定的 node 中寻找最少数量的需要被驱逐的 pods,同时需要保证驱逐了这些 pods 之后,这个 noode 能够满足“pod”运行需求。

func selectVictimsOnNode(
   pod *v1.Pod,
   meta algorithm.PredicateMetadata,
   nodeInfo *schedulercache.NodeInfo,
   fitPredicates map[string]algorithm.FitPredicate,
   queue internalqueue.SchedulingQueue,
   pdbs []*policy.PodDisruptionBudget,) ([]*v1.Pod, int, bool) {
   if nodeInfo == nil {
      return nil, 0, false
   }
    // 排个序
   potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
   nodeInfoCopy := nodeInfo.Clone()

    // 定义删除 pod 函数
   removePod := func(rp *v1.Pod) {
      nodeInfoCopy.RemovePod(rp)
      if meta != nil {
         meta.RemovePod(rp)
      }
   }
    // 定义添加 pod 函数
   addPod := func(ap *v1.Pod) {
      nodeInfoCopy.AddPod(ap)
      if meta != nil {
         meta.AddPod(ap, nodeInfoCopy)
      }
   }
   // 删除所有的低优先级 pod 看是不是能够满足调度需求了
   podPriority := util.GetPodPriority(pod)
   for _, p := range nodeInfoCopy.Pods() {
      if util.GetPodPriority(p) < podPriority {
          // 删除的意思其实就是添加元素到 potentialVictims.Items
         potentialVictims.Items = append(potentialVictims.Items, p)
         removePod(p)
      }
   }
    // 排个序
   potentialVictims.Sort()
   // 如果删除了所有的低优先级 pods 之后还不能跑这个新 pod,那么差不多就可以判断这个 node 不适合 preemption 了,还有一点点需要考虑的是这个“pod”的不 fit 的原因是由于 pod affinity 不满足了。
    // 后续可能会增加当前 pod 和低优先级 pod 之间的 优先级检查。

    // 这个函数调用其实就是之前讲到过的预选函数的调用逻辑,判断这个 pod 是否合适跑在这个 node 上。
   if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
      if err != nil {
         klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
      }
      return nil, 0, false
   }
   var victims []*v1.Pod
   numViolatingVictim := 0

// 对前两步删除pod,检查是否会导致pod的数量小于pdb的min-avilable,分为会和不会两类
   violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
    // 释放 pods 的函数,来一个放一个
   reprievePod := func(p *v1.Pod) bool {
      addPod(p)
      fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
      if !fits {
         removePod(p)
         victims = append(victims, p)
         klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
      }
      return fits
   }
    // 尝试尽量多地释放这些 pods,也就是说能少杀就少杀;这里先从 PDB violating victims 中释放,再从 PDB non-violating victims 中释放;两个组都是从高优先级的 pod 开始释放。
    // 释放 violatingVictims 中元素的同时会记录放了多少个
   for _, p := range violatingVictims {
      if !reprievePod(p) {
         numViolatingVictim++
      }
   }
   // 开始释放 non-violating victims.
   for _, p := range nonViolatingVictims {
      reprievePod(p)
   }
   return victims, numViolatingVictim, true}
           

selectVictimsOnNode选出node给优先调度pod腾出资源要需要删除的pod(最少数量的),以及对删除pod而导致pod少于pdb定义进行计数。流程如下:

  • 获取优先调度pod的PodPriority
  • 获取所有比优先调度pod的PodPriority更小的pod,并执行removepod函数删除这些pod
  • 在上一步删除Pod之后,再在node上对优先调度pod进行预选
  • 对前两步删除pod,检查是否会导致pod的数量小于pdb的min-avilable,分为会和不会两类
  • 先对因为pdb限制不能删除的pod执行reprievePod函数(reprievePod函数:先add pod再执行预选,如果预选失败,则再remove pod)
  • 再对没有pdb限制或者pdb允许删除的pod执行reprievePod函数

pickOneNodeForPreemption

func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
   if len(nodesToVictims) == 0 {
      return nil
   }
    // 初始化为最大值
   minNumPDBViolatingPods := math.MaxInt32
   var minNodes1 []*v1.Node
   lenNodes1 := 0
    // 这个循环要找到 PDBViolatingPods 最少的 node,如果有多个,就全部存在 minNodes1 中
   for node, victims := range nodesToVictims {
      if len(victims.Pods) == 0 {
         // 如果发现一个不需要驱逐 pod 的 node,马上返回
         return node
      }
      numPDBViolatingPods := victims.NumPDBViolations
      if numPDBViolatingPods < minNumPDBViolatingPods {
         minNumPDBViolatingPods = numPDBViolatingPods
         minNodes1 = nil
         lenNodes1 = 0
      }
      if numPDBViolatingPods == minNumPDBViolatingPods {
         minNodes1 = append(minNodes1, node)
         lenNodes1++
      }
   }
    // 如果只找到1个 PDB violations 最少的 node,那就直接返回这个 node 就 ok 了
   if lenNodes1 == 1 {
      return minNodes1[0]
   }

   // 还剩下多个 node,那就寻找 highest priority victim 最小的 node
   minHighestPriority := int32(math.MaxInt32)
   var minNodes2 = make([]*v1.Node, lenNodes1)
   lenNodes2 := 0
    // 这个循环要做的事情是看2个 node 上 victims 中最高优先级的 pod 哪个优先级更高
   for i := 0; i < lenNodes1; i++ {
      node := minNodes1[i]
      victims := nodesToVictims[node]
      // highestPodPriority is the highest priority among the victims on this node.
      highestPodPriority := util.GetPodPriority(victims.Pods[0])
      if highestPodPriority < minHighestPriority {
         minHighestPriority = highestPodPriority
         lenNodes2 = 0
      }
      if highestPodPriority == minHighestPriority {
         minNodes2[lenNodes2] = node
         lenNodes2++
      }
   }
    // 发现只有1个,那就直接返回
   if lenNodes2 == 1 {
      return minNodes2[0]
   }

   // 这时候还没有抉择出一个 node,那就开始计算优先级总和了,看哪个更低
   minSumPriorities := int64(math.MaxInt64)
   lenNodes1 = 0
   for i := 0; i < lenNodes2; i++ {
      var sumPriorities int64
      node := minNodes2[i]
      for _, pod := range nodesToVictims[node].Pods {
         // 这里的累加考虑到了先把优先级搞成正数。不然会出现1个 node 上有1优先级为 -3 的 pod,另外一个 node 上有2个优先级为 -3 的 pod,结果 -3>-6,有2个 pod 的 node 反而被认为总优先级更低!
         sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
      }
      if sumPriorities < minSumPriorities {
         minSumPriorities = sumPriorities
         lenNodes1 = 0
      }
      if sumPriorities == minSumPriorities {
         minNodes1[lenNodes1] = node
         lenNodes1++
      }
   }
   if lenNodes1 == 1 {
      return minNodes1[0]
   }

   // 还是没有分出胜负,于是开始用 pod 总数做比较
   minNumPods := math.MaxInt32
   lenNodes2 = 0
   for i := 0; i < lenNodes1; i++ {
      node := minNodes1[i]
      numPods := len(nodesToVictims[node].Pods)
      if numPods < minNumPods {
         minNumPods = numPods
         lenNodes2 = 0
      }
      if numPods == minNumPods {
         minNodes2[lenNodes2] = node
         lenNodes2++
      }
   }
   // 还是没有区分出来1个 node 的话,只能放弃区分了,直接返回第一个结果
   if lenNodes2 > 0 {
      return minNodes2[0]
   }
   klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
   return nil}
           

pickOneNodeForPreemption从可以进行优先调度的node中选出最优的一个node。

  • 如果node不用preemption pod,直接返回该node
  • 选出对pdb影响最小的node,如果最小的只有一个node,直接返回该node
  • 选择victims中最高PodPriority最低的Node,如果只有一个node,直接返回该node
  • 选择所有victims优先级之和最小的那个Node,如果只有一个node,直接返回该node
  • 选择victims pod数最少的Node,如果只有一个node,直接返回该node
  • 如果上一步有不止一个Node满足条件,随机选择一个Node返回

参考

https://www.kubernetes.org.cn/5221.html

https://juejin.im/post/5c889c2e5188257df700a732

https://my.oschina.net/u/3797264/blog/2615842

继续阅读