天天看點

【OpenYurt 深度解析】邊緣網關緩存能力的優雅實作OpenYurt:延伸原生 K8s 的能力到邊緣OpenYurt 如何解決邊緣自治問題YurtHub 實作總結

【OpenYurt 深度解析】邊緣網關緩存能力的優雅實作OpenYurt:延伸原生 K8s 的能力到邊緣OpenYurt 如何解決邊緣自治問題YurtHub 實作總結

作者 | 何淋波(新勝)

來源 |

阿裡巴巴雲原生公衆号

OpenYurt:延伸原生 K8s 的能力到邊緣

阿裡雲邊緣容器服務上線 1 年後,正式開源了雲原生邊緣計算解決方案 OpenYurt,跟其他開源的容器化邊緣計算方案不同的地方在于:OpenYurt 秉持 Extending your native Kubernetes to edge 的理念,對 Kubernetes 系統零修改,并提供一鍵式轉換原生 Kubernetes 為 OpenYurt,讓原生 K8s 叢集具備邊緣叢集能力。

同時随着 OpenYurt 的持續演進,也一定會繼續保持如下發展理念:

  • 非侵入式增強 K8s
  • 保持和雲原生社群主流技術同步演進

OpenYurt 如何解決邊緣自治問題

想要實作将 Kubernetes 系統延展到邊緣計算場景,那麼邊緣節點将通過公網和雲端連接配接,網絡連接配接有很大不可控因素,可能帶來邊緣業務運作的不穩定因素,這是雲原生和邊緣計算融合的主要難點之一。

解決這個問題,需要使邊緣側具有自治能力,即當雲邊網絡斷開或者連接配接不穩定時,確定邊緣業務可以持續運作。在 OpenYurt 中,該能力由 yurt-controller-manager 和 YurtHub 元件提供。

1. YurtHub 架構

在之前的文章中,我們詳細介紹了

YurtHub 元件的能力

。其架構圖如下:

【OpenYurt 深度解析】邊緣網關緩存能力的優雅實作OpenYurt:延伸原生 K8s 的能力到邊緣OpenYurt 如何解決邊緣自治問題YurtHub 實作總結
圖檔連結

YurtHub 是一個帶有資料緩存功能的“透明網關”,和雲端網絡斷連狀态下,如果節點或者元件重新開機,各個元件(kubelet/kube-proxy 等)将從 YurtHub 中擷取到業務容器相關資料,有效解決邊緣自治的問題。這也意味着我們需要實作一個輕量的帶資料緩存能力的反向代理。

2. 第一想法

實作一個緩存資料的反向代理,第一想法就是從 response.Body 中讀取資料,然後分别傳回給請求 client 和本地的 Cache 子產品。僞代碼如下:

func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
        bodyBytes, _ := ioutil.ReadAll(resp.Body)
        go func() {
                // cache response on local disk
                cacher.Write(bodyBytes)
        }

        // client reads data from response
        rw.Write(bodyBytes)
}           

當深入思考後,在 Kubernetes 系統中,上述實作會引發下面的問題:

  • 問題 1:流式資料需要如何處理(如: K8s 中的 watch 請求),意味 ioutil.ReadAll() 一次調用無法傳回所有資料。即如何可以傳回流資料同時又緩存流資料。
  • 問題 2:同時在本地緩存資料前,有可能需要對傳入的 byte slice 資料先進行清洗處理。這意味着需要修改 byte slice,或者先備份 byte slice 再處理。這樣會造成記憶體的大量消耗,同時針對流式資料,到底申請多大的 slice 也不好處理。

3. 優雅實作探讨

針對上面的問題,我們将問題逐個抽象,可以發現更優雅的實作方法。

  • 問題 1:如何對流資料同時進行讀寫

針對流式資料的讀寫(一邊傳回一邊緩存),如下圖所示,其實需要的不過是把 response.Body(io.Reader) 轉換成一個 io.Reader 和一個 io.Writer。或者說是一個 io.Reader 和 io.Writer 合成一個 io.Reader。這很容易就聯想到 Linux 裡面的 Tee 指令。

【OpenYurt 深度解析】邊緣網關緩存能力的優雅實作OpenYurt:延伸原生 K8s 的能力到邊緣OpenYurt 如何解決邊緣自治問題YurtHub 實作總結

而在 Golang 中 Tee 指令是實作就是io.TeeReader,那問題 1 的僞代碼如下:

func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
        // create TeeReader with response.Body and cacher
        newRespBody := io.TeeReader(resp.Body, cacher)

        // client reads data from response
        io.Copy(rw, newRespBody)
}           

