天天看點

k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析

k8s client-go k8s informers實作了持續擷取叢集的所有資源對象、監聽叢集的資源對象變化功能,并在本地維護了全量資源對象的記憶體緩存,以減少對apiserver、對etcd的請求壓力。Informers在啟動的時候會首先在用戶端調用List接口來擷取全量的對象集合,然後通過Watch接口來擷取增量的對象,然後更新本地緩存。

k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析

1.Reflector概述

Reflector從kube-apiserver中list&watch資源對象,然後将對象的變化包裝成Delta并将其丢到DeltaFIFO中。簡單點來說,就是将Etcd 的對象及其變化反射到DeltaFIFO中。

Reflector首先通過List操作擷取全量的資源對象資料,調用DeltaFIFO的Replace方法全量插入DeltaFIFO,然後後續通過Watch操作根據資源對象的變化類型相應的調用DeltaFIFO的Add、Update、Delete方法,将對象及其變化插入到DeltaFIFO中。

Reflector的健壯性處理機制

Reflector有健壯性處理機制,用于處理與

apiserver

斷連後重新進行

List&Watch

的場景。也是因為有這樣的健壯性處理機制,是以我們一般不去直接使用用戶端的

Watch

方法來處理自己的業務邏輯,而是使用

informers

Reflector核心操作

Reflector的兩個核心操作:

(1)List&Watch;

(2)将對象的變化包裝成Delta然後扔進DeltaFIFO。

informer概要架構圖

通過下面這個informer的概要架構圖,可以大概看到Reflector在整個informer中所處的位置及其作用。

k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析

2.Reflector初始化與啟動分析

2.1 Reflector結構體

先來看到Reflector結構體,這裡重點看到以下屬性:

(1)expectedType:放到Store中(即DeltaFIFO中)的對象類型;

(2)store:store會指派為DeltaFIFO,具體可以看之前的informer初始化與啟動分析即可得知,這裡不再展開分析;

(3)listerWatcher:存放list方法和watch方法的ListerWatcher interface實作;

// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string

	// The name of the type we expect to place in the store. The name
	// will be the stringification of expectedGVK if provided, and the
	// stringification of expectedType otherwise. It is for display
	// only, and should not be used for parsing or comparison.
	expectedTypeName string
	// The type of object we expect to place in the store.
	expectedType reflect.Type
	// The GVK of the object we expect to place in the store if unstructured.
	expectedGVK *schema.GroupVersionKind
	// The destination to sync up with the watch source
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// period controls timing between one watch ending and
	// the beginning of the next one.
	period       time.Duration
	resyncPeriod time.Duration
	ShouldResync func() bool
	// clock allows tests to manipulate time
	clock clock.Clock
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store
	// it is thread safe, but not synchronized with the underlying store
	lastSyncResourceVersion string
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	lastSyncResourceVersionMutex sync.RWMutex
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// Defaults to pager.PageSize.
	WatchListPageSize int64
}
           

2.2 Reflector初始化-NewReflector

NewReflector為Reflector的初始化方法,傳回一個Reflector結構體,這裡主要看到初始化Reflector的時候,需要傳入ListerWatcher interface的實作。

// k8s.io/client-go/tools/cache/reflector.go
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
		clock:         &clock.RealClock{},
	}
	r.setExpectedType(expectedType)
	return r
}
           

2.3 ListerWatcher interface

ListerWatcher interface定義了

Reflector

應該擁有的最核心的兩個方法,即

List

Watch

,用于全量擷取資源對象以及監控資源對象的變化。關于

List

Watch

什麼時候會被調用,怎麼被調用,在後續分析Reflector核心處理方法的時候會詳細做分析。

// k8s.io/client-go/tools/cache/listwatch.go
type Lister interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
}

type Watcher interface {
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

type ListerWatcher interface {
	Lister
	Watcher
}
           

2.4 ListWatch struct

繼續看到

ListWatch struct

,其實作了

ListerWatcher interface

// k8s.io/client-go/tools/cache/listwatch.go
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}
           

ListWatch的初始化

