天天看點

區塊鍊教程Fabric1.0源代碼分析Orderer BroadcastServerFabric 1.0源代碼筆記 之 Orderer #BroadcastServer(Broadcast服務端)

  兄弟連區塊鍊教程Fabric1.0源代碼分析Orderer BroadcastServer,2018年下半年,區塊鍊行業正逐漸褪去發展之初的浮躁、回歸理性,表面上看相關人才需求與身價似乎正在回落。但事實上,正是初期泡沫的漸退,讓人們更多的關注點放在了區塊鍊真正的技術之上。

Fabric 1.0源代碼筆記 之 Orderer #BroadcastServer(Broadcast服務端)

1、BroadcastServer概述

BroadcastServer相關代碼在protos/orderer、orderer目錄下。

protos/orderer/ab.pb.go,AtomicBroadcastServer接口定義。

orderer/server.go,go,AtomicBroadcastServer接口實作。

有個圖

2、AtomicBroadcastServer接口定義

2.1、AtomicBroadcastServer接口定義

type AtomicBroadcastServer interface {
    Broadcast(AtomicBroadcast_BroadcastServer) error
    Deliver(AtomicBroadcast_DeliverServer) error
}
//代碼在protos/orderer/ab.pb.go
···

### 2.2、gRPC相關實作
           

var _AtomicBroadcast_serviceDesc = grpc.ServiceDesc{

ServiceName: "orderer.AtomicBroadcast",
HandlerType: (*AtomicBroadcastServer)(nil),
Methods:     []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
    {
        StreamName:    "Broadcast",
        Handler:       _AtomicBroadcast_Broadcast_Handler,
        ServerStreams: true,
        ClientStreams: true,
    },
    {
        StreamName:    "Deliver",
        Handler:       _AtomicBroadcast_Deliver_Handler,
        ServerStreams: true,
        ClientStreams: true,
    },
},
Metadata: "orderer/ab.proto",           

}

