外文發表日期: 2018-04-14
外文連結:https://medium.com/coinmonks/code-a-simple-p2p-blockchain-in-go-46662601f417

在之前的文章中,我們已經知道了怎麼編寫PoW也知道了IPFS怎麼工作, 但是有一個緻命的缺點,我們的服務都是中心化的,這篇文章會教你怎麼實作一個簡單的完全去中心化的P2P網絡。
背景知識
什麼是P2P網絡
在真正的P2P架構中,不需要中心化的服務來維護區塊鍊的狀态。例如,當你給朋友發送比特币時,比特币區塊鍊的“狀态”應該更新,這樣你朋友的餘額就會增加,你的餘額就會減少。
在這個網絡中,不存在一個權力高度中心化的機構來維護狀态(銀行就是這樣的中心化機構)。對于比特币網絡來說,每個節點都會維護一份完整的區塊鍊狀态,當交易發生時,每個節點的區塊鍊狀态都會得到更新。這樣,隻要網絡中51%的節點對區塊鍊的狀态達成一緻,那麼區塊鍊網絡就是安全可靠的,具體可以閱讀這篇一緻性協定文章。
本文将繼續之前的工作,200行Go代碼實作區塊鍊, 并加入P2P網絡架構。在繼續之前,強烈建議你先閱讀該篇文章,它會幫助你了解接下來的代碼。
開始實作
編寫P2P網絡可不是開開玩笑就能簡單視線的,有很多邊邊角角的情況都要覆寫到,而且需要你擁有很多工程學的知識,這樣的P2P網絡才是可擴充、高可靠的。有句諺語說得好:站在巨人肩膀上做事,那麼我們先看看巨人們提供了哪些工具吧。
喔,看看,我們發現了什麼!一個用Go語言實作的P2P庫go-libp2p!如果你對新技術足夠敏銳,就會發現這個庫的作者和IPFS的作者是同一個團隊。如果你還沒看過我們的IPFS教程,可以看看這裡, 你可以選擇跳過IPFS教程,因為對于本文這不是必須的。
警告
目前來說,
go-libp2p
主要有兩個缺點:
1. 安裝設定比較痛苦,它使用gx作為包管理工具,怎麼說呢,不咋好用,但是湊活用吧
2. 目前項目還沒有成熟,正在緊密鑼鼓的開發中,當使用這個庫時,可能會遇到一些資料競争(data race)
對于第一點,不必擔心,有我們呢。第二點是比較大的問題,但是不會影響我們的代碼。假如你在使用過程中發現了資料競争問題,記得給項目提一個issue,幫助它更好的成長!
總之,目前開源世界中,現代化的P2P庫是非常非常少的,因為我們要多給
go-libp2p
一些耐心和包容,而且就目前來說,它已經能很好的滿足我們的目标了。
安裝設定
最好的環境設定方式是直接clone
libp2p
庫,然後在這個庫的代碼中直接開發。你也可以在自己的庫中,調用這個庫開發,但是這樣就需要用到
gx
了。這裡我們使用簡單的方式,假設你已經安裝了Go:
-
go get -d github.com/libp2p/go-libp2p/…
- 進入
go-libp2p
檔案夾
-
make
-
make deps
這裡會通過gx包管理工具下載下傳所有需要的包和依賴,再次申明,我們不喜歡gx,因為它打破了Go語言的很多慣例,但是為了這個很棒的庫,認慫吧。
這裡,我們在
examples
子目錄下進行開發,是以在
go-libp2p
的examples下建立一個你自己的目錄
-
mkdir ./examples/p2p
然後進入到p2p檔案夾下,建立
main.go
檔案,後面所有的代碼都會在該檔案中。
你的目錄結構是這樣的:
好了,勇士們,拔出你們的劍,哦不,拔出你們的
main.go
,開始我們的征途吧!
導入相關庫
這裡申明我們需要用的庫,大部分庫是來自于
go-libp2p
本身的,在教程中,你會學到怎麼去使用它們。
package main
import (
"bufio"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"log"
mrand "math/rand"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
golog "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
gologging "github.com/whyrusleeping/go-logging"
)
spew
包可以很友善、優美的列印出我們的區塊鍊,是以記得安裝它:
-
go get github.com/davecgh/go-spew/spew
區塊鍊結構
記住,請先閱讀200行Go代碼實作區塊鍊, 這樣,下面的部分就會簡單很多。
先來申明全局變量:
// Block represents each 'item' in the blockchain
type Block struct {
Index int
Timestamp string
BPM int
Hash string
PrevHash string
}
// Blockchain is a series of validated Blocks
var Blockchain []Block
var mutex = &sync.Mutex{}
- 我們是一家健康看護公司,是以Block中存着的是使用者的脈搏速率BPM
- Blockchain是我們的”狀态”,或者嚴格的說:最新的Blockchain,它其實就是Block的切片(slice)
- mutex是為了防止資源競争出現
下面是Blockchain相關的特定函數:
// make sure block is valid by checking index, and comparing the hash of the previous block
func isBlockValid(newBlock, oldBlock Block) bool {
if oldBlock.Index+1 != newBlock.Index {
return false
}
if oldBlock.Hash != newBlock.PrevHash {
return false
}
if calculateHash(newBlock) != newBlock.Hash {
return false
}
return true
}
// SHA256 hashing
func calculateHash(block Block) string {
record := strconv.Itoa(block.Index) + block.Timestamp + strconv.Itoa(block.BPM) + block.PrevHash
h := sha256.New()
h.Write([]byte(record))
hashed := h.Sum(nil)
return hex.EncodeToString(hashed)
}
// create a new block using previous block's hash
func generateBlock(oldBlock Block, BPM int) Block {
var newBlock Block
t := time.Now()
newBlock.Index = oldBlock.Index + 1
newBlock.Timestamp = t.String()
newBlock.BPM = BPM
newBlock.PrevHash = oldBlock.Hash
newBlock.Hash = calculateHash(newBlock)
return newBlock
}
-
檢查Block的hash是否合法isBlockValid
-
使用calculateHash
來對原始資料做hashsha256
-
建立一個新的Block區塊,然後添加到區塊鍊Blockchain上,同時會包含所需的事務generateBlock
P2P結構
下面我們快接近核心部分了,首先我們要寫出建立主機的邏輯。當一個節點運作我們的程式時,它可以作為一個主機,被其它節點連接配接。下面一起看看代碼:-)
// makeBasicHost creates a LibP2P host with a random peer ID listening on the
// given multiaddress. It will use secio if secio is true.
func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {
// If the seed is zero, use real cryptographic randomness. Otherwise, use a
// deterministic randomness source to make generated keys stay the same
// across multiple runs
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// Generate a key pair for this host. We will use it
// to obtain a valid host ID.
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
libp2p.Identity(priv),
}
if !secio {
opts = append(opts, libp2p.NoEncryption())
}
basicHost, err := libp2p.New(context.Background(), opts...)
if err != nil {
return nil, err
}
// Build host multiaddress
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))
// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
addr := basicHost.Addrs()[0]
fullAddr := addr.Encapsulate(hostAddr)
log.Printf("I am %s\n", fullAddr)
if secio {
log.Printf("Now run \"go run main.go -l %d -d %s -secio\" on a different terminal\n", listenPort+1, fullAddr)
} else {
log.Printf("Now run \"go run main.go -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr)
}
return basicHost, nil
}
makeBasicHost
函數有3個參數,同時傳回一個host結構體
-
listenPort
是主機監聽的端口,其它節點會連接配接該端口
-
secio
表明是否開啟資料流的安全選項,最好開啟,是以它代表了”安全輸入/輸出”
-
randSeed
是一個可選的指令行辨別,可以允許我們提供一個随機數種子來為我們的主機生成随機的位址。這裡我們不會使用
函數的第一個
if
語句針對随機種子生成随機key,接着我們生成公鑰和私鑰,這樣能保證主機是安全的。
opts
部分開始建構網絡位址部分,這樣其它節點就可以連接配接進來。
!secio
部分可以繞過加密,但是我們準備使用加密,是以這段代碼不會被觸發。
接着,建立了主機位址,這樣其他節點就可以連接配接進來。
log.Printf
可以用來在控制台列印出其它節點的連接配接資訊。最後我們傳回生成的主機位址給調用方函數。
流處理
之前的主機需要能處理進入的資料流。當另外一個節點連接配接到主機時,它會想要提出一個新的區塊鍊,來覆寫主機上的區塊鍊,是以我們需要邏輯來判定是否要接受新的區塊鍊。
同時,當我們往本地的區塊鍊添加區塊後,也要把相關資訊廣播給其它節點,這裡也需要實作相關邏輯。
先來建立流處理的基本架構吧:
func handleStream(s net.Stream) {
log.Println("Got a new stream!")
// Create a buffer stream for non blocking read and write.
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
go readData(rw)
go writeData(rw)
// stream 's' will stay open until you close it (or the other side closes it).
}
這裡建立一個新的
ReadWriter
,為了能支援資料讀取和寫入,同時我們啟動了一個單獨的Go協程來處理相關讀寫邏輯。
讀取資料
首先建立
readData
函數:
func readData(rw *bufio.ReadWriter) {
for {
str, err := rw.ReadString('\n')
if err != nil {
log.Fatal(err)
}
if str == "" {
return
}
if str != "\n" {
chain := make([]Block, 0)
if err := json.Unmarshal([]byte(str), &chain); err != nil {
log.Fatal(err)
}
mutex.Lock()
if len(chain) > len(Blockchain) {
Blockchain = chain
bytes, err := json.MarshalIndent(Blockchain, "", " ")
if err != nil {
log.Fatal(err)
}
// Green console color: \x1b[32m
// Reset console color: \x1b[0m
fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
}
mutex.Unlock()
}
}
}
該函數是一個無限循環,因為它需要永不停歇的去讀取外面進來的資料。首先,我們使用
ReadString
解析從其它節點發送過來的新的區塊鍊(JSON字元串)。
然後檢查進來的區塊鍊的長度是否比我們本地的要長,如果進來的鍊更長,那麼我們就接受新的鍊為最新的網絡狀态(最新的區塊鍊)。
同時,把最新的區塊鍊在控制台使用一種特殊的顔色列印出來,這樣我們就知道有新連結受了。
如果在我們主機的本地添加了新的區塊到區塊鍊上,那就需要把本地最新的區塊鍊廣播給其它相連的節點知道,這樣這些節點機會接受并更新到我們的區塊鍊版本。這裡使用
writeData
函數:
func writeData(rw *bufio.ReadWriter) {
go func() {
for {
time.Sleep(5 * time.Second)
mutex.Lock()
bytes, err := json.Marshal(Blockchain)
if err != nil {
log.Println(err)
}
mutex.Unlock()
mutex.Lock()
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()
mutex.Unlock()
}
}()
stdReader := bufio.NewReader(os.Stdin)
for {
fmt.Print("> ")
sendData, err := stdReader.ReadString('\n')
if err != nil {
log.Fatal(err)
}
sendData = strings.Replace(sendData, "\n", "", -1)
bpm, err := strconv.Atoi(sendData)
if err != nil {
log.Fatal(err)
}
newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)
if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
mutex.Lock()
Blockchain = append(Blockchain, newBlock)
mutex.Unlock()
}
bytes, err := json.Marshal(Blockchain)
if err != nil {
log.Println(err)
}
spew.Dump(Blockchain)
mutex.Lock()
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()
mutex.Unlock()
}
}
首先是一個單獨協程中的函數,每5秒鐘會将我們的最新的區塊鍊狀态廣播給其它相連的節點。它們收到後,如果發現我們的區塊鍊比它們的要短,就會直接把我們發送的區塊鍊資訊丢棄,繼續使用它們的區塊鍊,反之則使用我們的區塊鍊。總之,無論哪種方法,所有的節點都會定期的同步本地的區塊鍊到最新狀态。
這裡我們需要一個方法來建立一個新的Block區塊,包含之前提到過的脈搏速率(BPM)。為了簡化實作,我們不會真的去通過物聯網裝置讀取脈搏,而是直接在終端控制台上輸入一個脈搏速率數字。
首先要驗證輸入的BPM是一個整數類型,然後使用之前的
generateBlock
來生成區塊,接着使用
spew.Dump
輸入到終端控制台,最後我們使用
rw.WriteString
把最新的區塊鍊廣播給相連的其它節點。
牛逼了我的哥,現在我們完成了區塊鍊相關的函數以及大多數P2P相關的函數。在前面,我們建立了流處理,是以可以讀取和寫入最新的區塊鍊狀态;建立了狀态同步函數,這樣節點之間可以互相同步最新狀态。
剩下的就是實作我們的
main
函數了:
func main() {
t := time.Now()
genesisBlock := Block{}
genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}
Blockchain = append(Blockchain, genesisBlock)
// LibP2P code uses golog to log messages. They log with different
// string IDs (i.e. "swarm"). We can control the verbosity level for
// all loggers with:
golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info
// Parse options from the command line
listenF := flag.Int("l", 0, "wait for incoming connections")
target := flag.String("d", "", "target peer to dial")
secio := flag.Bool("secio", false, "enable secio")
seed := flag.Int64("seed", 0, "set random seed for id generation")
flag.Parse()
if *listenF == 0 {
log.Fatal("Please provide a port to bind on with -l")
}
// Make a host that listens on the given multiaddress
ha, err := makeBasicHost(*listenF, *secio, *seed)
if err != nil {
log.Fatal(err)
}
if *target == "" {
log.Println("listening for connections")
// Set a stream handler on host A. /p2p/1.0.0 is
// a user-defined protocol name.
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
select {} // hang forever
/**** This is where the listener code ends ****/
} else {
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
// The following code extracts target's peer ID from the
// given multiaddress
ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)
if err != nil {
log.Fatalln(err)
}
// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
// We have a peer ID and a targetAddr so we add it to the peerstore
// so LibP2P knows how to contact it
ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
log.Println("opening stream")
// make a new stream from host B to host A
// it should be handled on host A by the handler we set above because
// we use the same /p2p/1.0.0 protocol
s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
if err != nil {
log.Fatalln(err)
}
// Create a buffered stream so that read and writes are non blocking.
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
// Create a thread to read and write data.
go writeData(rw)
go readData(rw)
select {} // hang forever
}
}
首先是建立一個創世區塊(如果你讀了200行Go代碼實作你的區塊鍊,這裡就不會陌生)。
其次我們使用
go-libp2p
的
SetAllLoggers
日志函數來記錄日志。
接着,設定了所有的指令行辨別:
-
之前有提到,是用來加密資料流的。在我們的程式中,一定要打開該辨別secio
-
指明目前節點要連接配接到的主機位址target
-
是目前節點的監聽主機位址,這樣其它節點就可以連接配接進來,記住,每個節點都有兩個身份:主機和用戶端, 畢竟P2P不是白叫的listenF
-
是随機數種子,用來建立主機位址時使用seed
然後,使用
makeBasicHost
函數來建立一個新的主機位址,如果我們隻想做主機不想做用戶端(連接配接其它的主機),就使用
if *target == “”
。
接下來的幾行,會從
target
解析出我們要連接配接到的主機位址。然後把
peerID
和主機目标位址
targetAddr
添加到”store”中,這樣就可以持續跟蹤我們跟其它主機的連接配接資訊,這裡使用的是
ha.Peerstore().AddAddr
函數。
接着我們使用
ha.NewStream
連接配接到想要連接配接的節點上,同時為了能接收和發送最新的區塊鍊資訊,建立了
ReadWriter
,同時使用一個Go協程來進行
readData
和
writeData
。
哇哦
終于完成了,寫文章遠比寫代碼累!我知道之前的内容有點難,但是相比P2P的複雜性來說,你能通過一個庫來完成P2P網絡,已經很牛逼了,是以繼續加油!
完整代碼
mycoralhealth/blockchain-tutorial
運作結果
現在讓我們來試驗一下,首先打開3個獨立的終端視窗做為獨立節點。
開始之前,請再次進入
go-libp2p
的根目錄運作一下
make deps
,確定所有依賴都正常安裝。
回到你的工作目錄
examples/p2p
,打開第一個終端視窗,輸入
go run main.go -l 10000 -secio
細心的讀者會發現有一段話”Now run…”,那還等啥,繼續跟着做吧,打開第二個終端視窗運作:
go run main.go -l 10001 -d <given address in the instructions> -secio
這是你會發現第一個終端視窗檢測到了新連接配接!
接着打開第三個終端視窗,運作:
go run main.go -l 10002 -d <given address in the instructions> -secio
檢查第二終端,又發現了新連接配接
接着,該我們輸入BPM資料了,在第一個終端視窗中輸入”70”,等幾秒中,觀察各個視窗的列印輸出。
來看看發生了什麼:
- 終端1向本地的區塊鍊添加了一個新的區塊Block
- 終端1向終端2廣播該資訊
- 終端2将新的區塊鍊跟本地的對比,發現終端1的更長,是以使用新的區塊鍊替代了本地的區塊鍊,然後将新的區塊鍊廣播給終端3
- 同上,終端3也進行更新
所有的3個終端節點都把區塊鍊更新到了最新版本,同時沒有使用任何外部的中心化服務,這就是P2P網絡的力量!
我們再往終端2的區塊鍊中添加一個區塊試試看,在終端2中輸入”80”
結果忠誠的記錄了我們的正确性,再一次歡呼吧!
下一步
先享受一下自己的工作,你剛用了區區幾百行代碼就實作了一個全功能的P2P網絡!這不是開玩笑,P2P程式設計時非常複雜的,為什麼之前沒有相關的教程,就是因為太難了。
但是,這裡也有幾個可以改進的地方,你可以挑戰一下自己:
- 之前提到過,
是存在資料競争的Bug的,是以如果你要在生産環境使用,需要格外小心。一旦發現Bug,請回報給作者團隊知道go-libp2p
- 嘗試将本文的P2P網絡跟之前的共識協定結合,例如之前的文章PoW 和PoS (PoS是中文譯文)
- 添加持久化存儲。截止目前,為了簡化實作,我們沒有實作持久化存儲,是以節點關閉,資料就丢失了
- 本文的代碼沒有在大量節點的環境下測試過,試着寫一個腳本運作大量節點,看看性能會怎麼變化。如果發現Bug記得給我們[送出]((https://github.com/mycoralhealth/blockchain-tutorial/tree/master/p2p)
- 學習一下節點發現技術。新節點是怎麼發現已經存在的節點的?這篇文章是一個很好的起點
如果我寫的任何文章曾在你的心裡蕩起漣漪,那至少說明在逝去的歲月裡,我們在某一刻,共同經曆着一樣的技術探索之路。
有時候,雖然素未謀面,卻已相識很久,很微妙也很知足。
想學習區塊鍊技術,可以搜尋公衆号優優區塊鍊課堂或者添加公衆微信号uulesson