文章目錄
- Go并發程式設計(三)協程池
-
- 為什麼需要協程池
- 實作
-
- 資料結構定義
- 新增任務&執行任務
- goroutine異常處理
- 關閉協程池
- 使用
Go并發程式設計(三)協程池
本文參考如下部落格實作了一個簡易的協程池
- 100 行寫一個 go 的協程池 (任務池)
為什麼需要協程池
goroutine 太多仍會導緻排程性能下降、GC 頻繁、記憶體暴漲, 引發一系列問題。在面臨這樣的場景時, 限制 goroutine 的數量、重用 goroutine
實作
實作的基本思路是采用生産者-消費者模型,用來執行任務的goroutine作為消費者,操作任務隊列的goroutine是生産者,任務隊列使用的是go中的buffer channel
資料結構定義
任務定義:
// 任務定義
type Task struct {
Handler func(v ...interface{}) // 任務處理函數
Params []interface{} // 處理函數參數清單
}
協程池定義:
// 任務池定義
type TaskPool struct {
Capacity int64 // 任務池容量
RunningGoroutine int64 // 運作中的goroutine數量
TaskQueue chan *Task // 任務隊列
Status int64 // 任務池狀态
sync.Mutex
PanicHandler func(interface{}) // goroutine異常處理機制
}
協程池狀态常量定義
// 協程池狀态
const(
RUNNING = iota
STOP
)
全局異常定義:
// 池容量非法異常
var ErrInvalidPoolCap = errors.New("task pool capacity invaild")
var ErrPoolAlreadyClosed = errors.New("pool is already go")
新增任務&執行任務
新增任務本質就是做goroutine數量檢查,小于協程池容量則新啟協程,超過就複用原有協程,協程的回收依賴于GC,任務是直接丢進管道,等待消費的goroutine執行
// 新增任務
func (p *TaskPool) Put(t *Task) error{
p.Lock()
defer p.Unlock()
if p.Status == STOP{
return ErrPoolAlreadyClosed
}
// 如果協程池未滿則新啟協程
if p.RunningGoroutine < p.Capacity{
// 協程池未滿,則産生協程
p.run()
}
// 任務入隊
p.TaskQueue <- t
return nil
}
執行任務其實就是監聽channel消費具體的任務,這裡采用的是帶緩沖區的channel,是以消費生産是非阻塞的
// 從任務隊列中取出任務執行
func (pool *TaskPool)run() {
// 新增運作中的goroutine
incRunning(pool)
go func() {
// 執行完成後運作中的goroutine--
defer func() {
decRunning(pool)
// goroutine panic
if r := recover();r != nil{
if pool.PanicHandler != nil{
pool.PanicHandler(r);
} else { // 預設處理
log.Printf("Worker panic: %s\n", r)
}
}
pool.checkRunningWork()
}()
// 具體goroutine執行政策
for{
select {
case task,ok := <- pool.TaskQueue:{
if !ok{
// 任務從管道消費失敗
return
}
// 執行任務
task.Handler(task.Params)
}
}
}
}()
}
goroutine異常處理
如果某一個goroutine抛出panic就會導緻整個程式崩潰退出,為了保證程式安全執行,需要對panic進行recover,進行異常處理,異常處理函數使用者自定義
defer func() {
decRunning(pool)
// goroutine panic
if r := recover();r != nil{
if pool.PanicHandler != nil{
pool.PanicHandler(r);
} else { // 預設處理
log.Printf("Worker panic: %s\n", r)
}
}
pool.checkRunningWork()
}()
關閉協程池
關閉協程池需要做兩個步驟:
- 關閉任務進入隊列的入口
- 執行完任務隊列中剩餘的任務
// 安全關閉協程池
func (p *TaskPool) CloseTask() error{
p.Lock()
defer p.Unlock()
if p.Status == STOP{
return ErrPoolAlreadyClosed
}
atomic.CompareAndSwapInt64(&p.Status,RUNNING,STOP)
// 清空任務隊列
for len(p.TaskQueue) > 0 { // 阻塞等待所有任務被 worker 消費
time.Sleep(1e6) // 防止等待任務清空 cpu 負載突然變大, 這裡小睡一下
}
return nil
}
使用
func TestMyPool() {
pool,err := InitTaskPool(10)
if err != nil{
panic(err)
}
for i := 0;i < 20;i++{
time.Sleep(1e6)
pool.Put(&Task{Handler: func(v ...interface{}) {
fmt.Print("i = ",i," ")
},Params: []interface{}{i}})
fmt.Println("pool running goroutine size: ",pool.GetPoolRunningGSize())
}
}