天天看點

client-go實戰之九:手寫一個kubernetes的controller

歡迎通路我的GitHub

這裡分類和彙總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos

本篇概覽

  • 本文是《client-go實戰》系列的第九篇,前面咱們已經了解了client-go的基本功能,現在要來一次經典的綜合實戰了,接下來咱們會手寫一個kubernetes的controller,其功能是:監聽某種資源的變化,一旦資源發生變化(例如增加或者删除),apiserver就會有廣播發出,controller使用client-go可以訂閱這個廣播,然後在收到廣播後進行各種業務操作,
  • 本次實戰代碼量略大,但如果随本文一步步先設計再開發,并不會覺得有太多,總的來說由以下内容構成
  1. 代碼整體架構一覽
  2. 對着架構細說流程
  3. 全局重點的小結
  4. 編碼實戰

代碼整體架構一覽

  • 首先,再次明确本次實戰的目标:開發出類似kubernetes的controller那樣的功能,實時監聽pod資源的變化,針對每個變化做出響應
  • 今天的實戰源自client-go的官方demo,其主要架構如下
client-go實戰之九:手寫一個kubernetes的controller
  • 可能您會覺得上圖有些複雜,沒關系,接下來咱們細說此圖,為後面的編碼打好理論基礎

對着架構細說流程

  • 首先将上述架構圖中涉及的内容進行分類,共有三部分
  1. 最左側的Kubernetes API Server+etcd是第一部分,它們都是kubernetes的内部元件
  2. 第二部分是整個informer,informer是client-go庫的核心子產品
  3. 第三部分是WorkQueue和Conrol Loop,它們都是controller的業務邏輯代碼
  • 上面三部分合作,就能做到監聽資源變化并做出響應
  • 另外,informer内部很複雜也很精巧,後面會有專門的文章去細說,本篇隻會提到與controller有關系的informer細節,其餘的能不提就不提(不然内容太多,這篇文章寫不完了)
  • 分類完畢後,再來聊流程
  1. controller會通過client-go的list&watch機制與API Server建立長連接配接(http2的stream),隻要pod資源發生變化,API Server就會通過長連接配接推送到controller
  2. API Server推的資料到達Reflector,它将資料寫入Delta FIFO Queue
  3. Delta FIFO Queue是個先入先出的隊列,除了pod資訊還儲存了操作類型(增加、修改、删除),informer内部不斷從這個隊列擷取資料,再執行AddFunc、UpdateFunc、DeleteFunc等方法
  4. 完整的pod資料被存放在Local Store中,外部通過Indexer随時可以擷取到
  5. controller中準備一個或多個工作隊列,在執行AddFunc、UpdateFunc、DeleteFunc等方法時,可以将定制化的資料放入工作隊列中
  6. controller中啟動一個或多個協程,持續從工作隊列中取資料,執行業務邏輯,執行過程中如果需要pod的詳細資料,可以通過indexder擷取
  • 差不多了,我有種胸有成竹的感覺,迫不及待想寫代碼,但還是忍忍吧,先規劃再動手

編碼規劃

  • 所謂規劃就是把步驟捋清楚,先寫啥再寫啥,如下圖所示
client-go實戰之九:手寫一個kubernetes的controller
  • 捋順了,開始寫代碼吧

編碼之一:定義Controller資料結構(controller.go)

  • 為了便于管理《client-go實戰》系列的源碼,本篇實戰的源碼依然存放在《client-go實戰之七:準備一個工程管理後續實戰的代碼》中新增的golang工程中
  • 先定義資料結構,新增controller.go檔案,裡面新增一個struct
type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
}
           
  • 從上述代碼可見Controller結構體有三個成員,indexer是informer内負責存取完整資源資訊的對象,queue是用于業務邏輯的工作隊列

