Peer 啟動後會在背景執行 gossip 服務,包括若幹 goroutine,實作位于
gossip/state/state.go#NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider
方法。
其中一個協程專門負責處理收到的區塊資訊。
// Deliver in order messages into the incoming channel
go s.deliverPayloads()
deliverPayloads() 方法實作位于同一個檔案的 GossipStateProviderImpl 結構下,其主要過程為循環從收到的 Gossip 消息載荷緩沖區按序拿到封裝消息,解析後進行處理。核心代碼邏輯如下:
// gossip/state/state.go#GossipStateProviderImpl.deliverPayloads()
for {
select {
case <-s.payloads.Ready(): // 等待消息
// 依次處理收到的消息
for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
rawBlock := &common.Block{}
// 從載荷資料中嘗試解析區塊結構,失敗則嘗試下個消息
if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
continue
}
// 檢查區塊結構是否完整,失敗則嘗試下個消息
if rawBlock.Data == nil || rawBlock.Header == nil {
logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
payload.SeqNum, rawBlock.Header, rawBlock.Data)
continue
}
// 從載荷中解析私密資料,失敗則嘗試下個消息
var p util.PvtDataCollections
if payload.PrivateData != nil {
err := p.Unmarshal(payload.PrivateData)
if err != nil {
logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
continue
}
}
// 核心部分:送出區塊到本地賬本
if err := s.commitBlock(rawBlock, p); err != nil {
if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
return
}
logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
}
}
case <-s.stopCh: // 停止處理消息
s.stopCh <- struct{}{}
logger.Debug("State provider has been stopped, finishing to push new blocks.")
return
}
}
整體邏輯
s.commitBlock(rawBlock, p) 是對區塊進行處理和送出的核心邏輯,主要包括送出前準備、送出過程和送出後處理三部分,如下圖所示。
下面分别進行介紹三個階段的實作過程。
送出前準備
主要完成對區塊中交易格式的檢查和擷取關聯該區塊但缺失的私密資料,最後建構 blockAndPvtData 結構。
格式檢查
對區塊格式的檢查主要在
core/committer/txvalidator/validator.go#TxValidator.Validate(block *common.Block) error
方法中完成,包括檢查交易格式、對應賬本是否存在、是否雙花、滿足 VSCC 和 Policy 等。核心邏輯如下。
// core/committer/txvalidator/validator.go#TxValidator.Validate(block *common.Block) error
// 并發驗證交易有效性
go func() {
for tIdx, d := range block.Data.Data {
// ensure that we don't have too many concurrent validation workers
v.Support.Acquire(context.Background(), 1)
go func(index int, data []byte) {
defer v.Support.Release(1)
v.validateTx(&blockValidationRequest{
d: data,
block: block,
tIdx: index,
}, results)
}(tIdx, d)
}
}()
// 處理檢查結果
for i := 0; i < len(block.Data.Data); i++ {
res := <-results
if res.err != nil {
if err == nil || res.tIdx < errPos {
err = res.err
errPos = res.tIdx
}
} else { // 設定有效标記,記錄鍊碼名稱,更新鍊碼資訊
txsfltr.SetFlag(res.tIdx, res.validationCode)
if res.validationCode == peer.TxValidationCode_VALID {
if res.txsChaincodeName != nil {
txsChaincodeNames[res.tIdx] = res.txsChaincodeName
}
if res.txsUpgradedChaincode != nil {
txsUpgradedChaincodes[res.tIdx] = res.txsUpgradedChaincode
}
txidArray[res.tIdx] = res.txid
}
}
}
// 标記雙花交易
if v.Support.Capabilities().ForbidDuplicateTXIdInBlock() {
markTXIdDuplicates(txidArray, txsfltr)
}
// 防止多次更新操作
v.invalidTXsForUpgradeCC(txsChaincodeNames, txsUpgradedChaincodes, txsfltr)
// 确認所有交易都完成檢查
err = v.allValidated(txsfltr, block)
if err != nil {
return err
}
// 更新交易有效标簽到中繼資料
utils.InitBlockMetadata(block)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsflt
擷取缺失的私密資料
首先根據已有的私密資料計算區塊中交易關聯的讀寫集資訊。如果仍有缺失,則嘗試從其它節點擷取。
// gossip/privdata/coordinator.go#coordinator.StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error
// 利用已有的私密資料計算讀寫集
ownedRWsets, err := computeOwnedRWsets(block, privateDataSets)
privateInfo, err := c.listMissingPrivateData(block, ownedRWsets)
// 擷取缺失私密資料
for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) {
c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo)
if len(privateInfo.missingKeys) == 0 {
break
}
time.Sleep(pullRetrySleepInterval)
}
建構 blockAndPvtData 結構
blockAndPvtData 結構用于後續的送出工作,是以,需要包括相關的區塊和私密資料。
主要實作邏輯如下:
// gossip/privdata/coordinator.go#coordinator.StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error
// 填充私密讀寫集資訊
for seqInBlock, nsRWS := range ownedRWsets.bySeqsInBlock() {
rwsets := nsRWS.toRWSet()
blockAndPvtData.BlockPvtData[seqInBlock] = &ledger.TxPvtData{
SeqInBlock: seqInBlock,
WriteSet: rwsets,
}
}
// 填充缺失私密資料資訊
for missingRWS := range privateInfo.missingKeys {
blockAndPvtData.Missing = append(blockAndPvtData.Missing, ledger.MissingPrivateData{
TxId: missingRWS.txID,
Namespace: missingRWS.namespace,
Collection: missingRWS.collection,
SeqInBlock: int(missingRWS.seqInBlock),
})
}
送出過程
送出過程是核心過程,主要包括預處理、驗證交易、更新本地區塊鍊結構、更新本地資料庫結構四個步驟。
預處理
預處理階段負責構造一個有效的内部區塊結構。包括:
- 處理 Endorser 交易:隻保留有效的 Endorser 交易;
- 處理配置交易:擷取配置更新的模拟結果,放入讀寫集;
- 校驗寫集合:如果狀态資料庫采用 CouchDB,要按照 CouchDB 限制檢查鍵值的格式:Key 必須為非下劃線開頭的 UTF-8 字元串,Value 必須為合法的字典結構,且不包括下劃線開頭的鍵名。
核心實作代碼位于 core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#preprocessProtoBlock(txmgr txmgr.TxMgr, validateKVFunc func(key string, value []byte) error, block *common.Block, doMVCCValidation bool) (*valinternal.Block, error),如下所示:
// core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#preprocessProtoBlock(txmgr txmgr.TxMgr, validateKVFunc func(key string, value []byte) error, block *common.Block, doMVCCValidation bool) (*valinternal.Block, error)
// 處理 endorser 交易
if txType == common.HeaderType_ENDORSER_TRANSACTION {
// extract actions from the envelope message
respPayload, err := utils.GetActionFromEnvelope(envBytes)
if err != nil {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_NIL_TXACTION)
continue
}
txRWSet = &rwsetutil.TxRwSet{}
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_OTHER_REASON)
continue
}
} else { // 處理配置更新交易
rwsetProto, err := processNonEndorserTx(env, chdr.TxId, txType, txmgr, !doMVCCValidation)
if _, ok := err.(*customtx.InvalidTxError); ok {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_OTHER_REASON)
continue
}
if err != nil {
return nil, err
}
if rwsetProto != nil {
if txRWSet, err = rwsetutil.TxRwSetFromProtoMsg(rwsetProto); err != nil {
return nil, err
}
}
}
// 檢查讀寫集是否符合資料庫要求格式
if txRWSet != nil {
if err := validateWriteset(txRWSet, validateKVFunc); err != nil {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_WRITESET)
continue
}
b.Txs = append(b.Txs, &valinternal.Transaction{IndexInBlock: txIndex, ID: chdr.TxId, RWSet: txRWSet})
}
驗證交易
接下來,對區塊中交易進行 MVCC 檢查,并校驗私密讀寫集,更新區塊中繼資料中的交易有效标記清單。
MVCC 檢查需要逐個驗證塊中的 Endorser 交易,滿足下列條件者才認為有效:
- 公共讀集合中 key 版本在該交易前未變;
- RangeQuery 的結果未變;
- 私密讀集合中 key 版本未變。
實作在 core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go#Validator.ValidateAndPrepareBatch(block *valinternal.Block, doMVCCValidation bool) (*valinternal.PubAndHashUpdates, error) 方法中,主要邏輯如下:
// core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go#Validator.ValidateAndPrepareBatch(block *valinternal.Block, doMVCCValidation bool) (*valinternal.PubAndHashUpdates, error)
// 依次順序檢查每個交易
for _, tx := range block.Txs {
var validationCode peer.TxValidationCode
var err error
// 檢查 Endorser 交易
if validationCode, err = v.validateEndorserTX(tx.RWSet, doMVCCValidation, updates); err != nil {
return nil, err
}
tx.ValidationCode = validationCode
// 有效交易則将其讀寫集放到更新集合中
if validationCode == peer.TxValidationCode_VALID {
committingTxHeight := version.NewHeight(block.Num, uint64(tx.IndexInBlock))
updates.ApplyWriteSet(tx.RWSet, committingTxHeight)
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%s]",
block.Num, tx.IndexInBlock, tx.ID, validationCode.String())
}
}
對私密讀寫集的校驗主要是再次檢查 Hash 值是否比對,實作在
core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) error
方法中。
// core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) error
for _, nsPvtdata := range pvtdata.WriteSet.NsPvtRwset {
for _, collPvtdata := range nsPvtdata.CollectionPvtRwset {
collPvtdataHash := util.ComputeHash(collPvtdata.Rwset)
hashInPubdata := tx.RetrieveHash(nsPvtdata.Namespace, collPvtdata.CollectionName)
// 重新計算私密資料 Hash 值,對比公共資料中的記錄
if !bytes.Equal(collPvtdataHash, hashInPubdata) {
return &validator.ErrPvtdataHashMissmatch{
Msg: fmt.Sprintf(`Hash of pvt data for collection [%s:%s] does not match with the corresponding hash in the public data.
public hash = [%#v], pvt data hash = [%#v]`, nsPvtdata.Namespace, collPvtdata.CollectionName, hashInPubdata, collPvtdataHash),
}
}
}
}
最後,更新區塊中繼資料中的交易有效标記清單,實作位于
core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block)
方法,代碼如下所示。
// core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block)
func postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block) {
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
for _, tx := range validatedBlock.Txs {
txsFilter.SetFlag(tx.IndexInBlock, tx.ValidationCode)
}
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
接下來,需要更新本地的賬本結構,包括區塊鍊結構和相關的本地資料庫。
更新本地區塊鍊結構
入口在
core/ledger/ledgerstorage/store.go#Store.CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
方法中,主要包括如下步驟:
- 将區塊寫入本地 Chunk 檔案;
- 更新索引資料庫(區塊号、Hash值、檔案指針、交易偏移、區塊中繼資料);
- 更新所送出的區塊号到私密資料庫;
區塊寫入 Chunk 檔案主要實作在
common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go#blockfileMgr.addBlock(block *common.Block) error
方法中,主要邏輯如下所示:
// common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go#blockfileMgr.addBlock(block *common.Block) error
// 計算長度資訊
blockBytesLen := len(blockBytes)
blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)
// 添加長度資訊到 Chunk 檔案
err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
// 更新 checkpoint 資訊
newCPInfo := &checkpointInfo{
latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
latestFileChunksize: currentCPInfo.latestFileChunksize + totalBytesToAppend,
isChainEmpty: false,
lastBlockNumber: block.Header.Number}
mgr.saveCurrentInfo(newCPInfo, false);
// 更新區塊在檔案中索引位置和交易偏移量
blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
for _, txOffset := range txOffsets {
txOffset.loc.offset += len(blockBytesEncodedLen)
}
// 更新索引資料庫
mgr.index.indexBlock(&blockIdxInfo{
blockNum: block.Header.Number, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata})
// 更新 checkpoint 資訊和區塊鍊資訊
mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
更新本地資料庫結構
更新資料庫是送出交易的最後一步,主要包括如下步驟:
- 删除過期私密資料;
- 更新私密資料生命周期記錄資料庫;
- 更新本地公共狀态資料庫和私密狀态資料庫;
- 如果啟用了曆史資料庫,更新資料。
實作代碼在
core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go#LockBasedTxMgr.Commit() error
方法中,主要邏輯如下。
// core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go#LockBasedTxMgr.Commit() error
// 準備過期的私密鍵值清理
txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.current.blockNum())
// 更新私密資料生命周期記錄資料庫,這裡記錄了每個私密鍵值的存活期限
if err := txmgr.pvtdataPurgeMgr.DeleteExpiredAndUpdateBookkeeping(
txmgr.current.batch.PvtUpdates, txmgr.current.batch.HashUpdates); err != nil{
return err
}
// 更新本地公共狀态資料庫和私密狀态資料庫
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.current.batch, commitHeight); err != nil {
return err
}
// 如果啟用了曆史資料庫,更新資料
if ledgerconfig.IsHistoryDBEnabled() {
if err := l.historyDB.Commit(block); err != nil {
panic(errors.WithMessage(err, "Error during commit to history db"))
}
}
送出後處理
送出後的處理比較簡單,包括清理本地的臨時狀态資料庫和更新賬本高度資訊。
清理工作包括區塊關聯的臨時私密資料和舊區塊關聯的臨時私密資料。
======關于本文 =======
更多源碼剖析内容可參考 超級賬本 Fabric 源碼剖析 開源項目;
更多區塊鍊深度技術可參考 區塊鍊技術指南 開源項目。
轉載請注明原文連結。