天天看點

以太坊源碼分析(49)p2p-table.go源碼分析

                tab.bondmu.Lock()

                delete(tab.bonding, id)

                tab.bondmu.Unlock()

            }

            // Retrieve the bonding results

            result = w.err

            if result == nil {

                node = w.n

        }

        if node != nil {

            // Add the node to the table even if the bonding ping/pong

            // fails. It will be relaced quickly if it continues to be

            // unresponsive.

            //這個方法比較重要。 如果對應的bucket有空間,會直接插入buckets。如果buckets滿了。 會用ping操作來測試buckets中的節點試圖騰出空間。

            tab.add(node)

            tab.db.updateFindFails(id, 0)

        return node, result

    }

pingpong方法

    func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {

        // Request a bonding slot to limit network usage

        <-tab.bondslots

        defer func() { tab.bondslots <- struct{}{} }()

        // Ping the remote side and wait for a pong.

        // Ping遠端節點。并等待一個pong消息

        if w.err = tab.ping(id, addr); w.err != nil {

            close(w.done)

            return

        //這個在udp收到一個ping消息的時候被設定為真。這個時候我們已經收到對方的ping消息了。

        //那麼我們就不同等待ping消息了。 否則需要等待對方發送過來的ping消息(我們主動發起ping消息)。

        if !pinged {

            // Give the remote node a chance to ping us before we start

            // sending findnode requests. If they still remember us,

            // waitping will simply time out.

            tab.net.waitping(id)

        // Bonding succeeded, update the node database.

        // 完成bond過程。 把節點插入資料庫。 資料庫操作在這裡完成。 bucket的操作在tab.add裡面完成。 buckets是記憶體的操作。 資料庫是持久化的seeds節點。用來加速啟動過程的。

        w.n = NewNode(id, addr.IP, uint16(addr.Port), tcpPort)

        tab.db.updateNode(w.n)

        close(w.done)

tab.add方法

    // add attempts to add the given node its corresponding bucket. If the

    // bucket has space available, adding the node succeeds immediately.

    // Otherwise, the node is added if the least recently active node in

    // the bucket does not respond to a ping packet.

    // add試圖把給定的節點插入對應的bucket。 如果bucket有空間,那麼直接插入。 否則,如果bucket中最近活動的節點沒有響應ping操作,那麼我們就使用這個節點替換它。

    // The caller must not hold tab.mutex.

    func (tab *Table) add(new *Node) {

        b := tab.buckets[logdist(tab.self.sha, new.sha)]

        tab.mutex.Lock()

        defer tab.mutex.Unlock()

        if b.bump(new) { //如果節點存在。那麼更新它的值。然後退出。

        var oldest *Node

        if len(b.entries) == bucketSize {

            oldest = b.entries[bucketSize-1]

            if oldest.contested {

                // The node is already being replaced, don't attempt

                // to replace it.

                // 如果别的goroutine正在對這個節點進行測試。 那麼取消替換, 直接退出。

                // 因為ping的時間比較長。是以這段時間是沒有加鎖的。 用了contested這個狀态來辨別這種情況。

                return

            oldest.contested = true

            // Let go of the mutex so other goroutines can access

            // the table while we ping the least recently active node.

            tab.mutex.Unlock()

            err := tab.ping(oldest.ID, oldest.addr())

            tab.mutex.Lock()

            oldest.contested = false

            if err == nil {

                // The node responded, don't replace it.

        added := b.replace(new, oldest)

        if added && tab.nodeAddedHook != nil {

            tab.nodeAddedHook(new)

stuff方法比較簡單。 找到對應節點應該插入的bucket。 如果這個bucket沒有滿,那麼就插入這個bucket。否則什麼也不做。 需要說一下的是logdist()這個方法。這個方法對兩個值進行按照位置異或,然後傳回最高位的下标。 比如 logdist(101,010) = 3 logdist(100, 100) = 0 logdist(100,110) = 2

    // stuff adds nodes the table to the end of their corresponding bucket

    // if the bucket is not full. The caller must hold tab.mutex.

    func (tab *Table) stuff(nodes []*Node) {

    outer:

        for _, n := range nodes {

            if n.ID == tab.self.ID {

                continue // don't add self

            bucket := tab.buckets[logdist(tab.self.sha, n.sha)]

            for i := range bucket.entries {

                if bucket.entries[i].ID == n.ID {

                    continue outer // already in bucket

                }

            if len(bucket.entries) < bucketSize {

                bucket.entries = append(bucket.entries, n)

                if tab.nodeAddedHook != nil {

                    tab.nodeAddedHook(n)

在看看之前的Lookup函數。 這個函數用來查詢一個指定節點的資訊。 這個函數首先從本地拿到距離這個節點最近的所有16個節點。 然後給所有的節點發送findnode的請求。 然後對傳回的界定進行bondall處理。 然後傳回所有的節點。

    func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {

        var (

            target = crypto.Keccak256Hash(targetID[:])

            asked = make(map[NodeID]bool)

            seen = make(map[NodeID]bool)

            reply = make(chan []*Node, alpha)

            pendingQueries = 0

            result *nodesByDistance

        )

        // don't query further if we hit ourself.

        // unlikely to happen often in practice.

        asked[tab.self.ID] = true

        不會詢問我們自己

        for {

            // generate initial result set

            result = tab.closest(target, bucketSize)

            //求取和target最近的16個節點

            if len(result.entries) > 0 || !refreshIfEmpty {

                break

            // The result set is empty, all nodes were dropped, refresh.

            // We actually wait for the refresh to complete here. The very

            // first query will hit this case and run the bootstrapping

            // logic.

            <-tab.refresh()

            refreshIfEmpty = false

            // ask the alpha closest nodes that we haven't asked yet

            // 這裡會并發的查詢,每次3個goroutine并發(通過pendingQueries參數進行控制)

            // 每次疊代會查詢result中和target距離最近的三個節點。

            for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {

                n := result.entries[i]

                if !asked[n.ID] { //如果沒有查詢過 //因為這個result.entries會被重複循環很多次。 是以用這個變量控制那些已經處理過了。

                    asked[n.ID] = true

                    pendingQueries++

                    go func() {

                        // Find potential neighbors to bond with

                        r, err := tab.net.findnode(n.ID, n.addr(), targetID)

                        if err != nil {

                            // Bump the failure counter to detect and evacuate non-bonded entries

                            fails := tab.db.findFails(n.ID) + 1

                            tab.db.updateFindFails(n.ID, fails)

                            log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

                            if fails >= maxFindnodeFailures {

                                log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)

                                tab.delete(n)

                            }

                        }

                        reply <- tab.bondall(r)

                    }()

            if pendingQueries == 0 {

                // we have asked all closest nodes, stop the search

            // wait for the next reply

            for _, n := range <-reply {

                if n != nil && !seen[n.ID] { //因為不同的遠方節點可能傳回相同的節點。所有用seen[]來做排重。

                    seen[n.ID] = true

                    //這個地方需要注意的是, 查找出來的結果又會加入result這個隊列。也就是說這是一個循環查找的過程, 隻要result裡面不斷加入新的節點。這個循環就不會終止。

                    result.push(n, bucketSize)

            pendingQueries--

        return result.entries

    // closest returns the n nodes in the table that are closest to the

    // given id. The caller must hold tab.mutex.

    func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {

        // This is a very wasteful way to find the closest nodes but

        // obviously correct. I believe that tree-based buckets would make

        // this easier to implement efficiently.

        close := &nodesByDistance{target: target}

        for _, b := range tab.buckets {

            for _, n := range b.entries {

                close.push(n, nresults)

        return close

result.push方法,這個方法會根據 所有的節點對于target的距離進行排序。 按照從近到遠的方式決定新節點的插入順序。(隊列中最大會包含16個元素)。 這樣會導緻隊列裡面的元素和target的距離越來越近。距離相對遠的會被踢出隊列。

    // nodesByDistance is a list of nodes, ordered by

    // distance to target.

    type nodesByDistance struct {

        entries []*Node

        target common.Hash

    // push adds the given node to the list, keeping the total size below maxElems.

    func (h *nodesByDistance) push(n *Node, maxElems int) {

        ix := sort.Search(len(h.entries), func(i int) bool {

            return distcmp(h.target, h.entries[i].sha, n.sha) > 0

        })

        if len(h.entries) < maxElems {

            h.entries = append(h.entries, n)

        if ix == len(h.entries) {

            // farther away than all nodes we already have.

            // if there was room for it, the node is now the last element.

        } else {

            // slide existing entries down to make room

            // this will overwrite the entry we just appended.

            copy(h.entries[ix+1:], h.entries[ix:])

            h.entries[ix] = n

### table.go 導出的一些方法

Resolve方法和Lookup方法

    // Resolve searches for a specific node with the given ID.

    // It returns nil if the node could not be found.

    //Resolve方法用來擷取一個指定ID的節點。 如果節點在本地。那麼傳回本地節點。 否則執行

    //Lookup在網絡上查詢一次。 如果查詢到節點。那麼傳回。否則傳回nil

    func (tab *Table) Resolve(targetID NodeID) *Node {

        // If the node is present in the local table, no

        // network interaction is required.

        hash := crypto.Keccak256Hash(targetID[:])

        cl := tab.closest(hash, 1)

        tab.mutex.Unlock()

        if len(cl.entries) > 0 && cl.entries[0].ID == targetID {

            return cl.entries[0]

        // Otherwise, do a network lookup.

        result := tab.Lookup(targetID)

        for _, n := range result {

            if n.ID == targetID {

                return n

        return nil

    // Lookup performs a network search for nodes close

    // to the given target. It approaches the target by querying

    // nodes that are closer to it on each iteration.

    // The given target does not need to be an actual node

    // identifier.

    func (tab *Table) Lookup(targetID NodeID) []*Node {

        return tab.lookup(targetID, true)

SetFallbackNodes方法,這個方法設定初始化的聯系節點。 在table是空而且資料庫裡面也沒有已知的節點,這些節點可以幫助連接配接上網絡,

    // SetFallbackNodes sets the initial points of contact. These nodes

    // are used to connect to the network if the table is empty and there

    // are no known nodes in the database.

    func (tab *Table) SetFallbackNodes(nodes []*Node) error {

            if err := n.validateComplete(); err != nil {

                return fmt.Errorf("bad bootstrap/fallback node %q (%v)", n, err)

        tab.nursery = make([]*Node, 0, len(nodes))

            cpy := *n

            // Recompute cpy.sha because the node might not have been

            // created by NewNode or ParseNode.

            cpy.sha = crypto.Keccak256Hash(n.ID[:])

            tab.nursery = append(tab.nursery, &cpy)

        tab.refresh()

### 總結

這樣, p2p網絡的Kademlia協定就完結了。 基本上是按照論文進行實作。 udp進行網絡通信。資料庫存儲連結過的節點。 table實作了Kademlia的核心。 根據異或距離來進行節點的查找。 節點的發現和更新等流程。

繼續閱讀