編碼之二:編寫業務邏輯代碼(controller.go)

  • 業務邏輯代碼共有四部分
  1. 把資源變化資訊存入工作隊列,這裡可能按實際需求定制(例如有的資料不關注就丢棄了)
  2. 從工作隊列中取出資料
  3. 取出資料後的處理邏輯,這邊是純粹的業務需求了,各人的實作都不一樣
  4. 異常處理
  • 步驟1,存入工作隊列的操作,留待初始化informer的時候再做,
  • 步驟4,異常處理稍後也有單獨段落細說
  • 這裡隻聚焦步驟2和3:怎麼取,取出後怎麼用
  • 先寫步驟2的代碼:從工作隊列中取取資料,用名為processNextItem的方法來實作(對每一行代碼進行中文注釋着實不易,支援的話請點個贊)
func (c *Controller) processNextItem() bool {
	// 阻塞等待,直到隊列中有資料可以被取出,
	// 另外有可能是多協程并發擷取資料,此key會被放入processing中,表示正在被處理
	key, quit := c.queue.Get()
	// 如果最外層調用了隊列的Shutdown,這裡的quit就會傳回true,
	// 調用processNextItem的地方發現processNextItem傳回false,就不會再次調用processNextItem了
	if quit {
		return false
	}

	// 表示該key已經被處理完成(從processing中移除)
	defer c.queue.Done(key)

	// 調用業務方法,實作具體的業務需求
	err := c.syncToStdout(key.(string))
	// Handle the error if something went wrong during the execution of the business logic

	// 判斷業務邏輯處理是否出現異常,如果出現就重新放入隊列,以此實作重試,如果已經重試過5次,就放棄
	c.handleErr(err, key)

	// 調用processNextItem的地方發現processNextItem傳回true,就會再次調用processNextItem
	return true
}
           
  • 接下來寫業務處理的代碼,就是上面調用的syncToStdout方法,正常套路是檢查spec和status的差距,然後讓status和spec保持一緻,(例如spec中指定副本數為2,而status中記錄了真實的副本數是1,是以業務處理就是增加一個副本數),這裡僅僅是為了展示業務處理代碼在哪些,是以就簡(fu)化(yan)一些了,隻列印pod的名稱
func (c *Controller) syncToStdout(key string) error {
	// 根據key從本地存儲中擷取完整的pod資訊
	// 由于有長連接配接與apiserver保持同步,是以本地的pod資訊與kubernetes叢集内保持一緻
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}

	if !exists {
		fmt.Printf("Pod %s does not exist anymore\n", key)
	} else {
		// 這裡就是真正的業務邏輯代碼了,一般會比較spce和status的差異,然後做出處理使得status與spce保持一緻,
		// 此處為了代碼簡單僅僅列印一行日志
		fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	}
	return nil
}
           

編碼之三:編寫錯誤處理代碼(controller.go)

  • 回顧前面的processNextItem方法内容,在調用syncToStdout執行完業務邏輯後就立即調用handleErr方法了,此方法的作用是檢查syncToStdout的傳回值是否有錯誤,然後做針對性處理
func (c *Controller) handleErr(err error, key interface{}) {
	// 沒有錯誤時的處理邏輯
	if err == nil {
		// 确認這個key已經被成功處理,在隊列中徹底清理掉
		// 假設之前在處理該key的時候曾報錯導緻重新進入隊列等待重試,那麼也會因為這個Forget方法而不再被重試
		c.queue.Forget(key)
		return
	}

	// 代碼走到這裡表示前面執行業務邏輯的時候發生了錯誤,
	// 檢查已經重試的次數,如果不操作5次就繼續重試,這裡可以根據實際需求定制
	if c.queue.NumRequeues(key) < 5 {
		klog.Infof("Error syncing pod %v: %v", key, err)
		c.queue.AddRateLimited(key)
		return
	}

	// 如果重試超過了5次就徹底放棄了,也像執行成功那樣調用Forget做徹底清理(否則就沒完沒了了)
	c.queue.Forget(key)
	// 向外部報告錯誤,走通用的錯誤處理流程
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
           
  • 好了,和業務有關的代碼已經完成,接下來就是搭建controller架構,把基本功能串起來

編碼之四:編寫Controller主流程(controller.go)

  • 編寫一個完整的Controller,最基本的是構造方法,Controller的構造方法也很簡單,儲存三個重要的成員變量即可
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}
           
  • 先定義個名為runWorker的簡單方法,裡面是個無限循環,隻要消費消息的processNextItem方法傳回true,就無限循環下去
