天天看點

kube-proxy保姆級别源碼閱讀iptables資料流

kubernetes源代碼版本: 1.22 commit: fba7198a2cc81c4602f358c7b77ee4e733d20aa2

閱讀一個項目的源代碼帶着問題去閱讀是一個不錯的選擇,下面是我之前存在的問題,答案在最後。

  • 為什麼理論上ipvs的轉發性能高于iptables卻預設是iptables而不是ipvs?
  • kube-proxy怎麼保持規則的同步和生成對應的規則,第一次全量資料是怎麼拿到的?
  • iptables怎麼保留iptables上已有的規則,怎麼確定自己的規則沒有被刷掉?

kube-proxy在linux上一共有三種模式, userspace, iptables, ipvs, 現在預設是iptables

其中userspace基本不會再用,因為性能較之後兩者太差。

本文主要閱讀是iptables代理模式下的kube-proxy代碼,是以ipvs相關代碼不會在本文展現。

kube-proxy代碼大概分為三個部分。

  • 初始化,即指令行解析,環境檢查,核心參數配置等。
  • 啟動流程,即ProxyServer的運作邏輯
  • 事件監聽/規則同步, 監聽endpointslice(或endpoint), service, node等資源變化,并根據變化來生成并寫入規則到iptables。

但是有一部分比較有趣也相對比較難的是在iptables規則建立之後的pod與pod之間的資料流向,這一部分作為本文的最後一部分,如果大家覺得代碼看起來比較枯燥,可以直接看第四部分來了解資料流向,友善排查問題。

注意: 結合文中代碼裡面的注釋食用效果更佳,因為有些說明跟代碼放在一起更适合, 然後就是會削減一定的代碼來保證文章不會過于冗長。

初始化

k8s所有的元件都是使用的cobra這個指令行解析庫來解析指令行,模式都差不多,代碼如下:

// cmd\kube-proxy\proxy.go
func main() {
    // 建立command對象并執行
    command := app.NewProxyCommand()
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}


// cmd\kube-proxy\app\server.go
func NewProxyCommand() *cobra.Command {
    // k8s每個元件都有類似的*options對象用來存儲使用者的配置
    opts := NewOptions()

    cmd := &cobra.Command{
        Run: func(cmd *cobra.Command, args []string) {
            // 如果是windows則配置系統
            if err := initForOS(opts.WindowsService); err != nil {
                klog.Fatalf("failed OS init: %v", err)
            }
            // 填充預設參數
            if err := opts.Complete(); err != nil {
                klog.Fatalf("failed complete: %v", err)
            }
            // 驗證參數是否合法
            if err := opts.Validate(); err != nil {
                klog.Fatalf("failed validate: %v", err)
            }
            // 基于所給的參數運作
            if err := opts.Run(); err != nil {
                klog.Exit(err)
            }
        },
    }

    // 應用預設值和添加指令行參數
    var err error
    opts.config, err = opts.ApplyDefaults(opts.config)
    opts.AddFlags(cmd.Flags())
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")
    return cmd
}


func (o *Options) Complete() error {
    // Load the config file here in Complete, so that Validate validates the fully-resolved config.
    if len(o.ConfigFile) > 0 {
        // 讀取本地配置檔案
        c, err := o.loadConfigFromFile(o.ConfigFile)
        o.config = c

        // 用來監聽配置檔案是否發生變化, 如果修改,重命名等情況就會觸發一個error
        // 會導緻kube-proxy退出,因為在pod裡面,是以會導緻重新開機
        if err := o.initWatcher(); err != nil {
            return err
        }
    }

    // 
    return utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates)
}
           

k8s各元件的啟動流程一般是将使用者參數和指定的配置檔案解析到一個*Options對象中,然後填充預設參數,驗證參數,最後基于這些參數構造元件執行個體并運作,kube-proxy也是如此。

在Options裡面有四個比較重要的對象。

type Options struct {
    // kube-proxy配置檔案位置, 如/var/lib/kube-proxy/config.conf
    ConfigFile string

    // 用來運作kube-proxy所需的配置參數
    config *kubeproxyconfig.KubeProxyConfiguration

    // 一個proxyServer對象 
    proxyServer proxyRun

    // kube-apiserver的位址
    master string
}
           

其中proxyServer是一個在proxier之上更進階的抽象,proxier屬于負責底層幹活的對象,用于直接與iptables或ipvs等代理模式的具體實作互動,而ProxyServer用來做一些通用的操作,以及決定用那種模式的代理

然後我們來看看Options對象是如何将kube-proxy拉起來的,從上文我們知道運作時調用的

opts.Run()

,代碼如下:

// 建立ProxyServer對象并運作循環
func (o *Options) Run() error {
    proxyServer, err := NewProxyServer(o)
    o.proxyServer = proxyServer
    return o.runLoop()
}


