天天看點

golang 并發channel實踐

作者:幹飯人小羽
golang 并發channel實踐

前言

在我前面一篇文章Golang受歡迎的原因中已經提到,Golang是在語言層面(runtime)就支援了并發模型。那麼作為程式設計人員,我們在實踐Golang的并發程式設計時,又有什麼需要注意的點呢?下面我會跟大家詳細的介紹一些在實際生産程式設計中很容易踩坑的知識點。

CSP

在介紹Golang的并發實踐前,有必要先介紹簡單介紹一下CSP理論。CSP,全稱是Communicating sequential processes,翻譯為通信順序程序,又翻譯為交換消息的順序程式,用來描述并發性系統的互動模式。CSP有以下三個特點:

1.每個程式是為了順序執行而建立的

2.資料通過管道來通信,而不是通過共享記憶體

3.通過增加相同的程式來擴容

Golang的并發模型基于CSP理論,Golang并發的口号是:不用通過共享記憶體來通信,而是通過通信來共享記憶體。

Golang并發模式

Golang用來支援并發的元素集:

  • goroutines
  • channels
  • select
  • sync package

其中goroutines,channels和select 對應于實作CSP理論,即通過通信來共享記憶體。這幾乎能解決Golang并發的90%問題,另外的10%場景需要通過同步原語來解決,即sync包相關的結構。

看圖識channel

golang 并發channel實踐

如上圖所示,我們從一個簡單的沙桶傳遞小遊戲來認識Golang中的channel。其中藍色的Gopher為發送方,紫色的Gopher為接受方,中間的灰色Gopher代表channel的緩沖區大小。

golang 并發channel實踐

channel介紹

阻塞channel

不帶buffer的channel阻塞情況:

unbuffered := make(chan int)

a := <- unbuffered // 阻塞

unbuffered  := make(chan int) 

// 1) 阻塞

a := <- unbuffered

// 2) 阻塞

unbuffered <- 1

// 3) 同步

go func() { <-unbuffered }()

unbuffered <- 1           

帶buffer的channel阻塞情況:

buffered := make(chan int, 1)

// 4) 阻塞

a := <- buffered

// 5) 不阻塞

buffered <-1

// 6) buffer滿,阻塞

buffered <-2           

上述情況其實歸納起來很簡單:不管有無緩沖區channel,寫滿或者讀空都會阻塞。

不帶buffer和帶buffer的channel用途:

  • 不帶buffer的channel:用于同步通信。
  • 帶buffer的channel:用于異步通信。

關閉channel

c := make(chan int)

close(c)

fmt.Println(<-c) //接收并輸出chan類型的零值,這裡int是0            

需要特殊說明的是,channel不像socket或者檔案,不需要通過close來釋放資源。需要close的唯一情況是,通過close觸發channel讀事件,comma,ok := <- c 中ok為false,表示channel已經關閉。隻能在發送端close channel,因為channel關閉接收端能感覺到,但是發送端感覺不到,隻能主動關閉。往已經關閉的channel發送資訊将會觸發panic。

select

類似switch語句,隻不過case都是channel的讀或者寫操作,也可能是default。case的順序一點都不重要,不要依賴case的先後來定義優先級,第一個非阻塞(send and/or receive)的case将會被選中。

使channel不阻塞

func TryReceive(c <-chan int) (data int, more, ok bool) {

  select {

  case data, more = <- c:

    return data, more, true

  }

  default:

    return 0, true, false

}           

當select中的case都處于阻塞狀态時,就會選中default分支。

或者逾時傳回:

func TryReceiveWithTimeout(c <-chan int, duration time.Duration) (data int, more, ok bool) {

  select {

  case data, more = <-c:

    return data, more, true

  case <- time.After(duration):

    return 0, true, false
  }
}           

time.After(duration)會傳回一個channel,當duration到期時會觸發channel的讀事件。

Channel的缺點:

1.Channel可能會導緻死鎖(循環阻塞)

2.channel中傳遞的都是資料的拷貝,可能會影響性能

3.channel中傳遞指針會導緻資料競态問題(data race/ race conditions)

第三點中提到了資料競态問題,也就是通常所說data race。在接着往下講之前有必要先簡單講解下data race的危害。data race 指的是多線程并發讀寫一個變量,對應到Golang中就是多個goroutine同時讀寫一個變量,這種行為是未定義的,也就是說讀變量出來的值很有可能不是寫入的值,這個值是任意值都有可能。

例如下面這段代碼:

package main

import (
    "fmt"
    "runtime"
    "time"
)

var i int64 = 0

func main() {
    runtime.GOMAXPROCS(2)
    go func() {
        for {
            fmt.Println("i is", i)
            time.Sleep(time.Second)
        }
    }()

    for {
        i += 1
    }
}           

在我mac本地環境會不斷的輸出0。全局變量i被兩個goroutine同時讀寫,也就是我們所說的data race,導緻了i的值是未定義的。如果讀寫的是一塊動态伸縮的記憶體,很有可能會導緻panic。例如多goroutine讀寫map。幸運的是,Golang針對data race有專門的内置工具,例如把上面的代碼儲存為main.go,執行 go run -race main.go 會把相關的data race輸出:

==================

WARNING: DATA RACE

Read at 0x00000121e848 by goroutine 6:

  main.main.func1()

      /Users/saas/src/awesomeProject/datarace/main.go:15 +0x3e

 

