天天看點

使用Go語言編寫區塊鍊P2P網絡(譯)

外文發表日期: 2018-04-14

外文連結:https://medium.com/coinmonks/code-a-simple-p2p-blockchain-in-go-46662601f417

使用Go語言編寫區塊鍊P2P網絡(譯)

在之前的文章中,我們已經知道了怎麼編寫PoW也知道了IPFS怎麼工作, 但是有一個緻命的缺點,我們的服務都是中心化的,這篇文章會教你怎麼實作一個簡單的完全去中心化的P2P網絡。

背景知識

什麼是P2P網絡

在真正的P2P架構中,不需要中心化的服務來維護區塊鍊的狀态。例如,當你給朋友發送比特币時,比特币區塊鍊的“狀态”應該更新,這樣你朋友的餘額就會增加,你的餘額就會減少。

在這個網絡中,不存在一個權力高度中心化的機構來維護狀态(銀行就是這樣的中心化機構)。對于比特币網絡來說,每個節點都會維護一份完整的區塊鍊狀态,當交易發生時,每個節點的區塊鍊狀态都會得到更新。這樣,隻要網絡中51%的節點對區塊鍊的狀态達成一緻,那麼區塊鍊網絡就是安全可靠的,具體可以閱讀這篇一緻性協定文章。

本文将繼續之前的工作,200行Go代碼實作區塊鍊, 并加入P2P網絡架構。在繼續之前,強烈建議你先閱讀該篇文章,它會幫助你了解接下來的代碼。

開始實作

編寫P2P網絡可不是開開玩笑就能簡單視線的,有很多邊邊角角的情況都要覆寫到,而且需要你擁有很多工程學的知識,這樣的P2P網絡才是可擴充、高可靠的。有句諺語說得好:站在巨人肩膀上做事,那麼我們先看看巨人們提供了哪些工具吧。

喔,看看,我們發現了什麼!一個用Go語言實作的P2P庫go-libp2p!如果你對新技術足夠敏銳,就會發現這個庫的作者和IPFS的作者是同一個團隊。如果你還沒看過我們的IPFS教程,可以看看這裡, 你可以選擇跳過IPFS教程,因為對于本文這不是必須的。

警告

目前來說,

go-libp2p

主要有兩個缺點:

1. 安裝設定比較痛苦,它使用gx作為包管理工具,怎麼說呢,不咋好用,但是湊活用吧

2. 目前項目還沒有成熟,正在緊密鑼鼓的開發中,當使用這個庫時,可能會遇到一些資料競争(data race)

對于第一點,不必擔心,有我們呢。第二點是比較大的問題,但是不會影響我們的代碼。假如你在使用過程中發現了資料競争問題,記得給項目提一個issue,幫助它更好的成長!

總之,目前開源世界中,現代化的P2P庫是非常非常少的,因為我們要多給

go-libp2p

一些耐心和包容,而且就目前來說,它已經能很好的滿足我們的目标了。

安裝設定

最好的環境設定方式是直接clone

libp2p

庫,然後在這個庫的代碼中直接開發。你也可以在自己的庫中,調用這個庫開發,但是這樣就需要用到

gx

了。這裡我們使用簡單的方式,假設你已經安裝了Go:

-

go get -d github.com/libp2p/go-libp2p/…

- 進入

go-libp2p

檔案夾

-

make

-

make deps

這裡會通過gx包管理工具下載下傳所有需要的包和依賴,再次申明,我們不喜歡gx,因為它打破了Go語言的很多慣例,但是為了這個很棒的庫,認慫吧。

這裡,我們在

examples

子目錄下進行開發,是以在

go-libp2p

的examples下建立一個你自己的目錄

-

mkdir ./examples/p2p

然後進入到p2p檔案夾下,建立

main.go

檔案,後面所有的代碼都會在該檔案中。

你的目錄結構是這樣的:

使用Go語言編寫區塊鍊P2P網絡(譯)

好了,勇士們,拔出你們的劍,哦不,拔出你們的

main.go

,開始我們的征途吧!

導入相關庫

這裡申明我們需要用的庫,大部分庫是來自于

go-libp2p

本身的,在教程中,你會學到怎麼去使用它們。

package main

