main
controllers/nginx/pkg/cmd/controller/main.go:29 func main() {
// start a new nginx controller
ngx := newNGINXController()
// create a custom Ingress controller using NGINX as backend
ic := controller.NewIngressController(ngx)
go handleSigterm(ic) // start the controller
ic.Start() // wait
glog.Infof("shutting down Ingress controller...") for {
glog.Infof("Handled quit, awaiting pod deletion")
time.Sleep(30 * time.Second)
}
}
- start a new nginx controller.
- create a custom Ingress controller using NGINX as backend.
- start the Ingress controller.
newNGINXController
controllers/nginx/pkg/cmd/controller/nginx.go:68 func newNGINXController() ingress.Controller {
// 從環境變量“NGINX_BINARY”中擷取nginx二進制檔案的路徑,如果沒有該環境變量,則使用預設的預設值“/usr/sbin/nginx”
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = binary
}
// 從"/etc/resolv.conf"中讀取dns nameservers的IP清單
h, err := dns.GetSystemNameServers()
...
// 構造NGINXController對象
n := &NGINXController{
binary: ngx,
configmap: &api_v1.ConfigMap{},
// 檢視 "/proc/net/if_inet6"檔案是否存在,檢查是否Enable IPv6
isIPV6Enabled: isIPv6Enabled(),
resolver: h,
proxy: &proxy{
// 設定proxy的default server為 ”127.0.0.1:442”
Default: &server{
Hostname: "localhost",
IP: "127.0.0.1",
Port: 442,
ProxyProtocol: true,
},
},
}
// 啟動對 tcp/443 端口的監聽
listener, err := net.Listen("tcp", ":443")
proxyList := &proxyproto.Listener{Listener: listener}
// start goroutine that accepts tcp connections in port 443 go func() {
for {
var conn net.Conn
var err error
if n.isProxyProtocolEnabled {
// we need to wrap the listener in order to decode // proxy protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
continue
}
glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.proxy.Handle(conn)
}
}()
// onChange定義nginx.tmpl檔案内容發生變化時需要進行操作:重新生成nginx template instance,并賦給NGINXController var onChange func()
onChange = func() {
template, err := ngx_template.NewTemplate(tmplPath, onChange)
...
n.t.Close()
n.t = template
glog.Info("new NGINX template loaded")
}
// 根據nginx.tmpl生成nginx Template,并将onChange注冊進去,通過FileWatcher監聽nginx.tmpl的變化時,自動調用onChange。
ngxTpl, err := ngx_template.NewTemplate(tmplPath, onChange)
n.t = ngxTpl
// start a new NGINX master process running in foreground.
go n.Start()
return ingress.Controller(n)
}
- 從環境變量“NGINX_BINARY”中擷取nginx二進制檔案的路徑,如果沒有該環境變量,則使用預設的預設值“/usr/sbin/nginx”
- 從"/etc/resolv.conf"中讀取dns nameservers的IP清單
- 構造NGINXController對象
- 檢視 "/proc/net/if_inet6"檔案是否存在,檢查是否Enable IPv6
- 設定proxy的default server為 ”127.0.0.1:442”
- 啟動對 tcp/443 端口的監聽
- start goroutine that accepts tcp connections in port 443
- onChange定義nginx.tmpl檔案内容發生變化時需要進行操作:重新生成nginx template instance,并賦給NGINXController
- 根據nginx.tmpl生成nginx Template,并将onChange注冊進去,通過FileWatcher監聽nginx.tmpl的變化時,自動調用onChange。
- start a new NGINX master process running in foreground.
NGINXController的定義如下:
controllers/nginx/pkg/cmd/controller/nginx.go:156
// NGINXController is the value built by newNGINXController and returned from
// it as an ingress.Controller.
type NGINXController struct {
	// t is the currently loaded nginx template; newNGINXController's
	// onChange callback replaces it when a new template is loaded.
	t *ngx_template.Template
	configmap *api_v1.ConfigMap
	storeLister ingress.StoreLister
	// binary is the path of the nginx executable.
	binary string
	// resolver holds the DNS nameserver IPs obtained at construction time.
	resolver []net.IP
	cmdArgs []string
	stats *statsCollector
	statusModule statusModule
	// returns true if IPV6 is enabled in the pod
	isIPV6Enabled bool
	// returns true if proxy protocol is enabled
	isProxyProtocolEnabled bool
	proxy *proxy
}
NewTemplate時,我們可以看到根據nginx.tmpl來生成nginx配置時,注冊了funcMap,從funcMap我們可以看出nginx.tmpl主要包括哪些組成部分。
//controllers/nginx/pkg/template/template.go:57 func NewTemplate(file string, onChange func()) (*Template, error) {
tmpl, err := text_template.New("nginx.tmpl").Funcs(funcMap).ParseFiles(file)
fw, err := watch.NewFileWatcher(file, onChange)
return &Template{
tmpl: tmpl,
fw: fw,
s: defBufferSize,
tmplBuf: bytes.NewBuffer(make([]byte, 0, defBufferSize)),
outCmdBuf: bytes.NewBuffer(make([]byte, 0, defBufferSize)),
}, nil
}
//controllers/nginx/pkg/template/template.go:123
// funcMap is the set of helper functions registered on the nginx.tmpl
// template (NewTemplate passes it to Funcs). Most entries map a template
// function name to a Go function of the same name; a few expose standard
// library functions directly.
funcMap = text_template.FuncMap{
	// empty reports whether input is an empty string. Any input that is
	// not a string is also reported as empty (true).
	"empty": func(input interface{}) bool {
		check, ok := input.(string)
		if ok {
			return len(check) == 0
		}
		return true
	},
	"buildLocation": buildLocation,
	"buildAuthLocation": buildAuthLocation,
	"buildAuthResponseHeaders": buildAuthResponseHeaders,
	"buildProxyPass": buildProxyPass,
	"buildRateLimitZones": buildRateLimitZones,
	"buildRateLimit": buildRateLimit,
	"buildResolvers": buildResolvers,
	"buildUpstreamName": buildUpstreamName,
	"isLocationAllowed": isLocationAllowed,
	"buildLogFormatUpstream": buildLogFormatUpstream,
	"buildDenyVariable": buildDenyVariable,
	// Standard-library helpers exposed to the template as-is.
	"getenv": os.Getenv,
	"contains": strings.Contains,
	"hasPrefix": strings.HasPrefix,
	"hasSuffix": strings.HasSuffix,
	"toUpper": strings.ToUpper,
	"toLower": strings.ToLower,
	"formatIP": formatIP,
	"buildNextUpstream": buildNextUpstream,
}
controller.NewIngressController
core/pkg/ingress/controller/launch.go:29 // NewIngressController returns a configured Ingress controller func NewIngressController(backend ingress.Controller) *GenericController {
...
flags.Parse(os.Args)
// 對于Ningx而言,OverrideFlags主要處理ingress-class配置,如果ingress-class沒有配置,則使用預設配置"nginx",并啟動Prometheus Collector for the nginx。
backend.OverrideFlags(flags)
flag.Set("logtostderr", "true")
...
// 如果配置了publish-service,則要檢查該serivce的svc.Status.LoadBalancer.Ingress不為空。 if *publishSvc != "" {
...
if len(svc.Status.LoadBalancer.Ingress) == 0 {
// We could poll here, but we instead just exit and rely on k8s to restart us
glog.Fatalf("service %s does not (yet) have ingress points", *publishSvc)
}
}
...
// 建立目錄 "/ingress-controller/ssl", 用于儲存Ingress中定義的SSL certificates,檔案名格式為<namespace>-<secret name>.pem
err = os.MkdirAll(ingress.DefaultSSLDirectory, 0655)
// 構造Ingress Controller的配置
config := &Configuration{
UpdateStatus: *updateStatus,
ElectionID: *electionID,
Client: kubeClient,
ResyncPeriod: *resyncPeriod,
DefaultService: *defaultSvc,
IngressClass: *ingressClass,
DefaultIngressClass: backend.DefaultIngressClass(),
Namespace: *watchNamespace,
ConfigMapName: *configMap,
TCPConfigMapName: *tcpConfigMapName,
UDPConfigMapName: *udpConfigMapName,
DefaultSSLCertificate: *defSSLCertificate,
DefaultHealthzURL: *defHealthzURL,
PublishService: *publishSvc,
Backend: backend,
ForceNamespaceIsolation: *forceIsolation,
UpdateStatusOnShutdown: *UpdateStatusOnShutdown,
SortBackends: *SortBackends,
}
// 建立 Nginx Ingress controller。
ic := newIngressController(config)
// 注冊 "/debug/pprof"等handler,友善必要時進行性能調試. // 注冊 "/stop" handler,以必要時stop nginx-ingress-controller. // 注冊 "/metrics" handler,提供Prometheus收集監控資料。 go registerHandlers(*profiling, *healthzPort, ic)
return ic
}
- 對于Nginx而言,OverrideFlags主要處理ingress-class配置,如果ingress-class沒有配置,則使用預設配置"nginx",并啟動Prometheus Collector for the nginx。
- 如果配置了publish-service,則要檢查該service的svc.Status.LoadBalancer.Ingress不為空。
- 建立目錄 "/ingress-controller/ssl", 用于儲存Ingress中定義的SSL certificates,檔案名格式為<namespace>-<secret name>.pem
- 構造Ingress Controller的配置
- 建立 Nginx Ingress controller。
- 注冊 "/debug/pprof"等handler, 友善必要時進行性能調試.
- 注冊 "/stop" handler,以必要時stop nginx-ingress-controller.
- 注冊 "/metrics" handler,提供Prometheus收集監控資料。
core/pkg/ingress/controller/controller.go:149 // newIngressController creates an Ingress controller
func newIngressController(config *Configuration) *GenericController {
// 建立事件廣播器
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Interface: config.Client.Core().Events(config.Namespace),
})
// 建構GenericController
ic := GenericController{
cfg: config,
stopLock: &sync.Mutex{},
stopCh: make(chan struct{}),
// 設定syncRateLimiter的qps為0.3, burst為1.
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, api.EventSource{
Component: "ingress-controller",
}),
// sslCertTracker holds a store of referenced Secrets in Ingress rules.
sslCertTracker: newSSLCertTracker(),
}
// 建立Ingress的syncQueue,每往syncQueue插入一個Ingress對象,就會調用syncIngress一次。
ic.syncQueue = task.NewTaskQueue(ic.syncIngress)
// from here to the end of the method all the code is just boilerplate // required to watch Ingress, Secrets, ConfigMaps and Endoints. // This is used to detect new content, updates or removals and act accordingly // 定義Ingress Event Handler: Add, Delete, Update。
ingEventHandler := cache.ResourceEventHandlerFuncs{
// 注冊Ingress Add Event Handler.
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
// 隻處理Annotation ”kubernetes.io/ingress.class”滿足條件的Ingress,條件必須滿足其中之一:1. 如果Annotation為空,則要求--ingress-class設定的值為"nginx";2. Annotation與--ingress-class設定的值相同。 if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
ic.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
// 将滿足條件的Ingress Object加入到syncQueue,如此便會觸發調用ic.syncIngress來update and reload nginx config。
ic.syncQueue.Enqueue(obj)
// extracts information about secrets inside the Ingress rule, 如果該Ingress中的secret不在sslCertTracker cache中,則會調用ic.syncSecret将secret内容更新到對應的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
ic.extractSecretNames(addIng)
},
// 注冊Ingress Delete Event Handler。
DeleteFunc: func(obj interface{}) {
delIng := obj.(*extensions.Ingress)
// 同Add一樣,隻處理Annotation ”kubernetes.io/ingress.class”滿足條件的Ingress。 if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
ic.recorder.Eventf(delIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
// 将滿足條件的Ingress Object加入到syncQueue。
ic.syncQueue.Enqueue(obj)
},
// 注冊Ingress Update Event Handler。
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
ic.recorder.Eventf(curIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else {
// old and cur are invalid or old and cur doesn't have changes, so ignore return
}
// 将滿足條件的Ingress Object加入到syncQueue。
ic.syncQueue.Enqueue(cur)
// 将Ingress中定義的Secret同步更新到對應的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
ic.extractSecretNames(curIng)
},
}
// 定義Secret Event Handler: Add, Delete, Update。
secrEventHandler := cache.ResourceEventHandlerFuncs{
// 注冊Secret Add Event Handler。
AddFunc: func(obj interface{}) {
sec := obj.(*api.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
// checks if a secret is referenced or not by one or more Ingress rules if ic.secrReferenced(sec.Namespace, sec.Name) {
// 調用ic.syncSecret将secret内容更新到對應的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中
ic.syncSecret(key)
}
},
// 注冊Secret Update Event Handler。
UpdateFunc: func(old, cur interface{}) {
// 判斷old secret與current secret内容是否相同。 if !reflect.DeepEqual(old, cur) {
sec := cur.(*api.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
// 如果不同,則調用ic.syncSecret同步到pem檔案中。
ic.syncSecret(key)
}
},
// 注冊Secret Delete Event Handler。
DeleteFunc: func(obj interface{}) {
sec := obj.(*api.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
// 從sslCertTracker cache中删除對應的secret
ic.sslCertTracker.DeleteAll(key)
},
}
// 定義通用event handler(給Endpoint Object使用),對于Add/Delete/Update都将object插入到syncQueue.
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
ic.syncQueue.Enqueue(cur)
}
},
}
// 定義ConfigMap Event Handler: Add, Delete, Update。
mapEventHandler := cache.ResourceEventHandlerFuncs{
// 注冊ConfigMap Add Event Handler。
AddFunc: func(obj interface{}) {
upCmap := obj.(*api.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
// 判斷該ConfigMap的namespace/configMapName是都比對--configmap的Name。 if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
// 如果比對,則将該configmap内容設定給後端nginx,并設定reloadRequired為true(syncIngress中會判斷reloadRequired是否為true,才執行對應的OnUpdate對nginx config更新并reload。)
ic.cfg.Backend.SetConfig(upCmap)
ic.reloadRequired = true
}
},
// 注冊ConfigMap Update Event Handler。
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*api.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
// 判斷該ConfigMap的namespace/configMapName是都比對--configmap的Name if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
// 如果比對,則将該configmap内容設定給後端nginx,并設定reloadRequired為true(syncIngress中會判斷reloadRequired是否為true,才執行對應的OnUpdate對nginx config更新并reload。)
ic.cfg.Backend.SetConfig(upCmap)
ic.reloadRequired = true
}
// updates to configuration configmaps can trigger an update if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
ic.recorder.Eventf(upCmap, api.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
// 将更新的configmap插入到syncQueue,觸發調用syncIngress。
ic.syncQueue.Enqueue(cur)
}
}
},
}
// 如果啟用了--force-namespace-isolation,并且配置--watch-namespace不是“”(表示all namespace),則watchNs為--watch-namespace配置的值,否則其他任何情況,wathNs都為all. // 注意watchNs隻表示對Secret和ConfigMap的watch namesapce。
watchNs := api.NamespaceAll
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != api.NamespaceAll {
watchNs = ic.cfg.Namespace
}
// 對ingress,endpoint,service而言,watch的namespace就是--watch-namespace配置的值。
ic.ingLister.Store, ic.ingController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.Extensions().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
ic.endpLister.Store, ic.endpController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
&api.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
ic.secrLister.Store, ic.secrController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "secrets", watchNs, fields.Everything()),
&api.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
ic.mapLister.Store, ic.mapController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "configmaps", watchNs, fields.Everything()),
&api.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
ic.svcLister.Store, ic.svcController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
&api.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
// 對node對象而言,watch的namespace當然始終是all namespace。
ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything()),
&api.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
// --update-status配置預設為true,也就是說預設會通過status.NewStatusSyncer來配置ingress controller的syncStatus接口。syncStatus用來returns a list of IP addresses and/or FQDN where the ingress controller is currently running。 if config.UpdateStatus {
ic.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client,
PublishService: ic.cfg.PublishService,
IngressLister: ic.ingLister,
ElectionID: config.ElectionID,
IngressClass: config.IngressClass,
DefaultIngressClass: config.DefaultIngressClass,
UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
})
} else {
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
}
ic.annotations = newAnnotationExtractor(ic)
// 将Ingress, Service, Node, Endpoint, Secret, ConfigMap的Lister指派給nginx ingress controller的Lister。
ic.cfg.Backend.SetListers(ingress.StoreLister{
Ingress: ic.ingLister,
Service: ic.svcLister,
Node: ic.nodeLister,
Endpoint: ic.endpLister,
Secret: ic.secrLister,
ConfigMap: ic.mapLister,
})
return &ic
}
- 建立事件廣播器
- 建構GenericController
- 建立Ingress的syncQueue,每往syncQueue插入一個Ingress對象,就會調用syncIngress一次。
- 定義Ingress Event Handler: Add, Delete, Update。
- 注冊Ingress Add Event Handler.
- 隻處理Annotation ”kubernetes.io/ingress.class”滿足條件的Ingress,條件必須滿足其中之一:1. 如果Annotation為空,則要求--ingress-class設定的值為"nginx";2. Annotation與--ingress-class設定的值相同。
- 将滿足條件的Ingress Object加入到syncQueue,如此便會觸發調用ic.syncIngress來update and reload nginx config。
- extracts information about secrets inside the Ingress rule, 如果該Ingress中的secret不在sslCertTracker cache中,則會調用ic.syncSecret将secret内容更新到對應的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
- 注冊Ingress Delete Event Handler。
- 同Add一樣,隻處理Annotation ”kubernetes.io/ingress.class”滿足條件的Ingress。
- 将滿足條件的Ingress Object加入到syncQueue。
- 注冊Ingress Update Event Handler。
- 将Ingress中定義的Secret同步更新到對應的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
- 注冊Ingress Add Event Handler.
- 定義Secret Event Handler: Add, Delete, Update。
- 注冊Secret Add Event Handler。
- checks if a secret is referenced or not by one or more Ingress rules
- 調用ic.syncSecret将secret内容更新到對應的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中
- checks if a secret is referenced or not by one or more Ingress rules
- 注冊Secret Update Event Handler。
- 判斷old secret與current secret内容是否相同。
- 如果不同,則調用ic.syncSecret同步到pem檔案中
- 注冊Secret Delete Event Handler。
- 從sslCertTracker cache中删除對應的secret
- 注冊Secret Add Event Handler。
- 定義通用event handler(給Endpoint Object使用),對于Add/Delete/Update都将object插入到syncQueue.
- 定義ConfigMap Event Handler: Add, Delete, Update。
- 注冊ConfigMap Add Event Handler。
- 判斷該ConfigMap的namespace/configMapName是否比對--configmap的Name。
- 如果比對,則将該configmap内容設定給後端nginx,并設定reloadRequired為true(syncIngress中會判斷reloadRequired是否為true,才執行對應的OnUpdate對nginx config更新并reload。)
- 判斷該ConfigMap的namespace/configMapName是否比對--configmap的Name。
- 注冊ConfigMap Update Event Handler。
- 判斷該ConfigMap的namespace/configMapName是否比對--configmap的Name
- 如果比對,則将該configmap内容設定給後端nginx,并設定reloadRequired為true(syncIngress中會判斷reloadRequired是否為true,才執行對應的OnUpdate對nginx config更新并reload。)
- updates to configuration configmaps can trigger an update
- 将更新的configmap插入到syncQueue,觸發調用syncIngress。
- 注冊ConfigMap Add Event Handler。
- 如果啟用了--force-namespace-isolation,并且配置--watch-namespace不是“”(表示all namespace),則watchNs為--watch-namespace配置的值,否則其他任何情況,watchNs都為all.
- 注意watchNs隻表示對Secret和ConfigMap的watch namespace。
- 對ingress,endpoint,service而言,watch的namespace就是--watch-namespace配置的值。
- 對node對象而言,watch的namespace當然始終是all namespace。
- --update-status配置預設為true,也就是說預設會通過status.NewStatusSyncer來配置ingress controller的syncStatus接口。syncStatus用來returns a list of IP addresses and/or FQDN where the ingress controller is currently running。
- 将Ingress, Service, Node, Endpoint, Secret, ConfigMap的Lister指派給nginx ingress controller的Lister。
每次插入一個Object到ic.syncQueue,都會觸發調用一次syncIngress,從上面的代碼分析,可得出以下情況會往ic.syncQueue插入Object:
- Ingress Add/Delete/Update Event Handler
- Endpoint Add/Delete/Update Event Handler
- ConfigMap Update Event Handler
上面也提到,syncIngress會update and reload nginx config,具體的邏輯見如下代碼。
core/pkg/ingress/controller/controller.go:378 // sync collects all the pieces required to assemble the configuration file and // then sends the content to the backend (OnUpdate) receiving the populated // template as response reloading the backend if is required.
func (ic *GenericController) syncIngress(key interface{}) error {
ic.syncRateLimiter.Accept()
if ic.syncQueue.IsShuttingDown() {
return nil
}
// getBackendServers returns a list of Upstream and Server to be used by the backend. An upstream can be used in multiple servers if the namespace, service name and port are the same
upstreams, servers := ic.getBackendServers()
var passUpstreams []*ingress.SSLPassthroughBackend
// 建構passUpstreams部分 for _, server := range servers {
if !server.SSLPassthrough {
continue
}
for _, loc := range server.Locations {
if loc.Path != rootLocation {
glog.Warningf("ignoring path %v of ssl passthrough host %v", loc.Path, server.Hostname)
continue
}
passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
Backend: loc.Backend,
Hostname: server.Hostname,
Service: loc.Service,
Port: loc.Port,
})
break
}
}
// 建構目前nginx config。
pcfg := ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
PassthroughBackends: passUpstreams,
}
// 如果ic.reloadRequired為false,并且nginx config内容不變,則跳過nginx reload,流程結束。 if !ic.reloadRequired && (ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg)) {
glog.V(3).Infof("skipping backend reload (no changes detected)")
return nil
}
glog.Infof("backend reload required")
// 否則,調用NGINXController.OnUpdate對nginx config進行reload.
err := ic.cfg.Backend.OnUpdate(pcfg)
if err != nil {
incReloadErrorCount()
glog.Errorf("unexpected failure restarting the backend: \n%v", err)
return err
}
ic.reloadRequired = false
glog.Infof("ingress backend successfully reloaded...")
incReloadCount()
setSSLExpireTime(servers)
// 更新ic.runningConfig
ic.runningConfig = &pcfg
return nil
}
- getBackendServers returns a list of Upstream and Server to be used by the backend. An upstream can be used in multiple servers if the namespace, service name and port are the same
- 建構passUpstreams部分
- 建構目前nginx config。
- 如果ic.reloadRequired為false,并且nginx config内容不變,則跳過nginx reload,流程結束。
- 否則,調用NGINXController.OnUpdate對nginx config進行reload.
- 更新ic.runningConfig
從前面的代碼分析,可得出以下情況會調用ic.syncSecret将secret内容更新到對應的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
- Ingress Add/Update Event Handler
- Secret Add/Update Event Handler
ic.syncSecret具體的邏輯見如下代碼。
core/pkg/ingress/controller/backend_ssl.go:38 // syncSecret keeps in sync Secrets used by Ingress rules with the files on // disk to allow copy of the content of the secret to disk to be used // by external processes.
func (ic *GenericController) syncSecret(key string) {
glog.V(3).Infof("starting syncing of secret %v", key)
var cert *ingress.SSLCert
var err error
// 調用ic.getPemCertificate receives a secret, and creates a ingress.SSLCert as return. It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
cert, err = ic.getPemCertificate(key)
if err != nil {
glog.Warningf("error obtaining PEM from secret %v: %v", key, err)
return
}
// create certificates and add or update the item in the store(sslCertTracker)
cur, exists := ic.sslCertTracker.Get(key)
if exists {
s := cur.(*ingress.SSLCert)
if reflect.DeepEqual(s, cert) {
// no need to update return
}
glog.Infof("updating secret %v in the local store", key)
ic.sslCertTracker.Update(key, cert)
ic.reloadRequired = true return
}
glog.Infof("adding secret %v to the local store", key)
ic.sslCertTracker.Add(key, cert)
ic.reloadRequired = true
}
- 調用ic.getPemCertificate receives a secret, and creates a ingress.SSLCert as return. It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
- create certificates and add or update the item in the store(sslCertTracker)
- 設定 ic.reloadRequired = true
在newIngressController中通過NewStatusSyncer給ingress Controller建立了一個StatusSyncer。
--update-status配置預設為true,也就是說預設會通過status.NewStatusSyncer來配置ingress controller的syncStatus接口。syncStatus用來returns a list of IP addresses and/or FQDN where the ingress controller is currently running。
core/pkg/ingress/status/status.go:186 // NewStatusSyncer returns a new Sync instance
func NewStatusSyncer(config Config) Sync {
pod, err := k8s.GetPodDetails(config.Client)
if err != nil {
glog.Fatalf("unexpected error obtaining pod information: %v", err)
}
st := statusSync{
pod: pod,
runLock: &sync.Mutex{},
Config: config,
}
// 每往st.syncQueue中插入一個Object,都會觸發調用st.sync。
st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)
// we need to use the defined ingress class to allow multiple leaders // in order to update information about ingress status id := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass)
if config.IngressClass != "" {
id = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
}
// 選舉leader
le, err := NewElection(id,
pod.Name, pod.Namespace, 30*time.Second,
st.callback, config.Client)
if err != nil {
glog.Fatalf("unexpected error starting leader election: %v", err)
}
st.elector = le
return st
}
core/pkg/ingress/status/status.go:146
func (s *statusSync) sync(key interface{}) error {
s.runLock.Lock()
defer s.runLock.Unlock()
if s.syncQueue.IsShuttingDown() {
glog.V(2).Infof("skipping Ingress status update (shutting down in progress)")
return nil
}
// skipping Ingress status update (I am not the current leader) if !s.elector.IsLeader() {
glog.V(2).Infof("skipping Ingress status update (I am not the current leader)")
return nil
}
// runningAddresses returns a list of IP addresses and/or FQDN where the ingress controller is currently running.
addrs, err := s.runningAddresses()
if err != nil {
return err
}
// 将新Ip更新到對應的ingress.status中(Ingress.Status.LoadBalancer.Ingress = newIPs)
s.updateStatus(sliceToStatus(addrs))
return nil
}
core/pkg/ingress/status/status.go:218 // runningAddresses returns a list of IP addresses and/or FQDN where the // ingress controller is currently running
func (s *statusSync) runningAddresses() ([]string, error) {
// 如果配置了PublishService,則擷取PublishService的`svc.Status.LoadBalancer.Ingress`作為currently running ingress controller的IP/FQDN if s.PublishService != "" {
ns, name, _ := k8s.ParseNameNS(s.PublishService)
svc, err := s.Client.Core().Services(ns).Get(name, meta_v1.GetOptions{})
if err != nil {
return nil, err
}
addrs := []string{}
for _, ip := range svc.Status.LoadBalancer.Ingress {
if ip.IP == "" {
addrs = append(addrs, ip.Hostname)
} else {
addrs = append(addrs, ip.IP)
}
}
return addrs, nil
}
// get information about all the pods running the ingress controller
pods, err := s.Client.Core().Pods(s.pod.Namespace).List(meta_v1.ListOptions{
LabelSelector: labels.SelectorFromSet(s.pod.Labels).String(),
})
if err != nil {
return nil, err
}
// 如果沒有配置PublishService,則傳回各個pod所在的node IP組成的Array。
addrs := []string{}
for _, pod := range pods.Items {
name := k8s.GetNodeIP(s.Client, pod.Spec.NodeName)
if !strings.StringInSlice(name, addrs) {
addrs = append(addrs, name)
}
}
return addrs, nil
}
StatusSyncer的邏輯為:
- 每往st.syncQueue中插入一個Object,都會觸發調用st.sync。sync的流程如下:
- skipping Ingress status update (I am not the current leader)
- runningAddresses returns a list of IP addresses and/or FQDN where the ingress controller is currently running. runningAddresses的流程如下:
- 如果配置了PublishService,則擷取PublishService的svc.Status.LoadBalancer.Ingress作為currently running ingress controller的IP/FQDN
- 如果沒有配置PublishService,則傳回各個pod所在的node IP組成的Array。
- 如果配置了PublishService,則擷取PublishService的svc.Status.LoadBalancer.Ingress作為currently running ingress controller的IP/FQDN
- 将新Ip更新到對應的ingress.status中(Ingress.Status.LoadBalancer.Ingress = newIPs)
- 選舉leader
GenericController.Start
core/pkg/ingress/controller/controller.go:1237
// Start starts the Ingress controller.
func (ic GenericController) Start() {
glog.Infof("starting Ingress controller")
// 分别啟動goruntine對ingress,endpoint,service,node,secret,configmap進行listwatch。
go ic.ingController.Run(ic.stopCh)
go ic.endpController.Run(ic.stopCh)
go ic.svcController.Run(ic.stopCh)
go ic.nodeController.Run(ic.stopCh)
go ic.secrController.Run(ic.stopCh)
go ic.mapController.Run(ic.stopCh)
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(ic.stopCh,
ic.ingController.HasSynced,
ic.svcController.HasSynced,
ic.endpController.HasSynced,
ic.secrController.HasSynced,
ic.mapController.HasSynced,
ic.nodeController.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
// 啟動goruntine,每次執行完worker(在newIngressController時通過ic.syncQueue = task.NewTaskQueue(ic.syncIngress)注冊worker為ic.syncIngress),等待10s,再次執行worker,如此循環,直到收到stopCh
go ic.syncQueue.Run(10*time.Second, ic.stopCh)
// 如果配置了--update-status(預設為true),則啟動goruntine執行statusSync.Run,starts the loop to keep the status in sync. if ic.syncStatus != nil {
go ic.syncStatus.Run(ic.stopCh)
}
<-ic.stopCh
}
- 分别啟動goroutine對ingress,endpoint,service,node,secret,configmap進行listwatch。
- Wait for all involved caches to be synced, before processing items from the queue is started.
- 啟動goroutine,每次執行完worker(在newIngressController時通過ic.syncQueue = task.NewTaskQueue(ic.syncIngress)注冊worker為ic.syncIngress),等待10s,再次執行worker,如此循環,直到收到stopCh.
- 關于ic.syncIngress的代碼分析在前面小節中已經分析過了。
- 如果配置了--update-status(預設為true),則啟動goroutine執行statusSync.Run,starts the loop to keep the status in sync.
對于statusSync.Run的代碼,我們有必要進一步分析:
core/pkg/ingress/status/status.go:86
// Run launches the leader-election loop, the periodic sync trigger (s.run)
// and the status sync queue in separate goroutines, then blocks until stopCh
// is closed or receives a value.
func (s statusSync) Run(stopCh <-chan struct{}) {
	// Start a goroutine that runs the leader election loop.
	go wait.Forever(s.elector.Run, 0)
	// Start a goroutine that periodically (every 30s per the article)
	// inserts a dummy object into statusSync.syncQueue to force an ingress
	// status sync.
	go s.run()
	// Start a goroutine that runs the queue worker (st.sync, registered in
	// NewStatusSyncer through
	// st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)); per the
	// article it waits 1s after each run and repeats until stopCh fires.
	go s.syncQueue.Run(time.Second, stopCh)
	<-stopCh
}
core/pkg/ingress/status/status.go:132
// run polls at updateInterval, enqueueing a dummy item on every tick so that
// a status sync is forced. Polling ends once the queue reports that it is
// shutting down; an error returned by the poll itself is only logged.
func (s *statusSync) run() {
	err := wait.PollInfinite(updateInterval, func() (bool, error) {
		// Returning true tells the poller the condition is met, which
		// ends the loop.
		if s.syncQueue.IsShuttingDown() {
			return true, nil
		}
		// send a dummy object to the queue to force a sync
		s.syncQueue.Enqueue("dummy")
		// Returning false keeps the poll going.
		return false, nil
	})
	if err != nil {
		glog.Errorf("error waiting shutdown: %v", err)
	}
}
- 啟動goroutine starts the leader election loop。
- 啟動goroutine,每隔30s往statusSync.syncQueue中插入dummy object,強制觸發ingress status sync。
- 啟動goroutine,每次執行完worker(在NewStatusSyncer時通過st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)注冊worker為st.sync),等待1s,再次執行worker,如此循環,直到收到stopCh。
- 對于st.sync的源碼分析在前面小節中已經分析過了。
本文轉自掘金-
Kubernetes Nginx Ingress Controller源碼分析