天天看點

go語言實作goroutine pool

作者:幹飯人小羽
go語言實作goroutine pool

超大規模并發的場景下,不加限制的大規模的goroutine可能造成記憶體暴漲,給機器帶來極大的壓力,吞吐量下降和處理速度變慢。

而實作一個Goroutine Pool,複用goroutine,減輕runtime的排程壓力以及緩解記憶體壓力,依托這些優化,在大規模goroutine并發的場景下可以極大地提高并發性能。

go語言實作goroutine pool

Pool類型

type Pool struct {
    // capacity is the upper bound on concurrently running workers: getWorker
    // only allocates a new Worker while running < capacity.
    capacity int32
    // running is the number of workers handed out by getWorker that have not
    // yet been returned through putWorker.
    running int32
    // expiryDuration is how long a worker may sit in the idle list (measured
    // from its recycleTime) before monitorAndClear removes it. monitorAndClear
    // also uses it as its check interval.
    expiryDuration time.Duration
    // workers is the idle list: putWorker appends to it and getWorker takes
    // from its tail.
    workers []*Worker
    // release is the pool's "closed" signal: Submit rejects tasks while the
    // channel holds a value.
    // NOTE(review): nothing in the code shown here ever sends on it.
    release chan sig
    // lock is held by getWorker, putWorker and monitorAndClear while they read
    // or modify running and workers.
    lock sync.Mutex
    // once is meant to ensure the pool's close operation runs only once.
    // NOTE(review): not used by any code shown here.
    once sync.Once
}

初始化Pool

// NewPool creates a Pool that lets at most size workers run at once and
// treats idle workers as expired after expiry seconds. It also starts the
// background goroutine that purges expired idle workers. A non-positive
// size is rejected with an error.
func NewPool(size, expiry int) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("Pool Size <0,not Create")
    }
    pool := new(Pool)
    pool.capacity = int32(size)
    pool.release = make(chan sig, 1)
    pool.expiryDuration = time.Second * time.Duration(expiry)
    // Expired idle workers are cleaned up by a separate goroutine.
    pool.monitorAndClear()
    return pool, nil
}

擷取Worker

// getWorker returns a worker for the next task and counts it in p.running.
//
// An idle worker is taken from the tail of p.workers when one exists. With
// no idle worker, a new Worker is allocated if fewer than p.capacity workers
// are running; otherwise getWorker waits until putWorker returns one.
//
// The wait re-checks the idle list while holding p.lock. Reading
// len(p.workers) without the lock is a data race, and another caller could
// take the last idle worker between such a check and Lock, making the pop
// index workers[-1] and panic.
// NOTE(review): the wait is still a busy loop; a sync.Cond signalled from
// putWorker would let waiters sleep instead.
func (p *Pool) getWorker() *Worker {
    p.lock.Lock()
    // Debug output: number of idle workers and number of running workers.
    fmt.Println("空閑worker數量:", len(p.workers))
    fmt.Println("協程池現在運作的worker數量:", p.running)

    if len(p.workers) == 0 && p.running < p.capacity {
        // No idle worker, but the pool is not full: create a fresh worker.
        p.running++
        p.lock.Unlock()
        return &Worker{
            pool: p,
            task: make(chan functinType),
            str:  make(chan string),
        }
    }

    // At capacity with nothing idle: release the lock so a finishing worker
    // can call putWorker, then re-check. A goroutine blocked in Lock is not
    // starved indefinitely by this loop (sync.Mutex hands the lock to
    // long-waiting goroutines).
    for len(p.workers) == 0 {
        p.lock.Unlock()
        p.lock.Lock()
    }
    last := len(p.workers) - 1
    w := p.workers[last]
    p.workers[last] = nil // do not keep the worker reachable from the spare slot
    p.workers = p.workers[:last]
    p.running++
    p.lock.Unlock()
    return w
}

定期清理過期Worker

// monitorAndClear starts a background goroutine that, every expiryDuration,
// drops idle workers whose recycleTime is more than expiryDuration old.
//
// Idle workers have no goroutine attached: the goroutine started by
// Worker.run returns right after putWorker. An expired worker is therefore
// simply removed from p.workers. Calling stop() on it would block forever on
// the unbuffered task channel while p.lock is held, deadlocking the pool.
//
// putWorker appends, so older workers sit near the head of p.workers
// (approximately: recycleTime is stamped before putWorker takes the lock).
// The scan stops at the first worker that has not expired. Every expired
// slot is cleared and sliced off, including the case where only the first
// worker has expired.
//
// A non-positive expiryDuration disables cleanup: time.NewTicker panics on
// it, and a sleep-based loop would spin without pause.
// NOTE(review): the goroutine runs for the life of the process; nothing
// stops it when the pool is no longer used.
func (p *Pool) monitorAndClear() {
    if p.expiryDuration <= 0 {
        return
    }
    go func() {
        ticker := time.NewTicker(p.expiryDuration)
        for now := range ticker.C {
            p.lock.Lock()
            idle := p.workers
            expired := 0
            for expired < len(idle) && now.Sub(idle[expired].recycleTime) > p.expiryDuration {
                // Clear the slot so the dropped worker can be garbage collected.
                idle[expired] = nil
                expired++
            }
            p.workers = idle[expired:]
            p.lock.Unlock()
        }
    }()
}

複用Worker

// putWorker returns worker to the idle list so a later getWorker can reuse
// the Worker value, and stops counting it as running. The timestamp it
// records is what monitorAndClear compares against expiryDuration.
func (p *Pool) putWorker(worker *Worker) {
    // Stamp the last-used time before the worker becomes visible to others.
    now := time.Now()
    worker.recycleTime = now
    p.lock.Lock()
    defer p.lock.Unlock()
    p.workers = append(p.workers, worker)
    p.running--
}

動态擴容或者縮小容量

// ReSize changes the capacity of the pool to size.
//
// Growing only raises the limit checked by getWorker. Shrinking lowers it
// and discards the oldest idle workers until running+idle no longer exceeds
// size. Running workers are not interrupted. When they finish, they rejoin
// the idle list, so the new limit is fully enforced only once such surplus
// idle workers expire.
//
// Idle workers are dropped rather than stopped. They have no goroutine
// receiving on their task channel, so stop() on them (or on a fresh worker
// from getWorker) would block forever.
//
// A non-positive size is ignored, since such a pool could never run a task.
func (p *Pool) ReSize(size int) {
    if size <= 0 {
        return
    }
    p.lock.Lock()
    defer p.lock.Unlock()
    if int32(size) == p.capacity {
        return
    }
    // Number of idle workers to drop, clamped to what is actually idle.
    drop := int(p.running) + len(p.workers) - size
    if drop > len(p.workers) {
        drop = len(p.workers)
    }
    for i := 0; i < drop; i++ {
        p.workers[i] = nil
    }
    if drop > 0 {
        p.workers = p.workers[drop:]
    }
    atomic.StoreInt32(&p.capacity, int32(size))
}

送出Worker

// Submit runs task(str) on a pooled worker. It blocks while the pool is at
// capacity. It returns an error if p.release holds a value (the pool's
// closed signal) or if task is nil.
func (p *Pool) Submit(task functinType, str string) error {
    if len(p.release) > 0 {
        return errors.New("Pool is Close")
    }
    // A nil task would be called by the worker goroutine and panic there,
    // after a worker slot had already been taken.
    if task == nil {
        return errors.New("task is nil")
    }
    // Create a new worker or take an idle one.
    w := p.getWorker()
    // Start its goroutine; it waits for both the argument and the task.
    w.run()
    w.sendarg(str)
    w.sendTask(task)
    return nil
}

Worker類

package Poolpkg
 
import (
    "sync/atomic"
    "time"
)
 
// functinType is the signature of a task: it receives the string argument
// passed to Submit and returns an error (Worker.run discards it).
type functinType func(string) error
 
 
// Worker carries one task and its argument to a goroutine started by run.
// The Worker value and its channels are recycled through the pool's idle
// list. The goroutine started by run exits after executing a single task.
type Worker struct {
    // pool is the Pool that owns this worker.
    pool *Pool
    // task delivers the function to execute (created unbuffered in getWorker).
    task chan functinType
    // recycleTime is set by putWorker when the worker is returned to the idle
    // list; monitorAndClear uses it to decide expiry.
    recycleTime time.Time
 
    // str delivers the task's argument (created unbuffered in getWorker).
    str chan string
}
 
// run starts a goroutine that waits for one argument on w.str and one task on
// w.task (in either order). It calls the task with the argument and then
// returns the worker to the pool with putWorker. The goroutine exits after
// that single task; the next Submit that reuses this Worker calls run again.
//
// The goroutine exits without running anything if w.str is closed, if w.task
// is closed, or if the received task is nil (stop() sends a nil task).
func (w *Worker) run() {
    go func() {
        var (
            arg  string
            task functinType
        )
        // Collect exactly two messages: the argument and the task.
        for received := 0; received < 2; received++ {
            select {
            case s, ok := <-w.str:
                if !ok {
                    return
                }
                arg = s
            case f, ok := <-w.task:
                if !ok {
                    // w.task was closed. It must not be closed again here:
                    // closing an already-closed channel panics.
                    // NOTE(review): running is otherwise updated under
                    // pool.lock; this atomic update races with those writes.
                    atomic.AddInt32(&w.pool.running, -1)
                    return
                }
                task = f
            }
        }
        // A nil task is the stop signal; calling it would panic.
        if task == nil {
            return
        }
        // Task errors are intentionally discarded: the pool has no way to
        // report them back to the submitter.
        _ = task(arg)
        w.pool.putWorker(w)
    }()
}
 
// stop tells the goroutine started by run to exit. It sends a nil task and
// then closes the argument channel, after which the Worker cannot be reused.
// w.task is unbuffered, so stop blocks until some goroutine receives the nil
// task. It must only be called while a run goroutine is waiting for input.
func (w *Worker) stop() {
    var noTask functinType
    w.task <- noTask
    close(w.str)
}
 
// sendTask delivers task to the goroutine started by run. The task channel is
// unbuffered, so this blocks until that goroutine receives it.
func (w *Worker) sendTask(task functinType) {
    taskCh := w.task
    taskCh <- task
}
 
// sendarg delivers the task argument to the goroutine started by run,
// blocking until it is received (the str channel is unbuffered).
func (w *Worker) sendarg(str string) {
    argCh := w.str
    argCh <- str
}

總結和實踐

怎麼理解Worker、task和Pool的關係

Worker類型其實就是task的載體,Worker類型有兩個很重要的字段:

task chan functinType:用來傳遞task。
str chan string:用來傳遞task所需的參數。

           

task是任務本身,它一般為一個函數,在程式中被定義為函數類型:

1

// A task takes one string argument and returns an error.
type functinType func(string) error

Pool存儲Worker,當使用者要執行一個task時,首先要得到一個Worker,必須從池中擷取,擷取到一個Worker後,就開啟一個協程去處理,在這個協程中接收任務task和參數。

// Create a new worker or take an idle one from the pool.
w := p.getWorker()
// Start the worker's goroutine.
w.run()
// Send the task's argument to it over its channel.
w.sendarg(str)
// Send the task itself over its channel.
w.sendTask(task)

Worker怎麼接收task和參數

1

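count記錄已接收資料的個數,一個Worker必須同時接收到task和參數才能開始工作。
工作完後這個Worker被傳回到Pool中,下次還可以複用這個Worker,也就是複用Worker這個執行個體。注意:每次呼叫run()都會啟動一個新的goroutine,該goroutine執行完一個任務並呼叫putWorker後便退出,因此這裡被複用的是Worker執行個體(及其信道),而不是goroutine本身。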

go func() {
    // Wait on the worker's channels and run the task once both the argument
    // and the task have arrived (count goes from 1 to 3).
    count := 1
    var str string
    var f functinType
    for count <= 2 {
        select {
        case str_temp, ok := <-w.str:
            if !ok {
                return
            }
            count++
            str = str_temp
        case f_temp, ok := <-w.task:
            if !ok {
                // The task channel was closed.
                // NOTE(review): w.task is already closed at this point, so
                // the close below panics ("close of closed channel").
                atomic.AddInt32(&w.pool.running, -1)
                close(w.task)
                return
            }
            count++
            f = f_temp
        }
    }
    // NOTE(review): if the argument arrived first and stop() then sent a nil
    // task, f is nil here and this call panics.
    err := f(str)
    if err != nil {
        // Task errors are ignored.
    }
    // Return the worker to the pool for reuse.
    w.pool.putWorker(w)
    return
}()

Pool怎麼處理使用者送出task擷取Worker的請求

1.先得到Pool池中空閑Worker的數量,然後判斷

2.如果小于零,則表示池中沒有空閑的Worker,這裡有兩種原因:

  • 1.運作的Worker數量超過了Pool容量,當使用者擷取Worker的請求數量激增,池中大多數Worker都是執行完任務的Worker重新添加到池中的,傳回的Worker跟不上激增的需求。
  • 2.目前是空pool,從未往pool添加任務或者一段時間内沒有Worker任務運作,被定期清除。

3.如果大于或者等于零,有空閑的Worker直接從池中擷取最後一個Worker。

4.如果屬於第2步中的第一種原因(運作的Worker數量已達到Pool容量上限),則等待直到池中出現空閑的Worker(下面的程式碼以忙等待的方式實作)。

if waiting {
    // A worker that finishes its task is appended back to the pool, so an
    // idle worker will eventually appear; busy-wait until it does.
    // NOTE(review): len(p.workers) is read without holding p.lock (a data
    // race), and another goroutine may take the worker before Lock below,
    // leaving l == -1 so that workers[l] panics.
    for len(p.workers) == 0 {
        continue
    }
    p.lock.Lock()
    workers = p.workers
    l := len(workers) - 1
    w = workers[l]
    workers[l] = nil
    p.workers = workers[:l]
    p.running++
    p.lock.Unlock()
}

5.如果屬於第2步中的第二種原因(Pool還沒有滿),則直接建立一個新的Worker執行個體。

// No idle worker, but the pool is not full yet:
// create a new worker directly to run the task.
p.running++
w = &Worker{
    pool: p,
    task: make(chan functinType),
    str:  make(chan string),
}

測試

package main

import (
    "fmt"
    "sync"

    "Pool/Poolpkg"
)

func main(){<br>     //開20個大小的Pool池,過期清除時間5分鐘

Pool,err := Poolpkg.NewPool(20,5)

i :=0

for i < 50 {

err = Pool.Submit(Print_Test1,"并發測試!")

if err != nil{

fmt.Println(err)

}

i++

}

}

go語言實作goroutine pool
go語言實作goroutine pool

源碼

Pool

package Poolpkg
 
import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)
 
// sig is an empty struct type; it is the element type of the pool's release
// channel.
type sig struct{}
 
 
 
// Pool accepts tasks from clients and limits the number of concurrently
// running workers to capacity, recycling Worker values through an idle list.
type Pool struct {
    // capacity is the upper bound on concurrently running workers: getWorker
    // only allocates a new Worker while running < capacity.
    capacity int32
    // running is the number of workers handed out by getWorker that have not
    // yet been returned through putWorker.
    running int32
    // expiryDuration is how long a worker may sit in the idle list (measured
    // from its recycleTime) before monitorAndClear removes it. monitorAndClear
    // also uses it as its check interval.
    expiryDuration time.Duration
    // workers is the idle list: putWorker appends to it and getWorker takes
    // from its tail.
    workers []*Worker
    // release is the pool's "closed" signal: Submit rejects tasks while the
    // channel holds a value.
    // NOTE(review): nothing in the code shown here ever sends on it.
    release chan sig
    // lock is held by getWorker, putWorker and monitorAndClear while they read
    // or modify running and workers.
    lock sync.Mutex
    // once is meant to ensure the pool's close operation runs only once.
    // NOTE(review): not used by any code shown here.
    once sync.Once
}
 
// NewPool creates a Pool that lets at most size workers run at once and
// treats idle workers as expired after expiry seconds. It also starts the
// background goroutine that purges expired idle workers. A non-positive
// size is rejected with an error.
func NewPool(size, expiry int) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("Pool Size <0,not Create")
    }
    pool := new(Pool)
    pool.capacity = int32(size)
    pool.release = make(chan sig, 1)
    pool.expiryDuration = time.Second * time.Duration(expiry)
    // Expired idle workers are cleaned up by a separate goroutine.
    pool.monitorAndClear()
    return pool, nil
}
 
// Submit runs task(str) on a pooled worker. It blocks while the pool is at
// capacity. It returns an error if p.release holds a value (the pool's
// closed signal) or if task is nil.
func (p *Pool) Submit(task functinType, str string) error {
    if len(p.release) > 0 {
        return errors.New("Pool is Close")
    }
    // A nil task would be called by the worker goroutine and panic there,
    // after a worker slot had already been taken.
    if task == nil {
        return errors.New("task is nil")
    }
    // Create a new worker or take an idle one.
    w := p.getWorker()
    // Start its goroutine; it waits for both the argument and the task.
    w.run()
    w.sendarg(str)
    w.sendTask(task)
    return nil
}
 
// getWorker returns a worker for the next task and counts it in p.running.
//
// An idle worker is taken from the tail of p.workers when one exists. With
// no idle worker, a new Worker is allocated if fewer than p.capacity workers
// are running; otherwise getWorker waits until putWorker returns one.
//
// The wait re-checks the idle list while holding p.lock. Reading
// len(p.workers) without the lock is a data race, and another caller could
// take the last idle worker between such a check and Lock, making the pop
// index workers[-1] and panic.
// NOTE(review): the wait is still a busy loop; a sync.Cond signalled from
// putWorker would let waiters sleep instead.
func (p *Pool) getWorker() *Worker {
    p.lock.Lock()
    // Debug output: number of idle workers and number of running workers.
    fmt.Println("空閑worker數量:", len(p.workers))
    fmt.Println("協程池現在運作的worker數量:", p.running)

    if len(p.workers) == 0 && p.running < p.capacity {
        // No idle worker, but the pool is not full: create a fresh worker.
        p.running++
        p.lock.Unlock()
        return &Worker{
            pool: p,
            task: make(chan functinType),
            str:  make(chan string),
        }
    }

    // At capacity with nothing idle: release the lock so a finishing worker
    // can call putWorker, then re-check. A goroutine blocked in Lock is not
    // starved indefinitely by this loop (sync.Mutex hands the lock to
    // long-waiting goroutines).
    for len(p.workers) == 0 {
        p.lock.Unlock()
        p.lock.Lock()
    }
    last := len(p.workers) - 1
    w := p.workers[last]
    p.workers[last] = nil // do not keep the worker reachable from the spare slot
    p.workers = p.workers[:last]
    p.running++
    p.lock.Unlock()
    return w
}
 
// monitorAndClear starts a background goroutine that, every expiryDuration,
// drops idle workers whose recycleTime is more than expiryDuration old.
//
// Idle workers have no goroutine attached: the goroutine started by
// Worker.run returns right after putWorker. An expired worker is therefore
// simply removed from p.workers. Calling stop() on it would block forever on
// the unbuffered task channel while p.lock is held, deadlocking the pool.
// p.running is not touched: idle workers are not counted in it, because
// putWorker already decremented it when the worker became idle.
//
// putWorker appends, so older workers sit near the head of p.workers
// (approximately: recycleTime is stamped before putWorker takes the lock).
// The scan stops at the first worker that has not expired. Every expired
// slot is cleared and sliced off, including the case where only the first
// worker has expired.
//
// A non-positive expiryDuration disables cleanup: time.NewTicker panics on
// it, and a sleep-based loop would spin without pause.
// NOTE(review): the goroutine runs for the life of the process; nothing
// stops it when the pool is no longer used.
func (p *Pool) monitorAndClear() {
    if p.expiryDuration <= 0 {
        return
    }
    go func() {
        ticker := time.NewTicker(p.expiryDuration)
        for now := range ticker.C {
            p.lock.Lock()
            idle := p.workers
            expired := 0
            for expired < len(idle) && now.Sub(idle[expired].recycleTime) > p.expiryDuration {
                // Clear the slot so the dropped worker can be garbage collected.
                idle[expired] = nil
                expired++
            }
            p.workers = idle[expired:]
            p.lock.Unlock()
        }
    }()
}
 
// putWorker returns worker to the idle list so a later getWorker can reuse
// the Worker value, and stops counting it as running. The timestamp it
// records is what monitorAndClear compares against expiryDuration.
func (p *Pool) putWorker(worker *Worker) {
    // Stamp the last-used time before the worker becomes visible to others.
    now := time.Now()
    worker.recycleTime = now
    p.lock.Lock()
    defer p.lock.Unlock()
    p.workers = append(p.workers, worker)
    p.running--
}
 
//動态擴容或者縮小池容量
// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
    cap := int(p.capacity)
    if size <  cap{
        diff := cap - size
        for i := 0; i < diff; i++ {
            p.getWorker().stop()
        }
    } else if size == cap {
        return
    }
    atomic.StoreInt32(&p.capacity, int32(size))           

Worker

package Poolpkg
 
import (
    "sync/atomic"
    "time"
)
 
// functinType is the signature of a task: it receives the string argument
// passed to Submit and returns an error (Worker.run discards it).
type functinType func(string) error
 
 
// Worker carries one task and its argument to a goroutine started by run.
// The Worker value and its channels are recycled through the pool's idle
// list. The goroutine started by run exits after executing a single task.
type Worker struct {
    // pool is the Pool that owns this worker.
    pool *Pool
    // task delivers the function to execute (created unbuffered in getWorker).
    task chan functinType
    // recycleTime is set by putWorker when the worker is returned to the idle
    // list; monitorAndClear uses it to decide expiry.
    recycleTime time.Time
 
    // str delivers the task's argument (created unbuffered in getWorker).
    str chan string
}
 
// run starts a goroutine that waits for one argument on w.str and one task on
// w.task (in either order). It calls the task with the argument and then
// returns the worker to the pool with putWorker. The goroutine exits after
// that single task; the next Submit that reuses this Worker calls run again.
//
// The goroutine exits without running anything if w.str is closed, if w.task
// is closed, or if the received task is nil (stop() sends a nil task).
func (w *Worker) run() {
    go func() {
        var (
            arg  string
            task functinType
        )
        // Collect exactly two messages: the argument and the task.
        for received := 0; received < 2; received++ {
            select {
            case s, ok := <-w.str:
                if !ok {
                    return
                }
                arg = s
            case f, ok := <-w.task:
                if !ok {
                    // w.task was closed. It must not be closed again here:
                    // closing an already-closed channel panics.
                    // NOTE(review): running is otherwise updated under
                    // pool.lock; this atomic update races with those writes.
                    atomic.AddInt32(&w.pool.running, -1)
                    return
                }
                task = f
            }
        }
        // A nil task is the stop signal; calling it would panic.
        if task == nil {
            return
        }
        // Task errors are intentionally discarded: the pool has no way to
        // report them back to the submitter.
        _ = task(arg)
        w.pool.putWorker(w)
    }()
}
 
// stop tells the goroutine started by run to exit. It sends a nil task and
// then closes the argument channel, after which the Worker cannot be reused.
// w.task is unbuffered, so stop blocks until some goroutine receives the nil
// task. It must only be called while a run goroutine is waiting for input.
func (w *Worker) stop() {
    var noTask functinType
    w.task <- noTask
    close(w.str)
}
 
// sendTask delivers task to the goroutine started by run. The task channel is
// unbuffered, so this blocks until that goroutine receives it.
func (w *Worker) sendTask(task functinType) {
    taskCh := w.task
    taskCh <- task
}
 
// sendarg delivers the task argument to the goroutine started by run,
// blocking until it is received (the str channel is unbuffered).
func (w *Worker) sendarg(str string) {
    argCh := w.str
    argCh <- str
}

繼續閱讀