天天看點

golang 并發goroutine詳解(一)

作者:幹飯人小羽
golang 并發goroutine詳解(一)

概述

1、并行和并發

并行(parallel):指在同一時刻,有多條指令在多個處理器上同時執行。

golang 并發goroutine詳解(一)

并發(concurrency):指在同一時刻隻能有一條指令執行,但多個程序指令被快速的輪換執行,使得在宏觀上具有多個程序同時執行的效果,但在微觀上并不是同時執行的,隻是把時間分成若幹段,使多個程序快速交替的執行。

golang 并發goroutine詳解(一)
  • 并行是兩個隊列同時使用兩台咖啡機
  • 并發是兩個隊列交替使用一台咖啡機
golang 并發goroutine詳解(一)

2、go并發優勢

有人把Go比作21世紀的C語言,第一是因為Go語言設計簡單;第二,21世紀最重要的就是并發程式設計,而Go從語言層面就支援了并發。同時,并發程式的記憶體管理有時候是非常複雜的,而Go語言提供了自動垃圾回收機制。

Go語言為并發程式設計而内置的上層API基于CSP(communication sequential process,順序通信程序)模型。這就意味着顯式鎖都是可以避免的,因為Go語言通過安全的通道發送和接受資料以實作同步,這大大地簡化了并發程式的編寫。

一般情況下,一個普通的桌面計算機跑十幾二十個線程就有點負載過大了,但是同樣這台機器卻可以輕松地讓成百上千甚至過萬個goroutine進行資源競争。

2.1 goroutine是什麼

goroutine是Go并發設計的核心。goroutine說到底其實就是就是協程,但是它比線程更小,十幾個goroutine可能展現在底層就是五六個線程,Go語言内部幫你實作了這些goroutine之間的記憶體共享。執行goroutine隻需極少的記憶體(大概是4~5KB),當然會根據相應的資料伸縮。也正因為如此,可同時運作成千上萬個并發任務。goroutine比thread更易用、更高效、更輕便。

2.2 建立goroutine

隻需要在函數調用語句前添加go關鍵字,就可以建立并發執行單元。開發人員無需了解任何執行細節,排程器會自動将其安排到合适的系統線程上執行。

在并發程式設計裡,我們通常想将一個過程切分成幾塊,然後讓每個goroutine各自負責一塊工作。當一個程式啟動時,其主函數即在一個單獨的goroutine中運作,我們叫它main goroutine。新的goroutine會用go語句來建立。

示例:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    go newTask()    //建立一個goroutine
    for {
        fmt.Println("this is a main goroutine.")
        time.Sleep(time.Second)
    }
}
 
func newTask(){
    for {
        fmt.Println("this is a new Task.")
        time.Sleep(time.Second)    //延時1s
    }
}           

以上執行個體運作結果為:

this is a main goroutine.
this is a new Task.
this is a new Task.
this is a main goroutine.
this is a main goroutine.
this is a new Task.
.....           

2.3 主goroutine先退出

主協程退出了,其他子協程也要跟着退出。

執行個體:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    go func (){
        i:=0
        for {
            fmt.Println("this is a new Task : ",i)
            time.Sleep(time.Second)
            i++
        }
    }()
 
    i := 0
    for {
        fmt.Println("this is a main goroutine :",i)
        time.Sleep(time.Second)
        i++
        if i==2 {
            break
        }
    }
}           

以上執行個體運作結果為:

this is a main goroutine : 0
this is a new Task :  0
this is a new Task :  1
this is a main goroutine : 1           

主協程先退出導緻子協程沒有來得及調用:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    go func (){
        i:=0
        for {
            fmt.Println("this is a new Task : ",i)
            time.Sleep(time.Second)
            i++
        }
    }()
}           

2.4 runtime包

Gosched

runtime.Gosched()用于讓出CPU時間片,讓出目前goroutine的執行權限,排程器安排其他等待的任務運作,并在下次某個時候從該位置恢複執行。

這就像跑接力賽,A跑了一會碰到代碼runtime.Gosched()就把接力棒交給B了,A歇着了,B繼續跑。

執行個體:

package main
 
import (
    "fmt"
    "runtime"
)
 
func main(){
    go func (){
        for i:=0;i<5;i++{
            fmt.Println("Oh!")
        }
    }()
 
    for i:=0;i<2;i++{
        //讓出時間片,先讓别的協程執行,執行完了,再回來執行此協程
        runtime.Gosched()
        fmt.Println("Yeah!")
    }
}           

