天天看點

kubelet 源碼分析:statusManager 和 probeManager

kubelet 源碼分析:statusManager 和 probeManager

在 kubelet 初始化的時候,會建立

statusManager

probeManager

,兩者都和 pod 的狀态有關系,是以我們放到一起來講解。

statusManager

負責維護狀态資訊,并把 pod 狀态更新到 apiserver,但是它并不負責監控 pod 狀态的變化,而是提供對應的接口供其他元件調用,比如

probeManager

probeManager

會定時去監控 pod 中容器的健康狀況,一旦發現狀态發生變化,就調用

statusManager

提供的方法更新 pod 的狀态。

klet.statusManager = status.NewManager(kubeClient, klet.podManager)
klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        kubeDeps.Recorder)
           

StatusManager

statusManager 對應的代碼在 pkg/kubelet/status/status_manager.go 檔案中,

type PodStatusProvider interface {
    GetPodStatus(uid types.UID) (api.PodStatus, bool)
}

type Manager interface {
    PodStatusProvider

    Start()

    SetPodStatus(pod *api.Pod, status api.PodStatus)
    SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
    TerminatePod(pod *api.Pod)
    RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
           

這個接口的方法可以分成三組:擷取某個 pod 的狀态、背景運作 goroutine 執行同步工作、修改 pod 的狀态。修改狀态的方法有多個,每個都有不同的用途:

SetPodStatus:如果 pod 的狀态發生了變化,會調用這個方法,把新狀态更新到 apiserver,一般在 kubelet 維護 pod 生命周期的時候會調用
SetContainerReadiness:如果健康檢查發現 pod 中容器的健康狀态發生變化,會調用這個方法,修改 pod 的健康狀态
TerminatePod:kubelet 在删除 pod 的時候,會調用這個方法,把 pod 中所有的容器設定為 terminated 狀态
RemoveOrphanedStatuses:删除孤兒 pod,直接把對應的狀态資料從緩存中删除即可
           

Start() 方法是在 kubelet 運作的時候調用的,它會啟動一個 goroutine 執行更新操作:

const syncPeriod =  * time.Second

func (m *manager) Start() {
    ......
    glog.Info("Starting to sync pod status with apiserver")
    syncTicker := time.Tick(syncPeriod)
    // syncPod and syncBatch share the same go routine to avoid sync races.
    go wait.Forever(func() {
        select {
        case syncRequest := <-m.podStatusChannel:
            m.syncPod(syncRequest.podUID, syncRequest.status)
        case <-syncTicker:
            m.syncBatch()
        }
    }, )
}
           

這個 goroutine 就能不斷地從兩個 channel 監聽資料進行處理:syncTicker 是個定時器,也就是說它會定時保證 apiserver 和自己緩存的最新 pod 狀态保持一緻;podStatusChannel 是所有 pod 狀态更新發送到的地方,調用方不會直接操作這個 channel,而是通過調用上面提到的修改狀态的各種方法,這些方法内部會往這個 channel 寫資料。

m.syncPod 根據參數中的 pod 和它的狀态資訊對 apiserver 中的資料進行更新,如果發現 pod 已經被删除也會把它從内部資料結構中删除。

ProbeManager

probeManager 檢測 pod 中容器的健康狀态,目前有兩種 probe:readiness 和 liveness。readinessProbe 檢測容器是否可以接受請求,如果檢測結果失敗,則将其從 service 的 endpoints 中移除,後續的請求也就不會發送給這個容器;livenessProbe 檢測容器是否存活,如果檢測結果失敗,kubelet 會殺死這個容器,并重新開機一個新的(除非 RestartPolicy 設定成了 Never)。

并不是所有的 pod 中的容器都有健康檢查的探針,如果沒有,則不對容器進行檢測,預設認為容器是正常的。在每次建立新 pod 的時候,kubelet 都會調用 probeManager.AddPod(pod) 方法,它對應的實作在 pkg/kubelet/prober/prober_manager.go 檔案中:

func (m *manager) AddPod(pod *api.Pod) {
    m.workerLock.Lock()
    defer m.workerLock.Unlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name

        if c.ReadinessProbe != nil {
            key.probeType = readiness
            if _, ok := m.workers[key]; ok {
                glog.Errorf("Readiness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, readiness, pod, c)
            m.workers[key] = w
            go w.run()
        }

        if c.LivenessProbe != nil {
            key.probeType = liveness
            if _, ok := m.workers[key]; ok {
                glog.Errorf("Liveness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, liveness, pod, c)
            m.workers[key] = w
            go w.run()
        }
    }
}
           

周遊 pod 中的容器,如果其定義了 readiness 或者 liveness,就建立一個 worker,并啟動一個 goroutine 在背景運作這個 worker。

pkg/kubelet/prober/worker.go:

func (w *worker) run() {
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
    probeTicker := time.NewTicker(probeTickerPeriod)

    defer func() {
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }

        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
    }()

    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

probeLoop:
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
            // continue
        }
    }
}

func (w *worker) doProbe() (keepGoing bool) {
    defer func() { recover() }() 
    defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

    // pod 沒有被建立,或者已經被删除了,直接跳過檢測,但是會繼續檢測
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        glog.V().Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }

    // pod 已經退出(不管是成功還是失敗),直接傳回,并終止 worker
    if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
        glog.V().Infof("Pod %v %v, exiting probe worker",
            format.Pod(w.pod), status.Phase)
        return false
    }

    // 容器沒有建立,或者已經删除了,直接傳回,并繼續檢測,等待更多的資訊
    c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) ==  {
        glog.V().Infof("Probe target container not found: %v - %v",
            format.Pod(w.pod), w.container.Name)
        return true 
    }

    // pod 更新了容器,使用最新的容器資訊
    if w.containerID.String() != c.ContainerID {
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        w.onHold = false
    }

    if w.onHold {
        return true
    }

    if c.State.Running == nil {
        glog.V().Infof("Non-running container probed: %v - %v",
            format.Pod(w.pod), w.container.Name)
        if !w.containerID.IsEmpty() {
            w.resultsManager.Set(w.containerID, results.Failure, w.pod)
        }
        // 容器失敗退出,并且不會再重新開機,終止 worker
        return c.State.Terminated == nil ||
            w.pod.Spec.RestartPolicy != api.RestartPolicyNever
    }

    // 容器啟動時間太短,沒有超過配置的初始化等待時間 InitialDelaySeconds
    if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }

    // 調用 prober 進行檢測容器的狀态
    result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        return true
    }

    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 
    }

    // 如果容器退出,并且沒有超過最大的失敗次數,則繼續檢測
    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        return true
    }

    // 儲存最新的檢測結果
    w.resultsManager.Set(w.containerID, result, w.pod)

    if w.probeType == liveness && result == results.Failure {
        // 容器 liveness 檢測失敗,需要删除容器并重新建立,在新容器成功建立出來之前,暫停檢測
        w.onHold = true
    }

    return true
}
           

