前言
Pod priority
Pod 有了 priority(優先級) 後才有優先級排程、搶占排程的說法,高優先級的 pod 可以在排程隊列中排到前面,優先選擇 node;另外當高優先級的 pod 找不到合适的 node 時,就會看 node 上低優先級的 pod 驅逐之後是否能夠 run 起來,如果可以,那麼 node 上的一個或多個低優先級的 pod 會被驅逐,然後高優先級的 pod 得以成功運作1個 node 上。今天我們分析 pod 搶占相關的代碼。開始之前我們看一下和 priority 相關的2個示例配置檔案:
PriorityClass 例子
apiVersion: scheduling.k8s.io/v1kind: PriorityClassmetadata: name: high-priorityvalue: 1000000globalDefault: falsedescription: "This priority class should be used for XYZ service pods only."
使用上述 PriorityClass
apiVersion: v1kind: Podmetadata: name: nginx labels: env: testspec: containers: - name: nginx image: nginx imagePullPolicy: IfNotPresent priorityClassName: high-priority
這兩個檔案的内容這裡不解釋,Pod priority 相關知識點不熟悉的小夥伴請先查閱官方文檔,我們下面看排程器中和 preempt 相關的代碼邏輯。
官網位址:https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/
preempt 入口
代碼調用:
sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt
當通過正常的排程流程如果沒有找到合适的節點(主要是預選沒有合适的節點),會判斷需不需要進行搶占排程,具體的代碼在pkg/scheduler/scheduler.go檔案下,用到的方法preempt。
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
// 特性沒有開啟就傳回 ""
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
return "", nil
}
// 更新 pod 資訊;入參和傳回值都是 *v1.Pod 類型
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
// preempt 過程,下文分析
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
var nodeName = ""
if node != nil {
nodeName = node.Name
// 更新隊列中“任命pod”隊列
sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// 設定pod的Status.NominatedNodeName
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
// 如果出錯就從 queue 中移除
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
// 将要驅逐的 pod 驅逐
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
}
// Clearing nominated pods should happen outside of "if node != nil".
// 這個清理過程在上面的if外部,我們回頭從 Preempt() 的實作去了解
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
前面看完了正常排程,再來看看Scheduler.scheduleOne方法中,如果預選/優選失敗以後的PodPriority優先級排程是如何處理的,PodPriority優先級排程對應啟動的方法為sched.preempt(pod, fitError),Scheduler.preempt方法中是優先級排程的邏輯。
- 檢查PodPriority是否開啟,如果未開啟,直接傳回
- 由于該Pod在Predicate/Priortiy排程過程失敗後,會更新PodCondition,記錄排程失敗狀态及失敗原因。是以需要從apiserver中擷取PodCondition更新後的Pod Object;
- 執行scheduler的Preempt的方法,選出要執行優先排程的node以及node上要删除的pod
- 将要排程的pod綁定到上一步選出的node
- delete前兩步用Preempt方法選出來的要删除的pod
- 抹去上一步删除的pod與node綁定的資訊
preempt 實作
上面 preempt() 函數中涉及到了一些值得深入看看的對象,下面我們逐個看一下這些對象的實作。
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
//檢查error是不是預選失敗的error(因為優選隻是選擇更優的,是以隻會是預選失敗)
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil, nil
}
//檢查cachedNodeInfoMap是否有比将要執行優先排程pod的Priority更小的pod
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
// 1.擷取預選排程失敗的節點,但是可能是潛在的搶占可能成功的節點(所有的搶占節點都是在潛在節點内部選擇)
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
// 2.擷取PDB(Pod中斷預算)清單
// ljs:部署在Kubernetes的每個App都可以建立一個對應PDB Object,// 用來限制Voluntary Disruptions時最大可以down的副本數或者最少應該保持Available的副本數,以此來保證應用的高可用。
pdbs, err := g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
// 3.擷取所有可以進行Preempt的Node節點的資訊,主要包含該節點哪些Pod需要被搶占掉
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
}
// We will only check nodeToVictims with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
// 4.擴充的Preempt排程判斷
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
if err != nil {
return nil, nil, nil, err
}
// 5.選中某一個Node
candidateNode := pickOneNodeForPreemption(nodeToVictims)
if candidateNode == nil {
return nil, nil, nil, err
}
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
// 6.判斷哪些Pod優先級較低,後續需要被清除掉,不作為NominatedPods存在
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
}
return nil, nil, nil, fmt.Errorf(
"preemption failed: the target node %s has been deleted from scheduler cache",
candidateNode.Name)
}
Preempt方法主要執行以下幾個步驟:
1、從預選失敗的節點中擷取可以用來做搶占排程的節點,通過一個switch語句排除不可以用來做搶占排程的節點
2、擷取PDB(Pod中斷預算)清單,用來做後續的判斷标準;部署在Kubernetes的每個App都可以建立一個對應PDB Object,用來限制Voluntary Disruptions時最大可以down的副本數或者最少應該保持Available的副本數,以此來保證應用的高可用。
3、通過調用selectNodesForPreemption方法,判斷哪些Node可以進行搶占排程。通過ParallelizeUntil方法同步對所有的Node進行判斷,判斷路徑為checkNode-->selectVictimsOnNode-->podFitsOnNode,最終同預選方法類似,使用了podFitsOnNode方法。不同于普通預選,搶占排程會先對Pod優先級判斷,然後在移除掉優先級較低的Pod之後再調用podFitsOnNode方法,以此達到搶占的效果。selectNodesForPreemption方法傳回的參數是一個map類型的值,key為Node資訊,value為該Node如果作為排程節點,将要清除的一些資訊,包括Pods和PDB資訊
4、擷取到搶占排程可以實作的Nodes資源後,繼續通過擴充的算法進行過濾;
5、選中最終的搶占排程的Node,調用pickOneNodeForPreemption方法,主要基于5個原則:
a)PDB violations(違規)值最小的Node;
b)挑選具有最低優先級受害者的節點,即被清除的Node上的Pods,它的優先級是最低的;
c)通過所有受害者Pods(将被删除的低優先級Pods)的優先級總和做區分;
d)如果多個Node優先級總和仍然相等,則選擇具有最小受害者數量的Node;
e)如果多個Node優先級總和仍然相等,則選擇第一個這樣的Node(随機排序);
6、選中最終的Node之後,記錄該Node上優先級較低的NominatedPods,這些Pod還未排程,需要将其排程關系進行删除,重新應用。
函數調用關系
genericScheduler.Preempt
|-->nodesWherePreemptionMightHelp
|-->selectNodesForPreemption
|-->selectVictimsOnNode
|-->filterPodsWithPDBViolation
|-->pickOneNodeForPreemption
|-->getLowerPriorityNominatedPods
nodesWherePreemptionMightHelp
nodesWherePreemptionMightHelp 要做的事情是尋找 predicates 階段失敗但是通過搶占也許能夠排程成功的 nodes.
func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
// 潛力 node, 用于存儲傳回值的 slice
potentialNodes := []*v1.Node{}
for _, node := range nodes {
// 這個為 true 表示一個 node 驅逐 pod 也不一定能适合目前 pod 運作
unresolvableReasonExist := false
// 一個 node 對應的所有失敗的 predicates
failedPredicates, _ := failedPredicatesMap[node.Name]
// 周遊,看是不是再下面指定的這些原因中,如果在,就标記 unresolvableReasonExist = true
for _, failedPredicate := range failedPredicates {
switch failedPredicate {
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodAffinityRulesNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrTaintsTolerationsNotMatch,
predicates.ErrNodeLabelPresenceViolated,
predicates.ErrNodeNotReady,
predicates.ErrNodeNetworkUnavailable,
predicates.ErrNodeUnderDiskPressure,
predicates.ErrNodeUnderPIDPressure,
predicates.ErrNodeUnderMemoryPressure,
predicates.ErrNodeOutOfDisk,
predicates.ErrNodeUnschedulable,
predicates.ErrNodeUnknownCondition,
predicates.ErrVolumeZoneConflict,
predicates.ErrVolumeNodeConflict,
predicates.ErrVolumeBindConflict:
unresolvableReasonExist = true
// 如果找到一個上述失敗原因,說明這個 node 已經可以排除了,break 後繼續下一個 node 的計算
break
}
}
// false 的時候,也就是這個 node 也許驅逐 pods 後有用,那就添加到 potentialNodes 中
if !unresolvableReasonExist {
klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
}
}
return potentialNodes
}
selectNodesForPreemption
這個函數會并發計算所有的 nodes 是否通過驅逐實作 pod 搶占。
func selectNodesForPreemption(pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
potentialNodes []*v1.Node, // 上一個函數計算出來的 nodes
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
queue internalqueue.SchedulingQueue, // 這裡其實是前面講的優先級隊列 PriorityQueue
pdbs []*policy.PodDisruptionBudget, // pdb 清單) (map[*v1.Node]*schedulerapi.Victims, error) {
nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
// 這種形式的并發已經不陌生了,前面遇到過幾次了
checkNode := func(i int) {
nodeName := potentialNodes[i].Name
var metaCopy algorithm.PredicateMetadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
// 這裡有一個子過程調用,下面單獨介紹
pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := schedulerapi.Victims{
Pods: pods,
NumPDBViolations: numPDBViolations,
}
// 如果 fit,就添加到 nodeToVictims 中,也就是最後的傳回值
nodeToVictims[potentialNodes[i]] = &victims
resultLock.Unlock()
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
return nodeToVictims, nil}
selectVictimsOnNode
這個函數嘗試在給定的 node 中尋找最少數量的需要被驅逐的 pods,同時需要保證驅逐了這些 pods 之後,這個 noode 能夠滿足“pod”運作需求。
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate,
queue internalqueue.SchedulingQueue,
pdbs []*policy.PodDisruptionBudget,) ([]*v1.Pod, int, bool) {
if nodeInfo == nil {
return nil, 0, false
}
// 排個序
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
// 定義删除 pod 函數
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
}
}
// 定義添加 pod 函數
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
if meta != nil {
meta.AddPod(ap, nodeInfoCopy)
}
}
// 删除所有的低優先級 pod 看是不是能夠滿足排程需求了
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if util.GetPodPriority(p) < podPriority {
// 删除的意思其實就是添加元素到 potentialVictims.Items
potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
}
}
// 排個序
potentialVictims.Sort()
// 如果删除了所有的低優先級 pods 之後還不能跑這個新 pod,那麼差不多就可以判斷這個 node 不适合 preemption 了,還有一點點需要考慮的是這個“pod”的不 fit 的原因是由于 pod affinity 不滿足了。
// 後續可能會增加目前 pod 和低優先級 pod 之間的 優先級檢查。
// 這個函數調用其實就是之前講到過的預選函數的調用邏輯,判斷這個 pod 是否合适跑在這個 node 上。
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
return nil, 0, false
}
var victims []*v1.Pod
numViolatingVictim := 0
// 對前兩步删除pod,檢查是否會導緻pod的數量小于pdb的min-avilable,分為會和不會兩類
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
// 釋放 pods 的函數,來一個放一個
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
if !fits {
removePod(p)
victims = append(victims, p)
klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
}
return fits
}
// 嘗試盡量多地釋放這些 pods,也就是說能少殺就少殺;這裡先從 PDB violating victims 中釋放,再從 PDB non-violating victims 中釋放;兩個組都是從高優先級的 pod 開始釋放。
// 釋放 violatingVictims 中元素的同時會記錄放了多少個
for _, p := range violatingVictims {
if !reprievePod(p) {
numViolatingVictim++
}
}
// 開始釋放 non-violating victims.
for _, p := range nonViolatingVictims {
reprievePod(p)
}
return victims, numViolatingVictim, true}
selectVictimsOnNode選出node給優先排程pod騰出資源要需要删除的pod(最少數量的),以及對删除pod而導緻pod少于pdb定義進行計數。流程如下:
- 擷取優先排程pod的PodPriority
- 擷取所有比優先排程pod的PodPriority更小的pod,并執行removepod函數删除這些pod
- 在上一步删除Pod之後,再在node上對優先排程pod進行預選
- 對前兩步删除pod,檢查是否會導緻pod的數量小于pdb的min-avilable,分為會和不會兩類
- 先對因為pdb限制不能删除的pod執行reprievePod函數(reprievePod函數:先add pod再執行預選,如果預選失敗,則再remove pod)
- 再對沒有pdb限制或者pdb允許删除的pod執行reprievePod函數
pickOneNodeForPreemption
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
if len(nodesToVictims) == 0 {
return nil
}
// 初始化為最大值
minNumPDBViolatingPods := math.MaxInt32
var minNodes1 []*v1.Node
lenNodes1 := 0
// 這個循環要找到 PDBViolatingPods 最少的 node,如果有多個,就全部存在 minNodes1 中
for node, victims := range nodesToVictims {
if len(victims.Pods) == 0 {
// 如果發現一個不需要驅逐 pod 的 node,馬上傳回
return node
}
numPDBViolatingPods := victims.NumPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
minNumPDBViolatingPods = numPDBViolatingPods
minNodes1 = nil
lenNodes1 = 0
}
if numPDBViolatingPods == minNumPDBViolatingPods {
minNodes1 = append(minNodes1, node)
lenNodes1++
}
}
// 如果隻找到1個 PDB violations 最少的 node,那就直接傳回這個 node 就 ok 了
if lenNodes1 == 1 {
return minNodes1[0]
}
// 還剩下多個 node,那就尋找 highest priority victim 最小的 node
minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
lenNodes2 := 0
// 這個循環要做的事情是看2個 node 上 victims 中最高優先級的 pod 哪個優先級更高
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
victims := nodesToVictims[node]
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := util.GetPodPriority(victims.Pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
lenNodes2 = 0
}
if highestPodPriority == minHighestPriority {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// 發現隻有1個,那就直接傳回
if lenNodes2 == 1 {
return minNodes2[0]
}
// 這時候還沒有抉擇出一個 node,那就開始計算優先級總和了,看哪個更低
minSumPriorities := int64(math.MaxInt64)
lenNodes1 = 0
for i := 0; i < lenNodes2; i++ {
var sumPriorities int64
node := minNodes2[i]
for _, pod := range nodesToVictims[node].Pods {
// 這裡的累加考慮到了先把優先級搞成正數。不然會出現1個 node 上有1優先級為 -3 的 pod,另外一個 node 上有2個優先級為 -3 的 pod,結果 -3>-6,有2個 pod 的 node 反而被認為總優先級更低!
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
lenNodes1 = 0
}
if sumPriorities == minSumPriorities {
minNodes1[lenNodes1] = node
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// 還是沒有分出勝負,于是開始用 pod 總數做比較
minNumPods := math.MaxInt32
lenNodes2 = 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
numPods := len(nodesToVictims[node].Pods)
if numPods < minNumPods {
minNumPods = numPods
lenNodes2 = 0
}
if numPods == minNumPods {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// 還是沒有區分出來1個 node 的話,隻能放棄區分了,直接傳回第一個結果
if lenNodes2 > 0 {
return minNodes2[0]
}
klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
return nil}
pickOneNodeForPreemption從可以進行優先排程的node中選出最優的一個node。
- 如果node不用preemption pod,直接傳回該node
- 選出對pdb影響最小的node,如果最小的隻有一個node,直接傳回該node
- 選擇victims中最高PodPriority最低的Node,如果隻有一個node,直接傳回該node
- 選擇所有victims優先級之和最小的那個Node,如果隻有一個node,直接傳回該node
- 選擇victims pod數最少的Node,如果隻有一個node,直接傳回該node
- 如果上一步有不止一個Node滿足條件,随機選擇一個Node傳回
參考
https://www.kubernetes.org.cn/5221.html
https://juejin.im/post/5c889c2e5188257df700a732
https://my.oschina.net/u/3797264/blog/2615842