import (
    "bufio"
    "context"
    "crypto/rand"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "log"
    mrand "math/rand"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/davecgh/go-spew/spew"
    golog "github.com/ipfs/go-log"
    libp2p "github.com/libp2p/go-libp2p"
    crypto "github.com/libp2p/go-libp2p-crypto"
    host "github.com/libp2p/go-libp2p-host"
    net "github.com/libp2p/go-libp2p-net"
    peer "github.com/libp2p/go-libp2p-peer"
    pstore "github.com/libp2p/go-libp2p-peerstore"
    ma "github.com/multiformats/go-multiaddr"
    gologging "github.com/whyrusleeping/go-logging"
)           

spew

包可以很友善、優美的列印出我們的區塊鍊,是以記得安裝它:

-

go get github.com/davecgh/go-spew/spew

區塊鍊結構

記住,請先閱讀200行Go代碼實作區塊鍊, 這樣,下面的部分就會簡單很多。

先來申明全局變量:

// Block represents each 'item' in the blockchain
type Block struct {
    Index     int
    Timestamp string
    BPM       int
    Hash      string
    PrevHash  string
}

// Blockchain is a series of validated Blocks
var Blockchain []Block

var mutex = &sync.Mutex{}           
  • 我們是一家健康看護公司,是以Block中存着的是使用者的脈搏速率BPM
  • Blockchain是我們的”狀态”,或者嚴格的說:最新的Blockchain,它其實就是Block的切片(slice)
  • mutex是為了防止資源競争出現

下面是Blockchain相關的特定函數:

// make sure block is valid by checking index, and comparing the hash of the previous block
func isBlockValid(newBlock, oldBlock Block) bool {
    if oldBlock.Index+1 != newBlock.Index {
        return false
    }

    if oldBlock.Hash != newBlock.PrevHash {
        return false
    }

    if calculateHash(newBlock) != newBlock.Hash {
        return false
    }

    return true
}

// SHA256 hashing
func calculateHash(block Block) string {
    record := strconv.Itoa(block.Index) + block.Timestamp + strconv.Itoa(block.BPM) + block.PrevHash
    h := sha256.New()
    h.Write([]byte(record))
    hashed := h.Sum(nil)
    return hex.EncodeToString(hashed)
}

// create a new block using previous block's hash
func generateBlock(oldBlock Block, BPM int) Block {

    var newBlock Block

    t := time.Now()

    newBlock.Index = oldBlock.Index + 1
    newBlock.Timestamp = t.String()
    newBlock.BPM = BPM
    newBlock.PrevHash = oldBlock.Hash
    newBlock.Hash = calculateHash(newBlock)

    return newBlock
}           
  • isBlockValid

    檢查Block的hash是否合法
  • calculateHash

    使用

    sha256

    來對原始資料做hash
  • generateBlock

    建立一個新的Block區塊,然後添加到區塊鍊Blockchain上,同時會包含所需的事務

P2P結構

下面我們快接近核心部分了,首先我們要寫出建立主機的邏輯。當一個節點運作我們的程式時,它可以作為一個主機,被其它節點連接配接。下面一起看看代碼:-)

// makeBasicHost creates a LibP2P host with a random peer ID listening on the
// given multiaddress. It will use secio if secio is true.
func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {

    // If the seed is zero, use real cryptographic randomness. Otherwise, use a
    // deterministic randomness source to make generated keys stay the same
    // across multiple runs
    var r io.Reader
    if randseed == 0 {
        r = rand.Reader
    } else {
        r = mrand.New(mrand.NewSource(randseed))
    }

    // Generate a key pair for this host. We will use it
    // to obtain a valid host ID.
    priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
    if err != nil {
        return nil, err
    }

    opts := []libp2p.Option{
        libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
        libp2p.Identity(priv),
    }

    if !secio {
        opts = append(opts, libp2p.NoEncryption())
    }

    basicHost, err := libp2p.New(context.Background(), opts...)
    if err != nil {
        return nil, err
    }

    // Build host multiaddress
    hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))

    // Now we can build a full multiaddress to reach this host
    // by encapsulating both addresses:
    addr := basicHost.Addrs()[0]
    fullAddr := addr.Encapsulate(hostAddr)
    log.Printf("I am %s\n", fullAddr)
    if secio {
        log.Printf("Now run \"go run main.go -l %d -d %s -secio\" on a different terminal\n", listenPort+1, fullAddr)
    } else {
        log.Printf("Now run \"go run main.go -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr)
    }

    return basicHost, nil
}           

makeBasicHost

函數有3個參數,同時傳回一個host結構體

-

listenPort

是主機監聽的端口,其它節點會連接配接該端口

-

secio

表明是否開啟資料流的安全選項,最好開啟,是以它代表了”安全輸入/輸出”

