天天看点

fabric源码分析之七链码源码分析

一、容器和虚拟机

在fabric中,有两类链码,一类是系统链码,一类是用户链码。而链码都需要安装和实例化才能使用,在这当中,它们虽然原理相似,但是实现的方式还是有所不同。在系统链码中,首先要Register,然后再Deploy才能使用;而用户链码则首先要Install,然后再instantiate就可以被外部接口使用了。

因此,对容器的启动也可分成这两部分来进行解析,从宏观上把握入口,然后分类进行源码的解析。

二、整体的入口

在前面的分析中可以知道,Launch函数是启动容器的入口。那么就从Launch这个函数开始看(core/chaincode/chaincode_support.go):

// Launch returns the handler registered for chaincodeName:chaincodeVersion,
// launching the chaincode first when no handler is registered yet.
//
// chainID is only used to annotate error messages. qe is passed to
// cs.Lifecycle when looking up the chaincode's container info.
//
// An error is returned when the container info cannot be retrieved, when the
// launcher fails, or when no handler is registered after the launcher
// reported success.
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
	cname := chaincodeName + ":" + chaincodeVersion
	if h := cs.HandlerRegistry.Handler(cname); h != nil {
		return h, nil
	}

	// Fetch the container-related information for the chaincode. According to
	// the surrounding article this includes the container type, which decides
	// whether a system-chaincode (in-process) or a user-chaincode (Docker)
	// container is started.
	ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe)
	if err != nil {
		// TODO: There has to be a better way to do this...
		if cs.UserRunsCC {
			chaincodeLogger.Error(
				"You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?",
			)
		}

		return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname)
	}

	// Delegate the actual start to the configured launcher.
	if err := cs.Launcher.Launch(ccci); err != nil {
		return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname)
	}

	h := cs.HandlerRegistry.Handler(cname)
	if h == nil {
		// err is nil at this point and errors.Wrapf(nil, ...) returns nil, which
		// would hand the caller a nil handler together with a nil error. Build a
		// fresh error instead.
		return nil, errors.Errorf("[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
	}

	return h, nil
}
// (runtime_launcher.go)
//
// Launch starts the runtime for the chaincode described by ccci unless
// r.Registry.Launching reports that a launch for the same name:version was
// already started. Only the first part of the function is shown; the article
// elides the remainder (the "......" line below).
func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
	var startFailCh chan error
	var timeoutCh <-chan time.Time

	startTime := time.Now()
	cname := ccci.Name + ":" + ccci.Version
	launchState, alreadyStarted := r.Registry.Launching(cname)
	if !alreadyStarted {
		// Buffered with capacity 1 so the goroutine below can report a start
		// failure without blocking on a receiver.
		startFailCh = make(chan error, 1)
		timeoutCh = time.NewTimer(r.StartupTimeout).C

		codePackage, err := r.getCodePackage(ccci)
		if err != nil {
			return err
		}

		go func() {
			// Start the runtime; a failure is reported through startFailCh.
			if err := r.Runtime.Start(ccci, codePackage); err != nil {
				startFailCh <- errors.WithMessage(err, "error starting container")
				return
			}
			// Block until the runtime exits, then notify launchState.
			exitCode, err := r.Runtime.Wait(ccci)
			if err != nil {
				launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
			}
			// NOTE(review): there is no return after the Notify above, so on a
			// Wait error Notify is called a second time here — confirm that
			// Notify tolerates repeated calls.
			launchState.Notify(errors.Errorf("container exited with %d", exitCode))
		}()
	}