func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}
           
  • 然後是Controller主流程代碼,簡介清晰,啟動informer,開始接受apiserver推送,寫入工作隊列,然後開啟無限循環從工作隊列取資料并處理
func (c *Controller) Run(workers int, stopCh chan struct{}) {
	defer runtime.HandleCrash()

	// 隻要工作隊列的ShutDown方法被調用,processNextItem方法就會傳回false,runWorker的無限循環就會結束
	defer c.queue.ShutDown()
	klog.Info("Starting Pod controller")

	// informer的Run方法執行後,就開始接受apiserver推送的資源變更事件,并更新本地存儲
	go c.informer.Run(stopCh)

	// 等待本地存儲和apiserver完成同步
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	// 啟動worker,并發從工作隊列取資料,然後執行業務邏輯
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping Pod controller")
}
           
  • 現在一個完整的Controller已經完成了,接下來編寫調用Controller的代碼,将其所需的三個對象傳入,再調用它的Run方法

編碼之五:編寫調用Controller的代碼(controller_demo.go)

  • 為了能讓整個工程的main方法調用Controller,這裡新增controller_demo.go方法,裡面新增名為ControllerDemo的資料結構,建立Controller對象以及為其準備成員變量的操作都在ControllerDemo.DoAction方法中
package action

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type ControllerDemo struct{}

func (controllerDemo ControllerDemo) DoAction(clientset *kubernetes.Clientset) error {

	// 建立ListWatch對象,指定要監控的資源類型是pod,namespace是default
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

	// 建立工作隊列
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 建立informer,并将傳回的存儲對象儲存在變量indexer中
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		// 響應新增資源事件的方法,可以按照業務需求來定制,
		// 這裡的做法比較常見:寫入工作隊列
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		// 響應修改資源事件的方法,可以按照業務需求來定制,
		// 這裡的做法比較常見:寫入工作隊列
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		// 響應修改資源事件的方法,可以按照業務需求來定制,
		// 這裡的做法比較常見:寫入工作隊列,注意删除的時候生成key的方法和新增修改不一樣
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	// 建立Controller對象,将所需的三個變量對象傳入
	controller := NewController(queue, indexer, informer)

	// Now let's start the controller
	stop := make(chan struct{})
	defer close(stop)
	// 在協程中啟動controller
	go controller.Run(1, stop)

	// Wait forever
	select {}
	return nil
}
           

編碼之六:main方法中支援(main.go)

  • 然後是整個工程的main方法,裡面增加一段代碼,支援新增的ControllerDemo,如下圖黃框所示
client-go實戰之九:手寫一個kubernetes的controller
  • 最後,如果您使用的是vscode,記得修改launch.json,如下圖黃色箭頭,這樣main方法運作的時候就會執行Controller的代碼了
client-go實戰之九:手寫一個kubernetes的controller

運作和驗證

  • 現在工程目錄執行以下指令,擷取必要的包
go get k8s.io/apimachinery/pkg/util/[email protected]
           
  • 確定kubernetes環境正常,.kube/config配置也能正常使用,然後運作main.go
  • 使用kubectl edit xxx修改kubernetes環境中的pod,例如我這裡改的是下圖黃色箭頭的值
client-go實戰之九:手寫一個kubernetes的controller
  • 修改完畢儲存退出後,運作mian.go的控制台立即有内容輸出,如下圖黃色箭頭,是咱們前面的syncToStdout方法的輸入,符合預期
