歡迎通路我的GitHub
這裡分類和彙總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos
本篇概覽
- 本文是《client-go實戰》系列的第九篇,前面咱們已經了解了client-go的基本功能,現在要來一次經典的綜合實戰了,接下來咱們會手寫一個kubernetes的controller,其功能是:監聽某種資源的變化,一旦資源發生變化(例如增加或者删除),apiserver就會有廣播發出,controller使用client-go可以訂閱這個廣播,然後在收到廣播後進行各種業務操作,
- 本次實戰代碼量略大,但如果随本文一步步先設計再開發,并不會覺得有太多,總的來說由以下内容構成
- 代碼整體架構一覽
- 對着架構細說流程
- 全局重點的小結
- 編碼實戰
代碼整體架構一覽
- 首先,再次明确本次實戰的目标:開發出類似kubernetes的controller那樣的功能,實時監聽pod資源的變化,針對每個變化做出響應
- 今天的實戰源自client-go的官方demo,其主要架構如下
- 可能您會覺得上圖有些複雜,沒關系,接下來咱們細說此圖,為後面的編碼打好理論基礎
對着架構細說流程
- 首先将上述架構圖中涉及的内容進行分類,共有三部分
- 最左側的Kubernetes API Server+etcd是第一部分,它們都是kubernetes的内部元件
- 第二部分是整個informer,informer是client-go庫的核心子產品
- 第三部分是WorkQueue和Conrol Loop,它們都是controller的業務邏輯代碼
- 上面三部分合作,就能做到監聽資源變化并做出響應
- 另外,informer内部很複雜也很精巧,後面會有專門的文章去細說,本篇隻會提到與controller有關系的informer細節,其餘的能不提就不提(不然内容太多,這篇文章寫不完了)
- 分類完畢後,再來聊流程
- controller會通過client-go的list&watch機制與API Server建立長連接配接(http2的stream),隻要pod資源發生變化,API Server就會通過長連接配接推送到controller
- API Server推的資料到達Reflector,它将資料寫入Delta FIFO Queue
- Delta FIFO Queue是個先入先出的隊列,除了pod資訊還儲存了操作類型(增加、修改、删除),informer内部不斷從這個隊列擷取資料,再執行AddFunc、UpdateFunc、DeleteFunc等方法
- 完整的pod資料被存放在Local Store中,外部通過Indexer随時可以擷取到
- controller中準備一個或多個工作隊列,在執行AddFunc、UpdateFunc、DeleteFunc等方法時,可以将定制化的資料放入工作隊列中
- controller中啟動一個或多個協程,持續從工作隊列中取資料,執行業務邏輯,執行過程中如果需要pod的詳細資料,可以通過indexder擷取
- 差不多了,我有種胸有成竹的感覺,迫不及待想寫代碼,但還是忍忍吧,先規劃再動手
編碼規劃
- 所謂規劃就是把步驟捋清楚,先寫啥再寫啥,如下圖所示
- 捋順了,開始寫代碼吧
編碼之一:定義Controller資料結構(controller.go)
- 為了便于管理《client-go實戰》系列的源碼,本篇實戰的源碼依然存放在《client-go實戰之七:準備一個工程管理後續實戰的代碼》中新增的golang工程中
- 先定義資料結構,新增controller.go檔案,裡面新增一個struct
type Controller struct {
indexer cache.Indexer
queue workqueue.RateLimitingInterface
informer cache.Controller
}
- 從上述代碼可見Controller結構體有三個成員,indexer是informer内負責存取完整資源資訊的對象,queue是用于業務邏輯的工作隊列
編碼之二:編寫業務邏輯代碼(controller.go)
- 業務邏輯代碼共有四部分
- 把資源變化資訊存入工作隊列,這裡可能按實際需求定制(例如有的資料不關注就丢棄了)
- 從工作隊列中取出資料
- 取出資料後的處理邏輯,這邊是純粹的業務需求了,各人的實作都不一樣
- 異常處理
- 步驟1,存入工作隊列的操作,留待初始化informer的時候再做,
- 步驟4,異常處理稍後也有單獨段落細說
- 這裡隻聚焦步驟2和3:怎麼取,取出後怎麼用
- 先寫步驟2的代碼:從工作隊列中取取資料,用名為processNextItem的方法來實作(對每一行代碼進行中文注釋着實不易,支援的話請點個贊)
func (c *Controller) processNextItem() bool {
// 阻塞等待,直到隊列中有資料可以被取出,
// 另外有可能是多協程并發擷取資料,此key會被放入processing中,表示正在被處理
key, quit := c.queue.Get()
// 如果最外層調用了隊列的Shutdown,這裡的quit就會傳回true,
// 調用processNextItem的地方發現processNextItem傳回false,就不會再次調用processNextItem了
if quit {
return false
}
// 表示該key已經被處理完成(從processing中移除)
defer c.queue.Done(key)
// 調用業務方法,實作具體的業務需求
err := c.syncToStdout(key.(string))
// Handle the error if something went wrong during the execution of the business logic
// 判斷業務邏輯處理是否出現異常,如果出現就重新放入隊列,以此實作重試,如果已經重試過5次,就放棄
c.handleErr(err, key)
// 調用processNextItem的地方發現processNextItem傳回true,就會再次調用processNextItem
return true
}
- 接下來寫業務處理的代碼,就是上面調用的syncToStdout方法,正常套路是檢查spec和status的差距,然後讓status和spec保持一緻,(例如spec中指定副本數為2,而status中記錄了真實的副本數是1,是以業務處理就是增加一個副本數),這裡僅僅是為了展示業務處理代碼在哪些,是以就簡(fu)化(yan)一些了,隻列印pod的名稱
func (c *Controller) syncToStdout(key string) error {
// 根據key從本地存儲中擷取完整的pod資訊
// 由于有長連接配接與apiserver保持同步,是以本地的pod資訊與kubernetes叢集内保持一緻
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
fmt.Printf("Pod %s does not exist anymore\n", key)
} else {
// 這裡就是真正的業務邏輯代碼了,一般會比較spce和status的差異,然後做出處理使得status與spce保持一緻,
// 此處為了代碼簡單僅僅列印一行日志
fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
}
return nil
}
編碼之三:編寫錯誤處理代碼(controller.go)
- 回顧前面的processNextItem方法内容,在調用syncToStdout執行完業務邏輯後就立即調用handleErr方法了,此方法的作用是檢查syncToStdout的傳回值是否有錯誤,然後做針對性處理
func (c *Controller) handleErr(err error, key interface{}) {
// 沒有錯誤時的處理邏輯
if err == nil {
// 确認這個key已經被成功處理,在隊列中徹底清理掉
// 假設之前在處理該key的時候曾報錯導緻重新進入隊列等待重試,那麼也會因為這個Forget方法而不再被重試
c.queue.Forget(key)
return
}
// 代碼走到這裡表示前面執行業務邏輯的時候發生了錯誤,
// 檢查已經重試的次數,如果不操作5次就繼續重試,這裡可以根據實際需求定制
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing pod %v: %v", key, err)
c.queue.AddRateLimited(key)
return
}
// 如果重試超過了5次就徹底放棄了,也像執行成功那樣調用Forget做徹底清理(否則就沒完沒了了)
c.queue.Forget(key)
// 向外部報告錯誤,走通用的錯誤處理流程
runtime.HandleError(err)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
- 好了,和業務有關的代碼已經完成,接下來就是搭建controller架構,把基本功能串起來
編碼之四:編寫Controller主流程(controller.go)
- 編寫一個完整的Controller,最基本的是構造方法,Controller的構造方法也很簡單,儲存三個重要的成員變量即可
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
return &Controller{
informer: informer,
indexer: indexer,
queue: queue,
}
}
- 先定義個名為runWorker的簡單方法,裡面是個無限循環,隻要消費消息的processNextItem方法傳回true,就無限循環下去
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
- 然後是Controller主流程代碼,簡介清晰,啟動informer,開始接受apiserver推送,寫入工作隊列,然後開啟無限循環從工作隊列取資料并處理
func (c *Controller) Run(workers int, stopCh chan struct{}) {
defer runtime.HandleCrash()
// 隻要工作隊列的ShutDown方法被調用,processNextItem方法就會傳回false,runWorker的無限循環就會結束
defer c.queue.ShutDown()
klog.Info("Starting Pod controller")
// informer的Run方法執行後,就開始接受apiserver推送的資源變更事件,并更新本地存儲
go c.informer.Run(stopCh)
// 等待本地存儲和apiserver完成同步
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 啟動worker,并發從工作隊列取資料,然後執行業務邏輯
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
klog.Info("Stopping Pod controller")
}
- 現在一個完整的Controller已經完成了,接下來編寫調用Controller的代碼,将其所需的三個對象傳入,再調用它的Run方法
編碼之五:編寫調用Controller的代碼(controller_demo.go)
- 為了能讓整個工程的main方法調用Controller,這裡新增controller_demo.go方法,裡面新增名為ControllerDemo的資料結構,建立Controller對象以及為其準備成員變量的操作都在ControllerDemo.DoAction方法中
package action
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type ControllerDemo struct{}
func (controllerDemo ControllerDemo) DoAction(clientset *kubernetes.Clientset) error {
// 建立ListWatch對象,指定要監控的資源類型是pod,namespace是default
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
// 建立工作隊列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 建立informer,并将傳回的存儲對象儲存在變量indexer中
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
// 響應新增資源事件的方法,可以按照業務需求來定制,
// 這裡的做法比較常見:寫入工作隊列
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
// 響應修改資源事件的方法,可以按照業務需求來定制,
// 這裡的做法比較常見:寫入工作隊列
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
// 響應修改資源事件的方法,可以按照業務需求來定制,
// 這裡的做法比較常見:寫入工作隊列,注意删除的時候生成key的方法和新增修改不一樣
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
// 建立Controller對象,将所需的三個變量對象傳入
controller := NewController(queue, indexer, informer)
// Now let's start the controller
stop := make(chan struct{})
defer close(stop)
// 在協程中啟動controller
go controller.Run(1, stop)
// Wait forever
select {}
return nil
}
編碼之六:main方法中支援(main.go)
- 然後是整個工程的main方法,裡面增加一段代碼,支援新增的ControllerDemo,如下圖黃框所示
- 最後,如果您使用的是vscode,記得修改launch.json,如下圖黃色箭頭,這樣main方法運作的時候就會執行Controller的代碼了
運作和驗證
- 現在工程目錄執行以下指令,擷取必要的包
go get k8s.io/apimachinery/pkg/util/[email protected]
- 確定kubernetes環境正常,.kube/config配置也能正常使用,然後運作main.go
- 使用kubectl edit xxx修改kubernetes環境中的pod,例如我這裡改的是下圖黃色箭頭的值
- 修改完畢儲存退出後,運作mian.go的控制台立即有内容輸出,如下圖黃色箭頭,是咱們前面的syncToStdout方法的輸入,符合預期
- 至此,整個Controller已經開發完成了,相信您已經熟悉了informer和kubernetes的controller的基本套路,加上前面的文章打下的基礎,再去做kubernetes二次開發,或者operator開發等都能輕松駕馭了
本篇涉及知識點串講
- 前幾篇的風格,都是抓住一個問題深入研究和實踐,但是到了本篇似乎多個知識點同時湧出,并且還要緊密配合完成業務目标,可能年輕的您一下子略有不适應,我這裡再次将本次開發中的重點進行總結,經曆過一番實戰,再來看這些總結,相信您很容易就融會貫通了
- 先給出資料流視圖,結合前面的實戰,您應該能一眼看懂
- 接下來開始梳理重點
- 建立一個名為podListWatcher的ListWatch對象,用于對指定資源類型建立監聽(本例中監聽的資源是pod)
- 建立一個名為queue的工作隊列,就是個先進先出的記憶體對象,沒啥特别之處
- 通過podListWatcher建立一個informer,這個informer的功能對podListWatcher監聽的事件作相應
- 在建立informer的時候還會傳回一個名為indexer的本地緩存,這裡面儲存了所有pod資訊(由于pod的變動全部都會被informer收到,是以indexer中儲存了最新的pod資訊)
- 在新協程中啟動informer,這裡面對應兩件事情:第一,建立Reflector對象,這個Reflector對象會把podListWatcher監聽到的資料放入一個DeltaFIFO隊列(注意不是步驟2中的工作隊列),第二是循環地取出fifo隊列中的資料,再調用AddFunc、UpdateFunc、DeleteFunc等方法
- 步驟5中提到的AddFunc、UpdateFunc、DeleteFunc可以在建立informer的時候,由業務開發者自定義,一般會再次将key放入工作隊列中
- 在新協程消費工作隊列queue的資料,這裡可以根據業務需求寫入也任務邏輯代碼
- 基于以上較長的描述,再來個精簡版,介紹重點對象,如果您對較長的描述不感興趣,可以隻看精簡版,掌握其中關鍵即可
- podListWatcher:用于監聽指定類型資源的變化
- queue:工作隊列,從裡面取出的key,其資源都有事件發生
- informer:接受監聽到的事件,再調用指定的回調方法
- Reflector:informer内部三大對象之一,用于接受事件再寫入一個内部fifo隊列
- DeltaFIFO:informer内部三大對象之二,先入先出隊列,還儲存了操作類型
- indexer:informer内部三大對象之三,這裡面儲存的是指定資源的完整資料,和apiserver側保持同步
- 接受消息的協程:informer在這個協程中啟動,也在這個協程中将資料寫入工作隊列
- 處理工作隊列的協程:負責從工作隊列中取出資料處理
- 工作隊列queue和informer内部的fifo是不同的隊列,是兩回事,為了滿足業務需求,我們可以在一個controller中建立多個工作隊列,也可以不要工作隊列(在informer的三個回調方法中完成業務邏輯)
以下是官方參考資訊
- https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md
源碼下載下傳
- 上述完整源碼可在GitHub下載下傳到,位址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 連結 | 備注 |
項目首頁 | https://github.com/zq2599/blog_demos | 該項目在GitHub上的首頁 |
git倉庫位址(https) | https://github.com/zq2599/blog_demos.git | 該項目源碼的倉庫位址,https協定 |
git倉庫位址(ssh) | [email protected]:zq2599/blog_demos.git | 該項目源碼的倉庫位址,ssh協定 |
- 這個git項目中有多個檔案夾,本篇的源碼在tutorials/client-go-tutorials檔案夾下,如下圖紅框所示:
- 寫到這裡,client-go基本功的學習已經完成了,接下來咱們還要繼續深入研究,讓這個優秀的庫在手中發揮更大的威力,欣宸原創,敬請期待
歡迎關注頭條号:程式員欣宸
學習路上,你不孤單,欣宸原創一路相伴...