......
	return err
}
// Start launches the chaincode described by ccci in a runtime environment.
//
// It resolves the launch configuration for name:version, logs the arguments
// and environment at debug level, assembles a container start request from
// ccci, codePackage and the launch configuration, and hands that request to
// c.Processor together with ccci.ContainerType.
func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error {
	containerName := ccci.Name + ":" + ccci.Version

	launchCfg, err := c.LaunchConfig(containerName, ccci.Type)
	if err != nil {
		return err
	}

	chaincodeLogger.Debugf("start container: %s", containerName)
	chaincodeLogger.Debugf("start container with args: %s", strings.Join(launchCfg.Args, " "))
	chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(launchCfg.Envs, "\n\t"))

	platformBuilder := &container.PlatformBuilder{
		Type:             ccci.Type,
		Name:             ccci.Name,
		Version:          ccci.Version,
		Path:             ccci.Path,
		CodePackage:      codePackage,
		PlatformRegistry: c.PlatformRegistry,
	}
	startReq := container.StartContainerReq{
		CCID:          ccintf.CCID{Name: ccci.Name, Version: ccci.Version},
		Builder:       platformBuilder,
		Args:          launchCfg.Args,
		Env:           launchCfg.Envs,
		FilesToUpload: launchCfg.Files,
	}

	// The container type passed here selects which VM implementation
	// processes the request.
	err = c.Processor.Process(ccci.ContainerType, startReq)
	if err != nil {
		return errors.WithMessage(err, "error starting container")
	}
	return nil
}
// Process obtains a VM for vmtype and executes req against it while holding
// the controller's lock for the container named by the request's CCID.
func (vmc *VMController) Process(vmtype string, req VMCReq) error {
	machine := vmc.newVM(vmtype)

	// Keep the CCID in a local variable before asking it for its name.
	requestID := req.GetCCID()
	lockKey := requestID.GetName()

	vmc.lockContainer(lockKey)
	defer vmc.unlockContainer(lockKey)

	// Run the request against the VM (for a start request this starts it).
	return req.Do(machine)
}
// newVM looks up the provider registered for typ and asks it for a new VM.
// This is where the concrete VM implementation is chosen; per the surrounding
// article the choice is between the system-chaincode and the user-chaincode
// variant. An unknown typ is treated as a programming error and reported
// through vmLogger.Panicf.
func (vmc *VMController) newVM(typ string) VM {
	provider, known := vmc.vmProviders[typ]
	if !known {
		vmLogger.Panicf("Programming error: unsupported VM type: %s", typ)
	}
	return provider.NewVM()
}

           

三、系统容器虚拟机的启动

1、容器的启动

在上面的Process中最后一行代码中req.Do(v),启动了相关的虚拟机容器。看一下这个接口的定义:

// VMCReq is a request that can be executed against a VM.
type VMCReq interface {
	// Do performs the request on v.
	Do(v VM) error
	// GetCCID returns the chaincode ID the request refers to.
	GetCCID() ccintf.CCID
}
// StartContainerReq - properties for starting a container.
type StartContainerReq struct {
	// CCID identifies the chaincode to start.
	ccintf.CCID
	// Builder is handed to the VM's Start method.
	Builder Builder
	// Args are the arguments handed to the VM's Start method.
	Args []string
	// Env holds the environment entries handed to the VM's Start method.
	Env []string
	// FilesToUpload is handed to the VM's Start method; presumably keyed by
	// destination path with the file contents as value — confirm against the
	// VM implementation.
	FilesToUpload map[string][]byte
}
// StopContainerReq - properties for stopping a container.
type StopContainerReq struct {
	// CCID identifies the chaincode to stop.
	ccintf.CCID
	// Timeout for the stop operation; the unit is not visible here — TODO
	// confirm against the VM implementation.
	Timeout uint
	// by default we will kill the container after stopping
	Dontkill bool
	// by default we will remove the container after killing
	Dontremove bool
}
// Do waits for the container identified by w.CCID on a background goroutine
// and passes the result of v.Wait to the w.Exited callback. It returns nil
// immediately without waiting for the container to exit.
func (w WaitContainerReq) Do(v VM) error {
	onExit := w.Exited
	go func() {
		onExit(v.Wait(w.CCID))
	}()
	return nil
}
           

其它mock部分就不列出来了,它们是供测试使用的,有兴趣可以看看源码。再看一下实例的具体生成,沿着上面的NewVM来看:

// NewVM returns a new in-process VM backed by this registry.
func (r *Registry) NewVM() container.VM {
	inprocVM := NewInprocVM(r)
	return inprocVM
}
// NewInprocVM returns a new InprocVM whose registry is r.
func NewInprocVM(r *Registry) *InprocVM {
	vm := new(InprocVM)
	vm.registry = r
	return vm
}
           

这里只分析启动部分,其它和这个基本差不多:

// Do starts the container described by the request on v by forwarding the
// request's CCID, arguments, environment, files and builder to v.Start.
func (si StartContainerReq) Do(v VM) error {
	id, builder := si.CCID, si.Builder
	return v.Start(id, si.Args, si.Env, si.FilesToUpload, builder)
}
// Start starts a previously registered system chaincode in-process.
//
// The chaincode is looked up in the VM's registry by ccid's name. An error is
// returned when it is not registered, when an instance cannot be obtained, or
// when the instance is already marked as running. Otherwise the instance is
// marked as running and launched on a new goroutine, and Start returns nil
// without waiting for it.
//
// filesToUpload and builder are not used by this implementation.
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
	path := ccid.GetName()

	ipctemplate := vm.registry.getType(path)
	if ipctemplate == nil {
		// Use a constant format string: passing a pre-formatted message as the
		// format would misinterpret any '%' contained in the name.
		return fmt.Errorf("%s not registered", path)
	}

	instName := vm.GetVMName(ccid)

	ipc, err := vm.getInstance(ipctemplate, instName, args, env)
	if err != nil {
		// NOTE(review): the underlying err is not included in the message;
		// kept as-is so callers matching on the text are unaffected.
		return fmt.Errorf("could not create instance for %s", instName)
	}

	if ipc.running {
		return fmt.Errorf("chaincode running %s", path)
	}

	ipc.running = true

	go func() {
		// A panic inside the chaincode is logged rather than propagated.
		defer func() {
			if r := recover(); r != nil {
				inprocLogger.Criticalf("caught panic from chaincode  %s", instName)
			}
		}()
		ipc.launchInProc(instName, args, env)
	}()

	return nil
}
           

这里面会先判断链码实例是否已经在运行,如果已在运行则返回错误,否则通过launchInProc启动链码:

// launchInProc runs the chaincode side and the peer (chaincode-support) side
// of an in-process chaincode on two goroutines connected by a pair of
// channels, then blocks until either side finishes or ipc.stopChan fires.
//
// id is only used in log messages. When args or env is nil, the values stored
// on ipc are used instead.
//
// NOTE(review): the returned err is the outer variable declared below, which
// is never assigned — both goroutines declare their own err with := — so the
// function always returns nil.
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
	if ipc.ChaincodeSupport == nil {
		inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
	}

	// Create the two message channels, one per direction.
	peerRcvCCSend := make(chan *pb.ChaincodeMessage)
	ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
	var err error
	// Closed when the chaincode-side goroutine returns.
	ccchan := make(chan struct{}, 1)
	// Closed when the peer-side goroutine returns.
	ccsupportchan := make(chan struct{}, 1)
	shimStartInProc := _shimStartInProc // shadow to avoid race in test

	// Chaincode side.
	go func() {
		defer close(ccchan)
		inprocLogger.Debugf("chaincode started for %s", id)
		if args == nil {
			args = ipc.args
		}
		if env == nil {
			env = ipc.env
		}
		// ccRcvPeerSend and peerRcvCCSend are passed in that order; judging by
		// their names the chaincode receives on the first and sends on the
		// second.
		err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
		if err != nil {
			err = fmt.Errorf("chaincode-support ended with err: %s", err)
			_inprocLoggerErrorf("%s", err)
		}
		inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
	}()

	// shadow function to avoid data race
	inprocLoggerErrorf := _inprocLoggerErrorf
	// Peer side.
	go func() {
		defer close(ccsupportchan)
		inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
		inprocLogger.Debugf("chaincode-support started for  %s", id)
		// Message handling for the stream happens inside this call.
		err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
		if err != nil {
			err = fmt.Errorf("chaincode ended with err: %s", err)
			inprocLoggerErrorf("%s", err)
		}
		inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
	}()

	// Wait for whichever happens first and close the channel(s) the finished
	// side was sending on; on an explicit stop both channels are closed.
	select {
	case <-ccchan:
		close(peerRcvCCSend)
		inprocLogger.Debugf("chaincode %s quit", id)
	case <-ccsupportchan:
		close(ccRcvPeerSend)
		inprocLogger.Debugf("chaincode support %s quit", id)
	case <-ipc.stopChan:
		close(ccRcvPeerSend)
		close(peerRcvCCSend)
		inprocLogger.Debugf("chaincode %s stopped", id)
	}
	return err
}
// StartInProc is an entry point for system chaincodes bootstrap. It is not an
// API for chaincodes.
//
// The chaincode name is taken from the first env entry that starts with
// "CORE_CHAINCODE_ID_NAME="; everything after that prefix is the name. An
// error is returned when no such entry exists or the name is empty. recv and
// send are wrapped in an in-process stream over which cc then chats with the
// peer.
func StartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) error {
	chaincodeLogger.Debugf("in proc %v", args)

	const idPrefix = "CORE_CHAINCODE_ID_NAME="

	var chaincodename string
	for _, v := range env {
		if strings.HasPrefix(v, idPrefix) {
			// TrimPrefix keeps the whole remainder, even if the value itself
			// happens to contain the key text again.
			chaincodename = strings.TrimPrefix(v, idPrefix)
			break
		}
	}
	if chaincodename == "" {
		return errors.New("error chaincode id not provided")
	}

	stream := newInProcStream(recv, send)
	chaincodeLogger.Debugf("starting chat with peer using name=%s", chaincodename)
	// chatWithPeer is where the handler's message processing takes place.
	err := chatWithPeer(chaincodename, stream, cc)
	return err
}
           

