以太坊p2p原理與實作
區塊鍊技術的去中心依賴于底層組網技術,以太坊的底層實作了p2pServer,大約可以分為這樣三層。
- 底層路由表。封裝了kad路由,節點的資料結構以及計算記錄,節點搜尋,驗證等功能。
- 中層peer抽象,message開放發送接口,server對外提供peer檢測,初始化,事件訂閱,peer狀态查詢,啟動,停止等功能
- 以太坊最上層peer,peerset再封裝,通過協定的Run函數,在中層啟動peer時,擷取peer,最終通過一個循環截取穩定peer,包裝在peerset中使用。
底層路由表
這裡簡化問題僅讨論Node Discovery Protocol。 這一層維護了一個buckets桶,總共有17個桶,每個桶有16個節點和10個替換節點。 Node放入時先要計算hash和localNode的距離。再按距離選擇一個桶放進去,取的時候逐個計算target和每個桶中對象的舉例,詳細參考closest函數,後面會貼出來。
距離公式滿足:f(x,y)=256-8*n-map(x[n+1]^y[n+1]) 注:n為相同節點數量 map為一個負相關的映射關系。
簡單來說就是相似越多,值越小。細節參考Node.go的logdist函數。 這裡需要了解算法Kademlia,
.
├── database.go //封裝node資料庫相關操作
├── node.go //節點資料結構
├── ntp.go //同步時間
├── table.go //路由表
├── udp.go //網絡相關操作
其中最重要的就是table對象,table公共方法有:
- newTable 執行個體建立
- Self local節點擷取
- ReadRandomNodes 随機讀取幾個節點
- Close 關閉
- Resolve 在周邊查找某個節點
- Lookup 查找某個節點的鄰近節點
逐個來分析這些方法:
newTable
- 1:生成對象執行個體(擷取資料庫用戶端,LocalNode etc)
// If no node database was given, use an in-memory one
db, err := newNodeDB(nodeDBPath, Version, ourID)
if err != nil {
return nil, err
}
tab := &Table{
net: t,
db: db,
self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
bonding: make(map[NodeID]*bondproc),
bondslots: make(chan struct{}, maxBondingPingPongs),
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
}
- 2:載入引導節點,初始化k桶。
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
}
for i := 0; i < cap(tab.bondslots); i++ {
tab.bondslots <- struct{}{}
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
- 3:将節點放入到桶裡,生成一條協程用于重新整理,驗證節點。
tab.seedRand()
tab.loadSeedNodes(false) //載入種子節點
// Start the background expiration goroutine after loading seeds so that the search for
// seed nodes also considers older nodes that would otherwise be removed by the
// expiration.
tab.db.ensureExpirer()
go tab.loop()
載入種子節點
func (tab *Table) loadSeedNodes(bond bool) {
seeds := tab.db.querySeeds(seedCount, seedMaxAge)
//資料庫中的種子節點和引導節點合并
seeds = append(seeds, tab.nursery...)
if bond {
seeds = tab.bondall(seeds) //節點驗證
}
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
tab.add(seed) //節點入桶
}
}
節點入桶,同時也要檢查ip等限制。
func (tab *Table) add(new *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(new.sha) //擷取目前節點對應的桶
if !tab.bumpOrAdd(b, new) {
// Node is not in table. Add it to the replacement list.
tab.addReplacement(b, new)
}
}
桶的選擇
func (tab *Table) bucket(sha common.Hash) *bucket {
d := logdist(tab.self.sha, sha) //計算hash舉例
if d <= bucketMinDistance {
//這裡按算法來看,隻要hash前三位相等就會到第一個buckets
return tab.buckets[0]
}
return tab.buckets[d-bucketMinDistance-1]
}
Resolve
根據Node的Id查找Node,先在目前的桶裡面查找,查找一遍之後沒找到就在周邊的節點裡面搜尋一遍再找。
// Resolve searches for a specific node with the given ID.
// It returns nil if the node could not be found.
func (tab *Table) Resolve(targetID NodeID) *Node {
// If the node is present in the local table, no
// network interaction is required.
hash := crypto.Keccak256Hash(targetID[:])
tab.mutex.Lock()
//查找最近節點
cl := tab.closest(hash, 1)
tab.mutex.Unlock()
if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
return cl.entries[0]
}
// Otherwise, do a network lookup.
//不存在 搜尋鄰居節點
result := tab.Lookup(targetID)
for _, n := range result {
if n.ID == targetID {
return n
}
}
return nil
}
這裡需要了解的函數是 closest,周遊所有桶的所有節點,查找最近的一個
// closest returns the n nodes in the table that are closest to the
// given id. The caller must hold tab.mutex.
func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
// This is a very wasteful way to find the closest nodes but
// obviously correct. I believe that tree-based buckets would make
// this easier to implement efficiently.
close := &nodesByDistance{target: target}
for _, b := range tab.buckets {
for _, n := range b.entries {
close.push(n, nresults)
}
}
return close
}
func (h *nodesByDistance) push(n *Node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int) bool {
return distcmp(h.target, h.entries[i].sha, n.sha) > 0
})
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
}
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
// slide existing entries down to make room
// this will overwrite the entry we just appended.
//近的靠前邊
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n
}
}
ReadRandomNodes
整體思路是先拷貝出來,再逐個桶的抽最上面的一個,剩下空桶移除,剩下的桶合并後,下一輪再抽桶的第一個節點,直到填滿給定資料或者桶全部空掉。最後傳回填到數組裡面的數量。
// ReadRandomNodes fills the given slice with random nodes from the
// table. It will not write the same node more than once. The nodes in
// the slice are copies and can be modified by the caller.
func (tab *Table) ReadRandomNodes(buf []*Node) (n int) {
if !tab.isInitDone() {
return 0
}
tab.mutex.Lock()
defer tab.mutex.Unlock()
// Find all non-empty buckets and get a fresh slice of their entries.
var buckets [][]*Node
//拷貝節點
for _, b := range tab.buckets {
if len(b.entries) > 0 {
buckets = append(buckets, b.entries[:])
}
}
if len(buckets) == 0 {
return 0
}
// Shuffle the buckets.
for i := len(buckets) - 1; i > 0; i-- {
j := tab.rand.Intn(len(buckets))
buckets[i], buckets[j] = buckets[j], buckets[i]
}
// Move head of each bucket into buf, removing buckets that become empty.
var i, j int
for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
b := buckets[j]
buf[i] = &(*b[0]) //取第一個節點
buckets[j] = b[1:] //移除第一個
if len(b) == 1 {
//空桶移除
buckets = append(buckets[:j], buckets[j+1:]...)
}
if len(buckets) == 0 {
break
}
}
return i + 1
}
Lookup
lookup會要求已知節點查找鄰居節點,查找的鄰居節點又遞歸的找它周邊的節點
for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
pendingQueries++
go func() {
// Find potential neighbors to bond with
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil {
// Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails)
log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
}
reply <- tab.bondall(r)
}()
}
}
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply { //此處會阻塞請求
if n != nil && !seen[n.ID] {
seen[n.ID] = true
result.push(n, bucketSize)
}
}
pendingQueries--
}
桶的維護
桶初始化完成後會進入一個循環邏輯,其中通過三個timer控制調整周期。
- 驗證timer 間隔 10s左右
- 重新整理timer 間隔 30 min
- 持久化timer 間隔 30s
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
重新整理邏輯:重新加載種子節點,查找周邊節點,随機三個節點,并查找這三個節點的周圍節點。
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
tab.loadSeedNodes(true)
tab.lookup(tab.self.ID, false)
for i := 0; i < 3; i++ {
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
}
}
驗證邏輯:驗證每個桶的最末尾節點,如果該節點通過驗證則放到隊首(驗證過程是本地節點向它發送ping請求,如果回應pong則通過)
last, bi := tab.nodeToRevalidate() //取最後一個節點
if last == nil {
// No non-empty bucket found.
return
}
// Ping the selected node and wait for a pong.
err := tab.ping(last.ID, last.addr()) //通信驗證
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.buckets[bi]
if err == nil {
// The node responded, move it to the front.
log.Debug("Revalidated node", "b", bi, "id", last.ID)
b.bump(last) //提到隊首
return
}
Peer/Server
相關檔案
.
├── dial.go //封裝一個任務生成處理結構以及三種任務結構中(此處命名不太精确)
├── message.go //定義一些資料的讀寫接口,以及對外的Send/SendItem函數
├── peer.go //封裝了Peer 包括消息讀取
├── rlpx.go //内部的握手協定
├── server.go //初始化,維護Peer網絡,還有一些對外的接口
這一層會不斷的從路由中提取節點,提取出來的節點要經過身份驗證,協定檢查之後加入到peer裡面,緊接着如果沒有人使用這個peer,這個peer就會被删除,再重新選擇一些節點出來繼續這個流程,peer再其中是随生随銷,這樣做是為了平均的使用所有的節點,而不是僅僅依賴于特定的幾個節點。因而這裡從Server開始入手分析整個流程
Peers() //peer對象
PeerCount() //peer數量
AddPeer(node *discover.Node) //添加節點
RemovePeer(node *discover.Node) //删除節點
SubscribeEvents(ch chan *PeerEvent) //訂閱内部的事件(節點的增加,删除)
//以上四個屬于對外的接口,不影響内部邏輯
Start() //server開始工作
SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) //啟動一個連接配接,經過兩次驗證之後,如果通過則加入到peer之中。
Start初始化
Start做了三件事,生成路由表于建立底層網絡。生成DialState用于驅動維護本地peer的更新與死亡,監聽本地接口用于資訊應答。這裡主要分析peer的維護過程。函數是run函數。
func (srv *Server) Start() (err error) {
//**************初始化代碼省略
if !srv.NoDiscovery && srv.DiscoveryV5 {
unhandled = make(chan discover.ReadPacket, 100)
sconn = &sharedUDPConn{conn, unhandled}
}
// node table
if !srv.NoDiscovery {
//路由表生成
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
AnnounceAddr: realaddr,
NodeDBPath: srv.NodeDatabase,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
}
ntab, err := discover.ListenUDP(conn, cfg)
if err != nil {
return err
}
srv.ntab = ntab
}
if srv.DiscoveryV5 {
//路由表生成
var (
ntab *discv5.Network
err error
)
if sconn != nil {
ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
} else {
ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
}
if err != nil {
return err
}
if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
return err
}
srv.DiscV5 = ntab
}
dynPeers := srv.maxDialedConns()
//newDialState 對象生成,這個對象包含Peer的實際維護代碼
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
// handshake 協定加載
srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
for _, p := range srv.Protocols {
srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
}
// listen/dial
//監聽本地端口
if srv.ListenAddr != "" {
if err := srv.startListening(); err != nil {
return err
}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
srv.loopWG.Add(1)
//重要的一句,開個協程,在其中做peer的維護
go srv.run(dialer)
srv.running = true
return nil
}
run 開始peer的生成
該函數中定義了兩個隊列
runningTasks []task //正在執行的任務
queuedTasks []task //尚未執行的任務
定義了三個匿名函數
//從正在執行任務中删除任務
delTask := func(t task) {
for i := range runningTasks {
if runningTasks[i] == t {
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
break
}
}
}
//開始一批任務
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() {
t.Do(srv); taskdone <- t
}()
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
//啟動開始一批任務再調用dialstate的newTasks函數生成一批任務,加載到任務隊列裡面
scheduleTasks := func() {
// Start from queue first.
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
// Query dialer for new tasks and start as many as possible now.
if len(runningTasks) < maxActiveDialTasks {
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}
定義了一個循環,分不同的chanel執行對應的邏輯
for {
//排程開始找生成任務
scheduleTasks()
select {
case <-srv.quit://退出
break running
case n := <-srv.addstatic:
//增加一個節點 該節點最終會生成一個dialTask
//并在newTasks的時候加入到讀列
srv.log.Debug("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
//直接删除該節點 節點不再參與維護,很快就會死掉了
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
}
case op := <-srv.peerOp:
// Peers 和 PeerCount 兩個外部接口,隻是讀取peer資訊
op(peers)
srv.peerOpDone <- struct{}{}
case t := <-taskdone:
//task完成後會根據不同的任務類型進行相應的處理
srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
//身份驗證通過
if trusted[c.id] {
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
case <-srv.quit:
break running
}
case c := <-srv.addpeer:
//身份協定驗證通過 加入隊列
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p) //觸發事件 此處是最上層截取peer的位置,如果此物沒有外部影響,那麼這個peer很快就被銷毀了
peerAdd++
fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel)
peers[c.id] = p
if p.Inbound() {
inboundCount++
}
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
select {
case c.cont <- err:
case <-srv.quit:
break running
}
case pd := <-srv.delpeer:
//移除peer
d := common.PrettyDuration(mclock.Now() - pd.created)
pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
delete(peers, pd.ID())
peerDel++
fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel)
if pd.Inbound() {
inboundCount--
}
}
}
記住上面的代碼,再來逐個的看:
scheduleTasks
scheduleTasks排程生成任務,生成的任務中有一種dialTask的任務,該任務結構如下
type dialTask struct {
flags connFlag
dest *discover.Node
lastResolved time.Time
resolveDelay time.Duration
}
func (t *dialTask) Do(srv *Server) {
if t.dest.Incomplete() {
if !t.resolve(srv) {
return
}
}
err := t.dial(srv, t.dest) //此處會調用到setupConn函數
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
// Try resolving the ID of static nodes if dialing failed.
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
}
}
}
}
dial最終回調用到setupConn函數,函數隻保留重點的幾句,篇幅有點長了
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
//身份驗證碼 擷取裝置,辨別等資訊
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err !=
//此處會往chanel中添加連接配接對象,最終觸發循環中的posthandshake分支
err = srv.checkpoint(c, srv.posthandshake)
//協定驗證
phs, err := c.doProtoHandshake(srv.ourHandshake)
c.caps, c.name = phs.Caps, phs.Name
//此處會往chanel中添加連接配接對象 最終觸發循環中的addpeer分支
err = srv.checkpoint(c, srv.addpeer)
}
posthandshake 分支僅僅做了驗證,addpeer做的事情就比較多了,重要的就是執行runPeer函數
func (srv *Server) runPeer(p *Peer) {
// 廣播 peer add
srv.peerFeed.Send(&PeerEvent{
Type: PeerEventTypeAdd,
Peer: p.ID(),
})
// run the protocol
remoteRequested, err := p.run() //
// 廣播 peer drop
srv.peerFeed.Send(&PeerEvent{
Type: PeerEventTypeDrop,
Peer: p.ID(),
Error: err.Error(),
})
//移除peer
srv.delpeer <- peerDrop{p, err, remoteRequested}
}
func (p *Peer) run() (remoteRequested bool, err error) {
//*************
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)
//*************
//這一句阻塞性確定了peer的存活
p.wg.Wait()
}
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
p.wg.Add(len(p.running))
for _, proto := range p.running {
proto := proto
proto.closed = p.closed
proto.wstart = writeStart
proto.werr = writeErr
var rw MsgReadWriter = proto
if p.events != nil {
rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
}
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func() {
//其他的都是為這一句做準備的,在以太坊中p2p就是靠這一句對上層暴露peer對象
err := proto.Run(p, rw)
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errProtocolReturned
} else if err != io.EOF {
p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
}
p.protoErr <- err
p.wg.Done()
}()
}
}
這樣就可以可理出一條思路 scheduleTasks執行生成dialTask任務 dialTask任務執行過程中逐個填充posthandshake,addPeer這兩個chanel。 addPeer執行時對上層暴露了Peer對象,完成後填充了delpeer,最後删除了Peer。
任務的生成
具體看代碼中的注釋
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
if s.start.IsZero() {
s.start = now
}
var newtasks []task
//這裡聲明了一個添加任務的函數
addDial := func(flag connFlag, n *discover.Node) bool {
if err := s.checkDial(n, peers); err != nil {
log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
return false
}
s.dialing[n.ID] = flag //排除掉已經再測試的
newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
return true
}
// Compute number of dynamic dials necessary at this point.
needDynDials := s.maxDynDials //目前系統中最大連接配接數目
for _, p := range peers { //扣除已建立連結的peer
if p.rw.is(dynDialedConn) {
needDynDials--
}
}
for _, flag := range s.dialing { //扣除已建立連結的peer
if flag&dynDialedConn != 0 {
needDynDials--
}
}
//外部指令添加的節點 這種節點不占用needDynDials數目,
//是為了保證手動加的節點能夠起效
for id, t := range s.static {
err := s.checkDial(t.dest, peers)
switch err {
case errNotWhitelisted, errSelf:
log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
delete(s.static, t.dest.ID)
case nil:
s.dialing[id] = t.flags
newtasks = append(newtasks, t)
}
}
// If we don't have any peers whatsoever, try to dial a random bootnode. This
// scenario is useful for the testnet (and private networks) where the discovery
// table might be full of mostly bad peers, making it hard to find good ones.
if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 &&
//檢查引導節點 因為引導節點比搜尋到的節點更大機率靠譜 因而比較靠前
now.Sub(s.start) > fallbackInterval {
bootnode := s.bootnodes[0]
s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
s.bootnodes = append(s.bootnodes, bootnode)
if addDial(dynDialedConn, bootnode) {
needDynDials--
}
}
//随機的從路由中抽取最大節點的二分之一
randomCandidates := needDynDials / 2
if randomCandidates > 0 {
n := s.ntab.ReadRandomNodes(s.randomNodes)
for i := 0; i < randomCandidates && i < n; i++ {
if addDial(dynDialedConn, s.randomNodes[i]) {
needDynDials--
}
}
}
// 從lookupbuf中抽取
i := 0
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
if addDial(dynDialedConn, s.lookupBuf[i]) {
needDynDials--
}
}
s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
// 如果還是不夠,路由再去搜尋節點
if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
s.lookupRunning = true
newtasks = append(newtasks, &discoverTask{})
}
// wait
if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
t := &waitExpireTask{s.hist.min().exp.Sub(now)}
newtasks = append(newtasks, t)
}
return newtasks
}
消息發送
另一個是message中的Send,SendItem函數 實作了MsgWriter的對象都可以調用這個函數寫入,覺得這裡沒什麼必要,完全可以封裝到peer裡面去,不過它上層做廣播的時候确實是調用的這兩個函數。
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
size, r, err := rlp.EncodeToReader(data)
if err != nil {
return err
}
return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}
func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error {
return Send(w, msgcode, elems)
}
以太坊上層調用
Peer/PeerSet
檔案:go-ethereum/eth/peer.go
定義了兩個struct,Peer和PeerSet。Peer封裝了底層的p2p.Peer,內建了一些和業務相關的方法,比如SendTransactions,SendNewBlock等。PeerSet是Peer的集合
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
version int // Protocol version negotiated
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
td *big.Int
lock sync.RWMutex
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
}
type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
}
Peer注冊/登出
檔案:go-ethereum/eth/handler.go manager.handle在檢查了peer後會把這個peer注冊到peerset中,表示此peer可用,發生錯誤後peerset登出該peer,傳回錯誤,最後再Server中銷毀。
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
//此處如果順利會進入for循環 如果失敗傳回錯誤我會銷毀掉這個peer
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
參考
源碼:
https://github.com/ethereum/go-ethereum/tree/master/p2pKademlia算法:
https://en.wikipedia.org/wiki/Kademlia轉自:(
魂祭心) https://my.oschina.net/hunjixin/blog/1803029
如果你希望高效的學習以太坊DApp開發,可以通路彙智網提供的最熱門線上互動教程:
1.
适合區塊鍊新手的以太坊DApp實戰入門教程2.
區塊鍊+IPFS+Node.js+MongoDB+Express去中心化以太坊電商應用開發實戰其他更多内容也可以通路
這個以太坊部落格