天天看點

golang 并發goroutine詳解(三)

作者:幹飯人小羽
golang 并發goroutine詳解(三)

go提供了sync包和channel機制來解決協程間的同步與通信。

一、sync.WaitGroup

sync包中的WaitGroup實作了一個類似任務隊列的結構,你可以向隊列中加入任務,任務完成後就把任務從隊列中移除,如果隊列中的任務沒有全部完成,隊列就會觸發阻塞以阻止程式繼續運作,具體用法參考如下代碼:

package main
import (
    "fmt"
    "sync"
)
var waitgroup sync.WaitGroup
func Afunction(shownum int) {
    fmt.Println(shownum)
    waitgroup.Done() //任務完成,将任務隊列中的任務數量-1,其實.Done就是.Add(-1)
}
 
func main() {
    for i := 0; i < 10; i++ {
        waitgroup.Add(1) //每建立一個goroutine,就把任務隊列中任務的數量+1
        go Afunction(i)
    }
    waitgroup.Wait() //.Wait()這裡會發生阻塞,直到隊列中所有的任務結束就會解除阻塞
}           

我們可以利用sync.WaitGroup來滿足這樣的情況:

▲某個地方需要建立多個goroutine,并且一定要等它們都執行完畢後再繼續執行接下來的操作。

是的,WaitGroup最大的優點就是.Wait()可以阻塞到隊列中的任務都完畢後才解除阻塞。

二、channel

channel是一種golang内置的類型,英語的直譯為"通道",其實,它真的就是一根管道,而且是一個先進先出的資料結構。

我們能對channel進行的操作隻有4種:

(1) 建立chennel (通過make()函數)

(2) 放入資料 (通過 channel <- data 操作)

(3) 取出資料 (通過 <-channel 操作)

(4) 關閉channel (通過close()函數)

但是channel有一些非常給力的性質需要你牢記,請一定要記住并了解好它們:

(1) channel是一種阻塞管道,是自動阻塞的。意思就是,如果管道滿了,一個對channel放入資料的操作就會阻塞,直到有某個routine從channel中取出資料,這個放入資料的操作才會執行。相反同理,如果管道是空的,一個從channel取出資料的操作就會阻塞,直到某個routine向這個channel中放入資料,這個取出資料的操作才會執行。這是channel最重要的一個性質,沒有之一。

package main
func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 1
    ch <- 1
    ch <- 1 //這一行操作就會發生阻塞,因為前三行的放入資料的操作已經把channel填滿了           
package main
func main() {
    ch := make(chan int, 3)
    <-ch //這一行會發生阻塞,因為channel才剛建立,是空的,沒有東西可以取出
}           

(2)channel分為有緩沖的channel和無緩沖的channel。兩種channel的建立方法如下:

ch := make(chan int) //無緩沖的channel,同等于make(chan int, 0)
ch := make(chan int, 5) //一個緩沖區大小為5的channel           

操作一個channel時一定要注意其是否帶有緩沖,因為有些操作會觸發channel的阻塞導緻死鎖。下面就來解釋這些需要注意的情景。

首先來看一個一個例子,這個例子是兩段隻有主函數不同的代碼:

package main
 
import "fmt"
 
func Afuntion(ch chan int) {
    fmt.Println("finish")
    <-ch
}
 
func main() {
    ch := make(chan int) //無緩沖的channel
    go Afuntion(ch)
    ch <- 1
    
    // 輸出結果:
    // finish
}           
package main
 
import "fmt"
 
func Afuntion(ch chan int) {
    fmt.Println("finish")
    <-ch
}
 
func main() {
    ch := make(chan int) //無緩沖的channel
    //隻是把這兩行的代碼順序對調一下
    ch <- 1
    go Afuntion(ch)
 
    // 輸出結果:
    // 死鎖,無結果
}           

前一段代碼最終會輸出"finish"并正常結束,但是後一段代碼會發生死鎖。為什麼會出現這種現象呢,咱們把上面兩段代碼的邏輯跑一下。

第一段代碼:

1. 建立了一個無緩沖channel

2. 啟動了一個goroutine,這個routine中對channel執行取出操作,但是因為這時候channel為空,是以這個取出操作發生阻塞,但是主routine可沒有發生阻塞,它還在繼續運作呢

3. 主goroutine這時候繼續執行下一行,往channel中放入了一個資料

4. 這時阻塞的那個routine檢測到了channel中存在資料了,是以接觸阻塞,從channel中取出資料,程式就此完畢

第二段代碼:

1. 建立了一個無緩沖的channel

2. 主routine要向channel中放入一個資料,但是因為channel沒有緩沖,相當于channel一直都是滿的,是以這裡會發生阻塞。可是下面的那個goroutine還沒有建立呢,主routine在這裡一阻塞,整個程式就隻能這麼一直阻塞下去了,然後。。。然後就沒有然後了。。死鎖!

※從這裡可以看出,對于無緩沖的channel,放入操作和取出操作不能再同一個routine中,而且應該是先確定有某個routine對它執行取出操作,然後才能在另一個routine中執行放入操作。

對于帶緩沖的channel,就沒那麼多講究了,因為有緩沖空間,是以隻要緩沖區不滿,放入操作就不會阻塞,同樣,隻要緩沖區不空,取出操作就不會阻塞。而且,帶有緩沖的channel的放入和取出可以用在同一個routine中。

但是,并不是說有了緩沖就可以随意使用channel的放入和取出了,我們一定要注意放入和取出的速率問題。下面我們就舉個例子來說明這種問題:

我們經常會用利用channel自動阻塞的性質來控制目前運作的goroutine的總數量,如下:

package main
 
import (
    "fmt"
)
 
func Afunction(ch chan int) {
    fmt.Println("finish")
    <-ch //goroutine執行完了就從channel取出一個資料
}
 
func main() {
    ch := make(chan int, 10)
    for i := 0; i < 1000; i++ {
        //每當建立goroutine的時候就向channel中放入一個資料,如果裡面已經有10個資料了,就會
        //阻塞,由此我們将同時運作的goroutine的總數控制在<=10個的範圍内
        ch <- 1
        go Afunction(ch)
    }
    // 這裡隻是示範個例子,當然,接下來應該有些更加周密的同步操作
}           

上面這種channel的使用方式幾乎經常會用到,但是再看一下接下來這段代碼,它和上面這種使用channel的方式幾乎一樣,但是它會造成問題:

package main
func Afunction(ch chan int) {
    ch <- 1
    ch <- 1
    ch <- 1
    ch <- 1
    ch <- 1
 
    <-ch
}
 
func main() {
    //主routine的操作同上面那段代碼
    ch := make(chan int, 10)
    for i := 0; i < 100; i++ {
        ch <- 1
        go Afunction(ch)
    }
 
    // 這段代碼運作的結果為死鎖
}           

上面這段運作和之前那一段基本上原理是一樣的,但是運作後卻會發生死鎖。為什麼呢?其實總結起來就一句話,"放得太快,取得太慢了"。

按理說,我們應該在我們主routine中建立子goroutine并每次向channel中放入資料,而子goroutine負責從channel中取出資料。但是我們的這段代碼在建立了子goroutine後,每個routine會向channel中放入5個資料。這樣,每向channel中放入6個資料才會執行一次取出操作,這樣一來就可能會有某一時刻,channel已經滿了,但是所有的routine都在執行放入操作(因為它們目前執行放入操作的機率是執行取出操作的6倍),這樣一來,所有的routine都阻塞了,進而導緻死鎖。

在使用帶緩沖的channel時一定要注意放入與取出的速率問題。

(3)關閉後的channel可以取資料,但是不能放資料。而且,channel在執行了close()後并沒有真的關閉,channel中的資料全部取走之後才會真正關閉。

package main
func main() {
    ch := make(chan int, 5)
    ch <- 1
    ch <- 1
    close(ch)
    ch <- 1 //不能對關閉的channel執行放入操作
        
        // 會觸發panic
}           
package main
func main() {
    ch := make(chan int, 5)
    ch <- 1
    ch <- 1
    close(ch)
    <-ch //隻要channel還有資料,就可能執行取出操作
 
        //正常結束
}           
package main
 
import "fmt"
 
func main() {
    ch := make(chan int, 5)
    ch <- 1
    ch <- 1
    ch <- 1
    ch <- 1
    close(ch)  //如果執行了close()就立即關閉channel的話,下面的循環就不會有任何輸出了
    for {
        data, ok := <-ch
        if !ok {
            break
        }
        fmt.Println(data)
    }
    
    // 輸出:
    // 1
    // 1
    // 1
    // 1
    // 
    // 調用了close()後,隻有channel為空時,channel才會真的關閉
}

           

三、使用channel控制goroutine數量

channel的性質到這裡就介紹完了,但是看上去,channel的使用似乎比WaitGroup要注意更多的細節,那麼有什麼理由一定要用channel來實作同步呢?channel相比WaitGroup有一個很大的優點,就是channel不僅可以實作協程的同步,而且可以控制目前正在運作的goroutine的總數。

下面就介紹幾種利用channel控制goroutine數量的方法:

1.如果任務數量是固定的:

ackage main
func Afunction(ch chan int) {
    ch <- 1
}
 
func main() {
    var (
        ch        chan int = make(chan int, 20) //可以同時運作的routine數量為20
        dutycount int      = 500
    )
    for i := 0; i < dutycount; i++ {
        go Afunction(ch)
    }
 
    //知道了任務總量,可以像這樣利用固定循環次數的循環檢測所有的routine是否工作完畢
    for i := 0; i < dutycount; i++ {
        <-ch
    }
}           

2.如果任務的數量不固定

package main
 
import (
    "fmt"
)
 
func Afunction(routineControl chan int, feedback chan string) {
    defer func() {
        <-routineControl
        feedback <- "finish"
    }()
 
    // do some process
    // ...
}
 
func main() {
    var (
        routineCtl chan int    = make(chan int, 20)
        feedback   chan string = make(chan string, 10000)
 
        msg      string
        allwork  int
        finished int
    )
    for i := 0; i < 1000; i++ {
        routineCtl <- 1
        allwork++
        go Afunction(routineCtl, feedback)
    }
 
    for {
        msg = <-feedback
        if msg == "finish" {
            finished++
        }
        if finished == allwork {
            break
        }
    }
}           

四、不要使用無限循環檢查goroutine是否完成工作

在使用goroutine時,我們經常會寫出這樣的代碼:

package main
 
import (
    "fmt"
)
 
var (
    flag bool
    str  string
)
 
func foo() {
    flag = true
    str = "setup complete!"
}
 
func main() {
    go foo()
    for !flag {
        //按照我們的本意,foo()執行完畢後,flag=true,循環就會退出。
        //但是其實這個循環永遠都不會退出
    }
    fmt.Println(str)
}           

運作之後發現main中的無限循環永遠也無法退出,是以Go中不要用這種無限輪詢的方式來檢查goroutine是否完成了工作。

我們可以通過使用channel,讓foo()和main()實作通信,讓foo()執行完畢後通過channel發送一個消息給main(),告訴它自己的事兒完成了,然後main()收到消息後繼續執行其他操作:

package main
 
import (
    "fmt"
)
 
var (
    flag bool
    str  string
)
 
func foo(ch chan string) {
    flag = true
    str = "setup complete!"
    ch <- "I'm complete." //foo():我的任務完成了,發個消息給你~
}
 
func main() {
    ch := make(chan string)
    go foo(ch)
    <-ch //main():OK,收到你的消息了~
    for !flag {
    }
    fmt.Println(str)
}