天天看点

Kubernetes 控制器管理器的工作原理

在 Kubernetes Master 节点中,有三个重要的组件:ApiServer、ControllerManager 和 Scheduler,它们共同负责整个集群的管理。在本文中,我们尝试梳理一下ControllerManager的工作流程和原理。

Kubernetes 控制器管理器的工作原理

什么是控制器管理器

根据官方文档:kube-controller-manager 运行控制器,这是处理集群中常规任务的后台线程。

例如,当通过 Deployment 创建的 Pod 异常退出时,RS Controller 会接受并处理退出并创建新的 Pod 以保持预期的副本数。

几乎每个特定的资源都由特定的 Controller 管理以维持预期的状态,而 Controller Manager 的职责是聚合所有 Controller:

  1. 提供基础设施以降低控制器实现的复杂性
  2. 启动和维护控制器的正常运行时间

这样,Controller 保证集群中的资源保持在预期状态,Controller Manager 确保 Controller 保持在预期状态。

控制器工作流程

在我们解释 Controller Manager 如何为 Controller 提供基础架构和运行时环境之前,让我们先了解一下 Controller 工作流程是什么样的。

从高维的角度来看,ControllerManager主要提供了分发事件的能力,而不同的Controller只需要注册相应的Handler即可等待接收和处理事件。

Kubernetes 控制器管理器的工作原理

以Deployment Controller为例,其中的​

​NewDeploymentController​

​​方法​

​pkg/controller/deployment/deployment_controller.go​

​包括Event Handler的注册,对于Deployment Controller,只需要根据不同的事件实现不同的处理逻辑,就可以实现对相应资源的管理。

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    dc.addDeployment,UpdateFunc: dc.updateDeployment,// This will enter the sync loop and no-op, because the deployment has been deleted from the store.DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    dc.addReplicaSet,UpdateFunc: dc.updateReplicaSet,DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: dc.deletePod,
})      

可以看到,在ControllerManager的帮助下,Controller的逻辑可以很纯粹的通过实现对应的EventHandler来完成,那么ControllerManager具体做了哪些工作呢?

控制器管理器架构

帮助 Controller Manager 进行事件分发的关键模块是 client-go,而更关键的模块之一是 informer。

kubernetes在github上提供了client-go的架构图,从中可以看出Controller是描述的下半部分(CustomController),而Controller Manager主要是完成的上半部分。

Kubernetes 控制器管理器的工作原理

Informer Factory

从上图中可以看出,Informer 是一个非常关键的“桥梁”,所以对 Informer 的管理是 Controller Manager 做的第一件事。

由于每个 Informer 都与 Api Server 保持一个 watch long 连接,因此这个单实例工厂通过为所有 Controller 提供一个唯一的入口点来获取 Informer,从而确保每种类型的 Informer 仅实例化一次。

这个单例工厂的初始化逻辑。

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
  factory := &sharedInformerFactory{
    client:           client,namespace:        v1.NamespaceAll,
    defaultResync:    defaultResync,
    informers:        make(map[reflect.Type]cache.SharedIndexInformer),
    startedInformers: make(map[reflect.Type]bool),
    customResync:     make(map[reflect.Type]time.Duration),}

  // Apply all optionsfor _, opt := range options {
    factory = opt(factory)}

  return factory
}      

从上面的初始化逻辑可以看出,其中最重要的部分​

​sharedInformerFactory​

​​是名为 的map ​

​informers​

​,其中key是资源类型,value是关心该资源类型的Informer。每种类型的 Informer 只会被实例化一次并存储在map中。不同的 Controller 只有在需要相同的资源时才会得到相同的 Informer 实例。

对于Controller Manager来说,保持所有Informer正常工作是所有Controller正常工作的基本条件。通过这个​

​sharedInformerFactory​

​​map维护所有informer实例,所以​

​sharedInformerFactory​

​也负责提供一个统一的启动入口。

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  f.lock.Lock()
  defer f.lock.Unlock()

  for informerType, informer := range f.informers {if !f.startedInformers[informerType] {go informer.Run(stopCh)
      f.startedInformers[informerType] = true}}
}      

Controller Manager启动时,最重要的是通过​

​Start​

​这个工厂的方法运行所有的Informer。

Informer creation

以下是这些 Informer 的创建方式,Controller Manager​