-

randSeed

是一個可選的指令行辨別,可以允許我們提供一個随機數種子來為我們的主機生成随機的位址。這裡我們不會使用

函數的第一個

if

語句針對随機種子生成随機key,接着我們生成公鑰和私鑰,這樣能保證主機是安全的。

opts

部分開始建構網絡位址部分,這樣其它節點就可以連接配接進來。

!secio

部分可以繞過加密,但是我們準備使用加密,是以這段代碼不會被觸發。

接着,建立了主機位址,這樣其他節點就可以連接配接進來。

log.Printf

可以用來在控制台列印出其它節點的連接配接資訊。最後我們傳回生成的主機位址給調用方函數。

流處理

之前的主機需要能處理進入的資料流。當另外一個節點連接配接到主機時,它會想要提出一個新的區塊鍊,來覆寫主機上的區塊鍊,是以我們需要邏輯來判定是否要接受新的區塊鍊。

同時,當我們往本地的區塊鍊添加區塊後,也要把相關資訊廣播給其它節點,這裡也需要實作相關邏輯。

先來建立流處理的基本架構吧:

func handleStream(s net.Stream) {

    log.Println("Got a new stream!")

    // Create a buffer stream for non blocking read and write.
    rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

    go readData(rw)
    go writeData(rw)

    // stream 's' will stay open until you close it (or the other side closes it).
}           

這裡建立一個新的

ReadWriter

,為了能支援資料讀取和寫入,同時我們啟動了一個單獨的Go協程來處理相關讀寫邏輯。

讀取資料

首先建立

readData

函數:

func readData(rw *bufio.ReadWriter) {

    for {
        str, err := rw.ReadString('\n')
        if err != nil {
            log.Fatal(err)
        }

        if str == "" {
            return
        }
        if str != "\n" {

            chain := make([]Block, 0)
            if err := json.Unmarshal([]byte(str), &chain); err != nil {
                log.Fatal(err)
            }

            mutex.Lock()
            if len(chain) > len(Blockchain) {
                Blockchain = chain
                bytes, err := json.MarshalIndent(Blockchain, "", "  ")
                if err != nil {

                    log.Fatal(err)
                }
                // Green console color:     \x1b[32m
                // Reset console color:     \x1b[0m
                fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
            }
            mutex.Unlock()
        }
    }
}           

該函數是一個無限循環,因為它需要永不停歇的去讀取外面進來的資料。首先,我們使用

ReadString

解析從其它節點發送過來的新的區塊鍊(JSON字元串)。

然後檢查進來的區塊鍊的長度是否比我們本地的要長,如果進來的鍊更長,那麼我們就接受新的鍊為最新的網絡狀态(最新的區塊鍊)。

同時,把最新的區塊鍊在控制台使用一種特殊的顔色列印出來,這樣我們就知道有新連結受了。

如果在我們主機的本地添加了新的區塊到區塊鍊上,那就需要把本地最新的區塊鍊廣播給其它相連的節點知道,這樣這些節點機會接受并更新到我們的區塊鍊版本。這裡使用

writeData

函數:

func writeData(rw *bufio.ReadWriter) {

    go func() {
        for {
            time.Sleep(5 * time.Second)
            mutex.Lock()
            bytes, err := json.Marshal(Blockchain)
            if err != nil {
                log.Println(err)
            }
            mutex.Unlock()

            mutex.Lock()
            rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
            rw.Flush()
            mutex.Unlock()

        }
    }()

    stdReader := bufio.NewReader(os.Stdin)

    for {
        fmt.Print("> ")
        sendData, err := stdReader.ReadString('\n')
        if err != nil {
            log.Fatal(err)
        }

        sendData = strings.Replace(sendData, "\n", "", -1)
        bpm, err := strconv.Atoi(sendData)
        if err != nil {
            log.Fatal(err)
        }
        newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)

        if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
            mutex.Lock()
            Blockchain = append(Blockchain, newBlock)
            mutex.Unlock()
        }

        bytes, err := json.Marshal(Blockchain)
        if err != nil {
            log.Println(err)
        }

        spew.Dump(Blockchain)

        mutex.Lock()
        rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
        rw.Flush()
        mutex.Unlock()
    }

}           

首先是一個單獨協程中的函數,每5秒鐘會将我們的最新的區塊鍊狀态廣播給其它相連的節點。它們收到後,如果發現我們的區塊鍊比它們的要短,就會直接把我們發送的區塊鍊資訊丢棄,繼續使用它們的區塊鍊,反之則使用我們的區塊鍊。總之,無論哪種方法,所有的節點都會定期的同步本地的區塊鍊到最新狀态。

