sync.Pool實作原理
對象的建立和銷毀會消耗一定的系統資源(記憶體,gc等),過多的建立銷毀對象會帶來記憶體不穩定與更長的gc停頓,因為go的gc不存在分代,因而更加不擅長處理這種問題。因而go早早就推出Pool包用于緩解這種情況。Pool用于核心的功能就是Put和Get。當我們需要一個對象的時候通過Get擷取一個,建立的對象也可以Put放進池子裡,通過這種方式可以反複利用現有對象,這樣gc就不用高頻的促發記憶體gc了。
結構
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
複制
建立時候指定New方法用于建立預設對象,local,localSize會在随後用到的時候生成. local是一個poolLocalInternal的切片指針。
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared []interface{} // Can be used by any P.
Mutex // Protects shared.
}
複制
當不同的p調用Pool時,每個p都會在local上配置設定這樣一個poolLocal,索引值就是p的id。 private存放的對象隻能由建立的p讀寫,shared則會在多個p之間共享。

PUT
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l := p.pin()
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
if race.Enabled {
race.Enable()
}
}
複制
Put先要通過pin函數擷取目前Pool對應的pid位置上的localPool,然後檢查private是否存在,存在則設定到private上,如果不存在就追加到shared尾部。
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
// In pinSlow we store to localSize and then to local, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s { // 這句話的意思是如果目前pool的localPool切片尚未建立,尚未建立這句話肯定是false的
return indexLocal(l, pid)
}
return p.pinSlow()
}
複制
pin函數先通過自旋加鎖(可以避免p自身發生并發),在檢查本地local切片的size,size大于目前pid則使用pid去本地local切片上索引到localpool對象,否則就要走pinSlow對象建立本地localPool切片了.
func (p *Pool) pinSlow() *poolLocal {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}
複制
pinShow先要取消自旋鎖,因為後面的lock内部也會嘗試自旋鎖,下面可能會操作allpool因而這裡需要使用互斥鎖allPoolsMu,然後又加上自旋鎖,(這裡注釋說不會發生poolCleanup,但是檢視代碼gcstart隻是檢視了目前m的lock狀态,然而避免不了其他m觸發的gc,尚存疑),這裡會再次嘗試之前的操作,因為可能在unpin,pin之間有并發産生了poolocal,确認本地local切片是空的才會生成一個新的pool。後面是建立Pool上的localPool切片,runtime.GOMAXPROCS這裡的作用是傳回p的數量,用于确定pool的localpool的數量.
GET
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l := p.pin()
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
x = p.getSlow()
}
}
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
複制
GET 先調用pin擷取本地local,這個具體流程和上面一樣了,如果目前private存在傳回private上面的對象,如果不存在就從shared查找,存在傳回尾部對象,反之就要從其他的p的localPool裡面偷了。
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
複制
首先就要擷取目前size,用于輪詢p的local,這裡的查詢順序不是從0開始,而是是從目前p的位置往後查一圈。查到依次檢查每個p的shared上是否存在對象,如果存在就擷取末尾的值。 如果所有p的poollocal都是空的,那麼初始化的New函數就起作用了,調用這個New函數建立一個新的對象出來。
清理
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Defensively zero out everything, 2 reasons:
// 1. To prevent false retention of whole Pools.
// 2. If GC happens while a goroutine works with l.shared in Put/Get,
// it will retain whole Pool. So next cycle memory consumption would be doubled.
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
複制
pool對象的清理是在每次gc之前清理,通過runtime_registerPoolCleanup函數注冊一個上面的poolCleanup對象,内部會把這個函數設定到clearpool函數上面,然後每次gc之前會調用clearPool來取消所有pool的引用,重置所有的Pool。代碼很簡單就是輪詢一邊設定nil,然後取消所有poollocal,pool引用。方法簡單粗暴。由于clearPool是在STW中調用的,如果Pool存在大量對象會拉長STW的時間,在已經有提案來修複這個問題了(CL 166961.)[https://go-review.googlesource.com/c/go/+/166961/]
(adsbygoogle = window.adsbygoogle || []).push({});