​NewControllerInitializers​

​​在​

​cmd/kube-controller-manager/app/controllermanager.go​

​. 由于代码冗长,这里仅提供部署控制器的示例。

初始化部署控制器的逻辑​

​startDeploymentController​

​​在​

​cmd/kube-controller-manager/app/apps.go​

​.

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {return nil, false, nil}
  dc, err := deployment.NewDeploymentController(
    ctx.InformerFactory.Apps().V1().Deployments(),
    ctx.InformerFactory.Apps().V1().ReplicaSets(),
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.ClientBuilder.ClientOrDie("deployment-controller"),)if err != nil {return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)}
  go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)return nil, true, nil
}      

最关键的逻辑在 中​

​deployment.NewDeploymentController​

​,实际上是创建了 Deployment Controller,创建函数的前三个参数分别是 Deployment、ReplicaSet 和 Pod 的 Informer。如您所见,Informer 的单例工厂提供了一个入口点,用于使用 ApiGroup 作为路径创建具有不同资源的 Informer。

但是,重要的是要注意这一点。​

​Apps().V1().Deployments()​

​​ 返回 type 的实例​

​deploymentInformer​

​​,但​

​deploymentInformer​

​不是真正的 Informer(尽管它的 Informer 名称)。它只是一个模板类,其主要功能是为创建专注于部署的 Informer 提供模板作为特定资源。

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}      

创建 Informer 的真正逻辑在​

​deploymentInformer.Informer()​

​​( ​

​client-go/informers/apps/v1/deployment.go​

​​) 中,是默认的 Deployment Informer 创建模板方法,通过将资源实例和该模板方法传递给 Informer 工厂​

​f.defaultInformer​

​​的方法来创建只关注 Deployment 资源的 Informer。​

​InformerFor​

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}      

简要说明。

  1. 您可以通过 Informer 工厂获取特定类型的 Informer 模板类(即​

    ​deploymentInformer​

    ​本例中)
  2. ​Informer()​

    ​实际上,为特定资源创建 Informer 的是 Informer 模板类的方法。
  3. 该​

    ​Informer()​

    ​​方法只是通过​

    ​InformerFor​

    ​Informer 工厂创建真正的 Informer

这里使用了模板方法(设计模式),虽然有点绕,但是可以参考下图梳理一下。理解它的关键是 Informer 的差异化创建逻辑委托给了模板类。

Kubernetes 控制器管理器的工作原理

最后,命名的结构​

​sharedIndexInformer​

​将被实例化,并实际承担 Informer 的职责。它也是注册到 Informer 工厂映射的实例。

Informer operation

由于真正的 Informer 实例是一个类型的对象​

​sharedIndexInformer​

​​,当 Informer 工厂启动时(通过执行​

​Start​

​​方法),它​

​sharedIndexInformer​

​就是实际运行的。

​sharedIndexInformer​

​​是client-go中的一个组件,它的方法​

​Run​

​有几十行,但是工作量很大。这是我们进入控制器管理器最有趣的部分的地方。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()

  fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

  cfg := &Config{Queue:            fifo,ListerWatcher:    s.listerWatcher,ObjectType:       s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError:     false,ShouldResync:     s.processor.shouldResync,

    Process: s.HandleDeltas,}

  func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    s.controller = New(cfg)
    s.controller.(*controller).clock = s.clock
    s.started = true}()

  // Separate stop channel because Processor should be stopped strictly after controller
  processorStopCh := make(chan struct{})var wg wait.Group
  defer wg.Wait()              // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stop
  wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
  wg.StartWithChannel(processorStopCh, s.processor.run)

  defer func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()
    s.stopped = true // Don't want any new listeners}()
  s.controller.Run(stopCh)
}      

启动逻辑​

​sharedIndexInformer​

​做了几件事。

  1. 创建一个名为 的队列​

    ​fifo​

    ​。
  2. 创建并运行一个名为​

    ​controller​

    ​.
  3. 开始了​

    ​cacheMutationDetector​

    ​。
  4. 开始了​

    ​processor​

    ​。

这些术语(或组件)在上一篇文章中没有提到,但这四件事是 Controller Manager 工作的核心,因此我将在下面逐一介绍。

sharedIndexInformer

​sharedIndexInformer​

