Source version
kubernetes version: v1.3.0
DockerClient initialization
DockerClient is one of the members of KubeletConfig.
The KubeletConfig struct:
type KubeletConfig struct {
    Address           net.IP
    AllowPrivileged   bool
    ...
    DockerClient      dockertools.DockerInterface
    RuntimeCgroups    string
    DockerExecHandler dockertools.ExecHandler
    ...
}
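kubeletConfig is initialized in UnsecuredKubeletConfig(), which depends on the kubeletServer configuration struct assembled at startup. Through its embedded KubeletConfiguration, that kubeletServer struct carries a DockerEndpoint string member: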
type KubeletServer struct {
    componentconfig.KubeletConfiguration
    AuthPath      util.StringFlag // Deprecated -- use KubeConfig instead
    KubeConfig    util.StringFlag
    APIServerList []string
    RunOnce       bool
    // Insert a probability of random errors during calls to the master.
    ChaosChance float64
    // Crash immediately, rather than eating panics.
    ReallyCrashForTesting bool
    SystemReserved        config.ConfigurationMap
    KubeReserved          config.ConfigurationMap
}
type KubeletConfiguration struct {
    // config is the path to the config file or directory of files
    Config string `json:"config"`
    ...
    DockerEndpoint string `json:"dockerEndpoint"`
    ...
}
In practice, when this parameter is not specified, the endpoint "unix:///var/run/docker.sock" is used as the default DockerEndpoint; see NewEnvClient().
Back to the kubeletConfig initialization function, UnsecuredKubeletConfig():
func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
    hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
    if err != nil {
        return nil, err
    }
    ...
    return &KubeletConfig{
        Address:         net.ParseIP(s.Address),
        AllowPrivileged: s.AllowPrivileged,
        ...
        DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
        ...
    }, nil
}
Next, look at dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration):
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
    if dockerEndpoint == "fake://" {
        return NewFakeDockerClient()
    }
    client, err := getDockerClient(dockerEndpoint)
    if err != nil {
        glog.Fatalf("Couldn't connect to docker: %v", err)
    }
    glog.Infof("Start docker client with request timeout=%v", requestTimeout)
    return newKubeDockerClient(client, requestTimeout)
}
As noted earlier, if kubelet is started without the "docker-endpoint" flag, s.DockerEndpoint is empty.
The value of s.RuntimeRequestTimeout.Duration is set during initialization in NewKubeletServer(); it is 2 minutes.
getDockerClient() is fairly simple:
getDockerClient -> dockerapi.NewEnvClient() -> NewClient().
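The fallback described above can be sketched in a few lines. This is only an illustration: resolveDockerHost is a hypothetical helper, not a function from kubelet or engine-api, and the DOCKER_HOST lookup is my assumption about what NewEnvClient() consults before falling back to the default socket.

```go
package main

import (
    "fmt"
    "os"
)

// defaultDockerHost is the fallback endpoint named in the text.
const defaultDockerHost = "unix:///var/run/docker.sock"

// resolveDockerHost is a hypothetical helper (not kubelet or engine-api code).
// It prefers an explicit endpoint, then the DOCKER_HOST variable (assumed),
// then the default socket. getenv is injected so the lookup can be exercised
// without touching the real process environment.
func resolveDockerHost(endpoint string, getenv func(string) string) string {
    if endpoint != "" {
        return endpoint
    }
    if host := getenv("DOCKER_HOST"); host != "" {
        return host
    }
    return defaultDockerHost
}

func main() {
    fmt.Println(resolveDockerHost("", os.Getenv))
    fmt.Println(resolveDockerHost("tcp://127.0.0.1:2375", os.Getenv))
}
```

Passing the environment lookup as a parameter is just a convenience for experimenting; the real code reads the environment directly.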
NewClient() is as follows:
func NewClient(host string, version string, client *http.Client, httpHeaders map[string]string) (*Client, error) {
    proto, addr, basePath, err := ParseHost(host)
    if err != nil {
        return nil, err
    }
    transport, err := transport.NewTransportWithHTTP(proto, addr, client)
    if err != nil {
        return nil, err
    }
    return &Client{
        proto:             proto,
        addr:              addr,
        basePath:          basePath,
        transport:         transport,
        version:           version,
        customHTTPHeaders: httpHeaders,
    }, nil
}
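As mentioned, when the "docker-endpoint" flag is not passed, the default value is "unix:///var/run/docker.sock", so that is the value of the host parameter here.
ParseHost() first parses host into its parts; the transport is then built from them, and finally the Client.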
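To make the parsing step concrete, here is a minimal sketch of splitting an endpoint into protocol and address. splitHost is a hypothetical, simplified stand-in for ParseHost(): it ignores the base path and any protocol-specific handling the real function may perform.

```go
package main

import (
    "fmt"
    "strings"
)

// splitHost is a simplified, hypothetical stand-in for ParseHost().
// It only separates the protocol from the address at the first "://".
func splitHost(host string) (proto, addr string, err error) {
    parts := strings.SplitN(host, "://", 2)
    if len(parts) != 2 {
        return "", "", fmt.Errorf("unable to parse docker host %q", host)
    }
    return parts[0], parts[1], nil
}

func main() {
    proto, addr, err := splitHost("unix:///var/run/docker.sock")
    fmt.Println(proto, addr, err)
}
```

For the default endpoint this yields the protocol "unix" and the socket path as the address, which are the values that end up in the proto and addr fields of the Client.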
The Client struct is as follows:
type Client struct {
    // proto holds the client protocol i.e. unix.
    proto string
    // addr holds the client address.
    addr string
    // basePath holds the path to prepend to the requests.
    basePath string
    // transport is the interface to send request with, it implements transport.Client.
    transport transport.Client
    // version of the server to talk to.
    version string
    // custom http headers configured by users.
    customHTTPHeaders map[string]string
}
Once the Client has been created, ConnectToDockerOrDie() (shown earlier) calls newKubeDockerClient() to build the kubeDockerClient struct defined in pkg/kubelet/dockertools/kube_docker_client.go:
type kubeDockerClient struct {
    // timeout is the timeout of short running docker operations.
    timeout time.Duration
    client  *dockerapi.Client
}
That completes the initialization. Going back to the beginning, here is how DockerClient is defined.
dockertools.DockerInterface is as follows:
type DockerInterface interface {
    ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
    InspectContainer(id string) (*dockertypes.ContainerJSON, error)
    CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)
    StartContainer(id string) error
    StopContainer(id string, timeout int) error
    RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error
    InspectImage(image string) (*dockertypes.ImageInspect, error)
    ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)
    PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
    RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
    ImageHistory(id string) ([]dockertypes.ImageHistory, error)
    Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
    Version() (*dockertypes.Version, error)
    Info() (*dockertypes.Info, error)
    CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)
    StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
    InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
    AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
}
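The initialization ultimately returns a kubeDockerClient struct, so that struct is the implementation of DockerInterface. Its methods can be found in the file where kubeDockerClient is defined, pkg/kubelet/dockertools/kube_docker_client.go.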
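The relationship between the interface and its implementations can be illustrated with a much smaller example. Everything below is hypothetical: containerLister stands in for DockerInterface with a single method, and realClient and fakeClient are illustrative types, loosely mirroring how the "fake://" endpoint returns a different implementation of the same interface.

```go
package main

import "fmt"

// containerLister is a hypothetical one-method stand-in for DockerInterface.
type containerLister interface {
    ListContainers() ([]string, error)
}

// realClient is an illustrative type; it never talks to a daemon here.
type realClient struct{ endpoint string }

func (c *realClient) ListContainers() ([]string, error) {
    return nil, fmt.Errorf("no daemon reachable at %s in this sketch", c.endpoint)
}

// fakeClient returns canned data, which is handy in tests.
type fakeClient struct{ containers []string }

func (c *fakeClient) ListContainers() ([]string, error) { return c.containers, nil }

// Compile-time checks that both types satisfy the interface.
var (
    _ containerLister = (*realClient)(nil)
    _ containerLister = (*fakeClient)(nil)
)

// connect picks an implementation based on the endpoint string.
func connect(endpoint string) containerLister {
    if endpoint == "fake://" {
        return &fakeClient{containers: []string{"c1", "c2"}}
    }
    return &realClient{endpoint: endpoint}
}

func main() {
    names, err := connect("fake://").ListContainers()
    fmt.Println(names, err)
}
```

Callers only see the interface, so the rest of the program does not need to know which implementation it was handed.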
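DockerClient interface analysis
Source file: pkg/kubelet/dockertools/kube_docker_client.go
The kubeDockerClient struct implements every method of DockerInterface.
These methods are wrappers around the corresponding Docker operations. One of them is analyzed below: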
func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()
    containers, err := d.client.ContainerList(ctx, options)
    if ctxErr := contextError(ctx); ctxErr != nil {
        return nil, ctxErr
    }
    if err != nil {
        return nil, err
    }
    return containers, nil
}
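The timeout handling in ListContainers() follows a common pattern: create a context with a deadline, run the call, then check the context before looking at the call's own error. Below is a minimal sketch using the standard context package. withTimeout, errOperationTimeout, and the two demo operations are hypothetical names, and the exact error that kubelet's contextError() returns is not shown in this article.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// errOperationTimeout is a hypothetical sentinel error for this sketch.
var errOperationTimeout = errors.New("operation timeout")

// withTimeout runs op under a deadline. The context is checked first so
// that an expired deadline is always reported as a timeout, whatever
// error op itself returned.
func withTimeout(timeout time.Duration, op func(ctx context.Context) error) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    err := op(ctx)
    if ctx.Err() == context.DeadlineExceeded {
        return errOperationTimeout
    }
    return err
}

// blockUntilDone simulates a call that never finishes on its own.
func blockUntilDone(ctx context.Context) error {
    <-ctx.Done()
    return ctx.Err()
}

// returnImmediately simulates a call that finishes well before the deadline.
func returnImmediately(ctx context.Context) error { return nil }

// runSlow and runFast wrap the two demo cases with fixed timeouts.
func runSlow() error { return withTimeout(10*time.Millisecond, blockUntilDone) }
func runFast() error { return withTimeout(time.Second, returnImmediately) }

func main() {
    fmt.Println(runSlow())
    fmt.Println(runFast())
}
```

Checking the context before the operation's error keeps timeout reporting uniform across all wrapped calls.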
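The key step in ListContainers() is the call to d.client.ContainerList(ctx, options).
The key object is therefore still client, which brings us back to the Client struct introduced in the initialization section.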
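The Client struct lives in: vendor/github.com/docker/engine-api/client/client.go
The files in that client package wrap the calls to the Docker API. They are worth studying in depth, but are not covered one by one here. Returning to d.client.ContainerList(ctx, options), the implementation is as follows: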
func (cli *Client) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
    query := url.Values{}
    if options.All {
        query.Set("all", "1")
    }
    if options.Limit != -1 {
        query.Set("limit", strconv.Itoa(options.Limit))
    }
    if options.Since != "" {
        query.Set("since", options.Since)
    }
    if options.Before != "" {
        query.Set("before", options.Before)
    }
    if options.Size {
        query.Set("size", "1")
    }
    if options.Filter.Len() > 0 {
        filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filter)
        if err != nil {
            return nil, err
        }
        query.Set("filters", filterJSON)
    }
    resp, err := cli.get(ctx, "/containers/json", query, nil)
    if err != nil {
        return nil, err
    }
    var containers []types.Container
    err = json.NewDecoder(resp.body).Decode(&containers)
    ensureReaderClosed(resp)
    return containers, err
}
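The query construction in ContainerList() can be tried in isolation with net/url. In the sketch below, listOptions and buildListPath are hypothetical names, only three of the options are modeled, and treating a non-positive limit as "no limit" is a simplification of mine rather than the library's rule.

```go
package main

import (
    "fmt"
    "net/url"
    "strconv"
)

// listOptions is a hypothetical, trimmed-down version of the list options.
type listOptions struct {
    All   bool
    Limit int
    Since string
}

// buildListPath turns the options into the request path and query string.
// url.Values.Encode sorts parameters by key, so the output is stable.
func buildListPath(opts listOptions) string {
    query := url.Values{}
    if opts.All {
        query.Set("all", "1")
    }
    if opts.Limit > 0 { // simplification: non-positive means "no limit"
        query.Set("limit", strconv.Itoa(opts.Limit))
    }
    if opts.Since != "" {
        query.Set("since", opts.Since)
    }
    path := "/containers/json"
    if encoded := query.Encode(); encoded != "" {
        path += "?" + encoded
    }
    return path
}

func main() {
    fmt.Println(buildListPath(listOptions{All: true, Limit: 5}))
}
```

Only options that differ from their zero value end up in the query string, which keeps the default request as short as possible.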
Everything up to that point is parameter setup: the function simply builds a GET request and then calls cli.get(), which issues an HTTP request:
func (cli *Client) get(ctx context.Context, path string, query url.Values, headers map[string][]string) (*serverResponse, error) {
    return cli.sendRequest(ctx, "GET", path, query, nil, headers)
}
func (cli *Client) sendRequest(ctx context.Context, method, path string, query url.Values, obj interface{}, headers map[string][]string) (*serverResponse, error) {
    var body io.Reader
    if obj != nil {
        var err error
        body, err = encodeData(obj)
        if err != nil {
            return nil, err
        }
        if headers == nil {
            headers = make(map[string][]string)
        }
        headers["Content-Type"] = []string{"application/json"}
    }
    return cli.sendClientRequest(ctx, method, path, query, body, headers)
}
func (cli *Client) sendClientRequest(ctx context.Context, method, path string, query url.Values, body io.Reader, headers map[string][]string) (*serverResponse, error) {
    serverResp := &serverResponse{
        body:       nil,
        statusCode: -1,
    }
    ...
    req, err := cli.newRequest(method, path, query, body, headers)
    if cli.proto == "unix" || cli.proto == "npipe" {
        // For local communications, it doesn't matter what the host is. We just
        // need a valid and meaningful host name. (See #189)
        req.Host = "docker"
    }
    req.URL.Host = cli.addr
    req.URL.Scheme = cli.transport.Scheme()
    if expectedPayload && req.Header.Get("Content-Type") == "" {
        req.Header.Set("Content-Type", "text/plain")
    }
    resp, err := cancellable.Do(ctx, cli.transport, req)
    if resp != nil {
        serverResp.statusCode = resp.StatusCode
    }
    ...
    if serverResp.statusCode < 200 || serverResp.statusCode >= 400 {
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return serverResp, err
        }
        if len(body) == 0 {
            return serverResp, fmt.Errorf("Error: request returned %s for API route and version %s, check if the server supports the requested API version", http.StatusText(serverResp.statusCode), req.URL)
        }
        return serverResp, fmt.Errorf("Error response from daemon: %s", bytes.TrimSpace(body))
    }
    serverResp.body = resp.Body
    serverResp.header = resp.Header
    return serverResp, nil
}
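The last part of sendClientRequest() turns an unsuccessful HTTP status into a Go error. A rough sketch of that idea is shown below. checkStatus is a hypothetical helper, and the success range it uses (200 up to, but not including, 400) is an assumption of this sketch.

```go
package main

import (
    "bytes"
    "fmt"
    "net/http"
)

// checkStatus is a hypothetical helper. It treats any status outside the
// assumed success range [200, 400) as an error and prefers the response
// body as the error message when one is present.
func checkStatus(code int, body []byte) error {
    if code >= 200 && code < 400 {
        return nil
    }
    if len(body) == 0 {
        return fmt.Errorf("request returned %s", http.StatusText(code))
    }
    return fmt.Errorf("error response from daemon: %s", bytes.TrimSpace(body))
}

func main() {
    fmt.Println(checkStatus(http.StatusOK, nil))
    fmt.Println(checkStatus(http.StatusNotFound, []byte(" no such container \n")))
}
```

Falling back to the status text when the body is empty means the caller always gets a readable message.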
func Do(ctx context.Context, client transport.Sender, req *http.Request) (*http.Response, error) {
    ...
    result := make(chan responseAndError, 1)
    go func() {
        resp, err := client.Do(req)
        testHookDoReturned()
        result <- responseAndError{resp, err}
    }()
    var resp *http.Response
    select {
    case <-ctx.Done():
        testHookContextDoneBeforeHeaders()
        cancel()
        // Clean up after the goroutine calling client.Do:
        go func() {
            if r := <-result; r.resp != nil && r.resp.Body != nil {
                testHookDidBodyClose()
                r.resp.Body.Close()
            }
        }()
        return nil, ctx.Err()
    case r := <-result:
        var err error
        resp, err = r.resp, r.err
        if err != nil {
            return resp, err
        }
    }
    ...
    return resp, nil
}
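The code above shows the whole call chain of the HTTP request, which ends in client.Do(). To see what that client object is, go back to the initialization: it is the Client.transport field from vendor/github.com/docker/engine-api/client/client.go, which is set to an apiTransport object during initialization: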
type apiTransport struct {
    *http.Client
    *tlsInfo
    transport *http.Transport
}
Because apiTransport embeds *http.Client, client.Do() is in fact a call to http.Client.Do().
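The reason this works is Go's method promotion through struct embedding. The sketch below demonstrates it without any network access. sender, wrappedTransport, cannedRoundTripper, and fetchStatus are hypothetical names, and the canned round tripper only exists so that the example runs without a Docker daemon.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "strings"
)

// sender is a hypothetical one-method interface for this sketch.
type sender interface {
    Do(req *http.Request) (*http.Response, error)
}

// wrappedTransport embeds *http.Client, so Client.Do is promoted and the
// struct satisfies sender without declaring Do itself.
type wrappedTransport struct {
    *http.Client
    scheme string
}

var _ sender = (*wrappedTransport)(nil)

// cannedRoundTripper answers every request locally with a 200 response.
type cannedRoundTripper struct{}

func (cannedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    return &http.Response{
        StatusCode: http.StatusOK,
        Body:       io.NopCloser(strings.NewReader("ok")),
        Header:     make(http.Header),
        Request:    req,
    }, nil
}

// fetchStatus sends one request through the promoted Do method.
func fetchStatus() (int, error) {
    var s sender = &wrappedTransport{
        Client: &http.Client{Transport: cannedRoundTripper{}},
        scheme: "http",
    }
    req, err := http.NewRequest("GET", "http://docker/containers/json", nil)
    if err != nil {
        return 0, err
    }
    resp, err := s.Do(req)
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()
    return resp.StatusCode, nil
}

func main() {
    fmt.Println(fetchStatus())
}
```

The wrapper adds its own fields while inheriting the whole method set of the embedded client, which is the same trick apiTransport relies on.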
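That concludes the analysis. The implementation of each individual method still takes time to read in the source, but they all follow much the same pattern.
Reading the source also turns up many classic implementations. The cancellable.Do() function above is one example: it uses the "goroutine + channel" approach that Go strongly encourages, and waits in a select/case for either the goroutine's result or the context's cancellation, which is very convenient.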
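For readers who want to experiment with the pattern, here is a stripped-down sketch. It is not the engine-api code: runCancellable, result, and the two demo functions are hypothetical, and the work is an arbitrary function rather than an HTTP request.

```go
package main

import (
    "context"
    "fmt"
)

// result carries the outcome of the background work.
type result struct {
    value string
    err   error
}

// runCancellable runs work in a goroutine and returns as soon as either
// the work finishes or the context is done, whichever happens first.
func runCancellable(ctx context.Context, work func() (string, error)) (string, error) {
    // A buffer of 1 lets the goroutine complete its send even when the
    // caller has already returned, so the goroutine does not block forever.
    results := make(chan result, 1)
    go func() {
        v, err := work()
        results <- result{v, err}
    }()

    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case r := <-results:
        return r.value, r.err
    }
}

// demoCompleted runs work that finishes under a context that never ends.
func demoCompleted() (string, error) {
    return runCancellable(context.Background(), func() (string, error) {
        return "done", nil
    })
}

// demoCancelled cancels the context before the work is allowed to finish.
func demoCancelled() error {
    ctx, cancel := context.WithCancel(context.Background())
    release := make(chan struct{})
    defer close(release) // unblock the worker once we have returned
    cancel()
    _, err := runCancellable(ctx, func() (string, error) {
        <-release
        return "too late", nil
    })
    return err
}

func main() {
    fmt.Println(demoCompleted())
    fmt.Println(demoCancelled())
}
```

The caller never waits longer than the context allows, while the worker goroutine is still free to finish and exit on its own.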