tab.bondmu.Lock()
delete(tab.bonding, id)
tab.bondmu.Unlock()
}
// Retrieve the bonding results
result = w.err
if result == nil {
node = w.n
}
if node != nil {
// Add the node to the table even if the bonding ping/pong
// fails. It will be relaced quickly if it continues to be
// unresponsive.
//這個方法比較重要。 如果對應的bucket有空間,會直接插入buckets。如果buckets滿了。 會用ping操作來測試buckets中的節點試圖騰出空間。
tab.add(node)
tab.db.updateFindFails(id, 0)
return node, result
}
pingpong方法
func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
// Request a bonding slot to limit network usage
<-tab.bondslots
defer func() { tab.bondslots <- struct{}{} }()
// Ping the remote side and wait for a pong.
// Ping遠端節點。并等待一個pong消息
if w.err = tab.ping(id, addr); w.err != nil {
close(w.done)
return
//這個在udp收到一個ping消息的時候被設定為真。這個時候我們已經收到對方的ping消息了。
//那麼我們就不同等待ping消息了。 否則需要等待對方發送過來的ping消息(我們主動發起ping消息)。
if !pinged {
// Give the remote node a chance to ping us before we start
// sending findnode requests. If they still remember us,
// waitping will simply time out.
tab.net.waitping(id)
// Bonding succeeded, update the node database.
// 完成bond過程。 把節點插入資料庫。 資料庫操作在這裡完成。 bucket的操作在tab.add裡面完成。 buckets是記憶體的操作。 資料庫是持久化的seeds節點。用來加速啟動過程的。
w.n = NewNode(id, addr.IP, uint16(addr.Port), tcpPort)
tab.db.updateNode(w.n)
close(w.done)
tab.add方法
// add attempts to add the given node its corresponding bucket. If the
// bucket has space available, adding the node succeeds immediately.
// Otherwise, the node is added if the least recently active node in
// the bucket does not respond to a ping packet.
// add試圖把給定的節點插入對應的bucket。 如果bucket有空間,那麼直接插入。 否則,如果bucket中最近活動的節點沒有響應ping操作,那麼我們就使用這個節點替換它。
// The caller must not hold tab.mutex.
func (tab *Table) add(new *Node) {
b := tab.buckets[logdist(tab.self.sha, new.sha)]
tab.mutex.Lock()
defer tab.mutex.Unlock()
if b.bump(new) { //如果節點存在。那麼更新它的值。然後退出。
var oldest *Node
if len(b.entries) == bucketSize {
oldest = b.entries[bucketSize-1]
if oldest.contested {
// The node is already being replaced, don't attempt
// to replace it.
// 如果别的goroutine正在對這個節點進行測試。 那麼取消替換, 直接退出。
// 因為ping的時間比較長。是以這段時間是沒有加鎖的。 用了contested這個狀态來辨別這種情況。
return
oldest.contested = true
// Let go of the mutex so other goroutines can access
// the table while we ping the least recently active node.
tab.mutex.Unlock()
err := tab.ping(oldest.ID, oldest.addr())
tab.mutex.Lock()
oldest.contested = false
if err == nil {
// The node responded, don't replace it.
added := b.replace(new, oldest)
if added && tab.nodeAddedHook != nil {
tab.nodeAddedHook(new)
stuff方法比較簡單。 找到對應節點應該插入的bucket。 如果這個bucket沒有滿,那麼就插入這個bucket。否則什麼也不做。 需要說一下的是logdist()這個方法。這個方法對兩個值進行按照位置異或,然後傳回最高位的下标。 比如 logdist(101,010) = 3 logdist(100, 100) = 0 logdist(100,110) = 2
// stuff adds nodes the table to the end of their corresponding bucket
// if the bucket is not full. The caller must hold tab.mutex.
func (tab *Table) stuff(nodes []*Node) {
outer:
for _, n := range nodes {
if n.ID == tab.self.ID {
continue // don't add self
bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
for i := range bucket.entries {
if bucket.entries[i].ID == n.ID {
continue outer // already in bucket
}
if len(bucket.entries) < bucketSize {
bucket.entries = append(bucket.entries, n)
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
在看看之前的Lookup函數。 這個函數用來查詢一個指定節點的資訊。 這個函數首先從本地拿到距離這個節點最近的所有16個節點。 然後給所有的節點發送findnode的請求。 然後對傳回的界定進行bondall處理。 然後傳回所有的節點。
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
var (
target = crypto.Keccak256Hash(targetID[:])
asked = make(map[NodeID]bool)
seen = make(map[NodeID]bool)
reply = make(chan []*Node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self.ID] = true
不會詢問我們自己
for {
// generate initial result set
result = tab.closest(target, bucketSize)
//求取和target最近的16個節點
if len(result.entries) > 0 || !refreshIfEmpty {
break
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
<-tab.refresh()
refreshIfEmpty = false
// ask the alpha closest nodes that we haven't asked yet
// 這裡會并發的查詢,每次3個goroutine并發(通過pendingQueries參數進行控制)
// 每次疊代會查詢result中和target距離最近的三個節點。
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] { //如果沒有查詢過 //因為這個result.entries會被重複循環很多次。 是以用這個變量控制那些已經處理過了。
asked[n.ID] = true
pendingQueries++
go func() {
// Find potential neighbors to bond with
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil {
// Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails)
log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
}
reply <- tab.bondall(r)
}()
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
// wait for the next reply
for _, n := range <-reply {
if n != nil && !seen[n.ID] { //因為不同的遠方節點可能傳回相同的節點。所有用seen[]來做排重。
seen[n.ID] = true
//這個地方需要注意的是, 查找出來的結果又會加入result這個隊列。也就是說這是一個循環查找的過程, 隻要result裡面不斷加入新的節點。這個循環就不會終止。
result.push(n, bucketSize)
pendingQueries--
return result.entries
// closest returns the n nodes in the table that are closest to the
// given id. The caller must hold tab.mutex.
func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
// This is a very wasteful way to find the closest nodes but
// obviously correct. I believe that tree-based buckets would make
// this easier to implement efficiently.
close := &nodesByDistance{target: target}
for _, b := range tab.buckets {
for _, n := range b.entries {
close.push(n, nresults)
return close
result.push方法,這個方法會根據 所有的節點對于target的距離進行排序。 按照從近到遠的方式決定新節點的插入順序。(隊列中最大會包含16個元素)。 這樣會導緻隊列裡面的元素和target的距離越來越近。距離相對遠的會被踢出隊列。
// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
entries []*Node
target common.Hash
// push adds the given node to the list, keeping the total size below maxElems.
func (h *nodesByDistance) push(n *Node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int) bool {
return distcmp(h.target, h.entries[i].sha, n.sha) > 0
})
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
// slide existing entries down to make room
// this will overwrite the entry we just appended.
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n
### table.go 導出的一些方法
Resolve方法和Lookup方法
// Resolve searches for a specific node with the given ID.
// It returns nil if the node could not be found.
//Resolve方法用來擷取一個指定ID的節點。 如果節點在本地。那麼傳回本地節點。 否則執行
//Lookup在網絡上查詢一次。 如果查詢到節點。那麼傳回。否則傳回nil
func (tab *Table) Resolve(targetID NodeID) *Node {
// If the node is present in the local table, no
// network interaction is required.
hash := crypto.Keccak256Hash(targetID[:])
cl := tab.closest(hash, 1)
tab.mutex.Unlock()
if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
return cl.entries[0]
// Otherwise, do a network lookup.
result := tab.Lookup(targetID)
for _, n := range result {
if n.ID == targetID {
return n
return nil
// Lookup performs a network search for nodes close
// to the given target. It approaches the target by querying
// nodes that are closer to it on each iteration.
// The given target does not need to be an actual node
// identifier.
func (tab *Table) Lookup(targetID NodeID) []*Node {
return tab.lookup(targetID, true)
SetFallbackNodes方法,這個方法設定初始化的聯系節點。 在table是空而且資料庫裡面也沒有已知的節點,這些節點可以幫助連接配接上網絡,
// SetFallbackNodes sets the initial points of contact. These nodes
// are used to connect to the network if the table is empty and there
// are no known nodes in the database.
func (tab *Table) SetFallbackNodes(nodes []*Node) error {
if err := n.validateComplete(); err != nil {
return fmt.Errorf("bad bootstrap/fallback node %q (%v)", n, err)
tab.nursery = make([]*Node, 0, len(nodes))
cpy := *n
// Recompute cpy.sha because the node might not have been
// created by NewNode or ParseNode.
cpy.sha = crypto.Keccak256Hash(n.ID[:])
tab.nursery = append(tab.nursery, &cpy)
tab.refresh()
### 總結
這樣, p2p網絡的Kademlia協定就完結了。 基本上是按照論文進行實作。 udp進行網絡通信。資料庫存儲連結過的節點。 table實作了Kademlia的核心。 根據異或距離來進行節點的查找。 節點的發現和更新等流程。