kubelet 源碼分析:statusManager 和 probeManager
在 kubelet 初始化的時候,會建立
statusManager
和
probeManager
,兩者都和 pod 的狀态有關系,是以我們放到一起來講解。
statusManager
負責維護狀态資訊,并把 pod 狀态更新到 apiserver,但是它并不負責監控 pod 狀态的變化,而是提供對應的接口供其他元件調用,比如
probeManager
。
probeManager
會定時去監控 pod 中容器的健康狀況,一旦發現狀态發生變化,就調用
statusManager
提供的方法更新 pod 的狀态。
klet.statusManager = status.NewManager(kubeClient, klet.podManager)
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.runner,
containerRefManager,
kubeDeps.Recorder)
StatusManager
statusManager 對應的代碼在 pkg/kubelet/status/status_manager.go 檔案中,
type PodStatusProvider interface {
GetPodStatus(uid types.UID) (api.PodStatus, bool)
}
type Manager interface {
PodStatusProvider
Start()
SetPodStatus(pod *api.Pod, status api.PodStatus)
SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
TerminatePod(pod *api.Pod)
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
這個接口的方法可以分成三組:擷取某個 pod 的狀态、背景運作 goroutine 執行同步工作、修改 pod 的狀态。修改狀态的方法有多個,每個都有不同的用途:
SetPodStatus:如果 pod 的狀态發生了變化,會調用這個方法,把新狀态更新到 apiserver,一般在 kubelet 維護 pod 生命周期的時候會調用
SetContainerReadiness:如果健康檢查發現 pod 中容器的健康狀态發生變化,會調用這個方法,修改 pod 的健康狀态
TerminatePod:kubelet 在删除 pod 的時候,會調用這個方法,把 pod 中所有的容器設定為 terminated 狀态
RemoveOrphanedStatuses:删除孤兒 pod,直接把對應的狀态資料從緩存中删除即可
Start() 方法是在 kubelet 運作的時候調用的,它會啟動一個 goroutine 執行更新操作:
const syncPeriod = * time.Second
func (m *manager) Start() {
......
glog.Info("Starting to sync pod status with apiserver")
syncTicker := time.Tick(syncPeriod)
// syncPod and syncBatch share the same go routine to avoid sync races.
go wait.Forever(func() {
select {
case syncRequest := <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
m.syncBatch()
}
}, )
}
這個 goroutine 就能不斷地從兩個 channel 監聽資料進行處理:syncTicker 是個定時器,也就是說它會定時保證 apiserver 和自己緩存的最新 pod 狀态保持一緻;podStatusChannel 是所有 pod 狀态更新發送到的地方,調用方不會直接操作這個 channel,而是通過調用上面提到的修改狀态的各種方法,這些方法内部會往這個 channel 寫資料。
m.syncPod 根據參數中的 pod 和它的狀态資訊對 apiserver 中的資料進行更新,如果發現 pod 已經被删除也會把它從内部資料結構中删除。
ProbeManager
probeManager 檢測 pod 中容器的健康狀态,目前有兩種 probe:readiness 和 liveness。readinessProbe 檢測容器是否可以接受請求,如果檢測結果失敗,則将其從 service 的 endpoints 中移除,後續的請求也就不會發送給這個容器;livenessProbe 檢測容器是否存活,如果檢測結果失敗,kubelet 會殺死這個容器,并重新開機一個新的(除非 RestartPolicy 設定成了 Never)。
并不是所有的 pod 中的容器都有健康檢查的探針,如果沒有,則不對容器進行檢測,預設認為容器是正常的。在每次建立新 pod 的時候,kubelet 都會調用 probeManager.AddPod(pod) 方法,它對應的實作在 pkg/kubelet/prober/prober_manager.go 檔案中:
func (m *manager) AddPod(pod *api.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if c.ReadinessProbe != nil {
key.probeType = readiness
if _, ok := m.workers[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}
if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
glog.Errorf("Liveness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}
}
}
周遊 pod 中的容器,如果其定義了 readiness 或者 liveness,就建立一個 worker,并啟動一個 goroutine 在背景運作這個 worker。
pkg/kubelet/prober/worker.go:
func (w *worker) run() {
probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
probeTicker := time.NewTicker(probeTickerPeriod)
defer func() {
probeTicker.Stop()
if !w.containerID.IsEmpty() {
w.resultsManager.Remove(w.containerID)
}
w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
}()
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
probeLoop:
for w.doProbe() {
// Wait for next probe tick.
select {
case <-w.stopCh:
break probeLoop
case <-probeTicker.C:
// continue
}
}
}
func (w *worker) doProbe() (keepGoing bool) {
defer func() { recover() }()
defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
// pod 沒有被建立,或者已經被删除了,直接跳過檢測,但是會繼續檢測
status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
if !ok {
glog.V().Infof("No status for pod: %v", format.Pod(w.pod))
return true
}
// pod 已經退出(不管是成功還是失敗),直接傳回,并終止 worker
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
glog.V().Infof("Pod %v %v, exiting probe worker",
format.Pod(w.pod), status.Phase)
return false
}
// 容器沒有建立,或者已經删除了,直接傳回,并繼續檢測,等待更多的資訊
c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
if !ok || len(c.ContainerID) == {
glog.V().Infof("Probe target container not found: %v - %v",
format.Pod(w.pod), w.container.Name)
return true
}
// pod 更新了容器,使用最新的容器資訊
if w.containerID.String() != c.ContainerID {
if !w.containerID.IsEmpty() {
w.resultsManager.Remove(w.containerID)
}
w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
w.onHold = false
}
if w.onHold {
return true
}
if c.State.Running == nil {
glog.V().Infof("Non-running container probed: %v - %v",
format.Pod(w.pod), w.container.Name)
if !w.containerID.IsEmpty() {
w.resultsManager.Set(w.containerID, results.Failure, w.pod)
}
// 容器失敗退出,并且不會再重新開機,終止 worker
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != api.RestartPolicyNever
}
// 容器啟動時間太短,沒有超過配置的初始化等待時間 InitialDelaySeconds
if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
return true
}
// 調用 prober 進行檢測容器的狀态
result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
if err != nil {
return true
}
if w.lastResult == result {
w.resultRun++
} else {
w.lastResult = result
w.resultRun =
}
// 如果容器退出,并且沒有超過最大的失敗次數,則繼續檢測
if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
return true
}
// 儲存最新的檢測結果
w.resultsManager.Set(w.containerID, result, w.pod)
if w.probeType == liveness && result == results.Failure {
// 容器 liveness 檢測失敗,需要删除容器并重新建立,在新容器成功建立出來之前,暫停檢測
w.onHold = true
}
return true
}
每次檢測的時候都會用 w.resultsManager.Set(w.containerID, result, w.pod) 來儲存檢測結果,resultsManager 的代碼在 pkg/kubelet/prober/results/results_manager.go:
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
if m.setInternal(id, result) {
m.updates <- Update{id, result, pod.UID}
}
}
func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
m.Lock()
defer m.Unlock()
prev, exists := m.cache[id]
if !exists || prev != result {
m.cache[id] = result
return true
}
return false
}
func (m *manager) Updates() <-chan Update {
return m.updates
}
它把結果儲存在緩存中,并發送到 m.updates 管道。對于 liveness 來說,它的管道消費者是 kubelet,還記得 syncLoopIteration 中的這段代碼邏輯嗎?
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
glog.V().Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
glog.V().Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*api.Pod{pod})
}
因為 liveness 關系者 pod 的生死,是以需要 kubelet 的處理邏輯。而 readiness 即使失敗也不會重新建立 pod,它的處理邏輯是不同的,它的處理代碼同樣在 pkg/kubelet/prober/prober_manager.go:
func (m *manager) Start() {
go wait.Forever(m.updateReadiness, )
}
func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
proberManager 啟動的時候,會運作一個 goroutine 定時讀取 readinessManager 管道中的資料,并根據資料調用 statusManager 去更新 apiserver 中 pod 的狀态資訊。負責 Service 邏輯的元件擷取到了這個狀态,就能根據不同的值來決定是否需要更新 endpoints 的内容,也就是 service 的請求是否發送到這個 pod。
具體執行檢測的代碼在 pkg/kubelet/prober/prober.go 檔案中,它會根據不同的 prober 方法(exec、HTTP、TCP)調用對應的處理邏輯,而這些具體的邏輯代碼是在 pkg/probe/ 檔案夾中,三種方法的實作都不複雜,就不再詳細解釋了。
原文位址
http://cizixs.com/2017/06/12/kubelet-source-code-analysis-part4-status-manager