k8s client-go k8s informers實作了持續擷取叢集的所有資源對象、監聽叢集的資源對象變化功能,并在本地維護了全量資源對象的記憶體緩存,以減少對apiserver、對etcd的請求壓力。Informers在啟動的時候會首先在用戶端調用List接口來擷取全量的對象集合,然後通過Watch接口來擷取增量的對象,然後更新本地緩存。
k8s informer概述
我們都知道可以使用k8s的Clientset來擷取所有的原生資源對象,那麼怎麼能持續的擷取叢集的所有資源對象,或監聽叢集的資源對象資料的變化呢?這裡不需要輪詢去不斷執行List操作,而是調用Watch接口,即可監聽資源對象的變化,當資源對象發生變化,用戶端即可通過Watch接口收到資源對象的變化。
Watch接口雖然可以直接使用,但一般情況下很少直接使用,因為往往由于叢集中的資源較多,我們需要自己在用戶端去維護一套緩存,而這個維護成本比較大。
也是因為如此,client-go提供了自己的實作機制,Informers應運而生。informers實作了持續擷取叢集的所有資源對象、監聽叢集的資源對象變化功能,并在本地維護了全量資源對象的記憶體緩存,以減少對apiserver、對etcd的請求壓力。Informers在啟動的時候會首先在用戶端調用List接口來擷取全量的對象集合,然後通過Watch接口來擷取增量的對象,然後更新本地緩存。
此外informers也有很強的健壯性,當長期運作的watch連接配接中斷時,informers會嘗試拉起一個新的watch請求來恢複連接配接,在不丢失任何事件的情況下恢複事件流。另外,informers還可以配置一個重新同步的周期參數,每間隔該周期,informers就會重新List全量資料。
在informers的使用上,通常每個GroupVersionResource(GVR)隻執行個體化一個informers,但有時候我們在一個應用中往往會在多個地方對同一種資源對象都有informer的需求,是以就有了共享informer,即SharedInformerFactory。是以可以通過使用SharedInformerFactory來執行個體化informers,這樣本地記憶體緩存就隻有一份,通知機制也隻有一套,大大提高了效率,減少了資源浪費。
k8s informer架構
k8s client-go informer主要包括以下部件:
(1)Reflector:Reflector從kube-apiserver中list&watch資源對象,然後調用DeltaFIFO的Add/Update/Delete/Replace方法将資源對象及其變化包裝成Delta并将其丢到DeltaFIFO中;
(2)DeltaFIFO:DeltaFIFO中存儲着一個map和一個queue,即map[object key]Deltas以及object key的queue,Deltas為Delta的切片類型,Delta裝有對象及對象的變化類型(Added/Updated/Deleted/Sync) ,Reflector負責DeltaFIFO的輸入,Controller負責處理DeltaFIFO的輸出;
(3)Controller:Controller從DeltaFIFO的queue中pop一個object key出來,并擷取其關聯的 Deltas出來進行處理,周遊Deltas,根據對象的變化更新Indexer中的本地記憶體緩存,并通知Processor,相關對象有變化事件發生;
(4)Processor:Processor根據對象的變化事件類型,調用相應的ResourceEventHandler來處理對象的變化;
(5)Indexer:Indexer中有informer維護的指定資源對象的相對于etcd資料的一份本地記憶體緩存,可通過該緩存擷取資源對象,以減少對apiserver、對etcd的請求壓力;
(6)ResourceEventHandler:使用者根據自身處理邏輯需要,注冊自定義的的ResourceEventHandler,當對象發生變化時,将觸發調用對應類型的ResourceEventHandler來做處理。
根據informer架構,對k8s informer的分析将分為以下幾部分進行,本篇為概要分析:
(1)informer概要分析;
(2)informer之初始化與啟動分析;
(3)informer之Reflector分析;
(4)informer之DeltaFIFO分析;
(5)informer之Controller&Processor分析;
(6)informer之Indexer分析;
informer使用示例代碼
使用大緻過程如下:
(1)建構與kube-apiserver通信的config配置;
(2)初始化與apiserver通信的clientset;
(3)利用clientset初始化shared informer factory以及pod informer;
(4)注冊informer的自定義ResourceEventHandler;
(5)啟動shared informer factory,開始informer的list & watch操作;
(6)等待informer從kube-apiserver同步資源完成,即informer的list操作擷取的對象都存入到informer中的indexer本地緩存中;
(7)建立lister,可以從informer中的indexer本地緩存中擷取對象;
func main() {
// 自定義與kube-apiserver通信的config配置
master := "192.168.1.10" // apiserver url
kubeconfig := "/.kube/config"
config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
if err != nil {
klog.Fatalf("Failed to create config: %v", err)
}
// 或使用k8s serviceAccount機制與kube-apiserver通信
// config, err = rest.InClusterConfig()
// 初始化與apiserver通信的clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create client: %v", err)
}
// 初始化shared informer factory以及pod informer
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()
// 注冊informer的自定義ResourceEventHandler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: xxx,
UpdateFunc: xxx,
DeleteFunc: xxx,
})
// 啟動shared informer factory,開始informer的list & watch操作
stopper := make(chan struct{})
go factory.Start(stopper)
// 等待informer從kube-apiserver同步資源完成,即informer的list操作擷取的對象都存入到informer中的indexer本地緩存中
// 或者調用factory.WaitForCacheSync(stopper)
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 建立lister
podLister := podInformer.Lister()
// 從informer中的indexer本地緩存中擷取對象
podList, err := podLister.List(labels.Everything())
if err != nil {
fmt.Println(err)
}
}
總結
以上隻是對K8s informer做了簡單的介紹,以及簡單的寫了一下如何使用informer的示例代碼,後面将開始對informer的各個部件做進一步的源碼分析,敬請期待。
最後以一張k8s informer的架構圖作為結尾總結,大家回憶一下k8s informer的架構組成以及各個部件的作用。