天天看點

distribution源碼分析(二):registry啟動

1. 前言

像分析docker1.9的源碼時一樣,我在distribution源碼分析系列的第一篇仍然是介紹主函數開始的啟動流程,這樣就能對distribution的大體架構有個初步了解。分析流程中,distribution的版本均為2.1.0。

2. 本文分析内容安排

  • Cmd的初始化
  • Execute函數
  • handler句柄轉發

3. Cmd的初始化

主函數在cmd/registry/main.go中,隻一條語句,

registry.Cmd.Execute()

,Cmd是在/distribution/registry/registry.go中定義的,是以也可以說後來registry的執行都是在該檔案中定義的Cmd和其他的諸如Context上下文等的基礎上執行的。再将具體初始化之前首先介紹一下啟動registry是指定的參數yaml檔案

config-example.yml

,下面是一個具體例子:

version: 
log:
  fields:
    service: registry
storage:
    cache:
        blobdescriptor: inmemory
    filesystem:
        rootdirectory: /var/lib/registry
http:
    addr: :
    headers:
        X-Content-Type-Options: [nosniff]
health:
  storagedriver:
    enabled: true
    interval: 10s
    threshold: 3
           

可見storage、http等都在此做了配置。

3.1 Cmd配置初始化

Cmd主要的初始化都是針對于Run函數變量進行的,下面的代碼都是Run定義的函數中摘下來的

// setup context
ctx := context.WithVersion(context.Background(), version.Version)
config, err := resolveConfiguration(args)
if err != nil {
fmt.Fprintf(os.Stderr, "configuration error: %v\n", err)
cmd.Usage()
os.Exit)
}
           

這段代碼首先定義上下文,然後解析運作registry輸入的yaml參數中的内容為Configuration結構,Configuration的定義在distribution/configuration/configuration.go中,包含了運作registry需要的Log、Storage、Auth、HTTP、Notification、Redis、Health等所有配置。

配置工作已經做完了,下面開始正式建立registry并提供服務的代碼,在此之前先介紹Registry結構,它代表了一個registry完整的執行個體,包括Configuration代表了配置資訊、handlers.App代表了一個全局的registry application對象可以向所有的requests提供共享資源、http.Server定義了運作Http server的所有變量。

type Registry struct {
    config *configuration.Configuration
    app    *handlers.App
    server *http.Server
}
           

其中handlers.App需要單獨列出來講一下,其中mux.Router是路由分發的,storagedriver.StorageDriver用于指定後端存儲,events包含了與App相關的所有事件。

type App struct {
    context.Context
    Config *configuration.Configuration
    router           *mux.Router                 // main application router, configured with dispatchers
    driver           storagedriver.StorageDriver // driver maintains the app global storage driver instance.
    registry         distribution.Namespace      // registry is the primary registry backend for the app instance.
    accessController auth.AccessController       // main access controller for application
    // events contains notification related configuration.
    events struct {
        sink   notifications.Sink
        source notifications.SourceRecord
    }
    redis *redis.Pool
    // true if this registry is configured as a pull through cache
    isCache bool
}
           

介紹完了用到的重要結構體繼續介紹registry啟動流程

registry, err := NewRegistry(ctx, config)
if err != nil {
    log.Fatalln(err)
}
if err = registry.ListenAndServe(); err != nil {
    log.Fatalln(err)
}
           

3.2初始化App

NewRegistry

根據之前定義的Context和Configuration建立一個Regsitry,其中主要是通過handlers.NewApp給app指派,該函數的實作位于/distribution/registry/handlers/app.go中,首先根據既有的Context和Configuration做最初的初始化

app := &App{
    Config:  configuration,
    Context: ctx,
    router:  v2.RouterWithPrefix(configuration.HTTP.Prefix),
    isCache: configuration.Proxy.RemoteURL != "",
}
           

之後調用App的register函數,通過route name将相應的handler注冊到app中,register函數根據route name提供一個服務于request的handler并調用dispath指定的函數提供Http服務。

// Register the handler dispatchers.
    app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
        return http.HandlerFunc(apiBase)
    })
    app.register(v2.RouteNameManifest, imageManifestDispatcher)
    app.register(v2.RouteNameCatalog, catalogDispatcher)
    app.register(v2.RouteNameTags, tagsDispatcher)
    app.register(v2.RouteNameBlob, blobDispatcher)
    app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
    app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)
           

