
kubernetes/k8s CRI Analysis: the Container Runtime Interface

kubernetes CRI analysis. Kubernetes has three special-purpose interfaces: the Container Network Interface (CNI), the Container Runtime Interface (CRI), and the Container Storage Interface (CSI). This article introduces what CRI is, why CRI exists, and the CRI architecture; describes the k8s objects and components involved in CRI; and analyzes how k8s performs CRI-related operations.

Related posts: kubernetes/k8s CSI Analysis: the Container Storage Interface

kubernetes/k8s CNI Analysis: the Container Network Interface

Overview

Kubernetes was designed from the start around a pluggable architecture, so that its functionality is easy to extend. Under this design, kubernetes provides three special-purpose interfaces: the Container Network Interface (CNI), the Container Runtime Interface (CRI), and the Container Storage Interface (CSI). Kubernetes calls these interfaces to carry out the corresponding functions.

Below we introduce and analyze the Container Runtime Interface (CRI). This article covers what CRI is, why CRI exists, and the CRI architecture, followed by a flow analysis of the CRI-related operations in k8s, including pod creation and deletion.

What is CRI

CRI is short for Container Runtime Interface. CRI decouples the kubelet from the container runtime, so the kubelet can support multiple container runtimes without being recompiled. The kubelet communicates with a third-party container runtime over the CRI interface to operate on containers and images.

A container runtime that implements the CRI interface is usually called a CRI shim. It is a gRPC server listening on a local unix socket, while the kubelet acts as the gRPC client and calls the CRI interface to manage the lifecycle of Pods, containers, and images. In addition, the container runtime is responsible for managing container networking itself; CNI is recommended for this.

Figure 1: CRI shim communication diagram

提出了CRI标準以後,意味着在新的版本裡需要使用新的連接配接方式與docker通信,為了相容以前的版本,k8s提供了針對docker的CRI實作,也就是kubelet包下的

dockershim

包,

dockershim

是一個grpc服務,監聽一個端口供kubelet連接配接,

dockershim

收到kubelet的請求後,将其轉化為REST API請求,再發送給

docker daemon

Figure 2: dockershim communication diagram

Why CRI exists

In versions before 1.5, k8s depended directly on docker. To support other container runtimes such as rkt and containerd, the kubelet adopted the CRI standard starting with 1.5. CRI decouples the kubelet from the container runtime, splits the old, purely Pod-level internal interface into gRPC interfaces oriented around Sandbox and Container, and separates image management and container management into different services, making it easier for other container runtimes to integrate with k8s.

Components of the container runtime in Kubernetes

By function, it can be divided into four parts:

(1) Container runtime management in the kubelet: kubeGenericRuntimeManager, which manages the clients that communicate with the CRI shim and performs container and image management (code: pkg/kubelet/kuberuntime/kuberuntime_manager.go);

(2) The container runtime interface CRI, comprising the container runtime client interfaces and the container runtime server interfaces;

(3) The CRI shim client, held by the kubelet and used to communicate with the CRI shim server;

(4) The CRI shim server, i.e. the concrete container runtime implementation, including the kubelet's built-in dockershim (code: pkg/kubelet/dockershim) as well as external container runtimes such as cri-containerd (supporting the containerd engine), rktlet (supporting rkt), and so on.

CRI architecture diagram

Below CRI sit two types of container runtime implementations:

(1) The kubelet's built-in dockershim, which implements support for the Docker engine and for CNI network plugins (including kubenet). The dockershim code is built into the kubelet and invoked by it; the kubelet starts dockershim as a standalone server that acts as the CRI shim, exposing a gRPC server to the kubelet;

(2) External container runtimes, which support engines such as rkt and containerd.


CRI-related source code analysis in the kubelet

The kubelet CRI source code analysis covers the following parts:

(1) kubelet CRI-related startup flags;

(2) kubelet CRI-related interfaces/structs;

(3) kubelet CRI initialization;

(4) how the kubelet calls CRI to create a pod;

(5) how the kubelet calls CRI to delete a pod.

For reasons of length, this post analyzes the first three parts; the next post will cover pod creation and deletion via CRI.

Based on tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

1. kubelet CRI-related startup flags

The code defining the kubelet's CRI-related startup flags is as follows:

// pkg/kubelet/config/flags.go
// AddFlags adds flags to the container runtime, according to ContainerRuntimeOptions.
func (s *ContainerRuntimeOptions) AddFlags(fs *pflag.FlagSet) {
	dockerOnlyWarning := "This docker-specific flag only works when container-runtime is set to docker."

	// General settings.
	fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'remote', 'rkt (deprecated)'.")
	fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.")
	fs.BoolVar(&s.RedirectContainerStreaming, "redirect-container-streaming", s.RedirectContainerStreaming, "Enables container streaming redirect. If false, kubelet will proxy container streaming data between apiserver and container runtime; if true, kubelet will return an http redirect to apiserver, and apiserver will access container runtime directly. The proxy approach is more secure, but introduces some overhead. The redirect approach is more performant, but less secure because the connection between apiserver and container runtime may not be authenticated.")

	// Docker-specific settings.
	fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]")
	fs.MarkHidden("experimental-dockershim")
	fs.StringVar(&s.DockershimRootDirectory, "experimental-dockershim-root-directory", s.DockershimRootDirectory, "Path to the dockershim root directory.")
	fs.MarkHidden("experimental-dockershim-root-directory")
	fs.StringVar(&s.PodSandboxImage, "pod-infra-container-image", s.PodSandboxImage, fmt.Sprintf("The image whose network/ipc namespaces containers in each pod will use. %s", dockerOnlyWarning))
	fs.StringVar(&s.DockerEndpoint, "docker-endpoint", s.DockerEndpoint, fmt.Sprintf("Use this for the docker endpoint to communicate with. %s", dockerOnlyWarning))
	fs.DurationVar(&s.ImagePullProgressDeadline.Duration, "image-pull-progress-deadline", s.ImagePullProgressDeadline.Duration, fmt.Sprintf("If no pulling progress is made before this deadline, the image pulling will be cancelled. %s", dockerOnlyWarning))
	...
}
           
// cmd/kubelet/app/options/options.go
// AddFlags adds flags for a specific KubeletFlags to the specified FlagSet
func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
    ...
    fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket endpoint is supported on Linux, while npipe and tcp endpoints are supported on windows.  Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
	fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket endpoint is supported on Linux, while npipe and tcp endpoints are supported on windows.  Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
	...
}
           

The default values of the kubelet startup flags are set in the NewKubeletFlags function.

// cmd/kubelet/app/options/options.go
// NewKubeletFlags will create a new KubeletFlags with default values
func NewKubeletFlags() *KubeletFlags {
	remoteRuntimeEndpoint := ""
	if runtime.GOOS == "linux" {
		remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
	} else if runtime.GOOS == "windows" {
		remoteRuntimeEndpoint = "npipe:////./pipe/dockershim"
	}

	return &KubeletFlags{
		EnableServer:                        true,
		ContainerRuntimeOptions:             *NewContainerRuntimeOptions(),
		CertDirectory:                       "/var/lib/kubelet/pki",
		RootDirectory:                       defaultRootDir,
		MasterServiceNamespace:              metav1.NamespaceDefault,
		MaxContainerCount:                   -1,
		MaxPerPodContainerCount:             1,
		MinimumGCAge:                        metav1.Duration{Duration: 0},
		NonMasqueradeCIDR:                   "10.0.0.0/8",
		RegisterSchedulable:                 true,
		ExperimentalKernelMemcgNotification: false,
		RemoteRuntimeEndpoint:               remoteRuntimeEndpoint,
		NodeLabels:                          make(map[string]string),
		VolumePluginDir:                     "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/",
		RegisterNode:                        true,
		SeccompProfileRoot:                  filepath.Join(defaultRootDir, "seccomp"),
		// prior to the introduction of this flag, there was a hardcoded cap of 50 images
		NodeStatusMaxImages:         50,
		EnableCAdvisorJSONEndpoints: true,
	}
}
           

The defaults for the CRI-related startup flags are set in NewContainerRuntimeOptions and NewMainKubelet.

// cmd/kubelet/app/options/container_runtime.go
// NewContainerRuntimeOptions will create a new ContainerRuntimeOptions with
// default values.
func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions {
	dockerEndpoint := ""
	if runtime.GOOS != "windows" {
		dockerEndpoint = "unix:///var/run/docker.sock"
	}

	return &config.ContainerRuntimeOptions{
		ContainerRuntime:           kubetypes.DockerContainerRuntime,
		RedirectContainerStreaming: false,
		DockerEndpoint:             dockerEndpoint,
		DockershimRootDirectory:    "/var/lib/dockershim",
		PodSandboxImage:            defaultPodSandboxImage,
		ImagePullProgressDeadline:  metav1.Duration{Duration: 1 * time.Minute},
		ExperimentalDockershim:     false,

		//Alpha feature
		CNIBinDir:   "/opt/cni/bin",
		CNIConfDir:  "/etc/cni/net.d",
		CNICacheDir: "/var/lib/cni/cache",
	}
}
           
// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
    ...
    if remoteRuntimeEndpoint != "" {
		// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
		if remoteImageEndpoint == "" {
			remoteImageEndpoint = remoteRuntimeEndpoint
		}
	}
	...
}
           

Below is a brief analysis of the more important CRI-related startup flags:

(1) --container-runtime: the container runtime for the kubelet to use. Possible values are docker, remote, and rkt (deprecated); the default is docker, i.e. the kubelet's built-in runtime dockershim. To use an external container runtime, set this flag to remote and set --container-runtime-endpoint to the unix socket the runtime listens on.

(2) --runtime-cgroups: optional absolute name of the cgroups in which to create and run the runtime.

(3) --docker-endpoint: the socket address on which docker serves; defaults to unix:///var/run/docker.sock. Effective if and only if --container-runtime is docker.

(4) --pod-infra-container-image: the pod sandbox (pause) image; defaults to k8s.gcr.io/pause:3.1. Effective if and only if --container-runtime is docker.

(5) --image-pull-progress-deadline: if no image pull progress is made before this deadline, the pull is cancelled; defaults to 1 minute. Effective if and only if --container-runtime is docker.

(6) --experimental-dockershim: when true, enables dockershim-only mode, in which the kubelet starts nothing but dockershim; defaults to false. Effective if and only if --container-runtime is docker.

(7) --experimental-dockershim-root-directory: the dockershim root directory; defaults to /var/lib/dockershim. Effective if and only if --container-runtime is docker.

(8) --container-runtime-endpoint: the endpoint of the container runtime; defaults to unix:///var/run/dockershim.sock on Linux. Note that this is distinct from --docker-endpoint above.

(9) --image-service-endpoint: the endpoint of the image service; defaults to unix:///var/run/dockershim.sock on Linux (the same as --container-runtime-endpoint when not explicitly specified).
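Putting these flags together, two typical kubelet invocations look like the following. The dockershim settings shown are the defaults discussed above, and the containerd socket path is only an example of a remote runtime endpoint:

```shell
# Built-in dockershim (the default in v1.17): the kubelet starts dockershim itself
kubelet --container-runtime=docker \
        --docker-endpoint=unix:///var/run/docker.sock \
        --pod-infra-container-image=k8s.gcr.io/pause:3.1

# External runtime: the kubelet only dials the remote CRI shim's socket
kubelet --container-runtime=remote \
        --container-runtime-endpoint=unix:///run/containerd/containerd.sock
```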

2. kubelet CRI-related interface/struct analysis

CRI-related interfaces

RuntimeService interface: the CRI shim client-side container runtime interface; code: staging/src/k8s.io/cri-api/pkg/apis/services.go

ImageManagerService interface: the CRI shim client-side container image interface; code: staging/src/k8s.io/cri-api/pkg/apis/services.go

RuntimeServiceServer interface: the CRI shim server-side container runtime interface; code: staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go

ImageServiceServer interface: the CRI shim server-side container image interface; code: staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go

CRIService interface: embeds RuntimeServiceServer interface and ImageServiceServer interface plus the CRI shim server start method, so it covers every method a CRI shim server must implement; code: pkg/kubelet/dockershim/docker_service.go

DockerService interface: embeds CRIService interface; code: pkg/kubelet/dockershim/docker_service.go

Note: RuntimeService interface and RuntimeServiceServer interface (and likewise ImageManagerService interface and ImageServiceServer interface) declare the same methods; the only difference is that one is used on the CRI shim client side and the other on the CRI shim server side. The container runtime interface manages the lifecycle of Pods and containers; the container image interface manages the lifecycle of container images.

CRI-related structs

RemoteRuntimeService struct: implements the CRI shim client-side container runtime interface RuntimeService interface, and holds the client used to communicate with the CRI shim runtime server; code: pkg/kubelet/remote/remote_runtime.go

RemoteImageService struct: implements the CRI shim client-side container image interface ImageManagerService interface, and holds the client used to communicate with the CRI shim image server; code: pkg/kubelet/remote/remote_image.go

dockerService struct: implements the CRI shim server-side container runtime interface RuntimeServiceServer interface; code: pkg/kubelet/dockershim/docker_service.go, pkg/kubelet/dockershim/docker_container.go

dockerService struct: implements the CRI shim server-side container image interface ImageServiceServer interface; code: pkg/kubelet/dockershim/docker_service.go, pkg/kubelet/dockershim/docker_image.go

DockerServer struct: represents the server side of dockershim (the kubelet's built-in CRI shim), and implements CRIService interface; code: pkg/kubelet/dockershim/remote/docker_server.go

Diagram of the CRI shim server interfaces


RuntimeServiceServer

The interfaces provided by RuntimeServiceServer can be divided into four groups by function:

(1) PodSandbox management: a PodSandbox is Kubernetes' abstraction of a Pod, giving containers an isolated environment and providing shared namespaces such as the network namespace;

(2) Container management: create, start, stop, and delete containers in a given PodSandbox;

(3) Streaming APIs: Exec, Attach, PortForward, and other interfaces for exchanging data with a container; these three return the URL of the runtime's streaming server rather than interacting with the container directly;

(4) Runtime status: query the runtime's name, version, API version, status, and so on.

ImageServiceServer

ImageServiceServer provides 5 interfaces for managing container images.

The interfaces/structs mentioned above are analyzed below.

2.1 RuntimeService interface

RuntimeService manages the lifecycle of Pods and containers; it is the container runtime interface implemented by the CRI shim client.

RuntimeService interface is composed of the RuntimeVersioner, ContainerManager, PodSandboxManager, and ContainerStatsManager interfaces, each introduced below. A container runtime implements RuntimeService interface:

// staging/src/k8s.io/cri-api/pkg/apis/services.go
// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
type RuntimeService interface {
	RuntimeVersioner
	ContainerManager
	PodSandboxManager
	ContainerStatsManager

	// UpdateRuntimeConfig updates runtime configuration if specified
	UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
	// Status returns the status of the runtime.
	Status() (*runtimeapi.RuntimeStatus, error)
}
           

RuntimeVersioner interface

RuntimeVersioner interface returns the container runtime's name, version, and API version; it has a single method, Version:

// staging/src/k8s.io/cri-api/pkg/apis/services.go
// RuntimeVersioner contains methods for runtime name, version and API version.
type RuntimeVersioner interface {
	// Version returns the runtime name, runtime version and runtime API version
	Version(apiVersion string) (*runtimeapi.VersionResponse, error)
}
           

ContainerManager interface

ContainerManager interface contains methods for operating on containers (the business containers), such as CreateContainer (create a container), StartContainer (start a container), StopContainer (stop a container), RemoveContainer (remove a container), and so on:

// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {
	// CreateContainer creates a new container in specified PodSandbox.
	CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
	// StartContainer starts the container.
	StartContainer(containerID string) error
	// StopContainer stops a running container with a grace period (i.e., timeout).
	StopContainer(containerID string, timeout int64) error
	// RemoveContainer removes the container.
	RemoveContainer(containerID string) error
	// ListContainers lists all containers by filters.
	ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
	// ContainerStatus returns the status of the container.
	ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error)
	// UpdateContainerResources updates the cgroup resources for the container.
	UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error
	// ExecSync executes a command in the container, and returns the stdout output.
	// If command exits with a non-zero exit code, an error is returned.
	ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
	// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
	Exec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
	// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
	Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
	// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
	// for the container. If it returns error, new container log file MUST NOT
	// be created.
	ReopenContainerLog(ContainerID string) error
}
           

PodSandboxManager interface

PodSandboxManager interface contains methods for operating on the pod sandbox (the pause container), such as RunPodSandbox (create and start the pause container), StopPodSandbox (stop the pause container), and RemovePodSandbox (remove the pause container):

// staging/src/k8s.io/cri-api/pkg/apis/services.go
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
	// the sandbox is in ready state.
	RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
	// StopPodSandbox stops the sandbox. If there are any running containers in the
	// sandbox, they should be force terminated.
	StopPodSandbox(podSandboxID string) error
	// RemovePodSandbox removes the sandbox. If there are running containers in the
	// sandbox, they should be forcibly removed.
	RemovePodSandbox(podSandboxID string) error
	// PodSandboxStatus returns the Status of the PodSandbox.
	PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
	// ListPodSandbox returns a list of Sandbox.
	ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
	// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
	PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}
           

ContainerStatsManager interface

ContainerStatsManager interface contains the interfaces for querying container statistics, such as ContainerStats and ListContainerStats:

// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ContainerStatsManager contains methods for retrieving the container
// statistics.
type ContainerStatsManager interface {
	// ContainerStats returns stats of the container. If the container does not
	// exist, the call returns an error.
	ContainerStats(containerID string) (*runtimeapi.ContainerStats, error)
	// ListContainerStats returns stats of all running containers.
	ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error)
}
           

2.2 ImageManagerService interface

ImageManagerService manages the lifecycle of images; it is the image interface implemented by the CRI shim client.

ImageManagerService interface contains the container image operations, such as PullImage (pull an image), ListImages (list existing images), and so on:

// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ImageManagerService interface should be implemented by a container image
// manager.
// The methods should be thread-safe.
type ImageManagerService interface {
	// ListImages lists the existing images.
	ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
	// ImageStatus returns the status of the image.
	ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error)
	// PullImage pulls an image with the authentication config.
	PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
	// RemoveImage removes the image.
	RemoveImage(image *runtimeapi.ImageSpec) error
	// ImageFsInfo returns information of the filesystem that is used to store images.
	ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error)
}
           

2.3 CRIService interface / DockerService interface

CRIService interface defines the methods a CRI shim server must implement, including RuntimeServiceServer interface (container runtime methods), ImageServiceServer interface (image methods), and the CRI shim server start method:

// pkg/kubelet/dockershim/docker_service.go
// CRIService includes all methods necessary for a CRI server.
type CRIService interface {
	runtimeapi.RuntimeServiceServer
	runtimeapi.ImageServiceServer
	Start() error
}

// DockerService is an interface that embeds the new RuntimeService and
// ImageService interfaces.
type DockerService interface {
	CRIService

	// For serving streaming calls.
	http.Handler

	// For supporting legacy features.
	DockerLegacyService
}
           

2.4 RemoteRuntimeService struct

Implements the CRI shim client-side container runtime interface RuntimeService interface, and holds runtimeClient, the client used to communicate with the CRI shim runtime server:

// pkg/kubelet/remote/remote_runtime.go
// RemoteRuntimeService is a gRPC implementation of internalapi.RuntimeService.
type RemoteRuntimeService struct {
	timeout       time.Duration
	runtimeClient runtimeapi.RuntimeServiceClient
	// Cache last per-container error message to reduce log spam
	logReduction *logreduction.LogReduction
}
           

2.5 RemoteImageService struct

Implements the CRI shim client-side container image interface ImageManagerService interface, and holds imageClient, the client used to communicate with the CRI shim image server:

// pkg/kubelet/remote/remote_image.go
// RemoteImageService is a gRPC implementation of internalapi.ImageManagerService.
type RemoteImageService struct {
	timeout     time.Duration
	imageClient runtimeapi.ImageServiceClient
}
           

2.6 DockerServer struct

DockerServer struct represents the server side of dockershim (the kubelet's built-in CRI shim); it implements CRIService interface:

// pkg/kubelet/dockershim/remote/docker_server.go
// DockerServer is the grpc server of dockershim.
type DockerServer struct {
	// endpoint is the endpoint to serve on.
	endpoint string
	// service is the docker service which implements runtime and image services.
	service dockershim.CRIService
	// server is the grpc server.
	server *grpc.Server
}
           

3. kubelet CRI-related initialization

The CRI-related initialization logic in the kubelet is as follows:

(1) When the kubelet uses dockershim as the container runtime, it initializes and starts the container runtime server dockershim (initializing dockershim also initializes the CNI network plugin);

(2) Initialize the container runtime CRI shim client (used to call the CRI shim server: either the built-in runtime dockershim or a remote container runtime);

(3) Initialize kubeGenericRuntimeManager, which manages the container runtime. Once initialization completes, all subsequent kubelet operations on containers and images go through the CRI shim client this struct holds, which communicates with the CRI shim server.

Call chain of the CRI initialization:

main (cmd/kubelet/kubelet.go)

-> NewKubeletCommand (cmd/kubelet/app/server.go)

-> Run (cmd/kubelet/app/server.go)

-> run (cmd/kubelet/app/server.go)

-> RunKubelet (cmd/kubelet/app/server.go)

-> CreateAndInitKubelet(cmd/kubelet/app/server.go)

-> kubelet.NewMainKubelet(pkg/kubelet/kubelet.go)

-> getRuntimeAndImageServices(pkg/kubelet/kubelet.go) && kuberuntime.NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime_manager.go)

CRI-related logic in the NewMainKubelet function:

(1) Initialize and start the built-in container runtime server dockershim: depending on the value of containerRuntime (the kubelet flag --container-runtime), if it is docker, initialize and start the docker CRI shim, i.e. the kubelet's built-in runtime dockershim, and expose its grpc socket; if it is remote, do nothing here.

(2) Call getRuntimeAndImageServices: initialize the container runtime CRI shim clients, namely the runtime client runtimeClient and the container image client imageClient.

(3) Call kuberuntime.NewKubeGenericRuntimeManager and assign the result to klet: initialize kubeGenericRuntimeManager struct; afterwards the kubelet's container and image operations go through the CRI shim client it holds to communicate with the CRI shim server.

// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
    ...
    switch containerRuntime {
    // (1) initialize and start the built-in container runtime server dockershim
	case kubetypes.DockerContainerRuntime:
		// Create and start the CRI shim running as a grpc server.
		streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
		ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
			&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
		if err != nil {
			return nil, err
		}
		if crOptions.RedirectContainerStreaming {
			klet.criHandler = ds
		}

		// The unix socket for kubelet <-> dockershim communication.
		klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
			remoteRuntimeEndpoint,
			remoteImageEndpoint)
		klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
		server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
		if err := server.Start(); err != nil {
			return nil, err
		}

		// Create dockerLegacyService when the logging driver is not supported.
		supported, err := ds.IsCRISupportedLogDriver()
		if err != nil {
			return nil, err
		}
		if !supported {
			klet.dockerLegacyService = ds
			legacyLogProvider = ds
		}
	case kubetypes.RemoteContainerRuntime:
		// No-op.
		break
	default:
		return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
	}
	// (2) initialize the container runtime CRI shim clients
	runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
	if err != nil {
		return nil, err
	}
	klet.runtimeService = runtimeService

	if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
	}
    // (3) initialize kubeGenericRuntimeManager, used to manage the container runtime
	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
		klet.livenessManager,
		klet.startupManager,
		seccompProfileRoot,
		containerRefManager,
		machineInfo,
		klet,
		kubeDeps.OSInterface,
		klet,
		httpClient,
		imageBackOff,
		kubeCfg.SerializeImagePulls,
		float32(kubeCfg.RegistryPullQPS),
		int(kubeCfg.RegistryBurst),
		kubeCfg.CPUCFSQuota,
		kubeCfg.CPUCFSQuotaPeriod,
		runtimeService,
		imageService,
		kubeDeps.ContainerManager.InternalContainerLifecycle(),
		legacyLogProvider,
		klet.runtimeClassManager,
	)
	if err != nil {
		return nil, err
	}
	klet.containerRuntime = runtime
	klet.streamingRuntime = runtime
	klet.runner = runtime
	...
}
           

3.1 Initialize and start the built-in container runtime server dockershim

Here we analyze the case where the variable containerRuntime equals docker, i.e. the kubelet flag --container-runtime is set to docker. The kubelet then uses its built-in CRI shim, dockershim, as the container runtime, and dockershim calls docker to perform the container and image operations.

The main logic of initializing and starting dockershim is:

(1) Call dockershim.NewDockerService: create and initialize the dockershim server, including initializing the docker client and the CNI network configuration;

(2) Call dockerremote.NewDockerServer and server.Start: start dockershim and expose its service socket.

3.1.1 dockershim.NewDockerService

Create and initialize the dockershim server; the main logic is:

(1) NewDockerClientFromConfig: create the docker client object, through which the familiar docker operations (docker run, docker images, and so on) are invoked;

(2) build the dockerService struct;

(3) initialize the CNI network configuration (covered in detail in the dedicated CNI post).

// pkg/kubelet/dockershim/docker_service.go
// NewDockerService creates a new `DockerService` struct.
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, pluginSettings *NetworkPluginSettings,
	cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, startLocalStreamingServer bool) (DockerService, error) {
    // (1) create the docker client
	client := NewDockerClientFromConfig(config)

	c := libdocker.NewInstrumentedInterface(client)

	checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir))
	if err != nil {
		return nil, err
	}
    // (2) build the dockerService struct
	ds := &dockerService{
		client:          c,
		os:              kubecontainer.RealOS{},
		podSandboxImage: podSandboxImage,
		streamingRuntime: &streamingRuntime{
			client:      client,
			execHandler: &NativeExecHandler{},
		},
		containerManager:          cm.NewContainerManager(cgroupsName, client),
		checkpointManager:         checkpointManager,
		startLocalStreamingServer: startLocalStreamingServer,
		networkReady:              make(map[string]bool),
		containerCleanupInfos:     make(map[string]*containerCleanupInfo),
	}

	// check docker version compatibility.
	if err = ds.checkVersionCompatibility(); err != nil {
		return nil, err
	}

	// create streaming server if configured.
	if streamingConfig != nil {
		var err error
		ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
		if err != nil {
			return nil, err
		}
	}

	// Determine the hairpin mode.
	if err := effectiveHairpinMode(pluginSettings); err != nil {
		// This is a non-recoverable error. Returning it up the callstack will just
		// lead to retries of the same failure, so just fail hard.
		return nil, err
	}
	klog.Infof("Hairpin mode set to %q", pluginSettings.HairpinMode)
    // (3) initialize the CNI network configuration
	// dockershim currently only supports CNI plugins.
	pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString)
	cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginCacheDir, pluginSettings.PluginBinDirs)
	cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs, pluginSettings.PluginCacheDir))
	netHost := &dockerNetworkHost{
		&namespaceGetter{ds},
		&portMappingGetter{ds},
	}
	plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
	if err != nil {
		return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err)
	}
	ds.network = network.NewPluginManager(plug)
	klog.Infof("Docker cri networking managed by %v", plug.Name())

	// NOTE: cgroup driver is only detectable in docker 1.11+
	cgroupDriver := defaultCgroupDriver
	dockerInfo, err := ds.client.Info()
	klog.Infof("Docker Info: %+v", dockerInfo)
	if err != nil {
		klog.Errorf("Failed to execute Info() call to the Docker client: %v", err)
		klog.Warningf("Falling back to use the default driver: %q", cgroupDriver)
	} else if len(dockerInfo.CgroupDriver) == 0 {
		klog.Warningf("No cgroup driver is set in Docker")
		klog.Warningf("Falling back to use the default driver: %q", cgroupDriver)
	} else {
		cgroupDriver = dockerInfo.CgroupDriver
	}
	if len(kubeCgroupDriver) != 0 && kubeCgroupDriver != cgroupDriver {
		return nil, fmt.Errorf("misconfiguration: kubelet cgroup driver: %q is different from docker cgroup driver: %q", kubeCgroupDriver, cgroupDriver)
	}
	klog.Infof("Setting cgroupDriver to %s", cgroupDriver)
	ds.cgroupDriver = cgroupDriver
	ds.versionCache = cache.NewObjectCache(
		func() (interface{}, error) {
			return ds.getDockerVersion()
		},
		versionCacheTTL,
	)

	// Register prometheus metrics.
	metrics.Register()

	return ds, nil
}
           

NewDockerClientFromConfig

The NewDockerClientFromConfig function creates the client that communicates with docker. In the config struct, the value of DockerEndpoint comes from the kubelet flag --docker-endpoint and defaults to unix:///var/run/docker.sock.

// pkg/kubelet/dockershim/docker_service.go
// NewDockerClientFromConfig create a docker client from given configure
// return nil if nil configure is given.
func NewDockerClientFromConfig(config *ClientConfig) libdocker.Interface {
	if config != nil {
		// Create docker client.
		client := libdocker.ConnectToDockerOrDie(
			config.DockerEndpoint,
			config.RuntimeRequestTimeout,
			config.ImagePullProgressDeadline,
			config.WithTraceDisabled,
			config.EnableSleep,
		)
		return client
	}

	return nil
}
           
// pkg/kubelet/dockershim/libdocker/client.go
// ConnectToDockerOrDie creates docker client connecting to docker daemon.
// If the endpoint passed in is "fake://", a fake docker client
// will be returned. The program exits if error occurs. The requestTimeout
// is the timeout for docker requests. If timeout is exceeded, the request
// will be cancelled and throw out an error. If requestTimeout is 0, a default
// value will be applied.
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout, imagePullProgressDeadline time.Duration,
	withTraceDisabled bool, enableSleep bool) Interface {
	if dockerEndpoint == FakeDockerEndpoint {
		fakeClient := NewFakeDockerClient()
		if withTraceDisabled {
			fakeClient = fakeClient.WithTraceDisabled()
		}

		if enableSleep {
			fakeClient.EnableSleep = true
		}
		return fakeClient
	}
	client, err := getDockerClient(dockerEndpoint)
	if err != nil {
		klog.Fatalf("Couldn't connect to docker: %v", err)
	}
	klog.Infof("Start docker client with request timeout=%v", requestTimeout)
	return newKubeDockerClient(client, requestTimeout, imagePullProgressDeadline)
}
           

3.1.2 Start dockershim and expose the service socket

dockerremote.NewDockerServer()

// pkg/kubelet/dockershim/remote/docker_server.go
// NewDockerServer creates the dockershim grpc server.
func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer {
	return &DockerServer{
		endpoint: endpoint,
		service:  s,
	}
}
           

3.2 Initialize the container runtime CRI shim clients

Main logic of the getRuntimeAndImageServices function:

(1) the remote.NewRemoteRuntimeService function: instantiate the runtime client runtimeClient for container operations, which implements the RuntimeService interface (the CRI shim client interface) from the interface/struct analysis above;

(2) the remote.NewRemoteImageService function: instantiate the image client imageClient for image operations, which implements the ImageManagerService interface (the CRI shim client interface) from the interface/struct analysis above.

// pkg/kubelet/kubelet.go
func getRuntimeAndImageServices(remoteRuntimeEndpoint string, remoteImageEndpoint string, runtimeRequestTimeout metav1.Duration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
	rs, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	is, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	return rs, is, err
}
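The point of returning two values is the CRI split: one client for pod/container lifecycle, one for images. A trimmed sketch of that split — the method sets are reduced and the signatures simplified here; the real interfaces are `internalapi.RuntimeService` and `internalapi.ImageManagerService` with protobuf request/response types:

```go
package main

import "fmt"

// Reduced sketch of the container-lifecycle side of CRI.
type runtimeService interface {
	Version(apiVersion string) (string, error)
	RunPodSandbox(name string) (string, error)
}

// Reduced sketch of the image side of CRI.
type imageManagerService interface {
	PullImage(image string) (string, error)
}

// fakeShim implements both interfaces at once, the way a single CRI
// shim backend serves both gRPC services. Return values are canned.
type fakeShim struct{}

func (fakeShim) Version(apiVersion string) (string, error) { return "0.1.0", nil }
func (fakeShim) RunPodSandbox(name string) (string, error) { return "sandbox-" + name, nil }
func (fakeShim) PullImage(image string) (string, error)    { return "sha256:deadbeef", nil }

// getServices captures getRuntimeAndImageServices' contract in
// miniature: hand the backend out behind the two separate interfaces.
func getServices(s fakeShim) (runtimeService, imageManagerService) {
	return s, s
}

func main() {
	rs, is := getServices(fakeShim{})
	v, _ := rs.Version("0.1.0")
	ref, _ := is.PullImage("nginx:latest")
	fmt.Println(v, ref)
}
```

In kubelet the two clients usually dial the same socket (dockershim serves both services), but the split lets a runtime put image management behind a different endpoint.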
           

3.2.1 remote.NewRemoteRuntimeService

The remote.NewRemoteRuntimeService function instantiates the CRI shim client for container-related operations, the container runtime client `runtimeClient`, which implements the RuntimeService interface (a CRI shim client interface) from the earlier CRI interface/struct analysis.

Main logic: take the endpoint from the kubelet startup flag `--container-runtime-endpoint` (or the default `unix:///var/run/dockershim.sock`), try to connect to that socket, and build the client.

// pkg/kubelet/remote/remote_runtime.go
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
	klog.V(3).Infof("Connecting to runtime service %s", endpoint)
	addr, dailer, err := util.GetAddressAndDialer(endpoint)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
	if err != nil {
		klog.Errorf("Connect remote runtime %s failed: %v", addr, err)
		return nil, err
	}

	return &RemoteRuntimeService{
		timeout:       connectionTimeout,
		runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
		logReduction:  logreduction.NewLogReduction(identicalErrorDelay),
	}, nil
}
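The first step above, util.GetAddressAndDialer, turns the endpoint string into a protocol plus address for grpc.DialContext. A simplified version of that parsing — the real helper also returns the dial function and, on Linux, only accepts unix endpoints:

```go
package main

import (
	"fmt"
	"net/url"
)

// parseEndpoint splits a CRI endpoint such as
// "unix:///var/run/dockershim.sock" into protocol and address,
// a cut-down take on util.GetAddressAndDialer in kubelet.
func parseEndpoint(endpoint string) (protocol, addr string, err error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", "", err
	}
	switch u.Scheme {
	case "unix":
		// For unix endpoints the address is the socket path.
		return "unix", u.Path, nil
	case "tcp":
		// For tcp endpoints the address is host:port.
		return "tcp", u.Host, nil
	default:
		return "", "", fmt.Errorf("protocol %q not supported", u.Scheme)
	}
}

func main() {
	proto, addr, err := parseEndpoint("unix:///var/run/dockershim.sock")
	fmt.Println(proto, addr, err)
}
```

With the address in hand, the constructor dials it with a connection timeout and wraps the resulting `*grpc.ClientConn` in the generated RuntimeServiceClient.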
           

3.2.2 remote.NewRemoteImageService

The remote.NewRemoteImageService function instantiates the CRI shim client for image-related operations, the container image client `imageClient`, which implements the ImageManagerService interface (a CRI shim client interface) from the earlier CRI interface/struct analysis.

Main logic: take the endpoint from the kubelet startup flag `--image-service-endpoint` (or the default `unix:///var/run/dockershim.sock`), try to connect to that socket, and build the client.

// pkg/kubelet/remote/remote_image.go
// NewRemoteImageService creates a new internalapi.ImageManagerService.
func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) {
	klog.V(3).Infof("Connecting to image service %s", endpoint)
	addr, dailer, err := util.GetAddressAndDialer(endpoint)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
	if err != nil {
		klog.Errorf("Connect remote image service %s failed: %v", addr, err)
		return nil, err
	}

	return &RemoteImageService{
		timeout:     connectionTimeout,
		imageClient: runtimeapi.NewImageServiceClient(conn),
	}, nil
}
           

3.3 Initialize kubeGenericRuntimeManager for container runtime management

The kuberuntime.NewKubeGenericRuntimeManager function mainly initializes the `kubeGenericRuntimeManager` struct, which implements the `KubeGenericRuntime` interface.

`kubeGenericRuntimeManager` is the container runtime manager inside kubelet. It holds the CRI shim clients and talks to the CRI shim server through them to manage containers and images.

Once this initialization is done, kubelet uses these CRI shim clients for all subsequent communication with the CRI shim.

// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
func NewKubeGenericRuntimeManager(
	recorder record.EventRecorder,
	livenessManager proberesults.Manager,
	startupManager proberesults.Manager,
	seccompProfileRoot string,
	containerRefManager *kubecontainer.RefManager,
	machineInfo *cadvisorapi.MachineInfo,
	podStateProvider podStateProvider,
	osInterface kubecontainer.OSInterface,
	runtimeHelper kubecontainer.RuntimeHelper,
	httpClient types.HttpGetter,
	imageBackOff *flowcontrol.Backoff,
	serializeImagePulls bool,
	imagePullQPS float32,
	imagePullBurst int,
	cpuCFSQuota bool,
	cpuCFSQuotaPeriod metav1.Duration,
	runtimeService internalapi.RuntimeService,
	imageService internalapi.ImageManagerService,
	internalLifecycle cm.InternalContainerLifecycle,
	legacyLogProvider LegacyLogProvider,
	runtimeClassManager *runtimeclass.Manager,
) (KubeGenericRuntime, error) {
	kubeRuntimeManager := &kubeGenericRuntimeManager{
		recorder:            recorder,
		cpuCFSQuota:         cpuCFSQuota,
		cpuCFSQuotaPeriod:   cpuCFSQuotaPeriod,
		seccompProfileRoot:  seccompProfileRoot,
		livenessManager:     livenessManager,
		startupManager:      startupManager,
		containerRefManager: containerRefManager,
		machineInfo:         machineInfo,
		osInterface:         osInterface,
		runtimeHelper:       runtimeHelper,
		runtimeService:      newInstrumentedRuntimeService(runtimeService),
		imageService:        newInstrumentedImageManagerService(imageService),
		keyring:             credentialprovider.NewDockerKeyring(),
		internalLifecycle:   internalLifecycle,
		legacyLogProvider:   legacyLogProvider,
		runtimeClassManager: runtimeClassManager,
		logReduction:        logreduction.NewLogReduction(identicalErrorDelay),
	}

	typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
	if err != nil {
		klog.Errorf("Get runtime version failed: %v", err)
		return nil, err
	}

	// Only matching kubeRuntimeAPIVersion is supported now
	// TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
	if typedVersion.Version != kubeRuntimeAPIVersion {
		klog.Errorf("Runtime api version %s is not supported, only %s is supported now",
			typedVersion.Version,
			kubeRuntimeAPIVersion)
		return nil, ErrVersionNotSupported
	}

	kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
	klog.Infof("Container runtime %s initialized, version: %s, apiVersion: %s",
		typedVersion.RuntimeName,
		typedVersion.RuntimeVersion,
		typedVersion.RuntimeApiVersion)

	// If the container logs directory does not exist, create it.
	// TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
	// new runtime interface
	if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
		if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
			klog.Errorf("Failed to create directory %q: %v", podLogsRootDirectory, err)
		}
	}

	kubeRuntimeManager.imagePuller = images.NewImageManager(
		kubecontainer.FilterEventRecorder(recorder),
		kubeRuntimeManager,
		imageBackOff,
		serializeImagePulls,
		imagePullQPS,
		imagePullBurst)
	kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)

	kubeRuntimeManager.versionCache = cache.NewObjectCache(
		func() (interface{}, error) {
			return kubeRuntimeManager.getTypedVersion()
		},
		versionCacheTTL,
	)

	return kubeRuntimeManager, nil
}
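One detail worth calling out in the constructor above is the strict version gate: the runtime must report exactly `kubeRuntimeAPIVersion` ("0.1.0" in this code base) or the manager refuses to start with `ErrVersionNotSupported`. Stripped of logging, the check reduces to:

```go
package main

import (
	"errors"
	"fmt"
)

// kubeRuntimeAPIVersion mirrors the constant in
// pkg/kubelet/kuberuntime; only an exact match is accepted.
const kubeRuntimeAPIVersion = "0.1.0"

var errVersionNotSupported = errors.New("runtime api version is not supported")

// checkRuntimeAPIVersion is the guard from NewKubeGenericRuntimeManager
// in isolation: compare the version reported by the runtime's Version
// RPC against the single supported value.
func checkRuntimeAPIVersion(reported string) error {
	if reported != kubeRuntimeAPIVersion {
		return errVersionNotSupported
	}
	return nil
}

func main() {
	fmt.Println(checkRuntimeAPIVersion("0.1.0"))
	fmt.Println(checkRuntimeAPIVersion("v1alpha2"))
}
```

As the TODO in the source notes, richer version negotiation was still under discussion at the time; this exact-match check is deliberately conservative.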
           

Summary

This post first introduced CRI and then analyzed the kubelet CRI-related source code in three parts: the kubelet CRI-related startup parameters, the CRI-related interfaces/structs, and the CRI-related initialization. The remaining parts of the analysis will be covered in the next CRI post.

CRI introduction

CRI, short for Container Runtime Interface, decouples kubelet from the container runtime. To support runtimes other than Docker, such as rkt and containerd, kubelet adopted the CRI standard starting with v1.5: the Pod-level internal interface was split into gRPC interfaces oriented around Sandbox and Container, and image management was separated from container management into distinct services.

For compatibility with Docker, kubelet ships dockershim, a gRPC service that receives kubelet's requests, translates them into REST API calls, and forwards them to the docker daemon.

By function, the container runtime in Kubernetes consists of four parts: the runtime manager inside kubelet, kubeGenericRuntimeManager (pkg/kubelet/kuberuntime/kuberuntime_manager.go); the CRI interfaces themselves; the CRI shim clients held by kubelet; and the CRI shim servers, i.e. the concrete runtime implementations, including the built-in dockershim (pkg/kubelet/dockershim) and external runtimes such as cri-containerd (for containerd) and rktlet (for rkt).

CRI-related initialization

During kubelet startup, the CRI shim clients are created and handed to kubeGenericRuntimeManager; from then on kubelet talks to the CRI shim server through these clients.