​​是一个共享的 Informer 框架,不同的 Controller 只需要提供一个模板类(就像​

​deploymentInformer​

​上面提到的)来创建一个特定于他们需要的 Informer。

​sharedIndexInformer​

​​包含一堆工具来完成 Informer 的工作,主要代码在​

​client-go/tools/cache/shared_informer.go​

​. 它的创建逻辑也在其中。

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  realClock := &clock.RealClock{}
  sharedIndexInformer := &sharedIndexInformer{
    processor:                       &sharedProcessor{clock: realClock},
    indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    listerWatcher:                   lw,
    objectType:                      objType,
    resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
    cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
    clock:                           realClock,}return sharedIndexInformer
}      

在创建逻辑中,有几点需要注意:

  1. processor:提供EventHandler注册和事件分发的功能
  2. indexer:提供资源缓存功能
  3. listerWatcher:由模板类提供,包含特定资源的List和Watch方法
  4. objectType:用于标记要关注的具体资源类型
  5. cacheMutationDetector:监控Informer的缓存

此外,它还包含​

​DeltaFIFO​

​​队列和​

​controller​

​上面的启动逻辑中提到的,下面分别介绍。

sharedProcessor

处理器是 sharedIndexInformer 中一个非常有趣的组件。ControllerManager通过一个Informer单例工厂保证不同的Controller共享同一个Informer,但是不同的Controller在共享的Informer上注册了不同的Handler。

处理器是管理注册的Handler并将事件分发给不同的Handler的组件。

type sharedProcessor struct {
  listenersStarted bool
  listenersLock    sync.RWMutex
  listeners        []*processorListener
  syncingListeners []*processorListener
  clock            clock.Clock
  wg               wait.Group
}      

sharedProcessor 工作的核心围绕着​

​listeners​

​.

当我们向 Informer 注册一个 Handler 时,它最终会被转换为一个名为​

​processorListener​

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {ret := &processorListener{
    nextCh:                make(chan interface{}),
    addCh:                 make(chan interface{}),
    handler:               handler,
    pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
    requestedResyncPeriod: requestedResyncPeriod,
    resyncPeriod:          resyncPeriod,}

  ret.determineNextResync(now)

  return ret
}      

该实例主要包含两个通道和外部注册的 Handler 方法。此处实例化的​

​processorListener​

​​对象最终将被添加到​

​sharedProcessor.listeners​

​列表中。

func (p *sharedProcessor) addListener(listener *processorListener) {
  p.listenersLock.Lock()
  defer p.listenersLock.Unlock()

  p.addListenerLocked(listener)if p.listenersStarted {
    p.wg.Start(listener.run)
    p.wg.Start(listener.pop)}
}      

如图所示,Controller 中的 Handler 方法最终会添加到 Listener 中,Listener 会附加到​

​sharedProcessor​

Kubernetes 控制器管理器的工作原理

前面说过,启动时​

​sharedIndexInformer​

​​会运行​

​sharedProcessor​

​​,启动的逻辑​

​sharedProcessor​

​​与这些有关​

​listeners​

​。

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
  func() {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()for _, listener := range p.listeners {
      p.wg.Start(listener.run)
      p.wg.Start(listener.pop)}
    p.listenersStarted = true}()<-stopCh
  p.listenersLock.RLock()
  defer p.listenersLock.RUnlock()for _, listener := range p.listeners {
    close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop}
  p.wg.Wait() // Wait for all .pop() and .run() to stop
}      

可以看到,启动的时候会依次​

​sharedProcessor​

​​执行 的​

​run​

​​和​

​pop​

​​方法,所以现在来看这两个方法。​

​listener​

Starting the listener

由于监听器包含注册到Controller的Handler方法,所以监听器最重要的作用就是在事件发生时触发这些方法,并​

​listener.run​

​​不断从​

​nextCh​

​通道中获取事件并执行相应的处理程序。

func (p *processorListener) run() {// this call blocks until the channel is closed.  When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted.  This is usually better than the alternative of never// delivering again.
  stopCh := make(chan struct{})
  wait.Until(func() {// this gives us a few quick retries before a long pause and then a few more quick retries
    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {for next := range p.nextCh {switch notification := next.(type) {case updateNotification:
          p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:
          p.handler.OnAdd(notification.newObj)case deleteNotification:
          p.handler.OnDelete(notification.oldObj)default:
          utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))}}// the only way to get here is if the p.nextCh is empty and closedreturn true, nil})

    // the only way to get here is if the p.nextCh is empty and closedif err == nil {
      close(stopCh)}}, 1*time.Minute, stopCh)
}      

