天天看點

kube-proxy 源碼分析

上篇文章 kubernetes service 原了解析 已經分析了 service 原理以 kube-proxy 中三種模式的原理,本篇文章會從源碼角度分析 kube-proxy 的設計與實作。

kubernetes 版本: v1.16

kube-proxy 啟動流程

前面的文章已經說過 kubernetes 中所有元件都是通過其

run()

方法啟動主邏輯的,

run()

方法調用之前會進行解析指令行參數、添加預設值等。下面就直接看 kube-proxy 的

run()

方法:

  • 若啟動時指定了

    --write-config-to

    參數,kube-proxy 隻将啟動的預設參數寫到指定的配置檔案中,然後退出
  • 初始化 ProxyServer 對象
  • 如果啟動參數

    --cleanup

    設定為 true,則清理 iptables 和 ipvs 規則并退出

k8s.io/kubernetes/cmd/kube-proxy/app/server.go:290

func (o *Options) Run() error {
    defer close(o.errCh)
    // 1.如果指定了 --write-config-to 參數,則将預設的配置檔案寫到指定檔案并退出
    if len(o.WriteConfigTo) > 0 {
        return o.writeConfigFile()
    }

    // 2.初始化 ProxyServer 對象
    proxyServer, err := NewProxyServer(o)
    if err != nil {
        return err
    }

    // 3.如果啟動參數 --cleanup 設定為 true,則清理 iptables 和 ipvs 規則并退出
    if o.CleanupAndExit {
        return proxyServer.CleanupAndExit()
    }

    o.proxyServer = proxyServer
    return o.runLoop()
}           

Run()

方法中主要調用了

NewProxyServer()

方法來初始化 ProxyServer,然後會調用

runLoop()

啟動主循環,繼續看初始化 ProxyServer 的具體實作:

  • 初始化 iptables、ipvs 相關的 interface
  • 若啟用了 ipvs 則檢查核心版本、ipvs 依賴的核心子產品、ipset 版本,核心子產品主要包括:

    ip_vs

    ip_vs_rr

    ,

    ip_vs_wrr

    ip_vs_sh

    nf_conntrack_ipv4

    nf_conntrack

    ,若沒有相關子產品,kube-proxy 會嘗試使用

    modprobe

    自動加載
  • 根據 proxyMode 初始化 proxier,kube-proxy 啟動後隻運作一種 proxier

k8s.io/kubernetes/cmd/kube-proxy/app/server_others.go:57

func NewProxyServer(o *Options) (*ProxyServer, error) {
    return newProxyServer(o.config, o.CleanupAndExit, o.master)
}

func newProxyServer(
    config *proxyconfigapi.KubeProxyConfiguration,
    cleanupAndExit bool,
    master string) (*ProxyServer, error) {
    ......

    if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
        c.Set(config)
    } else {
        return nil, fmt.Errorf("unable to register configz: %s", err)
    }

    ......

    // 1.關鍵依賴工具 iptables/ipvs/ipset/dbus
    var iptInterface utiliptables.Interface
    var ipvsInterface utilipvs.Interface
    var kernelHandler ipvs.KernelHandler
    var ipsetInterface utilipset.Interface
    var dbus utildbus.Interface

    // 2.執行 linux 指令行的工具
    execer := exec.New()

    // 3.初始化 iptables/ipvs/ipset/dbus 對象
    dbus = utildbus.New()
    iptInterface = utiliptables.New(execer, dbus, protocol)
    kernelHandler = ipvs.NewLinuxKernelHandler()
    ipsetInterface = utilipset.New(execer)

    // 4.檢查該機器是否支援使用 ipvs 模式
    canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
    if canUseIPVS {
        ipvsInterface = utilipvs.New(execer)
    }

    if cleanupAndExit {
        return &ProxyServer{
            ......
        }, nil
    }

    // 5.初始化 kube client 和 event client
    client, eventClient, err := createClients(config.ClientConnection, master)
    if err != nil {
        return nil, err
    }
    ......

    // 6.初始化 healthzServer
    var healthzServer *healthcheck.HealthzServer
    var healthzUpdater healthcheck.HealthzUpdater
    if len(config.HealthzBindAddress) > 0 {
        healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
        healthzUpdater = healthzServer
    }

    // 7.proxier 是一個 interface,每種模式都是一個 proxier
    var proxier proxy.Provider

    // 8.根據 proxyMode 初始化 proxier
    proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
    ......

    if proxyMode == proxyModeIPTables {
        klog.V(0).Info("Using iptables Proxier.")
        if config.IPTables.MasqueradeBit == nil {
            return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
        }

        // 9.初始化 iptables 模式的 proxier
        proxier, err = iptables.NewProxier(
            .......
        )
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
        metrics.RegisterMetrics()
    } else if proxyMode == proxyModeIPVS {
        // 10.判斷是夠啟用了 ipv6 雙棧
        if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
            ......
            // 11.初始化 ipvs 模式的 proxier
            proxier, err = ipvs.NewDualStackProxier(
                ......
            )
        } else {
            proxier, err = ipvs.NewProxier(
                ......
            )
        }
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
        metrics.RegisterMetrics()
    } else {
        // 12.初始化 userspace 模式的 proxier
        proxier, err = userspace.NewProxier(
            ......
        )
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
    }

    iptInterface.AddReloadFunc(proxier.Sync)
    return &ProxyServer{
        ......
    }, nil
}           

