天天看點

【比原鍊】如何從比原節點拿到區塊資料?

作者:freewind

比原項目倉庫:https://github.com/Bytom/bytom

在前一篇中,我們已經知道如何連上一個比原節點的p2p端口,并與對方完成身份驗證。此時,雙方結點已經建立起來了信任,并且連接配接也不會斷開,下一步,兩者就可以繼續交換資料了。

那麼,我首先想到的就是,如何才能讓對方把它已有的區塊資料全都發給我呢?

這其實可以分為三個問題:

  1. 我需要發給它什麼樣的資料?
  2. 它在内部由是如何應答的呢?
  3. 我拿到資料之後,應該怎麼處理?

由于這一塊的邏輯還是比較複雜的,是以在本篇我們先回答第一個問題:

我們要發送什麼樣的資料請求,才能讓比原節點把它持有的區塊資料發給我?

找到發送請求的代碼

首先我們先要在代碼中定位到,比原到底是在什麼時候來向對方節點發送請求的。

在前一篇講的是如何建立連接配接并驗證身份,那麼發出資料請求的操作,一定在上次的代碼之後。按照這個思路,我們在

SyncManager

類中

Switch

啟動之後,找到了一個叫

BlockKeeper

的類,相關的操作是在它裡面完成的。

下面是老規矩,還是從啟動開始,但是會更簡化一些:

cmd/bytomd/main.go#L54

func main() {
    cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
    cmd.Execute()
}
           

cmd/bytomd/commands/run_node.go#L41

func runNode(cmd *cobra.Command, args []string) error {
    n := node.NewNode(config)
    if _, err := n.Start(); err != nil {
        // ...
}
           

node/node.go#L169

func (n *Node) OnStart() error {
    // ...
    n.syncManager.Start()
    // ...
}
           

netsync/handle.go#L141

func (sm *SyncManager) Start() {
    go sm.netStart()
    // ...
    go sm.syncer()
}
           

注意

sm.netStart()

,我們在一篇中建立連接配接并驗證身份的操作,就是在它裡面完成的。而這次的這個問題,是在下面的

sm.syncer()

中完成的。

另外注意,由于這兩個函數調用都使用了goroutine,是以它們是同時進行的。

sm.syncer()

的代碼如下:

netsync/sync.go#L46

func (sm *SyncManager) syncer() {
    sm.fetcher.Start()
    defer sm.fetcher.Stop()

    // ...
    for {
        select {
        case <-sm.newPeerCh:
            log.Info("New peer connected.")
            // Make sure we have peers to select from, then sync
            if sm.sw.Peers().Size() < minDesiredPeerCount {
                break
            }
            go sm.synchronise()
            // ..
    }
}
           

這裡混入了一個叫

fetcher

的奇怪的東西,名字看起來好像是專門去抓取資料的,我們要找的是它嗎?

可惜不是,

fetcher

的作用是從多個peer那裡拿到了區塊資料之後,對資料進行整理,把有用的放到本地鍊上。我們在以後會研究它,是以這裡不展開讨論。

接着是一個

for

循環,當發現通道

newPeerCh

有了新資料(也就是有了新的節點連接配接上了),會判斷一下目前自己連着的節點是否夠多(大于等于

minDesiredPeerCount

,值為

5

),夠多的話,就會進入

sm.synchronise()

,進行資料同步。

這裡為什麼要多等幾個節點,而不是一連上就馬上同步呢?我想這是希望有更多選擇的機會,找到一個資料夠多的節點。

sm.synchronise()

還是屬于

SyncManager

的方法。在真正調用到

BlockKeeper

的方法之前,它還做了一些比如清理已經斷開的peer,找到最适合同步資料的peer等。其中“清理peer”的工作涉及到不同的對象持有的peer集合間的同步,略有些麻煩,但對目前問題幫助不大,是以我打算把它們放在以後的某個問題中回答(比如“當一個節點斷開了,比原會有什麼樣的處理”),這裡就先省略。

sm.synchronise()

代碼如下:

netsync/sync.go#L77

func (sm *SyncManager) synchronise() {
    log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
    // ...
    peer, bestHeight := sm.peers.BestPeer()
    // ...
    if bestHeight > sm.chain.BestBlockHeight() {
        // ...
        sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
    }
}
           

可以看到,首先是從衆多的peers中,找到最合适的那個。什麼叫Best呢?看一下

BestPeer()

的定義:

netsync/peer.go#L266

func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
    // ...
    for _, p := range ps.peers {
        if bestPeer == nil || p.height > bestHeight {
            bestPeer, bestHeight = p.swPeer, p.height
        }
    }
    return bestPeer, bestHeight
}
           

其實就是持有區塊鍊資料最長的那個。

找到了BestPeer之後,就調用

sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)

方法,從這裡,正式進入

BlockKeeper

 -- 也就是本文的主角 -- 的世界。