Previous write at 0x00000121e848 by main goroutine:

  main.main()

      /Users/saas/src/awesomeProject/datarace/main.go:21 +0x7b

 

Goroutine 6 (running) created at:

  main.main()

      /Users/saas/src/awesomeProject/datarace/main.go:13 +0x4f

==================           

那要怎麼改良這個程式呢?改法很簡單,也有很多種。上面我們已經提到了Golang并發的口号是:不要通過共享記憶體來通信,而是通過通信來共享記憶體。先來看下通過共享記憶體來通信的改良版:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

var i int64 = 0

func main() {
    runtime.GOMAXPROCS(2)
    var m sync.Mutex
    go func() {
        for {
            m.Lock()
            fmt.Println("i is", i)
            m.Unlock()
            time.Sleep(time.Second)
        }
    }()

    for {
        m.Lock()
        i += 1
        m.Unlock()
    }
}           

通過加鎖互斥通路(共享)變量i,也就是上面所說的通過共享記憶體來通信。那麼通過通信來共享記憶體也是怎麼實施的呢?答案是用channel:

package main

import (
    "fmt"
    "runtime"
    "time"
)

var i int64 = 0

func main() {
    runtime.GOMAXPROCS(2)
    c := make(chan int64)
    go func() {
        for {
            fmt.Println("i is", <-c)
            time.Sleep(time.Second)
        }
    }()

    for {
        i += 1
        c<-i
    }
}           

上面提到了一些channel的缺點,文章一開始我也提到了channel能解決Golang并發程式設計的90%問題,那剩下的一些少數并發情況用什麼更優的方案呢?

鎖會不會是個更優的解決方案呢?

鎖就像廁所的坑位一樣,你占用的時間越長,等待的人排的隊就會越長。讀寫鎖隻會減緩這種情況。另外使用多個鎖很容易導緻死鎖。總而言之,鎖不是我們隻在尋找的方案。

原子操作

原子操作是這10%場景有限考慮的解決方案。原子操作是在CPU層面保證了原子性。不用程式設計人員加鎖。Golang對應的操作在sync.atomic 包。Store, Load, Add, Swap 和 CompareAndSwap方法。

CompareAndSwap 方法

type Spinlock struct {

  state *int32

}

const free = int32(0)

func (l *Spinlock) Lock() {

  for !atomic.CompareAndSwapInt32(l.state, free, 42) { //如果state等于0就指派為42

    runtime.Gosched() //讓出CPU

  }

}

func (l *Spinlock) Unlock(){

  atomic.StoreInt32(l.state, free)  // 所有操作state變量的操作都應該是原子的

}           

基于上面的一些并發實踐的建議是:

1.避免阻塞,避免資料競态

2.用channel避免共享記憶體,用select管理channel

3.當channel不适用于你的場景時,盡量用sync包的原子操作,如果實在需要用到鎖,盡量縮小鎖的粒度(鎖住盡量少的代碼)。

并發程式找錯

根據前面介紹的内容,我們來看看下面的這個例子有沒有什麼問題:

func restore(repos []string) error {
    errChan := make(chan error, 1)
    sem := make(chan int, 4) // four jobs at once
    var wg sync.WaitGroup
    wg.Add(len(repos))
    for _, repo := range repos {
        sem <- 1
        go func() {
            defer func() {
                wg.Done()
                <- sem    
            }()
            if err := fetch(repo); err != nil {
                errChan <- err
            }
        }()
    }
    wg.Wait()
    close(sem)
    close(errChan)
    return <- errChan
}           

Bug1. sem無需關閉

Bug2.go和匿名函數觸發的bug,repo不斷在更新,fetch拿到的repo是未定義的。有data race問題。

Bug3.sem<-1放在go func外面啟動同時有4個goroutine在運作,并不能很好的控制同時有4個fetch任務。

Bug4. errChan的緩沖區大小為1,當多個fetch産生err時,将會導緻程式死鎖。

改良後的程式:

func restore(repos []string) error {
    errChan := make(chan error, 1)
    sem := make(chan int, 4) // four jobs at once
    var wg sync.WaitGroup
    wg.Add(len(repos))
    for _, repo := range repos {
        go worker(repo, sem, &wg, errChan)
    }
    wg.Wait()
    close(errChan)
    return <- errChan
}

Func worker(repo string, sem chan int, wg *sync.WaitGroup, errChan chan err) {
    defer wg.Done()
    sem <- 1        
    if err := fetch(repo); err != nil {
        select {
        case errChan <- err:
            // we are the first worker to fail
        default:
            // some other failure has already happened, drop this one
        }
    }
    <- sem    
}           

最後思考:為什麼errChan一定要close?

因為最後的return<-errChan,如果fetch的err都為nil,那麼errChan就是空,<-errChan是個永久阻塞的操作,close(sem)會觸發讀事件,傳回chan累心的零值,這裡是nil。

基于上面的一些并發實踐的建議是:

1.channel不是socket和file這種資源,不需要通過close來釋放資源

2.避免将goroutine和匿名函數一起使用

3.在你啟動一個goroutine之前,一定要清楚它會在什麼時候,什麼情況下會退出。

總結

本文介紹了Golang并發程式設計的一些高效實踐建議,旨在讓大家在Golang并發實踐中少踩坑。其中data race問題和goroutine退出的時機尤為重要。