天天看點

Go語言—sync.Cond源碼分析

原文連結
Go語言—sync.Cond源碼分析

Cond的主要作用就是擷取鎖之後,wait()方法會等待一個通知,來進行下一步鎖釋放等操作,以此控制鎖合适釋放,釋放頻率,适用于在并發環境下goroutine的等待和通知。

針對Golang 1.9的sync.Cond,與Golang 1.10一樣。 源代碼位置:synccond.go。

結構體

type Cond struct {
    noCopy noCopy  // noCopy可以嵌入到結構中,在第一次使用後不可複制,使用go vet作為檢測使用

    // 根據需求初始化不同的鎖,如*Mutex 和 *RWMutex
    L Locker

    notify  notifyList  // 通知清單,調用Wait()方法的goroutine會被放入list中,每次喚醒,從這裡取出
    checker copyChecker // 複制檢查,檢查cond執行個體是否被複制
}
           

再來看看等待隊列notifyList結構體:

type notifyList struct {
    wait   uint32
    notify uint32
    lock   uintptr
    head   unsafe.Pointer
    tail   unsafe.Pointer
}
           

函數

NewCond

相當于Cond的構造函數,用于初始化Cond。

參數為Locker執行個體初始化,傳參數的時候必須是引用或指針,比如&sync.Mutex{}或new(sync.Mutex),不然會報異常:cannot use lock (type sync.Mutex) as type sync.Locker in argument to sync.NewCond。

大家可以想想為什麼一定要是指針呢? 因為如果傳入 Locker 執行個體,在調用 c.L.Lock() 和 c.L.Unlock() 的時候,會頻繁發生鎖的複制,會導緻鎖的失效,甚至導緻死鎖。

func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}
           

Wait

等待自動解鎖c.L和暫停執行調用goroutine。恢複執行後,等待鎖c.L傳回之前。與其他系統不同,等待不能傳回,除非通過廣播或信号喚醒。

因為c。當等待第一次恢複時,L并沒有被鎖定,調用者通常不能假定等待傳回時的條件是正确的。相反,調用者應該在循環中等待:

func (c *Cond) Wait() {
    // 檢查c是否是被複制的,如果是就panic
    c.checker.check()
    // 将目前goroutine加入等待隊列
    t := runtime_notifyListAdd(&c.notify)
    // 解鎖
    c.L.Unlock()
    // 等待隊列中的所有的goroutine執行等待喚醒操作
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}
           

判斷cond是否被複制。

type copyChecker uintptr

func (c *copyChecker) check() {
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}
           

Signal

喚醒等待隊列中的一個goroutine,一般都是任意喚醒隊列中的一個goroutine,為什麼沒有選擇FIFO的模式呢?這是因為FiFO模式效率不高,雖然支援,但是很少使用到。

func (c *Cond) Signal() {
    // 檢查c是否是被複制的,如果是就panic
    c.checker.check()
    // 通知等待清單中的一個 
    runtime_notifyListNotifyOne(&c.notify)
}
           

Broadcast

喚醒等待隊列中的所有goroutine。

func (c *Cond) Broadcast() {
    // 檢查c是否是被複制的,如果是就panic
    c.checker.check()
    // 檢查c是否是被複制的,如果是就panic
    runtime_notifyListNotifyAll(&c.notify)
}
           

執行個體

package main

import (
    "fmt"
    "sync"
    "time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func main() {
    for i := 0; i < 40; i++ {
        go func(x int) {
            cond.L.Lock()         //擷取鎖
            defer cond.L.Unlock() //釋放鎖
            cond.Wait()           //等待通知,阻塞目前goroutine
            fmt.Println(x)
            time.Sleep(time.Second * 1)

        }(i)
    }
    time.Sleep(time.Second * 1)
    fmt.Println("Signal...")
    cond.Signal() // 下發一個通知給已經擷取鎖的goroutine
    time.Sleep(time.Second * 1)
    cond.Signal() // 3秒之後 下發一個通知給已經擷取鎖的goroutine
    time.Sleep(time.Second * 3)
    cond.Broadcast() //3秒之後 下發廣播給所有等待的goroutine
    fmt.Println("Broadcast...")
    time.Sleep(time.Second * 60)
}           

繼續閱讀