BlockKeeper

blockKeeper.BlockRequestWorker

的邏輯比較複雜,它包含了:

  1. 根據自己持有的區塊資料來計算需要同步的資料
  2. 向前面找到的最佳節點發送資料請求
  3. 拿到對方發過來的區塊資料
  4. 對資料進行處理
  5. 廣播新狀态
  6. 處理各種出錯情況,等等

由于本文中隻關注“發送請求”,是以一些與之關系不大的邏輯我會忽略掉,留待以後再講。

在“發送請求”這裡,實際也包含了兩種情形,一種簡單的,一種複雜的:

  1. 簡單的:假設不存在分叉,則直接檢查本地高度最高的區塊,然後請求下一個區塊
  2. 複雜的:考慮分叉的情況,則目前本地的區塊可能就存在分叉,那麼到底應該請求哪個區塊,就需要慎重考慮

由于第2種情況對于本文來說過于複雜(因為需要深刻了解比原鍊中分叉的處理邏輯),是以在本文中将把問題簡化,隻考慮第1種。而分叉的處理,将放在以後講解。

下面是把

blockKeeper.BlockRequestWorker

中的代碼簡化成了隻包含第1種情況:

netsync/block_keeper.go#L72

func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
    num := bk.chain.BestBlockHeight() + 1
    reqNum := uint64(0)
    reqNum = num
    // ...
    bkPeer, ok := bk.peers.Peer(peerID)
    swPeer := bkPeer.getPeer()
    // ...
    block, err := bk.BlockRequest(peerID, reqNum)
    // ...
}
           

在這種情況下,我們可以認為

bk.chain.BestBlockHeight()

中的

Best

,指的是本地持有的不帶分叉的區塊鍊高度最高的那個。(需要提醒的是,如果存在分叉情況,則

Best

不一定是高度最高的那個)

那麼我們就可以直接向最佳peer請求下一個高度的區塊,它是通過

bk.BlockRequest(peerID, reqNum)

實作的:

netsync/block_keeper.go#L152

func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
    var block *types.Block

    if err := bk.blockRequest(peerID, height); err != nil {
        return nil, errReqBlock
    }

    // ...

    for {
        select {
        case pendingResponse := <-bk.pendingProcessCh:
            block = pendingResponse.block
            // ...
            return block, nil
        // ...
        }
    }
}
           

在上面簡化後的代碼中,主要分成了兩個部分。一個是發送請求

bk.blockRequest(peerID, height)

,這是本文的重點;它下面的

for-select

部分,已經是在等待并處理對方節點的傳回資料了,這部分我們今天先略過不講。

bk.blockRequest(peerID, height)

這個方法,從邏輯上又可以分成兩部分:

  1. 構造出請求的資訊
  2. 把資訊發送給對方節點

構造出請求的資訊

bk.blockRequest(peerID, height)

經過一連串的方法調用之後,使用

height

構造出了一個

BlockRequestMessage

對象,代碼如下:

netsync/block_keeper.go#L148

func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
    return bk.peers.requestBlockByHeight(peerID, height)
}
           

netsync/peer.go#L332

func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
    peer, ok := ps.Peer(peerID)
    // ...
    return peer.requestBlockByHeight(height)
}
           

netsync/peer.go#L73

func (p *peer) requestBlockByHeight(height uint64) error {
    msg := &BlockRequestMessage{Height: height}
    p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
    return nil
}
           

到這裡,終于構造出了所需要的

BlockRequestMessage

,其實主要就是把

height

告訴peer。

然後,通過

Peer

TrySend()

把該資訊發出去。

發送請求

TrySend

中,主要是通過

github.com/tendermint/go-wire

庫将其序列化,再發送給對方。看起來應該是很簡單的操作吧,先預個警,還是挺繞的。

當我們進入

TrySend()

後:

p2p/peer.go#L242

func (p *Peer) TrySend(chID byte, msg interface{}) bool {
    if !p.IsRunning() {
        return false
    }
    return p.mconn.TrySend(chID, msg)
}
           

發現它把鍋丢給了

p.mconn.TrySend

方法,那麼

mconn

是什麼?

chID

又是什麼?

mconn

MConnection

的執行個體,它是從哪兒來的?它應該在之前的某個地方初始化了,否則我們沒法直接調用它。是以我們先來找到它初始化的地方。

經過一番尋找,發現原來是在前一篇之後,即比原節點與另一個節點完成了身份驗證之後,具體的位置在

Switch

類啟動的地方。

我們這次直接從

Swtich

OnStart

作為起點:

p2p/switch.go#L186

func (sw *Switch) OnStart() error {
    //...
    // Start listeners
    for _, listener := range sw.listeners {
        go sw.listenerRoutine(listener)
    }
    return nil
}
           

p2p/switch.go#L498

