圖檔不能顯示時,請檢視原文:https://lessisbetter.site/2019/10/29/fabric-transaction-endorser-source/
文章目錄
-
- 流程
-
- 交易宏觀流程
- 鍊碼調用流程
- 提案背書流程
- 源碼分析
-
- Proposal定義
-
- Proposal
- Header
- gRPC定義
- SDK發送Proposal
- Peer接收Proposal
- Peer處理Proposal主流程
- preProcess 檢查和擷取資訊
- 背書節點模拟執行交易
-
- 擷取模拟器
- 模拟執行
-
- endorser部分
- chaincode部分
- 擷取鍊碼執行環境
- 模拟執行交易
- 處理鍊碼容器模拟響應
- 釋放模拟器資源
- ESCC處理模拟執行結果
-
-
- Endorser.endorseProposal
-
- 發送Response
- 總結
流程
在 快速入門Fabric核心概念和架構 這篇文章中,介紹了 Fabric 的很多概念,其中也包含了交易、提案(Proposal)和鍊碼。同時也介紹了,交易的執行流程,鍊碼的調用流程等。
本文聚焦介紹交易流程的一個環節:交易背書,以下的3幅圖,在快速入門Fabric核心概念和架構 中都有介紹,有必要的話,去讀一下上下文資訊。
交易宏觀流程