// 
func (o *Options) runLoop() error {
    // 啟動檔案監聽器,監聽配置檔案的是否發生非預期的變化
    if o.watcher != nil {
        o.watcher.Run()
    }

    // proxyServer對象以一個額外的gorouting啟動
    go func() {
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    // 進入死循環,直至發生錯誤, 才會退出
    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}
           

可以看到主程序在啟動之後拉起proxyServer之後就會進入死循環,直至發生錯誤才會退出,也可以看到後續的邏輯交給了ProxyServer來執行,即

o.proxyServer.Run()

那麼在回過頭看看ProxyServer怎麼建立以及怎麼運作。

注意: NewProxyServer有windows版本和其他系統兩個版本,這裡自然是要看linux版本的,是以代碼在cmd\kube-proxy\app\server_others.go
// cmd\kube-proxy\app\server_others.go
func NewProxyServer(o *Options) (*ProxyServer, error) {
    return newProxyServer(o.config, o.CleanupAndExit, o.master)
}


func newProxyServer(
    config *proxyconfigapi.KubeProxyConfiguration,
    cleanupAndExit bool,
    master string) (*ProxyServer, error) {

    // /configz 用來檢查運作時的配置
    if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
        c.Set(config)
    }

    // 用來操作iptables和核心的接口
    var iptInterface utiliptables.Interface
    var kernelHandler ipvs.KernelHandler

    // 用來操作iptables的指令行接口
    // 即os/exec的封裝,用來執行iptables等指令
    execer := exec.New()
    kernelHandler = ipvs.NewLinuxKernelHandler()
    // 建立可以跟k8s叢集互動的client
    client, eventClient, err := createClients(config.ClientConnection, master)
    // 拿到運作節點的ip
    nodeIP := detectNodeIP(client, hostname, config.BindAddress)

    // 用來傳播事件的對象, 即kubectl get events
    eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")
    // 一個用來表示目前節點的引用對象
    nodeRef := &v1.ObjectReference{
        Kind:      "Node",
        Name:      hostname,
        UID:       types.UID(hostname),
        Namespace: "",
    }

    // proxier 代理模式的具體實作
    var proxier proxy.Provider
    var detectLocalMode proxyconfigapi.LocalMode

    // 得到代理模式 
    proxyMode := getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{})
    // 判斷是否檢查本地流量,即同一節點的兩個pod互動的流量
    detectLocalMode, err = getDetectLocalMode(config)

    // iptables有ipv4和ipv6協定, 預設是隻有ipv4
    primaryProtocol := utiliptables.ProtocolIPv4
    iptInterface = utiliptables.New(execer, primaryProtocol)

    var ipt [2]utiliptables.Interface
    // 判斷是否同時啟用ipv4和ipv6, 預設隻有ipv4
    dualStack := utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && proxyMode != proxyModeUserspace
    if proxyMode == proxyModeIPTables {
        if dualStack {
           // 移除了建立雙棧(ipv4+ipv6)的邏輯
        } else { // Create a single-stack proxier.
            // 探測本地流量應該為了TopologyAwareHints特性
            var localDetector proxyutiliptables.LocalTrafficDetector
            localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)

            // TODO this has side effects that should only happen when Run() is invoked.
            proxier, err = iptables.NewProxier(
                // 操作iptables指令的接口, 建立/删除/确認等操作。
                iptInterface,
                // 用來操作/proc/sys核心參數,如記憶體配置設定政策vm/overcommit_memory
                utilsysctl.New(),
                execer,
                config.IPTables.SyncPeriod.Duration,
                config.IPTables.MinSyncPeriod.Duration,
                config.IPTables.MasqueradeAll,
                int(*config.IPTables.MasqueradeBit),
                localDetector,
                hostname,
                nodeIP,
                recorder,
                healthzServer,
                config.NodePortAddresses,
            )
        }

        proxymetrics.RegisterMetrics()
    }

    // 删除了ipvs相關代碼

    useEndpointSlices := true
    return &ProxyServer{
        Client:                 client,
        EventClient:            eventClient,
        IptInterface:           iptInterface,
        IpvsInterface:          ipvsInterface,
        IpsetInterface:         ipsetInterface,
        execer:                 execer,
        Proxier:                proxier,
        Broadcaster:            eventBroadcaster,
        Recorder:               recorder,
        ConntrackConfiguration: config.Conntrack,
        // 用來操作/proc/sys/net/netfilter等參數
        // snat/dnat都需要核心跟蹤建立的連接配接
        Conntracker:            &realConntracker{},
        ProxyMode:              proxyMode,
        NodeRef:                nodeRef,
        MetricsBindAddress:     config.MetricsBindAddress,
        BindAddressHardFail:    config.BindAddressHardFail,
        EnableProfiling:        config.EnableProfiling,
        OOMScoreAdj:            config.OOMScoreAdj,
        ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
        HealthzServer:          healthzServer,
        UseEndpointSlices:      useEndpointSlices,
    }, nil
}
           

這樣一個ProxyServer 就初始化完成了,整個過程就是根據代理模式建立對應的接口和對象。

小結

通過這部的代碼可以發現,初始化主要分為兩個部分,一個是參數解析和填充,一個是ProxyServer的建立邏輯。之是以在proxier之上在抽象出來一個proxyServer是為了讓proxier的功能更加純粹,proxier隻需負責同步規則即可,而proxyServer會适配各個proxier,并在這些proxier中選擇一個合适的,以及将各個proxier之間一些通用的操作抽象出來放在proxyServer的邏輯中統一處理。

啟動流程

在環境初始化完成之後就是啟動流程了,主程序在啟動之後拉起proxyServer之後就會進入死循環,直至發生錯誤才會退出,而kube-proxy的主要業務邏輯交給了proxyServer。

主程序回顧:

func (o *Options) Run() error {
    proxyServer, err := NewProxyServer(o)
    o.proxyServer = proxyServer
    return o.runLoop()
}