可以看到​

​listener.run​

​​不断从​

​nextCh​

​​通道中获取事件,但是通道中的事件是​

​nextCh​

​​从哪里来的呢?将​

​listener.pop​

​​事件放入​

​nextCh​

​.

​listener.pop​

​是一个非常聪明和有趣的逻辑。

func (p *processorListener) pop() {
  defer utilruntime.HandleCrash()defer close(p.nextCh) // Tell .run() to stop

  var nextCh chan<- interface{}var notification interface{}for {select {case nextCh <- notification:// Notification dispatchedvar ok bool
      notification, ok = p.pendingNotifications.ReadOne()if !ok { // Nothing to pop
        nextCh = nil // Disable this select case}case notificationToAdd, ok := <-p.addCh:if !ok {return}if notification == nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotifications
        notification = notificationToAdd
        nextCh = p.nextCh
      } else { // There is already a notification waiting to be dispatched
        p.pendingNotifications.WriteOne(notificationToAdd)}}}
}      

​listener​

​​包含两个通道​

​addCh​

​​的原因​

​nextCh​

​​是:Informer 无法预测​

​listener.handler​

​​消耗事件的速度是否比产生事件的速度快,因此它添加了一个名为​

​pendingNotifications​

​. 队列来保存没有被及时消费的事件。

Kubernetes 控制器管理器的工作原理

​pop​

​​一方面,该方法不断获取最新事件,​

​addCh​

​​以确保生产者不会阻塞。然后它确定缓冲区是否存在,如果存在,则将事件添加到缓冲区,如果不存在,则尝试将其推送到​

​nextCh​

​.

另一方面,它确定缓冲区中是否还有任何事件,如果还有库存,它会不断将其传递给​

​nextCh​

​.

该​

​pop​

​​方法实现了一种带有缓冲区的分发机制,该缓冲区允许事件不断地从 传递​

​addCh​

​​到​

​nextCh​

​​。但是问题来了,这些​

​addCh​

​事件是从哪里来的?

源代码非常简单,​

​listener​

​​有一个​

​add​

​​以事件为输入的方法,它将新事件推送到​

​addCh​

​​. 该​

​add​

​​方法由​

​sharedProcessor​

​​管理所有 s 的​

​listener s​

​调用。

如上所述,​

​sharedProcessor​

​​负责管理所有的Handler和分发事件,但​

​distribute​

​真正分发的是方法。

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  p.listenersLock.RLock()
  defer p.listenersLock.RUnlock()

  if sync {for _, listener := range p.syncingListeners {
      listener.add(obj)}} else {for _, listener := range p.listeners {
      listener.add(obj)}}
}      

到目前为止,我们对一个部分有了更清晰的了解:

  1. Controller 向 Informer 注册 Handler。
  2. Informer 通过​

    ​sharedProcessor​

    ​.
  3. Informer 接收事件并通过​

    ​sharedProcessor.distribute​

    ​.
  4. Controller由对应的Handler触发处理自己的逻辑

那么剩下的问题是 Informer 事件从何而来?

DeltaFIFO

在分析 Informer fetch 事件之前,需要提前告知的一个非常有趣的小工具​

​fifo​

​​是​

​sharedIndexInformer.Run​

​.

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)      

DeltaFIFO 是一个非常有趣的队列,其代码定义在​

​client-go/tools/cache/delta_fifo.go​

​​. 对于队列来说,最重要的肯定是 Add 和 Pop 方法。DeltaFIFO 提供了几种 Add 方法,虽然根据不同的事件类型(add/update/delete/sync)来区分不同的方法,但最终都是执行​

​queueActionLocked​

​。

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}

  // If object is supposed to be deleted (last event is Deleted),// then we should ignore Sync events, because it would result in// recreation of this object.if actionType == Sync && f.willObjectBeDeletedLocked(id) {return nil}

  newDeltas := append(f.items[id], Delta{actionType, obj})
  newDeltas = dedupDeltas(newDeltas)

  if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {
      f.queue = append(f.queue, id)}
    f.items[id] = newDeltas
    f.cond.Broadcast()} else {// We need to remove this from our map (extra items in the queue are// ignored if they are not in the map).delete(f.items, id)}return nil
}      

