天天看点

kubernetes 源码解析,kubelet pod 变更事件处理逻辑整理

代码划分

由于

kubelet

代码还是比较复杂的,下面是我整理的自己对

kubelet

的代码功能划分,可以先通过下面的描述在脑海里构造一个大致的框架,这样看到具体代码逻辑大致能知道属于哪一部分负责什么逻辑,理解会轻松一些。

根据整理代码,目前我将

kubelet

POD

相关的代码划分为3大部分

  1. 事件源 - POD变更事件产生
  2. 抽象层 - 一些中间逻辑抽象,以及对很多特性的支持
  3. 执行层 - 负责POD真正的创建,修改,删除

关键路径

下面这张图列出来了

POD

变更事件从产生到被执行的一整个调用链,可以帮助看具体代码那段时对上下文有所了解,而不是一片混乱连不起来。

需要注意一下:

  1. 下面这张图不是完整的调用层级,对于不太重要的层级进行了选择性省略。
  2. 同时下面这张图也只是列出了关键路径的调用,很多逻辑没有列出来。

具体代码

1. 事件源

任何一个事情都会有一个起点,

kubelet

的工作是负责执行

kubernetes

管理

POD

的一系列如创建,修改,删除的操作指令,自然

kubelet

的工作起点是

POD变更事件的产生

,相关的代码有。

入口-初始化

// pkg/kubelet/kubelet.go:331
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,...) (*Kubelet, error) {
	...

	if kubeDeps.PodConfig == nil {
		var err error
		// 创建变更事件管理对象
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
		if err != nil {
			return nil, err
		}
	}
	
	...
}
           

创建对象-变更事件管理对象

// pkg/kubelet/config/config.go:57
type PodConfig interface {
	// 生成一个 chan,变更事件将会发送到这个 chan
	Channel(source string) chan<- interface{}
	// 检查指定源类型是否存在
	SeenAllSources(seenSources sets.String) bool
	// 获取变更接收 chan
	Updates() <-chan kubetypes.PodUpdate
	// 强制所有 pod 触发一个同步事件
	Sync()
}

// pkg/kubelet/kubelet.go:247
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ...) (config.PodConfig, error) {
	// 管理所有的pod变更事件源, cfg 类型为 PodConfig 
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
	// 注册本地静态POD(本地文件)的变更事件源
	config.NewSourceFile(..., cfg.Channel(kubetypes.FileSource))
	// 注册本地静态POD(远程URL)的变更事件源
	config.NewSourceURL(..., cfg.Channel(kubetypes.HTTPSource))
	// 接收 APIServer 下发下来的 Pod 变更事件 - 重点,下面会详细说明这个方法
	config.NewSourceApiserver(..., cfg.Channel(kubetypes.ApiserverSource))

	...
}
           

获取 APIServer 下发下来的 Pod 变更事件 -

config.NewSourceApiserver

// pkg/kubelet/config/apiserver.go:32
func NewSourceApiserver(..., updates chan<- interface{}) {
	...
	// 核心逻辑,看下面的方法
	newSourceApiserverFromLW(..., updates)
}

// pkg/kubelet/config/apiserver.go:38
func newSourceApiserverFromLW(..., updates chan<- interface{}) {
	send := func(objs []interface{}) {
		...
		// 将变更事件发送到 PodConfig 创建的 chan 里
		updates <- kubetypes.PodUpdate{...}
	}
	r := cache.NewReflector(..., send) // 为了方便理解,代码有所简化
	
	// 核心逻辑,看下面的方法 - 这里用的 go,专门启动的协程监听变更事件
	go r.Run(wait.NeverStop)
}

// vendor/k8s.io/client-go/tools/cache/reflector.go:218
func (r *Reflector) Run(stopCh <-chan struct{}) {
 	// 为了方便理解,代码有所简化
 	for {
		// 调用接口的方法,具体见下面
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
 	}
}

// vendor/k8s.io/client-go/tools/cache/reflector.go:254
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	// 此处代码逻辑比较复杂,主要是调用 APIServer 的接口获取是否有变更事件,有兴趣的可以下去自己看。
	...
	
 	// 检测到有新变更,调用最开始的 send() 通知有变更
 	// 为了方便理解,代码有所简化 - 此处简化比较严重,和真实代码及其不符,但不影响理解功能。
	r.send(obj)
}
           

2. 抽象层

抽象层包含了一些中间逻辑抽象,代码的整体架构。

入口-启动

// pkg/kubelet/kubelet.go:1374
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
	
	// 死循环不会退出,具体见下面的方法
	kl.syncLoop(updates, kl)
}
           

主循环 -

Kubelet::syncLoop

// pkg/kubelet/kubelet.go:1805
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	...
	// 为了方便理解,代码有所简化
	for {
		// 核心逻辑,具体见下面的方法
		if !kl.syncLoopIteration(updates, ...) {
			break
		}	
	}
	...
}

// pkg/kubelet/kubelet.go:1879
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, ...) bool {
	...
	select {
	case u, open := <-configCh:
		// 根据不同的事件类型调用不同的处理方法处理 - 为了方便理解,代码有所简化
		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		default:
			klog.Errorf("Invalid event type received: %d.", u.Op)
		}
	...
}
           

事件处理函数 -

Kubelet::HandlePod

****

// pkg/kubelet/kubelet.go:2030
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	...
	for _, pod := range pods {
		...
		// 调用 kl.dispatchWork 将事件分派出去
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}

// pkg/kubelet/kubelet.go:2068
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	...
	for _, pod := range pods {
		...
		// 调用 kl.dispatchWork 将事件分派出去
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}