Goexit

調用runtime.Goexit()将立即終止目前goroutine執行,排程器確定所有已注冊defer延遲調用被執行

package main
 
import (
    "fmt"
    "runtime"
)
 
func main(){
    //建立協程
    go func(){
        fmt.Println("En...")
        //調用函數
        test()
        fmt.Println("Oops...")
    }()
 
    //不讓主協程結束
    for{}
}
 
func test() {
    defer fmt.Println("Yeah!")
    runtime.Goexit()    //終止所在的協程
    fmt.Println("Oh!")
}           

GOMAXPROCS

調用runtime.GOMAXPROCS()用來設定可以并行計算的CPU核數的最大值,并傳回之前的值。

package main
 
import (
    "fmt"
    "runtime"
)
 
func main(){
    n:=runtime.GOMAXPROCS(1)    //把參數改為2試一試
    fmt.Println("n=",n)
    for {
        go fmt.Print(0)
        fmt.Print(1)
    }
}           

在第一次執行(runtime.GOMAXPROCS(1))時,最多同時隻能有一個goroutine被執行。是以會列印很多1。過了一段時間後,Go排程器會将其置為休眠,并喚醒另一個goroutine,這時候就開始列印很多0了,在列印的時候,goroutine是被排程到作業系統線程上的。

在第二次執行(runtime.GOMAXPROCS(2))時,我們使用了兩個CPU,是以兩個goroutine可以一起被執行,以同樣的頻率交替列印0和1。

多任務資源競争問題:

package main
 
import (
    "fmt"
    "time"
)
 
func Printer(str string){
    for _,data:=range str {
        fmt.Printf("%c",data)
        time.Sleep(time.Second)
    }
    fmt.Printf("\n")
}
 
func person1(){
    Printer("Oh!")
}
 
func person2(){
    Printer("Yeah!")
}
 
func main() {
 
    //建立2個協程,代表2個人。兩個人共同使用列印機
    go person1()
    go person2()
 
    //不讓主協程結束
    for{}
}           

3、channel

goroutine運作在相同的位址空間,是以通路共享記憶體必須做好同步。goroutine奉行通過通信來共享記憶體,而不是共享記憶體來通信。

引用類型channel是CSP模式的具體實作,用于多個goroutine通訊。其内部實作了同步,確定并發安全。

3.1 channel類型

定義一個channel時,也需要定義發送到channel的值的類型。channel可以使用内置的make()函數來建立:

make(chan Type)  //等價于make(chan Type,0)
make(chan Type,capacity)           

當capacity=0時,channel是無緩沖阻塞讀寫的;當capacity>0時,channel有緩沖、是非阻塞的,直到寫滿capacity個元素才阻塞寫入。

channel通過操作符<-來接收和發送資料,發送和接收資料文法:

channel <- value  //發送value到channel
<- channel  //接收并将其丢棄
x := <-channel  //從channel中接收資料,并指派給x
x,ok := <-channel  //功能同上,同時檢查通道是否已關閉或者是否為空           

預設情況下,channel接收和發送資料都是阻塞的,除非另一端已經準備好,這樣就使得goroutine同步變得更加簡單,而不需要顯示的lock。

執行個體:

package main
 
import (
    "fmt"
    "time"
)
 
var ch = make(chan int)
 
func Printer(str string){
    for _,data:=range str {
        fmt.Printf("%c",data)
        time.Sleep(time.Second)
    }
    fmt.Printf("\n")
}
 
//person1執行完成,才到person2執行
func person1(){
    Printer("Oh!")
    ch<-0    //給管道/通道寫資料,發送
}
 
func person2(){
    <-ch    //從管道取資料,接收,如果通道沒有資料它就會阻塞
    Printer("Yeah!")
}
 
func main() {
 
    //建立2個協程,代表2個人。兩個人共同使用列印機
    go person1()
    go person2()
 
    //不讓主協程結束
    for{}
}           

以上執行個體執行結果為:

Oh!
Yeah!           

通過channel實作同步和資料互動。

執行個體:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
    defer fmt.Println("主協程結束。")
 
    ch := make(chan string)
 
    go func() {
        defer fmt.Println("子協程調用完畢。")
        for i := 0; i < 2; i++ {
            fmt.Println("子協程 i = ", i)
            time.Sleep(time.Second)
        }
        ch <- "子協程幹活兒了。" //把這行注釋掉再運作一下,看看什麼結果
    }()
 
    str := <-ch    //沒有資料前,阻塞
    fmt.Println("str = ", str)
}           