该​

​queueActionLocked​

​方法的第一个参数 actionType 是事件类型。

const (Added   DeltaType = "Added"   // watch api 获得的创建事件Updated DeltaType = "Updated" // watch api 获得的更新事件Deleted DeltaType = "Deleted" // watch api 获得的删除事件Sync DeltaType = "Sync"       // 触发了 List Api,需要刷新缓存
)      

事件类型和入队方式表明这是一个具有业务功能的队列,而不仅仅是“先进先出”,入队方式有两个非常巧妙的设计。

  1. 队列中的事件会先判断资源是否有未消费的事件,然后进行适当的处​​理。
  2. 如果 list 方法发现资源已经被删除,则不处理。

第二点比较容易理解,如果触发了列表请求,发现要处理的资源已经被删除了,那么就不需要再排队了。第一点需要和out of queue方法一起看。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  f.lock.Lock()
  defer f.lock.Unlock()for {for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {return nil, ErrFIFOClosed}

      f.cond.Wait()}
    id := f.queue[0]
    f.queue = f.queue[1:]if f.initialPopulationCount > 0 {
      f.initialPopulationCount--}
    item, ok := f.items[id]if !ok {// Item may have been deleted subsequently.continue}delete(f.items, id)
    err := process(item)if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)
      err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err
  }
}      

DeltaFIFO 的​

​Pop​

​方法有一个输入,即处理函数。当它从队列中出来时,DeltaFIFO会首先根据资源id获取资源所有的事件,然后交给handler函数。

工作流程如图所示。

Kubernetes 控制器管理器的工作原理

一般来说,DeltaFIFO的queue方法首先判断资源是否已经在​

​items​

​​,如果是,则资源还没有被消费(仍然在排队),所以直接将事件追加到​

​items[resource_id]​

​​。如果不在 中​

​items​

​​,则​

​items[resource_id]​

​​创建 then 并将资源 id 附加到​

​queue​

​.

DeltaFIFO out-of-queue 方法从 获取队列顶部的资源 id ​

​queue​

​​,然后从 获取该资源的所有事件​

​items​

​​,最后调用该方法​

​PopProcessFunc​

​​传入的类型处理程序​

​Pop​

​。

所以,DeltaFIFO 的特点是队列中的(资源的)事件,当它从队列中出来时,它获取队列中第一个资源的所有事件。这种设计确保不会因为某个资源疯狂地创建事件而导致饥饿,从而使其他资源没有机会被处理。

控制器 controller

DeltaFIFO 是一个非常重要的组件,唯一真正使它有价值的是 Informer ​

​controller​

​。

虽然 K8s 源代码确实使用了这个词​

​controller​

​​,但这​

​controller​

​不是像部署控制器那样的资源控制器。相反,它是一个自上而下的事件控制器(从 API 服务器获取事件并将它们发送到 Informer 进行处理)。

职责​

​controller​

​是双重的。

  1. 通过 List-Watch 从 Api Server 获取事件并将事件推送到 DeltaFIFO
  2. ​HandleDeltas​

    ​​以 的方法​

    ​sharedIndexInformer​

    ​作为参数调用 DeltaFIFO 的 Pop 方法

定义​

​controller​

​​很简单,其核心是​

​Reflector​

​。

type controller struct {config         Config
  reflector      *Reflector
  reflectorMutex sync.RWMutex
  clock          clock.Clock
}      

​controllerr​

​​的代码​

​Reflector​

​​比较繁琐但是很简单,就是通过​

​listerWatcher ​

​​

​sharedIndexInformer​

​​中定义的​

​list-watch​

​​,将获取到的事件推送到​

​DeltaFIFO​

​中。

控制器启动后,启动​

​Reflector​

​​,然后执行​

​processLoop​

​​,这是一个死循环,不断从DeltaFIFO中读取资源事件,并交给​

​sharedIndexInformer​

​​(分配给config.Process)的​

​HandleDeltas​

​方法。

func (c *controller) processLoop() {for {
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.
        c.config.Queue.AddIfNotPresent(obj)}}}
}      