再來看到

ListWatch struct

初始化的一個例子。在

NewDeploymentInformer

初始化Deployment對象的informer中,會初始化

ListWatch struct

并定義其

ListFunc

WatchFunc

,可以看到

ListFunc

WatchFunc

即為其資源對象用戶端的

List

Watch

方法。

// staging/src/k8s.io/client-go/informers/apps/v1beta1/deployment.go
func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
}

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1beta1().Deployments(namespace).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1beta1().Deployments(namespace).Watch(options)
			},
		},
		&appsv1beta1.Deployment{},
		resyncPeriod,
		indexers,
	)
}
           

2.5 Reflector啟動入口-Run

最後來看到

Reflector

的啟動入口

Run

方法,其主要是循環調用

r.ListAndWatch

,該方法是

Reflector

的核心處理方法,後面會詳細進行分析。另外,也可以看到

Reflector

有健壯性處理機制,即循環調用

r.ListAndWatch

方法,用于處理與

apiserver

斷連後重新進行

List&Watch

的場景。也是因為有這樣的健壯性處理機制,是以我們一般不去直接使用用戶端的

Watch

方法來處理自己的業務邏輯,而是使用

informers

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}
           

3.Reflector核心處理方法分析

分析完了初始化與啟動後,現在來看到

Reflector

的核心處理方法

ListAndWatch

ListAndWatch

ListAndWatch的主要邏輯分為三大塊:

A.List操作(隻執行一次):

(1)設定ListOptions,将ResourceVersion設定為“0”;

(2)調用r.listerWatcher.List方法,執行list操作,即擷取全量的資源對象;

(3)根據list回來的資源對象,擷取最新的resourceVersion;

(4)資源轉換,将list操作擷取回來的結果轉換為

[]runtime.Object

結構;

(5)調用r.syncWith,根據list回來轉換後的結果去替換store裡的items;

(6)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值;

B.Resync操作(異步循環執行);

(1)判斷是否需要執行Resync操作,即重新同步;

(2)需要則調用r.store.Resync操作後端store做處理;

C.Watch操作(循環執行):

(1)stopCh處理,判斷是否需要退出循環;

(2)設定ListOptions,設定resourceVersion為最新的resourceVersion,即從list回來的最新resourceVersion開始執行watch操作;

(3)調用r.listerWatcher.Watch,開始監聽操作;

(4)watch監聽操作的錯誤傳回處理;

(5)調用r.watchHandler,處理watch操作傳回來的結果,操作後端store,新增、更新或删除items;

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string
    
    // A.List操作(隻執行一次)
    // (1)設定ListOptions,将ResourceVersion設定為“0”
	// Explicitly set "0" as resource version - it's fine for the List()
	// to be served from cache and potentially be delayed relative to
	// etcd contents. Reflector framework will catch up via Watch() eventually.
	options := metav1.ListOptions{ResourceVersion: "0"}

	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		//(2)調用r.listerWatcher.List方法,執行list操作,即擷取全量的資源對象
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			if r.WatchListPageSize != 0 {
				pager.PageSize = r.WatchListPageSize
			}
			// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
			list, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
		}
		initTrace.Step("Objects listed")
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
		}
		//(3)根據list回來的資源對象,擷取最新的resourceVersion
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		//(4)資源轉換,将list操作擷取回來的結果轉換為```[]runtime.Object```結構
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
		}
		initTrace.Step("Objects extracted")
		//(5)調用r.syncWith,根據list回來轉換後的結果去替換store裡的items
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		initTrace.Step("SyncWith done")
		//(6)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}

    // B.Resync操作(異步循環執行)
	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			//(1)判斷是否需要執行Resync操作,即重新同步
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				//(2)需要則調用r.store.Resync操作後端store做處理
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()
    
    // C.Watch操作(循環執行)
	for {
	    //(1)stopCh處理,判斷是否需要退出循環
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}
        
        //(2)設定ListOptions,設定resourceVersion為最新的resourceVersion,即從list回來的最新resourceVersion開始執行watch操作
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}
        
        //(3)調用r.listerWatcher.Watch,開始監聽操作
		w, err := r.listerWatcher.Watch(options)
		//(4)watch監聽操作的錯誤傳回處理
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
			}
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case wait and resend watch request.
			if utilnet.IsConnectionRefused(err) {
				time.Sleep(time.Second)
				continue
			}
			return nil
		}
        
        //(5)調用r.watchHandler,處理watch操作傳回來的結果,操作後端store,新增、更新或删除items
		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case apierrs.IsResourceExpired(err):
					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}
           

