前言
Pod priority
Pod 有了 priority(优先级) 后才有优先级调度、抢占调度的说法,高优先级的 pod 可以在调度队列中排到前面,优先选择 node;另外当高优先级的 pod 找不到合适的 node 时,就会看 node 上低优先级的 pod 驱逐之后是否能够 run 起来,如果可以,那么 node 上的一个或多个低优先级的 pod 会被驱逐,然后高优先级的 pod 得以成功运行1个 node 上。今天我们分析 pod 抢占相关的代码。开始之前我们看一下和 priority 相关的2个示例配置文件:
PriorityClass 例子
apiVersion: scheduling.k8s.io/v1kind: PriorityClassmetadata: name: high-priorityvalue: 1000000globalDefault: falsedescription: "This priority class should be used for XYZ service pods only."
使用上述 PriorityClass
apiVersion: v1kind: Podmetadata: name: nginx labels: env: testspec: containers: - name: nginx image: nginx imagePullPolicy: IfNotPresent priorityClassName: high-priority
这两个文件的内容这里不解释,Pod priority 相关知识点不熟悉的小伙伴请先查阅官方文档,我们下面看调度器中和 preempt 相关的代码逻辑。
官网地址:https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/
preempt 入口
代码调用:
sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt
当通过正常的调度流程如果没有找到合适的节点(主要是预选没有合适的节点),会判断需不需要进行抢占调度,具体的代码在pkg/scheduler/scheduler.go文件下,用到的方法preempt。
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
// 特性没有开启就返回 ""
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
return "", nil
}
// 更新 pod 信息;入参和返回值都是 *v1.Pod 类型
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
// preempt 过程,下文分析
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
var nodeName = ""
if node != nil {
nodeName = node.Name
// 更新队列中“任命pod”队列
sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// 设置pod的Status.NominatedNodeName
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
// 如果出错就从 queue 中移除
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
// 将要驱逐的 pod 驱逐
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
}
// Clearing nominated pods should happen outside of "if node != nil".
// 这个清理过程在上面的if外部,我们回头从 Preempt() 的实现去理解
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
前面看完了正常调度,再来看看Scheduler.scheduleOne方法中,如果预选/优选失败以后的PodPriority优先级调度是如何处理的,PodPriority优先级调度对应启动的方法为sched.preempt(pod, fitError),Scheduler.preempt方法中是优先级调度的逻辑。
- 检查PodPriority是否开启,如果未开启,直接返回
- 由于该Pod在Predicate/Priortiy调度过程失败后,会更新PodCondition,记录调度失败状态及失败原因。因此需要从apiserver中获取PodCondition更新后的Pod Object;
- 执行scheduler的Preempt的方法,选出要执行优先调度的node以及node上要删除的pod
- 将要调度的pod绑定到上一步选出的node
- delete前两步用Preempt方法选出来的要删除的pod
- 抹去上一步删除的pod与node绑定的信息
preempt 实现
上面 preempt() 函数中涉及到了一些值得深入看看的对象,下面我们逐个看一下这些对象的实现。
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
//检查error是不是预选失败的error(因为优选只是选择更优的,所以只会是预选失败)
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil, nil
}
//检查cachedNodeInfoMap是否有比将要执行优先调度pod的Priority更小的pod
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
// 1.获取预选调度失败的节点,但是可能是潜在的抢占可能成功的节点(所有的抢占节点都是在潜在节点内部选择)
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
// 2.获取PDB(Pod中断预算)列表
// ljs:部署在Kubernetes的每个App都可以创建一个对应PDB Object,// 用来限制Voluntary Disruptions时最大可以down的副本数或者最少应该保持Available的副本数,以此来保证应用的高可用。
pdbs, err := g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
// 3.获取所有可以进行Preempt的Node节点的信息,主要包含该节点哪些Pod需要被抢占掉
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
}
// We will only check nodeToVictims with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
// 4.扩展的Preempt调度判断
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
if err != nil {
return nil, nil, nil, err
}
// 5.选中某一个Node
candidateNode := pickOneNodeForPreemption(nodeToVictims)
if candidateNode == nil {
return nil, nil, nil, err
}
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
// 6.判断哪些Pod优先级较低,后续需要被清除掉,不作为NominatedPods存在
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
}
return nil, nil, nil, fmt.Errorf(
"preemption failed: the target node %s has been deleted from scheduler cache",
candidateNode.Name)
}
Preempt方法主要执行以下几个步骤:
1、从预选失败的节点中获取可以用来做抢占调度的节点,通过一个switch语句排除不可以用来做抢占调度的节点
2、获取PDB(Pod中断预算)列表,用来做后续的判断标准;部署在Kubernetes的每个App都可以创建一个对应PDB Object,用来限制Voluntary Disruptions时最大可以down的副本数或者最少应该保持Available的副本数,以此来保证应用的高可用。
3、通过调用selectNodesForPreemption方法,判断哪些Node可以进行抢占调度。通过ParallelizeUntil方法同步对所有的Node进行判断,判断路径为checkNode-->selectVictimsOnNode-->podFitsOnNode,最终同预选方法类似,使用了podFitsOnNode方法。不同于普通预选,抢占调度会先对Pod优先级判断,然后在移除掉优先级较低的Pod之后再调用podFitsOnNode方法,以此达到抢占的效果。selectNodesForPreemption方法返回的参数是一个map类型的值,key为Node信息,value为该Node如果作为调度节点,将要清除的一些信息,包括Pods和PDB信息
4、获取到抢占调度可以实现的Nodes资源后,继续通过扩展的算法进行过滤;
5、选中最终的抢占调度的Node,调用pickOneNodeForPreemption方法,主要基于5个原则:
a)PDB violations(违规)值最小的Node;
b)挑选具有最低优先级受害者的节点,即被清除的Node上的Pods,它的优先级是最低的;
c)通过所有受害者Pods(将被删除的低优先级Pods)的优先级总和做区分;
d)如果多个Node优先级总和仍然相等,则选择具有最小受害者数量的Node;
e)如果多个Node优先级总和仍然相等,则选择第一个这样的Node(随机排序);
6、选中最终的Node之后,记录该Node上优先级较低的NominatedPods,这些Pod还未调度,需要将其调度关系进行删除,重新应用。
函数调用关系
genericScheduler.Preempt
|-->nodesWherePreemptionMightHelp
|-->selectNodesForPreemption
|-->selectVictimsOnNode
|-->filterPodsWithPDBViolation
|-->pickOneNodeForPreemption
|-->getLowerPriorityNominatedPods
nodesWherePreemptionMightHelp
nodesWherePreemptionMightHelp 要做的事情是寻找 predicates 阶段失败但是通过抢占也许能够调度成功的 nodes.
func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
// 潜力 node, 用于存储返回值的 slice
potentialNodes := []*v1.Node{}
for _, node := range nodes {
// 这个为 true 表示一个 node 驱逐 pod 也不一定能适合当前 pod 运行
unresolvableReasonExist := false
// 一个 node 对应的所有失败的 predicates
failedPredicates, _ := failedPredicatesMap[node.Name]
// 遍历,看是不是再下面指定的这些原因中,如果在,就标记 unresolvableReasonExist = true
for _, failedPredicate := range failedPredicates {
switch failedPredicate {
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodAffinityRulesNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrTaintsTolerationsNotMatch,
predicates.ErrNodeLabelPresenceViolated,
predicates.ErrNodeNotReady,
predicates.ErrNodeNetworkUnavailable,
predicates.ErrNodeUnderDiskPressure,
predicates.ErrNodeUnderPIDPressure,
predicates.ErrNodeUnderMemoryPressure,
predicates.ErrNodeOutOfDisk,
predicates.ErrNodeUnschedulable,
predicates.ErrNodeUnknownCondition,
predicates.ErrVolumeZoneConflict,
predicates.ErrVolumeNodeConflict,
predicates.ErrVolumeBindConflict:
unresolvableReasonExist = true
// 如果找到一个上述失败原因,说明这个 node 已经可以排除了,break 后继续下一个 node 的计算
break
}
}
// false 的时候,也就是这个 node 也许驱逐 pods 后有用,那就添加到 potentialNodes 中
if !unresolvableReasonExist {
klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
}
}
return potentialNodes
}
selectNodesForPreemption
这个函数会并发计算所有的 nodes 是否通过驱逐实现 pod 抢占。
func selectNodesForPreemption(pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
potentialNodes []*v1.Node, // 上一个函数计算出来的 nodes
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
queue internalqueue.SchedulingQueue, // 这里其实是前面讲的优先级队列 PriorityQueue
pdbs []*policy.PodDisruptionBudget, // pdb 列表) (map[*v1.Node]*schedulerapi.Victims, error) {
nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
// 这种形式的并发已经不陌生了,前面遇到过几次了
checkNode := func(i int) {
nodeName := potentialNodes[i].Name
var metaCopy algorithm.PredicateMetadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
// 这里有一个子过程调用,下面单独介绍
pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := schedulerapi.Victims{
Pods: pods,
NumPDBViolations: numPDBViolations,
}
// 如果 fit,就添加到 nodeToVictims 中,也就是最后的返回值
nodeToVictims[potentialNodes[i]] = &victims
resultLock.Unlock()
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
return nodeToVictims, nil}
selectVictimsOnNode
这个函数尝试在给定的 node 中寻找最少数量的需要被驱逐的 pods,同时需要保证驱逐了这些 pods 之后,这个 noode 能够满足“pod”运行需求。
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate,
queue internalqueue.SchedulingQueue,
pdbs []*policy.PodDisruptionBudget,) ([]*v1.Pod, int, bool) {
if nodeInfo == nil {
return nil, 0, false
}
// 排个序
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
// 定义删除 pod 函数
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
}
}
// 定义添加 pod 函数
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
if meta != nil {
meta.AddPod(ap, nodeInfoCopy)
}
}
// 删除所有的低优先级 pod 看是不是能够满足调度需求了
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if util.GetPodPriority(p) < podPriority {
// 删除的意思其实就是添加元素到 potentialVictims.Items
potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
}
}
// 排个序
potentialVictims.Sort()
// 如果删除了所有的低优先级 pods 之后还不能跑这个新 pod,那么差不多就可以判断这个 node 不适合 preemption 了,还有一点点需要考虑的是这个“pod”的不 fit 的原因是由于 pod affinity 不满足了。
// 后续可能会增加当前 pod 和低优先级 pod 之间的 优先级检查。
// 这个函数调用其实就是之前讲到过的预选函数的调用逻辑,判断这个 pod 是否合适跑在这个 node 上。
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
return nil, 0, false
}
var victims []*v1.Pod
numViolatingVictim := 0
// 对前两步删除pod,检查是否会导致pod的数量小于pdb的min-avilable,分为会和不会两类
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
// 释放 pods 的函数,来一个放一个
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
if !fits {
removePod(p)
victims = append(victims, p)
klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
}
return fits
}
// 尝试尽量多地释放这些 pods,也就是说能少杀就少杀;这里先从 PDB violating victims 中释放,再从 PDB non-violating victims 中释放;两个组都是从高优先级的 pod 开始释放。
// 释放 violatingVictims 中元素的同时会记录放了多少个
for _, p := range violatingVictims {
if !reprievePod(p) {
numViolatingVictim++
}
}
// 开始释放 non-violating victims.
for _, p := range nonViolatingVictims {
reprievePod(p)
}
return victims, numViolatingVictim, true}
selectVictimsOnNode选出node给优先调度pod腾出资源要需要删除的pod(最少数量的),以及对删除pod而导致pod少于pdb定义进行计数。流程如下:
- 获取优先调度pod的PodPriority
- 获取所有比优先调度pod的PodPriority更小的pod,并执行removepod函数删除这些pod
- 在上一步删除Pod之后,再在node上对优先调度pod进行预选
- 对前两步删除pod,检查是否会导致pod的数量小于pdb的min-avilable,分为会和不会两类
- 先对因为pdb限制不能删除的pod执行reprievePod函数(reprievePod函数:先add pod再执行预选,如果预选失败,则再remove pod)
- 再对没有pdb限制或者pdb允许删除的pod执行reprievePod函数
pickOneNodeForPreemption
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
if len(nodesToVictims) == 0 {
return nil
}
// 初始化为最大值
minNumPDBViolatingPods := math.MaxInt32
var minNodes1 []*v1.Node
lenNodes1 := 0
// 这个循环要找到 PDBViolatingPods 最少的 node,如果有多个,就全部存在 minNodes1 中
for node, victims := range nodesToVictims {
if len(victims.Pods) == 0 {
// 如果发现一个不需要驱逐 pod 的 node,马上返回
return node
}
numPDBViolatingPods := victims.NumPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
minNumPDBViolatingPods = numPDBViolatingPods
minNodes1 = nil
lenNodes1 = 0
}
if numPDBViolatingPods == minNumPDBViolatingPods {
minNodes1 = append(minNodes1, node)
lenNodes1++
}
}
// 如果只找到1个 PDB violations 最少的 node,那就直接返回这个 node 就 ok 了
if lenNodes1 == 1 {
return minNodes1[0]
}
// 还剩下多个 node,那就寻找 highest priority victim 最小的 node
minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
lenNodes2 := 0
// 这个循环要做的事情是看2个 node 上 victims 中最高优先级的 pod 哪个优先级更高
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
victims := nodesToVictims[node]
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := util.GetPodPriority(victims.Pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
lenNodes2 = 0
}
if highestPodPriority == minHighestPriority {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// 发现只有1个,那就直接返回
if lenNodes2 == 1 {
return minNodes2[0]
}
// 这时候还没有抉择出一个 node,那就开始计算优先级总和了,看哪个更低
minSumPriorities := int64(math.MaxInt64)
lenNodes1 = 0
for i := 0; i < lenNodes2; i++ {
var sumPriorities int64
node := minNodes2[i]
for _, pod := range nodesToVictims[node].Pods {
// 这里的累加考虑到了先把优先级搞成正数。不然会出现1个 node 上有1优先级为 -3 的 pod,另外一个 node 上有2个优先级为 -3 的 pod,结果 -3>-6,有2个 pod 的 node 反而被认为总优先级更低!
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
lenNodes1 = 0
}
if sumPriorities == minSumPriorities {
minNodes1[lenNodes1] = node
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// 还是没有分出胜负,于是开始用 pod 总数做比较
minNumPods := math.MaxInt32
lenNodes2 = 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
numPods := len(nodesToVictims[node].Pods)
if numPods < minNumPods {
minNumPods = numPods
lenNodes2 = 0
}
if numPods == minNumPods {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// 还是没有区分出来1个 node 的话,只能放弃区分了,直接返回第一个结果
if lenNodes2 > 0 {
return minNodes2[0]
}
klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
return nil}
pickOneNodeForPreemption从可以进行优先调度的node中选出最优的一个node。
- 如果node不用preemption pod,直接返回该node
- 选出对pdb影响最小的node,如果最小的只有一个node,直接返回该node
- 选择victims中最高PodPriority最低的Node,如果只有一个node,直接返回该node
- 选择所有victims优先级之和最小的那个Node,如果只有一个node,直接返回该node
- 选择victims pod数最少的Node,如果只有一个node,直接返回该node
- 如果上一步有不止一个Node满足条件,随机选择一个Node返回
参考
https://www.kubernetes.org.cn/5221.html
https://juejin.im/post/5c889c2e5188257df700a732
https://my.oschina.net/u/3797264/blog/2615842