func (sw *Switch) listenerRoutine(l Listener) {
    for {
        inConn, ok := <-l.Connections()
        // ...
        err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
        // ...
    }
}
           

p2p/switch.go#L645

func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
    // ...
    peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
    // ...
}
           

p2p/peer.go#L87

func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
}
           

p2p/peer.go#L91

func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    conn := rawConn
    // ...
    if config.AuthEnc {
        // ...
        conn, err = MakeSecretConnection(conn, ourNodePrivKey)
        // ...
    }

    // Key and NodeInfo are set after Handshake
    p := &Peer{
        outbound: outbound,
        conn:     conn,
        config:   config,
        Data:     cmn.NewCMap(),
    }

    p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)

    p.BaseService = *cmn.NewBaseService(nil, "Peer", p)

    return p, nil
}
           

終于找到了。上面方法中的

MakeSecretConnection

就是與對方節點交換公鑰并進行身份驗證的地方,下面的

p.mconn = createMConnection(...)

就是建立

mconn

的地方。

繼續進去:

p2p/peer.go#L292

func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            if chID == PexChannel {
                return
            } else {
                cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
            }
        }
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}
           

原來

mconn

MConnection

的執行個體,它是通過

NewMConnectionWithConfig

建立的。

看了上面的代碼,發現這個

MConnectionWithConfig

與普通的

net.Conn

并沒有太大的差別,隻不過是當收到了對方發來的資料後,會根據指定的

chID

調用相應的

Reactor

Receive

方法來處理。是以它起到了将資料分發給

Reactor

的作用。

為什麼需要這樣的分發操作呢?這是因為,在比原中,節點之間交換資料,有多種不同的方式:

  1. 一種是規定了詳細的資料互動協定(比如有哪些資訊類型,分别代表什麼意思,什麼情況下發哪個,如何應答等),在

    ProtocolReactor

    中實作,它對應的

    chID

    BlockchainChannel

    ,值為

    byte(0x40)

  2. 另一種使用了與BitTorrent類似的檔案共享協定,叫PEX,在

    PEXReactor

    中實作,它對應的

    chID

    PexChannel

    ,值為

    byte(0x00)

是以節點之間發送資訊的時候,需要知道對方發過來的資料對應的是哪一種方式,然後轉交給相應的

Reactor

去處理。

在比原中,前者是主要的方式,後者起到輔助作用。我們目前的文章中涉及到的都是前者,後者将在以後專門研究。

p.mconn.TrySend

當我們知道了

p.mconn.TrySend

中的

mconn

是什麼,并且在什麼時候初始化以後,下面就可以進入它的

TrySend

方法了。

p2p/connection.go#L243

func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
    // ...
    channel, ok := c.channelsIdx[chID]
    // ...
    ok = channel.trySendBytes(wire.BinaryBytes(msg))
    if ok {
        // Wake up sendRoutine if necessary
        select {
        case c.send <- struct{}{}:
        default:
        }
    }

    return ok
}
           

可以看到,它找到相應的channel後(在這裡應該是

ProtocolReactor

對應的channel),調用channel的

trySendBytes

方法。在發送資料的時候,使用了

github.com/tendermint/go-wire

庫,将

msg

序列化為二進制數組。

p2p/connection.go#L602

func (ch *Channel) trySendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    default:
        return false
    }
}
           

原來它是把要發送的資料,放到了該channel對應的

sendQueue

中,交由别人來發送。具體是由誰來發送,我們馬上要就找到它。

細心的同學會發現,

Channel

除了

trySendBytes

方法外,還有一個

sendBytes

(在本文中沒有用上):

p2p/connection.go#L589

func (ch *Channel) sendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    case <-time.After(defaultSendTimeout):
        return false
    }
}
           

它們兩個的差別是,前者嘗試把待發送資料

bytes

放入

ch.sendQueue

時,如果能放進去,則傳回

true

,否則馬上失敗,傳回

false

,是以它是非阻塞的。而後者,如果放不進去(

sendQueue

已滿,那邊還沒處理完),則等待

defaultSendTimeout

(值為

10

秒),然後才會失敗。另外,

sendQueue

的容量預設為

1

到這裡,我們其實已經知道比原是如何向其它節點請求區塊資料,以及何時把資訊發送出去。

本想在本篇中就把真正發送資料的代碼也一起講了,但是發現它的邏輯也相當複雜,是以就另開一篇講吧。

再回到本文問題,再強調一下,我們前面說了,對于向peer請求區塊資料,有兩種情況:一種是簡單的不考慮分叉的,另一種是複雜的考慮分叉的。在本文隻考慮了簡單的情況,在這種情況下,所謂的

bestHeight

就是指的最高的那個區塊的高度,而在複雜情況下,它就不一定了。這就留待以後我們再詳細讨論,本文的問題就算是回答完畢了。

繼續閱讀