關于List操作時設定的ListOptions

這裡主要講一下

ListOptions

中的

ResourceVersion

屬性的作用。

上述講到的Reflector中,list操作時将 resourceVersion 設定了為“0”,此時傳回的資料是apiserver cache中的,并非直接讀取 etcd 而來,而apiserver cache中的資料可能會因網絡或其他原因導緻與etcd中的資料不同。

list操作時,resourceVersion 有三種設定方法:

(1)第一種:不設定,此時會從直接從etcd中讀取,此時資料是最新的;

(2)第二種:設定為“0”,此時從apiserver cache中擷取;

(3)第三種:設定為指定的resourceVersion,擷取resourceVersion大于指定版本的所有資源對象。

詳細參考:https://kubernetes.io/zh/docs/reference/using-api/api-concepts/#resource-versions

3.1 r.syncWith

r.syncWith方法主要是調用r.store.Replace方法,即根據list的結果去替換store裡的items,具體關于r.store.Replace方法的分析,在後續對DeltaFIFO進行分析時再做具體的分析。

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
           

3.2 r.setLastSyncResourceVersion

lastSyncResourceVersion屬性為

Reflector struct

的一個屬性,用于存儲已被Reflector處理的最新資源對象的ResourceVersion,

r.setLastSyncResourceVersion

方法用于更新該值。

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) setLastSyncResourceVersion(v string) {
	r.lastSyncResourceVersionMutex.Lock()
	defer r.lastSyncResourceVersionMutex.Unlock()
	r.lastSyncResourceVersion = v
}

type Reflector struct {
    ...
    lastSyncResourceVersion string
    ...
}
           

3.3 r.watchHandler

r.watchHandler主要是處理watch操作傳回來的結果,其主要邏輯為循環做以下操作,直至event事件處理完畢:

(1)從watch操作傳回來的結果中擷取event事件;

(2)event事件相關錯誤處理;

(3)獲得目前watch到資源的ResourceVersion;

(4)區分watch.Added、watch.Modified、watch.Deleted三種類型的event事件,分别調用r.store.Add、r.store.Update、r.store.Delete做處理,具體關于r.store.xxx的方法分析,在後續對DeltaFIFO進行分析時再做具體的分析;

(5)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值;

// k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	start := r.clock.Now()
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		// (1)從watch操作傳回來的結果中擷取event事件
		case event, ok := <-w.ResultChan():
		    // (2)event事件相關錯誤處理
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			// (3)獲得目前watch到資源的ResourceVersion
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			// (4)區分watch.Added、watch.Modified、watch.Deleted三種類型的event事件,分别調用r.store.Add、r.store.Update、r.store.Delete做處理
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// (5)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
           

至此Reflector的分析就結束了,最後來總結一下。

總結

Reflector核心處理邏輯

先來用一幅圖來總結一下Reflector核心處理邏輯。

k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析

informer架構中的Reflector

下面這個架構圖相比文章開頭的informer的概要架構圖,将Refletor部分詳細分解了,也順帶回憶一下Reflector在informer架構中的主要作用:

(1)Reflector首先通過List操作擷取全量的資源對象資料,調用DeltaFIFO的Replace方法全量插入DeltaFIFO;

(2)然後後續通過Watch操作根據資源對象的變化類型相應的調用DeltaFIFO的Add、Update、Delete方法,将對象及其變化插入到DeltaFIFO中。

k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析

在對informer中的Reflector分析完之後,接下來将分析informer中的DeltaFIFO。

繼續閱讀