天天看點

2022-03-22 k8s的pvController的處理流程

摘要:

記錄k8s的pv-controller的核心處理

參考:

​​從零開始入門 K8s | Kubernetes 存儲架構及插件使用 - 知乎​​

核心處理:

入口Run

// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(ctx context.Context) {
  defer utilruntime.HandleCrash()
  defer ctrl.claimQueue.ShutDown()
  defer ctrl.volumeQueue.ShutDown()

  klog.Infof("Starting persistent volume controller")
  defer klog.Infof("Shutting down persistent volume controller")

  if !cache.WaitForNamedCacheSync("persistent volume", ctx.Done(), ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
    return
  }

  ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

  go wait.Until(ctrl.resync, ctrl.resyncPeriod, ctx.Done())
  go wait.UntilWithContext(ctx, ctrl.volumeWorker, time.Second)
  go wait.UntilWithContext(ctx, ctrl.claimWorker, time.Second)

  metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)

  <-ctx.Done()
}      

volumeWorker

2022-03-22 k8s的pvController的處理流程
// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker(ctx context.Context) {
  workFunc := func(ctx context.Context) bool {
    keyObj, quit := ctrl.volumeQueue.Get()
    if quit {
      return true
    }
    defer ctrl.volumeQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("volumeWorker[%s]", key)

    _, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
      klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
      return false
    }
    volume, err := ctrl.volumeLister.Get(name)
    if err == nil {
      // The volume still exists in informer cache, the event must have
      // been add/update/sync
      ctrl.updateVolume(ctx, volume)
      return false
    }
    if !errors.IsNotFound(err) {
      klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
      return false
    }

    // The volume is not in informer cache, the event must have been
    // "delete"
    volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
    if err != nil {
      klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
      return false
    }
    if !found {
      // The controller has already processed the delete event and
      // deleted the volume from its cache
      klog.V(2).Infof("deletion of volume %q was already processed", key)
      return false
    }
    volume, ok := volumeObj.(*v1.PersistentVolume)
    if !ok {
      klog.Errorf("expected volume, got %+v", volumeObj)
      return false
    }
    ctrl.deleteVolume(volume)
    return false
  }
  for {
    if quit := workFunc(ctx); quit {
      klog.Infof("volume worker queue shutting down")
      return
    }
  }
}      
// updateIndices modifies the objects location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
  var oldIndexValues, indexValues []string
  var err error
  for name, indexFunc := range c.indexers {
    if oldObj != nil {
      oldIndexValues, err = indexFunc(oldObj)
    } else {
      oldIndexValues = oldIndexValues[:0]
    }
    if err != nil {
      panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
    }

    if newObj != nil {
      indexValues, err = indexFunc(newObj)
    } else {
      indexValues = indexValues[:0]
    }
    if err != nil {
      panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
    }

    index := c.indices[name]
    if index == nil {
      index = Index{}
      c.indices[name] = index
    }

    for _, value := range oldIndexValues {
      // We optimize for the most common case where index returns a single value.
      if len(indexValues) == 1 && value == indexValues[0] {
        continue
      }
      c.deleteKeyFromIndex(key, value, index)
    }
    for _, value := range indexValues {
      // We optimize for the most common case where index returns a single value.
      if len(oldIndexValues) == 1 && value == oldIndexValues[0] {
        continue
      }
      c.addKeyToIndex(key, value, index)
    }
  }
}      

claimWorker

// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker(ctx context.Context) {
  workFunc := func() bool {
    keyObj, quit := ctrl.claimQueue.Get()
    if quit {
      return true
    }
    defer ctrl.claimQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("claimWorker[%s]", key)

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
      klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
      return false
    }
    claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
    if err == nil {
      // The claim still exists in informer cache, the event must have
      // been add/update/sync
      ctrl.updateClaim(ctx, claim)
      return false
    }
    if !errors.IsNotFound(err) {
      klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
      return false
    }

    // The claim is not in informer cache, the event must have been "delete"
    claimObj, found, err := ctrl.claims.GetByKey(key)
    if err != nil {
      klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
      return false
    }
    if !found {
      // The controller has already processed the delete event and
      // deleted the claim from its cache
      klog.V(2).Infof("deletion of claim %q was already processed", key)
      return false
    }
    claim, ok := claimObj.(*v1.PersistentVolumeClaim)
    if !ok {
      klog.Errorf("expected claim, got %+v", claimObj)
      return false
    }
    ctrl.deleteClaim(claim)
    return false
  }
  for {
    if quit := workFunc(); quit {
      klog.Infof("claim worker queue shutting down")
      return
    }
  }
}      

繼續閱讀