- sync.Cond
- 前言
- 什麼是sync.Cond
- 看下源碼
- Wait
- Signal
- Broadcast
- 總結
sync.Cond
前言
本次的代碼是基于
go version go1.13.15 darwin/amd64
什麼是sync.Cond
Go語言标準庫中的條件變量
sync.Cond
,它可以讓一組的
Goroutine
都在滿足特定條件時被喚醒。
每個
Cond
都會關聯一個Lock
(*sync.Mutex or *sync.RWMutex)
var (
locker = new(sync.Mutex)
cond = sync.NewCond(locker)
)
func listen(x int) {
// 擷取鎖
cond.L.Lock()
// 等待通知 暫時阻塞
cond.Wait()
fmt.Println(x)
// 釋放鎖
cond.L.Unlock()
}
func main() {
// 啟動60個被cond阻塞的線程
for i := 1; i <= 60; i++ {
go listen(i)
}
fmt.Println("start all")
// 3秒之後 下發一個通知給已經擷取鎖的goroutine time.Sleep(time.Second * 3)
fmt.Println("++++++++++++++++++++one Signal")
cond.Signal()
// 3秒之後 下發一個通知給已經擷取鎖的goroutine
time.Sleep(time.Second * 3)
fmt.Println("++++++++++++++++++++one Signal")
cond.Signal()
// 3秒之後 下發廣播給所有等待的goroutine
time.Sleep(time.Second * 3)
fmt.Println("++++++++++++++++++++begin broadcast")
cond.Broadcast()
// 阻塞直到所有的全部輸出
time.Sleep(time.Second * 60)
}
上面是個簡單的例子,我們啟動了60個線程,然後都被
cond
阻塞,主函數通過
Signal()
通知一個
goroutine
接觸阻塞,通過
Broadcast()
通知所有被阻塞的全部解除阻塞。