func (o *Options) runLoop() error {
    go func() {
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}
           

可以看到proxyServer通過

o.proxyServer.Run()

啟動。

// cmd\kube-proxy\app\server.go
func (s *ProxyServer) Run() error {
    // 用來跳轉OOM參數,保證系統在記憶體緊張的時候不優先kill掉kube-proxy的程序
    var oomAdjuster *oom.OOMAdjuster
    if s.OOMScoreAdj != nil {
        oomAdjuster = oom.NewOOMAdjuster()
        if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
            klog.V(2).Info(err)
        }
    }

    // 根據kube-proxy指令行參數或配置檔案跳轉conntrack參數
    // 主要是tcp建立連接配接的相關參數,比如逾時,存活檢查(keepalive)等, 以及最大連接配接數等
    if s.Conntracker != nil {
        max, err := getConntrackMax(s.ConntrackConfiguration)
        if err != nil {
            return err
        }
        if max > 0 {
            err := s.Conntracker.SetMax(max
        }

        // TCP相關參數
        if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
            timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
            if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
                return err
            }
        }

        if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
            timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
            if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
                return err
            }
        }
    }

    // 這裡建立informer工廠函數,最後建立相應的informer用于監聽service,endpointslice等資源

    // 這兩個NewRequirement用來過濾掉serviceProxyName和noheadless的endpoint
    noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
    if err != nil {
        return err
    }
    noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
    if err != nil {
        return err
    }
    labelSelector := labels.NewSelector()
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
    informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = labelSelector.String()
        }))

    // 開始建立相應的informer并注冊事件函數
    // 依次是service informer, endpointslieces informer
    serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
    serviceConfig.RegisterEventHandler(s.Proxier)
    go serviceConfig.Run(wait.NeverStop)

    if s.UseEndpointSlices {
        endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
        endpointSliceConfig.RegisterEventHandler(s.Proxier)
        go endpointSliceConfig.Run(wait.NeverStop)
    } else {
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.Proxier)
        go endpointsConfig.Run(wait.NeverStop)
    }

    // 啟動所有informer 
    informerFactory.Start(wait.NeverStop)

    // 判斷是否啟用TopologyAwareHints特性以建立node informer

    // 首次觸發一次同步。
    s.birthCry()

    // 最後啟動同步規則的循環
    go s.Proxier.SyncLoop()

    // 如果錯誤出現就退出
    return <-errCh
}
           
pkg\features\kube_features.go裡面有目前版本的各個特性的預設值

ProxyServer的運作邏輯概括起來就是根據配置參數(指令行參數,配置檔案)來配置系統核心參數,比如OOM分值,nf_conntrack等參數。

然後建立service informer, endpointslices informer,并将proxier對象作為事件回調函數傳給informer用來響應informer的事件,proxier實作了OnServiceAdd,OnServiceUpdate等接口。

最後啟動informer并觸發首次更新以及運作同步規則的循環。

其中

birthCry

比較簡單,就是輸出一個事件, 告訴叢集啟動了。

func (s *ProxyServer) birthCry() {
    s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
}
           

在啟動步驟中比較核心的是service,endpoint等informer的建立和事件函數的注冊,以service informer為例,代碼如下:

// pkg\proxy\config\config.go
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
    result := &ServiceConfig{
        listerSynced: serviceInformer.Informer().HasSynced,
    }
    // 建立informer并注冊事件函數
    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddService,
            UpdateFunc: result.handleUpdateService,
            DeleteFunc: result.handleDeleteService,
        },
        resyncPeriod,
    )

    return result
}

// 将事件回調函數加入eventhandlers中,在每次觸發事件的時候調用
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
    c.eventHandlers = append(c.eventHandlers, handler)
}

// 等待資料同步完成後,調用OnServiceSynced事件回調函數
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
    klog.Info("Starting service config controller")

    // 等待資料同步
    if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
        return
    }

    // 調用proxier的OnServiceSynced方法
    for i := range c.eventHandlers {
        klog.V(3).Info("Calling handler.OnServiceSynced()")
        c.eventHandlers[i].OnServiceSynced()
    }
}
           

關于這些informer的事件函數暫時按下不表,因為事件監聽的邏輯會在本文的第三部分着重說明。

在閱讀syncLoop的代碼之前,我們還需要看看proxier的建立流程。

// pkg\proxy\iptables\proxier.go
func NewProxier(ipt utiliptables.Interface,
    sysctl utilsysctl.Interface,
    exec utilexec.Interface,
    syncPeriod time.Duration,
    minSyncPeriod time.Duration,
    masqueradeAll bool,
    masqueradeBit int,
    localDetector proxyutiliptables.LocalTrafficDetector,
    hostname string,
    nodeIP net.IP,
    recorder events.EventRecorder,
    healthzServer healthcheck.ProxierHealthUpdater,
    nodePortAddresses []string,
) (*Proxier, error) {
    // 正常情況下,核心不會對位址localnet(127.0.0.1/8)的位址做forwarding, 因為這部分代碼被認為是martian.
    // 但是可以通過核心中配置來啟用route_localnet
    if err := utilproxy.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
        return nil, err
    }

    // 確定bridge-nf-call-iptabels=1
    if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
        klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
    }

    // 對snat資料流做标記
    masqueradeValue := 1 << uint(masqueradeBit)
    masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
    klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)

    serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)

    ipFamily := v1.IPv4Protocol
    if ipt.IsIPv6() {
        ipFamily = v1.IPv6Protocol
    }

    ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
    nodePortAddresses = ipFamilyMap[ipFamily]
    // Log the IPs not matching the ipFamily
    if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
        klog.InfoS("Found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ","))
    }

    proxier := &Proxier{
        //各個參數..
    }

    // 瞬時并發數量
    burstSyncs := 2
    klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
    // 建立一個syncRunner 對象,它會保證每個任務之間的時間間隔不大于minSyncPeriod
    // 并且最少maxInterval(這裡預設是time.Hour, 一個小時)同步一次
    // 說明kube-proxy至少每個小時會觸發一次同步
    // **但是同步不一定代表會重新整理規則**
    // syncRunner會控制并發。
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

    // 通過建立一個KUBE-KUBELET-CANARY的鍊來檢測iptables規則是否被刷掉(iptables flush)
    // 如果這個鍊不存在了,自然說明規則鍊被清理掉了。
    go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
        proxier.syncProxyRules, syncPeriod, wait.NeverStop)

    return proxier, nil
}
           

