一、容器和虛拟機
在fabric中,有兩類鍊碼,一類是系統鍊碼,一類是使用者鍊碼。而鍊碼都需要安裝和執行個體化才能使用,在這當中,它們雖然原理相似,但是實作的方式還是有所不同。在系統鍊碼中,首先要Register,然後再Deploy才能使用;而使用者鍊碼則首先要Install,然後再instantiate就可以被外部接口使用了。
是以,對容器的啟動也可分成這兩部分來進行解析,從宏觀上把握入口,然後分類進行源碼的解析。
二、整體的入口
在前面的分析中可以知道在Launch函數中,是啟動容器的入口。那麼就從Launch這個函數開始看(core/chaincode/chaincode_support.go):
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
cname := chaincodeName + ":" + chaincodeVersion
if h := cs.HandlerRegistry.Handler(cname); h != nil {
return h, nil
}
//此處到得容器相關的資訊,包括生産容器的具體類型是系統鍊碼容器還是使用者鍊碼容器
//在後面會說明,系統鍊碼啟動的容器是:inprocVM---inproContainer,使用者鍊碼啟動的容器是DockerVM---DockerContainer
ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe)
if err != nil {
// TODO: There has to be a better way to do this...
if cs.UserRunsCC {
chaincodeLogger.Error(
"You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?",
)
}
return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname)
}
//啟動Runtime中的Launch
if err := cs.Launcher.Launch(ccci); err != nil {
return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname)
}
h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
}
return h, nil
}
//runtime_launcher.go
func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
var startFailCh chan error
var timeoutCh <-chan time.Time
startTime := time.Now()
cname := ccci.Name + ":" + ccci.Version
launchState, alreadyStarted := r.Registry.Launching(cname)
if !alreadyStarted {
startFailCh = make(chan error, 1)
timeoutCh = time.NewTimer(r.StartupTimeout).C
codePackage, err := r.getCodePackage(ccci)
if err != nil {
return err
}
go func() {
//啟動Process
if err := r.Runtime.Start(ccci, codePackage); err != nil {
startFailCh <- errors.WithMessage(err, "error starting container")
return
}
exitCode, err := r.Runtime.Wait(ccci)
if err != nil {
launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
}
launchState.Notify(errors.Errorf("container exited with %d", exitCode))
}()
}
......
return err
}
// Start launches chaincode in a runtime environment.
func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error {
cname := ccci.Name + ":" + ccci.Version
lc, err := c.LaunchConfig(cname, ccci.Type)
if err != nil {
return err
}
chaincodeLogger.Debugf("start container: %s", cname)
chaincodeLogger.Debugf("start container with args: %s", strings.Join(lc.Args, " "))
chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(lc.Envs, "\n\t"))
scr := container.StartContainerReq{
Builder: &container.PlatformBuilder{
Type: ccci.Type,
Name: ccci.Name,
Version: ccci.Version,
Path: ccci.Path,
CodePackage: codePackage,
PlatformRegistry: c.PlatformRegistry,
},
Args: lc.Args,
Env: lc.Envs,
FilesToUpload: lc.Files,
CCID: ccintf.CCID{
Name: ccci.Name,
Version: ccci.Version,
},
}
//啟動Process--注意傳入的容器類型
if err := c.Processor.Process(ccci.ContainerType, scr); err != nil {
return errors.WithMessage(err, "error starting container")
}
return nil
}
func (vmc *VMController) Process(vmtype string, req VMCReq) error {
v := vmc.newVM(vmtype)
ccid := req.GetCCID()
id := ccid.GetName()
vmc.lockContainer(id)
defer vmc.unlockContainer(id)
//啟動虛拟機
return req.Do(v)
}
//到這裡容器的執行個體化就進入到了接口的具體确定階段,根據不同的類型來确定是SCC或ACC
func (vmc *VMController) newVM(typ string) VM {
v, ok := vmc.vmProviders[typ]
if !ok {
vmLogger.Panicf("Programming error: unsupported VM type: %s", typ)
}
return v.NewVM()
}
三、系統容器虛拟機的啟動
1、容器的啟動
在上面的Process中最後一行代碼中req.Do(v),啟動了相關的虛拟機容器。看一下這個接口的定義:
type VMCReq interface {
Do(v VM) error
GetCCID() ccintf.CCID
}
//StartContainerReq - properties for starting a container.
type StartContainerReq struct {
ccintf.CCID
Builder Builder
Args []string
Env []string
FilesToUpload map[string][]byte
}
//StopContainerReq - properties for stopping a container.
type StopContainerReq struct {
ccintf.CCID
Timeout uint
//by default we will kill the container after stopping
Dontkill bool
//by default we will remove the container after killing
Dontremove bool
}
func (w WaitContainerReq) Do(v VM) error {
exited := w.Exited
go func() {
exitCode, err := v.Wait(w.CCID)
exited(exitCode, err)
}()
return nil
}
其它mock部分就不列出來了,供測試使用的,有興趣可以看看源碼。再看一下執行個體的具體生成成,沿着上面的NewVM來看:
// NewVM creates an inproc VM instance
func (r *Registry) NewVM() container.VM {
return NewInprocVM(r)
}
// NewInprocVM creates a new InprocVM
func NewInprocVM(r *Registry) *InprocVM {
return &InprocVM{
registry: r,
}
}
這裡隻分析啟動部分,其它和這個基本差不多:
func (si StartContainerReq) Do(v VM) error {
return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}
//Start starts a previously registered system codechain
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
path := ccid.GetName()
ipctemplate := vm.registry.getType(path)
if ipctemplate == nil {
return fmt.Errorf(fmt.Sprintf("%s not registered", path))
}
instName := vm.GetVMName(ccid)
ipc, err := vm.getInstance(ipctemplate, instName, args, env)
if err != nil {
return fmt.Errorf(fmt.Sprintf("could not create instance for %s", instName))
}
if ipc.running {
return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))
}
ipc.running = true
go func() {
defer func() {
if r := recover(); r != nil {
inprocLogger.Criticalf("caught panic from chaincode %s", instName)
}
}()
ipc.launchInProc(instName, args, env)
}()
return nil
}
這裡面會判斷是否生成了鍊碼的程序,否則:
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
if ipc.ChaincodeSupport == nil {
inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
}
//建立一個Send和一個Recv通道
peerRcvCCSend := make(chan *pb.ChaincodeMessage)
ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
var err error
//鍊碼側通道Handler
ccchan := make(chan struct{}, 1)
//Peer側通道Handler
ccsupportchan := make(chan struct{}, 1)
shimStartInProc := _shimStartInProc // shadow to avoid race in test
//鍊碼側相關
go func() {
defer close(ccchan)
inprocLogger.Debugf("chaincode started for %s", id)
if args == nil {
args = ipc.args
}
if env == nil {
env = ipc.env
}
err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
if err != nil {
err = fmt.Errorf("chaincode-support ended with err: %s", err)
_inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
}()
// shadow function to avoid data race
inprocLoggerErrorf := _inprocLoggerErrorf
//Peer側相關
go func() {
defer close(ccsupportchan)
inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
inprocLogger.Debugf("chaincode-support started for %s", id)
//消息處理
err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
if err != nil {
err = fmt.Errorf("chaincode ended with err: %s", err)
inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
}()
select {
case <-ccchan:
close(peerRcvCCSend)
inprocLogger.Debugf("chaincode %s quit", id)
case <-ccsupportchan:
close(ccRcvPeerSend)
inprocLogger.Debugf("chaincode support %s quit", id)
case <-ipc.stopChan:
close(ccRcvPeerSend)
close(peerRcvCCSend)
inprocLogger.Debugf("chaincode %s stopped", id)
}
return err
}
// StartInProc is an entry point for system chaincodes bootstrap. It is not an
// API for chaincodes.
func StartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) error {
chaincodeLogger.Debugf("in proc %v", args)
var chaincodename string
for _, v := range env {
if strings.Index(v, "CORE_CHAINCODE_ID_NAME=") == 0 {
p := strings.SplitAfter(v, "CORE_CHAINCODE_ID_NAME=")
chaincodename = p[1]
break
}
}
if chaincodename == "" {
return errors.New("error chaincode id not provided")
}
stream := newInProcStream(recv, send)
chaincodeLogger.Debugf("starting chat with peer using name=%s", chaincodename)
//看看這是誰,Handler消息處理就在這個函數裡,前面分析過,這裡就不再贅述
err := chatWithPeer(chaincodename, stream, cc)
return err
}
系統鍊碼直接啟動了記憶體的虛拟機。隻有使用者鍊碼才會啟動Docker,在内部運作虛拟機。是以這二者才分在類的生成中從同一接口繼承但分成了兩個類。特别需要注意的是要看看上面對Peer側和鍊碼側的消息處理的生成過程,這個非常重要。代碼裡有直接注釋,感興趣可以把代碼跟到底看看到底是如何生成的。
2、消息的傳遞
看到了chatWithPeer,就想到了handleMessage,這個在前面有詳細分析,如果有什麼不明白可以看看“鍊碼源碼分析”。下面隻列出代碼:
// handleMessage message handles loop for shim side of chaincode/peer stream.
func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error {
if msg.Type == pb.ChaincodeMessage_KEEPALIVE {
chaincodeLogger.Debug("Sending KEEPALIVE response")
handler.serialSendAsync(msg, nil) // ignore errors, maybe next KEEPALIVE will work
return nil
}
chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state)
var err error
switch handler.state {
case ready:
err = handler.handleReady(msg, errc)
case established:
err = handler.handleEstablished(msg, errc)
case created:
err = handler.handleCreated(msg, errc)
default:
err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
}
if err != nil {
payload := []byte(err.Error())
errorMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
handler.serialSend(errorMsg)
return err
}
return nil
}
四、使用者容器虛拟機的啟動
1、虛拟機的啟動
// NewVM creates a new DockerVM instance
func (p *Provider) NewVM() container.VM {
return NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics)
}
// NewDockerVM returns a new DockerVM instance
func NewDockerVM(peerID, networkID string, buildMetrics *BuildMetrics) *DockerVM {
return &DockerVM{
PeerID: peerID,
NetworkID: networkID,
getClientFnc: getDockerClient,
BuildMetrics: buildMetrics,
}
}
// Start starts a container using a previously created docker image
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
imageName, err := vm.GetVMNameForDocker(ccid)
if err != nil {
return err
}
attachStdout := viper.GetBool("vm.docker.attachStdout")
containerName := vm.GetVMName(ccid)
logger := dockerLogger.With("imageName", imageName, "containerName", containerName)
//通過VM獲得用戶端
client, err := vm.getClientFnc()
if err != nil {
logger.Debugf("failed to get docker client", "error", err)
return err
}
//停止容器和虛拟機
vm.stopInternal(client, containerName, 0, false, false)
// 此處建立容器
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
if err == docker.ErrNoSuchImage {
//如果沒有鏡像,則使用builder來建立相關容器
reader, err := builder.Build()
if err != nil {
return errors.Wrapf(err, "failed to generate Dockerfile to build %s", containerName)
}
//部署鏡像
err = vm.deployImage(client, ccid, reader)
if err != nil {
return err
}
//建立鏡像後,再建立容器
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
if err != nil {
logger.Errorf("failed to create container: %s", err)
return err
}
} else if err != nil {
logger.Errorf("create container failed: %s", err)
return err
}
// stream stdout and stderr to chaincode logger
if attachStdout {
containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)
streamOutput(dockerLogger, client, containerName, containerLogger)
}
// upload specified files to the container before starting it
// this can be used for configurations such as TLS key and certs
//處理容器需要的證書相關的檔案
if len(filesToUpload) != 0 {
// the docker upload API takes a tar file, so we need to first
// consolidate the file entries to a tar
payload := bytes.NewBuffer(nil)
gw := gzip.NewWriter(payload)
tw := tar.NewWriter(gw)
for path, fileToUpload := range filesToUpload {
cutil.WriteBytesToPackage(path, fileToUpload, tw)
}
// Write the tar file out
if err := tw.Close(); err != nil {
return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err)
}
gw.Close()
//上傳資料
err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{
InputStream: bytes.NewReader(payload.Bytes()),
Path: "/",
NoOverwriteDirNonDir: false,
})
if err != nil {
return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err)
}
}
// start container with HostConfig was deprecated since v1.10 and removed in v1.2
err = client.StartContainer(containerName, nil)
if err != nil {
dockerLogger.Errorf("start-could not start container: %s", err)
return err
}
dockerLogger.Debugf("Started container %s", containerName)
return nil
}
看一下建立容器的代碼:
func (vm *DockerVM) createContainer(client dockerClient, imageID, containerID string, args, env []string, attachStdout bool) error {
logger := dockerLogger.With("imageID", imageID, "containerID", containerID)
logger.Debugw("create container")
_, err := client.CreateContainer(docker.CreateContainerOptions{
Name: containerID,
Config: &docker.Config{
Cmd: args,
Image: imageID,
Env: env,
AttachStdout: attachStdout,
AttachStderr: attachStdout,
},
HostConfig: getDockerHostConfig(),
})
if err != nil {
return err
}
logger.Debugw("created container")
return nil
}
// See https://goo.gl/tyzwVM for more details.
func (c *Client) CreateContainer(opts CreateContainerOptions) (*Container, error) {
path := "/containers/create?" + queryString(opts)
resp, err := c.do(
"POST",
path,
doOptions{
data: struct {
*Config
HostConfig *HostConfig `json:"HostConfig,omitempty" yaml:"HostConfig,omitempty" toml:"HostConfig,omitempty"`
NetworkingConfig *NetworkingConfig `json:"NetworkingConfig,omitempty" yaml:"NetworkingConfig,omitempty" toml:"NetworkingConfig,omitempty"`
}{
opts.Config,
opts.HostConfig,
opts.NetworkingConfig,
},
context: opts.Context,
},
)
if e, ok := err.(*Error); ok {
if e.Status == http.StatusNotFound {
return nil, ErrNoSuchImage
}
if e.Status == http.StatusConflict {
return nil, ErrContainerAlreadyExists
}
// Workaround for 17.09 bug returning 400 instead of 409.
// See https://github.com/moby/moby/issues/35021
if e.Status == http.StatusBadRequest && strings.Contains(e.Message, "Conflict.") {
return nil, ErrContainerAlreadyExists
}
}
if err != nil {
return nil, err
}
defer resp.Body.Close()
var container Container
if err := json.NewDecoder(resp.Body).Decode(&container); err != nil {
return nil, err
}
container.Name = opts.Name
return &container, nil
}
func (c *Client) startContainer(id string, hostConfig *HostConfig, opts doOptions) error {
path := "/containers/" + id + "/start"
if c.serverAPIVersion == nil {
c.checkAPIVersion()
}
if c.serverAPIVersion != nil && c.serverAPIVersion.LessThan(apiVersion124) {
opts.data = hostConfig
opts.forceJSON = true
}
resp, err := c.do("POST", path, opts)
if err != nil {
if e, ok := err.(*Error); ok && e.Status == http.StatusNotFound {
return &NoSuchContainer{ID: id, Err: err}
}
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotModified {
return &ContainerAlreadyRunning{ID: id}
}
return nil
}
2、消息的傳遞
在chaincode.go的shim包啟動時(使用者鍊碼啟動時)的Start函數中調用了userChaincodeStreamGetter–>chaincodeSupportClient.Register,而chaindcode_support.go中的Register實作了
(protos/peer/chaincode_shim.pb.go)
// ChaincodeSupportServer is the server API for ChaincodeSupport service.
type ChaincodeSupportServer interface {
Register(ChaincodeSupport_RegisterServer) error
}
即Register調用HandleChaincodeStream,再調用ProcessStream,在其中的預設選項中調用handleMessage,又回到了Peer側。
而在前面的鍊碼源碼分析中已經分析過,一個使用者的鍊碼啟動是以在鍊碼的main函數中調用 shim.Start()為開始的,此函數數最終會調用chatWithPeer函數,其中的預設項為調用handleMessage,開始鍊碼鍘的消息循環。這樣二者再按照前面提到的通過GRPC互相發送消息,就可以展開一個使用者鍊碼和Peer側的通信了.
五、總結
通過分析兩類鍊碼容器和執行情況,基本上就明白了鍊碼源碼執行的環境,這正是對以前的“鍊碼源碼分析”的進一步完善。結合這兩篇文章基本上就明白了,鍊碼在Fabric上執行的看哪個流程和方式。掌握了鍊碼執行的原理和運作的過程,就可以針對實際情況對其做為相應的優化和修改,從為我所用到我想我用。
推薦一下阿裡朋友的PerfMa社群的課程,都是幹貨:
