天天看點

Go 并發程式設計:防止Goroutine洩露

防止Goroutine洩露

Goroutine開啟後一般會一直執行到它終止,也有遇到不可恢複的錯誤(如協程内部錯誤或父協程退出)時被迫終止。如果沒有一定的手段,父協程是沒法控制子協程的,有沒有什麼方式讓父協程控制或感覺子協程的運作呢?答案是有的,一般有三種典型方式:

  • 使用done通道控制
  • 使用sync.WaitGroup同步組
  • 使用Context

一、使用done通道控制

建立一個監控通道done,使主協程知道其下的工作子協程的完成狀态。

最簡單的經典并發寫法,使用一個監控通道done,工作協程完成具體任務,主協程隻監視工作子協程的完成狀況,為了最大化通道的吞吐量,通道的緩沖數等于子協程數。

func Demo11() {
    JobList := make([]Job, 0, 5)
    JobList = append(JobList, Job{"task1"}, Job{"task2"}, Job{"task3"}, Job{"task4"}, Job{"task5"})

    jobs := make(chan Job)
    done := make(chan bool, len(JobList))

    // 發送任務協程
    go func() {
        for _, job := range JobList {
            jobs <- job
        }
        // 發送完所有任務後關閉通道
        close(jobs)
    }()

    // 處理任務協程
    go func() {
        // 周遊通道直到管道被關閉
        for job := range jobs {
            fmt.Println("Doing ", job.task)
            done <- true
        }
    }()

    for i := 0; i < len(JobList); i++ {
        // 阻塞,等待接收任務完成的信号
        <-done
    }
    fmt.Println("All Task Done!")
}
           

使用done通道,我們可以在很大程度上防止協程洩露,即某些子協程失去控制以緻未能正常關閉的情況。

二、使用sync.WaitGroup同步組

以上寫法也能用等待組方式處理:

關于等待組的用法,我們已在《Go基礎系列》的《Go并發程式設計(三): Go并發的傳統同步機制》已經簡述,這裡再做一個示例

func Demo12() {
    JobList := make([]Job, 0, 5)
    JobList = append(JobList, Job{"task1"}, Job{"task2"}, Job{"task3"}, Job{"task4"}, Job{"task5"})
    jobs := make(chan Job)

    wg := sync.WaitGroup{}

    wg.Add(1)
    // 發送任務協程
    go func() {
        for _, job := range JobList {
            jobs <- job
        }
        // 發送完所有任務後關閉通道
        close(jobs)
        wg.Done()
    }()

    wg.Add(1)
    // 處理任務協程
    go func() {
        // 周遊通道直到通道被關閉
        for job := range jobs {
            fmt.Println("Doing ", job.task)
        }
        wg.Done()
    }()

    wg.Wait()
    fmt.Println("All Task Done!")
}
           

無論是done通道控制還是等待組,都是最常見的Go并發程式設計範式了。

三、Contenxt 上下文

關于Context,它類似與使用done通道防止協程洩露的方法,不過它的功能更加強大,它出現在Go标準庫可見Go團隊希望context成為預設的控制多協程的解決方案。我們已經在《Go進階系列的》的《Go Context 上下文)已有介紹,這裡再做一個示範:

func Demo13() {
    JobList := make([]Job, 0, 5)
    JobList = append(JobList, Job{"task1"}, Job{"task2"}, Job{"task3"}, Job{"task4"}, Job{"task5"})
    jobs := make(chan Job)
    var sendCount, doneCount int
    var err error
    // 兩種方式關閉子協程:(1)逾時(2)執行cancelFunc函數
    ctx, cancelFunc := context.WithTimeout(context.TODO(), 5*time.Second)
    defer func() {
        fmt.Printf("JobCount:%d,SendCount:%d,DoneCount:%d\n", len(JobList), sendCount, doneCount)
        cancelFunc()
    }()

    go func() {
        sendCount, err = sendJobs(ctx, jobs, JobList)
        if err != nil {
            fmt.Println("SendJobs Error:", err.Error(), ",SendCount:", sendCount)
        }
    }()

    go func() {
        doneCount, err = do(ctx, jobs)
        if err != nil {
            fmt.Println("DoneJobs Error:", err.Error(), ",DoneCount:", doneCount)
        }
    }()

    time.Sleep(time.Duration(len(JobList)) * time.Second)

}

// 發送任務
func sendJobs(ctx context.Context, jobs chan<- Job, jobList []Job) (int, error) {
    var sendCount int
    for _, job := range jobList {
        // 模拟耗時每秒添加一次任務
        time.Sleep(time.Second)
        select {
        case <-ctx.Done():
            return sendCount, ctx.Err()
        case jobs <- job:
            sendCount++
            fmt.Println("Send Job: ", job.task)
        }
    }
    // 發送完所有任務後關閉管道
    close(jobs)
    return sendCount, nil
}

// 處理任務
func do(ctx context.Context, jobs <-chan Job) (int, error) {
    var doneCount int
    // 周遊管道直到管道被關閉
    for {
        select {
        case <-ctx.Done():
            return doneCount, ctx.Err()
        case job, ok := <-jobs:
            if !ok {
                return doneCount, nil
            } else {
                doneCount++
                fmt.Println("Done Job: ", job.task)
            }
        }
    }
}
           

執行結果:

=== RUN   TestDemo13
Done Job:  task1
Send Job:  task1
Send Job:  task2
Done Job:  task2
Send Job:  task3
Done Job:  task3
Send Job:  task4
Done Job:  task4
DoneJobs Error: context deadline exceeded ,DoneCount: 4
JobCount:5,SendCount:0,DoneCount:4
--- PASS: TestDemo13 (5.00s)
PASS