如果你遇到了兩個pod在不同的機器上可以通信正常而同一機器上卻失敗,那麼你可以看看參數bridge-nf-call-iptabels是否為1

假設pod1,pod2在同一台機器上,并且svc2指向pod2

如果本地的pod1通路svc2, 那麼資料流是pod1 -> svc2 cluster ip -> dnat -> pod2

pod2在接收到資料包後發現,資料來自同一區域網路,那麼會直接在二層(網橋)回包,但是pod1并不是走二層(網橋)來的包,是以會導緻資料流不比對,那麼無法建立連接配接,是以這個參數保證pod2在回包的時候,還是會走iptables, 即網橋的資料流會過iptables,這樣iptables回将資料包原路傳回。

通過閱讀proxier建立的代碼,我們知道一些比較重要的參數,比如bridge-nf-call-iptabels,以及kube-proxy如何通過建立一個不适用的KUBE-KUBELET-CANARY鍊來檢測規則是否被刷掉。

proxier裡面有一個比較重要的對象

syncRunner

, 後續的所有規則都會通過這個對象作為同步規則的入口,這個對象會控制并發的競争,也會控制每次同步的最大間隔和最小間隔。

在進入第三部分之前,我們在回顧一下ProxyServer的啟動過程。

func (s *ProxyServer) Run() error {
    // 建立informer等操作....

    s.birthCry()
    // 啟動proxier的同步規則的循環
    go s.Proxier.SyncLoop()

    return <-errCh
}

// pkg\proxy\iptables\proxier.go 
func (proxier *Proxier) SyncLoop() {
    // 調用syncRunner
    proxier.syncRunner.Loop(wait.NeverStop)
}

// pkg\util\async\bounded_frequency_runner.go
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    klog.V(3).Infof("%s Loop running", bfr.name)
    // 重置定時器, 即bfr.timer.C(),下次的啟動時間是目前時間加上maxInterval
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            klog.V(3).Infof("%s Loop stopping", bfr.name)
            return
        case <-bfr.timer.C():
            bfr.tryRun()
        case <-bfr.run:
            bfr.tryRun()
        case <-bfr.retry:
            bfr.doRetry()
        }
    }
}

func (bfr *BoundedFrequencyRunner) tryRun() {
    bfr.mu.Lock()
    defer bfr.mu.Unlock()

    // 擷取令牌控制并發。
    if bfr.limiter.TryAccept() {
        // 這裡的fn就是proxier.syncProxyRules
        bfr.fn()
        bfr.lastRun = bfr.timer.Now()
        bfr.timer.Stop()
        bfr.timer.Reset(bfr.maxInterval)
        klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
        return
    }

}
           

從上面的代碼可以看到,最終還是調用syncRunner的Loop方法, 從這裡開始,同步的規則的邏輯全部交給了proxier,這部分可能就是kube-proxy最重要的部分了。

小結

至此,我們了解到了ProxyServer會配置一些系統通用的核心參數,然後在建立proxier的時候,每個proxier的建立過程中會根據自己的需要配置一些必要的系統參數。iptables proxier在建立過程中還會啟動一個monitor用來監測iptables規則是否被刷掉,以觸發同步規則的任務,而建立過程中比較核心的一個對象是

syncRunner

,這個對象會控制規則同步任務之間的時間間隔,最少多久時間同步一次以及任務的并發。

上文中的任務,其實就是一次觸發, 最終調用的方法都是一緻的, 即proxier.syncProxyRules

事件監聽/規則同步

在ProxyServer的啟動流程我們知道proxier被作為handler注冊到Service, endpointslice的informer事件函數中。

現在我們來看看iptables模式的各個事件回調函數的實作,本文假設kube-proxy是首次啟動,并且以

OnServiceAdd

作為線索來跟蹤代碼。

informer在啟動之後,在同步資料的時候會調用回調函數

OnXXXAdd

函數。
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
} 

func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    // proxier.isInitialized在informer首次同步完成之後才會傳回true
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.Sync()
    }
}
           

上面的代碼邏輯很簡單,如果看

OnServiceDelete

會發現也是調用

OnServiceUpdate

,而endpointSlice的邏輯也差不多。

總的來說,大體邏輯都是最終聚合到

proxier.XXXXChanges.Update

這個方法裡面,統一添加,删除,更新。

proxier對象裡面有兩種比較重要的資料結構

  • XXXMap(serviceMap, endpointsMap): 這個結構用來儲存目前代理規則的狀态(service, endpoints)
  • XXXChanges(serviceChanges, endpointsChanges), 用來記錄同步前發生的狀态變化,每次同步之後就會清空。

從上面我們知道,service的變更最終調用的都是proxier.serviceChanges.Update, 以下是它的代碼.

// pkg\proxy\service.go
// 增加對象
// 傳遞參數 nil, service
// 删除對象
// 傳遞參數 service, nil
// 更新對象
// 傳遞參數 oldService, currentService
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
    svc := current
    if svc == nil {
        svc = previous
    }
    // 如果previous, current都是nil, 直接傳回
    if svc == nil {
        return false
    }
    // 用來定位唯一的service, 在一個叢集中namespace+servicename是唯一的
    namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

    // 判斷是否已經在變更中存在
    change, exists := sct.items[namespacedName]
    // 如果不存在,說明是一個新增操作
    if !exists {
        change = &serviceChange{}
        // 根據service對象建立serviceMap對象
        change.previous = sct.serviceToServiceMap(previous)
        sct.items[namespacedName] = change
    }

    change.current = sct.serviceToServiceMap(current)
    // 判斷是否有變化,沒變化就沒必要加入到變更裡面了
    if reflect.DeepEqual(change.previous, change.current) {
        delete(sct.items, namespacedName)
    } else {
        klog.V(2).Infof("Service %s updated: %d ports", namespacedName, len(change.current))
    }
    return len(sct.items) > 0
}
           