runLoop()

方法主要是啟動 proxyServer。

k8s.io/kubernetes/cmd/kube-proxy/app/server.go:311

func (o *Options) runLoop() error {
    // 1.watch 配置檔案變化
    if o.watcher != nil {
        o.watcher.Run()
    }

    // 2.以 goroutine 方式啟動 proxyServer
    go func() {
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}           

o.proxyServer.Run()

中會啟動已經初始化好的所有服務:

  • 設定程序 OOMScore,可通過指令行配置,預設值為

    --oom-score-adj="-999"

  • 啟動 metric server 和 healthz server,兩者分别監聽 10256 和 10249 端口
  • 設定核心參數

    nf_conntrack_tcp_timeout_established

    nf_conntrack_tcp_timeout_close_wait

  • 将 proxier 注冊到 serviceEventHandler、endpointsEventHandler 中
  • 啟動 informer 監聽 service 和 endpoints 變化
  • 執行

    s.Proxier.SyncLoop()

    ,啟動 proxier 主循環

k8s.io/kubernetes/cmd/kube-proxy/app/server.go:527

func (s *ProxyServer) Run() error {
    ......

    // 1.程序 OOMScore,避免程序因 oom 被殺掉,此處預設值為 -999
    var oomAdjuster *oom.OOMAdjuster
    if s.OOMScoreAdj != nil {
        oomAdjuster = oom.NewOOMAdjuster()
        if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
            klog.V(2).Info(err)
        }
    }
    ......

    // 2.啟動 healthz server
    if s.HealthzServer != nil {
        s.HealthzServer.Run()
    }

    // 3.啟動 metrics server
    if len(s.MetricsBindAddress) > 0 {
        ......
        go wait.Until(func() {
            err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
            }
        }, 5*time.Second, wait.NeverStop)
    }

    // 4.配置 conntrack,設定核心參數 nf_conntrack_tcp_timeout_established 和 nf_conntrack_tcp_timeout_close_wait
    if s.Conntracker != nil {
        max, err := getConntrackMax(s.ConntrackConfiguration)
        if err != nil {
            return err
        }
        if max > 0 {
            err := s.Conntracker.SetMax(max)
            ......
        }

        if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
            timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
            if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
                return err
            }
        }
        if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
            timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
            if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
                return err
            }
        }
    }

    ......

    // 5.啟動 informer 監聽 Services 和 Endpoints 或者 EndpointSlices 資訊
    informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = labelSelector.String()
        }))


    // 6.将 proxier 注冊到 serviceConfig、endpointsConfig 中
    serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
    serviceConfig.RegisterEventHandler(s.Proxier)
    go serviceConfig.Run(wait.NeverStop)

    if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
        endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod)
        endpointSliceConfig.RegisterEventHandler(s.Proxier)
        go endpointSliceConfig.Run(wait.NeverStop)
    } else {
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.Proxier)
        go endpointsConfig.Run(wait.NeverStop)
    }

    // 7.啟動 informer
    informerFactory.Start(wait.NeverStop)

    s.birthCry()

    // 8.啟動 proxier 主循環
    s.Proxier.SyncLoop()
    return nil
}           