以上執行個體執行結果為:

子協程 i =  0
子協程 i =  1
子協程調用完畢。
str =  子協程幹活兒了。
主協程結束。           

3.2 無緩沖的channel

無緩沖的通道(unbuffersd channel)是指在接收前沒有能力儲存任何值的通道。

這種類型的通道要求發送goroutine和接收goroutine同時準備好,才能完成發送和接收操作。如果兩個goroutine沒有同時準備好,通道會導緻先執行發送或接收操作的goroutine阻塞等待。

這種對通道進行發送和接收的互動行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。

下圖展示兩個goroutine如何利用無緩沖的通道來共享一個值:

golang 并發goroutine詳解(一)
  • 在第1步,兩個goroutine都到達通道,但哪個都沒有開始執行發送或者接收。
  • 在第2步,左側的goroutine将它的手伸進了通道,這模拟了向通道發送資料的行為。這時,這個goroutine會在通道中被鎖住,直到交換完成。
  • 在第3步,右側的goroutine将它的手放入通道,這模拟了從通道裡接收資料。這個goroutine一樣也會在通道中被鎖住,直到交換完成。
  • 在第4步和第5步,進行交換,并最終在第6步,兩個goroutine都将它們的手從通道裡拿出來,這模拟了被鎖住的goroutine得到釋放。兩個goroutine現在都可以去做别的事情了。

無緩沖的channel建立格式:

make(chan Type)  //等價于make(chan Type,0)           

如果沒有指定緩沖區容量,那麼該通道就是同步的,是以會阻塞到發送者準備好發送和接收者準備好接收。

執行個體:

package main
 
import (
    "fmt"
    "time"
    )
 
func main()  {
    //建立一個無緩存的channel
    ch := make(chan int,0)
 
    //len(ch)緩沖區剩餘資料個數,cap(ch)緩沖區大小
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n",len(ch),cap(ch))
 
    //建立協程
    go func() {
        for i:=0;i<3;i++{
            fmt.Println("子協程:i=",i)
            ch <- i
        }
    }()
 
    //延時
    time.Sleep(2*time.Second)
 
    for i:=0;i<3;i++{
        num := <-ch    //讀取管道中内容,沒有内容前,阻塞
        fmt.Println("num =",num)
    }
}           

以上執行個體執行結果為:

len(ch)=0,cap(ch)=0
子協程:i= 0
num = 0
子協程:i= 1
子協程:i= 2
num = 1
num = 2           

3.3 有緩沖的channel

有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或多個值的通道。

這種類型的通道并不強制要求goroutine之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也會不同。隻有在通道中沒有要接收的值時,接收動作才會阻塞。隻有在通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。

這導緻有緩沖的通道和無緩沖的通道之間的一個很大的不同:無緩沖的通道保證進行發送和接收的goroutine會在同一時間進行資料交換;有緩沖的通道沒有這種保證。

golang 并發goroutine詳解(一)
  • 在第1步,右側的goroutine正在從通道接收一個值。
  • 在第2步,右側的這個goroutine獨立完成了接收值的動作,而左側的goroutine正在發送一個新值到通道裡。
  • 在第3步,左側的goroutine還在向通道發送新值,而右側的goroutine正在從通道接收另外一個值。這個步驟裡的兩個操作既不是同步的,也不是互相阻塞。
  • 在第4步,所有的發送和接收都完成,而通道裡還有幾個值,也有一些空間可以存更多的值。

有緩沖的channel建立格式:

make(chan Type,capicity)           

如果給定了一個緩沖區容量,通道就是異步的。隻要緩沖區有未使用空間用于發送資料,或還包含可以接收的資料,那麼其通信就會無阻塞地進行。

執行個體:

package main
 
import "fmt"
 
func main() {
 
    //建立一個有緩存的channel,容量為3
    ch := make(chan int, 3)
    fmt.Printf("len(ch)=%d,cap(ch)=%d", len(ch), cap(ch))
 
}           

輸出結果為:

len(ch)=0,cap(ch)=3           

執行個體:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    //建立一個有緩存的channel,容量為3
    ch := make(chan int, 3)
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch))
 
    //建立協程
    go func() {
        for i := 0; i < 3; i++ {    //改成i<10試試
            ch <- i //不會阻塞,ch容量為3
            fmt.Printf("子協程[%d]:len(ch)=%d,cap(ch)=%d\n", i, len(ch), cap(ch))
        }
    }()
 
    //延時
    time.Sleep(2 * time.Second)
 
    for i := 0; i < 3; i++ {    //改成i<10試試
        num := <-ch //讀取管道中内容,沒有内容前,阻塞
        fmt.Println("num =", num)
    }
 
}           