proxier.serviceChanges.Update

的操作比較簡單,就是将變更加入到自己的變更(change items)切片中, 否則什麼都不做。

至此,每個service和endpoints對象都被添加到了XXXChanges對象裡面了。

當informer資料同步完成之後,就會開始規則的同步了,而在資料同步完成之前,所有的資料也都加入到了XXXChanges裡面了。

syncProxyRules

規則同步的這個函數超級長,是以這裡會将這個函數的功能分為以下幾個部分來講解。

  • 計算要更新的規則
  • iptables前置操作
  • 根據最新的資料建立規則和規則鍊
  • 删除不再使用的規則和規則鍊
  • 重新整理iptables規則
  • 删除過時的conntrack連接配接

計算規則

這部分主要就是将serviceChanges, endpointsChanges更新到serviceMap和endpointsMap, 後續的操作都是以此為基礎來做相應的操作的。

代碼如下:

// pkg\proxy\iptables\proxier.go
// 根據changes與目前map來計算最終的代理規則
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)


// pkg\proxy\service.go
func (sm ServiceMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
    result.UDPStaleClusterIP = sets.NewString()
    // 應用changes
    sm.apply(changes, result.UDPStaleClusterIP)

    // 用來健康檢查的端口
    result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
    for svcPortName, info := range sm {
        if info.HealthCheckNodePort() != 0 {
            result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
        }
    }
    return result
}

// 将changes裡的資料合并到serviceMap裡面, 然後将changes置為空
func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
    for _, change := range changes.items {
        // 合并, 過濾, 删除
        sm.merge(change.current)
        change.previous.filter(change.current)
        sm.unmerge(change.previous, UDPStaleClusterIP)
    }
    // 置為空
    changes.items = make(map[types.NamespacedName]*serviceChange)
    metrics.ServiceChangesPending.Set(0)
}
           

計算規則産生的結果是最新的狀态,然後與舊的狀态相比較就可以得到過時的規則,根據這些過時的規則可以用于後續清理操作。

如果過時的連接配接不清理,就會操作網絡異常,比如後端已經改變,但是conntrack那裡還保持連接配接,那麼連接配接不清理掉的話,就會導緻通路到舊的後端,或者通路到沒有響應的對端。

下面是将這些過時的資料儲存起來,以便後續清理。

// 初始化化空對象
    conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP
    conntrackCleanupServiceNodePorts := sets.NewInt()
    // 基于這些差異結果來插入過時的資料,用于後續清理
    for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
        if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
            conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
            for _, extIP := range svcInfo.ExternalIPStrings() {
                conntrackCleanupServiceIPs.Insert(extIP)
            }
            for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
                conntrackCleanupServiceIPs.Insert(lbIP)
            }
            nodePort := svcInfo.NodePort()
            if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
                klog.V(2).Infof("Stale %s service NodePort %v -> %d", strings.ToLower(string(svcInfo.Protocol())), svcPortName, nodePort)
                conntrackCleanupServiceNodePorts.Insert(nodePort)
            }
        }
    }
           

iptables前置操作

這一部分確定在寫入規則到iptables之前一些規則鍊和規則必須存在,如果不存在就建立。

// iptablesJumpChains是一個切片,包含各個表的各個鍊
for _, jump := range iptablesJumpChains {
    if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
        klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
        return
    }
    args := append(jump.extraArgs,
        "-m", "comment", "--comment", jump.comment,
        "-j", string(jump.dstChain),
    )
    if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
        klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
        return
    }
}

// 確定KUBE-MARK-DROP規則鍊存在
for _, ch := range iptablesEnsureChains {
    if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
        klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
        return
    }
}
           

EnsureXXX的邏輯都是首先檢查是否存在,如果存在就傳回,否則就嘗試建立。

在所需要的規則和規則鍊确認存在之後就是将所有規則導出。

// 通過iptables-save -t nat/filter指令将相應的表的資料導出
// filter表資料導出
existingFilterChains := make(map[utiliptables.Chain][]byte)
proxier.existingFilterChainsData.Reset()
err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
if err != nil { // if we failed to get any rules
    klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else {
    // 将導出資料中的規則鍊清單導出
    existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
}

// nat表資料導出
// 與filter表差不多 
existingNATChains := make(map[utiliptables.Chain][]byte)
proxier.iptablesData.Reset()
err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
if err != nil { // if we failed to get any rules
    klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else { // otherwise parse the output
    existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
}
           

上面的代碼首先将資料以

*bytes.Buffer

對象儲存起來。

這裡的iptablesData儲存了nat表的資料,而不是一個類似于existingFilterChainsData命名的對象

然後基于這些資料得到了目前存在的規則鍊的map。iptables規則的這些資料大緻如下。

:KUBE-KUBELET-CANARY - [0:0]
:KUBE-MARK-DROP - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-PROXY-CANARY - [0:0]
:KUBE-SEP-UEAYFIZ2IBK7HSGA - [0:0]
:KUBE-SERVICES - [0:0]
:KUBE-SVC-NPX46M4PTMTKRN6Y - [0:0]
           

基于這些資料得到的map類似下面

{"KUBE-SVC-NPX46M4PTMTKRN6Y": ":KUBE-SVC-NPX46M4PTMTKRN6Y - [0:0]"}
           

然後構造最終要導入到iptables裡面的文本

iptables的規則就是一個文本,無論是導出還是導入
proxier.filterChains.Reset()
proxier.filterRules.Reset()
proxier.natChains.Reset()
proxier.natRules.Reset()

// 寫入表頭
utilproxy.WriteLine(proxier.filterChains, "*filter")
utilproxy.WriteLine(proxier.natChains, "*nat")


// 在構造的文本中寫入規則鍊和規則
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
    if chain, ok := existingFilterChains[chainName]; ok {
        utilproxy.WriteBytesLine(proxier.filterChains, chain)
    } else {
        utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
    }
}
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
    if chain, ok := existingNATChains[chainName]; ok {
        utilproxy.WriteBytesLine(proxier.natChains, chain)
    } else {
        utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
    }
} 