回顧一下整個啟動邏輯:

o.Run() --> o.runLoop() --> o.proxyServer.Run() --> s.Proxier.SyncLoop()           

o.Run()

中調用了

NewProxyServer()

來初始化 proxyServer 對象,其中包括初始化每種模式對應的 proxier,該方法最終會調用

s.Proxier.SyncLoop()

執行 proxier 的主循環。

proxier 的初始化

看完了啟動流程的邏輯代碼,接着再看一下各代理模式的初始化,上文已經提到每種模式都是一個 proxier,即要實作

proxy.Provider

對應的 interface,如下所示:

type Provider interface {
    config.EndpointsHandler
    config.EndpointSliceHandler
    config.ServiceHandler

    Sync()
    SyncLoop()
}           

首先要實作 service、endpoints 和 endpointSlice 對應的 handler,也就是對

OnAdd

OnUpdate

OnDelete

OnSynced

四種方法的處理,詳細的代碼在下文進行講解。EndpointSlice 是在 v1.16 中新加入的一個 API。

Sync()

SyncLoop()

是主要用來處理iptables 規則的方法。

iptables proxier 初始化

首先看 iptables 模式的

NewProxier()

方法,其函數的具體執行邏輯為:

  • 設定相關的核心參數

    route_localnet

    bridge-nf-call-iptables

  • 生成 masquerade 标記
  • 設定預設排程算法 rr
  • 初始化 proxier 對象
  • 使用

    BoundedFrequencyRunner

    初始化 proxier.syncRunner,将 proxier.syncProxyRules 方法注入,

    BoundedFrequencyRunner

    是一個管理器用于執行使用者注入的函數,可以指定運作的時間政策。

k8s.io/kubernetes/pkg/proxy/iptables/proxier.go:249

func NewProxier(ipt utiliptables.Interface,
    ......
) (*Proxier, error) {
    // 1.設定相關的核心參數
    if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
        ......
    }

    if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
        ......
    }

    // 2.設定 masqueradeMark,預設為 0x00004000/0x00004000
    // 用來标記 k8s 管理的封包,masqueradeBit 預設為 14
    // 标記 0x4000 的封包(即 POD 發出的封包),在離開 Node 的時候需要進行 SNAT 轉換
    masqueradeValue := 1 << uint(masqueradeBit)
    masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)

    ......

    endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)

    healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil)

    // 3.初始化 proxier
    isIPv6 := ipt.IsIpv6()
    proxier := &Proxier{
        ......
    }
    burstSyncs := 2

    // 4.初始化 syncRunner,BoundedFrequencyRunner 是一個定時執行器,會定時執行
    // proxier.syncProxyRules 方法,syncProxyRules 是每個 proxier 實際重新整理iptables 規則的方法
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
    return proxier, nil
}           

ipvs proxier 初始化

ipvs

NewProxier()

方法主要邏輯為:

  • 設定核心參數,

    route_localnet

    br_netfilter

    bridge-nf-call-iptables

    conntrack

    conn_reuse_mode

    ip_forward

    arp_ignore

    arp_announce

  • 和 iptables 一樣,對于 SNAT iptables 規則生成 masquerade 标記
  • 初始化 ipset 規則
  • 初始化 syncRunner 将 proxier.syncProxyRules 方法注入
  • 啟動

    gracefuldeleteManager

    定時清理 RS (realServer) 記錄

k8s.io/kubernetes/pkg/proxy/ipvs/proxier.go:316