系统链码直接启动了内存的虚拟机。只有用户链码才会启动Docker,在内部运行虚拟机。所以这二者虽然实现了同一个接口,却分成了两个不同的类。特别需要注意的是要看看上面对Peer侧和链码侧的消息处理的生成过程,这个非常重要。代码里有直接注释,感兴趣可以把代码跟到底看看到底是如何生成的。

2、消息的传递

看到了chatWithPeer,就想到了handleMessage,这个在前面有详细分析,如果有什么不明白可以看看“链码源码分析”。下面只列出代码:

// handleMessage processes one message on the shim side of the chaincode/peer
// stream, dispatching on the handler's current state.
//
// KEEPALIVE messages are echoed back asynchronously and never yield an error.
// For every other message, an error from the state-specific handler (or an
// unexpected state) is sent back via serialSend as an ERROR message carrying
// the error text and the message's Txid, and is also returned to the caller.
func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error {
	if msg.Type == pb.ChaincodeMessage_KEEPALIVE {
		chaincodeLogger.Debug("Sending KEEPALIVE response")
		// Send errors are deliberately ignored; the next KEEPALIVE may work.
		handler.serialSendAsync(msg, nil)
		return nil
	}
	chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state)

	var handleErr error
	switch handler.state {
	case ready:
		handleErr = handler.handleReady(msg, errc)
	case established:
		handleErr = handler.handleEstablished(msg, errc)
	case created:
		handleErr = handler.handleCreated(msg, errc)
	default:
		handleErr = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
	}

	if handleErr == nil {
		return nil
	}

	handler.serialSend(&pb.ChaincodeMessage{
		Type:    pb.ChaincodeMessage_ERROR,
		Payload: []byte(handleErr.Error()),
		Txid:    msg.Txid,
	})
	return handleErr
}
           

四、用户容器虚拟机的启动

1、虚拟机的启动

// NewVM returns a new DockerVM configured with the provider's peer ID,
// network ID and build metrics.
func (p *Provider) NewVM() container.VM {
	dockerVM := NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics)
	return dockerVM
}