// 插入SNAT規則
utilproxy.WriteLine(proxier.natRules, []string{
    "-A", string(kubePostroutingChain),
    "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
    "-j", "RETURN",
}...)
// Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
utilproxy.WriteLine(proxier.natRules, []string{
    "-A", string(kubePostroutingChain),
    // XOR proxier.masqueradeMark to unset it
    "-j", "MARK", "--xor-mark", proxier.masqueradeMark,
}...)
masqRule := []string{
    "-A", string(kubePostroutingChain),
    "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
    "-j", "MASQUERADE",
}
if proxier.iptables.HasRandomFully() {
    masqRule = append(masqRule, "--random-fully")
}
utilproxy.WriteLine(proxier.natRules, masqRule...) 
// 打标記
utilproxy.WriteLine(proxier.natRules, []string{
    "-A", string(KubeMarkMasqChain),
    "-j", "MARK", "--or-mark", proxier.masqueradeMark,
}...)
           

SNAT規則如下

-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --set-xmark 0x4000/0x0
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUER
           

初始化要插入的對象,在構造文本之前先将這些規則規整成一個個預定義的資料結構。

// 還在使用的nat規則鍊,用來過濾過時的規則鍊
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// 必要的資料結構
replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
readyEndpointChains := make([]utiliptables.Chain, 0)
localEndpointChains := make([]utiliptables.Chain, 0)

// iptables規則參數,比如-m tcp之類,初始化長度為64,一是為了避免記憶體在配置設定
// 二是對于大多數情況足夠了, 即使超過64也沒關系,因為切片可以動态擴容
args := make([]string, 64)

// 計算所有服務的endpoint規則鍊的總數
proxier.endpointChainsNumber = 0
for svcName := range proxier.serviceMap {
    proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
}

// 擷取本地位址
localAddrSet := utilproxy.GetLocalAddrSet()
nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
           

至此,所需要的資料結構全部準備完畢。

根據最新的資料建立規則和規則鍊

這些邏輯在一個大循環中,僅僅是這個循環就占了500多行,是以這一部分也需要分解開來,首先隻看循環本身。

for svcName, svc := range proxier.serviceMap {
}
           

這個循環的邏輯就是周遊目前的ServiceMap, 依次建立相應的規則鍊和規則,更具體的生成邏輯就是根據service找到對應的endpoint, 然後基于這些建立對應的規則。

首先是根據service找到endpoint,然後建立service對應的規則鍊。

// 對象轉換
svcInfo, ok := svc.(*serviceInfo)
protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.serviceNameString

// 根據serviceName到endpointsMap找打對應的endpoint
allEndpoints := proxier.endpointsMap[svcName]

// 這裡的過濾是為了topology aware endpoint這個特性
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)

// Scan the endpoints list to see what we have. "hasEndpoints" will be true
// if there are any usable endpoints for this service anywhere in the cluster.
var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
for _, ep := range allEndpoints {
// 判斷hasEndpoints, hasLocalReadyEndpoints,hasLocalServingTerminatingEndpoints 
}
useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints

// Generate the per-endpoint chains.
readyEndpointChains = readyEndpointChains[:0]
localEndpointChains = localEndpointChains[:0]
for _, ep := range allEndpoints {
    epInfo, ok := ep.(*endpointsInfo)
    if !ok {
        klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
        continue
    }

    endpointChain := epInfo.endpointChain(svcNameString, protocol)
    endpointInUse := false


    // 檢查是否已經存在endpoint鍊,否則就建立
    // endpoint鍊就是 KUBE-SEP-XXXX
    if chain, ok := existingNATChains[endpointChain]; ok {
        utilproxy.WriteBytesLine(proxier.natChains, chain)
    } else {
        utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
    }
    // 表示是一個有效的鍊
    activeNATChains[endpointChain] = true

    args = append(args[:0], "-A", string(endpointChain))
    args = proxier.appendServiceCommentLocked(args, svcNameString)
    // 寫入DNAT規則
    utilproxy.WriteLine(proxier.natRules, append(args,
        "-s", utilproxy.ToCIDR(net.ParseIP(epInfo.IP())),
        "-j", string(KubeMarkMasqChain))...)
    // Update client-affinity lists.
    if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
        args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
    }
    // DNAT to final destination.
    args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint)
    utilproxy.WriteLine(proxier.natRules, args...)
}

// 確定KUBE-SVC-XXX鍊存在, 不存在就建立
svcChain := svcInfo.servicePortChainName
if hasEndpoints {
    // Create the per-service chain, retaining counters if possible.
    if chain, ok := existingNATChains[svcChain]; ok {
        utilproxy.WriteBytesLine(proxier.natChains, chain)
    } else {
        utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
    }
    activeNATChains[svcChain] = true
}