通過 TeeReader 的對 Response.Body 和 Cacher 的整合,當請求 client 端從 response.Body 中讀取資料時,将同時向 Cache 中寫入傳回資料,優雅的解決了流式資料的處理。

  • 問題 2:如何在緩存前先清洗流資料

如下圖所示,緩存前先清洗流資料,請求端和過濾端需要同時讀取 response.Body(2 次讀取問題)。也就是需要将 response.Body(io.Reader) 轉換成兩個 io.Reader。

【OpenYurt 深度解析】邊緣網關緩存能力的優雅實作OpenYurt:延伸原生 K8s 的能力到邊緣OpenYurt 如何解決邊緣自治問題YurtHub 實作總結

也意味着問題 2 轉化成:問題 1 中緩存端的 io.Writer 轉換成 Data Filter 的 io.Reader。其實在 Linux 指令中也能找到類似指令,就是管道。是以問題 2 的僞代碼如下:

func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
        pr, pw := io.Pipe()
        // create TeeReader with response.Body and Pipe writer
        newRespBody := io.TeeReader(resp.Body, pw)
        go func() {
                // filter reads data from response 
                io.Copy(dataFilter, pr)
        }

        // client reads data from response
        io.Copy(rw, newRespBody)
}           

通過 io.TeeReader 和 io.PiPe,當請求 client 端從 response.Body 中讀取資料時,Filter 将同時從 Response 讀取到資料,優雅的解決了流式資料的 2 次讀取問題。

YurtHub 實作

最後看一下 YurtHub 中相關實作,由于 Response.Body 為 io.ReadCloser,是以實作了 dualReadCloser。同時 YurtHub 可能也面臨對 http.Request 的緩存,是以增加了 isRespBody 參數用于判定是否需要負責關閉 response.Body。

// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/util/util.go#L156
// NewDualReadCloser create an dualReadCloser object
func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {
    pr, pw := io.Pipe()
    dr := &dualReadCloser{
        rc:         rc,
        pw:         pw,
        isRespBody: isRespBody,
    }

    return dr, pr
}

type dualReadCloser struct {
    rc io.ReadCloser
    pw *io.PipeWriter
    // isRespBody shows rc(is.ReadCloser) is a response.Body
    // or not(maybe a request.Body). if it is true(it's a response.Body),
    // we should close the response body in Close func, else not,
    // it(request body) will be closed by http request caller
    isRespBody bool
}

// Read read data into p and write into pipe
func (dr *dualReadCloser) Read(p []byte) (n int, err error) {
    n, err = dr.rc.Read(p)
    if n > 0 {
        if n, err := dr.pw.Write(p[:n]); err != nil {
            klog.Errorf("dualReader: failed to write %v", err)
            return n, err
        }
    }

    return
}

// Close close two readers
func (dr *dualReadCloser) Close() error {
    errs := make([]error, 0)
    if dr.isRespBody {
        if err := dr.rc.Close(); err != nil {
            errs = append(errs, err)
        }
    }

    if err := dr.pw.Close(); err != nil {
        errs = append(errs, err)
    }

    if len(errs) != 0 {
        return fmt.Errorf("failed to close dualReader, %v", errs)
    }

    return nil
}           

在使用 dualReadCloser 時,可以在httputil.NewSingleHostReverseProxy的modifyResponse()方法中看到。代碼如下:

// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/proxy/remote/remote.go#L85
func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {rambohe-ch, 10 months ago: • hello openyurt
            // 省略部分前置檢查                                                          
            rc, prc := util.NewDualReadCloser(resp.Body, true)
            go func(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) {
                err := rp.cacheMgr.CacheResponse(ctx, prc, stopCh)
                if err != nil && err != io.EOF && err != context.Canceled {
                    klog.Errorf("%s response cache ended with error, %v", util.ReqString(req), err)
                }
            }(ctx, prc, rp.stopCh)

            resp.Body = rc
}           

總結

OpenYurt 于 2020 年 9 月進入 CNCF 沙箱後,持續保持了快速發展和疊代,在社群同學一起努力下,目前已經開源的能力有:

  • 邊緣自治
  • 邊緣單元化管理
  • 雲邊協同運維
  • 一鍵式無縫轉換能力

同時在和社群同學的充分讨論下,OpenYurt 社群也釋出了2021 roadmap,歡迎有興趣的同學來一起貢獻。

如果大家對 OpenYurt 感興趣,歡迎掃碼加入我們的社群交流群,以及通路 OpenYurt 官網和 GitHub 項目位址:

繼續閱讀