接下來是注冊storagedriver,涉及到内容比較多,稍後再講。現在要說的是為registry的App配置secret、events、redis和loghook,代碼如下:

app.configureSecret(configuration)
app.configureEvents(configuration)  app.configureRedis(configuration)
app.configureLogHook(configuration)
           

這四個函數的實作依然都在distribution/registry/handlers/app.go中,其中,configureSecret當yaml檔案中沒有指定secret時會生成一個HTTP secret;configureEvents為接下來的action準備sink,主要的代碼有:

for _, endpoint := range configuration.Notifications.Endpoints {
    if endpoint.Disabled {
        ctxu.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name)
        continue
    }
    ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
    endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
        Timeout:   endpoint.Timeout,
        Threshold: endpoint.Threshold,
        Backoff:   endpoint.Backoff,
        Headers:   endpoint.Headers,
    })
    sinks = append(sinks, endpoint)
}
app.events.sink = notifications.NewBroadcaster(sinks...)
           

這段代碼将registry的所有endpoint在sinks裡面做注冊,并且指派給app。

hostname, err := os.Hostname()
if err != nil {
    hostname = configuration.HTTP.Addr
} else {
    // try to pick the port off the config
    _, port, err := net.SplitHostPort(configuration.HTTP.Addr)
    if err == nil {
        hostname = net.JoinHostPort(hostname, port)
    }
}
app.events.source = notifications.SourceRecord{
    Addr:       hostname,
    InstanceID: ctxu.GetStringValue(app, "instance.id"),
}
           

首先擷取主機名,通過os包直接讀取主機名,當讀取失敗時采用從Configuration中存的Addr,之後為app.events的事件源設定為本機的主機名。

接下來介紹初始化App過程中與storigedriver相關的部分,首先根據給定的name和parameters建立一個StorageDriver,其中name指的是driver type,比如本地檔案系統或者s3;parameters指的是yaml針對于該driver的配置資訊

app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
           

之後将storage driver和middleware建立聯系

app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
           

建立好和middleware的聯系後,根據yaml配置存儲緩存。條件語句判斷yml檔案是否配置了cache,如果是的話繼續讀取blobdescriptor的配置。接下來的switch語句根據blobdesciptor的值是redis還是inmemory來确定對應的操作,此處以inmemory為例來講解,最重要的一行代碼是

storage.NewRegistry(app, app.driver, localOptions...)

,該函數的定義在distribution/registry/handlers/app.go中,該函數傳回的是一個distribution.Namespace的結構,Namespace指定了一個repositories的集合,提供registry的access、trust、indexing等服務,每向registry上傳一個不同名鏡像,在repositories中都會有對應的一項,相當于中繼資料;上傳同名對象時,由于tag不同,會在repositories下的tags目錄中出現針對于不同tag的tags目錄。但是,此處的storage.registry實作了接口Namespace的所有函數,真正傳回的的storage.registry結構,描述了倉庫與存儲相關的變量。

// configure storage caches
if cc, ok := configuration.Storage["cache"]; ok {
    v, ok := cc["blobdescriptor"]
    if !ok {
        // Backwards compatible: "layerinfo" == "blobdescriptor"
        v = cc["layerinfo"]
    }
    switch v {
    case "redis":
        if app.redis == nil {
            panic("redis configuration required to use for layerinfo cache")
        }
        cacheProvider := rediscache.NewRedisBlobDescriptorCacheProvider(app.redis)
        localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
        app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
        if err != nil {
            panic("could not create registry: " + err.Error())
        }
        ctxu.GetLogger(app).Infof("using redis blob descriptor cache")
    case "inmemory":
        cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider()
        localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
        app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
        if err != nil {
            panic("could not create registry: " + err.Error())
        }
        ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
    default:
        if v != "" {
            ctxu.GetLogger(app).Warnf("unknown cache type %q, caching disabled", configuration.Storage["cache"])
        }
    }
}
           

再向下就是配置app.accessController等,然後傳回配置好的app

3.3 初始化http.Server

這塊兒主要是配置Server中的Handler,代碼如下:

handler := configureReporting(app)
handler = alive("/", handler)
handler = health.Handler(handler)
handler = panicHandler(handler)
handler = gorhandlers.CombinedLoggingHandler(os.Stdout, handler)
server := &http.Server{
    Handler: handler,
}
           