輸出結果為:

len(ch)=0,cap(ch)=3
子協程[0]:len(ch)=1,cap(ch)=3
子協程[1]:len(ch)=2,cap(ch)=3
子協程[2]:len(ch)=3,cap(ch)=3
num = 0
num = 1
num = 2           

3.4 range和close

close的用法

package main
 
import (
    "fmt"
)
 
func main() {
 
    //建立一個無緩存的channel
    ch := make(chan int)
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch))
 
    //建立協程
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i //往通道寫資料
        }
        //不需要再寫資料,關閉channel
        close(ch)
                ch <- 5  //關閉channel後無法再發送資料
    }()
 
    for {
        //如果ok為true,說明通道沒有關閉
        if num,ok:=<-ch;ok==true{
            fmt.Println("num = ",num)
        }else {        //通道關閉
            //fmt.Println(num)
            break
        }
    }
 
}           

上述執行個體列印結果為:

len(ch)=0,cap(ch)=0
num =  0
num =  1
num =  2
num =  3
num =  4           

注意點:

  • channel不像檔案一樣需要經常去關閉,隻有當你确定沒有任何發送資料了,或者你想顯式地結束range循環之類的,才去關閉channel;
  • 關閉channel後,無法向channel再發送資料(引發panic錯誤後導緻接收立即傳回零值);
  • 關閉channel後,可以繼續從channel接收資料;
  • 對于nil channel,無論收發都會被阻塞。

range的用法:

package main
 
import (
    "fmt"
)
 
func main() {
 
    //建立一個無緩存的channel
    ch := make(chan int)
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch))
 
    //建立協程
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i //往通道寫資料
        }
        //不需要再寫資料,關閉channel
        close(ch)
        //ch <- 5  //關閉channel後無法再發送資料
    }()
 
    for num:=range ch{        //可以自動跳出循環
        fmt.Println("num = ",num)
    }
 
}           

上述執行個體列印結果為:

len(ch)=0,cap(ch)=0
num =  0
num =  1
num =  2
num =  3
num =  4           

3.5 單方向的channel

預設情況下,通道是雙向的,也就是,既可以往裡面發送資料也可以從裡面取出資料。

但是,我們經常見一個通道作為參數進行值傳遞而且希望對方是單向使用的,要麼隻讓它發送資料,要麼隻讓它接收資料,這時候我們可以指定通道的方向。

單向channel變量的聲明非常簡單,如下:

var ch1 chan int  //ch1是一個正常的channel,不是單向的
var ch2 chan<- float64  //ch2是單向channel,隻用于寫float64資料
var ch3 <-chan int  //ch3是單向channel,隻用于讀取int資料           
  • chan<- 表示資料進入管道,要把資料寫進管道,對于調用者就是輸出。
  • <-chan 表示資料從管道出來,對于調用者就是得到管道的資料,當然就是輸入。

可以将channel隐式轉換為單向隊列,隻收或隻發,不能将單向channel轉換為普通channel。

執行個體:

package main
 
import "fmt"
 
func main()  {
 
    //建立一個雙向通道
    ch := make(chan int)
 
    //生産者,生産數字,寫入channel
    //新開一個協程
    go producer(ch)        //channel傳參,引用傳遞
 
    //消費者,從channel讀内容
    consumer(ch)
 
}
 
//此channel隻能寫
func producer(in chan<- int){
    for i:=0;i<10;i++{
        in<-i
    }
    close(in)
}
 
//此channel隻能讀
func consumer(out <-chan int)  {
    for num := range out{
        fmt.Println("num = ",num)
    }
}           

上述執行個體列印結果為:

num =  0
num =  1
num =  2
num =  3
num =  4
num =  5
num =  6
num =  7
num =  8
num =  9
           

3.6 定時器

1.Timer

Timer是一個定時器,代表未來的一個單一事件,你可以告訴timer你要等待多長時間,它提供一個channel,在未來的那個時間那個channel提供了一個時間值。

time.MewTimer()方法:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
    //建立一個定時器,設定時間為2s,2s後往time通道寫内容(目前時間)
    timer := time.NewTimer(2*time.Second)
    fmt.Println("Current time :",time.Now())
 
    // 2s後,往timer.C寫資料,有資料後,就可以讀取
    t := <-timer.C    //channel沒有資料前後阻塞
    fmt.Println("t = ",t)
}           

