之前簡單介紹了Go使用singleflight解決緩存擊穿,也查到了singleflight的源碼,閱讀感受只能說“讓我感覺自己不太適合幹程式員這項工作~”。今天下決心把singleflight源碼搞懂,尤其是前幾天一直困惑的問題:singleflight何時存儲、何時刪除其內部緩存。
而且,其中涉及到了很多Go的並發程式設計知識,很有必要學習一番。
Go并發的一些知識
WaitGroup 等待線程組
WaitGroup 用於協程同步:等待一組 goroutine 全部執行完成後,才會繼續向下執行。如下為簡單的測試代碼:
package main
import (
"fmt"
"sync"
)
// main starts a fixed number of goroutines and blocks until every one of
// them has signalled completion through a sync.WaitGroup.
func main() {
	const workers = 2

	// The counter is set up front, before any goroutine is started.
	var wg sync.WaitGroup
	wg.Add(workers)

	// worker is the body run by each goroutine: print, then mark itself done.
	worker := func() {
		fmt.Println("other routine finish ")
		wg.Done()
	}

	for i := 0; i < workers; i++ {
		go worker()
		fmt.Println("i = ", i)
	}

	// Wait blocks until both goroutines have called Done; only then does
	// the final line get printed.
	wg.Wait()
	fmt.Println("all group routine finish")
}
運行結果如下(goroutine 的排程順序不固定,以下只是其中一種可能的輸出):
i = 0
other routine finish
i = 1
other routine finish
all group routine finish
sync.Mutex 互斥鎖
Mutex是一個互斥鎖,可以建立為其他結構體的字段;零值為解鎖狀態。Mutex類型的鎖和特定的 goroutine 無關,可以由不同的 goroutine 加鎖和解鎖[2]。
注意以下幾點:
- 在一個 goroutine 獲得 Mutex 後,其他 goroutine 只能等到這個 goroutine 釋放該 Mutex[3]
- 已加鎖的 Mutex 只能在解鎖後才能再次加鎖
- 對未加鎖的 Mutex 解鎖會導致執行時錯誤(fatal error: sync: unlock of unlocked mutex)
- 適用於讀寫不確定,並且同一時刻只有一個讀或者寫的場景[3]
測試代碼如下:
package main
import (
"fmt"
"sync"
"time"
)
// main demonstrates that a sync.Mutex serializes goroutines: the main
// goroutine takes the lock first, three workers block on it, and once the
// main goroutine unlocks, the workers acquire the lock one at a time.
func main() {
	var (
		wait sync.WaitGroup
		m    sync.Mutex
	)

	fmt.Println("Main Routine Locked")
	m.Lock()
	for i := 0; i <= 2; i++ {
		wait.Add(1)
		go func(i int) {
			// Register Done first. A defer placed as the last statement
			// (as in the original) is no different from a plain call and
			// would be skipped if anything above it panicked.
			defer wait.Done()

			fmt.Println(i, " not get lock, waiting...")
			m.Lock()
			fmt.Println(i, " get lock, doing...")
			time.Sleep(time.Second)
			fmt.Println(i, " Unlocked")
			m.Unlock()
		}(i)
	}

	// Hold the lock for a second so every worker reaches m.Lock and blocks.
	time.Sleep(time.Second)
	fmt.Println("Main Routine Unlocked")
	m.Unlock()

	// Block until all three workers have finished.
	wait.Wait()
}
運行結果如下(三個 goroutine 獲得鎖的先後順序不固定,以下只是其中一種可能的輸出):
Main Routine Locked
2 not get lock, waiting...
0 not get lock, waiting...
1 not get lock, waiting...
Main Routine Unlocked
2 get lock, doing...
2 Unlocked
0 get lock, doing...
0 Unlocked
1 get lock, doing...
1 Unlocked
singleflight的兩個結構體
call
call儲存目前調用對應的資訊。
// call is an in-flight or completed singleflight.Do call
type call struct {
// wg is released by doCall once fn has finished; duplicate callers in Do
// block on it.
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
// val and err hold the values returned by fn.
val interface{}
err error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
forgotten bool
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
// dups counts the duplicate callers that joined this call; Do increments
// it for every caller that finds the key already present.
dups int
// chans holds the send-only channels to which doCall delivers a Result
// after a normal return.
chans []chan<- Result
}
Group
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
// mu is the mutex guarding m.
mu sync.Mutex // protects m
// m maps each key to its call; Do creates the map on first use.
m map[string]*call // lazily initialized
}
Do方法
通過 group.mu 與 group.m 確保同一時間點、同一個 key 只有一個調用進入實際的執行。
通過 call.wg 確保實際執行的方法執行完畢後,其他相同 key 的調用可以從 call.val 擷取到同樣的資料。
// Do runs fn for the given key while making sure that at most one execution
// per key is in flight at any time. A caller that arrives while an execution
// for the same key is still running does not run fn again: it waits for the
// running execution to finish and receives the very same results.
//
// The returned v and err are fn's results. shared reports whether the result
// was handed to more than one caller.
//
// If fn panicked, every waiting duplicate re-panics with the same value; if
// fn called runtime.Goexit, every waiting duplicate exits as well.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		// The map is created lazily on first use.
		g.m = make(map[string]*call)
	}

	inflight, found := g.m[key]
	if !found {
		// First caller for this key: register a new call. The WaitGroup
		// counter is raised before the call becomes visible in the map, so
		// duplicates that find it always block in Wait until doCall is done.
		leader := new(call)
		leader.wg.Add(1)
		g.m[key] = leader
		g.mu.Unlock()

		// Execute fn outside the lock.
		g.doCall(leader, key, fn)
		return leader.val, leader.err, leader.dups > 0
	}

	// Duplicate caller: count it while still holding the lock, then release
	// the lock and wait for the leader to finish.
	inflight.dups++
	g.mu.Unlock()
	inflight.wg.Wait()

	// Propagate an abnormal termination of fn to this caller too, instead
	// of returning it as an ordinary error.
	if pe, isPanic := inflight.err.(*panicError); isPanic {
		panic(pe)
	}
	if inflight.err == errGoexit {
		runtime.Goexit()
	}
	return inflight.val, inflight.err, true
}
doCall方法
由於本人Go的知識面還沒有覆蓋到Go的異常部分,其對異常的處理暫且不表,借用文章與代碼中注釋的說法:使用了兩個 defer,巧妙地將 runtime.Goexit 與我們傳入的 function 的 panic 區別開來,避免了由於傳入的 function panic 導致的死鎖。
// doCall handles the single call for a key.
//
// It runs fn once and stores its results in c.val and c.err. It then
// releases the callers blocked on c.wg and removes key from g.m unless
// c.forgotten is set. Finally it re-raises fn's panic, lets an in-progress
// runtime.Goexit continue, or sends a Result to every channel in c.chans.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
// normalReturn is set only when fn returns without panicking or exiting.
normalReturn := false
// recovered is set when fn did not return normally but execution still
// continued past the inner function, i.e. the inner recover stopped a panic.
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
// Neither a normal return nor a recovered panic: record errGoexit.
if !normalReturn && !recovered {
c.err = errGoexit
}
// Release the duplicate callers blocked in c.wg.Wait.
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
// Skip the delete when c.forgotten is set.
// NOTE(review): presumably Forget has already removed the key; Forget is
// not visible in this excerpt, so confirm against its implementation.
if !c.forgotten {
delete(g.m, key)
}
// Dispatch on the outcome recorded in c.err: panic, Goexit, or normal return.
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
// Run fn inside its own function literal so that the deferred recover
// below applies to fn's panic and the code after this literal still runs
// once the panic has been recovered.
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
// Invoke fn and store its results in c.val and c.err.
c.val, c.err = fn()
// Reached only when fn returned without panicking or exiting.
normalReturn = true
}()
// Execution gets here without normalReturn only if the recover above
// stopped a panic; after runtime.Goexit this statement is never reached.
if !normalReturn {
recovered = true
}
}