這裡我們需要一個方法來建立一個新的Block區塊,包含之前提到過的脈搏速率(BPM)。為了簡化實作,我們不會真的去通過物聯網裝置讀取脈搏,而是直接在終端控制台上輸入一個脈搏速率數字。

首先要驗證輸入的BPM是一個整數類型,然後使用之前的

generateBlock

來生成區塊,接着使用

spew.Dump

輸入到終端控制台,最後我們使用

rw.WriteString

把最新的區塊鍊廣播給相連的其它節點。

牛逼了我的哥,現在我們完成了區塊鍊相關的函數以及大多數P2P相關的函數。在前面,我們建立了流處理,是以可以讀取和寫入最新的區塊鍊狀态;建立了狀态同步函數,這樣節點之間可以互相同步最新狀态。

剩下的就是實作我們的

main

函數了:

func main() {
    t := time.Now()
    genesisBlock := Block{}
    genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}

    Blockchain = append(Blockchain, genesisBlock)

    // LibP2P code uses golog to log messages. They log with different
    // string IDs (i.e. "swarm"). We can control the verbosity level for
    // all loggers with:
    golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info

    // Parse options from the command line
    listenF := flag.Int("l", 0, "wait for incoming connections")
    target := flag.String("d", "", "target peer to dial")
    secio := flag.Bool("secio", false, "enable secio")
    seed := flag.Int64("seed", 0, "set random seed for id generation")
    flag.Parse()

    if *listenF == 0 {
        log.Fatal("Please provide a port to bind on with -l")
    }

    // Make a host that listens on the given multiaddress
    ha, err := makeBasicHost(*listenF, *secio, *seed)
    if err != nil {
        log.Fatal(err)
    }

    if *target == "" {
        log.Println("listening for connections")
        // Set a stream handler on host A. /p2p/1.0.0 is
        // a user-defined protocol name.
        ha.SetStreamHandler("/p2p/1.0.0", handleStream)

        select {} // hang forever
        /**** This is where the listener code ends ****/
    } else {
        ha.SetStreamHandler("/p2p/1.0.0", handleStream)

        // The following code extracts target's peer ID from the
        // given multiaddress
        ipfsaddr, err := ma.NewMultiaddr(*target)
        if err != nil {
            log.Fatalln(err)
        }

        pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
        if err != nil {
            log.Fatalln(err)
        }

        peerid, err := peer.IDB58Decode(pid)
        if err != nil {
            log.Fatalln(err)
        }

        // Decapsulate the /ipfs/<peerID> part from the target
        // /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
        targetPeerAddr, _ := ma.NewMultiaddr(
            fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
        targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)

        // We have a peer ID and a targetAddr so we add it to the peerstore
        // so LibP2P knows how to contact it
        ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)

        log.Println("opening stream")
        // make a new stream from host B to host A
        // it should be handled on host A by the handler we set above because
        // we use the same /p2p/1.0.0 protocol
        s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
        if err != nil {
            log.Fatalln(err)
        }
        // Create a buffered stream so that read and writes are non blocking.
        rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

        // Create a thread to read and write data.
        go writeData(rw)
        go readData(rw)

        select {} // hang forever

    }
}           

首先是建立一個創世區塊(如果你讀了200行Go代碼實作你的區塊鍊,這裡就不會陌生)。

其次我們使用

go-libp2p

SetAllLoggers

日志函數來記錄日志。

接着,設定了所有的指令行辨別:

  • secio

    之前有提到,是用來加密資料流的。在我們的程式中,一定要打開該辨別
  • target

    指明目前節點要連接配接到的主機位址
  • listenF

    是目前節點的監聽主機位址,這樣其它節點就可以連接配接進來,記住,每個節點都有兩個身份:主機和用戶端, 畢竟P2P不是白叫的
  • seed

    是随機數種子,用來建立主機位址時使用

然後,使用

makeBasicHost

函數來建立一個新的主機位址,如果我們隻想做主機不想做用戶端(連接配接其它的主機),就使用

if *target == “”

接下來的幾行,會從

target

解析出我們要連接配接到的主機位址。然後把

peerID

和主機目标位址

targetAddr

添加到”store”中,這樣就可以持續跟蹤我們跟其它主機的連接配接資訊,這裡使用的是

