上篇文章 kubernetes service 原了解析 已經分析了 service 原理以 kube-proxy 中三種模式的原理,本篇文章會從源碼角度分析 kube-proxy 的設計與實作。
kubernetes 版本: v1.16
kube-proxy 啟動流程
前面的文章已經說過 kubernetes 中所有元件都是通過其
run()
方法啟動主邏輯的,
run()
方法調用之前會進行解析指令行參數、添加預設值等。下面就直接看 kube-proxy 的
run()
方法:
- 若啟動時指定了
參數,kube-proxy 隻将啟動的預設參數寫到指定的配置檔案中,然後退出--write-config-to
- 初始化 ProxyServer 對象
- 如果啟動參數
設定為 true,則清理 iptables 和 ipvs 規則并退出--cleanup
k8s.io/kubernetes/cmd/kube-proxy/app/server.go:290
func (o *Options) Run() error {
defer close(o.errCh)
// 1.如果指定了 --write-config-to 參數,則将預設的配置檔案寫到指定檔案并退出
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}
// 2.初始化 ProxyServer 對象
proxyServer, err := NewProxyServer(o)
if err != nil {
return err
}
// 3.如果啟動參數 --cleanup 設定為 true,則清理 iptables 和 ipvs 規則并退出
if o.CleanupAndExit {
return proxyServer.CleanupAndExit()
}
o.proxyServer = proxyServer
return o.runLoop()
}
Run()
方法中主要調用了
NewProxyServer()
方法來初始化 ProxyServer,然後會調用
runLoop()
啟動主循環,繼續看初始化 ProxyServer 的具體實作:
- 初始化 iptables、ipvs 相關的 interface
- 若啟用了 ipvs 則檢查核心版本、ipvs 依賴的核心子產品、ipset 版本,核心子產品主要包括:
,ip_vs
,ip_vs_rr
ip_vs_wrr
ip_vs_sh
nf_conntrack_ipv4
,若沒有相關子產品,kube-proxy 會嘗試使用nf_conntrack
自動加載modprobe
- 根據 proxyMode 初始化 proxier,kube-proxy 啟動後隻運作一種 proxier
k8s.io/kubernetes/cmd/kube-proxy/app/server_others.go:57
func NewProxyServer(o *Options) (*ProxyServer, error) {
return newProxyServer(o.config, o.CleanupAndExit, o.master)
}
func newProxyServer(
config *proxyconfigapi.KubeProxyConfiguration,
cleanupAndExit bool,
master string) (*ProxyServer, error) {
......
if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}
......
// 1.關鍵依賴工具 iptables/ipvs/ipset/dbus
var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface
var kernelHandler ipvs.KernelHandler
var ipsetInterface utilipset.Interface
var dbus utildbus.Interface
// 2.執行 linux 指令行的工具
execer := exec.New()
// 3.初始化 iptables/ipvs/ipset/dbus 對象
dbus = utildbus.New()
iptInterface = utiliptables.New(execer, dbus, protocol)
kernelHandler = ipvs.NewLinuxKernelHandler()
ipsetInterface = utilipset.New(execer)
// 4.檢查該機器是否支援使用 ipvs 模式
canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
if canUseIPVS {
ipvsInterface = utilipvs.New(execer)
}
if cleanupAndExit {
return &ProxyServer{
......
}, nil
}
// 5.初始化 kube client 和 event client
client, eventClient, err := createClients(config.ClientConnection, master)
if err != nil {
return nil, err
}
......
// 6.初始化 healthzServer
var healthzServer *healthcheck.HealthzServer
var healthzUpdater healthcheck.HealthzUpdater
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
healthzUpdater = healthzServer
}
// 7.proxier 是一個 interface,每種模式都是一個 proxier
var proxier proxy.Provider
// 8.根據 proxyMode 初始化 proxier
proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
......
if proxyMode == proxyModeIPTables {
klog.V(0).Info("Using iptables Proxier.")
if config.IPTables.MasqueradeBit == nil {
return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
}
// 9.初始化 iptables 模式的 proxier
proxier, err = iptables.NewProxier(
.......
)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
metrics.RegisterMetrics()
} else if proxyMode == proxyModeIPVS {
// 10.判斷是夠啟用了 ipv6 雙棧
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
......
// 11.初始化 ipvs 模式的 proxier
proxier, err = ipvs.NewDualStackProxier(
......
)
} else {
proxier, err = ipvs.NewProxier(
......
)
}
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
metrics.RegisterMetrics()
} else {
// 12.初始化 userspace 模式的 proxier
proxier, err = userspace.NewProxier(
......
)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
}
iptInterface.AddReloadFunc(proxier.Sync)
return &ProxyServer{
......
}, nil
}
runLoop()
方法主要是啟動 proxyServer。
k8s.io/kubernetes/cmd/kube-proxy/app/server.go:311
func (o *Options) runLoop() error {
// 1.watch 配置檔案變化
if o.watcher != nil {
o.watcher.Run()
}
// 2.以 goroutine 方式啟動 proxyServer
go func() {
err := o.proxyServer.Run()
o.errCh <- err
}()
for {
err := <-o.errCh
if err != nil {
return err
}
}
}
o.proxyServer.Run()
中會啟動已經初始化好的所有服務:
- 設定程序 OOMScore,可通過指令行配置,預設值為
--oom-score-adj="-999"
- 啟動 metric server 和 healthz server,兩者分别監聽 10256 和 10249 端口
- 設定核心參數
和nf_conntrack_tcp_timeout_established
nf_conntrack_tcp_timeout_close_wait
- 将 proxier 注冊到 serviceEventHandler、endpointsEventHandler 中
- 啟動 informer 監聽 service 和 endpoints 變化
- 執行
,啟動 proxier 主循環s.Proxier.SyncLoop()
k8s.io/kubernetes/cmd/kube-proxy/app/server.go:527
func (s *ProxyServer) Run() error {
......
// 1.程序 OOMScore,避免程序因 oom 被殺掉,此處預設值為 -999
var oomAdjuster *oom.OOMAdjuster
if s.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
klog.V(2).Info(err)
}
}
......
// 2.啟動 healthz server
if s.HealthzServer != nil {
s.HealthzServer.Run()
}
// 3.啟動 metrics server
if len(s.MetricsBindAddress) > 0 {
......
go wait.Until(func() {
err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
if err != nil {
utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
}
}, 5*time.Second, wait.NeverStop)
}
// 4.配置 conntrack,設定核心參數 nf_conntrack_tcp_timeout_established 和 nf_conntrack_tcp_timeout_close_wait
if s.Conntracker != nil {
max, err := getConntrackMax(s.ConntrackConfiguration)
if err != nil {
return err
}
if max > 0 {
err := s.Conntracker.SetMax(max)
......
}
if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
return err
}
}
if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
return err
}
}
}
......
// 5.啟動 informer 監聽 Services 和 Endpoints 或者 EndpointSlices 資訊
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}))
// 6.将 proxier 注冊到 serviceConfig、endpointsConfig 中
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
} else {
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.Proxier)
go endpointsConfig.Run(wait.NeverStop)
}
// 7.啟動 informer
informerFactory.Start(wait.NeverStop)
s.birthCry()
// 8.啟動 proxier 主循環
s.Proxier.SyncLoop()
return nil
}
回顧一下整個啟動邏輯:
o.Run() --> o.runLoop() --> o.proxyServer.Run() --> s.Proxier.SyncLoop()
o.Run()
中調用了
NewProxyServer()
來初始化 proxyServer 對象,其中包括初始化每種模式對應的 proxier,該方法最終會調用
s.Proxier.SyncLoop()
執行 proxier 的主循環。
proxier 的初始化
看完了啟動流程的邏輯代碼,接着再看一下各代理模式的初始化,上文已經提到每種模式都是一個 proxier,即要實作
proxy.Provider
對應的 interface,如下所示:
type Provider interface {
config.EndpointsHandler
config.EndpointSliceHandler
config.ServiceHandler
Sync()
SyncLoop()
}
首先要實作 service、endpoints 和 endpointSlice 對應的 handler,也就是對
OnAdd
、
OnUpdate
OnDelete
OnSynced
四種方法的處理,詳細的代碼在下文進行講解。EndpointSlice 是在 v1.16 中新加入的一個 API。
Sync()
SyncLoop()
是主要用來處理iptables 規則的方法。
iptables proxier 初始化
首先看 iptables 模式的
NewProxier()
方法,其函數的具體執行邏輯為:
- 設定相關的核心參數
route_localnet
bridge-nf-call-iptables
- 生成 masquerade 标記
- 設定預設排程算法 rr
- 初始化 proxier 對象
- 使用
初始化 proxier.syncRunner,将 proxier.syncProxyRules 方法注入,BoundedFrequencyRunner
是一個管理器用于執行使用者注入的函數,可以指定運作的時間政策。BoundedFrequencyRunner
k8s.io/kubernetes/pkg/proxy/iptables/proxier.go:249
func NewProxier(ipt utiliptables.Interface,
......
) (*Proxier, error) {
// 1.設定相關的核心參數
if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
......
}
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
......
}
// 2.設定 masqueradeMark,預設為 0x00004000/0x00004000
// 用來标記 k8s 管理的封包,masqueradeBit 預設為 14
// 标記 0x4000 的封包(即 POD 發出的封包),在離開 Node 的時候需要進行 SNAT 轉換
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
......
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil)
// 3.初始化 proxier
isIPv6 := ipt.IsIpv6()
proxier := &Proxier{
......
}
burstSyncs := 2
// 4.初始化 syncRunner,BoundedFrequencyRunner 是一個定時執行器,會定時執行
// proxier.syncProxyRules 方法,syncProxyRules 是每個 proxier 實際重新整理iptables 規則的方法
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
return proxier, nil
}
ipvs proxier 初始化
ipvs
NewProxier()
方法主要邏輯為:
- 設定核心參數,
route_localnet
br_netfilter
bridge-nf-call-iptables
conntrack
conn_reuse_mode
ip_forward
arp_ignore
等arp_announce
- 和 iptables 一樣,對于 SNAT iptables 規則生成 masquerade 标記
- 初始化 ipset 規則
- 初始化 syncRunner 将 proxier.syncProxyRules 方法注入
- 啟動
定時清理 RS (realServer) 記錄gracefuldeleteManager
k8s.io/kubernetes/pkg/proxy/ipvs/proxier.go:316
func NewProxier(ipt utiliptables.Interface,
......
) (*Proxier, error) {
// 1.設定核心參數
if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
......
}
......
// 2.生成 masquerade 标記
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
// 3.設定預設排程算法 rr
if len(scheduler) == 0 {
scheduler = DefaultScheduler
}
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
// 4.初始化 proxier
proxier := &Proxier{
......
}
// 5.初始化 ipset 規則
proxier.ipsetList = make(map[string]*IPSet)
for _, is := range ipsetInfo {
proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
}
burstSyncs := 2
// 6.初始化 syncRunner
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
// 7.啟動 gracefuldeleteManager
proxier.gracefuldeleteManager.Run()
return proxier, nil
}
userspace proxier 初始化
userspace
NewProxier()
- 初始化 iptables 規則
- 初始化 proxier
k8s.io/kubernetes/pkg/proxy/userspace/proxier.go:187
func NewProxier(......) (*Proxier, error) {
return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, nodePortAddresses, newProxySocket)
}
func NewCustomProxier(......) (*Proxier, error) {
......
// 1.設定打開檔案數
err = setRLimit(64 * 1000)
if err != nil {
return nil, fmt.Errorf("failed to set open file handler limit: %v", err)
}
proxyPorts := newPortAllocator(pr)
return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
}
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
if proxyPorts == nil {
proxyPorts = newPortAllocator(utilnet.PortRange{})
}
// 2.初始化 iptables 規則
if err := iptablesInit(iptables); err != nil {
return nil, fmt.Errorf("failed to initialize iptables: %v", err)
}
if err := iptablesFlush(iptables); err != nil {
return nil, fmt.Errorf("failed to flush iptables: %v", err)
}
// 3.初始化 proxier
proxier := &Proxier{
......
}
// 4.初始化 syncRunner
proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs)
return proxier, nil
}
proxier 接口實作
handler 的實作
上文已經提到過每種 proxier 都需要實作 interface 中的幾個方法,首先看一下
ServiceHandler
EndpointsHandler
EndpointSliceHandler
相關的,對于 service、endpoints 和 endpointSlices 三種對象都實作了
OnAdd
OnUpdate
OnDelete
OnSynced
方法。
// 1.service 相關的方法
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
proxier.OnServiceUpdate(nil, service)
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
proxier.OnServiceUpdate(service, nil)
}
func (proxier *Proxier) OnServiceSynced(){
......
proxier.syncProxyRules()
}
// 2.endpoints 相關的方法
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
proxier.OnEndpointsUpdate(nil, endpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.Sync()
}
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
proxier.OnEndpointsUpdate(endpoints, nil)
}
func (proxier *Proxier) OnEndpointsSynced() {
......
proxier.syncProxyRules()
}
// 3.endpointSlice 相關的方法
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
proxier.Sync()
}
}
func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
proxier.Sync()
}
}
func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
proxier.Sync()
}
}
func (proxier *Proxier) OnEndpointSlicesSynced() {
......
proxier.syncProxyRules()
}
在啟動邏輯的
Run()
方法中 proxier 已經被注冊到了 serviceConfig、endpointsConfig、endpointSliceConfig 中,當啟動 informer,cache 同步完成後會調用
OnSynced()
方法,之後當 watch 到變化後會調用 proxier 中對應的
OnUpdate()
方法進行處理,
OnSynced()
會直接調用
proxier.syncProxyRules()
來重新整理iptables 規則,而
OnUpdate()
會調用
proxier.syncRunner.Run()
方法,其最終也是調用
proxier.syncProxyRules()
方法重新整理規則的,這種轉換是在
BoundedFrequencyRunner
中展現出來的,下面看一下具體實作。
Sync() 以及 SyncLoop() 的實作
每種 proxier 的
Sync()
以及
SyncLoop()
方法如下所示,都是調用 syncRunner 中的相關方法,而 syncRunner 在前面的
NewProxier()
中已經說過了,syncRunner 是調用
async.NewBoundedFrequencyRunner()
方法初始化,至此,基本上可以确定了所有的核心都是在
BoundedFrequencyRunner
中實作的。
func NewProxier() (*Proxier, error) {
......
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
......
}
// Sync()
func (proxier *Proxier) Sync() {
proxier.syncRunner.Run()
}
// SyncLoop()
func (proxier *Proxier) SyncLoop() {
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
proxier.syncRunner.Loop(wait.NeverStop)
}
NewBoundedFrequencyRunner()
是其初始化的函數,其中的參數
minInterval
maxInterval
分别對應 proxier 中的
minSyncPeriod
syncPeriod
,兩者的預設值分别為 0s 和 30s,其值可以使用
--iptables-min-sync-period
--iptables-sync-period
啟動參數來指定。
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:134
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
timer := realTimer{Timer: time.NewTimer(0)}
// 執行定時器
<-timer.C()
// 調用 construct() 函數
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
if maxInterval < minInterval {
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
}
if timer == nil {
panic(fmt.Sprintf("%s: timer must be non-nil", name))
}
bfr := &BoundedFrequencyRunner{
name: name,
fn: fn, // 被調用的函數,proxier.syncProxyRules
minInterval: minInterval,
maxInterval: maxInterval,
run: make(chan struct{}, 1),
timer: timer,
}
// 由于預設的 minInterval = 0,此處使用 nullLimiter
if minInterval == 0 {
bfr.limiter = nullLimiter{}
} else {
// 采用“令牌桶”算法實作流控機制
qps := float32(time.Second) / float32(minInterval)
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
}
return bfr
}
在啟動流程
Run()
方法最後調用的
s.Proxier.SyncLoop()
最終調用的是
BoundedFrequencyRunner
的
Loop()
方法,如下所示:
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:169
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
return
case <-bfr.timer.C(): // 定時器
bfr.tryRun()
case <-bfr.run: // 接收 channel
bfr.tryRun()
}
}
}
proxier 的
OnUpdate()
中調用的
syncRunner.Run()
其實隻是在 bfr.run 這個帶 buffer 的 channel 中發送了一條資料,在
BoundedFrequencyRunner
Loop()
方法中接收到該資料後會調用
bfr.tryRun()
進行處理:
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:191
func (bfr *BoundedFrequencyRunner) Run() {
select {
case bfr.run <- struct{}{}: // 向 channel 發送信号
default:
}
}
而
tryRun()
方法才是最終調用
syncProxyRules()
重新整理iptables 規則的。
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:211
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
if bfr.limiter.TryAccept() {
// 執行 fn() 即 syncProxyRules() 重新整理iptables 規則
bfr.fn()
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval)
return
}
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
if nextPossible < nextScheduled {
bfr.timer.Stop()
bfr.timer.Reset(nextPossible)
}
}
通過以上分析可知,
syncProxyRules()
是每個 proxier 的核心方法,啟動 informer cache 同步完成後會直接調用
proxier.syncProxyRules()
重新整理iptables 規則,之後如果 informer watch 到相關對象的變化後會調用
BoundedFrequencyRunner
tryRun()
來重新整理iptables 規則,定時器每 30s 會執行一次iptables 規則的重新整理。
總結
本文主要介紹了 kube-proxy 的啟動邏輯以及三種模式 proxier 的初始化,還有最終調用重新整理iptables 規則的 BoundedFrequencyRunner,可以看到其中的代碼寫的很巧妙。而每種模式下的iptables 規則是如何建立、重新整理以及轉發的是如何實作的會在後面的文章中進行分析。