// Capture the clusterIP.
if hasEndpoints {
    args = append(args[:0],
        "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
        "-m", protocol, "-p", protocol,
        "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
        "--dport", strconv.Itoa(svcInfo.Port()),
    )
    // 寫入KUBE-SVC-XXX鍊的規則
    utilproxy.WriteRuleLine(proxier.natRules, string(kubeServicesChain), append(args, "-j", string(svcChain))...)
}
           

關于externalService, loadbalancer類型的代碼這裡就跳過了。

上面的代碼總結起來就是建立KUBE-SVC-XXX, KUBE-SEP-XXX等規則鍊,然後在這些鍊上寫入規則,比如

-A KUBE-SERVICES -d 10.152.183.1/32 -p tcp -m comment --comment "default/myservice cluster IP" -m tcp --dport 80 -j KUBE-SVC-NPX46M4PTMTKRN6Y
-A KUBE-SVC-NPX46M4PTMTKRN6Y -m comment --comment "default/myservice" -m statistic --mode random --probability 0.500000000 -j KUBE-SEP-72LVGSP46NP3XHTG
-A KUBE-SVC-NPX46M4PTMTKRN6Y -m comment --comment "default/myservice" -j KUBE-SEP-EDGGJ3GHDFLJOF2D
-A KUBE-SEP-72LVGSP46NP3XHTG -s 10.0.20.4/32 -m comment --comment "default/myservice" -j KUBE-MARK-MASQ
-A KUBE-SEP-72LVGSP46NP3XHTG -p tcp -m comment --comment "default/myservice" -m tcp -j DNAT --to-destination 10.0.20.4:80
           

删除不在使用的規則和規則鍊

基于existingNATChains, activeNATChain确定不在需要的鍊和規則

for chain := range existingNATChains {
    if !activeNATChains[chain] {
        chainString := string(chain) 
        // 如果不是k8s建立的鍊就跳
        if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
            // Ignore chains that aren't ours.
            continue
        }
        // 删除鍊之前確定鍊存在
        // -X KUBE-SVC-XXXX, KUBE-SEP-XXXX
        utilproxy.WriteBytesLine(proxier.natChains, existingNATChains[chain])
        utilproxy.WriteLine(proxier.natRules, "-X", chainString)
    }
}
           

關于filter表的規則和規則鍊這裡就略去了,主要是一些過濾的規則,比如過濾掉非法狀态的資料包,接受哪些狀态的資料包之類的。

最後就是将資料全部寫入要生成的文本中

utilproxy.WriteLine(proxier.filterRules, "COMMIT")
utilproxy.WriteLine(proxier.natRules, "COMMIT")

// Sync rules.
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes()).Bytes())
           

重新整理iptables規則

至此用來導入到iptables的規則文本已經建立完畢,可以導入這些文本到iptables裡面了。

err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
    klog.ErrorS(err, "Failed to execute iptables-restore")
    return
}
success = true
           

restore就是調用

iptables-restore指令

,具體指令差不多如下。

iptables-restore --noflush --counters < xxxx
           
--noflush保證不會刷掉之前已有的規則,--counters保證統計詳細不會重置。

删除過時的conntrack連接配接

這部分直接看代碼就行。

klog.V(4).InfoS("Deleting conntrack stale entries for Services", "ips", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
    if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
        klog.ErrorS(err, "Failed to delete stale service connections", "ip", svcIP)
    }
}
klog.V(4).InfoS("Deleting conntrack stale entries for Services", "nodeports", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
    err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP)
    if err != nil {
        klog.ErrorS(err, "Failed to clear udp conntrack", "port", nodePort)
    }
}
           

iptables資料流

既然講解iptables模式的kube-proxy, 自然無法避免iptables的相關知識,下面是一張比較詳細的iptables資料流的圖示。

kube-proxy保姆級别源碼閱讀iptables資料流

如果搞不清各個鍊和表之間的關系,可以參考上面的圖。

kube-proxy一般隻用到了兩張表, nat,filter。

本文隻講解兩條資料流

  • 通過service到目标pod的資料流,即 pod1 -> service -> DNAT -> pod2
  • 通過service到節點端口(nodePort)的資料流, 即 pod1 -> service -> DNAT -> nodeport -> pod2

通過service到目标pod的資料流

這裡以下面的service為例, 然後梳理與它相關的iptables規則

apiVersion: v1
kind: Service
metadata:
  name: myservice
spec:
  ports:
  - port: 80
    protocol: TCP
    targetPort: 80
  selector:
    app: myservice
  type: ClusterIP
           

建立完成之後可以看到它對應的endpoint和cluster ip

Name:              myservice
Namespace:         default
Labels:            app=myservice
Annotations:       <none>
Selector:          <none>
Type:              ClusterIP
IP Family Policy:  SingleStack
IP Families:       IPv4
IP:                10.152.183.1
IPs:               10.152.183.1
Port:              http  80/TCP
TargetPort:        80/TCP
Endpoints:         10.0.20.4:80, 10.0.22.3:80
Session Affinity:  None
Events:            <none>
           

當service建立完成之後,就可以看看iptables的規則了,規則可以在k8s叢集中的任意節點可以檢視, 為了簡單起見,文中會去掉與這個service無關的規則。

規則可以通過iptables-save完整輸出,這個指令會輸出所有表的所有鍊。

如果要檢視指定表的規則,可以通過iptables -vnL -t {表名}檢視, 比如nat, 如果不指定-t參數, 預設是filter表。

假設叢集中的一個pod(10.0.21.12)通路此service, 那麼經過的iptables規則如下。