client-go實戰之九:手寫一個kubernetes的controller
  • 至此,整個Controller已經開發完成了,相信您已經熟悉了informer和kubernetes的controller的基本套路,加上前面的文章打下的基礎,再去做kubernetes二次開發,或者operator開發等都能輕松駕馭了

本篇涉及知識點串講

  • 前幾篇的風格,都是抓住一個問題深入研究和實踐,但是到了本篇似乎多個知識點同時湧出,并且還要緊密配合完成業務目标,可能年輕的您一下子略有不适應,我這裡再次将本次開發中的重點進行總結,經曆過一番實戰,再來看這些總結,相信您很容易就融會貫通了
  • 先給出資料流視圖,結合前面的實戰,您應該能一眼看懂
client-go實戰之九:手寫一個kubernetes的controller
  • 接下來開始梳理重點
  1. 建立一個名為podListWatcher的ListWatch對象,用于對指定資源類型建立監聽(本例中監聽的資源是pod)
  2. 建立一個名為queue的工作隊列,就是個先進先出的記憶體對象,沒啥特别之處
  3. 通過podListWatcher建立一個informer,這個informer的功能對podListWatcher監聽的事件作相應
  4. 在建立informer的時候還會傳回一個名為indexer的本地緩存,這裡面儲存了所有pod資訊(由于pod的變動全部都會被informer收到,是以indexer中儲存了最新的pod資訊)
  5. 在新協程中啟動informer,這裡面對應兩件事情:第一,建立Reflector對象,這個Reflector對象會把podListWatcher監聽到的資料放入一個DeltaFIFO隊列(注意不是步驟2中的工作隊列),第二是循環地取出fifo隊列中的資料,再調用AddFunc、UpdateFunc、DeleteFunc等方法
  6. 步驟5中提到的AddFunc、UpdateFunc、DeleteFunc可以在建立informer的時候,由業務開發者自定義,一般會再次将key放入工作隊列中
  7. 在新協程消費工作隊列queue的資料,這裡可以根據業務需求寫入也任務邏輯代碼
  • 基于以上較長的描述,再來個精簡版,介紹重點對象,如果您對較長的描述不感興趣,可以隻看精簡版,掌握其中關鍵即可
  1. podListWatcher:用于監聽指定類型資源的變化
  2. queue:工作隊列,從裡面取出的key,其資源都有事件發生
  3. informer:接受監聽到的事件,再調用指定的回調方法
  4. Reflector:informer内部三大對象之一,用于接受事件再寫入一個内部fifo隊列
  5. DeltaFIFO:informer内部三大對象之二,先入先出隊列,還儲存了操作類型
  6. indexer:informer内部三大對象之三,這裡面儲存的是指定資源的完整資料,和apiserver側保持同步
  7. 接受消息的協程:informer在這個協程中啟動,也在這個協程中将資料寫入工作隊列
  8. 處理工作隊列的協程:負責從工作隊列中取出資料處理
  9. 工作隊列queue和informer内部的fifo是不同的隊列,是兩回事,為了滿足業務需求,我們可以在一個controller中建立多個工作隊列,也可以不要工作隊列(在informer的三個回調方法中完成業務邏輯)

以下是官方參考資訊

  • https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md

源碼下載下傳

  • 上述完整源碼可在GitHub下載下傳到,位址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
名稱 連結 備注
項目首頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的首頁
git倉庫位址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫位址,https協定
git倉庫位址(ssh) [email protected]:zq2599/blog_demos.git 該項目源碼的倉庫位址,ssh協定
  • 這個git項目中有多個檔案夾,本篇的源碼在tutorials/client-go-tutorials檔案夾下,如下圖紅框所示:
client-go實戰之九:手寫一個kubernetes的controller
  • 寫到這裡,client-go基本功的學習已經完成了,接下來咱們還要繼續深入研究,讓這個優秀的庫在手中發揮更大的威力,欣宸原創,敬請期待

歡迎關注頭條号:程式員欣宸

學習路上,你不孤單,欣宸原創一路相伴...