第一行調用的是configureReporting函數,定義http.Handler類型的handler變量,并為之指派app,因為App實作了ServeHTTP函數,是以可以指向app,并且擁有app具有的所有屬性,主要是給handler加上了APIKey和LicenseKey。

之後調用alive函數,該函數原型為func alive(path string, handler http.Handler) http.Handler ,當path與r.URL.Path比對時範圍StatusOK;否則,傳回handler.ServeHTTP(w,r),讓該handler提供http服務,在該位置并沒有提前定義path,是以可以認為是為了提供http服務才調用該函數的。

接下來調用health.Handler來進行安全性檢測,如果失敗則停止,如果通過則繼續提供http服務。health.Handler(handler)中調用CheckStatus()函數,隻有一行DefaultRegistry.CheckStatus(),傳回預設registry的檢查結果,預設registry即在用的倉庫,檢查倉庫是否可以安全地提供服務。

之後繼續完成完成後面的panicHandler喚醒延遲服務并按照yaml中配置的log選項顯示log資訊,CombinedLoggingHandler函數将logs以設定的Format顯示。之後将根據配置好的handler給Server指派。

至此,Registry結構中的所有屬性都已配置好,可以傳回了,如下所示:

return &Registry{
        app:    app,
        config: config,
        server: server,
}, nil
           

3.4 ListenAndServe提供服務

通過上面的代碼已經完全配置好了registry,接下來的代碼是registry.ListenAndServe(),這個函數運作registry的Http server,具體提供服務,流程如下:

ln, err := listener.NewListener(config.HTTP.Net, config.HTTP.Addr)
           

NewListener的實作位于distribution/registry/listener/listener.go中,參數是string類型的net和laddr,net指明是”unix”還是“tcp”,laddr指明提供服務的位址。根據類型分别選擇提供tcp或者unix的Listen服務,比如net.Listen(“tcp”,laddr)。

之後就是配置tls,包括Key、Certificate、CA等,配置完後調用的函數是

ln = tls.NewListener(ln, tlsConf)

,這個語句将原本的listener加上tls。最後傳回的語句為

registry.server.Serve(ln)

,之前已經為server注冊了handler, 此處調用該Serve函數,以Listener為參數,就能在Listener中記錄的特定位址接受連接配接,為每個連接配接建立一個goroutine。并用該協程讀取request,調用相應的handler進行處理。

4. Execute函數

Execute函數的實作位于Goeps的spf13/cobra/command.go中,是Command結構的一個成員函數,首先對Command結構作介紹,它代表了程式執行的一個指令,比如

go run

中的

run

便是一個Command,在該結構中,Run函數是真正執行工作的:

type Command struct {
    // Name is the command name, usually the executable's name.
    name string
    // The one-line usage message.
    Use string
    // An array of aliases that can be used instead of the first word in Use.
    Aliases []string
    // The short description shown in the 'help' output.
    Short string
    // The long message shown in the 'help <this-command>' output.
    Long string
    // Examples of how to use the command
    Example string
    // List of all valid non-flag arguments, used for bash completions *TODO* actually validate these
    ValidArgs []string
    // Custom functions used by the bash autocompletion generator
    BashCompletionFunction string
    // Is this command deprecated and should print this string when used?
    Deprecated string
    // Full set of flags
    flags *flag.FlagSet
    // Set of flags childrens of this command will inherit
    pflags *flag.FlagSet
    // Flags that are declared specifically by this command (not inherited).
    lflags *flag.FlagSet
    // The *Run functions are executed in the following order:
    //   * PersistentPreRun()
    //   * PreRun()
    //   * Run()
    //   * PostRun()
    //   * PersistentPostRun()
    // All functions get the same args, the arguments after the command name
    // PersistentPreRun: children of this command will inherit and execute
    PersistentPreRun func(cmd *Command, args []string)
    // PreRun: children of this command will not inherit.
    PreRun func(cmd *Command, args []string)
    // Run: Typically the actual work function. Most commands will only implement this
    Run func(cmd *Command, args []string)
    // PostRun: run after the Run command.
    PostRun func(cmd *Command, args []string)
    // PersistentPostRun: children of this command will inherit and execute after PostRun
    PersistentPostRun func(cmd *Command, args []string)
    // Commands is the list of commands supported by this program.
    commands []*Command
    // Parent Command for this command
    parent *Command
    // max lengths of commands' string lengths for use in padding
    commandsMaxUseLen         int
    commandsMaxCommandPathLen int
    commandsMaxNameLen        int
    flagErrorBuf *bytes.Buffer
    cmdErrorBuf  *bytes.Buffer
    args          []string                 // actual args parsed from flags
    output        *io.Writer               // nil means stderr; use Out() method instead
    usageFunc     func(*Command) error     // Usage can be defined by application
    usageTemplate string                   // Can be defined by Application
    helpTemplate  string                   // Can be defined by Application
    helpFunc      func(*Command, []string) // Help can be defined by application
    helpCommand   *Command                 // The help command
    helpFlagVal   bool
    // The global normalization function that we can use on every pFlag set and children commands
    globNormFunc func(f *flag.FlagSet, name string) flag.NormalizedName
}
           