如果我们看一下 sharedIndexInformer 的 HandleDeltas 方法,我们可以看到整个事件消费过程是有效的。

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  s.blockDeltas.Lock()
  defer s.blockDeltas.Unlock()

  // from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Added, Updated:
      isSync := d.Type == Sync
      s.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err
        }
        s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err
        }
        s.processor.distribute(addNotification{newObj: d.Object}, isSync)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err
      }
      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}      

前面我们了解到,该​

​processor.attribute​

​​方法将事件分发给 all ​

​listeners​

​​,并​

​controller​

​​使用​

​Reflector​

​​来从 ApiServer 中获取事件并放入队列中,然后通过​

​processLoop​

​​为要处理的资源从队列中取出事件,最后调用​

​processor.attribute​

​​via的​

​HandleDeltas​

​​方法​

​sharedIndexInformer​

​​。所有事件,最后​

​processor.attribute​

​​是通过 的​

​HandleDeltas​

​​方法调用的​

​sharedIndexInformer​

​。

因此,我们可以按如下方式组织整个事件流。

Kubernetes 控制器管理器的工作原理

Indexer

上面,我们整理了从事件接收到分发的​​所有逻辑,但是在sharedIndexInformer的HandleDeltas方法中,有一些逻辑比较有意思,就是所有的事件都是​

​s.indexer​

​先更新再分发。

前文提到,Indexer是一个线程安全的存储,作为缓存来缓解资源控制器(Controller)查询资源时对ApiServer的压力。

当有事件更新时,会先刷新Indexer中的缓存,然后将事件分发给资源控制器,资源控制器会先从Indexer获取资源详情,从而减少对APIServer的不必要查询请求。

Indexer存储的具体实现在client-go/tools/cache/thread_safe_store.go中,数据存储在​

​threadSafeMap​

​.

type threadSafeMap struct {lock  sync.RWMutex
  items map[string]interface{}

  // indexers maps a name to an IndexFunc
  indexers Indexers// indices maps a name to an Index
  indices Indices
}      

本质上,​

​threadSafeMap​

​是一个带有读/写锁的映射,除此之外还可以定义索引,有趣的是由两个字段实现。

  1. ​Indexers​

    ​是一个定义了多个索引函数的map,key是indexName,value是索引函数(计算资源的索引值)。
  2. ​Indices​

    ​​保存索引值和数据key的映射关系,​

    ​Indices​

    ​​是一个两级的map,第一级的key是indexName,对应​

    ​Indexers​

    ​并决定用什么方法计算索引值,value是一个保存关联的map “索引值-资源键”关联。

相关逻辑比较简单,如下图所示。

Kubernetes 控制器管理器的工作原理

MutationDetector

更新数据的​

​HandleDeltas​

​​方法除了.​

​sharedIndexInformer​

​​

​s.indexer​

​​

​s.cacheMutationDetector​

开头提到,在​

​sharedIndexInformer​

​​启动的时候,也会启动一个​

​cacheMutationDetector​

​来监控索引器的缓存。

因为 indexer 缓存实际上是一个指针,所以多个 Controller 访问 indexer 的缓存资源实际上得到的是同一个资源实例。如果一个Controller玩不好,修改了一个资源的属性,肯定会影响其他Controller的正确性。

当 Informer 接收到新事件时,MutationDetector 会保存指向资源的指针(索引器也是如此)和资源的深层副本。通过周期性地检查指针指向的资源是否与开头存储的深拷贝相匹配,我们就知道缓存的资源是否被修改过。

但是,是否启用监控会受到环境变量的影响​

​KUBE_CACHE_MUTATION_DETECTOR​

​​。如果未设置此环境变量,​

​sharedIndexInformer​

​​ 将实例化​

​dummyMutationDetector​

​并且在启动后不执行任何操作。

如果​

​KUBE_CACHE_MUTATION_DETECTOR​

​​为​

​true​

​​,sharedIndexInformer 实例化​

​defaultCacheMutationDetector​

​,它以 1s 的间隔执行缓存的定期检查,如果发现缓存被修改,则触发故障处理函数,如果未定义该函数,则触发恐慌。

概括

本文对ControllerManager进行了狭义的解释,因为它不包括具体的资源管理器(Controller),而只是解释了ControllerManager是如何“管理控制器”的。

继续阅读