天天看點

Go并發程式設計(三)協程池Go并發程式設計(三)協程池

文章目錄

  • Go并發程式設計(三)協程池
    • 為什麼需要協程池
    • 實作
      • 資料結構定義
      • 新增任務&執行任務
      • goroutine異常處理
      • 關閉協程池
      • 使用

Go并發程式設計(三)協程池

本文參考如下部落格實作了一個簡易的協程池

  • 100 行寫一個 go 的協程池 (任務池)

為什麼需要協程池

goroutine 太多仍會導緻排程性能下降、GC 頻繁、記憶體暴漲, 引發一系列問題。在面臨這樣的場景時, 限制 goroutine 的數量、重用 goroutine

實作

實作的基本思路是采用生産者-消費者模型,用來執行任務的goroutine作為消費者,操作任務隊列的goroutine是生産者,任務隊列使用的是go中的buffer channel

Go并發程式設計(三)協程池Go并發程式設計(三)協程池

資料結構定義

任務定義:

// 任務定義
type Task struct {
	Handler func(v ...interface{}) // 任務處理函數
	Params []interface{} 		   // 處理函數參數清單
}
           

協程池定義:

// 任務池定義
type TaskPool struct {
	Capacity int64 		   		   // 任務池容量
	RunningGoroutine int64 		   // 運作中的goroutine數量
	TaskQueue chan *Task   		   // 任務隊列
	Status int64 		     	   // 任務池狀态
	sync.Mutex
	PanicHandler func(interface{}) // goroutine異常處理機制
}
           

協程池狀态常量定義

// 協程池狀态
const(
	RUNNING = iota
	STOP
)
           

全局異常定義:

// 池容量非法異常
var ErrInvalidPoolCap = errors.New("task pool capacity invaild")
var ErrPoolAlreadyClosed = errors.New("pool is already go")
           

新增任務&執行任務

新增任務本質就是做goroutine數量檢查,小于協程池容量則新啟協程,超過就複用原有協程,協程的回收依賴于GC,任務是直接丢進管道,等待消費的goroutine執行

// 新增任務
func (p *TaskPool) Put(t *Task) error{
	p.Lock()
	defer p.Unlock()

	if p.Status == STOP{
		return ErrPoolAlreadyClosed
	}

	// 如果協程池未滿則新啟協程
	if p.RunningGoroutine < p.Capacity{
		// 協程池未滿,則産生協程
		p.run()
	}
	// 任務入隊
	p.TaskQueue <- t

	return nil
}
           

執行任務其實就是監聽channel消費具體的任務,這裡采用的是帶緩沖區的channel,是以消費生産是非阻塞的

// 從任務隊列中取出任務執行
func (pool *TaskPool)run()  {
	// 新增運作中的goroutine
	incRunning(pool)
	go func() {
		// 執行完成後運作中的goroutine--
		defer func() {
			decRunning(pool)
			// goroutine panic
			if r := recover();r != nil{
				if pool.PanicHandler != nil{
					pool.PanicHandler(r);
				} else { // 預設處理
					log.Printf("Worker panic: %s\n", r)
				}
			}
			pool.checkRunningWork()
		}()
		// 具體goroutine執行政策
		for{
			select {
			case task,ok := <- pool.TaskQueue:{
				if !ok{
					// 任務從管道消費失敗
					return
				}
				// 執行任務
				task.Handler(task.Params)
			}
			}
		}
	}()
}
           

goroutine異常處理

如果某一個goroutine抛出panic就會導緻整個程式崩潰退出,為了保證程式安全執行,需要對panic進行recover,進行異常處理,異常處理函數使用者自定義

defer func() {
			decRunning(pool)
			// goroutine panic
			if r := recover();r != nil{
				if pool.PanicHandler != nil{
					pool.PanicHandler(r);
				} else { // 預設處理
					log.Printf("Worker panic: %s\n", r)
				}
			}
			pool.checkRunningWork()
		}()
           

關閉協程池

關閉協程池需要做兩個步驟:

  1. 關閉任務進入隊列的入口
  2. 執行完任務隊列中剩餘的任務
// 安全關閉協程池
func (p *TaskPool) CloseTask() error{
	p.Lock()
	defer p.Unlock()

	if p.Status == STOP{
		return ErrPoolAlreadyClosed
	}

	atomic.CompareAndSwapInt64(&p.Status,RUNNING,STOP)

	// 清空任務隊列
	for len(p.TaskQueue) > 0 { // 阻塞等待所有任務被 worker 消費
		time.Sleep(1e6) // 防止等待任務清空 cpu 負載突然變大, 這裡小睡一下
	}

	return nil
}
           

使用

func TestMyPool()  {
	pool,err := InitTaskPool(10)
	if err != nil{
		panic(err)
	}
	for i := 0;i < 20;i++{
		time.Sleep(1e6)
		pool.Put(&Task{Handler: func(v ...interface{}) {
			fmt.Print("i = ",i," ")
		},Params: []interface{}{i}})
		fmt.Println("pool running goroutine size: ",pool.GetPoolRunningGSize())
	}

}
           
Go并發程式設計(三)協程池Go并發程式設計(三)協程池