// pkg/kubelet/kubelet.go:2103
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
	...
	for _, pod := range pods {
		...
		// 调用 kl.dispatchWork 将事件分派出去
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}

// pkg/kubelet/kubelet.go:2083
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
	...
	for _, pod := range pods {
		...
		// HandlePodRemoves 比较特殊,这里直接执行删除逻辑了
		kl.deletePod(pod)
	}
}

           

派发事件-

Kubelet::dispatchWork

// pkg/kubelet/kubelet.go:1986
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	...
	// 这里的核心逻辑其实是调 podWorkers 对象的 UpdatePod 方法,具体内容见下面
	kl.podWorkers.UpdatePod(&UpdatePodOptions{...})
	...
}
           

派发事件-

Kubelet::dispatchWork

// pkg/kubelet/pod_workers.go:200
// 这个方法实质上是给每个pod启动了个线程来处理事件,并且在有多个事件到达处理不过来时根据条件丢弃后面的事件。
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	var podUpdates chan UpdatePodOptions
	...
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates

		go func() {
			// 重要,真正执行工作的线程 - 见下面的方法
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		// 将事件发送给工作线程 - p.managePodLoop
		podUpdates <- *options
	} else {
		// 如果工作线程在忙,就放到其他位置
		// 这里的逻辑是将多出来的事件临时放到一个map里,
		// 但是如果同样的pod已经有事件放进来了并且事件不是 kubetypes.SyncPodKill 事件,就会覆盖原来的事件(此处会导致事件丢失)
		...
	}
}


// pkg/kubelet/pod_workers.go:157
// 真实的工作方法
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	for update := range podUpdates {
		// podCache 里面存放的是运行时(docker|containerd)这边的pod状态
		status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
		
		// 为了方便理解,代码有所简化 - 实际上这里是回调,而不是直接调用 kubelet 的方法,到此就正式进入真正执行阶段
		// 由上面的代码可知 syncPodOptions.podStatus 存储的是运行时这边的pod状态,需要注意一下,执行层会用
		err = kubelet.syncPod(syncPodOptions{podStatus: status,...})
		
		p.isWorking[update.Pod.UID] = false
	}
}
           

3. 执行层

执行层是

POD

新增,删除,修改逻辑的真正执行所在。

POD处理逻辑-

Kubelet::syncPod

// pkg/kubelet/kubelet.go:1455
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType
	
	...
	
	// 上面说了 o.podStatus 是运行时那边的pod状态,这边就是把 o.podStatus 中运行时的pod状态转换成APIServer可理解的格式
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	
	...

	// 将真实的pod状态设置给 statusManager,statusManager 内部会在判断状态变了的时候上报给APIServer,这样用户就能看到pod的实时状态了。
	kl.statusManager.SetPodStatus(pod, apiPodStatus)
	
	// POD 的删除逻辑,pod.DeletionTimestamp 不为空的时候代表删除 POD
	if ... || pod.DeletionTimestamp != nil || ... {
		// 调用 killPod 方法停止 POD
		err := kl.killPod(pod, nil, podStatus, nil)
		
		return err
	}
	...
	
	// 判断 POD 是不是处于停止状态(success,failed),被删除并且容器停止成功后也认为是停止状态
	if !kl.podIsTerminated(pod) {
		// 如果是需要是运行时状态的 POD,首先等待 volume 都挂载完成
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			return err
		}
	}
	
	// POD 的创建,修改逻辑
	// 获取拉取镜像用的 secret
	// 调用运行时创建,更新,删除容器
	pullSecrets := kl.getPullSecretsForPod(pod)
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	
	...
}
           

APIServer交互-

statusManager::updateStatusInternal

// pkg/kubelet/status/status_manager.go:384
func (m *manager) updateStatusInternal(...) bool {
	...
	
	select {
	// m.podStatusChannel 的消费端看下面的 (m *manager) Start() 方法
	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
		return true
	default:
		// 这里外面实际上没判断返回值,所以队列满了的话会导致事件丢失
		return false
	}
}

// pkg/kubelet/status/status_manager.go:150
func (m *manager) Start() {
	...
	
	go wait.Forever(func() {
		for {
			select {
			case syncRequest := <-m.podStatusChannel:
				// m.syncPod() 核心逻辑,具体看下面的方法
				m.syncPod(syncRequest.podUID, syncRequest.status)
			case <-syncTicker:
				// 这里是个计时器,写死的每10秒执行一次,里面的逻辑就是对所有的状态检查一遍,把需要触发m.syncPod() 方法的pod都触发一遍。所以主要逻辑还是在 m.syncPod() 里
				m.syncBatch()
			}
		}
	}, 0)
	
	...
}
           

APIServer交互-

statusManager::syncPod

// pkg/kubelet/status/status_manager.go:540
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	// 不需要变更的直接返回
	if !m.needsUpdate(uid, status) {
		return
	}
	
	...

	// 需要变更的话,首先会调用这个,这里会调用 APIServer 的 Patch 方法修改 POD 的状态
	...,err := statusutil.PatchPodStatus(pod..., *oldStatus, mergePodStatus(*oldStatus, status.status))

	// 判断是否被删除,以及是否可以被删除
	// 可以被删除的状态是所有的容器都停止并且所有的volume都已经被卸载。
	if m.canBeDeleted(pod, status.status) {
		// 调用 APIServer 的删除接口删除POD
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
		// 删除本地保存的POD状态
		m.deletePodStatus(uid)
	}	
}
           

继续阅读