交易的詳細流程請閱讀 交易流程,了解交易流程的幾大環節。
鍊碼調用流程
上圖,展示了用戶端、Peer,以及鍊碼容器 3大主體在交易流程中的背書過程,請關注一下Peer中的 Handler,它負責和鍊碼容器互動。
提案背書流程
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-JCwq1ibP-1584264381251)(http://img.lessisbetter.site/2019-07-chaincode_swimlane.png)]
上圖,從接近源碼的層面,展示了交易背書過程。其中Fabric、Shim 是 Peer 中的子產品,ChainCode 代表鍊碼容器,Endorser Chaincode 代表 Peer 對交易提案和模拟執行結果進行背書。
如果了解過Chaincode,你會知道 Shim 是鍊碼容器和 Peer 互動所依賴的子產品。
最後推薦一份保華大佬整理的 Peer 提案背書過程,是讀源碼前,必讀的資料。雖然精簡,但把重要的核心流程都串聯起來了。
源碼分析
Proposal定義
用戶端發送被背書節點的是
SignedProposal
,它包含了簽名和Proposal,這是它在
proposal.proto
中的組成簡介,:
SignedProposal
|\_ Signature (signature on the Proposal message by the creator specified in the header)
\_ Proposal
|\_ Header (the header for this proposal)
\_ Payload (the payload for this proposal)
proposal.proto
這個檔案還簡要介紹了Client和背書節點之間通信的消息類型和過程。
Proposal
type SignedProposal struct {
// The bytes of Proposal
ProposalBytes []byte `protobuf:"bytes,1,opt,name=proposal_bytes,json=proposalBytes,proto3" json:"proposal_bytes,omitempty"`
// Signaure over proposalBytes; this signature is to be verified against
// the creator identity contained in the header of the Proposal message
// marshaled as proposalBytes
Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
}
type Proposal struct {
// The header of the proposal. It is the bytes of the Header
Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
// The payload of the proposal as defined by the type in the proposal
// header.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Optional extensions to the proposal. Its content depends on the Header's
// type field. For the type CHAINCODE, it might be the bytes of a
// ChaincodeAction message.
Extension []byte `protobuf:"bytes,3,opt,name=extension,proto3" json:"extension,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
Header
type Header struct {
ChannelHeader []byte `protobuf:"bytes,1,opt,name=channel_header,json=channelHeader,proto3" json:"channel_header,omitempty"`
SignatureHeader []byte `protobuf:"bytes,2,opt,name=signature_header,json=signatureHeader,proto3" json:"signature_header,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// Header is a generic replay prevention and identity message to include in a signed payload
type ChannelHeader struct {
Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
// Version indicates message protocol version
Version int32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
// Timestamp is the local time when the message was created
// by the sender
Timestamp *timestamp.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// Identifier of the channel this message is bound for
ChannelId string `protobuf:"bytes,4,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"`
// An unique identifier that is used end-to-end.
// - set by higher layers such as end user or SDK
// - passed to the endorser (which will check for uniqueness)
// - as the header is passed along unchanged, it will be
// be retrieved by the committer (uniqueness check here as well)
// - to be stored in the ledger
TxId string `protobuf:"bytes,5,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"`
// The epoch in which this header was generated, where epoch is defined based on block height
// Epoch in which the response has been generated. This field identifies a
// logical window of time. A proposal response is accepted by a peer only if
// two conditions hold:
// 1. the epoch specified in the message is the current epoch
// 2. this message has been only seen once during this epoch (i.e. it hasn't
// been replayed)
Epoch uint64 `protobuf:"varint,6,opt,name=epoch,proto3" json:"epoch,omitempty"`
// Extension that may be attached based on the header type
Extension []byte `protobuf:"bytes,7,opt,name=extension,proto3" json:"extension,omitempty"`
// If mutual TLS is employed, this represents
// the hash of the client's TLS certificate
TlsCertHash []byte `protobuf:"bytes,8,opt,name=tls_cert_hash,json=tlsCertHash,proto3" json:"tls_cert_hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type SignatureHeader struct {
// Creator of the message, a marshaled msp.SerializedIdentity
Creator []byte `protobuf:"bytes,1,opt,name=creator,proto3" json:"creator,omitempty"`
// Arbitrary number that may only be used once. Can be used to detect replay attacks.
Nonce []byte `protobuf:"bytes,2,opt,name=nonce,proto3" json:"nonce,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
gRPC定義
// EndorserClient is the client API for Endorser service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type EndorserClient interface {
ProcessProposal(ctx context.Context, in *SignedProposal, opts ...grpc.CallOption) (*ProposalResponse, error)
}
// EndorserServer is the server API for Endorser service.
type EndorserServer interface {
ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error)
}
SDK發送Proposal
Peer接收Proposal
Peer處理Proposal主流程
主要是把背書節點的背書工作聚合一下:
- Proposal預處理
- 擷取交易執行模拟器,模拟執行Proposal
- 如果模拟執行成功,調用ESCC對Proposal和結果進行背書,如果模拟執行失敗直接傳回背書失敗的響應
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
...
// 0 -- check and validate
// 這裡有相當多的工作量
vr, err := e.preProcess(signedProp)
if err != nil {
resp := vr.resp
return resp, err
}
prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid
// 擷取指定賬本模拟器
// obtaining once the tx simulator for this proposal. This will be nil
// for chainless proposals
// Also obtain a history query executor for history queries, since tx simulator does not cover history
var txsim ledger.TxSimulator
var historyQueryExecutor ledger.HistoryQueryExecutor
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
...
defer txsim.Done()
if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
}
txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim,
HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC
// 模拟執行交易,失敗則傳回背書失敗的響應
// 1 -- simulate
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
if res != nil {
if res.Status >= shim.ERROR {
endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid)
var cceventBytes []byte
if ccevent != nil {
cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event bytes")
}
}
pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
return pResp, nil
}
}
// 對模拟執行的結果進行簽名背書
// 2 -- endorse and get a marshalled ProposalResponse message
var pResp *pb.ProposalResponse
// TODO till we implement global ESCC, CSCC for system chaincodes
// chainless proposals (such as CSCC) don't have to be endorsed
if chainID == "" {
pResp = &pb.ProposalResponse{Response: res}
} else {
// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim
pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
...
}
// Set the proposal response payload - it
// contains the "return value" from the
// chaincode invocation
pResp.Response = res
// total failed proposals = ProposalsReceived-SuccessfulProposals
e.Metrics.SuccessfulProposals.Add(1)
success = true
return pResp, nil
}
preProcess 檢查和擷取資訊
// preProcess checks the tx proposal headers, uniqueness and ACL
// 檢查proposal、ACL
func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, error) {
// 包含proposal、header、chainID、txid等資訊
vr := &validateResult{}
// at first, we check whether the message is valid
// 檢查proposal,并擷取各種需要的資訊
prop, hdr, hdrExt, err := validation.ValidateProposalMessage(signedProp)
if err != nil {
e.Metrics.ProposalValidationFailed.Add(1)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
// 擷取Header中的2個Header
chdr, err := putils.UnmarshalChannelHeader(hdr.ChannelHeader)
if err != nil {
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
shdr, err := putils.GetSignatureHeader(hdr.SignatureHeader)
if err != nil {
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
// 檢查是否調用了不可外部(使用者)的系統鍊碼
// 先找到鍊碼執行個體,然後調用鍊碼的方法判斷本身是否可調用
// block invocations to security-sensitive system chaincodes
if e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) {
endorserLogger.Errorf("Error: an attempt was made by %#v to invoke system chaincode %s", shdr.Creator, hdrExt.ChaincodeId.Name)
err = errors.Errorf("chaincode %s cannot be invoked through a proposal", hdrExt.ChaincodeId.Name)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
chainID := chdr.ChannelId
txid := chdr.TxId
endorserLogger.Debugf("[%s][%s] processing txid: %s", chainID, shorttxid(txid), txid)
if chainID != "" {
// labels that provide context for failure metrics
meterLabels := []string{
"channel", chainID,
"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,
}
// 檢查交易是否已上鍊
// Here we handle uniqueness check and ACLs for proposals targeting a chain
// Notice that ValidateProposalMessage has already verified that TxID is computed properly
if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
// increment failure due to duplicate transactions. Useful for catching replay attacks in
// addition to benign retries
e.Metrics.DuplicateTxsFailure.With(meterLabels...).Add(1)
err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
// 使用者鍊碼檢查ACL
// check ACL only for application chaincodes; ACLs
// for system chaincodes are checked elsewhere
if !e.s.IsSysCC(hdrExt.ChaincodeId.Name) {
// check that the proposal complies with the Channel's writers
if err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil {
e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
}
} else {
// chainless proposals do not/cannot affect ledger and cannot be submitted as transactions
// ignore uniqueness checks; also, chainless proposals are not validated using the policies
// of the chain since by definition there is no chain; they are validated against the local
// MSP of the peer instead by the call to ValidateProposalMessage above
}
// 儲存提取的各資訊
vr.prop, vr.hdrExt, vr.chainID, vr.txid = prop, hdrExt, chainID, txid
return vr, nil
}
// ValidateProposalMessage checks the validity of a SignedProposal message
// this function returns Header and ChaincodeHeaderExtension messages since they
// have been unmarshalled and validated
func ValidateProposalMessage(signedProp *pb.SignedProposal) (*pb.Proposal, *common.Header, *pb.ChaincodeHeaderExtension, error) {
if signedProp == nil {
return nil, nil, nil, errors.New("nil arguments")
}
putilsLogger.Debugf("ValidateProposalMessage starts for signed proposal %p", signedProp)
// extract the Proposal message from signedProp
prop, err := utils.GetProposal(signedProp.ProposalBytes)
if err != nil {
return nil, nil, nil, err
}
// 1) look at the ProposalHeader
hdr, err := utils.GetHeader(prop.Header)
if err != nil {
return nil, nil, nil, err
}
// validate the header
chdr, shdr, err := validateCommonHeader(hdr)
if err != nil {
return nil, nil, nil, err
}
// 從SignatureHeader交易用戶端的簽名
// validate the signature
err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
if err != nil {
// log the exact message on the peer but return a generic error message to
// avoid malicious users scanning for channels
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(shdr.Creator, sId)
if err != nil {
// log the error here as well but still only return the generic error
err = errors.Wrap(err, "could not deserialize a SerializedIdentity")
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
}
return nil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid)
}
// 檢查txid的計算是否符合規則
// Verify that the transaction ID has been computed properly.
// This check is needed to ensure that the lookup into the ledger
// for the same TxID catches duplicates.
err = utils.CheckTxID(
chdr.TxId,
shdr.Nonce,
shdr.Creator)
if err != nil {
return nil, nil, nil, err
}
// 依據不同的proposal類型對proposal分别進行檢查
// continue the validation in a way that depends on the type specified in the header
switch common.HeaderType(chdr.Type) {
case common.HeaderType_CONFIG:
//which the types are different the validation is the same
//viz, validate a proposal to a chaincode. If we need other
//special validation for confguration, we would have to implement
//special validation
fallthrough
case common.HeaderType_ENDORSER_TRANSACTION:
// 主要是提取ChaincodeHeaderExtension
// validation of the proposal message knowing it's of type CHAINCODE
chaincodeHdrExt, err := validateChaincodeProposalMessage(prop, hdr)
if err != nil {
return nil, nil, nil, err
}
return prop, hdr, chaincodeHdrExt, err
default:
//NOTE : we proably need a case
return nil, nil, nil, errors.Errorf("unsupported proposal type %d", common.HeaderType(chdr.Type))
}
}
背書節點模拟執行交易
擷取模拟器
// Support contains functions that the endorser requires to execute its tasks
type Support interface {
crypto.SignerSupport
// IsSysCCAndNotInvokableExternal returns true if the supplied chaincode is
// ia system chaincode and it NOT invokable
IsSysCCAndNotInvokableExternal(name string) bool
// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error)
}
// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
func (s *SupportImpl) GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error) {
// 使用賬本和txid建立模拟器,每個交易有單獨的模拟器
lgr := s.Peer.GetLedger(ledgername)
if lgr == nil {
return nil, errors.Errorf("Channel does not exist: %s", ledgername)
}
return lgr.NewTxSimulator(txid)
}
// NewTxSimulator returns new `ledger.TxSimulator`
func (l *kvLedger) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
return l.txtmgmt.NewTxSimulator(txid)
}
// NewTxSimulator implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
logger.Debugf("constructing new tx simulator")
s, err := newLockBasedTxSimulator(txmgr, txid)
if err != nil {
return nil, err
}
txmgr.commitRWLock.RLock()
return s, nil
}
// 就2項重要的:查詢執行器、讀寫集建構器
// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr`
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
rwsetBuilder *rwsetutil.RWSetBuilder
writePerformed bool
pvtdataQueriesPerformed bool
simulationResultsComputed bool
paginatedQueriesPerformed bool
}
func newLockBasedTxSimulator(txmgr *LockBasedTxMgr, txid string) (*lockBasedTxSimulator, error) {
// 建立讀寫集建構器,能幫助建構讀寫集
rwsetBuilder := rwsetutil.NewRWSetBuilder()
helper := newQueryHelper(txmgr, rwsetBuilder)
logger.Debugf("constructing new tx simulator txid = [%s]", txid)
return &lockBasedTxSimulator{lockBasedQueryExecutor{helper, txid}, rwsetBuilder, false, false, false, false}, nil
}
// LockBasedQueryExecutor is a query executor used in `LockBasedTxMgr`
// "隻讀",不包含寫相關的操作
type lockBasedQueryExecutor struct {
helper *queryHelper
txid string
}
模拟執行
endorser部分
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
defer txsim.Done()
// 曆史查詢器
if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
}
txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim, // 模拟器在此
HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC
// TODO: if the proposal has an extension, it will be of type ChaincodeAction;
// if it's present it means that no simulation is to be performed because
// we're trying to emulate a submitting peer. On the other hand, we need
// to validate the supplied action before endorsing it
// 模拟執行交易,失敗則傳回背書失敗的響應
// 1 -- simulate
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
調用chaincode子產品模拟執行交易,擷取交易執行的公開和私密資料讀寫集,以及交易執行産生的事件,并把結果傳回給上層進行背書。
其中還包含了私密資料的處理,會把它取出來,然後通過Gossip傳播給在私密資料中的Peer節點。
// SimulateProposal simulates the proposal by calling the chaincode
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) {
endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", txParams.ChannelID, shorttxid(txParams.TxID), cid)
defer endorserLogger.Debugf("[%s][%s] Exit", txParams.ChannelID, shorttxid(txParams.TxID))
// we do expect the payload to be a ChaincodeInvocationSpec
// if we are supporting other payloads in future, this be glaringly point
// as something that should change
// 根據Proposal生成Invoke需要的資訊
cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal)
if err != nil {
return nil, nil, nil, nil, err
}
// 鍊碼的中繼資料
var cdLedger ccprovider.ChaincodeDefinition
var version string
// 設定version
if !e.s.IsSysCC(cid.Name) {
// 根據要調用的鍊碼名稱,從lscc擷取鍊碼的中繼資料
cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator)
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name))
}
version = cdLedger.CCVersion()
// 實際被打樁了,無實作
err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger)
if err != nil {
return nil, nil, nil, nil, err
}
} else {
// scc版本是固定的"latest"
version = util.GetSysCCVersion()
}
// ---3. execute the proposal and get simulation results
var simResult *ledger.TxSimulationResults
var pubSimResBytes []byte
var res *pb.Response
var ccevent *pb.ChaincodeEvent
// 模拟執行,執行結果儲存在模拟器
res, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid)
if err != nil {
endorserLogger.Errorf("[%s][%s] failed to invoke chaincode %s, error: %+v", txParams.ChannelID, shorttxid(txParams.TxID), cid, err)
return nil, nil, nil, nil, err
}
if txParams.TXSimulator != nil {
// 通過模拟器擷取模拟執行結果,包含公開和私密資料2份讀寫集
if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil {
txParams.TXSimulator.Done()
return nil, nil, nil, nil, err
}
// 存在私密資料
if simResult.PvtSimulationResults != nil {
if cid.Name == "lscc" {
// TODO: remove once we can store collection configuration outside of LSCC
txParams.TXSimulator.Done()
return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")
}
// 擷取要通過Gossip傳播的私密資料
pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txParams.TXSimulator)
// To read collection config need to read collection updates before
// releasing the lock, hence txParams.TXSimulator.Done() moved down here
txParams.TXSimulator.Done()
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config")
}
endorsedAt, err := e.s.GetLedgerHeight(txParams.ChannelID)
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprint("failed to obtain ledger height for channel", txParams.ChannelID))
}
// Add ledger height at which transaction was endorsed,
// `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'.
// However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and
// manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now.
// Ideally, ledger should add support in the simulator as a first class function `GetHeight()`.
pvtDataWithConfig.EndorsedAt = endorsedAt
// 把私密資料同通道id、交易id和區塊高度發出去,代表私密資料所屬的區塊和交易
if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
return nil, nil, nil, nil, err
}
}
// 交易模拟完成,釋放模拟器占用的資源
txParams.TXSimulator.Done()
// 擷取模拟執行的公開結果
if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
return nil, nil, nil, nil, err
}
}
// 傳回鍊碼中繼資料、模拟執行結果、交易執行産生的事件
return cdLedger, res, pubSimResBytes, ccevent, nil
}
callChaincode
調用chaincode子產品執行鍊碼。在前面的流程中,還沒有區分系統鍊碼SCC和使用者鍊碼UCC,SCC和UCC都會通過
Execute
函數被傳遞給chaincode子產品而執行。
如果是調用
lscc
部署或更新UCC,會調用
ExecuteLegacyInit
執行鍊碼容器的初始化。
最後傳回鍊碼模拟執行結果和事件。
// call specified chaincode (system or user)
func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, version string, input *pb.ChaincodeInput, cid *pb.ChaincodeID) (*pb.Response, *pb.ChaincodeEvent, error) {
...
// scc也在這執行
// is this a system chaincode
res, ccevent, err = e.s.Execute(txParams, txParams.ChannelID, cid.Name, version, txParams.TxID, txParams.SignedProp, txParams.Proposal, input)
if err != nil {
return nil, nil, err
}
...
// 如果是調用lscc部署或更新鍊碼,會走這段流程
if cid.Name == "lscc" && len(input.Args) >= 3 && (string(input.Args[0]) == "deploy" || string(input.Args[0]) == "upgrade") {
userCDS, err := putils.GetChaincodeDeploymentSpec(input.Args[2], e.PlatformRegistry)
...
// 進行鍊碼容器初始化,最後會調用鍊碼的Init的函數
_, _, err = e.s.ExecuteLegacyInit(txParams, txParams.ChannelID, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, txParams.TxID, txParams.SignedProp, txParams.Proposal, cds)
...
}
// ----- END -------
return res, ccevent, err
}
Support
接口實際集合了衆多背書節點需要的外部子產品功能,比如鍊碼、系統鍊碼、ACL等。
// Support contains functions that the endorser requires to execute its tasks
type Support interface
// SupportImpl provides an implementation of the endorser.Support interface
// issuing calls to various static methods of the peer
type SupportImpl struct {
*PluginEndorser
crypto.SignerSupport
Peer peer.Operations
PeerSupport peer.Support
ChaincodeSupport *chaincode.ChaincodeSupport
SysCCProvider *scc.Provider
ACLProvider aclmgmt.ACLProvider
}
Execute
就是調用
ChaincodeSupport.Execute
實作的。
// Execute a proposal and return the chaincode response
func (s *SupportImpl) Execute(txParams *ccprovider.TransactionParams, cid, name, version, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
cccid := &ccprovider.CCContext{
Name: name,
Version: version,
}
// decorate the chaincode input
decorators := library.InitRegistry(library.Config{}).Lookup(library.Decoration).([]decoration.Decorator)
input.Decorations = make(map[string][]byte)
input = decoration.Apply(prop, input, decorators...)
txParams.ProposalDecorations = input.Decorations
return s.ChaincodeSupport.Execute(txParams, cccid, input)
}
chaincode部分
通過上面的接口,跨入chaincode子產品的大門。
func (cs *ChaincodeSupport) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
// Invoke得到ChaincodeMessage
resp, err := cs.Invoke(txParams, cccid, input)
// 根據ChaincodeMessage得到Response和事件
return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}
func (cs *ChaincodeSupport) Invoke(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
h, err := cs.Launch(txParams.ChannelID, cccid.Name, cccid.Version, txParams.TXSimulator)
if err != nil {
return nil, err
}
// 執行調用鍊碼的交易(和鍊碼之間的消息為ChaincodeMessage_TRANSACTION)
cctype := pb.ChaincodeMessage_TRANSACTION
return cs.execute(cctype, txParams, cccid, input, h)
}
擷取鍊碼執行環境
Launch
可以擷取鍊碼執行環境,即使用者鍊碼容器,如果已執行個體化的鍊碼,在目前背書節點上,鍊碼容器未啟動,則啟動鍊碼容器,
Launch
會傳回一個跟鍊碼容器互動Handler。
某個 Peer 上可以部署多個鍊碼容器,Peer 為了和這些鍊碼容器互動/通信,給每個鍊碼容器都建立了一個 Handler,Handler 攜帶了 Peer 和鍊碼容器互動的資源。
// Launch starts executing chaincode if it is not already running. This method
// blocks until the peer side handler gets into ready state or encounters a fatal
// error. If the chaincode is already running, it simply returns.
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
cname := chaincodeName + ":" + chaincodeVersion
if h := cs.HandlerRegistry.Handler(cname); h != nil {
return h, nil
}
// 啟動鍊碼容器 ...
h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
}
return h, nil
}
鍊碼容器的啟動過程,不是本文的重點,是以不繼續深入Launch的調用細節。
模拟執行交易
execute
封裝出執行交易的消息,然後使用 Handler 執行交易。
// execute executes a transaction and waits for it to complete until a timeout value.
func (cs *ChaincodeSupport) execute(cctyp pb.ChaincodeMessage_Type, txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput, h *Handler) (*pb.ChaincodeMessage, error) {
input.Decorations = txParams.ProposalDecorations
// 建立消息
ccMsg, err := createCCMessage(cctyp, txParams.ChannelID, txParams.TxID, input)
if err != nil {
return nil, errors.WithMessage(err, "failed to create chaincode message")
}
// 執行交易
ccresp, err := h.Execute(txParams, cccid, ccMsg, cs.ExecuteTimeout)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error sending"))
}
return ccresp, nil
}
注意以下這個參數
txParams *ccprovider.TransactionParams
其類型定義如下,它包含了一條交易執行過程中的資訊和資源,是以交易傳遞的過程中,一直有這個參數。
// TransactionParams are parameters which are tied to a particular transaction
// and which are required for invoking chaincode.
type TransactionParams struct {
TxID string
ChannelID string
SignedProp *pb.SignedProposal
Proposal *pb.Proposal
TXSimulator ledger.TxSimulator
HistoryQueryExecutor ledger.HistoryQueryExecutor
CollectionStore privdata.CollectionStore
IsInitTransaction bool
// this is additional data passed to the chaincode
ProposalDecorations map[string][]byte
}
Handler 執行交易的過程如下,建立交易執行的上下文 Context,因為鍊碼容器在執行交易的時候,會和 Peer 之間進行多次通信,進行資料的讀寫,上下文可以讓資料讀寫擷取到正确的資訊。
之後 Handler 把消息發送給鍊碼容器,并等待鍊碼容器發來包含執行結果的消息,或者執行逾時,預設執行時間是 30s。
func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
chaincodeLogger.Debugf("Entry")
defer chaincodeLogger.Debugf("Exit")
// 私密資料
txParams.CollectionStore = h.getCollectionStore(msg.ChannelId)
// 是否是執行鍊碼初始化
txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT)
// 建立交易context
txctx, err := h.TXContexts.Create(txParams)
if err != nil {
return nil, err
}
// 退出時(執行交易完畢),釋放交易上下文資源
defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)
// proposal儲存到msg
if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
return nil, err
}
// 向鍊碼容器發送msg
h.serialSendAsync(msg)
// 等待鍊碼容器響應,或者逾時
var ccresp *pb.ChaincodeMessage
select {
case ccresp = <-txctx.ResponseNotifier:
// response is sent to user or calling chaincode. ChaincodeMessage_ERROR
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
ccName := cccid.Name + ":" + cccid.Version
h.Metrics.ExecuteTimeouts.With(
"chaincode", ccName,
).Add(1)
}
return ccresp, err
}
處理鍊碼容器模拟響應
鍊碼容器執行的響應會向上傳遞,直到
ChaincodeSupport.Execute
,它調用
processChaincodeExecutionResult
把鍊碼容器傳回的響應,轉化為交易模拟執行的 Response,而 Response 最終會傳回給Endorser,大家可去調用流程上翻。
func processChaincodeExecutionResult(txid, ccName string, resp *pb.ChaincodeMessage, err error) (*pb.Response, *pb.ChaincodeEvent, error) {
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to execute transaction %s", txid)
}
if resp == nil {
return nil, nil, errors.Errorf("nil response from transaction %s", txid)
}
if resp.ChaincodeEvent != nil {
resp.ChaincodeEvent.ChaincodeId = ccName
resp.ChaincodeEvent.TxId = txid
}
switch resp.Type {
// 交易執行成功則提取Payload中儲存的Response
case pb.ChaincodeMessage_COMPLETED:
res := &pb.Response{}
err := proto.Unmarshal(resp.Payload, res)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to unmarshal response for transaction %s", txid)
}
return res, resp.ChaincodeEvent, nil
// 失敗,則提取Payload中儲存的錯誤資訊
case pb.ChaincodeMessage_ERROR:
return nil, resp.ChaincodeEvent, errors.Errorf("transaction returned with failure: %s", resp.Payload)
default:
return nil, nil, errors.Errorf("unexpected response type %d for transaction %s", resp.Type, txid)
}
}
釋放模拟器資源
回想一下,在
Endorser.SimulateProposal
中,它擷取了 交易模拟執行器
TXSimulator
,這裡面可是有很多資源的,如果不及時釋放,在高 TPS 下,Peer壓力上大,資源洩露,性能低下等問題會爆發出來。
txParams.TXSimulator.Done()
用來釋放資源,上文提到
TxSimulator
包含了
QueryExecutor
,
lockBasedQueryExecutor
實作了
QueryExecutor
,也就是說,主要是釋放查詢操作相關的資源。
從源碼可以看到會釋放讀寫鎖以及疊代器資源,如果不及時釋放,後果果然不堪。
// Done implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) Done() {
logger.Debugf("Done with transaction simulation / query execution [%s]", q.txid)
q.helper.done()
}
func (h *queryHelper) done() {
if h.doneInvoked {
return
}
defer func() {
// 釋放鎖
h.txmgr.commitRWLock.RUnlock()
h.doneInvoked = true
// 釋放疊代器
for _, itr := range h.itrs {
itr.Close()
}
}()
}
ESCC處理模拟執行結果
上文提到,模拟執行的 Response 會最終回到 Endorser,Endorser 會調用 ESCC 對結果進行背書,最終生成
ProposalResponse
,我們看一下這個過程。
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
// Pre-process, simulate
if chainID == "" {
pResp = &pb.ProposalResponse{Response: res}
} else {
// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim
pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
// ...
}
// ...
}
Endorser.endorseProposal
背書鍊碼實作了可插拔,可以使用不同的ESCC,系統鍊碼和使用者鍊碼的背書過程是不同的。
// endorse the proposal by calling the ESCC
func (e *Endorser) endorseProposal(_ context.Context, chainID string, txid string, signedProp *pb.SignedProposal, proposal *pb.Proposal, response *pb.Response, simRes []byte, event *pb.ChaincodeEvent, visibility []byte, ccid *pb.ChaincodeID, txsim ledger.TxSimulator, cd ccprovider.ChaincodeDefinition) (*pb.ProposalResponse, error) {
endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", chainID, shorttxid(txid), ccid)
defer endorserLogger.Debugf("[%s][%s] Exit", chainID, shorttxid(txid))
// 系統鍊碼和使用者鍊碼使用不同的ESCC
isSysCC := cd == nil
// 1) extract the name of the escc that is requested to endorse this chaincode
var escc string
// ie, "lscc" or system chaincodes
if isSysCC {
escc = "escc"
} else {
escc = cd.Endorsement()
}
endorserLogger.Debugf("[%s][%s] escc for chaincode %s is %s", chainID, shorttxid(txid), ccid, escc)
// marshalling event bytes
var err error
var eventBytes []byte
if event != nil {
eventBytes, err = putils.GetBytesChaincodeEvent(event)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event bytes")
}
}
// set version of executing chaincode
if isSysCC {
// if we want to allow mixed fabric levels we should
// set syscc version to ""
ccid.Version = util.GetSysCCVersion()
} else {
ccid.Version = cd.CCVersion()
}
// 建立背書上下文資訊
ctx := Context{
PluginName: escc, // 插件名稱
Channel: chainID,
SignedProposal: signedProp,
ChaincodeID: ccid,
Event: eventBytes,
SimRes: simRes,
Response: response,
Visibility: visibility,
Proposal: proposal,
TxID: txid,
}
// 調用插件背書
return e.s.EndorseWithPlugin(ctx)
}
背書插件實作下面的接口即可。
// Plugin endorses a proposal response
type Plugin interface {
// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it.
// Returns:
// The Endorsement: A signature over the payload, and an identity that is used to verify the signature
// The payload that was given as input (could be modified within this function)
// Or error on failure
Endorse(payload []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error)
// Init injects dependencies into the instance of the Plugin
Init(dependencies ...Dependency) error
}
使用插件背書,需擷取插件執行個體,然後組裝響應Payload,它包含了交易執行的多種結果,然後對Payload以及簽名的Proposal背書。
// EndorseWithPlugin endorses the response with a plugin
func (pe *PluginEndorser) EndorseWithPlugin(ctx Context) (*pb.ProposalResponse, error) {
endorserLogger.Debug("Entering endorsement for", ctx)
if ctx.Response == nil {
return nil, errors.New("response is nil")
}
if ctx.Response.Status >= shim.ERRORTHRESHOLD {
return &pb.ProposalResponse{Response: ctx.Response}, nil
}
// 擷取插件
plugin, err := pe.getOrCreatePlugin(PluginName(ctx.PluginName), ctx.Channel)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.Errorf("plugin with name %s could not be used: %v", ctx.PluginName, err)
}
// 把模拟執行的資訊組成生成背書響應Payload
prpBytes, err := proposalResponsePayloadFromContext(ctx)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.Wrap(err, "failed assembling proposal response payload")
}
// 對Payload和簽名的Proposal進行背書
endorsement, prpBytes, err := plugin.Endorse(prpBytes, ctx.SignedProposal)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.WithStack(err)
}
resp := &pb.ProposalResponse{
Version: 1,
Endorsement: endorsement,
Payload: prpBytes,
Response: ctx.Response,
}
endorserLogger.Debug("Exiting", ctx)
return resp, nil
}
系統提供的預設背書插件如下,本質是對交易執行結果和Proposal簽名人資訊進行簽名。
// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it.
// Returns:
// The Endorsement: A signature over the payload, and an identity that is used to verify the signature
// The payload that was given as input (could be modified within this function)
// Or error on failure
func (e *DefaultEndorsement) Endorse(prpBytes []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error) {
// 提取Proposal的簽名人
signer, err := e.SigningIdentityForRequest(sp)
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("failed fetching signing identity: %v", err))
}
// 得到簽名人身份
// serialize the signing identity
identityBytes, err := signer.Serialize()
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("could not serialize the signing identity: %v", err))
}
// 對Payload和身份進行簽名
// sign the concatenation of the proposal response and the serialized endorser identity with this endorser's key
signature, err := signer.Sign(append(prpBytes, identityBytes...))
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("could not sign the proposal response payload: %v", err))
}
endorsement := &peer.Endorsement{Signature: signature, Endorser: identityBytes}
return endorsement, prpBytes, nil
}
發送Response
ProcessProposal
會把 ProposalResponse 作為傳回值,剩下的就交給 gRPC,發送給請求方了。
總結
本文從宏觀和源碼層面,解讀了交易提案背書涉及的資料結構,以及其主要背書流程,核心可以主要包含以下幾步:
- 檢查Proposal
- 為交易建立模拟器,并調用模拟器模拟執行交易,生成執行結果
- 背書子產品對執行結果和Proposal身份資訊背書(簽名),然後生成背書響應發送給用戶端
關于背書流程,本文未涉及的環節有:
- Proposal中各字段,層層遞進的含義
- 模拟執行交易,是鍊碼執行函數,并和Peer互動的過程,以及模拟執行的各種資源
- 2種插件化ESCC的實作
後面的章節,會對相關源碼實作做進一步分析。