# nat表
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A KUBE-SERVICES -d 10.152.183.1/32 -p tcp -m comment --comment "default/myservice cluster IP" -m tcp --dport 80 -j KUBE-SVC-NPX46M4PTMTKRN6Y
-A KUBE-SVC-NPX46M4PTMTKRN6Y -m comment --comment "default/myservice" -m statistic --mode random --probability 0.500000000 -j KUBE-SEP-72LVGSP46NP3XHTG
-A KUBE-SVC-NPX46M4PTMTKRN6Y -m comment --comment "default/myservice" -j KUBE-SEP-EDGGJ3GHDFLJOF2D
# 注意隻有源ip是10.0.20.4/32
# 這是為了解決自己通路自己的service, 如果不做特殊處理,那麼會發生錯誤
-A KUBE-SEP-72LVGSP46NP3XHTG -s 10.0.20.4/32 -m comment --comment "default/myservice" -j KUBE-MARK-MASQ
-A KUBE-SEP-72LVGSP46NP3XHTG -p tcp -m comment --comment "default/myservice" -m tcp -j DNAT --to-destination 10.0.20.4:80

# KUBE-MARK-MASQ就是簡單的打個标記
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
           

是以資料流如下

  • 是以首先通過dns得到service的cluster ip
  • 主控端在收到pod的資料包之後會先進入PREROUTING繼而進入KUBE-SERVICES鍊,最紅比對到KUBE-SVC-NPX46M4PTMTKRN6Y鍊
  • KUBE-SVC-NPX46M4PTMTKRN6Y會以50%的機率随機選擇KUBE-SEP-EDGGJ3GHDFLJOF2D和KUBE-SEP-72LVGSP46NP3XHTG
  • 這裡假設選擇了KUBE-SEP-72LVGSP46NP3XHTG
  • KUBE-SEP-72LVGSP46NP3XHTG鍊會将流量通過DNAT轉發到10.0.20.4:80

這裡有一個問題,那就是service的後端通路自己對應的service是否會建立不了連接配接?因為DNAT并不會修改源IP,那麼自己通路自己,發出的時候走了iptables,然後回包的時候發現包是自己,那麼肯定不會過iptables了,也不會過網橋,那麼這裡會發生錯誤,怎麼解決呢?kube-proxy的解決辦法是打一個标記,在POSTROUTING的時候做SNAT

假設service的後端通路此service, 即10.0.20.4 -> myservice(10.152.183.1)

那麼在nat表與上面的資料流沒有多大差別,但是在filter表上會有一些差別, 因為在nat表裡面會進入KUBE-MARK-MASQ鍊打上一個0x4000的标記。

然後就會依次比對到POSTROUTING鍊上的SNAT規則。

-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --set-xmark 0x4000/0x0
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
           

是以service後端通路自己對應的service看到的源IP是service的cluster IP。

通過service到節點端口(nodePort)的流量

假設一個外部的主機通路一個類型是nodePort的service。

那麼比對到的iptables規則如下。

# nat表
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst
-type LOCAL -j KUBE-NODEPORTS
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/myservice" -m tcp --dport 80 -j KUBE-SVC-NPX46M4PTMTKRN6Y
-A KUBE-SVC-NPX46M4PTMTKRN6Y -m comment --comment "default/myservice" -m statistic --mode random --probability 0.500000000 -j KUBE-SEP-72LVGSP46NP3XHTG
-A KUBE-SVC-NPX46M4PTMTKRN6Y -m comment --comment "default/myservice" -j KUBE-SEP-EDGGJ3GHDFLJOF2D
# 注意隻有源ip是10.0.20.4/32
-A KUBE-SEP-72LVGSP46NP3XHTG -s 10.0.20.4/32 -m comment --comment "default/myservice" -j KUBE-MARK-MASQ
-A KUBE-SEP-72LVGSP46NP3XHTG -p tcp -m comment --comment "default/myservice" -m tcp -j DNAT --to-destination 10.0.20.4:80
           

可以發現資料流與到service的資料流基本一緻,不同點在于流量入口的比對的是節點的端口

是以資料流如下

  • 節點接受到資料包,KUBE-NODEPORTS鍊比對到流量繼而轉發給KUBE-SVC-NPX46M4PTMTKRN6Y鍊
  • KUBE-SVC-NPX46M4PTMTKRN6Y會以50%的機率随機選擇KUBE-SEP-EDGGJ3GHDFLJOF2D和KUBE-SEP-72LVGSP46NP3XHTG
  • 這裡假設選擇了KUBE-SEP-72LVGSP46NP3XHTG
  • KUBE-SEP-72LVGSP46NP3XHTG鍊會将流量通過DNAT轉發到10.0.20.4:80

開篇的答案

  • 為什麼理論上ipvs的轉發性能高于iptables卻預設是iptables而不是ipvs?

    我也沒有确切的答案,我搜尋到的說法,大都是是說長連接配接iptables會更好,但是ipvs的tcp連接配接逾時時間是可調的,我沒有找到一個足夠信服的答案。

  • kube-proxy怎麼保持規則的同步和生成對應的規則,第一次全量資料是怎麼拿到的?

    kube-proxy通過informer監聽service,endpoint對象, informer能夠提供可靠的同步機制,同步完成之後就拿到了全量資料。

  • iptables怎麼保留iptables上已有的規則,怎麼確定自己的規則沒有被刷掉?

    iptables-restore有一個--noflush參數,這個參數會讓iptables不覆寫已有的規則

總結

可以看到kube-proxy的代碼有三個比較重要的對象。

  • Options
  • ProxyServer

繼續閱讀