registry在啟動之前做的都是一些解析參數的工作,真正的啟動是從/_workspace/src/github.com/spf13/cobra/command.go中的execute裡進行的,在該函數中按照PersistentPreRun(),PreRun(),Run(),PostRun(),PersistentPostRun()的順序将五個函數執行了一遍。distribution下的configuration中儲存的是配置參數,是在運作時指定的yaml檔案中的。運作Cmd時,會先執行Cmd的init函數顯示版本号等資訊,然後執行Execute,執行到Run函數時就是第三節講的那部分内容了。

5. handler句柄轉發

與句柄轉發相關的路由在handler.App結構中,是以句柄的注冊是在distribution/registry/registry.go中NewRegistry函數中通過

app := handlers.NewApp(ctx, config)

實作的,在這行代碼之後雖然接連出現handler變量,但隻是做輔助性的判斷并在此基礎上生成http.Server用的,并沒有提供注冊handler具體函數的功能。

下面就進入NewApp函數内部檢視具體的注冊流程,用到的函數為registry,函數的原型如下:

func (app *App) register(routeName string, dispatch dispatchFunc) {
    app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
}
           

有兩個參數,第一個routeName指定route名,比如manifest、blob、tags等,指的是提供哪種服務,服務所有類型在distribution/registry/api/v2/routes.go中定義的,如下所示:

const (
    RouteNameBase            = "base"
    RouteNameManifest        = "manifest"
    RouteNameTags            = "tags"
    RouteNameBlob            = "blob"
    RouteNameBlobUpload      = "blob-upload"
    RouteNameBlobUploadChunk = "blob-upload-chunk"
    RouteNameCatalog         = "catalog"
)
           

而第二個參數指定的是提供該服務的具體句柄函數。函數體中隻有一條語句,GetRoute函數,根據函數提供的routeName傳回具體的已經注冊的mux.Route,下面的代碼中注冊了七個Route,正是對應于以上const中定義的七個routeName。之後的Handler函數建立一個監聽Bugsnag panics的http Handler。Handler函數的參數也是http.Handler,dispatchFunc讀取context和request并為route傳回一個handler。dispathcer為每個endpoint建立特定于請求的handlers而不是為每個request都建立一個router。

// Register the handler dispatchers.
app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
    return http.HandlerFunc(apiBase)
})
app.register(v2.RouteNameManifest, imageManifestDispatcher)
app.register(v2.RouteNameCatalog, catalogDispatcher)
app.register(v2.RouteNameTags, tagsDispatcher)
app.register(v2.RouteNameBlob, blobDispatcher)
app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)
           

6. 總結

本文從源碼的角度分析了從registry可執行檔案開始,到讀取yaml配置檔案,到根據配置建立handler.App和Server,最後由Configuration、App、Server組成registry結構并調用ListenAndServe函數提供http服務,接收連接配接并将request轉到對應的handler處理并傳回結果提供服務,最後又介紹了Serve服務具體是怎樣轉發排程handler的。

筆者認為,學習與了解registry啟動并提供服務過程的源碼,不僅讓使用者對registry的整體架構和整體流程有了了解,同時也是使用者在自身的需要下針對特定子產品進行深入研究和修改代碼的基礎。1

7. 作者介紹

梁明遠,國防科大并行與分布式計算國家重點實驗室(PDL)應屆研究所學生,14年入學伊始便開始接觸docker,準備在餘下的讀研時間在docker相關開源社群貢獻自己的代碼,畢業後準備繼續從事該方面研究。郵箱:[email protected]

8. 參考文獻

  1. http://www.infoq.com/cn/articles/docker-source-code-analysis-part2 ↩

繼續閱讀