

kube-proxy Source Code Analysis

Compared with the iptables mode, ipvs offers better performance and stability, so this article focuses on the source code of the ipvs mode. If you want to understand how the iptables mode works, you can read its implementation; architecturally there is no difference.

kube-proxy's main job is to watch service and endpoint events and push the resulting proxy rules down to the machine. Under the hood it calls docker/libnetwork, and libnetwork in turn uses netlink and netns to create the ipvs rules and perform related operations.

Initializing the configuration

Code entry point: the Run() function in cmd/kube-proxy/app/server.go.

The proxyServer configuration is initialized from the command-line flags:

proxyServer, err := NewProxyServer(o)

type ProxyServer struct {
    // k8s clients
    Client      clientset.Interface
    EventClient v1core.EventsGetter

    // ipvs-related interfaces
    IptInterface   utiliptables.Interface
    IpvsInterface  utilipvs.Interface
    IpsetInterface utilipset.Interface

    // proxier that performs the sync
    Proxier proxy.ProxyProvider

    // proxy mode: one of ipvs, iptables, userspace, kernelspace (Windows)
    ProxyMode string
    // config sync period
    ConfigSyncPeriod time.Duration

    // service and endpoint event handlers
    ServiceEventHandler   config.ServiceHandler
    EndpointsEventHandler config.EndpointsHandler
}

Proxier is the main entry point; it abstracts two functions:

type ProxyProvider interface {
    // Sync immediately synchronizes the ProxyProvider's current state to iptables.
    Sync()
    // SyncLoop runs periodically.
    SyncLoop()
}

The ipvs Interface, which is the important one:

type Interface interface {
    // Flush deletes all rules
    Flush() error
    // add / update / delete / query virtual servers
    AddVirtualServer(*VirtualServer) error
    UpdateVirtualServer(*VirtualServer) error
    DeleteVirtualServer(*VirtualServer) error
    GetVirtualServer(*VirtualServer) (*VirtualServer, error)
    GetVirtualServers() ([]*VirtualServer, error)

    // attach a real server to a virtual server; e.g. the VirtualServer is a cluster IP
    // and a RealServer is a pod (or a custom endpoint)
    AddRealServer(*VirtualServer, *RealServer) error
    GetRealServers(*VirtualServer) ([]*RealServer, error)
    DeleteRealServer(*VirtualServer, *RealServer) error
}

We will look in detail at how ipvs_linux implements this interface later.

For virtual servers and real servers, the most important part is the ip:port; the rest is proxying options such as the scheduler and session affinity:

type VirtualServer struct {
    Address   net.IP
    Protocol  string
    Port      uint16
    Scheduler string
    Flags     ServiceFlags
    Timeout   uint32
}

type RealServer struct {
    Address net.IP
    Port    uint16
    Weight  int
}
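To make the mapping concrete, here is a minimal sketch of how a ClusterIP service with one backend pod could be expressed through this Interface. It is not actual kube-proxy code: the addClusterIPService helper, the IP addresses and the ports are made up for illustration, and imports are omitted as in the other snippets of this article.

// Sketch only: handle is any implementation of the utilipvs Interface above.
func addClusterIPService(handle utilipvs.Interface) error {
    vs := &utilipvs.VirtualServer{
        Address:   net.ParseIP("10.96.0.10"), // the service's cluster IP (example value)
        Protocol:  "TCP",
        Port:      80,
        Scheduler: "rr", // round-robin
    }
    rs := &utilipvs.RealServer{
        Address: net.ParseIP("10.244.1.5"), // a backend pod IP (example value)
        Port:    8080,
        Weight:  1,
    }
    if err := handle.AddVirtualServer(vs); err != nil {
        return err
    }
    return handle.AddRealServer(vs, rs)
}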
Create the apiserver clients:
client, eventClient, err := createClients(config.ClientConnection, master)           
Create the Proxier. Here we only look at the ipvs-mode proxier:
else if proxyMode == proxyModeIPVS {
    glog.V(0).Info("Using ipvs Proxier.")
    proxierIPVS, err := ipvs.NewProxier(
        iptInterface,
        ipvsInterface,
        ipsetInterface,
        utilsysctl.New(),
        execer,
        config.IPVS.SyncPeriod.Duration,
        config.IPVS.MinSyncPeriod.Duration,
        config.IPTables.MasqueradeAll,
        int(*config.IPTables.MasqueradeBit),
        config.ClusterCIDR,
        hostname,
        getNodeIP(client, hostname),
        recorder,
        healthzServer,
        config.IPVS.Scheduler,
    )
...
    proxier = proxierIPVS
    serviceEventHandler = proxierIPVS
    endpointsEventHandler = proxierIPVS

This Proxier has the following methods:

+OnEndpointsAdd(endpoints *api.Endpoints)
 +OnEndpointsDelete(endpoints *api.Endpoints)
 +OnEndpointsSynced()
 +OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
 +OnServiceAdd(service *api.Service)
 +OnServiceDelete(service *api.Service)
 +OnServiceSynced()
 +OnServiceUpdate(oldService, service *api.Service)
 +Sync()
 +SyncLoop()           

So this ipvs Proxier implements most of the interfaces we need.

To summarize:

+-----------> endpointHandler
 |
 +-----------> serviceHandler
 | ^
 | | +-------------> sync (periodic sync, etc.)
 | | |
ProxyServer---------> Proxier --------> service event callback
 | |
 | +-------------> endpoint event callback
 | | triggers
 +-----> ipvs interface ipvs handler <-----+

Starting the proxyServer

  1. Check whether the clean-up flag was passed; if so, clear all rules and exit.
  2. The OOM adjuster does not appear to be implemented; ignore it.
  3. resourceContainer is not implemented either; ignore it.
  4. Start the metrics server. This matters when you want monitoring: the metrics-bind-address flag exposes Prometheus metrics.
  5. Start the informers, begin watching events, and handle them in separate goroutines.

We do not need to pay much attention to steps 1-4; step 5 is the one worth a closer look:

informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// register the service handler and start it
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
// this merely assigns ServiceEventHandler to the informer callbacks
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// register the endpoints handler and start it
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

go informerFactory.Start(wait.NeverStop)

serviceConfig.Run and endpointsConfig.Run merely assign the callback functions, so the registered handlers are handed to the informer; when the informer observes an event it invokes the callbacks:

for i := range c.eventHandlers {
 glog.V(3).Infof("Calling handler.OnServiceSynced()")
 c.eventHandlers[i].OnServiceSynced()
}           

So the question is: what is the handler that gets registered? Recall from above:

serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS

So both of them are this proxierIPVS.

These are the handler callbacks that the informer will invoke, so when writing our own code we just need to implement this interface and register it:

type ServiceHandler interface {
    // OnServiceAdd is called whenever creation of new service object
    // is observed.
    OnServiceAdd(service *api.Service)
    // OnServiceUpdate is called whenever modification of an existing
    // service object is observed.
    OnServiceUpdate(oldService, service *api.Service)
    // OnServiceDelete is called whenever deletion of an existing service
    // object is observed.
    OnServiceDelete(service *api.Service)
    // OnServiceSynced is called once all the initial event handlers were
    // called and the state is fully propagated to local cache.
    OnServiceSynced()
}
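As an illustration of this extension point, here is a minimal sketch of a handler that only logs events. loggingServiceHandler is a made-up name, not something in kube-proxy, and imports are omitted as in the other snippets here; registering it works the same way as the proxier registration shown earlier.

// loggingServiceHandler implements config.ServiceHandler and just logs what it sees.
type loggingServiceHandler struct{}

func (h *loggingServiceHandler) OnServiceAdd(service *api.Service) {
    glog.Infof("service added: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceUpdate(oldService, service *api.Service) {
    glog.Infof("service updated: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceDelete(service *api.Service) {
    glog.Infof("service deleted: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceSynced() {
    glog.Info("initial services synced to local cache")
}

// Registration would then look just like the proxier case:
//   serviceConfig.RegisterEventHandler(&loggingServiceHandler{})
//   go serviceConfig.Run(wait.NeverStop)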

Start watching

go informerFactory.Start(wait.NeverStop)           

Once this runs, creating or deleting services and endpoints will be observed and the callbacks will fire. Looking back at the diagram above, everything is ultimately handled by the Proxier, so from here on we focus on the Proxier.

s.Proxier.SyncLoop()           

Then SyncLoop starts, which is covered below.

Proxier implementation

When we create a service, OnServiceAdd is called. It records the previous state and the current state, then sends a signal to the syncRunner to handle it:

func (proxier *Proxier) OnServiceAdd(service *api.Service) {
    namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}

Recording the service information does not do much: the service is stored in a map, and if nothing changed the map entry is simply deleted and nothing else happens:

change, exists := scm.items[*namespacedName]
if !exists {
    change = &serviceChange{}
    // the old service information
    change.previous = serviceToServiceMap(previous)
    scm.items[*namespacedName] = change
}
// the service information from the current event
change.current = serviceToServiceMap(current)

// if they are identical, just delete the entry
if reflect.DeepEqual(change.previous, change.current) {
    delete(scm.items, *namespacedName)
}
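The effect of this delta map is that bursts of events collapse: only the oldest previous state and the newest current state are kept, and an add that is immediately followed by a delete produces no work at all. Below is a stripped-down, self-contained sketch of the idea; the types are illustrative, using plain strings in place of the real service maps.

package main

import (
    "fmt"
    "reflect"
)

// change keeps only the oldest previous state and the newest current state.
type change struct {
    previous string
    current  string
}

type changeMap map[string]*change

// update mirrors the logic above: remember previous only on first sight,
// always overwrite current, and drop the entry if nothing really changed.
func (m changeMap) update(name, previous, current string) {
    c, ok := m[name]
    if !ok {
        c = &change{previous: previous}
        m[name] = c
    }
    c.current = current
    if reflect.DeepEqual(c.previous, c.current) {
        delete(m, name) // net effect is "no change", so there is no work to do
    }
}

func main() {
    m := changeMap{}
    m.update("default/nginx", "", "clusterIP=10.96.0.10") // service added
    m.update("default/nginx", "clusterIP=10.96.0.10", "") // deleted before a sync ran
    fmt.Println(len(m))                                   // 0: the add and the delete cancel out
}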

proxier.syncRunner.Run() just sends a signal:

select {
case bfr.run <- struct{}{}:
default:
}           

這裡面處理了這個信号

s.Proxier.SyncLoop()

func (proxier *Proxier) SyncLoop() {
    // Update healthz timestamp at beginning in case Sync() never succeeds.
    if proxier.healthzServer != nil {
        proxier.healthzServer.UpdateTimestamp()
    }
    proxier.syncRunner.Loop(wait.NeverStop)
}

runner裡收到信号執行,沒收到信号會定期執行:

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    glog.V(3).Infof("%s Loop running", bfr.name)
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            glog.V(3).Infof("%s Loop stopping", bfr.name)
            return
        case <-bfr.timer.C(): // periodic run
            bfr.tryRun()
        case <-bfr.run:
            bfr.tryRun() // run triggered by an event signal
        }
    }
}

What matters most in this bfr runner is the callback function fn; tryRun checks whether the callback is currently allowed to run:

type BoundedFrequencyRunner struct {
    name        string        // the name of this instance
    minInterval time.Duration // the min time between runs, modulo bursts
    maxInterval time.Duration // the max time between runs

    run chan struct{} // try an async run

    mu      sync.Mutex  // guards runs of fn and all mutations
    fn      func()      // function to run -- this is the callback
    lastRun time.Time   // time of last run
    timer   timer       // timer for deferred runs
    limiter rateLimiter // rate limiter for on-demand runs
}

// proxier.syncProxyRules is the function passed in as fn
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
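The overall effect is that any number of Run() calls between two syncs collapse into a single execution of fn, while the timer guarantees a run at least once per maxInterval. Below is a self-contained, stripped-down sketch of just that coalescing pattern; the miniRunner type is illustrative and omits the rate limiter and minInterval handling of the real BoundedFrequencyRunner.

package main

import (
    "fmt"
    "time"
)

type miniRunner struct {
    fn          func()
    run         chan struct{} // buffered with size 1 so pending signals coalesce
    maxInterval time.Duration
}

func newMiniRunner(fn func(), maxInterval time.Duration) *miniRunner {
    return &miniRunner{fn: fn, run: make(chan struct{}, 1), maxInterval: maxInterval}
}

// Run queues a sync without blocking; if one is already pending, it is a no-op.
func (r *miniRunner) Run() {
    select {
    case r.run <- struct{}{}:
    default:
    }
}

// Loop runs fn when signalled and at least once every maxInterval.
func (r *miniRunner) Loop(stop <-chan struct{}) {
    t := time.NewTimer(r.maxInterval)
    defer t.Stop()
    for {
        select {
        case <-stop:
            return
        case <-t.C: // periodic run; timer already fired and drained
            r.fn()
        case <-r.run: // run triggered by a signal
            r.fn()
            if !t.Stop() { // drain the timer so Reset below is safe
                <-t.C
            }
        }
        t.Reset(r.maxInterval)
    }
}

func main() {
    r := newMiniRunner(func() { fmt.Println("syncProxyRules would run here") }, time.Second)
    stop := make(chan struct{})
    go r.Loop(stop)
    r.Run() // several calls in a row still result in a single run
    r.Run()
    time.Sleep(100 * time.Millisecond)
    close(stop)
}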

That function, syncProxyRules, is an unwieldy function of roughly 600 lines, and it is where the main logic lives.

syncProxyRules

  1. Set up some iptables rules, such as mark and comment rules.
  2. Make sure the machine has the network interface that ipvs needs to bind addresses to.
  3. Make sure ipset is available; ipset is an iptables extension that lets a whole set of addresses be matched by a single iptables rule.

... (The rest is long and repetitive; see the source for the details.)

  1. What we care about most is how the VirtualServer is handled:
serv := &utilipvs.VirtualServer{
    Address:   net.ParseIP(ingress.IP),
    Port:      uint16(svcInfo.port),
    Protocol:  string(svcInfo.protocol),
    Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
    if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
    }
}

Looking at the implementation: if the virtual server does not exist it is created, if it already exists it is updated, and the service's cluster IP is bound to the network interface:

func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
    appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
    if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
        if appliedVirtualServer == nil {
            if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
                return err
            }
        } else {
            if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
                return err
            }
        }
    }

    // bind service address to dummy interface even if service not changed,
    // in case that service IP was removed by other processes
    if bindAddr {
        _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
        if err != nil {
            return err
        }
    }
    return nil
}
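EnsureAddressBind essentially adds the service IP onto the dummy device (kube-ipvs0) so the kernel treats it as a local address. Below is a hypothetical sketch of the same operation done directly with the vishvananda/netlink package; it is not the exact kube-proxy code, and the device name and IP are example values.

package main

import (
    "github.com/vishvananda/netlink"
)

// bindClusterIP adds ip (e.g. "10.96.0.10") as a /32 address on the dummy device,
// which makes the kernel treat the service IP as local so ipvs can accept traffic for it.
func bindClusterIP(device, ip string) error {
    link, err := netlink.LinkByName(device) // e.g. "kube-ipvs0"
    if err != nil {
        return err
    }
    addr, err := netlink.ParseAddr(ip + "/32")
    if err != nil {
        return err
    }
    // AddrAdd fails with EEXIST if the address is already bound;
    // a real implementation would treat that as success.
    return netlink.AddrAdd(link, addr)
}

func main() {
    if err := bindClusterIP("kube-ipvs0", "10.96.0.10"); err != nil {
        panic(err)
    }
}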

Creating the service: the implementation

Now we can look at how ipvs's AddVirtualServer is implemented. It mainly works by talking to the kernel over a socket.

In pkg/util/ipvs/ipvs_linux.go the runner struct implements these methods, using the docker/libnetwork/ipvs library:

// runner implements Interface.
type runner struct {
    exec       utilexec.Interface
    ipvsHandle *ipvs.Handle
}

// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
    ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
    if err != nil {
        glog.Errorf("IPVS interface can't be initialized, error: %v", err)
        return nil
    }
    return &runner{
        exec:       exec,
        ipvsHandle: ihandle,
    }
}

New creates a special socket. This is no different from ordinary socket programming; the key is the syscall.AF_NETLINK parameter, which means the socket talks to the kernel:

sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
    fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
    if err != nil {
        return nil, err
    }
    s := &NetlinkSocket{
        fd: int32(fd),
    }
    s.lsa.Family = syscall.AF_NETLINK
    if err := syscall.Bind(fd, &s.lsa); err != nil {
        syscall.Close(fd)
        return nil, err
    }

    return s, nil
}

Creating a service converts it into the docker service format and calls the library directly:

// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
    eSvc, err := toBackendService(vs)
    if err != nil {
        return err
    }
    return runner.ipvsHandle.NewService(eSvc)
}
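Put together, creating a service plus a backend through the library looks roughly like the sketch below. This is not kube-proxy's own code: the field names follow what the fillService code shown later packs (AddressFamily, Address, Protocol, Port, SchedName, Netmask), the concrete values are examples, and running it requires root on a Linux host with the ip_vs module loaded.

package main

import (
    "net"
    "syscall"

    "github.com/docker/libnetwork/ipvs"
)

func main() {
    handle, err := ipvs.New("") // netlink handle in the current network namespace
    if err != nil {
        panic(err)
    }

    svc := &ipvs.Service{
        AddressFamily: syscall.AF_INET,
        Address:       net.ParseIP("10.96.0.10"), // example cluster IP
        Protocol:      syscall.IPPROTO_TCP,
        Port:          80,
        SchedName:     "rr",
        Netmask:       0xffffffff,
    }
    if err := handle.NewService(svc); err != nil {
        panic(err)
    }

    dst := &ipvs.Destination{
        Address: net.ParseIP("10.244.1.5"), // example pod IP
        Port:    8080,
        Weight:  1,
    }
    if err := handle.NewDestination(svc, dst); err != nil {
        panic(err)
    }
}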

Then the service information just gets packed up and written to the socket:

func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
    req := newIPVSRequest(cmd)
    req.Seq = atomic.AddUint32(&i.seq, 1)

    if s == nil {
        req.Flags |= syscall.NLM_F_DUMP                    //Flag to dump all messages
        req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
    } else {
        // stuff the service into the request
        req.AddData(fillService(s))
    }

    if d == nil {
        if cmd == ipvsCmdGetDest {
            req.Flags |= syscall.NLM_F_DUMP
        }

    } else {
        req.AddData(fillDestinaton(d))
    }

    // send the service information to the kernel
    res, err := execute(i.sock, req, 0)
    if err != nil {
        return [][]byte{}, err
    }

    return res, nil
}
Building the request
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
    return newGenlRequest(ipvsFamily, cmd)
}

When building the request, the ipvs protocol family is passed in.

Then a message header for talking to the kernel is constructed:

func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
    return &NetlinkRequest{
        NlMsghdr: syscall.NlMsghdr{
            Len:   uint32(syscall.SizeofNlMsghdr),
            Type:  uint16(proto),
            Flags: syscall.NLM_F_REQUEST | uint16(flags),
            Seq:   atomic.AddUint32(&nextSeqNr, 1),
        },
    }
}
Data is then added to the message. This Data is a list whose elements must implement two methods:
type NetlinkRequestData interface {
    Len() int          // length
    Serialize() []byte // serialization: the kernel expects a specific wire format, so the service information has to implement this too
}

For example, this is how the header is serialized. It is baffling at first sight and takes a while to work out:

Breaking it down:

[unsafe.Sizeof(*hdr)]byte is a byte array type whose length equals the size of the struct

unsafe.Pointer(hdr) turns the struct pointer into an untyped pointer, which is then cast to a pointer to that byte array

the leading * dereferences it to get the array value

[:] turns the array into a byte slice, which is returned

func (hdr *genlMsgHdr) Serialize() []byte {
    return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}
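A toy version of the same trick with a struct defined here (msgHdr is made up, not a real netlink type) shows what happens; the printed bytes assume a little-endian machine.

package main

import (
    "fmt"
    "unsafe"
)

// msgHdr is a toy header with a fixed-size layout, similar in spirit to genlMsgHdr.
type msgHdr struct {
    Cmd     uint8
    Version uint8
    Flags   uint16
}

// Serialize reinterprets the struct's memory as a byte array and returns it as a slice.
func (hdr *msgHdr) Serialize() []byte {
    return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}

func main() {
    h := &msgHdr{Cmd: 1, Version: 1, Flags: 0x0001}
    fmt.Printf("% x\n", h.Serialize()) // raw bytes in host byte order, e.g. "01 01 01 00"
}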
Sending the service information to the kernel

This is ordinary socket send-and-receive code:

func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
    var (
        err error
    )

    if err := s.Send(req); err != nil {
        return nil, err
    }

    pid, err := s.GetPid()
    if err != nil {
        return nil, err
    }

    var res [][]byte

done:
    for {
        msgs, err := s.Receive()
        if err != nil {
            return nil, err
        }
        for _, m := range msgs {
            if m.Header.Seq != req.Seq {
                continue
            }
            if m.Header.Pid != pid {
                return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
            }
            if m.Header.Type == syscall.NLMSG_DONE {
                break done
            }
            if m.Header.Type == syscall.NLMSG_ERROR {
                error := int32(native.Uint32(m.Data[0:4]))
                if error == 0 {
                    break done
                }
                return nil, syscall.Errno(-error)
            }
            if resType != 0 && m.Header.Type != resType {
                continue
            }
            res = append(res, m.Data)
            if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
                break done
            }
        }
    }
    return res, nil
}

Packing the Service data

This part is fairly detailed. The core idea is that the kernel only accepts data in a specific standard format, so the service information is packed according to that format and sent to the kernel.

How exactly the packing works is not covered in detail here.

func fillService(s *Service) nl.NetlinkRequestData {
    cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
    if s.FWMark != 0 {
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
    } else {
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))

        // Port needs to be in network byte order.
        portBuf := new(bytes.Buffer)
        binary.Write(portBuf, binary.BigEndian, s.Port)
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
    }

    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
    if s.PEName != "" {
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
    }
    f := &ipvsFlags{
        flags: s.Flags,
        mask:  0xFFFFFFFF,
    }
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
    return cmdAttr
}
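As a small aside on the "network byte order" comment above: netlink attributes carry the port big-endian, so port 80 (0x0050) is packed as the bytes 00 50. A quick self-contained check:

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

func main() {
    var port uint16 = 80
    buf := new(bytes.Buffer)
    // same call as in fillService: write the port in big-endian (network) byte order
    if err := binary.Write(buf, binary.BigEndian, port); err != nil {
        panic(err)
    }
    fmt.Printf("% x\n", buf.Bytes()) // 00 50
}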

Summary

Overall, the service-handling code is fairly simple, although some parts feel a bit convoluted rather than straightforward. In short: watch apiserver events, diff and process them, and run the sync logic periodically as well.

This article is reposted from SegmentFault.