func NewProxier(ipt utiliptables.Interface,
    ......
) (*Proxier, error) {

    // 1.設定核心參數
    if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
        ......
    }
    ......

    // 2.生成 masquerade 标記
    masqueradeValue := 1 << uint(masqueradeBit)
    masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)

    // 3.設定預設排程算法 rr
    if len(scheduler) == 0 {
        scheduler = DefaultScheduler
    }

    healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps

    endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)

    // 4.初始化 proxier
    proxier := &Proxier{
        ......
    }
    // 5.初始化 ipset 規則
    proxier.ipsetList = make(map[string]*IPSet)
    for _, is := range ipsetInfo {
        proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
    }
    burstSyncs := 2

    // 6.初始化 syncRunner
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

    // 7.啟動 gracefuldeleteManager
    proxier.gracefuldeleteManager.Run()
    return proxier, nil
}           

userspace proxier 初始化

userspace

NewProxier()

  • 初始化 iptables 規則
  • 初始化 proxier

k8s.io/kubernetes/pkg/proxy/userspace/proxier.go:187

func NewProxier(......) (*Proxier, error) {
    return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, nodePortAddresses, newProxySocket)
}

func NewCustomProxier(......) (*Proxier, error) {
    ......

    // 1.設定打開檔案數
    err = setRLimit(64 * 1000)
    if err != nil {
        return nil, fmt.Errorf("failed to set open file handler limit: %v", err)
    }

    proxyPorts := newPortAllocator(pr)

    return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
}

func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
    if proxyPorts == nil {
        proxyPorts = newPortAllocator(utilnet.PortRange{})
    }

    // 2.初始化 iptables 規則
    if err := iptablesInit(iptables); err != nil {
        return nil, fmt.Errorf("failed to initialize iptables: %v", err)
    }

    if err := iptablesFlush(iptables); err != nil {
        return nil, fmt.Errorf("failed to flush iptables: %v", err)
    }

    // 3.初始化 proxier
    proxier := &Proxier{
        ......
    }

    // 4.初始化 syncRunner
    proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs)
    return proxier, nil
}           

proxier 接口實作

handler 的實作

上文已經提到過每種 proxier 都需要實作 interface 中的幾個方法,首先看一下

ServiceHandler

EndpointsHandler

EndpointSliceHandler

相關的,對于 service、endpoints 和 endpointSlices 三種對象都實作了

OnAdd

OnUpdate

OnDelete

OnSynced

方法。

// 1.service 相關的方法
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
}

func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}

func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
    proxier.OnServiceUpdate(service, nil)
}

func (proxier *Proxier) OnServiceSynced(){
    ......
    proxier.syncProxyRules()
}

// 2.endpoints 相關的方法
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
    proxier.OnEndpointsUpdate(nil, endpoints)
}

func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
    if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
        proxier.Sync()
    }
}

func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
    proxier.OnEndpointsUpdate(endpoints, nil)
}

func (proxier *Proxier) OnEndpointsSynced() {
    ......
    proxier.syncProxyRules()
}

// 3.endpointSlice 相關的方法
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
    if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
        proxier.Sync()
    }
}

func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
    if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
        proxier.Sync()
    }
}

func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
    if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
        proxier.Sync()
    }
}

func (proxier *Proxier) OnEndpointSlicesSynced() {
    ......
    proxier.syncProxyRules()
}           

在啟動邏輯的

Run()

方法中 proxier 已經被注冊到了 serviceConfig、endpointsConfig、endpointSliceConfig 中,當啟動 informer,cache 同步完成後會調用

OnSynced()

方法,之後當 watch 到變化後會調用 proxier 中對應的

OnUpdate()

方法進行處理,

OnSynced()

會直接調用

proxier.syncProxyRules()

來重新整理iptables 規則,而

OnUpdate()

會調用

proxier.syncRunner.Run()

方法,其最終也是調用

proxier.syncProxyRules()

方法重新整理規則的,這種轉換是在

BoundedFrequencyRunner

中展現出來的,下面看一下具體實作。

Sync() 以及 SyncLoop() 的實作

每種 proxier 的

Sync()

以及

SyncLoop()

方法如下所示,都是調用 syncRunner 中的相關方法,而 syncRunner 在前面的

NewProxier()

中已經說過了,syncRunner 是調用

async.NewBoundedFrequencyRunner()

方法初始化,至此,基本上可以确定了所有的核心都是在

BoundedFrequencyRunner

中實作的。

func NewProxier() (*Proxier, error) {
    ......
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
    ......
}

