An analysis of the Kubernetes pv-controller

Author | 牧琦 (Mu Qi)

Source | Alibaba Tech official account (阿裡技術公衆号)

Based on Kubernetes 1.23

I Introduction

The pv controller is one of the components of kube-controller-manager (kcm). It handles the PVC/PV objects in the cluster and drives their state transitions.

II pvController initialization

The initialization code lives in pkg/controller/volume/persistentvolume/pv_controller_base.go. NewController mainly does the following:

  • Initializes the eventRecorder
  • Initializes the PersistentVolumeController object
  • Calls VolumePluginMgr.InitPlugins() to initialize the volume plugins; that code lives in pkg/volume/plugins.go
  • Creates informers to watch resources in the cluster; the following informers are initialized:
    • PersistentVolumeInformer
    • PersistentVolumeClaimInformer
    • StorageClassInformer
    • PodInformer
    • NodeInformer
  • Routes PV & PVC events into the volumeQueue & claimQueue respectively
  • To avoid iterating over all pods on every sync, registers a custom indexer that indexes pods by PVC key (see the sketch after this list)
  • Initializes the manager for the in-tree storage -> CSI migration functionality
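
To make the pod-by-PVC lookup concrete, here is a minimal sketch of such an indexer built on client-go's cache.Indexer. The index name "pod-pvc-index" and the key format are illustrative for this example; kcm wires an equivalent index function into its pod informer:

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  "k8s.io/client-go/tools/cache"
)

// podPVCIndexFunc emits one "namespace/claimName" key per PVC a pod mounts,
// so pods can be found by PVC key without scanning every pod.
func podPVCIndexFunc(obj interface{}) ([]string, error) {
  pod, ok := obj.(*v1.Pod)
  if !ok {
    return nil, nil
  }
  var keys []string
  for _, vol := range pod.Spec.Volumes {
    if pvc := vol.PersistentVolumeClaim; pvc != nil {
      keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvc.ClaimName))
    }
  }
  return keys, nil
}

func main() {
  indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,
    cache.Indexers{"pod-pvc-index": podPVCIndexFunc})

  pod := &v1.Pod{}
  pod.Namespace, pod.Name = "default", "web-0"
  pod.Spec.Volumes = []v1.Volume{{
    Name: "data",
    VolumeSource: v1.VolumeSource{
      PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "data-web-0"},
    },
  }}
  _ = indexer.Add(pod)

  pods, _ := indexer.ByIndex("pod-pvc-index", "default/data-web-0")
  fmt.Println(len(pods)) // 1
}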

NewController is called from the cmd/kube-controller-manager code; once initialization succeeds, go Run() is called right away to start the pvController.
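
For orientation, the call site looks roughly like this. This is a condensed sketch of the 1.23-era startPersistentVolumeBinderController in cmd/kube-controller-manager/app/core.go, with most ControllerParameters fields and error handling omitted; treat the details as illustrative rather than the verbatim source:

func startPersistentVolumeBinderController(ctx ControllerContext) (controller.Interface, bool, error) {
  params := persistentvolume.ControllerParameters{
    KubeClient:     ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
    VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
    ClaimInformer:  ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
    ClassInformer:  ctx.InformerFactory.Storage().V1().StorageClasses(),
    PodInformer:    ctx.InformerFactory.Core().V1().Pods(),
    NodeInformer:   ctx.InformerFactory.Core().V1().Nodes(),
    // ... VolumePlugins, EventRecorder, SyncPeriod, EnableDynamicProvisioning
  }
  volumeController, err := persistentvolume.NewController(params)
  if err != nil {
    return nil, true, err
  }
  go volumeController.Run(ctx.Stop)
  return nil, true, nil
}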

III Running the controller

// Run starts the pvController
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer ctrl.claimQueue.ShutDown()
  defer ctrl.volumeQueue.ShutDown()

  klog.Infof("Starting persistent volume controller")
  defer klog.Infof("Shutting down persistent volume controller")

  if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
    return
  }

  ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

  go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
  go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
  go wait.Until(ctrl.claimWorker, time.Second, stopCh)

  metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)

  <-stopCh
}           

After the caches are synced, ctrl.resync, ctrl.volumeWorker and ctrl.claimWorker start running periodically. Let's look at the initializeCaches method first:

func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
  // No call to the apiserver here: these objects come from the local informer cache and must not be modified by outside functions
  volumeList, err := volumeLister.List(labels.Everything())
  if err != nil {
    klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
    return
  }
  for _, volume := range volumeList {
    // We must not mutate the cached volume object, so make a copy and operate on that
    volumeClone := volume.DeepCopy()
    if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
      klog.Errorf("error updating volume cache: %v", err)
    }
  }

  claimList, err := claimLister.List(labels.Everything())
  if err != nil {
    klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
    return
  }
  for _, claim := range claimList {
    if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
      klog.Errorf("error updating claim cache: %v", err)
    }
  }
  klog.V(4).Infof("controller initialized")
}

type persistentVolumeOrderedIndex struct {
  store cache.Indexer
}           

該方法将 cache.listener 裡面的緩存轉存在 persistentVolumeOrderedIndex 中,它是按 AccessModes 索引并按存儲容量排序的 persistentVolume 的緩存。

1 resync

func (ctrl *PersistentVolumeController) resync() {
  klog.V(4).Infof("resyncing PV controller")

  pvcs, err := ctrl.claimLister.List(labels.NewSelector())
  if err != nil {
    klog.Warningf("cannot list claims: %s", err)
    return
  }
  for _, pvc := range pvcs {
    ctrl.enqueueWork(ctrl.claimQueue, pvc)
  }

  pvs, err := ctrl.volumeLister.List(labels.NewSelector())
  if err != nil {
    klog.Warningf("cannot list persistent volumes: %s", err)
    return
  }
  for _, pv := range pvs {
    ctrl.enqueueWork(ctrl.volumeQueue, pv)
  }
}           

這裡将叢集内所有的 pvc/pv 統一都放到對應的 claimQueue & volumeQueue 裡面重新處理。 這個resyncPeriod 等于一個random time.Duration * config.time(在 kcm 啟動時設定)。

2 volumeWorker

An endless loop that keeps processing the PersistentVolumes it takes from the volumeQueue:

workFunc := func() bool {
    keyObj, quit := ctrl.volumeQueue.Get()
    if quit {
      return true
    }
    defer ctrl.volumeQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("volumeWorker[%s]", key)

    _, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
      klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
      return false
    }
    volume, err := ctrl.volumeLister.Get(name)
    if err == nil {
      // The volume still exists in informer cache, the event must have
      // been add/update/sync
      ctrl.updateVolume(volume)
      return false
    }
    if !errors.IsNotFound(err) {
      klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
      return false
    }

    // The volume is not in informer cache, the event must have been
    // "delete"
    volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
    if err != nil {
      klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
      return false
    }
    if !found {
      // The controller has already processed the delete event and
      // deleted the volume from its cache
      klog.V(2).Infof("deletion of volume %q was already processed", key)
      return false
    }
    volume, ok := volumeObj.(*v1.PersistentVolume)
    if !ok {
      klog.Errorf("expected volume, got %+v", volumeObj)
      return false
    }
    ctrl.deleteVolume(volume)
    return false
}

We mainly care about the ctrl.updateVolume(volume) method.

updateVolume

updateVolume is the actual handler for volume events in the cluster; internally it mainly calls ctrl.syncVolume:

func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {
  klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

    ...
  // [Unit test set 4]
  if volume.Spec.ClaimRef == nil {
    // Volume is unused
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
    if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
      // Nothing was saved; we will fall back into the same
      // condition in the next call to this method
      return err
    }
    return nil
  } else /* pv.Spec.ClaimRef != nil */ {
    // Volume is bound to a claim.
    if volume.Spec.ClaimRef.UID == "" {
      // The PV is reserved for a PVC; that PVC has not yet been
      // bound to this PV; the PVC sync will handle it.
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
      if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
      }
      return nil
    }
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
    // Get the PVC by _name_
    var claim *v1.PersistentVolumeClaim
    claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
    obj, found, err := ctrl.claims.GetByKey(claimName)
    if err != nil {
      return err
    }
    if !found {
      if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
        obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
        if err != nil && !apierrors.IsNotFound(err) {
          return err
        }
        found = !apierrors.IsNotFound(err)
        if !found {
          obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
          if err != nil && !apierrors.IsNotFound(err) {
            return err
          }
          found = !apierrors.IsNotFound(err)
        }
      }
    }
    if !found {
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
      // Fall through with claim = nil
    } else {
      var ok bool
      claim, ok = obj.(*v1.PersistentVolumeClaim)
      if !ok {
        return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
      }
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
    }
    if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
      klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))

      claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
      if err != nil && !apierrors.IsNotFound(err) {
        return err
      } else if claim != nil {
        // Treat the volume as bound to a missing claim.
        if claim.UID != volume.Spec.ClaimRef.UID {
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
          claim = nil
        } else {
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
        }
      }
    }

    if claim == nil {
      if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
        // Also, log this only once:
        klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
        if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
          // Nothing was saved; we will fall back into the same condition
          // in the next call to this method
          return err
        }
      }
      if err = ctrl.reclaimVolume(volume); err != nil {
        // Release failed, we will fall back into the same condition
        // in the next call to this method
        return err
      }
      if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
        // volume is being retained, it references a claim that does not exist now.
        klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)
      }
      return nil
    } else if claim.Spec.VolumeName == "" {
      if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
        volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
        ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
        claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
        // Skipping syncClaim
        return nil
      }

      if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
        // The binding is not completed; let PVC sync handle it
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
      } else {
        // Dangling PV; try to re-establish the link in the PVC sync
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
      }
      ctrl.claimQueue.Add(claimToClaimKey(claim))
      return nil
    } else if claim.Spec.VolumeName == volume.Name {
      // Volume is bound to a claim properly, update status if necessary
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
      if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
      }
      return nil
    } else {
      // Volume is bound to a claim, but the claim is bound elsewhere
      if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
        if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
          // Also, log this only once:
          klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
          if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
            // Nothing was saved; we will fall back into the same condition
            // in the next call to this method
            return err
          }
        }
        if err = ctrl.reclaimVolume(volume); err != nil {
          return err
        }
        return nil
      } else {
        if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
          if err = ctrl.unbindVolume(volume); err != nil {
            return err
          }
          return nil
        } else {
          // The PV must have been created with this ptr; leave it alone.
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
          if err = ctrl.unbindVolume(volume); err != nil {
            return err
          }
          return nil
        }
      }
    }
  }
}           

1. When the PV's Spec.ClaimRef is nil, the PV is unused; ctrl.updateVolumePhase moves it into the Available phase.

2. When Spec.ClaimRef is not nil, the PV is already bound to (or reserved for) a PVC:

  • When Spec.ClaimRef.UID is empty, the PVC has not been bound to this PV yet; ctrl.updateVolumePhase keeps the PV Available and the method returns, leaving the rest to the PVC-side syncClaim
  • The PVC info recorded in Spec.ClaimRef is used to fetch the PVC from the pv_controller cache
  • If the PVC is not found
    • The cache may simply be stale (for example under heavy cluster load), so the informer cache is consulted next, and the apiserver after that
    • If a PV that is neither Released nor Failed still has no PVC after all of the above, the PVC has been deleted; recent Kubernetes versions then check the reclaimPolicy and update the PV's state accordingly
  • After the PVC is found

1) If the PVC's UID differs from Spec.ClaimRef.UID, the PVC this PV points to was most likely deleted and a new PVC with the same name was created immediately, while the cache has not caught up yet. We double-check against the apiserver; if the original PVC still does not exist, the PV is bound to a non-existent PVC, so claim is set to nil and the "PVC not found" logic above runs.

2) If the PVC's volumeName is empty

  • Check whether the PVC's volumeMode matches the PV's; if they differ, emit warning events on both objects
  • If the PV carries the AnnBoundByController = "pv.kubernetes.io/bound-by-controller" annotation, the PVC/PV binding is still in progress
  • Enqueue the PVC into the claimQueue and let claimWorker handle it

3) If pvc.Spec.volumeName equals the PV's name, simply set the PV to the Bound phase.

4) If pvc.Spec.volumeName names a different PV

  • If the PV was dynamically provisioned and its ReclaimPolicy is Delete, the PVC has already bound to another PV; the PV is moved to Released and waits for the deleters to remove it (ctrl.reclaimVolume, sketched below)
  • If the PV was not dynamically provisioned, its ClaimRef binding is reset and the PV is unbound
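
Both release paths above end in ctrl.reclaimVolume, which dispatches on the reclaim policy. Below is a minimal, self-contained sketch of that decision; the callbacks stand in for the controller's scheduleOperation plumbing, so treat it as illustrative rather than the exact source:

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
)

// reclaim mirrors the policy switch in reclaimVolume: Retain leaves the
// Released PV for manual cleanup, Recycle (deprecated) scrubs and reuses it,
// and Delete removes the backing storage and then the PV object itself.
func reclaim(pv *v1.PersistentVolume, recycle, del func(*v1.PersistentVolume)) {
  switch pv.Spec.PersistentVolumeReclaimPolicy {
  case v1.PersistentVolumeReclaimRetain:
    // Nothing to do; an admin decides what happens to the data.
  case v1.PersistentVolumeReclaimRecycle:
    recycle(pv)
  case v1.PersistentVolumeReclaimDelete:
    del(pv)
  }
}

func main() {
  pv := &v1.PersistentVolume{}
  pv.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimDelete
  reclaim(pv,
    func(*v1.PersistentVolume) { fmt.Println("recycle") },
    func(*v1.PersistentVolume) { fmt.Println("delete") })
}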

3 claimWorker

An endless loop that keeps processing the PersistentVolumeClaims it takes from the claimQueue:

workFunc := func() bool {
    keyObj, quit := ctrl.claimQueue.Get()
    if quit {
      return true
    }
    defer ctrl.claimQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("claimWorker[%s]", key)

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
      klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
      return false
    }
    claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
    if err == nil {
      // The claim still exists in informer cache, the event must have
      // been add/update/sync
      ctrl.updateClaim(claim)
      return false
    }
    if !errors.IsNotFound(err) {
      klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
      return false
    }

    // The claim is not in informer cache, the event must have been "delete"
    claimObj, found, err := ctrl.claims.GetByKey(key)
    if err != nil {
      klog.V(2).Infof("error getting claim %q from   cache: %v", key, err)
      return false
    }
    if !found {
      // The controller has already processed the delete event and
      // deleted the claim from its cache
      klog.V(2).Infof("deletion of claim %q was already processed", key)
      return false
    }
    claim, ok := claimObj.(*v1.PersistentVolumeClaim)
    if !ok {
      klog.Errorf("expected claim, got %+v", claimObj)
      return false
    }
    ctrl.deleteClaim(claim)
    return false
}

We mainly care about the ctrl.updateClaim(claim) method. As above, it mainly delegates to ctrl.syncClaim, which dispatches on the PVC's state to either ctrl.syncUnboundClaim or ctrl.syncBoundClaim (sketched below).
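
The dispatch itself is tiny: syncClaim routes on whether the claim already carries the pvutil.AnnBindCompleted annotation ("pv.kubernetes.io/bind-completed"). A condensed sketch of the method in pv_controller.go, with migration-annotation handling and logging elided:

func (ctrl *PersistentVolumeController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
  // Claims without the bind-completed annotation are still unbound (or
  // still finishing their binding); everything else is verified as bound.
  if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
    return ctrl.syncUnboundClaim(ctx, claim)
  }
  return ctrl.syncBoundClaim(claim)
}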

syncUnboundClaim

func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
  if claim.Spec.VolumeName == "" {
    // User did not care which PV they get.
    delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
    if err != nil {
      return err
    }

    // [Unit test set 1]
    volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
    if err != nil {
      klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
      return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
    }
    if volume == nil {
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
      switch {
      case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
        if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
          return err
        }
      case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
        if err = ctrl.provisionClaim(ctx, claim); err != nil {
          return err
        }
        return nil
      default:
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
      }

      // Mark the claim as Pending and try to find a match in the next
      // periodic syncClaim
      if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
        return err
      }
      return nil
    } else /* pv != nil */ {
      // Found a PV for this claim
      // OBSERVATION: pvc is "Pending", pv is "Available"
      claimKey := claimToClaimKey(claim)
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
      if err = ctrl.bind(volume, claim); err != nil {
        metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
        return err
      }
      metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
      return nil
    }
  } else /* pvc.Spec.VolumeName != nil */ {
    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
      return err
    }
    if !found {
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
      if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
        return err
      }
      return nil
    } else {
      volume, ok := obj.(*v1.PersistentVolume)
      if !ok {
        return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
      }
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
      if volume.Spec.ClaimRef == nil {
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
        if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
          klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
          // send an event
          msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
          ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
          // volume does not satisfy the requirements of the claim
          if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
            return err
          }
        } else if err = ctrl.bind(volume, claim); err != nil {
          // On any error saving the volume or the claim, subsequent
          // syncClaim will finish the binding.
          return err
        }
        // OBSERVATION: pvc is "Bound", pv is "Bound"
        return nil
      } else if pvutil.IsVolumeBoundToClaim(volume, claim) {
        // User asked for a PV that is claimed by this PVC
        // OBSERVATION: pvc is "Pending", pv is "Bound"
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

        // Finish the volume binding by adding claim UID.
        if err = ctrl.bind(volume, claim); err != nil {
          return err
        }
        // OBSERVATION: pvc is "Bound", pv is "Bound"
        return nil
      } else {
        // User asked for a PV that is claimed by someone else
        // OBSERVATION: pvc is "Pending", pv is "Bound"
        if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
          klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
          claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
          ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
          // User asked for a specific PV, retry later
          if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
            return err
          }
          return nil
        } else {
          klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
          claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
          ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)

          return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
        }
      }
    }
  }
}           

Let's lay out the overall flow:

  • If the PVC's volumeName is empty
    • Determine whether the PVC uses delayed binding (see the sketch after this list)
    • Call volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) to find a matching PV
  • If a volume is found
    • Call ctrl.bind(volume, claim) to bind it
  • If no volume is found
    • If binding is delayed and has not been triggered yet (no pod references the PVC), emit an event on the PVC
    • If the PVC references a StorageClass, call ctrl.provisionClaim(ctx, claim), which
  1. parses the PVC and finds the provisioner driver
  2. starts a goroutine
  3. calls ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) to do the actual provisioning
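
The delayed-binding check above boils down to the StorageClass's volumeBindingMode. A minimal sketch of what pvutil.IsDelayBindingMode tests, minus the lister plumbing:

package main

import (
  "fmt"

  storagev1 "k8s.io/api/storage/v1"
)

// isDelayBinding reports whether a StorageClass requests delayed binding,
// i.e. volumeBindingMode: WaitForFirstConsumer.
func isDelayBinding(sc *storagev1.StorageClass) bool {
  return sc.VolumeBindingMode != nil &&
    *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer
}

func main() {
  mode := storagev1.VolumeBindingWaitForFirstConsumer
  sc := &storagev1.StorageClass{VolumeBindingMode: &mode}
  fmt.Println(isDelayBinding(sc)) // true
}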

provisionClaimOperation

func (ctrl *PersistentVolumeController) provisionClaimOperation(
  ctx context.Context,
  claim *v1.PersistentVolumeClaim,
  plugin vol.ProvisionableVolumePlugin,
  storageClass *storage.StorageClass) (string, error) {
  claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)
  klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

  pluginName := plugin.GetPluginName()
  if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil {
    strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)
    return pluginName, fmt.Errorf(strerr)
  }
  provisionerName := storageClass.Provisioner
  // Add provisioner annotation to be consistent with external provisioner workflow
  newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
  if err != nil {
    // Save failed, the controller will retry in the next sync
    klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
    return pluginName, err
  }
  claim = newClaim


  pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
  volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
  if err != nil && !apierrors.IsNotFound(err) {
    klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)
    return pluginName, err
  }
  if err == nil && volume != nil {
    // Volume has been already provisioned, nothing to do.
    klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
    return pluginName, err
  }

  // Prepare a claimRef to the claim early (to fail before a volume is
  // provisioned)
  claimRef, err := ref.GetReference(scheme.Scheme, claim)
  if err != nil {
    klog.V(3).Infof("unexpected error getting claim reference: %v", err)
    return pluginName, err
  }

  // Gather provisioning options
  tags := make(map[string]string)
  tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
  tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
  tags[CloudVolumeCreatedForVolumeNameTag] = pvName

  options := vol.VolumeOptions{
    PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
    MountOptions:                  storageClass.MountOptions,
    CloudTags:                     &tags,
    ClusterName:                   ctrl.clusterName,
    PVName:                        pvName,
    PVC:                           claim,
    Parameters:                    storageClass.Parameters,
  }

  // Refuse to provision if the plugin doesn't support mount options, creation
  // of PV would be rejected by validation anyway
  if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
    strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
    klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
    return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
  }

  // Provision the volume
  provisioner, err := plugin.NewProvisioner(options)
  if err != nil {
    strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
    klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
    return pluginName, err
  }

  var selectedNode *v1.Node = nil
  if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
    selectedNode, err = ctrl.NodeLister.Get(nodeName)
    if err != nil {
      strerr := fmt.Sprintf("Failed to get target node: %v", err)
      klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
      return pluginName, err
    }
  }
  allowedTopologies := storageClass.AllowedTopologies

  opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
  volume, err = provisioner.Provision(selectedNode, allowedTopologies)
  opComplete(volumetypes.CompleteFuncParam{Err: &err})
  if err != nil {
    ctrl.rescheduleProvisioning(claim)

    strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
    klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
    return pluginName, err
  }

  klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

  // Create Kubernetes PV object for the volume.
  if volume.Name == "" {
    volume.Name = pvName
  }
  // Bind it to the claim
  volume.Spec.ClaimRef = claimRef
  volume.Status.Phase = v1.VolumeBound
  volume.Spec.StorageClassName = claimClass

  // Add AnnBoundByController (used in deleting the volume)
  metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")
  metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())

  // Try to create the PV object several times
  for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
    klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
    var newVol *v1.PersistentVolume
    if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {
      // Save succeeded.
      if err != nil {
        klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
        err = nil
      } else {
        klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

        _, updateErr := ctrl.storeVolumeUpdate(newVol)
        if updateErr != nil {
          // We will get an "volume added" event soon, this is not a big error
          klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
        }
      }
      break
    }
    // Save failed, try again after a while.
    klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
    time.Sleep(ctrl.createProvisionedPVInterval)
  }

  if err != nil {
    strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
    klog.V(3).Info(strerr)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

    var deleteErr error
    var deleted bool
    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
      _, deleted, deleteErr = ctrl.doDeleteVolume(volume)
      if deleteErr == nil && deleted {
        // Delete succeeded
        klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
        break
      }
      if !deleted {
        klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
        break
      }
      // Delete failed, try again after a while.
      klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
      time.Sleep(ctrl.createProvisionedPVInterval)
    }

    if deleteErr != nil {
      strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
      klog.V(2).Info(strerr)
      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
    }
  } else {
    klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
    msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
    ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
  }
  return pluginName, nil
}           

The basic logic of provisionClaimOperation is as follows:

  • Check the driver: only a CSI driver may provision a claim that uses the dataSource field
  • Add the claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation to the PVC, consistent with the external-provisioner workflow
  • Derive the PV name by the rule pvName = "pvc-" + pvc.UID (see the sketch after this list)
  • If a PV with that name already exists, it has been provisioned before, so skip provisioning
  • Gather the basic PVC/PV parameters and wrap them into the options struct
  • Validate the plugin: if it does not support the requested mount options, reject the provision request outright
  • Call plugin.NewProvisioner(options) to create a provisioner, which implements the Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) method; note that this method is synchronous
  • Provision returns a PersistentVolume instance
  • Point the new PV at the PVC via ClaimRef and try to create the PV object in the apiserver (retrying several times)
  • If creating the PV object fails, try to clean up by calling the deleter to remove the provisioned volume
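
The naming rule from the list above is essentially one line in the controller (getProvisionedVolumeNameForClaim); a runnable sketch:

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/types"
)

// provisionedVolumeName mirrors getProvisionedVolumeNameForClaim: the PV for
// a dynamically provisioned claim is named "pvc-" plus the claim's UID.
func provisionedVolumeName(claim *v1.PersistentVolumeClaim) string {
  return "pvc-" + string(claim.UID)
}

func main() {
  claim := &v1.PersistentVolumeClaim{}
  claim.UID = types.UID("6c1e7e9a-1234-5678-9abc-def012345678")
  fmt.Println(provisionedVolumeName(claim)) // pvc-6c1e7e9a-...
}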

syncBoundClaim

func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {

  if claim.Spec.VolumeName == "" {
    // Claim was bound before but not any more.
    if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
      return err
    }
    return nil
  }
  obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
  if err != nil {
    return err
  }
  if !found {
    // Claim is bound to a non-existing volume.
    if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
      return err
    }
    return nil
  } else {
    volume, ok := obj.(*v1.PersistentVolume)
    if !ok {
      return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
    }

    klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
    if volume.Spec.ClaimRef == nil {
      // Claim is bound but volume has come unbound.
      // Or, a claim was bound and the controller has not received updated
      // volume yet. We can't distinguish these cases.
      // Bind the volume again and set all states to Bound.
      klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
      if err = ctrl.bind(volume, claim); err != nil {
        // Objects not saved, next syncPV or syncClaim will try again
        return err
      }
      return nil
    } else if volume.Spec.ClaimRef.UID == claim.UID {
      // All is well
      // NOTE: syncPV can handle this so it can be left out.
      // NOTE: bind() call here will do nothing in most cases as
      // everything should be already set.
      klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
      if err = ctrl.bind(volume, claim); err != nil {
        // Objects not saved, next syncPV or syncClaim will try again
        return err
      }
      return nil
    } else {
      // Claim is bound but volume has a different claimant.
      // Set the claim phase to 'Lost', which is a terminal
      // phase.
      if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
        return err
      }
      return nil
    }
  }
}

1) If pvc.Spec.VolumeName is empty, the PVC was bound before but its reference to a PV is gone; mark it Lost, emit an event, and return.

2) Look up the PV the PVC is bound to in the cache:

  • If it is not found, the PVC is bound to a PV that no longer exists; emit an event and return
  • If the PV is found
    • Check pv.Spec.ClaimRef: if it is nil, the PV has not been bound back to the PVC yet; call ctrl.bind(volume, claim) to bind them (sketched below)
    • If pv.ClaimRef.UID == pvc.UID, call bind as well; in most cases it returns immediately because everything is already in place
    • Otherwise the volume is bound to a different PVC; update the PVC's phase to Lost and emit an event
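
All three branches above funnel into ctrl.bind, so its shape is worth knowing. A condensed sketch of the four saves it performs (pv_controller.go; logging and intermediate bookkeeping omitted):

func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
  // 1) Point volume.Spec.ClaimRef at the claim and save the PV.
  volume, err := ctrl.bindVolumeToClaim(volume, claim)
  if err != nil {
    return err
  }
  // 2) Move the PV into the Bound phase.
  if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
    return err
  }
  // 3) Set claim.Spec.VolumeName plus the bind-completed annotations and save the PVC.
  if claim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {
    return err
  }
  // 4) Move the PVC into the Bound phase.
  _, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume)
  return err
}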

IV Summary

Finally, here is a pvc/pv state transition diagram to sum everything up:

[Figure: pvc/pv state transition diagram]