func RegisterAtomicBroadcastServer(s *grpc.Server, srv AtomicBroadcastServer) {

s.RegisterService(&_AtomicBroadcast_serviceDesc, srv)           

func _AtomicBroadcast_Broadcast_Handler(srv interface{}, stream grpc.ServerStream) error {

return srv.(AtomicBroadcastServer).Broadcast(&atomicBroadcastBroadcastServer{stream})           

func _AtomicBroadcast_Deliver_Handler(srv interface{}, stream grpc.ServerStream) error {

return srv.(AtomicBroadcastServer).Deliver(&atomicBroadcastDeliverServer{stream})           

//代碼在protos/orderer/ab.pb.go

## 3、AtomicBroadcastServer接口實作

### 3.1、server結構體

server結構體:
           

type server struct {

bh broadcast.Handler
dh deliver.Handler           

type broadcastSupport struct {

multichain.Manager
broadcast.ConfigUpdateProcessor           

//代碼在orderer/server.go

broadcast.Handler:
           

type Handler interface {

Handle(srv ab.AtomicBroadcast_BroadcastServer) error           

type handlerImpl struct {

sm SupportManager           

func NewHandlerImpl(sm SupportManager) Handler {

return &handlerImpl{
    sm: sm,
}           

type SupportManager interface {

ConfigUpdateProcessor
GetChain(chainID string) (Support, bool)           

type ConfigUpdateProcessor interface { //處理通道配置更新

Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)           

//代碼在orderer/common/broadcast/broadcast.go

deliver.Handler:
           
Handle(srv ab.AtomicBroadcast_DeliverServer) error           

type deliverServer struct {

sm SupportManager           
GetChain(chainID string) (Support, bool)           

//代碼在orderer/common/deliver/deliver.go

### 3.2、server結構體相關方法
           

//建構server結構體

func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer

//s.bh.Handle(srv)

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error

//s.dh.Handle(srv)

func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error

func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer代碼如下:
           

func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer {

s := &server{
    dh: deliver.NewHandlerImpl(deliverSupport{Manager: ml}),
    bh: broadcast.NewHandlerImpl(broadcastSupport{
        Manager:               ml,
        ConfigUpdateProcessor: configupdate.New(ml.SystemChannelID(), configUpdateSupport{Manager: ml}, signer),
    }),
}
return s           
### 3.3、Broadcast服務端Broadcast處理流程

Broadcast服務端Broadcast處理流程,即broadcast.handlerImpl.Handle方法。

#### 3.3.1、接收Envelope消息,并擷取Payload和ChannelHeader
           

msg, err := srv.Recv() //接收Envelope消息

payload, err := utils.UnmarshalPayload(msg.Payload) //反序列化擷取Payload

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) //反序列化擷取ChannelHeader

#### 3.3.2、如果消息類型為channel配置或更新,則使用multichain.Manager處理消息
           

if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) { //如果是channel配置或更新

msg, err = bh.sm.Process(msg) //configupdate.Processor.Process方法           
msg, err = bh.sm.Process(msg)代碼如下:
           

func (p Processor) Process(envConfigUpdate cb.Envelope) (*cb.Envelope, error) {

channelID, err := channelID(envConfigUpdate) //擷取ChannelHeader.ChannelId
//multichain.Manager.GetChain方法,擷取chainSupport,以及chain是否存在
support, ok := p.manager.GetChain(channelID)
if ok {
    //已存在的channel配置,調取multichain.Manager.ProposeConfigUpdate方法
    return p.existingChannelConfig(envConfigUpdate, channelID, support)
}
//新channel配置,調取multichain.Manager.NewChannelConfig方法
return p.newChannelConfig(channelID, envConfigUpdate)           

//代碼在orderer/configupdate/configupdate.go

#### 3.3.3、其他消息類型或channel消息處理後,接受消息并加入排序
           

support, ok := bh.sm.GetChain(chdr.ChannelId) //擷取chainSupport

_, filterErr := support.Filters().Apply(msg) //filter.RuleSet.Apply方法

//調取Chain.Enqueue方法,接受消息,加入排序

support.Enqueue(msg)

#### 3.3.4、向用戶端發送響應資訊
           

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})

### 3.4、Broadcast服務端Deliver處理流程

Broadcast服務端Deliver處理流程,即deliver.deliverServer.Handle方法。
           

func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {

for {
    //接收用戶端查詢請求
    envelope, err := srv.Recv()
    payload, err := utils.UnmarshalPayload(envelope.Payload)
    chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
    chain, ok := ds.sm.GetChain(chdr.ChannelId)

    erroredChan := chain.Errored()
    select {
    case <-erroredChan:
        return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
    default:

    }

    lastConfigSequence := chain.Sequence()

    sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
    result, _ := sf.Apply(envelope)

    seekInfo := &ab.SeekInfo{}
    err = proto.Unmarshal(payload.Data, seekInfo)

    cursor, number := chain.Reader().Iterator(seekInfo.Start)
    var stopNum uint64
    switch stop := seekInfo.Stop.Type.(type) {
    case *ab.SeekPosition_Oldest:
        stopNum = number
    case *ab.SeekPosition_Newest:
        stopNum = chain.Reader().Height() - 1
    case *ab.SeekPosition_Specified:
        stopNum = stop.Specified.Number
        if stopNum < number {
            return sendStatusReply(srv, cb.Status_BAD_REQUEST)
        }
    }

    for {
        if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
            select {
            case <-erroredChan:
                return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
            case <-cursor.ReadyChan():
            }
        } else {
            select {
            case <-cursor.ReadyChan():
            default:
                return sendStatusReply(srv, cb.Status_NOT_FOUND)
            }
        }

        currentConfigSequence := chain.Sequence()
        if currentConfigSequence > lastConfigSequence {
            lastConfigSequence = currentConfigSequence
            sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
            result, _ := sf.Apply(envelope)

        }

        block, status := cursor.Next()
        err := sendBlockReply(srv, block)
        if stopNum == block.Header.Number {
            break
        }
    }

    err := sendStatusReply(srv, cb.Status_SUCCESS)
}