// Sync()
func (proxier *Proxier) Sync() {
    proxier.syncRunner.Run()
}

// SyncLoop()
func (proxier *Proxier) SyncLoop() {
    if proxier.healthzServer != nil {
        proxier.healthzServer.UpdateTimestamp()
    }
    proxier.syncRunner.Loop(wait.NeverStop)
}           

NewBoundedFrequencyRunner()

是其初始化的函數,其中的參數

minInterval

maxInterval

分别對應 proxier 中的

minSyncPeriod

syncPeriod

,兩者的預設值分别為 0s 和 30s,其值可以使用

--iptables-min-sync-period

--iptables-sync-period

啟動參數來指定。

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:134

func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
    timer := realTimer{Timer: time.NewTimer(0)}
    // 執行定時器
    <-timer.C()
    // 調用 construct() 函數
    return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}

func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
    if maxInterval < minInterval {
        panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
    }
    if timer == nil {
        panic(fmt.Sprintf("%s: timer must be non-nil", name))
    }

    bfr := &BoundedFrequencyRunner{
        name:        name,
        fn:          fn,               // 被調用的函數,proxier.syncProxyRules
        minInterval: minInterval,
        maxInterval: maxInterval,
        run:         make(chan struct{}, 1),
        timer:       timer,
    }
    // 由于預設的 minInterval = 0,此處使用 nullLimiter
    if minInterval == 0 {
        bfr.limiter = nullLimiter{}
    } else {
        // 采用“令牌桶”算法實作流控機制
        qps := float32(time.Second) / float32(minInterval)
        bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
    }
    return bfr
}           

在啟動流程

Run()

方法最後調用的

s.Proxier.SyncLoop()

最終調用的是

BoundedFrequencyRunner

Loop()

方法,如下所示:

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:169

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            return
        case <-bfr.timer.C():   // 定時器
            bfr.tryRun()
        case <-bfr.run:       // 接收 channel
            bfr.tryRun()
        }
    }
}           

proxier 的

OnUpdate()

中調用的

syncRunner.Run()

其實隻是在 bfr.run 這個帶 buffer 的 channel 中發送了一條資料,在

BoundedFrequencyRunner

Loop()

方法中接收到該資料後會調用

bfr.tryRun()

進行處理:

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:191

func (bfr *BoundedFrequencyRunner) Run() {
    select {
    case bfr.run <- struct{}{}:   // 向 channel 發送信号
    default:
    }
}           

tryRun()

方法才是最終調用

syncProxyRules()

重新整理iptables 規則的。

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:211

func (bfr *BoundedFrequencyRunner) tryRun() {
    bfr.mu.Lock()
    defer bfr.mu.Unlock()

    if bfr.limiter.TryAccept() {
        // 執行 fn() 即 syncProxyRules() 重新整理iptables 規則
        bfr.fn()
        bfr.lastRun = bfr.timer.Now()
        bfr.timer.Stop()
        bfr.timer.Reset(bfr.maxInterval)
        return
    }

    elapsed := bfr.timer.Since(bfr.lastRun)    // how long since last run
    nextPossible := bfr.minInterval - elapsed  // time to next possible run
    nextScheduled := bfr.maxInterval - elapsed // time to next periodic run

    if nextPossible < nextScheduled {
        bfr.timer.Stop()
        bfr.timer.Reset(nextPossible)
    }
}           

通過以上分析可知,

syncProxyRules()

是每個 proxier 的核心方法,啟動 informer cache 同步完成後會直接調用

proxier.syncProxyRules()

重新整理iptables 規則,之後如果 informer watch 到相關對象的變化後會調用

BoundedFrequencyRunner

tryRun()

來重新整理iptables 規則,定時器每 30s 會執行一次iptables 規則的重新整理。

總結

本文主要介紹了 kube-proxy 的啟動邏輯以及三種模式 proxier 的初始化,還有最終調用重新整理iptables 規則的 BoundedFrequencyRunner,可以看到其中的代碼寫的很巧妙。而每種模式下的iptables 規則是如何建立、重新整理以及轉發的是如何實作的會在後面的文章中進行分析。

繼續閱讀