// NewDockerVM returns a new DockerVM for the given peer and network IDs that
// records into buildMetrics and obtains its docker client through
// getDockerClient.
func NewDockerVM(peerID, networkID string, buildMetrics *BuildMetrics) *DockerVM {
	vm := &DockerVM{
		PeerID:       peerID,
		NetworkID:    networkID,
		BuildMetrics: buildMetrics,
	}
	vm.getClientFnc = getDockerClient
	return vm
}
// Start starts a container for the chaincode identified by ccid, building the
// docker image first when it does not exist yet.
//
// The steps are: resolve the image and container names, obtain a docker
// client, stop any container with the same name, create the container
// (building and deploying the image through builder when docker reports
// ErrNoSuchImage, then retrying), optionally stream the container output to a
// logger, upload filesToUpload as a gzip-compressed tar archive rooted at "/",
// and finally start the container.
//
// args and env become the container's command and environment.
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
	imageName, err := vm.GetVMNameForDocker(ccid)
	if err != nil {
		return err
	}

	attachStdout := viper.GetBool("vm.docker.attachStdout")
	containerName := vm.GetVMName(ccid)
	logger := dockerLogger.With("imageName", imageName, "containerName", containerName)

	// Obtain the docker client through the VM.
	client, err := vm.getClientFnc()
	if err != nil {
		// Debugw takes key/value pairs; Debugf would treat them as arguments to
		// a format string that has no verbs.
		logger.Debugw("failed to get docker client", "error", err)
		return err
	}

	// Stop any existing container with this name; the result is not checked.
	vm.stopInternal(client, containerName, 0, false, false)

	// Create the container.
	err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
	if err == docker.ErrNoSuchImage {
		// The image does not exist yet: have the builder produce its input.
		reader, err := builder.Build()
		if err != nil {
			return errors.Wrapf(err, "failed to generate Dockerfile to build %s", containerName)
		}

		// Deploy the image.
		err = vm.deployImage(client, ccid, reader)
		if err != nil {
			return err
		}

		// With the image in place, create the container again.
		err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
		if err != nil {
			logger.Errorf("failed to create container: %s", err)
			return err
		}
	} else if err != nil {
		logger.Errorf("create container failed: %s", err)
		return err
	}

	// stream stdout and stderr to chaincode logger
	if attachStdout {
		containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)
		streamOutput(dockerLogger, client, containerName, containerLogger)
	}

	// upload specified files to the container before starting it
	// this can be used for configurations such as TLS key and certs
	if len(filesToUpload) != 0 {
		// the docker upload API takes a tar file, so we need to first
		// consolidate the file entries to a tar
		payload := bytes.NewBuffer(nil)
		gw := gzip.NewWriter(payload)
		tw := tar.NewWriter(gw)

		for path, fileToUpload := range filesToUpload {
			cutil.WriteBytesToPackage(path, fileToUpload, tw)
		}

		// Write the tar file out
		if err := tw.Close(); err != nil {
			return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err)
		}

		// Closing the gzip writer flushes the remaining compressed data; a
		// failure here would leave a truncated archive in payload.
		if err := gw.Close(); err != nil {
			return fmt.Errorf("Error compressing files to upload to Docker instance into a temporary tar blob: %s", err)
		}

		// Upload the archive into the container.
		err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{
			InputStream:          bytes.NewReader(payload.Bytes()),
			Path:                 "/",
			NoOverwriteDirNonDir: false,
		})
		if err != nil {
			return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err)
		}
	}

	// start container with HostConfig was deprecated since v1.10 and removed in v1.2
	err = client.StartContainer(containerName, nil)
	if err != nil {
		dockerLogger.Errorf("start-could not start container: %s", err)
		return err
	}

	dockerLogger.Debugf("Started container %s", containerName)
	return nil
}
           

看一下创建容器的代码:

// createContainer asks the docker client to create a container named
// containerID from image imageID, using args as the command and env as the
// environment. attachStdout controls whether both stdout and stderr are
// attached. The host configuration comes from getDockerHostConfig.
func (vm *DockerVM) createContainer(client dockerClient, imageID, containerID string, args, env []string, attachStdout bool) error {
	ctxLogger := dockerLogger.With("imageID", imageID, "containerID", containerID)
	ctxLogger.Debugw("create container")

	createOpts := docker.CreateContainerOptions{
		Name: containerID,
		Config: &docker.Config{
			Cmd:          args,
			Image:        imageID,
			Env:          env,
			AttachStdout: attachStdout,
			AttachStderr: attachStdout,
		},
		HostConfig: getDockerHostConfig(),
	}
	if _, err := client.CreateContainer(createOpts); err != nil {
		return err
	}

	ctxLogger.Debugw("created container")
	return nil
}
// CreateContainer posts a container-create request built from opts and
// returns the decoded container with its Name set to opts.Name.
//
// API errors are translated as follows: 404 becomes ErrNoSuchImage; 409, or
// 400 with a message containing "Conflict.", becomes
// ErrContainerAlreadyExists. Any other error is returned unchanged.
//
// See https://goo.gl/tyzwVM for more details.
func (c *Client) CreateContainer(opts CreateContainerOptions) (*Container, error) {
	endpoint := "/containers/create?" + queryString(opts)
	requestBody := struct {
		*Config
		HostConfig       *HostConfig       `json:"HostConfig,omitempty" yaml:"HostConfig,omitempty" toml:"HostConfig,omitempty"`
		NetworkingConfig *NetworkingConfig `json:"NetworkingConfig,omitempty" yaml:"NetworkingConfig,omitempty" toml:"NetworkingConfig,omitempty"`
	}{
		opts.Config,
		opts.HostConfig,
		opts.NetworkingConfig,
	}

	resp, err := c.do("POST", endpoint, doOptions{data: requestBody, context: opts.Context})
	if err != nil {
		if apiErr, ok := err.(*Error); ok {
			switch {
			case apiErr.Status == http.StatusNotFound:
				return nil, ErrNoSuchImage
			case apiErr.Status == http.StatusConflict:
				return nil, ErrContainerAlreadyExists
			case apiErr.Status == http.StatusBadRequest && strings.Contains(apiErr.Message, "Conflict."):
				// Workaround for 17.09 bug returning 400 instead of 409.
				// See https://github.com/moby/moby/issues/35021
				return nil, ErrContainerAlreadyExists
			}
		}
		return nil, err
	}
	defer resp.Body.Close()

	created := new(Container)
	if err := json.NewDecoder(resp.Body).Decode(created); err != nil {
		return nil, err
	}
	created.Name = opts.Name

	return created, nil
}
// startContainer posts a start request for container id.
//
// hostConfig is sent as the JSON request body only when the server API
// version is known and lower than apiVersion124. A 404 API error is returned
// as *NoSuchContainer, and a 304 response as *ContainerAlreadyRunning.
func (c *Client) startContainer(id string, hostConfig *HostConfig, opts doOptions) error {
	endpoint := "/containers/" + id + "/start"

	// Make sure the server API version has been checked before testing it.
	if c.serverAPIVersion == nil {
		c.checkAPIVersion()
	}
	if version := c.serverAPIVersion; version != nil && version.LessThan(apiVersion124) {
		opts.data = hostConfig
		opts.forceJSON = true
	}

	resp, err := c.do("POST", endpoint, opts)
	if err != nil {
		apiErr, isAPIErr := err.(*Error)
		if isAPIErr && apiErr.Status == http.StatusNotFound {
			return &NoSuchContainer{ID: id, Err: err}
		}
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusNotModified {
		return nil
	}
	return &ContainerAlreadyRunning{ID: id}
}
           

2、消息的传递

在chaincode.go的shim包启动时(用户链码启动时)的Start函数中调用了userChaincodeStreamGetter–>chaincodeSupportClient.Register,而chaincode_support.go中的Register实现了

(protos/peer/chaincode_shim.pb.go)

// ChaincodeSupportServer is the server API for ChaincodeSupport service.
type ChaincodeSupportServer interface {
	// Register handles a Register call from a chaincode; the argument is the
	// server side of that call's stream.
	Register(ChaincodeSupport_RegisterServer) error
}
           

即Register调用HandleChaincodeStream,再调用ProcessStream,在其中的默认选项中调用handleMessage,又回到了Peer侧。

而在前面的链码源码分析中已经分析过,一个用户的链码启动是以在链码的main函数中调用 shim.Start()为开始的,此函数最终会调用chatWithPeer函数,其中的默认项为调用handleMessage,开始链码侧的消息循环。这样二者再按照前面提到的通过GRPC互相发送消息,就可以展开一个用户链码和Peer侧的通信了。

五、总结

通过分析两类链码容器和执行情况,基本上就明白了链码源码执行的环境,这正是对以前的“链码源码分析”的进一步完善。结合这两篇文章基本上就明白了链码在Fabric上执行的流程和方式。掌握了链码执行的原理和运行的过程,就可以针对实际情况对其做相应的优化和修改,从为我所用到我想我用。

推荐一下阿里朋友的PerfMa社区的课程,都是干货:

fabric源码分析之七链码源码分析

继续阅读