ha.Peerstore().AddAddr

函數。

接着我們使用

ha.NewStream

連接配接到想要連接配接的節點上,同時為了能接收和發送最新的區塊鍊資訊,建立了

ReadWriter

,同時使用一個Go協程來進行

readData

writeData

哇哦

終于完成了,寫文章遠比寫代碼累!我知道之前的内容有點難,但是相比P2P的複雜性來說,你能通過一個庫來完成P2P網絡,已經很牛逼了,是以繼續加油!

完整代碼

mycoralhealth/blockchain-tutorial

運作結果

現在讓我們來試驗一下,首先打開3個獨立的終端視窗做為獨立節點。

開始之前,請再次進入

go-libp2p

的根目錄運作一下

make deps

,確定所有依賴都正常安裝。

回到你的工作目錄

examples/p2p

,打開第一個終端視窗,輸入

go run main.go -l 10000 -secio

使用Go語言編寫區塊鍊P2P網絡(譯)

細心的讀者會發現有一段話”Now run…”,那還等啥,繼續跟着做吧,打開第二個終端視窗運作:

go run main.go -l 10001 -d <given address in the instructions> -secio

使用Go語言編寫區塊鍊P2P網絡(譯)

這是你會發現第一個終端視窗檢測到了新連接配接!

使用Go語言編寫區塊鍊P2P網絡(譯)

接着打開第三個終端視窗,運作:

go run main.go -l 10002 -d <given address in the instructions> -secio

使用Go語言編寫區塊鍊P2P網絡(譯)

檢查第二終端,又發現了新連接配接

使用Go語言編寫區塊鍊P2P網絡(譯)

接着,該我們輸入BPM資料了,在第一個終端視窗中輸入”70”,等幾秒中,觀察各個視窗的列印輸出。

使用Go語言編寫區塊鍊P2P網絡(譯)
使用Go語言編寫區塊鍊P2P網絡(譯)
使用Go語言編寫區塊鍊P2P網絡(譯)

來看看發生了什麼:

- 終端1向本地的區塊鍊添加了一個新的區塊Block

- 終端1向終端2廣播該資訊

- 終端2将新的區塊鍊跟本地的對比,發現終端1的更長,是以使用新的區塊鍊替代了本地的區塊鍊,然後将新的區塊鍊廣播給終端3

- 同上,終端3也進行更新

所有的3個終端節點都把區塊鍊更新到了最新版本,同時沒有使用任何外部的中心化服務,這就是P2P網絡的力量!

我們再往終端2的區塊鍊中添加一個區塊試試看,在終端2中輸入”80”

使用Go語言編寫區塊鍊P2P網絡(譯)
使用Go語言編寫區塊鍊P2P網絡(譯)
使用Go語言編寫區塊鍊P2P網絡(譯)

結果忠誠的記錄了我們的正确性,再一次歡呼吧!

下一步

先享受一下自己的工作,你剛用了區區幾百行代碼就實作了一個全功能的P2P網絡!這不是開玩笑,P2P程式設計時非常複雜的,為什麼之前沒有相關的教程,就是因為太難了。

但是,這裡也有幾個可以改進的地方,你可以挑戰一下自己:

  • 之前提到過,

    go-libp2p

    是存在資料競争的Bug的,是以如果你要在生産環境使用,需要格外小心。一旦發現Bug,請回報給作者團隊知道
  • 嘗試将本文的P2P網絡跟之前的共識協定結合,例如之前的文章PoW 和PoS (PoS是中文譯文)
  • 添加持久化存儲。截止目前,為了簡化實作,我們沒有實作持久化存儲,是以節點關閉,資料就丢失了
  • 本文的代碼沒有在大量節點的環境下測試過,試着寫一個腳本運作大量節點,看看性能會怎麼變化。如果發現Bug記得給我們[送出]((https://github.com/mycoralhealth/blockchain-tutorial/tree/master/p2p)
  • 學習一下節點發現技術。新節點是怎麼發現已經存在的節點的?這篇文章是一個很好的起點

如果我寫的任何文章曾在你的心裡蕩起漣漪,那至少說明在逝去的歲月裡,我們在某一刻,共同經曆着一樣的技術探索之路。

有時候,雖然素未謀面,卻已相識很久,很微妙也很知足。

想學習區塊鍊技術,可以搜尋公衆号優優區塊鍊課堂或者添加公衆微信号uulesson

使用Go語言編寫區塊鍊P2P網絡(譯)

繼續閱讀