每次檢測的時候都會用 w.resultsManager.Set(w.containerID, result, w.pod) 來儲存檢測結果,resultsManager 的代碼在 pkg/kubelet/prober/results/results_manager.go:

func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
    if m.setInternal(id, result) {
        m.updates <- Update{id, result, pod.UID}
    }
}

func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
    m.Lock()
    defer m.Unlock()
    prev, exists := m.cache[id]
    if !exists || prev != result {
        m.cache[id] = result
        return true
    }
    return false
}

func (m *manager) Updates() <-chan Update {
    return m.updates
}
它把結果儲存在緩存中,并發送到 m.updates 管道。對于 liveness 來說,它的管道消費者是 kubelet,還記得 syncLoopIteration 中的這段代碼邏輯嗎?

case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            // The liveness manager detected a failure; sync the pod.
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                // If the pod no longer exists, ignore the update.
                glog.V().Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                break
            }
            glog.V().Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
            handler.HandlePodSyncs([]*api.Pod{pod})
        }
因為 liveness 關系者 pod 的生死,是以需要 kubelet 的處理邏輯。而 readiness 即使失敗也不會重新建立 pod,它的處理邏輯是不同的,它的處理代碼同樣在 pkg/kubelet/prober/prober_manager.go:

func (m *manager) Start() {
    go wait.Forever(m.updateReadiness, )
}

func (m *manager) updateReadiness() {
    update := <-m.readinessManager.Updates()

    ready := update.Result == results.Success
    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
           

proberManager 啟動的時候,會運作一個 goroutine 定時讀取 readinessManager 管道中的資料,并根據資料調用 statusManager 去更新 apiserver 中 pod 的狀态資訊。負責 Service 邏輯的元件擷取到了這個狀态,就能根據不同的值來決定是否需要更新 endpoints 的内容,也就是 service 的請求是否發送到這個 pod。

具體執行檢測的代碼在 pkg/kubelet/prober/prober.go 檔案中,它會根據不同的 prober 方法(exec、HTTP、TCP)調用對應的處理邏輯,而這些具體的邏輯代碼是在 pkg/probe/ 檔案夾中,三種方法的實作都不複雜,就不再詳細解釋了。

原文位址

http://cizixs.com/2017/06/12/kubelet-source-code-analysis-part4-status-manager

繼續閱讀