上述執行個體列印結果為:

Current time : 2018-05-25 19:06:32.3679043 +0800 CST m=+0.005014201
t =  2018-05-25 19:06:34.3681931 +0800 CST m=+2.005303101           

time.NewTimer()時間到了,隻會響應一次:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
    //建立一個定時器,設定時間為2s,2s後往time通道寫内容(目前時間)
    timer := time.NewTimer(2*time.Second)
 
    for {
        <-timer.C    //隻會寫一次,然後就阻塞,死鎖報錯
        fmt.Println("Time out.")
    }
 
}           

上述執行個體輸出結果為:

time out.
fatal error: all goroutines are asleep - deadlock!           

time.Sleep()方法

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    //延時2s後列印
    time.Sleep(2*time.Second)
    fmt.Println("Time out.")
 
}           

2s後列印:

Time out.           

time.After()方法:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    <-time.After(2*time.Second)        //定時2s,阻塞2s,2s後産生一個事件,往channel寫内容
    fmt.Println("Time out.")
 
}           

2s後列印

Time out.           

time的停用:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    timer := time.NewTimer(3*time.Second)
    
    go func() {
        <-timer.C
        fmt.Println("Time out.")
    }()
    
    timer.Stop()    //停止定時器
 
    for {
 
    }
 
}           

time的重置:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    timer := time.NewTimer(3*time.Second)
    timer.Reset(1*time.Second)
 
    <-timer.C
    fmt.Println("Time out.")
 
}           

2.Ticker

Ticker是一個定時觸發的計時器,它會以一個間隔(interval)往channel發送一個事件(目前時間),而channel的接收者可以以固定的時間間隔從channel中讀取事件。

執行個體:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    ticker := time.NewTicker(1*time.Second)
 
    i:=0
    for {
        <-ticker.C
        i++
        fmt.Println(i)
    }
}           

上述執行個體輸出結果為:

1
2
3
4
5
6
7
8
9
...           

ticker的停止:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    ticker := time.NewTicker(1*time.Second)
 
    i:=0
    for {
        <-ticker.C
        fmt.Println(i)
        i++
        if i==5{
            ticker.Stop()
            break
        }
    }
}           

上述執行個體輸出結果為:

0
1
2
3
4
           

4、select

Go裡面提供了一個關鍵字select,通過select可以監聽channel上的資料流動。

select的用法與switch語言非常類似,由select開始一個新的選擇塊,每個選擇條件由case語句來描述。

與switch語句可以選擇任何可使用相等比較的條件相比,select有比較多的限制,其中最大的一條限制就是每個case語句裡必須是一個IO操作,大緻的結構如下:

select{
case <-chan1:
    //如果chan1成功讀到資料,則進行case處理語句
case chan2<-1:
    //如果成功向chan2寫入資料,則進行該case處理語句
default:
    //如果上面都沒有成功,則進入default處理流程
}           

在一個select語句中,Go語言會按順序從頭至尾評估每一個發送和接收的語句。

如果其中的任意一語句可以繼續執行(即沒有被阻塞),那麼就從那些可以執行的語句中任意選擇一條來使用。

如果沒有任意一條語句可以執行(即所有的通道都被阻塞),那麼有兩種可能的情況:

  • 如果給出了default語句,那麼就會執行default語句,同時程式的執行會從select語句後的語句中恢複。
  • 如果沒有default語句,那麼select語句将被阻塞,直到至少有一個通信可以進行下去。

有時候會出現goroutine阻塞的情況,那麼我們如何避免整個程式進入阻塞的情況呢?我們可以利用select來設定逾時,通過如下的方式實作:

package main
 
import (
    "fmt"
    "time"
)
 
func main()  {
 
    ch := make(chan int)
    quit := make(chan bool)
 
    // 新開一個協程
    go func() {
        for ; ;  {
            select {
            case v := <-ch:
                fmt.Println(v)
            case <-time.After(3*time.Second):
                fmt.Println("Timeout.")
                quit<-true
                break
            }
        }
    }()
 
    //往ch中存放資料
    for i:=0;i<5;i++{
        ch<-i
        time.Sleep(time.Second)
    }
 
    <-quit
    fmt.Println("It is the end of the program.")
}           

上述執行個體輸出結果為:

0
1
2
3
4
Timeout.
It is the end of the program.