kubelet節點壓力驅逐-源碼分析篇。kubelet監控叢集節點的 CPU、記憶體、磁盤空間和檔案系統的inode 等資源,根據kubelet啟動參數中的驅逐政策配置,當這些資源中的一個或者多個達到特定的消耗水準,kubelet 可以主動地驅逐節點上一個或者多個pod,以回收資源,降低節點資源壓力。
kubelet節點壓力驅逐-概述
kubelet監控叢集節點的 CPU、記憶體、磁盤空間和檔案系統的inode 等資源,根據kubelet啟動參數中的驅逐政策配置,當這些資源中的一個或者多個達到特定的消耗水準,kubelet 可以主動地驅逐節點上一個或者多個pod,以回收資源,降低節點資源壓力。
驅逐信号
節點上的memory、nodefs、pid等資源都有驅逐信号,kubelet通過将驅逐信号與驅逐政策進行比較來做出驅逐決定;
驅逐政策
kubelet節點壓力驅逐包括了兩種,軟驅逐和硬驅逐;
軟驅逐
軟驅逐機制表示,當node節點的memory、nodefs等資源達到一定的門檻值後,需要持續觀察一段時間(寬限期),如果期間該資源又恢複到低于門檻值,則不進行pod的驅逐,若高于門檻值持續了一段時間(寬限期),則觸發pod的驅逐。
硬驅逐
硬驅逐政策沒有寬限期,當達到硬驅逐條件時,kubelet會立即觸發pod的驅逐,而不是優雅終止。
關于kubelet節點壓力驅逐的詳細介紹,可以檢視上一篇文章-kubelet節點壓力驅逐;
kubelet節點壓力驅逐-源碼分析
負責kubelet節點壓力驅逐的是kubelet中的
evictionManager
;
基于kubernets v1.17.4
從kubelet的Run方法為入口,通過一系列的調用,調用了
evictionManager.Start
方法;
調用鍊:
kubelet.Run() --> kubelet.updateRuntimeUp() --> kubelet.initializeRuntimeDependentModules() --> kubelet.evictionManager.Start()
// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
...
}
// pkg/kubelet/kubelet.go
func (kl *Kubelet) updateRuntimeUp() {
...
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
...
}
evictionManager在沒有發生pod驅逐時,驅逐監測間隔時間為10s;
// pkg/kubelet/kubelet.go
const (
...
evictionMonitoringPeriod = time.Second * 10
...
)
func (kl *Kubelet) initializeRuntimeDependentModules() {
...
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
...
}
1.evictionManager結構體分析
managerImpl struct
分析
evictionManager.Start
方法前,先來分析下
eviction_manager
的結構體
managerImpl struct
,看其有哪些比較重要的屬性:
(1)config:存儲着
eviction_manager
的相關配置,根據kubelet啟動參數配置值來初始化config;
(2)thresholdsMet:記錄已經達到驅逐門檻值,但還沒有滿足驅逐政策條件,觸發驅逐的
Threshold
切片(
Threshold
後面做介紹);
(3)thresholdsFirstObservedAt:記錄各個
Threshold
的第一次發現時間點;
(4)lastObservations:記錄上一次調諧進行中,軟驅逐、硬驅逐中各個驅逐信号的資源總量、資源可用量、探測時間;
(5)signalToRankFunc:存儲軟驅逐、硬驅逐中各個驅逐信号所對應的排序函數,排序函數用于計算被驅逐pod的順序;
(6)killPodFunc:定義了驅逐pod的具體函數,在
eviction_manager
初始化的時候,該值被指派為
pkg/kubelet/pod_workers.go-killPodNow()
函數;
// pkg/kubelet/eviction/eviction_manager.go
type managerImpl struct {
...
config Config
thresholdsMet []evictionapi.Threshold
thresholdsFirstObservedAt thresholdsObservedAt
lastObservations signalObservations
signalToRankFunc map[evictionapi.Signal]rankFunc
killPodFunc KillPodFunc
...
}
1.1 Config struct
Config存儲着
eviction_manager
的相關配置,根據kubelet啟動參數配置值來初始化Config;
其中Config.Thresholds屬性存儲着配置的驅逐政策資訊;
// pkg/kubelet/eviction/types.go
type Config struct {
...
Thresholds []evictionapi.Threshold
MaxPodGracePeriodSeconds int64
PressureTransitionPeriod time.Duration
...
}
1.2 evictionapi.Threshold
看到Threshold結構體,重要的幾個屬性如下:
(1)
Signal
:驅逐信号;
(2)
Operator
:驅逐信号對應資源的實際統計值與驅逐門檻值之間的比較運算符;
(3)
Value
:驅逐門檻值;
(4)
GracePeriod
:驅逐信号對應資源的實際統計值達到驅逐門檻值之後需要持續GracePeriod時間後,才會觸發驅逐;
(5)
MinReclaim
:觸發驅逐後的資源最小回收值;
// pkg/kubelet/eviction/api/types.go
type Threshold struct {
// Signal defines the entity that was measured.
Signal Signal
// Operator represents a relationship of a signal to a value.
Operator ThresholdOperator
// Value is the threshold the resource is evaluated against.
Value ThresholdValue
// GracePeriod represents the amount of time that a threshold must be met before eviction is triggered.
GracePeriod time.Duration
// MinReclaim represents the minimum amount of resource to reclaim if the threshold is met.
MinReclaim *ThresholdValue
}
1.3 thresholdsObservedAt
thresholdsObservedAt是個map類型,記錄各個
Threshold
及其第一次發現時間點;
type thresholdsObservedAt map[evictionapi.Threshold]time.Time
1.4 signalObservations
signalObservations是個map類型,記錄上一次調諧進行中,軟驅逐、硬驅逐中各個驅逐信号的資源總量、資源可用量、探測時間;
type signalObservations map[evictionapi.Signal]signalObservation
type signalObservation struct {
// The resource capacity
capacity *resource.Quantity
// The available resource
available *resource.Quantity
// Time at which the observation was taken
time metav1.Time
}
1.5 rankFunc
rankFunc是排序函數,用于計算被驅逐pod的順序;
其函數入參為pod清單以及一個
statsFunc
,
statsFunc
是個函數,傳回pod相關資源統計的一個函數;
// pkg/kubelet/eviction/types.go
type statsFunc func(pod *v1.Pod) (statsapi.PodStats, bool)
type rankFunc func(pods []*v1.Pod, stats statsFunc)
// pkg/kubelet/apis/stats/v1alpha1/types.go
type PodStats struct {
// Reference to the measured Pod.
PodRef PodReference `json:"podRef"`
// The time at which data collection for the pod-scoped (e.g. network) stats was (re)started.
StartTime metav1.Time `json:"startTime"`
// Stats of containers in the measured pod.
Containers []ContainerStats `json:"containers" patchStrategy:"merge" patchMergeKey:"name"`
// Stats pertaining to CPU resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
Memory *MemoryStats `json:"memory,omitempty"`
// Stats pertaining to network resources.
Network *NetworkStats `json:"network,omitempty"`
// Stats pertaining to volume usage of filesystem resources.
// VolumeStats.UsedBytes is the number of bytes used by the Volume
VolumeStats []VolumeStats `json:"volume,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
// EphemeralStorage reports the total filesystem usage for the containers and emptyDir-backed volumes in the measured Pod.
EphemeralStorage *FsStats `json:"ephemeral-storage,omitempty"`
}
2.evictionManager處理邏輯分析
evictionManager.Start
evictionManager.Start方法中包含了兩部分的啟動:
(1)實時驅逐:如果配置了
KernelMemcgNotification
(即kubelet啟動參數
--experimental-kernel-memcg-notification
配置為true,預設為false),則會針對memory記憶體資源,利用kernel memcg notification,根據核心實時通知,調用
m.synchronize
方法執行驅逐邏輯(暫不展開分析);
(2)輪詢驅逐:拉起一個goroutine,循環調用
m.synchronize
方法執行驅逐邏輯,如果被驅逐的pod不為空,則調用
m.waitForPodsCleanup
方法等待被驅逐的pod删除成功,如果沒有pod被驅逐,則sleep 10秒後再循環;
// pkg/kubelet/eviction/eviction_manager.go
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
thresholdHandler := func(message string) {
klog.Infof(message)
m.synchronize(diskInfoProvider, podFunc)
}
// 啟動實時驅逐
if m.config.KernelMemcgNotification {
for _, threshold := range m.config.Thresholds {
if threshold.Signal == evictionapi.SignalMemoryAvailable || threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable {
notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, thresholdHandler)
if err != nil {
klog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err)
} else {
go notifier.Start()
m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
}
}
}
}
// 啟動輪詢驅逐
// start the eviction manager monitoring
go func() {
for {
if evictedPods := m.synchronize(diskInfoProvider, podFunc); evictedPods != nil {
klog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods))
m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
} else {
time.Sleep(monitoringInterval)
}
}
}()
}
2.1 m.synchronize
m.synchronize方法為kubelet節點壓力驅逐的核心處理方法,方法中會根據kubelet配置的驅逐政策,計算并判斷是否符合驅逐條件,符合則根據一定的優先級來驅逐pod,然後傳回被驅逐的pod(每次調用
m.synchronize
方法最多隻會驅逐一個pod);
且這裡說的驅逐pod,隻是将
pod.status.phase
值更新為
Failed
,并附上驅逐reason:
Evicted
以及觸發驅逐的詳細資訊,不會删除pod;而
pod.status.phase
值被更新為
Failed
後,replicaset controller會再次建立出新的pod調用到其他節點上,達到驅逐pod的效果;
方法主要邏輯為:
(1)從kubelet啟動參數中擷取驅逐政策配置,傳回thresholds;
(2)判斷imageFs和rootfs是否為同一個,然後調用
buildSignalToRankFunc
函數來建構pod的排序函數(buildSignalToRankFunc函數傳回軟驅逐、硬驅逐中各個驅逐信号所對應的排序函數,排序函數用于計算被驅逐pod的順序),調用
buildSignalToNodeReclaimFuncs
函數建構節點資源回收函數(用于後續在執行驅逐pod之前,先調用節點資源回收函數來回收資源,如果回收的資源足夠,則不用走驅逐邏輯);
(3)調用
podFunc
,即調用
kl.GetActivePods
方法,擷取會被驅逐的pod清單-activePods;
(4)調用
m.summaryProvider.Get
,擷取各種統計資訊,如節點上各個資源的總量以及使用量情況、容器的資源聲明及使用量情況等;
(5)ThresholdNotifier相關的通知實作,ThresholdNotifier-基于觀察者模式實作對特殊資源驅逐管理的支援;
(6)調用
makeSignalObservations
函數,根據前面擷取到的節點資源總量及使用量等各種統計資訊,組裝傳回observations,并傳回擷取pod資源總量及使用量等統計資訊的方法statsFunc,該方法後面會用到;
(7)調用thresholdsMet函數,比較observations中的資源使用量和thresholds中的驅逐政策配置門檻值之間的大小,将超過門檻值的驅逐信号(即
memory.available
、
nodefs.available
等)組裝成
thresholds
傳回;
(8)判斷
m.thresholdsMet
(
m.thresholdsMet
記錄了已經達到驅逐門檻值,但還沒有滿足驅逐政策條件,觸發驅逐的
Threshold
切片)長度是否大于0,大于0則調用mergeThresholds函數,将上面得到的
thresholds
與
m.thresholdsMet
合并;
(9)調用
thresholdsFirstObservedAt
函數,傳入
thresholds
與
m.thresholdsFirstObservedAt
(記錄各個
Threshold
的第一次發現時間點),記錄并更新各個驅逐信号的第一次超過門檻值的時間,傳回
thresholdsFirstObservedAt
;
(10)調用nodeConditionsObservedSince函數,判斷距離上次更新nodeCondition時間是否已經超過了
m.config.PressureTransitionPeriod
(即kubelet啟動參數配置
--eviction-pressure-transition-period
),超過則更新nodeConditions并傳回(這裡還沒有把nodeCondition更新到node對象中去);
(11)調用
thresholdsMetGracePeriod
函數,篩選出驅逐信号達到驅逐門檻值并持續了
evictionSoftGracePeriod
時間的(即kubelet啟動參數配置
--eviction-soft-grace-period
),組裝并傳回
thresholds
,此時的
thresholds
是滿足驅逐政策即将觸發驅逐的thresholds;
(12)更新
managerImpl
的部分成員變量的值,如
nodeConditions
、
thresholdsFirstObservedAt
、
nodeConditionsLastObservedAt
、
thresholdsMet
、
lastObservations
;
(13)判斷
LocalStorageCapacityIsolation
即localStorage驅逐的featuregate是否開啟,是則先調用
m.localStorageEviction
處理localstorage驅逐,如果傳回驅逐的pod清單不為空,則證明是localStorage觸發的驅逐,且已經處理完畢,直接return;
(14)判斷即将觸發驅逐的
thresholds
長度是否為0,是則代表沒有觸發驅逐,不需要執行驅逐邏輯,直接return;
(15)調用
sort.Sort(byEvictionPriority(thresholds))
,給
thresholds
排序,将記憶體排在所有其他資源信号之前,并将沒有資源可回收的門檻值排在最後;
(16)根據排序結果,調用
getReclaimableThreshold
,周遊
thresholds
,從中擷取第一個可以被回收的
threshold
,傳回
thresholdToReclaim
;
(17)調用
m.reclaimNodeLevelResources
,回收上面擷取到的節點級的資源
thresholdToReclaim
,如果回收的資源足夠,則直接return,不需要往下執行驅逐pod的邏輯;
(18)調用
m.signalToRankFunc[thresholdToReclaim.Signal]
,擷取對應驅逐信号的pod排序函數;
(19)判斷
activePods
長度是否為0,是則直接return,沒有可被驅逐的pod,無法執行驅逐邏輯;
(20)調用
rank(activePods, statsFunc)
,根據之前擷取到的pod排序算法,給pod清單進行排序,再次得到
activePods
,用于後面驅逐pod;
(21)周遊
activePods
清單,擷取pod的
gracePeriod
(硬驅逐為0,軟驅逐則根據kubelet啟動參數
--eviction-max-pod-grace-period
配置值獲得),調用
evictionMessage
函數,構造驅逐message,後續更新到pod的event和status中,用于說明為什麼發生驅逐,最後調用
m.evictPod
,判斷pod能否被驅逐,能則開始驅逐pod;
但這裡要注意的是,每次調用
m.synchronize
方法,最多隻驅逐一個pod,驅逐成功一個pod則直接return;
// pkg/kubelet/eviction/eviction_manager.go
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
// (1)擷取驅逐政策配置
// if we have nothing to do, just return
thresholds := m.config.Thresholds
if len(thresholds) == 0 && !utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
return nil
}
klog.V(3).Infof("eviction manager: synchronize housekeeping")
// build the ranking functions (if not yet known)
// TODO: have a function in cadvisor that lets us know if global housekeeping has completed
if m.dedicatedImageFs == nil {
hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs()
if ok != nil {
return nil
}
m.dedicatedImageFs = &hasImageFs
// (2)調用`buildSignalToRankFunc`函數來建構pod的排序函數(buildSignalToRankFunc函數傳回軟驅逐、硬驅逐中各個驅逐信号所對應的排序函數,排序函數用于計算被驅逐pod的順序)
m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
}
// (3)調用`podFunc`,即調用`kl.GetActivePods`方法,擷取會被驅逐的pod清單-activePods
activePods := podFunc()
updateStats := true
// (4)擷取各種統計資訊,如節點上各個資源的總量以及使用量情況、容器的資源聲明及使用量情況等
summary, err := m.summaryProvider.Get(updateStats)
if err != nil {
klog.Errorf("eviction manager: failed to get summary stats: %v", err)
return nil
}
// (5)ThresholdNotifier相關的通知實作,ThresholdNotifier-基于觀察者模式實作對特殊資源驅逐管理的支援;
if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
m.thresholdsLastUpdated = m.clock.Now()
for _, notifier := range m.thresholdNotifiers {
if err := notifier.UpdateThreshold(summary); err != nil {
klog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err)
}
}
}
// (6)調用`makeSignalObservations`函數,根據前面擷取到的節點資源總量及使用量等各種統計資訊,組裝傳回observations,并傳回擷取pod資源總量及使用量等統計資訊的方法statsFunc,該方法後面會用到
// make observations and get a function to derive pod usage stats relative to those observations.
observations, statsFunc := makeSignalObservations(summary)
debugLogObservations("observations", observations)
// (7)調用thresholdsMet函數,比較observations中的資源使用量和thresholds中的驅逐政策配置門檻值之間的大小,将超過門檻值的驅逐信号(即`memory.available`、`nodefs.available`等)組裝成`thresholds`傳回
// determine the set of thresholds met independent of grace period
thresholds = thresholdsMet(thresholds, observations, false)
debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)
// (8)判斷`m.thresholdsMet`(`m.thresholdsMet`記錄了已經達到驅逐門檻值,但還沒有滿足驅逐政策條件,觸發驅逐的`Threshold`切片)長度是否大于0,大于0則調用mergeThresholds函數,将上面得到的`thresholds`與`m.thresholdsMet`合并
// determine the set of thresholds previously met that have not yet satisfied the associated min-reclaim
if len(m.thresholdsMet) > 0 {
thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
}
debugLogThresholdsWithObservation("thresholds - reclaim not satisfied", thresholds, observations)
// (9)調用`thresholdsFirstObservedAt`函數,傳入`thresholds`與`m.thresholdsFirstObservedAt`(記錄各個`Threshold`的第一次發現時間點),記錄并更新各個驅逐信号的第一次超過門檻值的時間,傳回`thresholdsFirstObservedAt`
// track when a threshold was first observed
now := m.clock.Now()
thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)
// the set of node conditions that are triggered by currently observed thresholds
nodeConditions := nodeConditions(thresholds)
if len(nodeConditions) > 0 {
klog.V(3).Infof("eviction manager: node conditions - observed: %v", nodeConditions)
}
// (10)調用nodeConditionsObservedSince函數,判斷距離上次更新nodeCondition時間是否已經超過了`m.config.PressureTransitionPeriod`(即kubelet啟動參數配置`--eviction-pressure-transition-period`),超過則更新nodeConditions并傳回(這裡還沒有把nodeCondition更新到node對象中去)
// track when a node condition was last observed
nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)
// node conditions report true if it has been observed within the transition period window
nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
if len(nodeConditions) > 0 {
klog.V(3).Infof("eviction manager: node conditions - transition period not met: %v", nodeConditions)
}
// (11)調用`thresholdsMetGracePeriod`函數,篩選出驅逐信号達到驅逐門檻值并持續了`evictionSoftGracePeriod`時間的(即kubelet啟動參數配置`--eviction-soft-grace-period`),組裝并傳回`thresholds`,此時的`thresholds`是滿足驅逐政策即将觸發驅逐的thresholds;
// determine the set of thresholds we need to drive eviction behavior (i.e. all grace periods are met)
thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
debugLogThresholdsWithObservation("thresholds - grace periods satisfied", thresholds, observations)
// (12)更新`managerImpl`的部分成員變量的值,如`nodeConditions`、`thresholdsFirstObservedAt`、`nodeConditionsLastObservedAt`、`thresholdsMet`、`lastObservations`
// update internal state
m.Lock()
m.nodeConditions = nodeConditions
m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
m.thresholdsMet = thresholds
// determine the set of thresholds whose stats have been updated since the last sync
thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
debugLogThresholdsWithObservation("thresholds - updated stats", thresholds, observations)
m.lastObservations = observations
m.Unlock()
// (13)判斷`LocalStorageCapacityIsolation`即localStorage驅逐的featuregate是否開啟,是則先調用`m.localStorageEviction`處理localstorage驅逐,如果傳回驅逐的pod清單不為空,則證明是localStorage觸發的驅逐,且已經處理完畢,直接return
// evict pods if there is a resource usage violation from local volume temporary storage
// If eviction happens in localStorageEviction function, skip the rest of eviction action
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
if evictedPods := m.localStorageEviction(summary, activePods); len(evictedPods) > 0 {
return evictedPods
}
}
// (14)判斷即将觸發驅逐的`thresholds`長度是否為0,是則代表沒有觸發驅逐,不需要執行驅逐邏輯,直接return
if len(thresholds) == 0 {
klog.V(3).Infof("eviction manager: no resources are starved")
return nil
}
// (15)調用`sort.Sort(byEvictionPriority(thresholds))`,給`thresholds`排序,将記憶體排在所有其他資源信号之前,并将沒有資源可回收的門檻值排在最後
// rank the thresholds by eviction priority
sort.Sort(byEvictionPriority(thresholds))
// (16)根據排序結果,調用`getReclaimableThreshold`,周遊`thresholds`,從中擷取第一個可以被回收的`threshold`,傳回`thresholdToReclaim`
thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
if !foundAny {
return nil
}
klog.Warningf("eviction manager: attempting to reclaim %v", resourceToReclaim)
// record an event about the resources we are now attempting to reclaim via eviction
m.recorder.Eventf(m.nodeRef, v1.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)
// (17)調用`m.reclaimNodeLevelResources`,回收上面擷取到的節點級的資源`thresholdToReclaim`,如果回收的資源足夠,則直接return,不需要往下執行驅逐pod的邏輯
// check if there are node-level resources we can reclaim to reduce pressure before evicting end-user pods.
if m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim) {
klog.Infof("eviction manager: able to reduce %v pressure without evicting pods.", resourceToReclaim)
return nil
}
klog.Infof("eviction manager: must evict pod(s) to reclaim %v", resourceToReclaim)
// (18)調用`m.signalToRankFunc[thresholdToReclaim.Signal]`,擷取對應驅逐信号的pod排序函數
// rank the pods for eviction
rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
if !ok {
klog.Errorf("eviction manager: no ranking function for signal %s", thresholdToReclaim.Signal)
return nil
}
// (19)判斷`activePods`長度是否為0,是則直接return,沒有可被驅逐的pod,無法執行驅逐邏輯
// the only candidates viable for eviction are those pods that had anything running.
if len(activePods) == 0 {
klog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict")
return nil
}
// (20)調用`rank(activePods, statsFunc)`,根據之前擷取到的pod排序算法,給pod清單進行排序,再次得到`activePods`,用于後面驅逐pod
// rank the running pods for eviction for the specified resource
rank(activePods, statsFunc)
klog.Infof("eviction manager: pods ranked for eviction: %s", format.Pods(activePods))
//record age of metrics for met thresholds that we are using for evictions.
for _, t := range thresholds {
timeObserved := observations[t.Signal].time
if !timeObserved.IsZero() {
metrics.EvictionStatsAge.WithLabelValues(string(t.Signal)).Observe(metrics.SinceInSeconds(timeObserved.Time))
metrics.DeprecatedEvictionStatsAge.WithLabelValues(string(t.Signal)).Observe(metrics.SinceInMicroseconds(timeObserved.Time))
}
}
// (21)周遊`activePods`清單,擷取pod的`gracePeriod`(硬驅逐為0,軟驅逐則根據kubelet啟動參數`--eviction-max-pod-grace-period`配置值獲得),調用`m.evictPod`,判斷pod能否被驅逐,能則開始驅逐pod,但這裡要注意的是,每次調用`m.synchronize`方法,最多隻驅逐一個pod,驅逐成功一個pod則直接return
// we kill at most a single pod during each eviction interval
for i := range activePods {
pod := activePods[i]
gracePeriodOverride := int64(0)
if !isHardEvictionThreshold(thresholdToReclaim) {
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
}
// 調用`evictionMessage`函數,構造驅逐message,後續更新到pod的event和status中,用于說明為什麼發生驅逐
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
if m.evictPod(pod, gracePeriodOverride, message, annotations) {
metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
return []*v1.Pod{pod}
}
}
klog.Infof("eviction manager: unable to evict any pods from the node")
return nil
}
2.1.1 m.config.Thresholds
m.config.Thresholds屬性存儲着配置的驅逐政策資訊,在kubelet初始化的時候調用
eviction.ParseThresholdConfig
函數,根據函數傳回被指派;
// pkg/kubelet/eviction/types.go
type Config struct {
...
Thresholds []evictionapi.Threshold
...
}
// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
...
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: experimentalKernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
...
}
調用
eviction.ParseThresholdConfig
函數時的入參
kubeCfg.EvictionHard
、
kubeCfg.EvictionSoft
、
kubeCfg.EvictionSoftGracePeriod
、
kubeCfg.EvictionMinimumReclaim
等值都來源于kubelet的啟動參數配置;
// cmd/kubelet/app/options/options.go
func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfiguration) {
...
fs.Var(cliflag.NewLangleSeparatedMapStringString(&c.EvictionHard), "eviction-hard", "A set of eviction thresholds (e.g. memory.available<1Gi) that if met would trigger a pod eviction.")
fs.Var(cliflag.NewLangleSeparatedMapStringString(&c.EvictionSoft), "eviction-soft", "A set of eviction thresholds (e.g. memory.available<1.5Gi) that if met over a corresponding grace period would trigger a pod eviction.")
fs.Var(cliflag.NewMapStringString(&c.EvictionSoftGracePeriod), "eviction-soft-grace-period", "A set of eviction grace periods (e.g. memory.available=1m30s) that correspond to how long a soft eviction threshold must hold before triggering a pod eviction.")
fs.Var(cliflag.NewMapStringString(&c.EvictionMinimumReclaim), "eviction-minimum-reclaim", "A set of minimum reclaims (e.g. imagefs.available=2Gi) that describes the minimum amount of resource the kubelet will reclaim when performing a pod eviction if that resource is under pressure.")
...
}
eviction.ParseThresholdConfig
eviction.ParseThresholdConfig函數中對軟驅逐、硬驅逐相關的配置值進行處理并最終合并傳回存儲着驅逐政策資訊的
[]evictionapi.Threshold
結構體;
從方法中也可以看到,軟驅逐、硬驅逐中的每個驅逐信号,都會生成一個
evictionapi.Threshold
,是以最終方法傳回是
[]evictionapi.Threshold
;
// pkg/kubelet/eviction/helpers.go
func ParseThresholdConfig(allocatableConfig []string, evictionHard, evictionSoft, evictionSoftGracePeriod, evictionMinimumReclaim map[string]string) ([]evictionapi.Threshold, error) {
results := []evictionapi.Threshold{}
hardThresholds, err := parseThresholdStatements(evictionHard)
if err != nil {
return nil, err
}
results = append(results, hardThresholds...)
softThresholds, err := parseThresholdStatements(evictionSoft)
if err != nil {
return nil, err
}
gracePeriods, err := parseGracePeriods(evictionSoftGracePeriod)
if err != nil {
return nil, err
}
minReclaims, err := parseMinimumReclaims(evictionMinimumReclaim)
if err != nil {
return nil, err
}
for i := range softThresholds {
signal := softThresholds[i].Signal
period, found := gracePeriods[signal]
if !found {
return nil, fmt.Errorf("grace period must be specified for the soft eviction threshold %v", signal)
}
softThresholds[i].GracePeriod = period
}
results = append(results, softThresholds...)
for i := range results {
if minReclaim, ok := minReclaims[results[i].Signal]; ok {
results[i].MinReclaim = &minReclaim
}
}
for _, key := range allocatableConfig {
if key == kubetypes.NodeAllocatableEnforcementKey {
results = addAllocatableThresholds(results)
break
}
}
return results, nil
}
2.1.2 buildSignalToRankFunc
buildSignalToRankFunc函數傳回
map[evictionapi.Signal]rankFunc
,其代表了軟驅逐、硬驅逐中各個驅逐信号所對應的排序函數,排序函數用于計算被驅逐pod的順序;
// pkg/kubelet/eviction/helpers.go
func buildSignalToRankFunc(withImageFs bool) map[evictionapi.Signal]rankFunc {
signalToRankFunc := map[evictionapi.Signal]rankFunc{
evictionapi.SignalMemoryAvailable: rankMemoryPressure,
evictionapi.SignalAllocatableMemoryAvailable: rankMemoryPressure,
evictionapi.SignalPIDAvailable: rankPIDPressure,
}
// usage of an imagefs is optional
if withImageFs {
// with an imagefs, nodefs pod rank func for eviction only includes logs and local volumes
signalToRankFunc[evictionapi.SignalNodeFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalNodeFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource}, resourceInodes)
// with an imagefs, imagefs pod rank func for eviction only includes rootfs
signalToRankFunc[evictionapi.SignalImageFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalImageFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot}, resourceInodes)
} else {
// without an imagefs, nodefs pod rank func for eviction looks at all fs stats.
// since imagefs and nodefs share a common device, they share common ranking functions.
signalToRankFunc[evictionapi.SignalNodeFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalNodeFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, resourceInodes)
signalToRankFunc[evictionapi.SignalImageFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalImageFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, resourceInodes)
}
return signalToRankFunc
}
因記憶體資源緊張導緻的驅逐比較常見,是以這裡對其中記憶體的pod排序函數來做一下分析;
rankMemoryPressure
可以看到記憶體資源的pod排序邏輯為:
(1)先根據pod的記憶體使用量是否超過記憶體request排序,超過的排在前面;
(2)再根據pod的priority值大小排序,值小的排在前面;
(3)最後根據pod記憶體request值減去pod的記憶體使用量的值,得到值小的排在前面;
// pkg/kubelet/eviction/helpers.go
func rankMemoryPressure(pods []*v1.Pod, stats statsFunc) {
orderedBy(exceedMemoryRequests(stats), priority, memory(stats)).Sort(pods)
}
從這個排序函數也可以看出,當因為宿主記憶體資源緊張發生驅逐時,什麼樣的pod會最先被驅逐;
關于pod的priority詳細介紹,可以檢視官方文檔:https://kubernetes.io/zh/docs/concepts/scheduling-eviction/pod-priority-preemption/
2.1.3 buildSignalToNodeReclaimFuncs
buildSignalToNodeReclaimFuncs用于建構節點資源回收函數,回收函數用于後續在執行驅逐pod之前,先調用節點資源回收函數來回收資源,如果回收的資源足夠,則不用走驅逐邏輯;
可以看到隻有
nodefs.available
、
nodefs.inodesfree
、
imagefs.available
、
imagefs.inodesfree
四個驅逐信号有回收函數,其餘驅逐信号均沒有;且當有專門的imageFs時,
nodefs.available
、
nodefs.inodesfree
也不會有回收函數;
// pkg/kubelet/eviction/helpers.go
func buildSignalToNodeReclaimFuncs(imageGC ImageGC, containerGC ContainerGC, withImageFs bool) map[evictionapi.Signal]nodeReclaimFuncs {
signalToReclaimFunc := map[evictionapi.Signal]nodeReclaimFuncs{}
// usage of an imagefs is optional
if withImageFs {
// with an imagefs, nodefs pressure should just delete logs
signalToReclaimFunc[evictionapi.SignalNodeFsAvailable] = nodeReclaimFuncs{}
signalToReclaimFunc[evictionapi.SignalNodeFsInodesFree] = nodeReclaimFuncs{}
// with an imagefs, imagefs pressure should delete unused images
signalToReclaimFunc[evictionapi.SignalImageFsAvailable] = nodeReclaimFuncs{containerGC.DeleteAllUnusedContainers, imageGC.DeleteUnusedImages}
signalToReclaimFunc[evictionapi.SignalImageFsInodesFree] = nodeReclaimFuncs{containerGC.DeleteAllUnusedContainers, imageGC.DeleteUnusedImages}
} else {
// without an imagefs, nodefs pressure should delete logs, and unused images
// since imagefs and nodefs share a common device, they share common reclaim functions
signalToReclaimFunc[evictionapi.SignalNodeFsAvailable] = nodeReclaimFuncs{containerGC.DeleteAllUnusedContainers, imageGC.DeleteUnusedImages}
signalToReclaimFunc[evictionapi.SignalNodeFsInodesFree] = nodeReclaimFuncs{containerGC.DeleteAllUnusedContainers, imageGC.DeleteUnusedImages}
signalToReclaimFunc[evictionapi.SignalImageFsAvailable] = nodeReclaimFuncs{containerGC.DeleteAllUnusedContainers, imageGC.DeleteUnusedImages}
signalToReclaimFunc[evictionapi.SignalImageFsInodesFree] = nodeReclaimFuncs{containerGC.DeleteAllUnusedContainers, imageGC.DeleteUnusedImages}
}
return signalToReclaimFunc
}
2.1.4 kl.GetActivePods
kl.GetActivePods方法用于擷取能被驅逐的pod清單,過濾掉以下情形的pod之後,傳回的pod清單即為能被驅逐的pod清單:
(1)failed狀态;
(2)succeeded狀态;
(3)pod的DeletionTimestamp不為空,且notRunning函數傳回true;
// pkg/kubelet/kubelet_pods.go
func (kl *Kubelet) GetActivePods() []*v1.Pod {
allPods := kl.podManager.GetPods()
activePods := kl.filterOutTerminatedPods(allPods)
return activePods
}
func (kl *Kubelet) filterOutTerminatedPods(pods []*v1.Pod) []*v1.Pod {
var filteredPods []*v1.Pod
for _, p := range pods {
if kl.podIsTerminated(p) {
continue
}
filteredPods = append(filteredPods, p)
}
return filteredPods
}
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
status = pod.Status
}
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}
func notRunning(statuses []v1.ContainerStatus) bool {
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return false
}
}
return true
}
2.1.5 m.summaryProvider.Get
m.summaryProvider.Get方法從各個途徑擷取各種統計資訊,然後組裝并傳回,各種統計資訊如節點上各種資源的總量以及使用量情況、容器的資源聲明及使用量情況等;
(1)
sp.provider.GetNode()
,最終是從client-go informer的本地緩存中擷取node對象;
(2)
sp.provider.GetNodeConfig()
,最終是從container_manager中擷取NodeConfig結構體;
(3)
sp.provider.GetCgroupStats()
,從cadvisor中擷取根目錄
“/”
下的cgroup的統計資訊;
(4)
sp.provider.RootFsStats()
,從cadvisor中擷取root檔案系統的統計資訊;
(5)
sp.provider.ImageFsStats()
,擷取image檔案系統的統計資訊,其有兩個實作,一個是
criStatsProvider
,另一個是
cadvisorStatsProvider
;
(6)
sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
,更新所有容器的cpu usage資訊并擷取所有pod的啟動時間、容器狀态、cpu使用量、記憶體使用量等統計資訊,其有兩個實作,一個是
criStatsProvider
,另一個是
cadvisorStatsProvider
;
(7)
sp.provider.ListPodStats()
,擷取所有pod的啟動時間、容器狀态、cpu使用量、記憶體使用量等統計資訊,其有兩個實作,一個是
criStatsProvider
,另一個是
cadvisorStatsProvider
;
(8)
sp.provider.RlimitStats()
,擷取pid限制資訊;
// pkg/kubelet/server/stats/summary.go
func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error) {
// TODO(timstclair): Consider returning a best-effort response if any of
// the following errors occur.
// 從client-go informer的本地緩存中擷取node對象
node, err := sp.provider.GetNode()
if err != nil {
return nil, fmt.Errorf("failed to get node info: %v", err)
}
// 從container_manager中擷取NodeConfig結構體
nodeConfig := sp.provider.GetNodeConfig()
// 從cadvisor中擷取根目錄“/”下的cgroup的統計資訊
rootStats, networkStats, err := sp.provider.GetCgroupStats("/", updateStats)
if err != nil {
return nil, fmt.Errorf("failed to get root cgroup stats: %v", err)
}
// 從cadvisor中擷取root檔案系統的統計資訊
rootFsStats, err := sp.provider.RootFsStats()
if err != nil {
return nil, fmt.Errorf("failed to get rootFs stats: %v", err)
}
// 擷取image檔案系統的統計資訊
imageFsStats, err := sp.provider.ImageFsStats()
if err != nil {
return nil, fmt.Errorf("failed to get imageFs stats: %v", err)
}
var podStats []statsapi.PodStats
if updateStats {
// 更新所有容器的cpu usage資訊并擷取所有pod的啟動時間、容器狀态、cpu使用量、記憶體使用量等統計資訊
podStats, err = sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
} else {
// 擷取所有pod的啟動時間、容器狀态、cpu使用量、記憶體使用量等統計資訊
podStats, err = sp.provider.ListPodStats()
}
if err != nil {
return nil, fmt.Errorf("failed to list pod stats: %v", err)
}
// 擷取pid限制資訊
rlimit, err := sp.provider.RlimitStats()
if err != nil {
return nil, fmt.Errorf("failed to get rlimit stats: %v", err)
}
// 組裝以上的統計資訊并傳回
nodeStats := statsapi.NodeStats{
NodeName: node.Name,
CPU: rootStats.CPU,
Memory: rootStats.Memory,
Network: networkStats,
StartTime: sp.systemBootTime,
Fs: rootFsStats,
Runtime: &statsapi.RuntimeStats{ImageFs: imageFsStats},
Rlimit: rlimit,
SystemContainers: sp.GetSystemContainersStats(nodeConfig, podStats, updateStats),
}
summary := statsapi.Summary{
Node: nodeStats,
Pods: podStats,
}
return &summary, nil
}
2.1.6 byEvictionPriority
該排序方法将記憶體排在所有其他資源信号之前,并将沒有資源可回收的門檻值排在最後;
// pkg/kubelet/eviction/helpers.go
func (a byEvictionPriority) Less(i, j int) bool {
_, jSignalHasResource := signalToResource[a[j].Signal]
return a[i].Signal == evictionapi.SignalMemoryAvailable || a[i].Signal == evictionapi.SignalAllocatableMemoryAvailable || !jSignalHasResource
}
2.1.7 getReclaimableThreshold
getReclaimableThreshold函數周遊
thresholds
,從中擷取第一個可以被回收的
threshold
并傳回;
// pkg/kubelet/eviction/helpers.go
func getReclaimableThreshold(thresholds []evictionapi.Threshold) (evictionapi.Threshold, v1.ResourceName, bool) {
for _, thresholdToReclaim := range thresholds {
if resourceToReclaim, ok := signalToResource[thresholdToReclaim.Signal]; ok {
return thresholdToReclaim, resourceToReclaim, true
}
klog.V(3).Infof("eviction manager: threshold %s was crossed, but reclaim is not implemented for this threshold.", thresholdToReclaim.Signal)
}
return evictionapi.Threshold{}, "", false
}
func init() {
...
signalToResource = map[evictionapi.Signal]v1.ResourceName{}
signalToResource[evictionapi.SignalMemoryAvailable] = v1.ResourceMemory
signalToResource[evictionapi.SignalAllocatableMemoryAvailable] = v1.ResourceMemory
signalToResource[evictionapi.SignalImageFsAvailable] = v1.ResourceEphemeralStorage
signalToResource[evictionapi.SignalImageFsInodesFree] = resourceInodes
signalToResource[evictionapi.SignalNodeFsAvailable] = v1.ResourceEphemeralStorage
signalToResource[evictionapi.SignalNodeFsInodesFree] = resourceInodes
signalToResource[evictionapi.SignalPIDAvailable] = resourcePids
}
2.1.8 m.reclaimNodeLevelResources
m.reclaimNodeLevelResources方法用于提前回收節點資源,并判斷是否需要繼續走驅逐pod的邏輯,方法傳回true則代表回收節點資源已足夠,無需再執行驅逐pod邏輯,傳回false則代表需要繼續執行驅逐pod的邏輯;
方法主要邏輯為:
(1)根據驅逐信号,擷取對應的節點資源回收函數,周遊并調用回收函數來回收資源;
(2)如果回收函數為空,直接return false;
(3)調用
m.summaryProvider.Get
擷取實時的資源統計資訊;
(4)判斷調用回收函數回收節點資源過後,現在的各個資源使用情況是否還是超過配置的各個驅逐門檻值,沒有超過則傳回true,否則傳回false;
// pkg/kubelet/eviction/eviction_manager.go
func (m *managerImpl) reclaimNodeLevelResources(signalToReclaim evictionapi.Signal, resourceToReclaim v1.ResourceName) bool {
nodeReclaimFuncs := m.signalToNodeReclaimFuncs[signalToReclaim]
for _, nodeReclaimFunc := range nodeReclaimFuncs {
// attempt to reclaim the pressured resource.
if err := nodeReclaimFunc(); err != nil {
klog.Warningf("eviction manager: unexpected error when attempting to reduce %v pressure: %v", resourceToReclaim, err)
}
}
if len(nodeReclaimFuncs) > 0 {
summary, err := m.summaryProvider.Get(true)
if err != nil {
klog.Errorf("eviction manager: failed to get summary stats after resource reclaim: %v", err)
return false
}
// make observations and get a function to derive pod usage stats relative to those observations.
observations, _ := makeSignalObservations(summary)
debugLogObservations("observations after resource reclaim", observations)
// determine the set of thresholds met independent of grace period
thresholds := thresholdsMet(m.config.Thresholds, observations, false)
debugLogThresholdsWithObservation("thresholds after resource reclaim - ignoring grace period", thresholds, observations)
if len(thresholds) == 0 {
return true
}
}
return false
}
2.1.9 m.evictPod
m.evictPod方法主要邏輯:
(1)調用
kubelettypes.IsCriticalPod
,判斷是否是critical pod,是則傳回false,說明該pod不能是被驅逐的對象;
(2)調用
m.recorder.AnnotatedEventf
,上報驅逐event;
(3)調用
m.killPodFunc
,驅逐pod;
// pkg/kubelet/eviction/eviction_manager.go
func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string, annotations map[string]string) bool {
// If the pod is marked as critical and static, and support for critical pod annotations is enabled,
// do not evict such pods. Static pods are not re-admitted after evictions.
// https://github.com/kubernetes/kubernetes/issues/40573 has more details.
if kubelettypes.IsCriticalPod(pod) {
klog.Errorf("eviction manager: cannot evict a critical pod %s", format.Pod(pod))
return false
}
status := v1.PodStatus{
Phase: v1.PodFailed,
Message: evictMsg,
Reason: Reason,
}
// record that we are evicting the pod
m.recorder.AnnotatedEventf(pod, annotations, v1.EventTypeWarning, Reason, evictMsg)
// this is a blocking call and should only return when the pod and its containers are killed.
err := m.killPodFunc(pod, status, &gracePeriodOverride)
if err != nil {
klog.Errorf("eviction manager: pod %s failed to evict %v", format.Pod(pod), err)
} else {
klog.Infof("eviction manager: pod %s is evicted successfully", format.Pod(pod))
}
return true
}
IsCriticalPod
IsCriticalPod函數判斷一個pod是否是critical pod;
是static pod,是mirror pod,
pod.Spec.Priority
屬性不為空且其值大于等于
2000000000
,三個條件均符合則方法傳回true,否則傳回false;
// pkg/kubelet/types/pod_update.go
func IsCriticalPod(pod *v1.Pod) bool {
if IsStaticPod(pod) {
return true
}
if IsMirrorPod(pod) {
return true
}
if pod.Spec.Priority != nil && IsCriticalPodBasedOnPriority(*pod.Spec.Priority) {
return true
}
return false
}
IsStaticPod
看到IsStaticPod函數,可以知道是否是static pod是根據pod annotation中是否有key:
"kubernetes.io/config.source"
,且其值為
"api"
,滿足條件則為static pod;
// pkg/kubelet/types/pod_update.go
const (
ConfigSourceAnnotationKey = "kubernetes.io/config.source"
ApiserverSource = "api"
)
func IsStaticPod(pod *v1.Pod) bool {
source, err := GetPodSource(pod)
return err == nil && source != ApiserverSource
}
func GetPodSource(pod *v1.Pod) (string, error) {
if pod.Annotations != nil {
if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok {
return source, nil
}
}
return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
}
IsCriticalPodBasedOnPriority
// pkg/kubelet/types/pod_update.go
func IsCriticalPodBasedOnPriority(priority int32) bool {
return priority >= scheduling.SystemCriticalPriority
}
// pkg/apis/scheduling/types.go
const (
HighestUserDefinablePriority = int32(1000000000)
SystemCriticalPriority = 2 * HighestUserDefinablePriority
)
2.1.10 m.killPodFunc
m.killPodFunc主要是停止pod中的所有業務容器以及sandbox容器;
前面的分析講過,在
eviction_manager
初始化的時候,
m.killPodFunc
被指派為
pkg/kubelet/pod_workers.go-killPodNow()
函數,是以接下來直接看到
killPodNow
函數的分析;
killPodNow函數主要邏輯:擷取
gracePeriod
,拼湊
UpdatePodOptions
,并調用
podWorkers.UpdatePod
來kill Pod(這裡的kill pod最終隻是停止了pod中的所有業務容器以及sandbox容器,沒有做任何删除操作);
// pkg/kubelet/pod_workers.go
func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
return func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
// determine the grace period to use when killing the pod
gracePeriod := int64(0)
if gracePeriodOverride != nil {
gracePeriod = *gracePeriodOverride
} else if pod.Spec.TerminationGracePeriodSeconds != nil {
gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
}
// we timeout and return an error if we don't get a callback within a reasonable time.
// the default timeout is relative to the grace period (we settle on 10s to wait for kubelet->runtime traffic to complete in sigkill)
timeout := int64(gracePeriod + (gracePeriod / 2))
minTimeout := int64(10)
if timeout < minTimeout {
timeout = minTimeout
}
timeoutDuration := time.Duration(timeout) * time.Second
// open a channel we block against until we get a result
type response struct {
err error
}
ch := make(chan response, 1)
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
OnCompleteFunc: func(err error) {
ch <- response{err: err}
},
KillPodOptions: &KillPodOptions{
PodStatusFunc: func(p *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
return status
},
PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
},
})
// wait for either a response, or a timeout
select {
case r := <-ch:
return r.err
case <-time.After(timeoutDuration):
recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
return fmt.Errorf("timeout waiting to kill pod")
}
}
}
podWorkers.UpdatePod方法這裡不展開分析,給出方法調用鍊,可自行檢視;
podWorkers.UpdatePod() --> p.managePodLoop() --> kl.syncPod() --> kl.killPod() --> kl.containerRuntime.KillPod() --> kl.containerRuntime.killContainersWithSyncResult()/kl.containerRuntime.runtimeService.StopPodSandbox()
2.2 m.waitForPodsCleanup
m.waitForPodsCleanup方法會循環調用
podCleanedUpFunc
,等待pod的相關資源被清理、回收(pod的所有業務容器停止并被删除、volume被清理),清理完成後return;
// pkg/kubelet/eviction/eviction_manager.go
func (m *managerImpl) waitForPodsCleanup(podCleanedUpFunc PodCleanedUpFunc, pods []*v1.Pod) {
timeout := m.clock.NewTimer(podCleanupTimeout)
defer timeout.Stop()
ticker := m.clock.NewTicker(podCleanupPollFreq)
defer ticker.Stop()
for {
select {
case <-timeout.C():
klog.Warningf("eviction manager: timed out waiting for pods %s to be cleaned up", format.Pods(pods))
return
case <-ticker.C():
for i, pod := range pods {
if !podCleanedUpFunc(pod) {
break
}
if i == len(pods)-1 {
klog.Infof("eviction manager: pods %s successfully cleaned up", format.Pods(pods))
return
}
}
}
}
}
podCleanedUpFunc
podCleanedUpFunc實際上是
podResourcesAreReclaimed
方法,
podResourcesAreReclaimed
方法調用了
kl.PodResourcesAreReclaimed
方法做進一步處理;
// pkg/kubelet/kubelet_pods.go
func (kl *Kubelet) podResourcesAreReclaimed(pod *v1.Pod) bool {
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
status = pod.Status
}
return kl.PodResourcesAreReclaimed(pod, status)
}
從
PodResourcesAreReclaimed
方法中可以看出,會等待pod的所有業務容器停止運作并被删除,等待pod的volume被清理完成,等待pod的cgroup sandbox被清理完成;
// pkg/kubelet/kubelet_pods.go
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
if !notRunning(status.ContainerStatuses) {
// We shouldn't delete pods that still have running containers
klog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
return false
}
// pod's containers should be deleted
runtimeStatus, err := kl.podCache.Get(pod.UID)
if err != nil {
klog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
return false
}
if len(runtimeStatus.ContainerStatuses) > 0 {
var statusStr string
for _, status := range runtimeStatus.ContainerStatuses {
statusStr += fmt.Sprintf("%+v ", *status)
}
klog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
return false
}
if kl.kubeletConfiguration.CgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
if pcm.Exists(pod) {
klog.V(3).Infof("Pod %q is terminated, but pod cgroup sandbox has not been cleaned up", format.Pod(pod))
return false
}
}
return true
}
總結
kubelet節點壓力驅逐中包括了兩部分,一個是實時驅逐,一個是輪詢驅逐;
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5SMwQTMzQDO50SN2QzNxkzMwEzMwkDMyIDMy0yNwAzM5MjMvwVOwIjMwIzLcdDMwMTOzIzLcd2bsJ2Lc12bj5ycn9Gbi52YuIjMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
// pkg/kubelet/eviction/eviction_manager.go
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
thresholdHandler := func(message string) {
klog.Infof(message)
m.synchronize(diskInfoProvider, podFunc)
}
// 啟動實時驅逐
if m.config.KernelMemcgNotification {
for _, threshold := range m.config.Thresholds {
if threshold.Signal == evictionapi.SignalMemoryAvailable || threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable {
notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, thresholdHandler)
if err != nil {
klog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err)
} else {
go notifier.Start()
m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
}
}
}
}
// 啟動輪詢驅逐
// start the eviction manager monitoring
go func() {
for {
if evictedPods := m.synchronize(diskInfoProvider, podFunc); evictedPods != nil {
klog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods))
m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
} else {
time.Sleep(monitoringInterval)
}
}
}()
}
這裡主要對輪詢驅逐做一下分析總結,
m.synchronize
方法為驅逐核心邏輯所在;
m.synchronize
方法驅逐邏輯概要總結:
(1)根據kubelet啟動參數配置,擷取驅逐政策配置;
(2)初始化軟驅逐、硬驅逐中各個驅逐信号的pod排序函數;
(3)擷取會被驅逐的pod清單-activePods;
(4)從cAdvisor、CRIRuntimes擷取各種統計資訊,如節點上各個資源的總量以及使用量情況、容器的資源聲明及使用量情況等;
(5)比對驅逐政策配置以及上述的各種資源統計資訊,篩選出會觸發驅逐的驅逐信号;
(6)将上面篩選出來的驅逐信号做排序,将記憶體驅逐信号排在所有其他信号之前,并将沒有資源可回收的驅逐信号排在最後,并從排序後的結果中取出第一個驅逐信号;
(7)調用
m.reclaimNodeLevelResources
,回收上面擷取到的驅逐信号的節點級資源,如果回收的資源足夠,則直接return,不需要往下執行驅逐pod的邏輯;
(8)擷取上述取出的驅逐信号對應的pod排序函數,給pod清單進行排序;
(9)周遊排序後的pod清單,調用
m.evictPod
,判斷pod能否被驅逐,能則開始驅逐pod;
驅逐邏輯三個注意點:
(1)每次調用
m.synchronize
方法,即每次的驅逐邏輯,最多隻驅逐一個pod;
(2)如果調用
m.synchronize
方法沒有驅逐pod,則會等待10s後再進行下一次的
m.synchronize
方法輪詢調用,也就是說輪詢驅逐會有一定的時延;
(3)驅逐pod,隻是将
pod.status.phase
值更新為
Failed
,并附上驅逐reason:
Evicted
以及觸發驅逐的詳細資訊,不會删除pod;而
pod.status.phase
值被更新為
Failed
後,replicaset controller會再次建立出新的pod調用到其他節點上,達到驅逐pod的效果;