看下源碼
// Wait 原子式的 unlock c.L, 并暫停執行調用的 goroutine。
// 在稍後執行後,Wait 會在傳回前 lock c.L. 與其他系統不同,
// 除非被 Broadcast 或 Signal 喚醒,否則等待無法傳回。
//
// 因為等待第一次 resume 時 c.L 沒有被鎖定,是以當 Wait 傳回時,
// 調用者通常不能認為條件為真。相反,調用者應該在循環中使用 Wait():
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
type Cond struct {
// 用于保證結構體不會在編譯期間拷貝
noCopy noCopy
// 鎖
L Locker
// goroutine連結清單,維護等待喚醒的goroutine隊列
notify notifyList
// 保證運作期間不會發生copy
checker copyChecker
}
重點分析下:
notifyList
和
copyChecker
- notify
type notifyList struct {
// 總共需要等待的數量
wait uint32
// 已經通知的數量
notify uint32
// 鎖
lock uintptr
// 指向連結清單頭部
head *sudog
// 指向連結清單尾部
tail *sudog
}
這個是核心,所有
wait
的
goroutine
都會被加入到這個連結清單中,然後在通知的時候再從這個連結清單中擷取。
- copyChecker
保證運作期間不會發生copy
type copyChecker uintptr
// copyChecker holds back pointer to itself to detect object copying
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
Wait
func (c *Cond) Wait() {
// 監測是否複制
c.checker.check()
// 更新 notifyList中需要等待的wait的數量
// 傳回目前需要插傳入連結表節點ticket
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 為目前的加入的waiter建構一個連結清單的節點,插傳入連結表的尾部
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// go/src/runtime/sema.go
// 更新 notifyList中需要等待的wait的數量
// 同時傳回目前的加入的 waiter 的 ticket 編号,從0開始
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// 使用atomic原子的對wait字段進行加一操作
return atomic.Xadd(&l.wait, 1) - 1
}
// go/src/runtime/sema.go
// 為目前的加入的waiter建構一個連結清單的節點,插傳入連結表的尾部
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
// 當t小于notifyList中的notify,說明目前節點已經被通知了
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 建構目前節點
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
// 頭結點沒建構,插入頭結點
if l.tail == nil {
l.head = s
} else {
// 插入到尾節點
l.tail.next = s
}
l.tail = s
// 将目前goroutine置于等待狀态并解鎖
// 通過調用goready(gp),可以使goroutine再次可運作。
// 也就是将 M/P/G 解綁,并将 G 調整為等待狀态,放入 sudog 等待隊列中
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
梳理流程
1、首先檢測對象的複制行為,如果有複制發生直接抛出panic;
2、然後調用
runtime_notifyListAdd
對
notifynotifyListList
中的
wait
(需要等待的數量)進行加一操作,同時傳回一個
ticket
,用來作為目前
wait
的編号,這個編号,會和
notifyList
中的
notify
對應起來;
3、然後調用
runtime_notifyListWait
把目前的
wait
封裝成連結清單的一個節點,插入到
notifyList
維護的連結清單的尾部。
Signal
// 喚醒一個被wait的goroutine
func (c *Cond) Signal() {
// 監測是否複制
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// go/src/runtime/sema.go
// 通知連結清單中的第一個
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// wait和notify,說明已經全部通知到了
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lock(&l.lock)
// 這裡做了二次的确認
// wait和notify,說明已經全部通知到了
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 原子的對notify執行+1操作
atomic.Store(&l.notify, t+1)
// 嘗試找到需要被通知的 g
// 如果目前還沒來得及入隊,是無法找到的
// 但是,當它看到通知編号已經發生改變是不會被 park 的
//
// 這個查找過程看起來是線性複雜度,但實際上很快就停了
// 因為 g 的隊列與擷取編号不同,因而隊列中會出現少量重排,但我們希望找到靠前的 g
// 而 g 隻有在不再 race 後才會排在靠前的位置,是以這個疊代也不會太久,
// 同時,即便找不到 g,這個情況也成立:
// 它還沒有休眠,并且已經失去了我們在隊列上找到的(少數)其他 g 的 race。
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
// 順序拿到一個節點的ticket,會和上面會和notifyList中的notify做比較,相同才進行後續的操作
// 這個我們分析了,notifyList中的notify和連結清單節點中的ticket是一一對應的
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
// 通過goready掉起在上面通過goparkunlock挂起的goroutine
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
梳理下流程:
1、首先檢測對象的複制行為,如果有複制發生直接抛出
panic
;
2、判斷
wait
和
notify
,如果兩者相同說明已經已經全部通知到了;
3、調用
notifyListNotifyOne
,通過for循環,依次周遊這個連結清單,直到找到和
notifyList
中的
notify
,相比對的
ticket
的節點;
4、掉起
goroutine
,完成通知。
Broadcast
// 喚醒所有被wait的goroutine
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
// go/src/runtime/sema.go
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// wait和notify,說明已經全部通知到了
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 加鎖
lock(&l.lock)
s := l.head
l.head = nil
l.tail = nil
// 這個很粗暴,直接将notify的值置換成wait
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// 循環連結清單,一個個喚醒goroutine
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
梳理下流程:
1、首先檢測對象的複制行為,如果有複制發生直接抛出panic;
2、判斷
wait
和
notify
,如果兩者相同說明已經已經全部通知到了;
3、
notifyListNotifyAll
,就相對簡單了,直接将
notify
的值置為
wait
,标注這個已經全部通知了;
4、循環連結清單,一個個喚醒
goroutine
。
總結
sync.Cond
不是一個常用的同步機制,但是在條件長時間無法滿足時,與使用
for {}
進行忙碌等待相比,
sync.Cond
能夠讓出處理器的使用權,提供
CPU
的使用率。使用時我們也需要注意以下問題:
1、
sync.Cond.Wait
在調用之前一定要使用擷取互斥鎖,否則會觸發程式崩潰;
2、
sync.Cond.Signal
喚醒的
Goroutine
都是隊列最前面、等待最久的
Goroutine
;
3、
sync.Cond.Broadcast
會按照一定順序廣播通知等待的全部
Goroutine
。